Skip to content

Commit 271a702

Browse files
benjamingrronagatlowChemi
committed
stream: support dispose in writable
Add support to Symbol.asyncDispose in writable streams. Additionally add a test for writable, transform and duplex streams who inherit from readable/writable to avoid breakage. Co-authored-by: Robert Nagy <[email protected]> Co-authored-by: atlowChemi <[email protected]>
1 parent 852fa55 commit 271a702

File tree

5 files changed

+74
-10
lines changed

5 files changed

+74
-10
lines changed

doc/api/stream.md

+11
Original file line numberDiff line numberDiff line change
@@ -954,6 +954,17 @@ added: v12.3.0
954954

955955
Getter for the property `objectMode` of a given `Writable` stream.
956956

957+
##### `writable[Symbol.asyncDispose]()`
958+
959+
<!-- YAML
960+
added: REPLACEME
961+
-->
962+
963+
> Stability: 1 - Experimental
964+
965+
Calls [`writable.destroy()`][writable-destroy] with an `AbortError` and returns
966+
a promise that fulfills when the stream is finished.
967+
957968
##### `writable.write(chunk[, encoding][, callback])`
958969

959970
<!-- YAML

lib/internal/streams/writable.js

+25-10
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,10 @@ const {
3232
ObjectDefineProperties,
3333
ObjectDefineProperty,
3434
ObjectSetPrototypeOf,
35+
Promise,
3536
StringPrototypeToLowerCase,
3637
Symbol,
38+
SymbolAsyncDispose,
3739
SymbolHasInstance,
3840
} = primordials;
3941

@@ -44,6 +46,7 @@ const EE = require('events');
4446
const Stream = require('internal/streams/legacy').Stream;
4547
const { Buffer } = require('buffer');
4648
const destroyImpl = require('internal/streams/destroy');
49+
const eos = require('internal/streams/end-of-stream');
4750

4851
const {
4952
addAbortSignal,
@@ -54,16 +57,19 @@ const {
5457
getDefaultHighWaterMark,
5558
} = require('internal/streams/state');
5659
const {
57-
ERR_INVALID_ARG_TYPE,
58-
ERR_METHOD_NOT_IMPLEMENTED,
59-
ERR_MULTIPLE_CALLBACK,
60-
ERR_STREAM_CANNOT_PIPE,
61-
ERR_STREAM_DESTROYED,
62-
ERR_STREAM_ALREADY_FINISHED,
63-
ERR_STREAM_NULL_VALUES,
64-
ERR_STREAM_WRITE_AFTER_END,
65-
ERR_UNKNOWN_ENCODING,
66-
} = require('internal/errors').codes;
60+
AbortError,
61+
codes: {
62+
ERR_INVALID_ARG_TYPE,
63+
ERR_METHOD_NOT_IMPLEMENTED,
64+
ERR_MULTIPLE_CALLBACK,
65+
ERR_STREAM_CANNOT_PIPE,
66+
ERR_STREAM_DESTROYED,
67+
ERR_STREAM_ALREADY_FINISHED,
68+
ERR_STREAM_NULL_VALUES,
69+
ERR_STREAM_WRITE_AFTER_END,
70+
ERR_UNKNOWN_ENCODING,
71+
},
72+
} = require('internal/errors');
6773
const {
6874
kState,
6975
// bitfields
@@ -1142,3 +1148,12 @@ Writable.fromWeb = function(writableStream, options) {
11421148
Writable.toWeb = function(streamWritable) {
11431149
return lazyWebStreams().newWritableStreamFromStreamWritable(streamWritable);
11441150
};
1151+
1152+
Writable.prototype[SymbolAsyncDispose] = function() {
1153+
let error;
1154+
if (!this.destroyed) {
1155+
error = this.writableFinished ? null : new AbortError();
1156+
this.destroy(error);
1157+
}
1158+
return new Promise((resolve, reject) => eos(this, (err) => (err && err.name !== 'AbortError' ? reject(err) : resolve(null))));
1159+
};

test/parallel/test-stream-duplex-destroy.js

+15
Original file line numberDiff line numberDiff line change
@@ -269,3 +269,18 @@ const assert = require('assert');
269269
}));
270270
duplex.destroy();
271271
}
272+
273+
{
274+
// Check Symbol.asyncDispose
275+
const duplex = new Duplex({
276+
write(chunk, enc, cb) { cb(); },
277+
read() {},
278+
});
279+
let count = 0;
280+
duplex.on('error', common.mustCall((e) => {
281+
assert.strictEqual(count++, 0); // Ensure not called twice
282+
assert.strictEqual(e.name, 'AbortError');
283+
}));
284+
duplex.on('close', common.mustCall());
285+
duplex[Symbol.asyncDispose]().then(common.mustCall());
286+
}

test/parallel/test-stream-transform-destroy.js

+11
Original file line numberDiff line numberDiff line change
@@ -141,3 +141,14 @@ const assert = require('assert');
141141

142142
transform.destroy();
143143
}
144+
145+
{
146+
const transform = new Transform({
147+
transform(chunk, enc, cb) {}
148+
});
149+
transform.on('error', common.mustCall((err) => {
150+
assert.strictEqual(err.name, 'AbortError');
151+
}));
152+
transform.on('close', common.mustCall());
153+
transform[Symbol.asyncDispose]().then(common.mustCall());
154+
}

test/parallel/test-stream-writable-destroy.js

+12
Original file line numberDiff line numberDiff line change
@@ -487,3 +487,15 @@ const assert = require('assert');
487487
}));
488488
s.destroy(_err);
489489
}
490+
491+
{
492+
const write = new Writable({
493+
write(chunk, enc, cb) { cb(); }
494+
});
495+
496+
write.on('error', common.mustCall((e) => {
497+
assert.strictEqual(e.name, 'AbortError');
498+
assert.strictEqual(write.destroyed, true);
499+
}));
500+
write[Symbol.asyncDispose]().then(common.mustCall());
501+
}

0 commit comments

Comments
 (0)