Skip to content

Commit 59f941e

Browse files
committed
stream: use Array for Readable buffer
PR-URL: #50341
1 parent 25576b5 commit 59f941e

File tree

4 files changed

+114
-132
lines changed

4 files changed

+114
-132
lines changed

benchmark/streams/readable-bigread.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ function main({ n }) {
1515

1616
bench.start();
1717
for (let k = 0; k < n; ++k) {
18-
for (let i = 0; i < 1e4; ++i)
18+
for (let i = 0; i < 1e3; ++i)
1919
s.push(b);
2020
while (s.read(128));
2121
}

benchmark/streams/readable-readall.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ function main({ n }) {
1515

1616
bench.start();
1717
for (let k = 0; k < n; ++k) {
18-
for (let i = 0; i < 1e4; ++i)
18+
for (let i = 0; i < 1e3; ++i)
1919
s.push(b);
2020
while (s.read());
2121
}

lib/internal/streams/readable.js

+112-29
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,11 @@ const {
3131
ObjectSetPrototypeOf,
3232
Promise,
3333
SafeSet,
34+
Symbol,
3435
SymbolAsyncDispose,
3536
SymbolAsyncIterator,
36-
Symbol,
37+
SymbolSpecies,
38+
TypedArrayPrototypeSet,
3739
} = primordials;
3840

3941
module.exports = Readable;
@@ -51,7 +53,6 @@ const eos = require('internal/streams/end-of-stream');
5153
let debug = require('internal/util/debuglog').debuglog('stream', (fn) => {
5254
debug = fn;
5355
});
54-
const BufferList = require('internal/streams/buffer_list');
5556
const destroyImpl = require('internal/streams/destroy');
5657
const {
5758
getHighWaterMark,
@@ -73,6 +74,7 @@ const {
7374
const { validateObject } = require('internal/validators');
7475

7576
const kState = Symbol('kState');
77+
const FastBuffer = Buffer[SymbolSpecies];
7678

7779
const { StringDecoder } = require('string_decoder');
7880
const from = require('internal/streams/from');
@@ -275,10 +277,8 @@ function ReadableState(options, stream, isDuplex) {
275277
getHighWaterMark(this, options, 'readableHighWaterMark', isDuplex) :
276278
getDefaultHighWaterMark(false);
277279

278-
// A linked list is used to store data chunks instead of an array because the
279-
// linked list can remove elements from the beginning faster than
280-
// array.shift().
281-
this.buffer = new BufferList();
280+
this.buffer = [];
281+
this.bufferIndex = 0;
282282
this.length = 0;
283283
this.pipes = [];
284284

@@ -546,10 +546,15 @@ function addChunk(stream, state, chunk, addToFront) {
546546
} else {
547547
// Update the buffer info.
548548
state.length += (state[kState] & kObjectMode) !== 0 ? 1 : chunk.length;
549-
if (addToFront)
550-
state.buffer.unshift(chunk);
551-
else
549+
if (addToFront) {
550+
if (state.bufferIndex > 0) {
551+
state.buffer[--state.bufferIndex] = chunk;
552+
} else {
553+
state.buffer.unshift(chunk); // Slow path
554+
}
555+
} else {
552556
state.buffer.push(chunk);
557+
}
553558

554559
if ((state[kState] & kNeedReadable) !== 0)
555560
emitReadable(stream);
@@ -564,21 +569,24 @@ Readable.prototype.isPaused = function() {
564569

565570
// Backwards compatibility.
566571
Readable.prototype.setEncoding = function(enc) {
572+
const state = this._readableState;
573+
567574
const decoder = new StringDecoder(enc);
568-
this._readableState.decoder = decoder;
575+
state.decoder = decoder;
569576
// If setEncoding(null), decoder.encoding equals utf8.
570-
this._readableState.encoding = this._readableState.decoder.encoding;
577+
state.encoding = state.decoder.encoding;
571578

572-
const buffer = this._readableState.buffer;
573579
// Iterate over current buffer to convert already stored Buffers:
574580
let content = '';
575-
for (const data of buffer) {
581+
for (const data of state.buffer.slice(state.bufferIndex)) {
576582
content += decoder.write(data);
577583
}
578-
buffer.clear();
584+
state.buffer.length = 0;
585+
state.bufferIndex = 0;
586+
579587
if (content !== '')
580-
buffer.push(content);
581-
this._readableState.length = content.length;
588+
state.buffer.push(content);
589+
state.length = content.length;
582590
return this;
583591
};
584592

@@ -611,7 +619,7 @@ function howMuchToRead(n, state) {
611619
if (NumberIsNaN(n)) {
612620
// Only flow one buffer at a time.
613621
if ((state[kState] & kFlowing) !== 0 && state.length)
614-
return state.buffer.first().length;
622+
return state.buffer[state.bufferIndex].length;
615623
return state.length;
616624
}
617625
if (n <= state.length)
@@ -1549,21 +1557,96 @@ function fromList(n, state) {
15491557
if (state.length === 0)
15501558
return null;
15511559

1560+
let idx = state.bufferIndex;
15521561
let ret;
1553-
if (state.objectMode)
1554-
ret = state.buffer.shift();
1555-
else if (!n || n >= state.length) {
1562+
1563+
const buf = state.buffer;
1564+
const len = buf.length;
1565+
1566+
if ((state[kState] & kObjectMode) !== 0) {
1567+
ret = buf[idx];
1568+
buf[idx++] = null;
1569+
} else if (!n || n >= state.length) {
15561570
// Read it all, truncate the list.
1557-
if (state.decoder)
1558-
ret = state.buffer.join('');
1559-
else if (state.buffer.length === 1)
1560-
ret = state.buffer.first();
1561-
else
1562-
ret = state.buffer.concat(state.length);
1563-
state.buffer.clear();
1571+
if ((state[kState] & kDecoder) !== 0) {
1572+
ret = '';
1573+
while (idx < len) {
1574+
ret += buf[idx];
1575+
buf[idx++] = null;
1576+
}
1577+
} else if (len - idx === 0) {
1578+
ret = Buffer.alloc(0);
1579+
} else if (len - idx === 1) {
1580+
ret = buf[idx];
1581+
buf[idx++] = null;
1582+
} else {
1583+
ret = Buffer.allocUnsafe(state.length);
1584+
1585+
let i = 0;
1586+
while (idx < len) {
1587+
TypedArrayPrototypeSet(ret, buf[idx], i);
1588+
i += buf[idx].length;
1589+
buf[idx++] = null;
1590+
}
1591+
}
1592+
} else if (n < buf[idx].length) {
1593+
// `slice` is the same for buffers and strings.
1594+
ret = buf[idx].slice(0, n);
1595+
buf[idx] = buf[idx].slice(n);
1596+
} else if (n === buf[idx].length) {
1597+
// First chunk is a perfect match.
1598+
ret = buf[idx];
1599+
buf[idx++] = null;
1600+
} else if ((state[kState] & kDecoder) !== 0) {
1601+
ret = '';
1602+
while (idx < len) {
1603+
const str = buf[idx];
1604+
if (n > str.length) {
1605+
ret += str;
1606+
n -= str.length;
1607+
buf[idx++] = null;
1608+
} else {
1609+
if (n === buf.length) {
1610+
ret += str;
1611+
buf[idx++] = null;
1612+
} else {
1613+
ret += str.slice(0, n);
1614+
buf[idx] = str.slice(n);
1615+
}
1616+
break;
1617+
}
1618+
}
1619+
} else {
1620+
ret = Buffer.allocUnsafe(n);
1621+
1622+
const retLen = n;
1623+
while (idx < len) {
1624+
const data = buf[idx];
1625+
if (n > data.length) {
1626+
TypedArrayPrototypeSet(ret, data, retLen - n);
1627+
n -= data.length;
1628+
buf[idx++] = null;
1629+
} else {
1630+
if (n === data.length) {
1631+
TypedArrayPrototypeSet(ret, data, retLen - n);
1632+
buf[idx++] = null;
1633+
} else {
1634+
TypedArrayPrototypeSet(ret, new FastBuffer(data.buffer, data.byteOffset, n), retLen - n);
1635+
buf[idx] = new FastBuffer(data.buffer, data.byteOffset + n, data.length - n);
1636+
}
1637+
break;
1638+
}
1639+
}
1640+
}
1641+
1642+
if (idx === buf.length) {
1643+
state.buffer.length = 0;
1644+
state.bufferIndex = 0;
1645+
} else if (idx > 1024) {
1646+
state.buffer.splice(0, idx);
1647+
state.bufferIndex = 0;
15641648
} else {
1565-
// read part of list.
1566-
ret = state.buffer.consume(n, state.decoder);
1649+
state.bufferIndex = idx;
15671650
}
15681651

15691652
return ret;

test/parallel/test-stream2-readable-from-list.js

-101
This file was deleted.

0 commit comments

Comments
 (0)