Skip to content

Commit e088d11

Browse files
committed
stream: implement ReadableStream.from
Fixes: nodejs#48389
1 parent 8a725c7 commit e088d11

File tree

2 files changed

+129
-0
lines changed

2 files changed

+129
-0
lines changed

lib/internal/webstreams/readablestream.js

+76
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ const {
66
ArrayBufferPrototypeSlice,
77
ArrayPrototypePush,
88
ArrayPrototypeShift,
9+
Boolean,
910
DataView,
1011
FunctionPrototypeBind,
1112
FunctionPrototypeCall,
@@ -110,6 +111,8 @@ const {
110111
nonOpCancel,
111112
nonOpPull,
112113
nonOpStart,
114+
getIterator,
115+
iteratorNext,
113116
kType,
114117
kState,
115118
} = require('internal/webstreams/util');
@@ -312,6 +315,10 @@ class ReadableStream {
312315
return isReadableStreamLocked(this);
313316
}
314317

318+
static from(iterable) {
319+
return readableStreamFromIterable(iterable);
320+
}
321+
315322
/**
316323
* @param {any} [reason]
317324
* @returns { Promise<void> }
@@ -1248,6 +1255,75 @@ const isReadableStreamBYOBReader =
12481255

12491256
// ---- ReadableStream Implementation
12501257

1258+
function readableStreamFromIterable(iterable) {
1259+
let stream;
1260+
const iteratorRecord = getIterator(iterable, 'async');
1261+
1262+
const startAlgorithm = nonOpStart;
1263+
1264+
function pullAlgorithm() {
1265+
let nextResult;
1266+
try {
1267+
nextResult = iteratorNext(iteratorRecord);
1268+
} catch (error) {
1269+
return PromiseReject(error);
1270+
}
1271+
const nextPromise = PromiseResolve(nextResult);
1272+
return PromisePrototypeThen(nextPromise, (iterResult) => {
1273+
if (typeof iterResult !== 'object' || iterResult === null) {
1274+
throw new ERR_INVALID_STATE.TypeError(
1275+
'The promise returned by the iterator.next() method must fulfill with an object');
1276+
}
1277+
const done = Boolean(iterResult.done);
1278+
if (done) {
1279+
readableStreamDefaultControllerClose(stream[kState].controller);
1280+
} else {
1281+
readableStreamDefaultControllerEnqueue(stream[kState].controller, iterResult.value);
1282+
}
1283+
});
1284+
}
1285+
1286+
function cancelAlgorithm(reason) {
1287+
const iterator = iteratorRecord.iterator;
1288+
let returnMethod;
1289+
try {
1290+
returnMethod = iterator.return;
1291+
} catch (error) {
1292+
return PromiseReject(error);
1293+
}
1294+
if (returnMethod === undefined) {
1295+
return PromiseResolve();
1296+
}
1297+
let returnResult;
1298+
try {
1299+
returnResult = FunctionPrototypeCall(returnMethod, iterator, reason);
1300+
} catch (error) {
1301+
return PromiseReject(error);
1302+
}
1303+
const returnPromise = PromiseResolve(returnResult);
1304+
return PromisePrototypeThen(returnPromise, (iterResult) => {
1305+
if (typeof iterResult !== 'object' || iterResult === null) {
1306+
throw new ERR_INVALID_STATE.TypeError(
1307+
'The promise returned by the iterator.return() method must fulfill with an object');
1308+
}
1309+
return undefined;
1310+
});
1311+
}
1312+
1313+
stream = new ReadableStream({
1314+
start: startAlgorithm,
1315+
pull: pullAlgorithm,
1316+
cancel: cancelAlgorithm,
1317+
}, {
1318+
size() {
1319+
return 1;
1320+
},
1321+
highWaterMark: 0,
1322+
});
1323+
1324+
return stream;
1325+
}
1326+
12511327
function readableStreamPipeTo(
12521328
source,
12531329
dest,

lib/internal/webstreams/util.js

+53
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,16 @@ const {
1313
PromiseReject,
1414
ReflectGet,
1515
Symbol,
16+
SymbolAsyncIterator,
17+
SymbolIterator,
1618
Uint8Array,
1719
} = primordials;
1820

1921
const {
2022
codes: {
2123
ERR_INVALID_ARG_VALUE,
2224
ERR_OPERATION_FAILED,
25+
ERR_INVALID_STATE,
2326
},
2427
} = require('internal/errors');
2528

@@ -217,6 +220,54 @@ function lazyTransfer() {
217220
return transfer;
218221
}
219222

223+
function createAsyncFromSyncIterator(syncIteratorRecord) {
224+
const syncIterable = {
225+
[SymbolIterator]: () => syncIteratorRecord.iterator,
226+
};
227+
228+
const asyncIterator = (async function* () {
229+
return yield* syncIterable;
230+
}());
231+
232+
const nextMethod = asyncIterator.next;
233+
return { iterator: asyncIterator, nextMethod, done: false };
234+
}
235+
236+
function getIterator(obj, kind = 'sync', method) {
237+
if (method === undefined) {
238+
if (kind === 'async') {
239+
method = obj[SymbolAsyncIterator];
240+
if (method === undefined) {
241+
const syncMethod = obj[SymbolIterator];
242+
const syncIteratorRecord = getIterator(obj, 'sync', syncMethod);
243+
return createAsyncFromSyncIterator(syncIteratorRecord);
244+
}
245+
} else {
246+
method = obj[SymbolIterator];
247+
}
248+
}
249+
250+
const iterator = FunctionPrototypeCall(method, obj);
251+
if (typeof iterator !== 'object' || iterator === null) {
252+
throw new ERR_INVALID_STATE.TypeError('The iterator method must return an object');
253+
}
254+
const nextMethod = iterator.next;
255+
return { iterator, nextMethod, done: false };
256+
}
257+
258+
function iteratorNext(iteratorRecord, value) {
259+
let result;
260+
if (value === undefined) {
261+
result = FunctionPrototypeCall(iteratorRecord.nextMethod, iteratorRecord.iterator);
262+
} else {
263+
result = FunctionPrototypeCall(iteratorRecord.nextMethod, iteratorRecord.iterator, [value]);
264+
}
265+
if (typeof result !== 'object' || result === null) {
266+
throw new ERR_INVALID_STATE.TypeError('The iterator.next() method must return an object');
267+
}
268+
return result;
269+
}
270+
220271
module.exports = {
221272
ArrayBufferViewGetBuffer,
222273
ArrayBufferViewGetByteLength,
@@ -243,6 +294,8 @@ module.exports = {
243294
nonOpPull,
244295
nonOpStart,
245296
nonOpWrite,
297+
getIterator,
298+
iteratorNext,
246299
kType,
247300
kState,
248301
};

0 commit comments

Comments
 (0)