Skip to content

stream: add diagnostics_channel event for completion #42822

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 40 additions & 14 deletions lib/internal/webstreams/readablestream.js
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,12 @@ const {

const assert = require('internal/assert');

const dc = require('diagnostics_channel');
let streamDoneChannel;
function getStreamDoneChannel () {
return streamDoneChannel ||= dc.channel('stream.web.done');
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of this lazy-loading, I had tried to just create the channel immediately. However, this resulted in the following error at build time:

$ make -j24
ninja -C out/Release 
ninja: Entering directory `out/Release'
[21/25] ACTION node: node_mksnapshot_9b7a2d2290b02e76d66661df74749f56
FAILED: gen/node_snapshot.cc 
cd ../../; out/Release/node_mksnapshot out/Release/gen/node_snapshot.cc
global handle not serialized: 0x2c1d47302cd9: [JS_API_OBJECT_TYPE] in OldSpace
 - map: 0x08188aa5fd61 <Map(HOLEY_ELEMENTS)> [FastProperties]
 - prototype: 0x12df1ab07b79 <Object map = 0x8188aa5fd19>
 - elements: 0x20e18df01329 <FixedArray[0]> [HOLEY_ELEMENTS]
 - embedder fields: 1
 - properties: 0x20e18df01329 <FixedArray[0]>
 - All own properties (excluding elements): {}
 - embedder fields = {
    21965, aligned pointer: 0x55cde06a73c0
 }

global handle not serialized: 0x2c1d47302b49: [JS_OBJECT_TYPE] in OldSpace
 - map: 0x0f4b222452b9 <Map(HOLEY_ELEMENTS)> [FastProperties]
 - prototype: 0x2c1d473027a9 <Channel map = 0xf4b22245229>
 - elements: 0x20e18df01329 <FixedArray[0]> [HOLEY_ELEMENTS]
 - properties: 0x20e18df01329 <FixedArray[0]>
 - All own properties (excluding elements): {
    0x218677ef8c49: [String] in OldSpace: #_subscribers: 0x20e18df015b9 <undefined> (const data field 0), location: in-object
    0x20e18df058f1: [String] in ReadOnlySpace: #name: 0x218677ede9d1 <String[15]: #stream.web.done> (const data field 1), location: in-object
 }



#
# Fatal error in , line 0
# Check failed: handle_checker.CheckGlobalAndEternalHandles().
#
#
#
#FailureMessage Object: 0x7ffefc883920
 1: 0x55cddae43c35  [out/Release/node_mksnapshot]
 2: 0x55cddbcc2926 V8_Fatal(char const*, ...) [out/Release/node_mksnapshot]
 3: 0x55cddb396b05 v8::SnapshotCreator::CreateBlob(v8::SnapshotCreator::FunctionCodeHandling) [out/Release/node_mksnapshot]
 4: 0x55cddae6f680 node::SnapshotBuilder::Generate(node::SnapshotData*, std::vector<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, std::allocator<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > >, std::vector<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, std::allocator<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > >) [out/Release/node_mksnapshot]
 5: 0x55cddae707f4 node::SnapshotBuilder::Generate(std::vector<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, std::allocator<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > >, std::vector<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, std::allocator<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > >) [out/Release/node_mksnapshot]
 6: 0x55cddad822f2 BuildSnapshot(int, char**) [out/Release/node_mksnapshot]
 7: 0x7f4802c9dfd0  [/lib/x86_64-linux-gnu/libc.so.6]
 8: 0x7f4802c9e07d __libc_start_main [/lib/x86_64-linux-gnu/libc.so.6]
 9: 0x55cdda8c9e05 _start [out/Release/node_mksnapshot]
Trace/breakpoint trap (core dumped)
ninja: build stopped: subcommand failed.
make: *** [Makefile:127: node] Error 1

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


const kCancel = Symbol('kCancel');
const kClose = Symbol('kClose');
const kChunk = Symbol('kChunk');
Expand Down Expand Up @@ -424,6 +430,9 @@ class ReadableStream {
preventCancel = false,
} = options;

const stream = this;
const channel = getStreamDoneChannel();

const reader = new ReadableStreamDefaultReader(this);
let done = false;
let started = false;
Expand Down Expand Up @@ -455,6 +464,9 @@ class ReadableStream {
current = undefined;
done = true;
readableStreamReaderGenericRelease(reader);
if (channel.hasSubscribers) {
channel.publish({ stream });
}
promise.resolve({ done: true, value: undefined });
},
[kError](error) {
Expand All @@ -468,23 +480,25 @@ class ReadableStream {
}

async function returnSteps(value) {
if (done)
return { done: true, value };
done = true;
if (!done) {
done = true;

if (reader[kState].stream === undefined) {
throw new ERR_INVALID_STATE.TypeError(
'The reader is not bound to a ReadableStream');
if (reader[kState].stream === undefined) {
throw new ERR_INVALID_STATE.TypeError(
'The reader is not bound to a ReadableStream');
}
assert(!reader[kState].readRequests.length);
if (!preventCancel) {
const result = readableStreamReaderGenericCancel(reader, value);
readableStreamReaderGenericRelease(reader);
await result;
} else {
readableStreamReaderGenericRelease(reader);
}
}
assert(!reader[kState].readRequests.length);
if (!preventCancel) {
const result = readableStreamReaderGenericCancel(reader, value);
readableStreamReaderGenericRelease(reader);
await result;
return { done: true, value };
if (channel.hasSubscribers) {
channel.publish({ stream });
}

readableStreamReaderGenericRelease(reader);
return { done: true, value };
}

Expand Down Expand Up @@ -789,6 +803,12 @@ class ReadableStreamDefaultReader {
}
const readRequest = new DefaultReadRequest();
readableStreamDefaultReaderRead(this, readRequest);
if (getStreamDoneChannel().hasSubscribers) {
readRequest.promise.then(({ done }) => {
if (done)
getStreamDoneChannel().publish({ stream: this[kState].stream });
})
}
return readRequest.promise;
}

Expand Down Expand Up @@ -906,6 +926,12 @@ class ReadableStreamBYOBReader {
}
const readIntoRequest = new ReadIntoRequest();
readableStreamBYOBReaderRead(this, view, readIntoRequest);
if (getStreamDoneChannel().hasSubscribers) {
readIntoRequest.promise.then(({ done }) => {
if (done)
getStreamDoneChannel().publish({ stream: this[kState].stream });
})
}
return readIntoRequest.promise;
}

Expand Down
1 change: 1 addition & 0 deletions test/parallel/test-bootstrap-modules.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ const expectedModules = new Set([
'Internal Binding v8',
'Internal Binding worker',
'NativeModule buffer',
'NativeModule diagnostics_channel',
'NativeModule events',
'NativeModule fs',
'NativeModule internal/abort_controller',
Expand Down
44 changes: 44 additions & 0 deletions test/parallel/test-whatwg-webstreams-dc-events.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Flags: --expose-internals
import * as common from '../common/index.mjs';
import assert from 'assert';

import util from 'internal/webstreams/util';

import { Readable } from 'stream';

import * as dc from 'diagnostics_channel';

{
const readable = Readable.toWeb(Readable.from([1]));

const channel = dc.channel('stream.web.done');
const subscriber = common.mustCall(({ stream }) => {
assert.strictEqual(readable, stream);
assert.strictEqual(readable[util.kState].state, 'closed');
});
channel.subscribe(subscriber);

const reader = readable.getReader();
let result;

while (!result?.done) {
result = await reader.read();
}

channel.unsubscribe(subscriber);
}

{
const readable = Readable.toWeb(Readable.from([1]));

const channel = dc.channel('stream.web.done');
const subscriber = common.mustCall(({ stream }) => {
assert.strictEqual(readable, stream);
assert.strictEqual(readable[util.kState].state, 'closed');
});
channel.subscribe(subscriber);

for await (const _ of readable) {}

channel.unsubscribe(subscriber);
}