Skip to content

Commit a1b253a

Browse files
mafintoshBethGriggs
authored andcommitted
stream: add auto-destroy mode
PR-URL: #22795 Reviewed-By: James M Snell <[email protected]> Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Anna Henningsen <[email protected]> Reviewed-By: Jeremiah Senkpiel <[email protected]>
1 parent 3516a27 commit a1b253a

File tree

5 files changed

+158
-12
lines changed

5 files changed

+158
-12
lines changed

doc/api/stream.md

+17
Original file line numberDiff line numberDiff line change
@@ -1503,6 +1503,11 @@ changes:
15031503
pr-url: https://github.com/nodejs/node/pull/18438
15041504
description: >
15051505
Add `emitClose` option to specify if `'close'` is emitted on destroy
1506+
- version: REPLACEME
1507+
pr-url: https://github.com/nodejs/node/pull/22795
1508+
description: >
1509+
Add `autoDestroy` option to automatically `destroy()` the stream
1510+
when it emits `'finish'` or errors
15061511
-->
15071512

15081513
* `options` {Object}
@@ -1531,6 +1536,8 @@ changes:
15311536
[`stream._destroy()`][writable-_destroy] method.
15321537
* `final` {Function} Implementation for the
15331538
[`stream._final()`][stream-_final] method.
1539+
* `autoDestroy` {boolean} Whether this stream should automatically call
1540+
`.destroy()` on itself after ending. **Default:** `false`.
15341541

15351542
<!-- eslint-disable no-useless-constructor -->
15361543
```js
@@ -1762,6 +1769,14 @@ Custom `Readable` streams *must* call the `new stream.Readable([options])`
17621769
constructor and implement the `readable._read()` method.
17631770

17641771
#### new stream.Readable([options])
1772+
<!-- YAML
1773+
changes:
1774+
- version: REPLACEME
1775+
pr-url: https://github.com/nodejs/node/pull/22795
1776+
description: >
1777+
Add `autoDestroy` option to automatically `destroy()` the stream
1778+
when it emits `'end'` or errors
1779+
-->
17651780

17661781
* `options` {Object}
17671782
* `highWaterMark` {number} The maximum [number of bytes][hwm-gotcha] to store
@@ -1776,6 +1791,8 @@ constructor and implement the `readable._read()` method.
17761791
method.
17771792
* `destroy` {Function} Implementation for the
17781793
[`stream._destroy()`][readable-_destroy] method.
1794+
* `autoDestroy` {boolean} Whether this stream should automatically call
1795+
`.destroy()` on itself after ending. **Default:** `false`.
17791796

17801797
<!-- eslint-disable no-useless-constructor -->
17811798
```js

lib/_stream_readable.js

+18-5
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ let createReadableStreamAsyncIterator;
4646

4747
util.inherits(Readable, Stream);
4848

49+
const { errorOrDestroy } = destroyImpl;
4950
const kProxyEvents = ['error', 'close', 'destroy', 'pause', 'resume'];
5051

