Skip to content

stream: deprecate readableFlowing setter #39644

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions doc/api/deprecations.md
Original file line number Diff line number Diff line change
Expand Up @@ -2786,6 +2786,18 @@ These properties are now available within the standard `detail` property
of the `PerformanceEntry` object. The existing accessors have been
deprecated and should no longer be used.

### DEP0153: `readable.readableFlowing` setter
<!-- YAML
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/39644
description: Documentation-only deprecation.
-->

Type: Documentation-only

The [`readable.readableFlowing`][] setter should not be used.

[Legacy URL API]: url.md#url_legacy_url_api
[NIST SP 800-38D]: https://nvlpubs.nist.gov/nistpubs/Legacy/SP/nistspecialpublication800-38d.pdf
[RFC 6066]: https://tools.ietf.org/html/rfc6066#section-3
Expand Down Expand Up @@ -2853,6 +2865,7 @@ deprecated and should no longer be used.
[`process.env`]: process.md#process_process_env
[`process.mainModule`]: process.md#process_process_mainmodule
[`punycode`]: punycode.md
[`readable.readableFlowing`]: stream.md#stream_readable_readableflowing
[`request.abort()`]: http.md#http_request_abort
[`request.connection`]: http.md#http_request_connection
[`request.destroy()`]: http.md#http_request_destroy_error
Expand Down
4 changes: 4 additions & 0 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -1305,13 +1305,17 @@ Becomes `true` when [`'end'`][] event is emitted.
##### `readable.readableFlowing`
<!-- YAML
added: v9.4.0
deprecated: REPLACEME
-->
> Stability: 0 - Deprecated

* {boolean}

This property reflects the current state of a `Readable` stream as described
in the [Three states][] section.

The setter is deprecated.

##### `readable.readableHighWaterMark`
<!-- YAML
added: v9.3.0
Expand Down
117 changes: 63 additions & 54 deletions lib/_http_client.js
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ function socketErrorListener(err) {
}

// Ensure that no further data will come out of the socket
socket.removeListener('data', socketOnData);
socket.removeListener('readable', socketOnReadable);
socket.removeListener('end', socketOnEnd);
socket.destroy();
}
Expand All @@ -477,70 +477,79 @@ function socketOnEnd() {
socket.destroy();
}

