Skip to content

Commit 59c24b0

Browse files
committed
stream: writable state bitmap
1 parent c19b2a7 commit 59c24b0

File tree

1 file changed

+96
-38
lines changed

1 file changed

+96
-38
lines changed

lib/internal/streams/writable.js

+96-38
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,8 @@ ObjectSetPrototypeOf(Writable, Stream);
7373
function nop() {}
7474

7575
const kOnFinished = Symbol('kOnFinished');
76+
const kErrored = Symbol('kErrored');
77+
const kCorked = Symbol('kCorked');
7678

7779
const kObjectMode = 1 << 0;
7880
const kEnded = 1 << 1;
@@ -94,6 +96,12 @@ const kBufferProcessing = 1 << 16;
9496
const kPrefinished = 1 << 17;
9597
const kAllBuffers = 1 << 18;
9698
const kAllNoop = 1 << 19;
99+
const kHasOnFinished = 1 << 20;
100+
const kHasErrored = 1 << 21;
101+
102+
const kCorkedShift = 22;
103+
const kCorkedMask = 0b1111
104+
const kCorked = kCorkedMask << kCorkedShift; // 4 bits
97105

98106
// TODO(benjamingr) it is likely slower to do it this way than with free functions
99107
function makeBitMapDescriptor(bit) {
@@ -176,6 +184,42 @@ ObjectDefineProperties(WritableState.prototype, {
176184

177185
allBuffers: makeBitMapDescriptor(kAllBuffers),
178186
allNoop: makeBitMapDescriptor(kAllNoop),
187+
188+
// Indicates whether the stream has errored. When true all write() calls
189+
// should return false. This is needed since when autoDestroy
190+
// is disabled we need a way to tell whether the stream has failed.
191+
// This is/should be a cold path.
192+
errored: {
193+
enumerable: false,
194+
get() { return (this.state & kHasErrored) !== 0 ? this[kErrored] : null; },
195+
set(value) {
196+
if (value) {
197+
this[kErrored] = value;
198+
this.state |= kHasErrored;
199+
} else {
200+
this.state &= ~kHasErrored;
201+
}
202+
},
203+
},
204+
205+
// When true all writes will be buffered until .uncork() call.
206+
// This is/should be a cold path.
207+
corked: {
208+
enumerable: false,
209+
get() {
210+
const corked = (this.state & kCorked) >> kCorkedShift;
211+
return corked < kCorkedMask ? val : this[kCorked];
212+
},
213+
set(value) {
214+
if (value < kCorkedMask) {
215+
this.state &= ~kCorked;
216+
this.state |= value << kCorkedShift;
217+
} else {
218+
this.state |= kCorked
219+
this[kCorked] = value;
220+
}
221+
},
222+
}
179223
});
180224

181225
function WritableState(options, stream, isDuplex) {
@@ -226,9 +270,6 @@ function WritableState(options, stream, isDuplex) {
226270
// socket or file.
227271
this.length = 0;
228272

229-
// When true all writes will be buffered until .uncork() call.
230-
this.corked = 0;
231-
232273
// The callback that's passed to _write(chunk, cb).
233274
this.onwrite = onwrite.bind(undefined, stream);
234275

@@ -247,13 +288,6 @@ function WritableState(options, stream, isDuplex) {
247288
// Number of pending user-supplied write callbacks
248289
// this must be 0 before 'finish' can be emitted.
249290
this.pendingcb = 0;
250-
251-
// Indicates whether the stream has errored. When true all write() calls
252-
// should return false. This is needed since when autoDestroy
253-
// is disabled we need a way to tell whether the stream has failed.
254-
this.errored = null;
255-
256-
this[kOnFinished] = [];
257291
}
258292

259293
function resetBuffer(state) {
@@ -394,17 +428,34 @@ Writable.prototype.write = function(chunk, encoding, cb) {
394428
};
395429

396430
Writable.prototype.cork = function() {
397-
this._writableState.corked++;
431+
const state = this._writableState;
432+
433+
const corked = ((state & kCorked) >> kCorkedShift) + 1;
434+
if (corked < kCorkedMask) {
435+
this.state |= corked << kCorkedShift;
436+
} else {
437+
this.state |= kCorked;
438+
this[kCorked] = value;
439+
}
398440
};
399441

400442
Writable.prototype.uncork = function() {
401443
const state = this._writableState;
402444

403-
if (state.corked) {
404-
state.corked--;
445+
if ((state.state & kCorked) === 0) {
446+
return
447+
}
405448

406-
if ((state.state & kWriting) === 0)
407-
clearBuffer(this, state);
449+
const corked = ((state & kCorked) >> kCorkedShift) - 1;
450+
if (corked < kCorkedMask) {
451+
this.state |= corked << kCorkedShift;
452+
} else {
453+
this.state |= kCorked
454+
this[kCorked] = value;
455+
}
456+
457+
if ((state.state & kWriting) === 0) {
458+
clearBuffer(this, state);
408459
}
409460
};
410461

@@ -432,7 +483,7 @@ function writeOrBuffer(stream, state, chunk, encoding, callback) {
432483
if (!ret)
433484
state.state |= kNeedDrain;
434485

435-
if ((state.state & kWriting) !== 0 || state.corked || state.errored || (state.state & kConstructed) === 0) {
486+
if ((state.state & kWriting) !== 0 || (state.state & (kHasErrored | kCorked)) || (state.state & kConstructed) === 0) {
436487
state.buffered.push({ chunk, encoding, callback });
437488
if ((state.state & kAllBuffers) !== 0 && encoding !== 'buffer') {
438489
state.state &= ~kAllBuffers;
@@ -450,7 +501,7 @@ function writeOrBuffer(stream, state, chunk, encoding, callback) {
450501

451502
// Return false if errored or destroyed in order to break
452503
// any synchronous while(stream.write(data)) loops.
453-
return ret && !state.errored && (state.state & kDestroyed) === 0;
504+
return ret && (state.state & kHasErrored) === 0 && (state.state & kDestroyed) === 0;
454505
}
455506

456507
function doWrite(stream, state, writev, len, chunk, encoding, cb) {
@@ -498,7 +549,7 @@ function onwrite(stream, er) {
498549
// Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
499550
er.stack; // eslint-disable-line no-unused-expressions
500551

501-
if (!state.errored) {
552+
if ((state.state & kHasErrored) === 0) {
502553
state.errored = er;
503554
}
504555

@@ -573,18 +624,19 @@ function errorBuffer(state) {
573624
callback(state.errored ?? new ERR_STREAM_DESTROYED('write'));
574625
}
575626

576-
const onfinishCallbacks = state[kOnFinished].splice(0);
577-
for (let i = 0; i < onfinishCallbacks.length; i++) {
578-
onfinishCallbacks[i](state.errored ?? new ERR_STREAM_DESTROYED('end'));
627+
if ((state.state & kHasOnFinished) !== 0) {
628+
const onfinishCallbacks = state[kOnFinished].splice(0);
629+
for (let i = 0; i < onfinishCallbacks.length; i++) {
630+
onfinishCallbacks[i](state.errored ?? new ERR_STREAM_DESTROYED('end'));
631+
}
579632
}
580633

581634
resetBuffer(state);
582635
}
583636

584637
// If there's something in the buffer waiting, then process it.
585638
function clearBuffer(stream, state) {
586-
if (state.corked ||
587-
(state.state & (kDestroyed | kBufferProcessing)) !== 0 ||
639+
if ((state.state & (kDestroyed | kBufferProcessing | kCorked)) !== 0 ||
588640
(state.state & kConstructed) === 0) {
589641
return;
590642
}
@@ -669,14 +721,14 @@ Writable.prototype.end = function(chunk, encoding, cb) {
669721
}
670722

671723
// .end() fully uncorks.
672-
if (state.corked) {
724+
if ((state.state & kCorked) !== 0) {
673725
state.corked = 1;
674726
this.uncork();
675727
}
676728

677729
if (err) {
678730
// Do nothing...
679-
} else if (!state.errored && (state.state & kEnding) === 0) {
731+
} else if ((state.state & kErrored) === 0 && (state.state & kEnding) === 0) {
680732
// This is forgiving in terms of unnecessary calls to end() and can hide
681733
// logic errors. However, usually such errors are harmless and causing a
682734
// hard error can be disproportionately destructive. It is not always
@@ -698,6 +750,8 @@ Writable.prototype.end = function(chunk, encoding, cb) {
698750
} else if ((state.state & kFinished) !== 0) {
699751
process.nextTick(cb, null);
700752
} else {
753+
state.state |= kHasOnFinished;
754+
state[kOnFinished] ??= [];
701755
state[kOnFinished].push(cb);
702756
}
703757
}
@@ -715,10 +769,10 @@ function needFinish(state) {
715769
kFinished |
716770
kWriting |
717771
kErrorEmitted |
718-
kCloseEmitted
772+
kCloseEmitted |
773+
kHasErrored
719774
)) === (kEnding | kConstructed) &&
720775
state.length === 0 &&
721-
!state.errored &&
722776
state.buffered.length === 0);
723777
}
724778

@@ -734,9 +788,11 @@ function callFinal(stream, state) {
734788

735789
state.pendingcb--;
736790
if (err) {
737-
const onfinishCallbacks = state[kOnFinished].splice(0);
738-
for (let i = 0; i < onfinishCallbacks.length; i++) {
739-
onfinishCallbacks[i](err);
791+
if ((state.state & kHasOnFinished) !== 0) {
792+
const onfinishCallbacks = state[kOnFinished].splice(0);
793+
for (let i = 0; i < onfinishCallbacks.length; i++) {
794+
onfinishCallbacks[i](err);
795+
}
740796
}
741797
errorOrDestroy(stream, err, (state.state & kSync) !== 0);
742798
} else if (needFinish(state)) {
@@ -799,9 +855,11 @@ function finish(stream, state) {
799855
state.pendingcb--;
800856
state.state |= kFinished;
801857

802-
const onfinishCallbacks = state[kOnFinished].splice(0);
803-
for (let i = 0; i < onfinishCallbacks.length; i++) {
804-
onfinishCallbacks[i](null);
858+
if ((state.state & kHasOnFinished) !== 0) {
859+
const onfinishCallbacks = state[kOnFinished].splice(0);
860+
for (let i = 0; i < onfinishCallbacks.length; i++) {
861+
onfinishCallbacks[i](null);
862+
}
805863
}
806864

807865
stream.emit('finish');
@@ -853,8 +911,8 @@ ObjectDefineProperties(Writable.prototype, {
853911
// where the writable side was disabled upon construction.
854912
// Compat. The user might manually disable writable side through
855913
// deprecated setter.
856-
return !!w && w.writable !== false && !w.errored &&
857-
(w.state & (kEnding | kEnded | kDestroyed)) === 0;
914+
return !!w && w.writable !== false &&
915+
(w.state & (kEnding | kEnded | kDestroyed | kHasErrored)) === 0;
858916
},
859917
set(val) {
860918
// Backwards compatible.
@@ -928,7 +986,7 @@ ObjectDefineProperties(Writable.prototype, {
928986
__proto__: null,
929987
enumerable: false,
930988
get() {
931-
return this._writableState ? this._writableState.errored : null;
989+
return this._writableState && (this._writableState.state & kHasErrored) !== 0 ? this._writableState.errored : null;
932990
},
933991
},
934992

@@ -938,7 +996,7 @@ ObjectDefineProperties(Writable.prototype, {
938996
get: function() {
939997
return !!(
940998
this._writableState.writable !== false &&
941-
((this._writableState.state & kDestroyed) !== 0 || this._writableState.errored) &&
999+
(this._writableState.state & (kDestroyed | kHasErrored)) !== 0 &&
9421000
(this._writableState.state & kFinished) === 0
9431001
);
9441002
},
@@ -952,7 +1010,7 @@ Writable.prototype.destroy = function(err, cb) {
9521010
// Invoke pending callbacks.
9531011
if ((state.state & kDestroyed) === 0 &&
9541012
(state.bufferedIndex < state.buffered.length ||
955-
state[kOnFinished].length)) {
1013+
(((state.state & kHasOnFinished) !== 0) && state[kOnFinished].length))) {
9561014
process.nextTick(errorBuffer, state);
9571015
}
9581016

0 commit comments

Comments
 (0)