WHATWG Streams in Node.js
@matteocollina
readableStream.pipeTo(writableStream)
  .then(() => console.log("All data successfully written!"))
  .catch(e => console.error("Something went wrong!", e));
Let's agree that we cannot replace Node.js Streams
Why we want them in core
- Part of the Web standards
fetch()
?
- Make things more consistent in the ecosystem
- All possible future uses!
Problems in adding them to core:
- Globals
- Same names as require('stream').*
- Add confusion to confusion
- semver-major and LTS
- compatibility with other core APIs
Readable.prototype.acquireStandardStream = function() {
  emitExperimentalWarning('Readable.acquireStandardStream');
  return new ReadableStream({
    start: (controller) => {
      this.pause();
      this.on('data', (chunk) => {
        // double buffer is happening
        controller.enqueue(chunk);
        this.pause();
      });
      this.once('end', () => controller.close());
      this.once('error', (e) => controller.error(e));
    },
    pull: () => {
      this.resume();
    },
    cancel: () => {
      this.destroy();
    },
  }, {
    highWaterMark: this.readableHighWaterMark,
  });
};
Double buffering as a non-goal
Async Iterators
async function print(readable) {
  // This function works the same on both kinds of stream:
  // Node streams can match the WHATWG API
  // for async iterators
  let data = '';
  for await (const k of readable) {
    data += k;
  }
  console.log(data);
}
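As a hedged illustration of the slide above, the same loop can be driven by a Node.js Readable. The `collect` helper and the sample chunks are made up for this sketch, and `Readable.from` is assumed to be available (it is not in the oldest Node.js releases that support async iteration).

const { Readable } = require('stream');

// Hypothetical helper: same loop as print(), but it returns the text
// instead of logging it, so the result can be inspected.
async function collect(readable) {
  let data = '';
  for await (const chunk of readable) {
    data += chunk;
  }
  return data;
}

// Readable.from() builds a Node.js stream from any iterable.
const nodeStream = Readable.from(['hello', ' ', 'world']);
// A Promise that resolves once the stream has been fully consumed.
const collected = collect(nodeStream);

Nothing in `collect` is specific to Node.js streams: it only relies on the object being async iterable.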
The pipeline problem
const { finished } = require('stream');
const { promisify } = require('util');

async function run(origin, dest) {
  try {
    // buildWrite is hard
    const write = buildWrite(dest);
    // This is an async iterator
    for await (let chunk of origin) {
      await write(chunk.toString().toUpperCase());
    }
    // End the destination so that finished() can resolve
    dest.end();
    await promisify(finished)(dest);
  } catch (err) {
    origin.destroy(err);
    dest.destroy(err);
  }
}
function buildWrite(stream) {
  // This is a good way of wrapping stream.write into a Promise.
  // We are waiting for a drain event to resolve, and we are wrapping
  // the error event. A consumer should probably use finished to
  // know if the stream has completed.
  // Must be `let`: the error handler below reassigns it.
  let streamError = null;
  stream.on("error", function(err) {
    streamError = err;
  });
  return write;
  function write(chunk) {
    if (streamError) {
      return Promise.reject(streamError);
    }
    return new Promise(function(resolve, reject) {
      const res = stream.write(chunk);
      if (res) {
        resolve();
      } else {
        stream.once("drain", resolve);
      }
    });
  }
}
Path forward
- Take asyncIterator out of experimental in Node 10 and 11
- Solve the "pipeline" problem with a utility in core
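For reference, a utility of this kind exists in Node.js core as `stream.pipeline` (added in Node.js 10). The sketch below is only an assumption about how the uppercase example from the earlier slide could look with it; the `upper` transform, the `sink` writable and the sample chunks are made up for illustration.

const { pipeline, Readable, Transform, Writable } = require('stream');
const { promisify } = require('util');

// pipeline() takes a callback; promisify() gives a Promise-returning version.
const pipelineAsync = promisify(pipeline);

async function run(origin, dest) {
  // Hypothetical transform that upper-cases every chunk.
  const upper = new Transform({
    transform(chunk, encoding, callback) {
      callback(null, chunk.toString().toUpperCase());
    },
  });
  // pipeline() wires up backpressure and destroys every stream on error.
  await pipelineAsync(origin, upper, dest);
}

// Hypothetical destination that records what it receives.
const received = [];
const sink = new Writable({
  write(chunk, encoding, callback) {
    received.push(chunk.toString());
    callback();
  },
});

// A Promise that resolves once the destination has finished.
const done = run(Readable.from(['a', 'b', 'c']), sink);

Compared with the hand-written loop, there is no buildWrite, no explicit end() call and no manual destroy() in a catch block.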
Proposal:
require('whatwg-stream')
- Ecosystem module
- Semver-versioned
- Support for Node 6, 8, 10, 11
Compatibility between the two streams
pipeline(
  whatwgReadable,
  nodeTransform,
  whatwgWritable,
  (err) => {})
pipeline(
  nodeReadable,
  whatwgTransform1,
  whatwgTransform2,
  nodeWritable,
  (err) => {})
Questions?