function socketOnData(d) {
function socketOnReadable() {
const socket = this;
const req = this._httpMessage;
const parser = this.parser;

assert(parser && parser.socket === socket);

const ret = parser.execute(d);
if (ret instanceof Error) {
prepareError(ret, parser, d);
debug('parse error', ret);
freeParser(parser, req, socket);
socket.destroy();
req.socket._hadError = true;
req.emit('error', ret);
} else if (parser.incoming && parser.incoming.upgrade) {
// Upgrade (if status code 101) or CONNECT
const bytesParsed = ret;
const res = parser.incoming;
req.res = res;

socket.removeListener('data', socketOnData);
socket.removeListener('end', socketOnEnd);
socket.removeListener('drain', ondrain);

if (req.timeoutCb) socket.removeListener('timeout', req.timeoutCb);
socket.removeListener('timeout', responseOnTimeout);
while (true) {
Copy link

@Mifrill Mifrill Oct 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest avoiding this always-true condition in terms of memory leak issues and looking at enough complexity of the logic below, we probably cannot be sure of triggering the break in some cases.

const d = this.read();
if (d === null) {
break;
}

parser.finish();
freeParser(parser, req, socket);
const ret = parser.execute(d);
if (ret instanceof Error) {
prepareError(ret, parser, d);
debug('parse error', ret);
freeParser(parser, req, socket);
socket.destroy();
req.socket._hadError = true;
req.emit('error', ret);
} else if (parser.incoming && parser.incoming.upgrade) {
// Upgrade (if status code 101) or CONNECT
const bytesParsed = ret;
const res = parser.incoming;
req.res = res;

const bodyHead = TypedArrayPrototypeSlice(d, bytesParsed, d.length);
socket.removeListener('readable', socketOnReadable);
socket.removeListener('end', socketOnEnd);
socket.removeListener('drain', ondrain);

const eventName = req.method === 'CONNECT' ? 'connect' : 'upgrade';
if (req.listenerCount(eventName) > 0) {
req.upgradeOrConnect = true;
if (req.timeoutCb) socket.removeListener('timeout', req.timeoutCb);
socket.removeListener('timeout', responseOnTimeout);

// detach the socket
socket.emit('agentRemove');
socket.removeListener('close', socketCloseListener);
socket.removeListener('error', socketErrorListener);
parser.finish();
freeParser(parser, req, socket);

socket._httpMessage = null;
socket.readableFlowing = null;
const bodyHead = TypedArrayPrototypeSlice(d, bytesParsed, d.length);

req.emit(eventName, res, socket, bodyHead);
req.destroyed = true;
req._closed = true;
req.emit('close');
} else {
// Requested Upgrade or used CONNECT method, but have no handler.
socket.destroy();
const eventName = req.method === 'CONNECT' ? 'connect' : 'upgrade';
if (req.listenerCount(eventName) > 0) {
req.upgradeOrConnect = true;

// detach the socket
socket.emit('agentRemove');
socket.removeListener('close', socketCloseListener);
socket.removeListener('error', socketErrorListener);

socket._httpMessage = null;

req.emit(eventName, res, socket, bodyHead);
req.destroyed = true;
req._closed = true;
req.emit('close');
} else {
// Requested Upgrade or used CONNECT method, but have no handler.
socket.destroy();
}

break;
} else if (parser.incoming && parser.incoming.complete &&
// When the status code is informational (100, 102-199),
// the server will send a final response after this client
// sends a request body, so we must not free the parser.
// 101 (Switching Protocols) and all other status codes
// should be processed normally.
!statusIsInformational(parser.incoming.statusCode)) {
socket.removeListener('readable', socketOnReadable);
socket.removeListener('end', socketOnEnd);
socket.removeListener('drain', ondrain);
freeParser(parser, req, socket);
break;
}
} else if (parser.incoming && parser.incoming.complete &&
// When the status code is informational (100, 102-199),
// the server will send a final response after this client
// sends a request body, so we must not free the parser.
// 101 (Switching Protocols) and all other status codes
// should be processed normally.
!statusIsInformational(parser.incoming.statusCode)) {
socket.removeListener('data', socketOnData);
socket.removeListener('end', socketOnEnd);
socket.removeListener('drain', ondrain);
freeParser(parser, req, socket);
}
}

Expand Down Expand Up @@ -643,7 +652,7 @@ function responseKeepAlive(req) {
}
socket.removeListener('close', socketCloseListener);
socket.removeListener('error', socketErrorListener);
socket.removeListener('data', socketOnData);
socket.removeListener('readable', socketOnReadable);
socket.removeListener('end', socketOnEnd);

// TODO(ronag): Between here and emitFreeNT the socket
Expand Down Expand Up @@ -741,7 +750,7 @@ function tickOnSocket(req, socket) {

parser.onIncoming = parserOnIncomingClient;
socket.on('error', socketErrorListener);
socket.on('data', socketOnData);
socket.on('readable', socketOnReadable);
socket.on('end', socketOnEnd);
socket.on('close', socketCloseListener);
socket.on('drain', ondrain);
Expand Down
1 change: 0 additions & 1 deletion lib/internal/child_process.js
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,6 @@ ChildProcess.prototype.spawn = function(options) {
stream.handle.reading = false;
stream.handle.readStop();
stream._stdio.pause();
stream._stdio.readableFlowing = false;
stream._stdio._readableState.reading = false;
stream._stdio[kIsUsedAsStdio] = true;
continue;
Expand Down
2 changes: 1 addition & 1 deletion lib/net.js
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ function Socket(options) {
// Stop the handle from reading and pause the stream
this._handle.reading = false;
this._handle.readStop();
this.readableFlowing = false;
this.pause();
} else if (!options.manualStart) {
this.read(0);
}
Expand Down