Skip to content

Commit 489cebf

Browse files
committed
stream: try to fix, this can have major consonances, really WIP
just want the CI to run
1 parent b59b8dc commit 489cebf

File tree

1 file changed

+18
-17
lines changed

1 file changed

+18
-17
lines changed

lib/internal/streams/readable.js

+18-17
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,6 @@ function ReadableState(options, stream, isDuplex) {
108108
// linked list can remove elements from the beginning faster than
109109
// array.shift().
110110
this.buffer = new BufferList();
111-
this.length = 0;
112111
this.pipes = [];
113112
this.flowing = null;
114113
this.ended = false;
@@ -191,6 +190,16 @@ function ReadableState(options, stream, isDuplex) {
191190
}
192191

193192

193+
ObjectDefineProperties(ReadableState.prototype, {
194+
length: {
195+
__proto__: null,
196+
get() {
197+
return this.buffer.length;
198+
},
199+
},
200+
});
201+
202+
194203
function Readable(options) {
195204
if (!(this instanceof Readable))
196205
return new Readable(options);
@@ -1108,7 +1117,7 @@ async function* createAsyncIterator(stream, options) {
11081117
let callback = nop;
11091118

11101119
function next(resolve) {
1111-
if (this === stream) {
1120+
if (this === stream || !resolve) {
11121121
callback();
11131122
callback = nop;
11141123
} else {
@@ -1155,23 +1164,15 @@ async function* createAsyncIterator(stream, options) {
11551164
}
11561165

11571166
function staticUnref(stream) {
1158-
const unrefStream = new Readable({
1159-
objectMode: stream.readableObjectMode ?? stream.objectMode ?? true,
1160-
// highWaterMark 0 as unref is basically a proxy, so don't consume more data
1161-
// as we would lose it when continue consuming from the original stream
1162-
highWaterMark: 0,
1163-
// TODO - what about other options?
1164-
destroy(err, callback) {
1165-
// Not destroying the stream here as we unref it.
1166-
callback(err);
1167-
},
1168-
}).wrap(stream);
1167+
const unrefStream = Object.create(stream, {
1168+
});
11691169

1170-
unrefStream.once('error', (e) => {
1171-
if (e.name !== 'AbortError') {
1172-
destroyImpl.destroyer(stream, e);
1170+
unrefStream._destroy = (err) => {
1171+
if (err?.name !== 'AbortError') {
1172+
destroyImpl.destroyer(stream, err);
11731173
}
1174-
});
1174+
};
1175+
unrefStream._readableState = Object.create(stream._readableState);
11751176

11761177
return unrefStream;
11771178
}

0 commit comments

Comments
 (0)