Skip to content

Commit 57aefa7

Browse files
stainless-app[bot]stainless-bot
authored andcommitted
chore(internal): minor restructuring (#664)
1 parent 0c46639 commit 57aefa7

File tree

2 files changed

+35
-35
lines changed

2 files changed

+35
-35
lines changed

src/internal/stream-utils.ts

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/**
2+
* Most browsers don't yet have async iterable support for ReadableStream,
3+
* and Node has a very different way of reading bytes from its "ReadableStream".
4+
*
5+
* This polyfill was pulled from https://github.com/MattiasBuelens/web-streams-polyfill/pull/122#issuecomment-1627354490
6+
*/
7+
export function ReadableStreamToAsyncIterable<T>(stream: any): AsyncIterableIterator<T> {
8+
if (stream[Symbol.asyncIterator]) return stream;
9+
10+
const reader = stream.getReader();
11+
return {
12+
async next() {
13+
try {
14+
const result = await reader.read();
15+
if (result?.done) reader.releaseLock(); // release lock when stream becomes closed
16+
return result;
17+
} catch (e) {
18+
reader.releaseLock(); // release lock when stream becomes errored
19+
throw e;
20+
}
21+
},
22+
async return() {
23+
const cancelPromise = reader.cancel();
24+
reader.releaseLock();
25+
await cancelPromise;
26+
return { done: true, value: undefined };
27+
},
28+
[Symbol.asyncIterator]() {
29+
return this;
30+
},
31+
};
32+
}

src/streaming.ts

Lines changed: 3 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { ReadableStream, type Response } from './_shims/index';
22
import { AnthropicError } from './error';
33
import { LineDecoder } from './internal/decoders/line';
4+
import { ReadableStreamToAsyncIterable } from './internal/stream-utils';
45

56
import { createResponseHeaders } from './core';
67
import { APIError } from './error';
@@ -98,7 +99,7 @@ export class Stream<Item> implements AsyncIterable<Item> {
9899
async function* iterLines(): AsyncGenerator<string, void, unknown> {
99100
const lineDecoder = new LineDecoder();
100101

101-
const iter = readableStreamAsyncIterable<Bytes>(readableStream);
102+
const iter = ReadableStreamToAsyncIterable<Bytes>(readableStream);
102103
for await (const chunk of iter) {
103104
for (const line of lineDecoder.decode(chunk)) {
104105
yield line;
@@ -212,7 +213,7 @@ export async function* _iterSSEMessages(
212213
const sseDecoder = new SSEDecoder();
213214
const lineDecoder = new LineDecoder();
214215

215-
const iter = readableStreamAsyncIterable<Bytes>(response.body);
216+
const iter = ReadableStreamToAsyncIterable<Bytes>(response.body);
216217
for await (const sseChunk of iterSSEChunks(iter)) {
217218
for (const line of lineDecoder.decode(sseChunk)) {
218219
const sse = sseDecoder.decode(line);
@@ -365,36 +366,3 @@ function partition(str: string, delimiter: string): [string, string, string] {
365366

366367
return [str, '', ''];
367368
}
368-
369-
/**
370-
* Most browsers don't yet have async iterable support for ReadableStream,
371-
* and Node has a very different way of reading bytes from its "ReadableStream".
372-
*
373-
* This polyfill was pulled from https://github.com/MattiasBuelens/web-streams-polyfill/pull/122#issuecomment-1627354490
374-
*/
375-
export function readableStreamAsyncIterable<T>(stream: any): AsyncIterableIterator<T> {
376-
if (stream[Symbol.asyncIterator]) return stream;
377-
378-
const reader = stream.getReader();
379-
return {
380-
async next() {
381-
try {
382-
const result = await reader.read();
383-
if (result?.done) reader.releaseLock(); // release lock when stream becomes closed
384-
return result;
385-
} catch (e) {
386-
reader.releaseLock(); // release lock when stream becomes errored
387-
throw e;
388-
}
389-
},
390-
async return() {
391-
const cancelPromise = reader.cancel();
392-
reader.releaseLock();
393-
await cancelPromise;
394-
return { done: true, value: undefined };
395-
},
396-
[Symbol.asyncIterator]() {
397-
return this;
398-
},
399-
};
400-
}

0 commit comments

Comments
 (0)