Implement stream.Writable.toWeb
#3927
Comments
If this is still active, can I look into this? May I know if there are any initial indicators that I should be aware of? |
I don't know if you are still interested, but if someone really needs to get that function to work, this workaround worked for me: import { type Writable } from 'node:stream'
function WriteStreamToWeb(streamWritable: Writable): WritableStream {
return new WritableStream({
start() { },
write(chunk) {
streamWritable.write(chunk)
},
close() {
streamWritable.end()
},
abort() { },
})
} and ofc the JS counterpart function WriteStreamToWeb(streamWritable) {
return new WritableStream({
start() { },
write(chunk) {
streamWritable.write(chunk)
},
close() {
streamWritable.end()
},
abort() { },
})
} |
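For anyone trying the workaround above, here is a minimal usage sketch. The names `writableToWeb` and `collectingWritable` are made up for illustration, and the adapter is the same fire-and-forget shape as the workaround: it ignores the return value of `write()` and does not forward errors, so treat it as a stopgap rather than a replacement for the native function.

```javascript
const { Writable } = require('node:stream')

// Same shape as the workaround above (hypothetical name: writableToWeb).
function writableToWeb(streamWritable) {
  return new WritableStream({
    write(chunk) {
      streamWritable.write(chunk)
    },
    close() {
      streamWritable.end()
    },
  })
}

// In-memory sink that records every chunk it receives (illustrative helper).
function collectingWritable(received) {
  return new Writable({
    write(chunk, _encoding, callback) {
      received.push(chunk.toString())
      callback()
    },
  })
}

async function demo() {
  const received = []
  const nodeWritable = collectingWritable(received)
  const finished = new Promise((resolve) => nodeWritable.once('finish', resolve))

  // Drive the Node stream through the web-stream writer interface.
  const writer = writableToWeb(nodeWritable).getWriter()
  await writer.write('hello ')
  await writer.write('world')
  await writer.close()
  await finished

  return received.join('')
}
```

Calling `demo()` returns a promise for everything the Node stream received, which makes it easy to compare against the input.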
Well, I just found out that |
@javalsai so should I work on this? |
I don't think it's necessary, You can take a look at why the native I don't know about Zig or how the Bun source code is structured, but if you decide to take a look and find something interesting feel free to share here! |
Wait no, I imported |
Messing with the code I found that this file might be relevant: Especially lines 3283, 3369, 4025 and 4017 (what is that function supposed to do?) |
I guess something like this would fix it: diff --git a/src/js/node/stream.js b/src/js/node/stream.js
index 5297c033c..c2c0d0ddd 100644
--- a/src/js/node/stream.js
+++ b/src/js/node/stream.js
@@ -4013,16 +4013,29 @@ var require_writable = __commonJS({
Writable.prototype[EE.captureRejectionSymbol] = function (err) {
this.destroy(err);
};
- var webStreamsAdapters;
- function lazyWebStreams() {
- if (webStreamsAdapters === void 0) webStreamsAdapters = {};
- return webStreamsAdapters;
- }
+ var webStreamsAdapters = {
+ newStreamWritableFromWritableStream() {
+ // TODO:
+ },
+ newWritableStreamFromStreamWritable(streamWritable) {
+ // ! This is very likely incomplete
+ return new WritableStream({
+ start() { },
+ write(chunk) {
+ streamWritable.write(chunk)
+ },
+ close() {
+ streamWritable.end()
+ },
+ abort() { },
+ })
+ }
+ };
Writable.fromWeb = function (writableStream, options) {
- return lazyWebStreams().newStreamWritableFromWritableStream(writableStream, options);
+ return webStreamsAdapters.newStreamWritableFromWritableStream(writableStream, options);
};
Writable.toWeb = function (streamWritable) {
- return lazyWebStreams().newWritableStreamFromStreamWritable(streamWritable);
+ return webStreamsAdapters.newWritableStreamFromStreamWritable(streamWritable);
};
},
}); NOTE: I also found that |
Okay I will look into this @javalsai and will create the PR |
Sure, good luck! And ask if you want help with anything, I'd love to see this fixed on next release. Try to mimic the checks that the reader |
I'll try my best thanks for helping this much =) |
Hi @javalsai, I completed the development setup and understood what needs to be done to an extent, but I couldn't work out how to test it. |
I don't really know how bun structures everything, but my guess is that you'll just have to implement the Writable functions to behave like they do on node (just follow a similar logic to Readable, which is implemented in bun) and then compile it and test some code that relies on it. Maybe this will be useful? https://github.com/oven-sh/bun/blob/main/CONTRIBUTING.md Also, check if there has been any change on this since then. I'ma make a quick JS file that works on node and not on bun for you to test this. |
@varshneydevansh this should play with those adapter functions enough to test them (make sure to make /tmp/readfile or just change it to any existing file or device...) const { createReadStream, createWriteStream } = require('node:fs')
const { Readable, Writable } = require('node:stream')
const readStream = createReadStream('/tmp/readfile')
const writeStream = createWriteStream('/tmp/writefile')
const readWebStream = Readable.toWeb(readStream)
const writeWebStream = Writable.toWeb(writeStream)
const readStreamAgain = Readable.fromWeb(readWebStream)
const writeStreamAgain = Writable.fromWeb(writeWebStream)
console.log(readStreamAgain, writeStreamAgain) Running that with node will adapt the stream 2 times in both directions and print the streams (will appear as object data on console) |
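The script above only prints the adapted streams. A variant that actually moves data through all four adapters can make regressions easier to spot. The sketch below uses in-memory streams instead of `/tmp` files, so it needs no setup; `roundTrip` is a hypothetical name, and it assumes a runtime where `Readable.toWeb`, `Readable.fromWeb`, `Writable.toWeb` and `Writable.fromWeb` all exist (Node does, Bun is what is being fixed here).

```javascript
const { Readable, Writable } = require('node:stream')

async function roundTrip() {
  const received = []
  // In-memory destination so the result can be inspected afterwards.
  const sink = new Writable({
    write(chunk, _encoding, callback) {
      received.push(chunk.toString())
      callback()
    },
  })
  const source = Readable.from(['a', 'b', 'c'])

  // Node stream -> web stream -> Node stream, in both directions.
  const readableAgain = Readable.fromWeb(Readable.toWeb(source))
  const writableAgain = Writable.fromWeb(Writable.toWeb(sink))

  await new Promise((resolve, reject) => {
    writableAgain.once('finish', resolve)
    writableAgain.once('error', reject)
    readableAgain.once('error', reject)
    readableAgain.pipe(writableAgain)
  })
  return received.join('')
}
```

If the adapters behave like Node's, the promise returned by `roundTrip()` should resolve to the concatenated input; a rejection or a hang points at one of the four conversions.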
Follow this to set up a dev env: https://bun.sh/docs/project/contributing The code you need to edit is src/js/stream.js. This code is a bundled version of the readable-stream package that has a ton of random edits to have some hooks into native code.
Yeah that looks about right. We can also inline these function calls into the actual toWeb function instead of having it be a function that calls another one. Also, we should handle |
Thank you for the help Dave and Sai. I was reading this earlier - https://nodejs.org/dist/latest-v17.x/docs/api/stream.html#class-streamwritable |
Sure, that was just a hotfix that I'm using temporarily, might be useful to get the basic structure of it. Also, don't forget that Duplex has those functions empty just like In fact, I ended up with these utils working: export function writable_ToWeb(streamWritable: Writable): WritableStream {
return new WritableStream({
start() { },
write(chunk) {
streamWritable.write(chunk)
},
close() {
streamWritable.end()
},
abort() { },
})
}
export function transformerToWeb(transformer: Transform) {
return {
readable: Readable.toWeb(transformer) as ReadableStream,
writable: writable_ToWeb(transformer),
}
} Ik these are Transformers, not Duplexes, but they seem to be the same concept |
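A pair shaped like `{ readable, writable }` is what `ReadableStream.prototype.pipeThrough` expects, so the transformer helper above can be exercised roughly as follows. This is a sketch in plain JS: `writableToWeb`, `transformToWeb` and `upperCaseDemo` are illustrative names, and the writable half is the same simplified adapter as in the hotfix (no backpressure or error forwarding).

```javascript
const { Readable, Transform } = require('node:stream')

// Minimal stand-in for the writable_ToWeb helper above.
function writableToWeb(streamWritable) {
  return new WritableStream({
    write(chunk) { streamWritable.write(chunk) },
    close() { streamWritable.end() },
  })
}

// Expose both halves of a Transform as web streams.
function transformToWeb(transform) {
  return {
    readable: Readable.toWeb(transform),
    writable: writableToWeb(transform),
  }
}

async function upperCaseDemo() {
  const upper = new Transform({
    transform(chunk, _encoding, callback) {
      callback(null, chunk.toString().toUpperCase())
    },
  })

  const source = new ReadableStream({
    start(controller) {
      controller.enqueue('bun ')
      controller.enqueue('streams')
      controller.close()
    },
  })

  // Chunks may arrive as strings or as bytes depending on the adapter.
  const decoder = new TextDecoder()
  let output = ''
  for await (const chunk of source.pipeThrough(transformToWeb(upper))) {
    output += typeof chunk === 'string' ? chunk : decoder.decode(chunk, { stream: true })
  }
  return output
}
```

The same pattern should apply to a Duplex, as long as its readable and writable sides are both open when the pair is created.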
This is the Duplex one where the same change is needed? - var webStreamsAdapters;
function lazyWebStreams() {
if (webStreamsAdapters === void 0) webStreamsAdapters = {};
return webStreamsAdapters;
}
Duplex.fromWeb = function (pair, options) {
return lazyWebStreams().newStreamDuplexFromReadableWritablePair(pair, options);
};
Duplex.toWeb = function (duplex) {
return lazyWebStreams().newReadableWritablePairFromDuplex(duplex);
}; and this is the place of the Readable one? - Readable.fromWeb = function (readableStream, options) {
return webStreamsAdapters.newStreamReadableFromReadableStream(readableStream, options);
};
Readable.toWeb = function (streamReadable, options) {
return webStreamsAdapters.newReadableStreamFromStreamReadable(streamReadable, options);
}; This is what I have done for the Writable - Writable.fromWeb = function (writableStream, options) {
return new WritableStream({
start() { },
write(chunk) {
writableStream.write(chunk)
},
close() {
writableStream.end()
},
abort(reason) {
const err = new Error(`Abort: ${reason}`);
writableStream.destroy(err);
},
});
};
Writable.toWeb = function (streamWritable) {
return new WritableStream({
start() { },
write(chunk) {
try {
streamWritable.write(chunk);
} catch (e) {
this.controller.error(e);
}
},
close() {
streamWritable.end();
},
abort(reason) {
streamWritable.destroy(new Error(`Abort: ${reason}`));
},
});
}; |
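One thing worth noting about the `fromWeb` direction: it receives a web `WritableStream` and is expected to return a Node `Writable`, and a `WritableStream` has no `write()` or `end()` methods of its own; writes go through a writer obtained with `getWriter()`. A minimal sketch of that direction, under the assumption that option validation and the finer error handling of Node's implementation are left out (`writableFromWeb` is a hypothetical name):

```javascript
const { Writable } = require('node:stream')

// Hypothetical helper: expose a WHATWG WritableStream as a Node Writable.
function writableFromWeb(webWritable) {
  const writer = webWritable.getWriter()
  return new Writable({
    decodeStrings: false, // hand chunks to the web stream unchanged
    write(chunk, _encoding, callback) {
      // Wait until the web stream is ready, then forward the chunk.
      writer.ready
        .then(() => writer.write(chunk))
        .then(() => callback(), callback)
    },
    final(callback) {
      // end() on the Node side closes the web stream.
      writer.close().then(() => callback(), callback)
    },
    destroy(error, callback) {
      if (error) {
        // Propagate the failure to the web stream, then report it.
        writer.abort(error).then(() => callback(error), () => callback(error))
      } else {
        callback(null)
      }
    },
  })
}

async function demo() {
  const chunks = []
  const webWritable = new WritableStream({
    write(chunk) { chunks.push(chunk) },
  })
  const nodeWritable = writableFromWeb(webWritable)
  nodeWritable.write('one,')
  nodeWritable.end('two')
  await new Promise((resolve, reject) => {
    nodeWritable.once('finish', resolve)
    nodeWritable.once('error', reject)
  })
  return chunks.join('')
}
```

The key design point is that the Node callbacks are only invoked once the writer's promises settle, so the Node side naturally waits for the web side.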
@varshneydevansh don't worry about duplexes if you're not sure on how to do it, they're not close to being as important as the writable functions. Also, the Two last things:
Nice work btw! |
Or maybe just omit
|
This comment was marked as off-topic.
This is what I tried today - Writable.fromWeb = function (writableStream, options = {}) {
const { encoding = 'utf-8', highWaterMark = 16384 } = options;
return new WritableStream({
start(controller) {
this.controller = controller;
},
write(chunk) {
try {
writableStream.write(chunk, encoding);
} catch (error) {
this.controller.error(error);
}
// Backpressure handling:
if (writableStream.writable && writableStream.writableEnded) {
this.controller.close();
} else if (writableStream.writableHighWaterMark !== undefined &&
writableStream.bufferedAmount >= writableStream.writableHighWaterMark) {
this.controller.desiredSize = 0;
}
},
close() {
writableStream.end();
},
abort(reason) {
const err = new Error(`Abort: ${reason}`);
writableStream.destroy(err);
},
});
};
Writable.toWeb = function (streamWritable, options = {}) {
const { highWaterMark = 16384 } = options;
return new WritableStream({
start(controller) {
this.controller = controller;
},
write(chunk) {
try {
streamWritable.write(chunk);
} catch (error) {
this.controller.error(error);
}
// Backpressure handling:
if (streamWritable.bufferedAmount >= highWaterMark) {
this.controller.desiredSize = 0;
}
},
close() {
streamWritable.end();
},
abort(reason) {
const err = new Error(`Abort: ${reason}`);
streamWritable.destroy(err);
},
});
}; Is this better than what I earlier did? |
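On the backpressure part: as far as I know, the sink's controller only exposes `error()` and `signal`, and Node Writables report buffered data through `writableLength` rather than `bufferedAmount`. The usual way to signal backpressure from a sink is to return a promise from `write()` that stays pending until the Node stream emits `'drain'`. A simplified sketch of that idea, with a hypothetical function name and without the argument checks a real implementation would need:

```javascript
const { Writable } = require('node:stream')
const { once } = require('node:events')
const { finished } = require('node:stream/promises')

// Hypothetical name; a simplified take on Writable.toWeb with backpressure.
function writableToWebWithBackpressure(streamWritable) {
  return new WritableStream({
    async write(chunk) {
      // write() returns false once the internal buffer reaches highWaterMark.
      // Awaiting 'drain' keeps this promise pending, which in turn keeps the
      // web stream from handing over the next chunk.
      if (!streamWritable.write(chunk)) {
        await once(streamWritable, 'drain')
      }
    },
    close() {
      streamWritable.end()
      // Resolve only after the Node stream has flushed everything.
      return finished(streamWritable)
    },
    abort(reason) {
      streamWritable.destroy(reason instanceof Error ? reason : new Error(String(reason)))
    },
  })
}

async function demo() {
  const received = []
  let drains = 0
  const slowSink = new Writable({
    highWaterMark: 4, // bytes; tiny on purpose so backpressure kicks in
    write(chunk, _encoding, callback) {
      received.push(chunk.toString())
      setTimeout(callback, 1)
    },
  })
  slowSink.on('drain', () => { drains += 1 })

  const writer = writableToWebWithBackpressure(slowSink).getWriter()
  for (const part of ['aaaa', 'bbbb', 'cccc']) {
    await writer.write(part)
  }
  await writer.close()
  return { text: received.join(''), drains }
}
```

Here `events.once` also rejects if the stream emits `'error'` while waiting, so a failing Node stream surfaces as a failed web write.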
This comment was marked as off-topic.
He was working on some changes on Bun's node:streams implementation, not some program that uses it. I'm not sure how the official functions should behave, so I didn't answer him, but they don't look bad to me. |
Is this reasonable? varshneydevansh@acf43f6 /**
* @param {WritableStream} writableStream
* @param {{
* decodeStrings? : boolean,
* highWaterMark? : number,
* objectMode? : boolean,
* signal? : AbortSignal,
* }} [options]
* @returns {Writable}
*/
function newStreamWritableFromWritableStream(writableStream, options = {}) {
if (!isWritableStream(writableStream)) {
throw new ERR_INVALID_ARG_TYPE("writableStream", "WritableStream", writableStream);
}
validateObject(options, "options");
const { highWaterMark, decodeStrings = true, objectMode = false, signal } = options;
validateBoolean(objectMode, "options.objectMode");
validateBoolean(decodeStrings, "options.decodeStrings");
const writer = writableStream.getWriter();
let closed = false;
const writable = new Writable({
highWaterMark,
objectMode,
decodeStrings,
signal,
writev(chunks, callback) {
function done(error) {
error = error.filter(e => e);
try {
callback(error.length === 0 ? undefined : error);
} catch (error) {
// In a next tick because this is happening within
// a promise context, and if there are any errors
// thrown we don't want those to cause an unhandled
// rejection. Let's just escape the promise and
// handle it separately.
process.nextTick(() => destroy(writable, error));
}
}
// Wrapping on a new Promise is necessary to not expose the SafePromise
// prototype to user-land.
primordials.SafePromiseAll = (promises, mapFn) =>
new Promise((a, b) => SafePromise.all(arrayToSafePromiseIterable(promises, mapFn)).then(a, b));
PromisePrototypeThen(
writer.ready,
() => {
return PromisePrototypeThen(
SafePromiseAll(chunks, data => writer.write(data.chunk)),
done,
done,
);
},
done,
);
},
write(chunk, encoding, callback) {
if (typeof chunk === "string" && decodeStrings && !objectMode) {
const enc = normalizeEncoding(encoding);
if (enc === "utf8") {
chunk = encoder.encode(chunk);
} else {
chunk = Buffer.from(chunk, encoding);
chunk = new Uint8Array(
TypedArrayPrototypeGetBuffer(chunk),
TypedArrayPrototypeGetByteOffset(chunk),
TypedArrayPrototypeGetByteLength(chunk),
);
}
}
function done(error) {
try {
callback(error);
} catch (error) {
destroy(writable, error);
}
}
PromisePrototypeThen(
writer.ready,
() => {
return PromisePrototypeThen(writer.write(chunk), done, done);
},
done,
);
},
destroy(error, callback) {
function done() {
try {
callback(error);
} catch (error) {
// In a next tick because this is happening within
// a promise context, and if there are any errors
// thrown we don't want those to cause an unhandled
// rejection. Let's just escape the promise and
// handle it separately.
process.nextTick(() => {
throw error;
});
}
}
if (!closed) {
if (error != null) {
PromisePrototypeThen(writer.abort(error), done, done);
} else {
PromisePrototypeThen(writer.close(), done, done);
}
return;
}
done();
},
final(callback) {
function done(error) {
try {
callback(error);
} catch (error) {
// In a next tick because this is happening within
// a promise context, and if there are any errors
// thrown we don't want those to cause an unhandled
// rejection. Let's just escape the promise and
// handle it separately.
process.nextTick(() => destroy(writable, error));
}
}
if (!closed) {
PromisePrototypeThen(writer.close(), done, done);
}
},
});
PromisePrototypeThen(
writer.closed,
() => {
// If the WritableStream closes before the stream.Writable has been
// ended, we signal an error on the stream.Writable.
closed = true;
if (!isWritableEnded(writable)) destroy(writable, new ERR_STREAM_PREMATURE_CLOSE());
},
error => {
// If the WritableStream errors before the stream.Writable has been
// destroyed, signal an error on the stream.Writable.
closed = true;
destroy(writable, error);
},
);
return writable;
}
/**
* @typedef {import('../../stream').Writable} Writable
* @typedef {import('../../stream').Readable} Readable
* @typedef {import('./writablestream').WritableStream} WritableStream
* @typedef {import('./readablestream').ReadableStream} ReadableStream
*/
/**
* @typedef {import('../abort_controller').AbortSignal} AbortSignal
*/
/**
* @param {Writable} streamWritable
* @returns {WritableStream}
*/
function newWritableStreamFromStreamWritable(streamWritable) {
// Not using the internal/streams/utils isWritableNodeStream utility
// here because it will return false if streamWritable is a Duplex
// whose writable option is false. For a Duplex that is not writable,
// we want it to pass this check but return a closed WritableStream.
// We check if the given stream is a stream.Writable or http.OutgoingMessage
const checkIfWritableOrOutgoingMessage =
streamWritable && typeof streamWritable?.write === "function" && typeof streamWritable?.on === "function";
if (!checkIfWritableOrOutgoingMessage) {
throw new ERR_INVALID_ARG_TYPE("streamWritable", "stream.Writable", streamWritable);
}
if (isDestroyed(streamWritable) || !isWritable(streamWritable)) {
const writable = new WritableStream();
writable.close();
return writable;
}
const highWaterMark = streamWritable.writableHighWaterMark;
const strategy = streamWritable.writableObjectMode
? new CountQueuingStrategy({ highWaterMark })
: { highWaterMark };
let controller;
let backpressurePromise;
let closed;
function onDrain() {
if (backpressurePromise !== undefined) backpressurePromise.resolve();
}
const cleanup = finished(streamWritable, error => {
error = handleKnownInternalErrors(error);
cleanup();
// This is a protection against non-standard, legacy streams
// that happen to emit an error event again after finished is called.
streamWritable.on("error", () => {});
if (error != null) {
if (backpressurePromise !== undefined) backpressurePromise.reject(error);
// If closed is not undefined, the error is happening
// after the WritableStream close has already started.
// We need to reject it here.
if (closed !== undefined) {
closed.reject(error);
closed = undefined;
}
controller.error(error);
controller = undefined;
return;
}
if (closed !== undefined) {
closed.resolve();
closed = undefined;
return;
}
controller.error(new AbortError());
controller = undefined;
});
streamWritable.on("drain", onDrain);
return new WritableStream(
{
start(c) {
controller = c;
},
async write(chunk) {
if (streamWritable.writableNeedDrain || !streamWritable.write(chunk)) {
backpressurePromise = createDeferredPromise();
return SafePromisePrototypeFinally(backpressurePromise.promise, () => {
backpressurePromise = undefined;
});
}
},
abort(reason) {
destroy(streamWritable, reason);
},
close() {
if (closed === undefined && !isWritableEnded(streamWritable)) {
closed = createDeferredPromise();
streamWritable.end();
return closed.promise;
}
controller = undefined;
return PromiseResolve();
},
},
strategy,
);
}
var webStreamsAdapters = {
newStreamWritableFromWritableStream,
newWritableStreamFromStreamWritable,
};
Writable.fromWeb = function (writableStream, options) {
return webStreamsAdapters.newStreamWritableFromWritableStream(writableStream, options);
};
Writable.toWeb = function (streamWritable, options) {
return webStreamsAdapters.newWritableStreamFromStreamWritable(streamWritable);
}; |
I'd say that if it looks fine, it should be good to go. |
Actually, the only thing I am not able to figure out (maybe I am not trying hard enough) is how to point Bun to my local build so that I can test these by running the file with my local build rather to the I will try to test this in the morning. |
What are you trying to do exactly? Add your local build executable as a simple |
const { createReadStream, createWriteStream } = require('node:fs')
const { Readable, Writable } = require('node:stream')
const readStream = createReadStream('/tmp/readfile')
const writeStream = createWriteStream('/tmp/writefile')
const readWebStream = Readable.toWeb(readStream)
const writeWebStream = Writable.toWeb(writeStream)
const readStreamAgain = Readable.fromWeb(readWebStream)
const writeStreamAgain = Writable.fromWeb(writeWebStream)
console.log(readStreamAgain, writeStreamAgain) with Bun -
✘ devansh bun test_bun_modi.js
3 |
4 | const readStream = createReadStream('/tmp/readfile')
5 | const writeStream = createWriteStream('/tmp/writefile')
6 |
7 | const readWebStream = Readable.toWeb(readStream)
8 | const writeWebStream = Writable.toWeb(writeStream)
^
TypeError: undefined is not a function
at node:stream:2919:65
at /home/devansh/test_bun_modi.js:8:24
devansh node test_bun_modi.js
Readable {
_events: {
close: undefined,
error: undefined,
data: undefined,
end: undefined,
readable: undefined
},
_readableState: ReadableState {
highWaterMark: 16384,
buffer: [],
bufferIndex: 0,
length: 0,
pipes: [],
awaitDrainWriters: null,
[Symbol(kState)]: 1052940
},
_read: [Function: read],
_destroy: [Function: destroy],
_maxListeners: undefined,
[Symbol(shapeMode)]: true,
[Symbol(kCapture)]: false
} Writable {
_events: {
close: undefined,
error: undefined,
prefinish: undefined,
finish: undefined,
drain: undefined
},
_writableState: WritableState {
highWaterMark: 16384,
length: 0,
corked: 0,
onwrite: [Function: bound onwrite],
writelen: 0,
bufferedIndex: 0,
pendingcb: 0,
[Symbol(kState)]: 17580812,
[Symbol(kBufferedValue)]: null
},
_write: [Function: write],
_writev: [Function: writev],
_destroy: [Function: destroy],
_final: [Function: final],
_maxListeners: undefined,
[Symbol(shapeMode)]: true,
[Symbol(kCapture)]: false
}
node:events:496
throw er; // Unhandled 'error' event
^
Error: EISDIR: illegal operation on a directory, read
Emitted 'error' event on Readable instance at:
at emitErrorNT (node:internal/streams/destroy:169:8)
at emitErrorCloseNT (node:internal/streams/destroy:128:3)
at process.processTicksAndRejections (node:internal/process/task_queues:82:21) {
errno: -21,
code: 'EISDIR',
syscall: 'read'
}
Node.js v20.11.0
[~]
✘ devansh bun/build/bun-debug test_bun_modi.js
[SYS] read(3, 4096) = 4096 (0.051ms)
[fs] close(3[/home/devansh/bun/build/bun-debug])
[SYS] openat(-100, /home/devansh/bunfig.toml) = 18446744073709551614
[SYS] openat(-100, test_bun_modi.js) = 3
[SYS] fstat(3) = 0
[fs] close(3[/home/devansh/test_bun_modi.js])
[fs] close(4[/])
[fs] close(5[/home])
[fs] close(6[/home/devansh])
[SYS] openat(-100, /home/devansh/test_bun_modi.js) = 12
[fs] openat(2147483647, /home/devansh/test_bun_modi.js) = 12[/home/devansh/test_bun_modi.js]
[fs] close(12[/home/devansh/test_bun_modi.js])
[alloc] new() = src.bun.js.node.node_fs_binding.NodeJSFS@20000311400
[alloc] new() = src.bun.js.node.node_fs_binding.NodeJSFS@20000312800
[SYS] openat(-100, /tmp/readfile) = 12
[SYS] openat(-100, /tmp/writefile) = 14
[SYS] dup(12) = 15
[SYS] fstat(16) = 0
[fs] close(16[/tmp/readfile])
[SYS] openat(-100, /home/devansh/test_bun_modi.js) = 16
[fs] openat(2147483647, /home/devansh/test_bun_modi.js) = 16[/home/devansh/test_bun_modi.js]
[fs] close(16[/home/devansh/test_bun_modi.js])
3 |
4 | const readStream = createReadStream('/tmp/readfile')
5 | const writeStream = createWriteStream('/tmp/writefile')
6 |
7 | const readWebStream = Readable.toWeb(readStream)
8 | const writeWebStream = Writable.toWeb(writeStream)
^
ReferenceError: Can't find variable: isDestroyed
at newWritableStreamFromStreamWritable (node:stream:3928:97)
at /home/devansh/test_bun_modi.js:8:24
at asyncFunctionResume (:1:21)
at promiseReactionJobWithoutPromiseUnwrapAsyncContext (:1:21)
at promiseReactionJob (:1:21)
I wasn't paying attention even after reading this https://bun.sh/docs/project/contributing#building-bun |
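The `ReferenceError` in the debug build suggests that helpers the adapter code relies on, such as `isDestroyed`, `isWritableEnded` and `createDeferredPromise`, are not in scope where the functions were pasted. In Node they come from internal modules. The stand-ins below are simplified assumptions about what those helpers do, not Node's or Bun's actual implementations, and are only meant to show the shape that needs to be provided or imported.

```javascript
const { Writable } = require('node:stream')

// Simplified stand-in for a deferred promise helper: a promise plus the
// functions that settle it, so it can be resolved from another callback.
function createDeferredPromise() {
  let resolve
  let reject
  const promise = new Promise((res, rej) => {
    resolve = res
    reject = rej
  })
  return { promise, resolve, reject }
}

// Simplified state checks; real implementations may also need to handle
// legacy streams and http.OutgoingMessage.
function isDestroyed(stream) {
  return Boolean(stream && stream.destroyed)
}

function isWritableEnded(stream) {
  return Boolean(stream && stream.writableEnded)
}

// Small usage example against a no-op Writable.
function describeState(stream) {
  return { destroyed: isDestroyed(stream), ended: isWritableEnded(stream) }
}

const noop = new Writable({ write(_chunk, _encoding, callback) { callback() } })
const before = describeState(noop)
noop.end()
const afterEnd = describeState(noop)
```

Whether these should be redefined locally or pulled from helpers that already exist elsewhere in the bundled stream code is a question for the maintainers.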
What version of Bun is running?
0.7.2
What platform is your computer?
No response
What steps can reproduce the bug?
What is the expected behavior?
No response
What do you see instead?
No response
Additional information
No response