5152
function prependListener(emitter, event, fn) {
@@ -118,6 +119,9 @@ function ReadableState(options, stream, isDuplex) {
118119
// Should close be emitted on destroy. Defaults to true.
119120
this.emitClose = options.emitClose !== false;
120121

122+
// Should .destroy() be called after 'end' (and potentially 'finish')
123+
this.autoDestroy = !!options.autoDestroy;
124+
121125
// has it been destroyed
122126
this.destroyed = false;
123127

@@ -236,7 +240,7 @@ function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) {
236240
if (!skipChunkCheck)
237241
er = chunkInvalid(state, chunk);
238242
if (er) {
239-
stream.emit('error', er);
243+
errorOrDestroy(stream, er);
240244
} else if (state.objectMode || chunk && chunk.length > 0) {
241245
if (typeof chunk !== 'string' &&
242246
!state.objectMode &&
@@ -246,11 +250,11 @@ function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) {
246250

247251
if (addToFront) {
248252
if (state.endEmitted)
249-
stream.emit('error', new ERR_STREAM_UNSHIFT_AFTER_END_EVENT());
253+
errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT());
250254
else
251255
addChunk(stream, state, chunk, true);
252256
} else if (state.ended) {
253-
stream.emit('error', new ERR_STREAM_PUSH_AFTER_EOF());
257+
errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF());
254258
} else if (state.destroyed) {
255259
return false;
256260
} else {
@@ -604,7 +608,7 @@ function maybeReadMore_(stream, state) {
604608
// for virtual (non-string, non-buffer) streams, "length" is somewhat
605609
// arbitrary, and perhaps not very meaningful.
606610
Readable.prototype._read = function(n) {
607-
this.emit('error', new ERR_METHOD_NOT_IMPLEMENTED('_read()'));
611+
errorOrDestroy(this, new ERR_METHOD_NOT_IMPLEMENTED('_read()'));
608612
};
609613

610614
Readable.prototype.pipe = function(dest, pipeOpts) {
@@ -710,7 +714,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
710714
unpipe();
711715
dest.removeListener('error', onerror);
712716
if (EE.listenerCount(dest, 'error') === 0)
713-
dest.emit('error', er);
717+
errorOrDestroy(dest, er);
714718
}
715719

716720
// Make sure our error handler is attached before userland ones.
@@ -1123,5 +1127,14 @@ function endReadableNT(state, stream) {
11231127
state.endEmitted = true;
11241128
stream.readable = false;
11251129
stream.emit('end');
1130+
1131+
if (state.autoDestroy) {
1132+
// In case of duplex streams we need a way to detect
1133+
// if the writable side is ready for autoDestroy as well
1134+
const wState = stream._writableState;
1135+
if (!wState || (wState.autoDestroy && wState.finished)) {
1136+
stream.destroy();
1137+
}
1138+
}
11261139
}
11271140
}

lib/_stream_writable.js

+20-6
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ const {
4545
ERR_UNKNOWN_ENCODING
4646
} = require('internal/errors').codes;
4747

48+
const { errorOrDestroy } = destroyImpl;
49+
4850
util.inherits(Writable, Stream);
4951

5052
function nop() {}
@@ -147,6 +149,9 @@ function WritableState(options, stream, isDuplex) {
147149
// Should close be emitted on destroy. Defaults to true.
148150
this.emitClose = options.emitClose !== false;
149151

152+
// Should .destroy() be called after 'finish' (and potentially 'end')
153+
this.autoDestroy = !!options.autoDestroy;
154+
150155
// count buffered requests
151156
this.bufferedRequestCount = 0;
152157

@@ -235,14 +240,14 @@ function Writable(options) {
235240

236241
// Otherwise people can pipe Writable streams, which is just wrong.
237242
Writable.prototype.pipe = function() {
238-
this.emit('error', new ERR_STREAM_CANNOT_PIPE());
243+
errorOrDestroy(this, new ERR_STREAM_CANNOT_PIPE());
239244
};
240245

241246

242247
function writeAfterEnd(stream, cb) {
243248
var er = new ERR_STREAM_WRITE_AFTER_END();
244249
// TODO: defer error events consistently everywhere, not just the cb
245-
stream.emit('error', er);
250+
errorOrDestroy(stream, er);
246251
process.nextTick(cb, er);
247252
}
248253

@@ -258,7 +263,7 @@ function validChunk(stream, state, chunk, cb) {
258263
er = new ERR_INVALID_ARG_TYPE('chunk', ['string', 'Buffer'], chunk);
259264
}
260265
if (er) {
261-
stream.emit('error', er);
266+
errorOrDestroy(stream, er);
262267
process.nextTick(cb, er);
263268
return false;
264269
}
@@ -422,13 +427,13 @@ function onwriteError(stream, state, sync, er, cb) {
422427
// after error
423428
process.nextTick(finishMaybe, stream, state);
424429
stream._writableState.errorEmitted = true;
425-
stream.emit('error', er);
430+
errorOrDestroy(stream, er);
426431
} else {
427432
// the caller expect this to happen before if
428433
// it is async
429434
cb(er);
430435
stream._writableState.errorEmitted = true;
431-
stream.emit('error', er);
436+
errorOrDestroy(stream, er);
432437
// this can emit finish, but finish must
433438
// always follow error
434439
finishMaybe(stream, state);
@@ -612,7 +617,7 @@ function callFinal(stream, state) {
612617
stream._final((err) => {
613618
state.pendingcb--;
614619
if (err) {
615-
stream.emit('error', err);
620+
errorOrDestroy(stream, err);
616621
}
617622
state.prefinished = true;
618623
stream.emit('prefinish');
@@ -639,6 +644,15 @@ function finishMaybe(stream, state) {
639644
if (state.pendingcb === 0) {
640645
state.finished = true;
641646
stream.emit('finish');
647+
648+
if (state.autoDestroy) {
649+
// In case of duplex streams we need a way to detect
650+
// if the readable side is ready for autoDestroy as well
651+
const rState = stream._readableState;
652+
if (!rState || (rState.autoDestroy && rState.endEmitted)) {
653+
stream.destroy();
654+
}
655+
}
642656
}
643657
}
644658
return need;

lib/internal/streams/destroy.js

+19-1
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,25 @@ function emitErrorNT(self, err) {
8282
self.emit('error', err);
8383
}
8484

85+
function errorOrDestroy(stream, err) {
86+
// We have tests that rely on errors being emitted
87+
// in the same tick, so changing this is semver major.
88+
// For now when you opt-in to autoDestroy we allow
89+
// the error to be emitted nextTick. In a future
90+
// semver major update we should change the default to this.
91+
92+
const rState = stream._readableState;
93+
const wState = stream._writableState;
94+
95+
if ((rState && rState.autoDestroy) || (wState && wState.autoDestroy))
96+
stream.destroy(err);
97+
else
98+
stream.emit('error', err);
99+
}
100+
101+
85102
module.exports = {
86103
destroy,
87-
undestroy
104+
undestroy,
105+
errorOrDestroy
88106
};
+84
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
'use strict';
2+
const common = require('../common');
3+
const stream = require('stream');
4+
const assert = require('assert');
5+
6+
{
7+
const r = new stream.Readable({
8+
autoDestroy: true,
9+
read() {
10+
this.push('hello');
11+
this.push('world');
12+
this.push(null);
13+
},
14+
destroy: common.mustCall((err, cb) => cb())
15+
});
16+
17+
let ended = false;
18+
19+
r.resume();
20+
21+
r.on('end', common.mustCall(() => {
22+
ended = true;
23+
}));
24+
25+
r.on('close', common.mustCall(() => {
26+
assert(ended);
27+
}));
28+
}
29+
30+
{
31+
const w = new stream.Writable({
32+
autoDestroy: true,
33+
write(data, enc, cb) {
34+
cb(null);
35+
},
36+
destroy: common.mustCall((err, cb) => cb())
37+
});
38+
39+
let finished = false;
40+
41+
w.write('hello');
42+
w.write('world');
43+
w.end();
44+
45+
w.on('finish', common.mustCall(() => {
46+
finished = true;
47+
}));
48+
49+
w.on('close', common.mustCall(() => {
50+
assert(finished);
51+
}));
52+
}
53+
54+
{
55+
const t = new stream.Transform({
56+
autoDestroy: true,
57+
transform(data, enc, cb) {
58+
cb(null, data);
59+
},
60+
destroy: common.mustCall((err, cb) => cb())
61+
});
62+
63+
let ended = false;
64+
let finished = false;
65+
66+
t.write('hello');
67+
t.write('world');
68+
t.end();
69+
70+
t.resume();
71+
72+
t.on('end', common.mustCall(() => {
73+
ended = true;
74+
}));
75+
76+
t.on('finish', common.mustCall(() => {
77+
finished = true;
78+
}));
79+
80+
t.on('close', common.mustCall(() => {
81+
assert(ended);
82+
assert(finished);
83+
}));
84+
}

0 commit comments

Comments
 (0)