Implement stream.Writable.toWeb #3927

Open
vjpr opened this issue Aug 2, 2023 · 31 comments
Labels
bug: Something isn't working
good first issue: Something that would be good for new contributors
node.js: Compatibility with Node.js APIs

Comments

@vjpr
Contributor

vjpr commented Aug 2, 2023

What version of Bun is running?

0.7.2

What platform is your computer?

No response

What steps can reproduce the bug?

console.log('process.stdout', stream.Writable.toWeb(process.stdout))

TypeError: h().newWritableStreamFromStreamWritable is not a function. (In 'h().newWritableStreamFromStreamWritable(X)', 'h().newWritableStreamFromStreamWritable' is undefined)
      at node:stream:2:106081
      at /xxx/src/index.ts:172:32
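
For comparison, here is a minimal sketch of what the same call presumably should do, going by how Node.js documents `stream.Writable.toWeb()`. It assumes Node.js 18 or later, where `node:stream/web` exports the same `WritableStream` class that the adapter returns; the variable name `webStdout` is only illustrative.

```javascript
const { Writable } = require("node:stream");
const { WritableStream } = require("node:stream/web");

// Wrap the Node.js stdout stream in a WHATWG WritableStream.
const webStdout = Writable.toWeb(process.stdout);

// On Node.js this logs `true`; on the Bun version from this report
// the call above throws before reaching this line.
console.log(webStdout instanceof WritableStream);
```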

What is the expected behavior?

No response

What do you see instead?

No response

Additional information

No response

@vjpr vjpr added the bug Something isn't working label Aug 2, 2023
@robobun robobun added the node.js Compatibility with Node.js APIs label Aug 2, 2023
@paperclover paperclover added the good first issue Something that would be good for new contributors label Aug 3, 2023
@paperclover
Member

damn, i can't believe this is just {}
image

though for what you're doing you should use Bun.stdout.*, although i guess that's not a WritableStream

@varshneydevansh

If this is still active, can I look into this? May I know if there are any initial indicators that I should be aware of?

@javalsai

javalsai commented Dec 26, 2023

If this is still active, can I look into this? May I know if there are any initial indicators that I should be aware of?

EDIT: Ignore all this and skip to my next reply.
EDIT AGAIN: This does apply, I just didn't realize smth

I don't know if you are still interested, but if someone really needs to get that function to work, this workaround worked for me:

import { type Writable } from 'node:stream'

function WriteStreamToWeb(streamWritable: Writable): WritableStream {
    return new WritableStream({
        start() { },
        write(chunk) {
            streamWritable.write(chunk)
        },
        close() {
            streamWritable.end()
        },
        abort() { },
    })
}

and ofc the JS counterpart

function WriteStreamToWeb(streamWritable) {
    return new WritableStream({
        start() { },
        write(chunk) {
            streamWritable.write(chunk)
        },
        close() {
            streamWritable.end()
        },
        abort() { },
    })
}
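
The workaround above ignores the return value of `write()`, so the web stream never slows down for a busy Node.js stream. A hedged sketch of one way to add that, not what Bun or Node.js actually ships: the sink's `write()` returns a promise that waits for `'drain'`. The helper name `writableToWebWithBackpressure` and the demo function are made up for illustration.

```javascript
const { Writable } = require("node:stream");
const { once } = require("node:events");
const { WritableStream } = require("node:stream/web");

// Hypothetical helper: same idea as the workaround, plus 'drain' handling.
function writableToWebWithBackpressure(streamWritable) {
  return new WritableStream({
    async write(chunk) {
      // write() returns false once the internal buffer reaches highWaterMark.
      // Awaiting 'drain' keeps the web stream's write pending until then.
      if (!streamWritable.write(chunk)) {
        await once(streamWritable, "drain");
      }
    },
    close() {
      // Resolve only after the Node.js stream has finished flushing.
      return new Promise((resolve, reject) => {
        streamWritable.once("error", reject);
        streamWritable.end((err) => (err ? reject(err) : resolve()));
      });
    },
    abort() {
      streamWritable.destroy();
    },
  });
}

// Usage: a slow in-memory sink with a tiny buffer forces the 'drain' path.
async function demoBackpressure() {
  const received = [];
  const sink = new Writable({
    highWaterMark: 1,
    write(chunk, _encoding, callback) {
      received.push(chunk.toString());
      setImmediate(callback);
    },
  });
  const writer = writableToWebWithBackpressure(sink).getWriter();
  await writer.write("a");
  await writer.write("b");
  await writer.close();
  return received;
}

demoBackpressure().then((received) => console.log(received));
```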

@javalsai

Well, I just found out that Writable.toWeb might do this job, and there's also Writable.fromWeb and Readable equivalents, plus a generic .from function that can take transformers, passthroughs...

@varshneydevansh

@javalsai so should I work on this?

@javalsai

@javalsai so should I work on this?

I don't think it's necessary, Writable.toWeb seems to do the same as stream.Writable.toWeb.

You can take a look at why the native Writable differs from stream.Writable tho. It makes sense to me that stream would just use native Writable and Readable, especially if it's a custom implementation of node:stream for Bun; but that doesn't seem to be the case.

I don't know about Zig or how the Bun source code is structured, but if you decide to take a look and find something interesting feel free to share here!

@javalsai

Wait no, I imported Writable and Readable from node:stream (it was late when I was working on this and didn't realize it worked because the stream data was being cached by my function), so there's no native Writable and Readable

@javalsai

Messing with the code I found that this file might be relevant:
https://github.com/oven-sh/bun/blob/main/src/js/node/stream.js

Especially lines 3283, 3369, 4025 and 4017 (what is that function supposed to do?)
(I think that what happens is that webStreamsAdapters is missing functions like newStreamWritableFromWritableStream and newWritableStreamFromStreamWritable (there's probably more))

@javalsai

Ig something like this would fix Writable.toWeb in most cases:

diff --git a/src/js/node/stream.js b/src/js/node/stream.js
index 5297c033c..c2c0d0ddd 100644
--- a/src/js/node/stream.js
+++ b/src/js/node/stream.js
@@ -4013,16 +4013,29 @@ var require_writable = __commonJS({
     Writable.prototype[EE.captureRejectionSymbol] = function (err) {
       this.destroy(err);
     };
-    var webStreamsAdapters;
-    function lazyWebStreams() {
-      if (webStreamsAdapters === void 0) webStreamsAdapters = {};
-      return webStreamsAdapters;
-    }
+    var webStreamsAdapters = {
+        newStreamWritableFromWritableStream() {
+            // TODO:
+        },
+        newWritableStreamFromStreamWritable(streamWritable) {
+            // ! This is very likely incomplete
+            return new WritableStream({
+                start() { },
+                write(chunk) {
+                    streamWritable.write(chunk)
+                },
+                close() {
+                    streamWritable.end()
+                },
+                abort() { },
+            })
+        }
+    };
     Writable.fromWeb = function (writableStream, options) {
-      return lazyWebStreams().newStreamWritableFromWritableStream(writableStream, options);
+      return webStreamsAdapters.newStreamWritableFromWritableStream(writableStream, options);
     };
     Writable.toWeb = function (streamWritable) {
-      return lazyWebStreams().newWritableStreamFromStreamWritable(streamWritable);
+      return webStreamsAdapters.newWritableStreamFromStreamWritable(streamWritable);
     };
   },
 });

NOTE: I also found that Duplex has lazyWebStreams to an empty webStreamsAdapters, so I'm guessing it also misses functionality for fromWeb and toWeb

@varshneydevansh

Okay I will look into this @javalsai and will create the PR

@javalsai

Okay I will look into this @javalsai and will create the PR

Sure, good luck! And ask if you want help with anything, I'd love to see this fixed in the next release. Try to mimic the checks that the Readable webStreamsAdapters do, mainly to provide the same compatibility. I might take a look at Duplex after too, as it seems to have the same issue.

@varshneydevansh

I'll try my best thanks for helping this much =)

@varshneydevansh

Hi @javalsai I completed the development setup and understood what needs to be done to an extent, but I just couldn't understand how to test it.

@javalsai

Hi @javalsai I completed the development setup and understood what needs to be done to an extent, but I just couldn't understand how to test it.

I don't really know how bun structures everything, but my guess is that you'll just have to implement the Writable functions to behave like they do on node (just follow a similar logic to Readable, which is implemented in bun) and then compile it and test some code that relies on it.

Maybe this will be useful? https://github.com/oven-sh/bun/blob/main/CONTRIBUTING.md

Also, check if there has been any change on this since then.

I'ma make a quick JS file that works on node and not on bun for you to test this.

@javalsai

@varshneydevansh this should play with those adapter functions enough to test them (make sure to make /tmp/readfile or just change it to any existing file or device...)

const { createReadStream, createWriteStream } = require('node:fs')
const { Readable, Writable } = require('node:stream')

const readStream = createReadStream('/tmp/readfile')
const writeStream = createWriteStream('/tmp/writefile')

const readWebStream = Readable.toWeb(readStream)
const writeWebStream = Writable.toWeb(writeStream)

const readStreamAgain = Readable.fromWeb(readWebStream)
const writeStreamAgain = Writable.fromWeb(writeWebStream)

console.log(readStreamAgain, writeStreamAgain)

Running that with node will adapt the stream 2 times in both directions and print the streams (will appear as object data on console)
Running with bun will fail on calling the Writable toWeb/fromWeb functions (Readable should work as they are implemented), the error should be something like TypeError: undefined is not a function
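
A variant of the same check that doesn't need any files in /tmp, as a rough sketch: it uses an in-memory Writable, adapts it in both directions, and verifies that data written at one end arrives at the other. The function name `roundTrip` is made up here; the rest is the public `node:stream` API. On Node.js this should complete, while on a Bun build without the Writable adapters it should fail at the `toWeb` call, as described above.

```javascript
const { Writable } = require("node:stream");

async function roundTrip() {
  const received = [];

  // In-memory sink standing in for createWriteStream('/tmp/writefile').
  const nodeWritable = new Writable({
    write(chunk, _encoding, callback) {
      received.push(Buffer.from(chunk).toString());
      callback();
    },
  });

  const webWritable = Writable.toWeb(nodeWritable); // Node.js -> web
  const nodeAgain = Writable.fromWeb(webWritable); // web -> Node.js

  // Write through the outermost wrapper and wait for it to finish.
  await new Promise((resolve, reject) => {
    nodeAgain.on("error", reject);
    nodeAgain.end("hello", resolve);
  });

  return received.join("");
}

roundTrip().then((text) => console.log(text));
```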

@paperclover
Member

Maybe this will be useful?
sorry this file is really out of date

follow this to setup a devenv: https://bun.sh/docs/project/contributing

the code you need to edit is src/js/node/stream.js. this code is a bundled version of the readable-stream package that has a ton of random edits to have some hooks into native code.

Ig something like this would fix...

Yeah that looks about right. we can also inline these function calls into the actual toWeb function instead of having it be a function that calls another one.

Also, we should handle abort, right? does node do that? i assume it's something like doing stream.emit("error", ...)
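
On the abort question, a small sketch of one possible approach, under the assumption that aborting the web stream should destroy the underlying Node.js stream with the abort reason. The helper name `toWebWithAbort` is hypothetical, and the no-op `'error'` listener is only there so the error emitted by `destroy()` doesn't crash the process in this demo.

```javascript
const { Writable } = require("node:stream");
const { WritableStream } = require("node:stream/web");

// Hypothetical helper: the earlier workaround plus an abort() handler.
function toWebWithAbort(streamWritable) {
  return new WritableStream({
    write(chunk) {
      streamWritable.write(chunk);
    },
    close() {
      streamWritable.end();
    },
    abort(reason) {
      // destroy(err) emits 'error'; swallow it so it is not unhandled.
      streamWritable.once("error", () => {});
      const err = reason instanceof Error ? reason : new Error(String(reason));
      streamWritable.destroy(err);
    },
  });
}

// Usage: aborting the web writer tears down the Node.js stream.
async function demoAbort() {
  const sink = new Writable({
    write(_chunk, _encoding, callback) {
      callback();
    },
  });
  const writer = toWebWithAbort(sink).getWriter();
  await writer.abort(new Error("stop"));
  return sink.destroyed;
}

demoAbort().then((destroyed) => console.log("destroyed:", destroyed));
```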

@varshneydevansh
Copy link

Thank you for the help Dave and Sai.

image

I was reading this earlier -

https://nodejs.org/dist/latest-v17.x/docs/api/stream.html#class-streamwritable

@javalsai

Also, we should handle abort, right? does node do that? i assume it's something like doing stream.emit("error", ...)

Sure, that was just a hotfix that I'm using temporarily, might be useful to get the basic structure of it.
But I think it should be implemented in a format similar to the Readable counterpart (source of fromWeb and toWeb).

Also, don't forget that Duplex has those functions empty just like Writable, but maybe that can be done by just separating it into readable and writable and adapting each. I think the web duplex equivalent is just an object with a readable and writable property (https://developer.mozilla.org/en-US/docs/Web/API/TransformStream#instance_properties, I think I got it working in bun by putting it as an object tho).

In fact, I ended up with these utils working:

export function writable_ToWeb(streamWritable: Writable): WritableStream {
	return new WritableStream({
		start() { },
		write(chunk) {
			streamWritable.write(chunk)
		},
		close() {
			streamWritable.end()
		},
		abort() { },
	})
}

export function transformerToWeb(transformer: Transform) {
	return {
		readable: Readable.toWeb(transformer) as ReadableStream,
		writable: writable_ToWeb(transformer),
	}
}

Ik these are Transformers, not Duplexes, but they seem to be the same concept

@varshneydevansh

This is the Duplex one where the same change is needed? -

https://github.com/oven-sh/bun/blob/353f724a9cd12d1749e974e92bbc47d9138a6601/src/js/node/stream.js#L4442C1-L4452C7

    var webStreamsAdapters;
    function lazyWebStreams() {
      if (webStreamsAdapters === void 0) webStreamsAdapters = {};
      return webStreamsAdapters;
    }
    Duplex.fromWeb = function (pair, options) {
      return lazyWebStreams().newStreamDuplexFromReadableWritablePair(pair, options);
    };
    Duplex.toWeb = function (duplex) {
      return lazyWebStreams().newReadableWritablePairFromDuplex(duplex);
    };

and this is the place of the Readable one? -

https://github.com/oven-sh/bun/blob/353f724a9cd12d1749e974e92bbc47d9138a6601/src/js/node/stream.js#L3364C1-L3369C7

    Readable.fromWeb = function (readableStream, options) {
      return webStreamsAdapters.newStreamReadableFromReadableStream(readableStream, options);
    };
    Readable.toWeb = function (streamReadable, options) {
      return webStreamsAdapters.newReadableStreamFromStreamReadable(streamReadable, options);
    };

This is what I have done for the Writable -

    Writable.fromWeb = function (writableStream, options) {
      return new WritableStream({
        start() { },
        write(chunk) {
             writableStream.write(chunk)
        },
        close() {
           writableStream.end()
        },
        abort(reason) {
          const err = new Error(`Abort: ${reason}`);
          writableStream.destroy(err);
        },
      });
    };
    Writable.toWeb = function (streamWritable) {
      return new WritableStream({
        start() { },
        write(chunk) {
          try {
            streamWritable.write(chunk);
          } catch (e) {
            this.controller.error(e);
          }
        },
        close() {
          streamWritable.end();
        },
        abort(reason) {
          streamWritable.destroy(new Error(`Abort: ${reason}`));
        },
      });
    }; 

@javalsai

@varshneydevansh don't worry about duplexes if you're not sure on how to do it, they're not close to being as important as the writable functions.

Also, the Writable.toWeb looks good to me for it to work at least (It'll prob need some future work for options or smth), but the Writable.fromWeb takes a web stream and returns a Writable instance, exactly the opposite function. Let me take a look at it bcs I think I made it also work, but deleted it soon after due to being dead code...

Two last things:

  • If Readable has the adapter functions in webStreamsAdapters, I'd do the same for Writable just to keep the convention (it's a local variable for each thing, Readable has its own, Writable another one...).
  • And the file looks a lil bit weird to me, maybe it's transpiled from some ts file? Just take a look, if it's not, or if any documentation talks about editing those files for contributing, you should be fine.

Nice work btw!
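
To make the direction of `fromWeb` concrete (web stream in, Node.js Writable out), here is a stripped-down sketch. It is an illustration only: the helper name `webToNodeWritable` is made up, and options, validation and most error handling are left out.

```javascript
const { Writable } = require("node:stream");
const { WritableStream } = require("node:stream/web");

// Hypothetical helper: takes a WHATWG WritableStream, returns a Node.js Writable.
function webToNodeWritable(writableStream) {
  const writer = writableStream.getWriter();
  return new Writable({
    write(chunk, _encoding, callback) {
      // Wait for the web stream to be ready, then forward the chunk.
      writer.ready
        .then(() => writer.write(chunk))
        .then(() => callback(), callback);
    },
    final(callback) {
      // end() on the Node.js side closes the web stream.
      writer.close().then(() => callback(), callback);
    },
    destroy(error, callback) {
      if (error) {
        writer.abort(error).then(() => callback(error), () => callback(error));
      } else {
        callback(null);
      }
    },
  });
}

// Usage: chunks written to the Node.js Writable reach the web stream's sink.
async function demoFromWeb() {
  const received = [];
  const web = new WritableStream({
    write(chunk) {
      received.push(Buffer.from(chunk).toString());
    },
  });
  const nodeWritable = webToNodeWritable(web);
  await new Promise((resolve, reject) => {
    nodeWritable.on("error", reject);
    nodeWritable.end("hi", resolve);
  });
  return received;
}

demoFromWeb().then((received) => console.log(received));
```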

@javalsai

Or maybe just omit fromWeb for now, it seems a little bit overcomplicated:

  • bun/src/js/node/stream.js

    Lines 2458 to 2504 in 353f724

    function newStreamReadableFromReadableStream(readableStream, options = {}) {
      if (!isReadableStream(readableStream)) {
        throw new ERR_INVALID_ARG_TYPE("readableStream", "ReadableStream", readableStream);
      }
      validateObject(options, "options");
      const {
        highWaterMark,
        encoding,
        objectMode = false,
        signal,
        // native = true,
      } = options;
      if (encoding !== undefined && !Buffer.isEncoding(encoding))
        throw new ERR_INVALID_ARG_VALUE(encoding, "options.encoding");
      validateBoolean(objectMode, "options.objectMode");
      // validateBoolean(native, "options.native");
      // if (!native) {
      //   return new ReadableFromWeb(
      //     {
      //       highWaterMark,
      //       encoding,
      //       objectMode,
      //       signal,
      //     },
      //     readableStream,
      //   );
      // }
      const nativeStream = getNativeReadableStream(Readable, readableStream, options);
      return (
        nativeStream ||
        new ReadableFromWeb(
          {
            highWaterMark,
            encoding,
            objectMode,
            signal,
          },
          readableStream,
        )
      );
    }
  • bun/src/js/node/stream.js

    Lines 2323 to 2444 in 353f724

    class ReadableFromWeb extends Readable {
      #reader;
      #closed;
      #pendingChunks;
      #stream;
      constructor(options, stream) {
        const { objectMode, highWaterMark, encoding, signal } = options;
        super({
          objectMode,
          highWaterMark,
          encoding,
          signal,
        });
        this.#pendingChunks = [];
        this.#reader = undefined;
        this.#stream = stream;
        this.#closed = false;
      }
      #drainPending() {
        var pendingChunks = this.#pendingChunks,
          pendingChunksI = 0,
          pendingChunksCount = pendingChunks.length;
        for (; pendingChunksI < pendingChunksCount; pendingChunksI++) {
          const chunk = pendingChunks[pendingChunksI];
          pendingChunks[pendingChunksI] = undefined;
          if (!this.push(chunk, undefined)) {
            this.#pendingChunks = pendingChunks.slice(pendingChunksI + 1);
            return true;
          }
        }
        if (pendingChunksCount > 0) {
          this.#pendingChunks = [];
        }
        return false;
      }
      #handleDone(reader) {
        reader.releaseLock();
        this.#reader = undefined;
        this.#closed = true;
        this.push(null);
        return;
      }
      async _read() {
        $debug("ReadableFromWeb _read()", this.__id);
        var stream = this.#stream,
          reader = this.#reader;
        if (stream) {
          reader = this.#reader = stream.getReader();
          this.#stream = undefined;
        } else if (this.#drainPending()) {
          return;
        }
        var deferredError;
        try {
          do {
            var done = false,
              value;
            const firstResult = reader.readMany();
            if ($isPromise(firstResult)) {
              ({ done, value } = await firstResult);
              if (this.#closed) {
                this.#pendingChunks.push(...value);
                return;
              }
            } else {
              ({ done, value } = firstResult);
            }
            if (done) {
              this.#handleDone(reader);
              return;
            }
            if (!this.push(value[0])) {
              this.#pendingChunks = value.slice(1);
              return;
            }
            for (let i = 1, count = value.length; i < count; i++) {
              if (!this.push(value[i])) {
                this.#pendingChunks = value.slice(i + 1);
                return;
              }
            }
          } while (!this.#closed);
        } catch (e) {
          deferredError = e;
        } finally {
          if (deferredError) throw deferredError;
        }
      }
      _destroy(error, callback) {
        if (!this.#closed) {
          var reader = this.#reader;
          if (reader) {
            this.#reader = undefined;
            reader.cancel(error).finally(() => {
              this.#closed = true;
              callback(error);
            });
          }
          return;
        }
        try {
          callback(error);
        } catch (error) {
          globalThis.reportError(error);
        }
      }
    }
  • bun/src/js/node/stream.js

    Lines 5446 to 5461 in 353f724

    function getNativeReadableStream(Readable, stream, options) {
      if (!(stream && typeof stream === "object" && stream instanceof ReadableStream)) {
        return undefined;
      }
      const native = $direct(stream);
      if (!native) {
        $debug("no native readable stream");
        return undefined;
      }
      const { stream: ptr, data: type } = native;
      const NativeReadable = getNativeReadableStreamPrototype(type, Readable);
      return new NativeReadable(ptr, options);
    }
  • bun/src/js/node/stream.js

    Lines 5442 to 5444 in 353f724

    function getNativeReadableStreamPrototype(nativeType, Readable) {
      return (nativeReadableStreamPrototypes[nativeType] ||= createNativeStreamReadable(nativeType, Readable));
    }
  • etc

@guest271314

This comment was marked as off-topic.

@guest271314

This comment was marked as off-topic.

@varshneydevansh

The Node.js compatibility documentation page should probably be updated to note that Duplex.toWeb() doesn't work.

This is what I tried today -

Writable.fromWeb = function (writableStream, options = {}) {
  const { encoding = 'utf-8', highWaterMark = 16384 } = options;

  return new WritableStream({
    start(controller) {
      this.controller = controller;
    },
    write(chunk) {
      try {
        writableStream.write(chunk, encoding);
      } catch (error) {
        this.controller.error(error);
      }

      // Backpressure handling:
      if (writableStream.writable && writableStream.writableEnded) {
        this.controller.close();
      } else if (writableStream.writableHighWaterMark !== undefined &&
                 writableStream.bufferedAmount >= writableStream.writableHighWaterMark) {
        this.controller.desiredSize = 0;
      }
    },
    close() {
      writableStream.end();
    },
    abort(reason) {
      const err = new Error(`Abort: ${reason}`);
      writableStream.destroy(err);
    },
  });
};

Writable.toWeb = function (streamWritable, options = {}) {
  const { highWaterMark = 16384 } = options;

  return new WritableStream({
    start(controller) {
      this.controller = controller;
    },
    write(chunk) {
      try {
        streamWritable.write(chunk);
      } catch (error) {
        this.controller.error(error);
      }

      // Backpressure handling:
      if (streamWritable.bufferedAmount >= highWaterMark) {
        this.controller.desiredSize = 0;
      }
    },
    close() {
      streamWritable.end();
    },
    abort(reason) {
      const err = new Error(`Abort: ${reason}`);
      streamWritable.destroy(err);
    },
  });
};

Is this better than what I earlier did?
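
A note on the backpressure part, with a small sketch: in the WHATWG streams model, as far as I know, the controller passed to `start()` has no settable `desiredSize`. Backpressure comes from the queuing strategy given as the second constructor argument and is observed through `writer.desiredSize` and `writer.ready`. Likewise, a Node.js Writable reports its buffered size as `writableLength` rather than `bufferedAmount`. The function name and the never-settling sink below are only for illustration.

```javascript
const { WritableStream, CountQueuingStrategy } = require("node:stream/web");

function desiredSizeDemo() {
  const stream = new WritableStream(
    {
      // A sink that never finishes a write, so chunks stay in the queue.
      write() {
        return new Promise(() => {});
      },
    },
    // The strategy's highWaterMark is what defines backpressure.
    new CountQueuingStrategy({ highWaterMark: 2 }),
  );

  const writer = stream.getWriter();
  const sizes = [writer.desiredSize]; // room for two chunks at the start

  writer.write("a").catch(() => {});
  sizes.push(writer.desiredSize); // one slot left

  writer.write("b").catch(() => {});
  sizes.push(writer.desiredSize); // queue is full, producers should wait

  return sizes;
}

console.log(desiredSizeDemo());
```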

@guest271314

This comment was marked as off-topic.

guest271314 added a commit to guest271314/bun that referenced this issue Feb 16, 2024
paperclover pushed a commit that referenced this issue Feb 16, 2024
@javalsai

This is what I wound up doing for now. The rest of the code is runtime agnostic and runs using node, deno, and bun

const runtime = navigator.userAgent;

let readable, writable, exit;

if (runtime.startsWith("Deno")) {
  ({ readable } = Deno.stdin);
  ({ writable } = Deno.stdout);
  exit = Deno.exit;
}

if (runtime.startsWith("Node")) {
   const { stdin, stdout, exit:_exit } = await import("node:process");
   const { Duplex } = await import("node:stream");
   ({ readable } = Duplex.toWeb(stdin));
   ({ writable } = Duplex.toWeb(stdout));
   exit = process.exit;
}

if (runtime.startsWith("Bun")) {
   readable = Bun.stdin.stream();
   writable = new WritableStream({
     async write(value) {
       await Bun.write(Bun.stdout, value);
     }
   }, new CountQueuingStrategy({ highWaterMark: Infinity }));
   exit = process.exit;
}

He was working on some changes to Bun's node:stream implementation, not some program that uses it. I'm not sure how the official functions should behave, so I didn't answer him, but they don't look bad to me.

@varshneydevansh

varshneydevansh commented Feb 18, 2024

Is this reasonable? varshneydevansh@acf43f6

    /**
     * @param {WritableStream} writableStream
     * @param {{
     *   decodeStrings? : boolean,
     *   highWaterMark? : number,
     *   objectMode? : boolean,
     *   signal? : AbortSignal,
     * }} [options]
     * @returns {Writable}
     */
    function newStreamWritableFromWritableStream(writableStream, options = {}) {
      if (!isWritableStream(writableStream)) {
        throw new ERR_INVALID_ARG_TYPE("writableStream", "WritableStream", writableStream);
      }

      validateObject(options, "options");
      const { highWaterMark, decodeStrings = true, objectMode = false, signal } = options;

      validateBoolean(objectMode, "options.objectMode");
      validateBoolean(decodeStrings, "options.decodeStrings");

      const writer = writableStream.getWriter();
      let closed = false;

      const writable = new Writable({
        highWaterMark,
        objectMode,
        decodeStrings,
        signal,

        writev(chunks, callback) {
          function done(error) {
            error = error.filter(e => e);
            try {
              callback(error.length === 0 ? undefined : error);
            } catch (error) {
              // In a next tick because this is happening within
              // a promise context, and if there are any errors
              // thrown we don't want those to cause an unhandled
              // rejection. Let's just escape the promise and
              // handle it separately.
              process.nextTick(() => destroy(writable, error));
            }
          }
          // Wrapping on a new Promise is necessary to not expose the SafePromise
          // prototype to user-land.
          primordials.SafePromiseAll = (promises, mapFn) =>
            new Promise((a, b) => SafePromise.all(arrayToSafePromiseIterable(promises, mapFn)).then(a, b));

          PromisePrototypeThen(
            writer.ready,
            () => {
              return PromisePrototypeThen(
                SafePromiseAll(chunks, data => writer.write(data.chunk)),
                done,
                done,
              );
            },
            done,
          );
        },

        write(chunk, encoding, callback) {
          if (typeof chunk === "string" && decodeStrings && !objectMode) {
            const enc = normalizeEncoding(encoding);

            if (enc === "utf8") {
              chunk = encoder.encode(chunk);
            } else {
              chunk = Buffer.from(chunk, encoding);
              chunk = new Uint8Array(
                TypedArrayPrototypeGetBuffer(chunk),
                TypedArrayPrototypeGetByteOffset(chunk),
                TypedArrayPrototypeGetByteLength(chunk),
              );
            }
          }

          function done(error) {
            try {
              callback(error);
            } catch (error) {
              destroy(writable, error);
            }
          }

          PromisePrototypeThen(
            writer.ready,
            () => {
              return PromisePrototypeThen(writer.write(chunk), done, done);
            },
            done,
          );
        },

        destroy(error, callback) {
          function done() {
            try {
              callback(error);
            } catch (error) {
              // In a next tick because this is happening within
              // a promise context, and if there are any errors
              // thrown we don't want those to cause an unhandled
              // rejection. Let's just escape the promise and
              // handle it separately.
              process.nextTick(() => {
                throw error;
              });
            }
          }

          if (!closed) {
            if (error != null) {
              PromisePrototypeThen(writer.abort(error), done, done);
            } else {
              PromisePrototypeThen(writer.close(), done, done);
            }
            return;
          }

          done();
        },

        final(callback) {
          function done(error) {
            try {
              callback(error);
            } catch (error) {
              // In a next tick because this is happening within
              // a promise context, and if there are any errors
              // thrown we don't want those to cause an unhandled
              // rejection. Let's just escape the promise and
              // handle it separately.
              process.nextTick(() => destroy(writable, error));
            }
          }

          if (!closed) {
            PromisePrototypeThen(writer.close(), done, done);
          }
        },
      });

      PromisePrototypeThen(
        writer.closed,
        () => {
          // If the WritableStream closes before the stream.Writable has been
          // ended, we signal an error on the stream.Writable.
          closed = true;
          if (!isWritableEnded(writable)) destroy(writable, new ERR_STREAM_PREMATURE_CLOSE());
        },
        error => {
          // If the WritableStream errors before the stream.Writable has been
          // destroyed, signal an error on the stream.Writable.
          closed = true;
          destroy(writable, error);
        },
      );

      return writable;
    }

    /**
     * @typedef {import('../../stream').Writable} Writable
     * @typedef {import('../../stream').Readable} Readable
     * @typedef {import('./writablestream').WritableStream} WritableStream
     * @typedef {import('./readablestream').ReadableStream} ReadableStream
     */

    /**
     * @typedef {import('../abort_controller').AbortSignal} AbortSignal
     */

    /**
     * @param {Writable} streamWritable
     * @returns {WritableStream}
     */
    function newWritableStreamFromStreamWritable(streamWritable) {
      // Not using the internal/streams/utils isWritableNodeStream utility
      // here because it will return false if streamWritable is a Duplex
      // whose writable option is false. For a Duplex that is not writable,
      // we want it to pass this check but return a closed WritableStream.
      // We check if the given stream is a stream.Writable or http.OutgoingMessage
      const checkIfWritableOrOutgoingMessage =
        streamWritable && typeof streamWritable?.write === "function" && typeof streamWritable?.on === "function";
      if (!checkIfWritableOrOutgoingMessage) {
        throw new ERR_INVALID_ARG_TYPE("streamWritable", "stream.Writable", streamWritable);
      }

      if (isDestroyed(streamWritable) || !isWritable(streamWritable)) {
        const writable = new WritableStream();
        writable.close();
        return writable;
      }

      const highWaterMark = streamWritable.writableHighWaterMark;
      const strategy = streamWritable.writableObjectMode
        ? new CountQueuingStrategy({ highWaterMark })
        : { highWaterMark };

      let controller;
      let backpressurePromise;
      let closed;

      function onDrain() {
        if (backpressurePromise !== undefined) backpressurePromise.resolve();
      }

      const cleanup = finished(streamWritable, error => {
        error = handleKnownInternalErrors(error);

        cleanup();
        // This is a protection against non-standard, legacy streams
        // that happen to emit an error event again after finished is called.
        streamWritable.on("error", () => {});
        if (error != null) {
          if (backpressurePromise !== undefined) backpressurePromise.reject(error);
          // If closed is not undefined, the error is happening
          // after the WritableStream close has already started.
          // We need to reject it here.
          if (closed !== undefined) {
            closed.reject(error);
            closed = undefined;
          }
          controller.error(error);
          controller = undefined;
          return;
        }

        if (closed !== undefined) {
          closed.resolve();
          closed = undefined;
          return;
        }
        controller.error(new AbortError());
        controller = undefined;
      });

      streamWritable.on("drain", onDrain);

      return new WritableStream(
        {
          start(c) {
            controller = c;
          },

          async write(chunk) {
            if (streamWritable.writableNeedDrain || !streamWritable.write(chunk)) {
              backpressurePromise = createDeferredPromise();
              return SafePromisePrototypeFinally(backpressurePromise.promise, () => {
                backpressurePromise = undefined;
              });
            }
          },

          abort(reason) {
            destroy(streamWritable, reason);
          },

          close() {
            if (closed === undefined && !isWritableEnded(streamWritable)) {
              closed = createDeferredPromise();
              streamWritable.end();
              return closed.promise;
            }

            controller = undefined;
            return PromiseResolve();
          },
        },
        strategy,
      );
    }

    var webStreamsAdapters = {
      newStreamWritableFromWritableStream,

      newWritableStreamFromStreamWritable,
    };

    Writable.fromWeb = function (writableStream, options) {
      return webStreamsAdapters.newStreamWritableFromWritableStream(writableStream, options);
    };
    Writable.toWeb = function (streamWritable, options) {
      return webStreamsAdapters.newWritableStreamFromStreamWritable(streamWritable);
    };

@javalsai

Is this reasonable? varshneydevansh@acf43f6

    /**
     * @param {WritableStream} writableStream
     * @param {{
     *   decodeStrings? : boolean,
     *   highWaterMark? : number,
     *   objectMode? : boolean,
     *   signal? : AbortSignal,
     * }} [options]
     * @returns {Writable}
     */
    function newStreamWritableFromWritableStream(writableStream, options = {}) {
      if (!isWritableStream(writableStream)) {
        throw new ERR_INVALID_ARG_TYPE("writableStream", "WritableStream", writableStream);
      }

      validateObject(options, "options");
      const { highWaterMark, decodeStrings = true, objectMode = false, signal } = options;

      validateBoolean(objectMode, "options.objectMode");
      validateBoolean(decodeStrings, "options.decodeStrings");

      const writer = writableStream.getWriter();
      let closed = false;

      const writable = new Writable({
        highWaterMark,
        objectMode,
        decodeStrings,
        signal,

        writev(chunks, callback) {
          function done(error) {
            error = error.filter(e => e);
            try {
              callback(error.length === 0 ? undefined : error);
            } catch (error) {
              // In a next tick because this is happening within
              // a promise context, and if there are any errors
              // thrown we don't want those to cause an unhandled
              // rejection. Let's just escape the promise and
              // handle it separately.
              process.nextTick(() => destroy(writable, error));
            }
          }
          // Wrapping on a new Promise is necessary to not expose the SafePromise
          // prototype to user-land.
          primordials.SafePromiseAll = (promises, mapFn) =>
            new Promise((a, b) => SafePromise.all(arrayToSafePromiseIterable(promises, mapFn)).then(a, b));

          PromisePrototypeThen(
            writer.ready,
            () => {
              return PromisePrototypeThen(
                SafePromiseAll(chunks, data => writer.write(data.chunk)),
                done,
                done,
              );
            },
            done,
          );
        },

        write(chunk, encoding, callback) {
          if (typeof chunk === "string" && decodeStrings && !objectMode) {
            const enc = normalizeEncoding(encoding);

            if (enc === "utf8") {
              chunk = encoder.encode(chunk);
            } else {
              chunk = Buffer.from(chunk, encoding);
              chunk = new Uint8Array(
                TypedArrayPrototypeGetBuffer(chunk),
                TypedArrayPrototypeGetByteOffset(chunk),
                TypedArrayPrototypeGetByteLength(chunk),
              );
            }
          }

          function done(error) {
            try {
              callback(error);
            } catch (error) {
              destroy(writable, error);
            }
          }

          PromisePrototypeThen(
            writer.ready,
            () => {
              return PromisePrototypeThen(writer.write(chunk), done, done);
            },
            done,
          );
        },

        destroy(error, callback) {
          function done() {
            try {
              callback(error);
            } catch (error) {
              // In a next tick because this is happening within
              // a promise context, and if there are any errors
              // thrown we don't want those to cause an unhandled
              // rejection. Let's just escape the promise and
              // handle it separately.
              process.nextTick(() => {
                throw error;
              });
            }
          }

          if (!closed) {
            if (error != null) {
              PromisePrototypeThen(writer.abort(error), done, done);
            } else {
              PromisePrototypeThen(writer.close(), done, done);
            }
            return;
          }

          done();
        },

        final(callback) {
          function done(error) {
            try {
              callback(error);
            } catch (error) {
              // In a next tick because this is happening within
              // a promise context, and if there are any errors
              // thrown we don't want those to cause an unhandled
              // rejection. Let's just escape the promise and
              // handle it separately.
              process.nextTick(() => destroy(writable, error));
            }
          }

          if (!closed) {
            PromisePrototypeThen(writer.close(), done, done);
          }
        },
      });

      PromisePrototypeThen(
        writer.closed,
        () => {
          // If the WritableStream closes before the stream.Writable has been
          // ended, we signal an error on the stream.Writable.
          closed = true;
          if (!isWritableEnded(writable)) destroy(writable, new ERR_STREAM_PREMATURE_CLOSE());
        },
        error => {
          // If the WritableStream errors before the stream.Writable has been
          // destroyed, signal an error on the stream.Writable.
          closed = true;
          destroy(writable, error);
        },
      );

      return writable;
    }

    /**
     * @typedef {import('../../stream').Writable} Writable
     * @typedef {import('../../stream').Readable} Readable
     * @typedef {import('./writablestream').WritableStream} WritableStream
     * @typedef {import('./readablestream').ReadableStream} ReadableStream
     */

    /**
     * @typedef {import('../abort_controller').AbortSignal} AbortSignal
     */

    /**
     * @param {Writable} streamWritable
     * @returns {WritableStream}
     */
    function newWritableStreamFromStreamWritable(streamWritable) {
      // Not using the internal/streams/utils isWritableNodeStream utility
      // here because it will return false if streamWritable is a Duplex
      // whose writable option is false. For a Duplex that is not writable,
      // we want it to pass this check but return a closed WritableStream.
      // We check if the given stream is a stream.Writable or http.OutgoingMessage
      const checkIfWritableOrOutgoingMessage =
        streamWritable && typeof streamWritable?.write === "function" && typeof streamWritable?.on === "function";
      if (!checkIfWritableOrOutgoingMessage) {
        throw new ERR_INVALID_ARG_TYPE("streamWritable", "stream.Writable", streamWritable);
      }

      if (isDestroyed(streamWritable) || !isWritable(streamWritable)) {
        const writable = new WritableStream();
        writable.close();
        return writable;
      }

      const highWaterMark = streamWritable.writableHighWaterMark;
      const strategy = streamWritable.writableObjectMode
        ? new CountQueuingStrategy({ highWaterMark })
        : { highWaterMark };

      let controller;
      let backpressurePromise;
      let closed;

      function onDrain() {
        if (backpressurePromise !== undefined) backpressurePromise.resolve();
      }

      const cleanup = finished(streamWritable, error => {
        error = handleKnownInternalErrors(error);

        cleanup();
        // This is a protection against non-standard, legacy streams
        // that happen to emit an error event again after finished is called.
        streamWritable.on("error", () => {});
        if (error != null) {
          if (backpressurePromise !== undefined) backpressurePromise.reject(error);
          // If closed is not undefined, the error is happening
          // after the WritableStream close has already started.
          // We need to reject it here.
          if (closed !== undefined) {
            closed.reject(error);
            closed = undefined;
          }
          controller.error(error);
          controller = undefined;
          return;
        }

        if (closed !== undefined) {
          closed.resolve();
          closed = undefined;
          return;
        }
        controller.error(new AbortError());
        controller = undefined;
      });

      streamWritable.on("drain", onDrain);

      return new WritableStream(
        {
          start(c) {
            controller = c;
          },

          async write(chunk) {
            if (streamWritable.writableNeedDrain || !streamWritable.write(chunk)) {
              backpressurePromise = createDeferredPromise();
              return SafePromisePrototypeFinally(backpressurePromise.promise, () => {
                backpressurePromise = undefined;
              });
            }
          },

          abort(reason) {
            destroy(streamWritable, reason);
          },

          close() {
            if (closed === undefined && !isWritableEnded(streamWritable)) {
              closed = createDeferredPromise();
              streamWritable.end();
              return closed.promise;
            }

            controller = undefined;
            return PromiseResolve();
          },
        },
        strategy,
      );
    }

    var webStreamsAdapters = {
      newStreamWritableFromWritableStream,

      newWritableStreamFromStreamWritable,
    };

    Writable.fromWeb = function (writableStream, options) {
      return webStreamsAdapters.newStreamWritableFromWritableStream(writableStream, options);
    };
    Writable.toWeb = function (streamWritable, options) {
      return webStreamsAdapters.newWritableStreamFromStreamWritable(streamWritable);
    };

I'd say that if it looks fine, it should be good to go.
It would be good if someone had tests covering the edge cases, but that can always be worked on later. I might ask on the Discord server, if you don't mind.

@varshneydevansh

varshneydevansh commented Feb 18, 2024

Actually, the only thing I haven't been able to figure out (maybe I am not trying hard enough) is how to point Bun to my local build, so that I can test these by running the file with my local build rather than the one installed via curl -fsSL https://bun.sh/install | bash

I will try to test this in the morning.

@javalsai

Actually, the only thing I haven't been able to figure out (maybe I am not trying hard enough) is how to point Bun to my local build, so that I can test these by running the file with my local build rather than the one installed via curl -fsSL https://bun.sh/install | bash

I will try to test this in the morning.

What are you trying to do exactly? Add your local build executable as a simple bun command?

@varshneydevansh

varshneydevansh commented Feb 19, 2024

What are you trying to do exactly? Add your local build executable as a simple bun command?

const { createReadStream, createWriteStream } = require('node:fs')
const { Readable, Writable } = require('node:stream')

const readStream = createReadStream('/tmp/readfile')
const writeStream = createWriteStream('/tmp/writefile')

const readWebStream = Readable.toWeb(readStream)
const writeWebStream = Writable.toWeb(writeStream)

const readStreamAgain = Readable.fromWeb(readWebStream)
const writeStreamAgain = Writable.fromWeb(writeWebStream)

console.log(readStreamAgain, writeStreamAgain)
With the installed Bun, then Node.js, then my local debug build of Bun:

$ bun test_bun_modi.js
3 | 
4 | const readStream = createReadStream('/tmp/readfile')
5 | const writeStream = createWriteStream('/tmp/writefile')
6 | 
7 | const readWebStream = Readable.toWeb(readStream)
8 | const writeWebStream = Writable.toWeb(writeStream)
                           ^
TypeError: undefined is not a function
      at node:stream:2919:65
      at /home/devansh/test_bun_modi.js:8:24
               
$ node test_bun_modi.js
Readable {
  _events: {
    close: undefined,
    error: undefined,
    data: undefined,
    end: undefined,
    readable: undefined
  },
  _readableState: ReadableState {
    highWaterMark: 16384,
    buffer: [],
    bufferIndex: 0,
    length: 0,
    pipes: [],
    awaitDrainWriters: null,
    [Symbol(kState)]: 1052940
  },
  _read: [Function: read],
  _destroy: [Function: destroy],
  _maxListeners: undefined,
  [Symbol(shapeMode)]: true,
  [Symbol(kCapture)]: false
} Writable {
  _events: {
    close: undefined,
    error: undefined,
    prefinish: undefined,
    finish: undefined,
    drain: undefined
  },
  _writableState: WritableState {
    highWaterMark: 16384,
    length: 0,
    corked: 0,
    onwrite: [Function: bound onwrite],
    writelen: 0,
    bufferedIndex: 0,
    pendingcb: 0,
    [Symbol(kState)]: 17580812,
    [Symbol(kBufferedValue)]: null
  },
  _write: [Function: write],
  _writev: [Function: writev],
  _destroy: [Function: destroy],
  _final: [Function: final],
  _maxListeners: undefined,
  [Symbol(shapeMode)]: true,
  [Symbol(kCapture)]: false
}
node:events:496
      throw er; // Unhandled 'error' event
      ^

Error: EISDIR: illegal operation on a directory, read
Emitted 'error' event on Readable instance at:
    at emitErrorNT (node:internal/streams/destroy:169:8)
    at emitErrorCloseNT (node:internal/streams/destroy:128:3)
    at process.processTicksAndRejections (node:internal/process/task_queues:82:21) {
  errno: -21,
  code: 'EISDIR',
  syscall: 'read'
}

Node.js v20.11.0
               
$ bun/build/bun-debug test_bun_modi.js
[SYS] read(3, 4096) = 4096 (0.051ms)
[fs] close(3[/home/devansh/bun/build/bun-debug])
[SYS] openat(-100, /home/devansh/bunfig.toml) = 18446744073709551614
[SYS] openat(-100, test_bun_modi.js) = 3
[SYS] fstat(3) = 0
[fs] close(3[/home/devansh/test_bun_modi.js])
[fs] close(4[/])
[fs] close(5[/home])
[fs] close(6[/home/devansh])
[SYS] openat(-100, /home/devansh/test_bun_modi.js) = 12
[fs] openat(2147483647, /home/devansh/test_bun_modi.js) = 12[/home/devansh/test_bun_modi.js]
[fs] close(12[/home/devansh/test_bun_modi.js])
[alloc] new() = src.bun.js.node.node_fs_binding.NodeJSFS@20000311400
[alloc] new() = src.bun.js.node.node_fs_binding.NodeJSFS@20000312800
[SYS] openat(-100, /tmp/readfile) = 12
[SYS] openat(-100, /tmp/writefile) = 14
[SYS] dup(12) = 15
[SYS] fstat(16) = 0
[fs] close(16[/tmp/readfile])
[SYS] openat(-100, /home/devansh/test_bun_modi.js) = 16
[fs] openat(2147483647, /home/devansh/test_bun_modi.js) = 16[/home/devansh/test_bun_modi.js]
[fs] close(16[/home/devansh/test_bun_modi.js])
3 | 
4 | const readStream = createReadStream('/tmp/readfile')
5 | const writeStream = createWriteStream('/tmp/writefile')
6 | 
7 | const readWebStream = Readable.toWeb(readStream)
8 | const writeWebStream = Writable.toWeb(writeStream)
                           ^
ReferenceError: Can't find variable: isDestroyed
      at newWritableStreamFromStreamWritable (node:stream:3928:97)
      at /home/devansh/test_bun_modi.js:8:24
      at asyncFunctionResume (:1:21)
      at promiseReactionJobWithoutPromiseUnwrapAsyncContext (:1:21)
      at promiseReactionJob (:1:21)
              

The local debug build is run directly by its path:

bun/build/bun-debug test_bun_modi.js

I wasn't paying attention, even after reading https://bun.sh/docs/project/contributing#building-bun
