Skip to content

Commit db9db98

Browse files
committed
fix: Streaming for NodeJS
1 parent f0d8c82 commit db9db98

File tree

2 files changed

+35
-55
lines changed

2 files changed

+35
-55
lines changed

examples/studio/chat/stream-chat-completions.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,11 @@ async function main() {
1515
process.stdout.write(chunk.choices[0].delta.content);
1616
}
1717
}
18+
19+
process.stdout.write('\n');
20+
21+
// Explicitly exit after stream completion
22+
process.exit(0);
1823
} catch (error) {
1924
console.error('Error:', error);
2025
throw error;

src/streaming/SSEDecoder.ts

Lines changed: 30 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,15 @@ abstract class BaseSSEDecoder implements SSEDecoder {
1818
throw new StreamingDecodeError(`Invalid SSE line: ${line}`);
1919
}
2020

21-
abstract iterLines(response: CrossPlatformResponse): AsyncIterableIterator<string>;
21+
protected abstract getReader(response: CrossPlatformResponse): ReadableStreamDefaultReader<Uint8Array>;
22+
23+
async *iterLines(response: CrossPlatformResponse): AsyncIterableIterator<string> {
24+
if (!response.body) {
25+
throw new Error('Response body is null');
26+
}
27+
28+
yield* this._iterLines(this.getReader(response));
29+
}
2230

2331
async *_iterLines(reader: ReadableStreamDefaultReader<Uint8Array>): AsyncIterableIterator<string> {
2432
let buffer = '';
@@ -51,66 +59,33 @@ abstract class BaseSSEDecoder implements SSEDecoder {
5159
}
5260

5361
export class BrowserSSEDecoder extends BaseSSEDecoder {
54-
async *iterLines(response: CrossPlatformResponse): AsyncIterableIterator<string> {
55-
if (!response.body) {
56-
throw new Error('Response body is null');
57-
}
58-
59-
console.log('BrowserSSEDecoder iterLines', response);
60-
62+
protected getReader(response: CrossPlatformResponse): ReadableStreamDefaultReader<Uint8Array> {
6163
const body = response.body as ReadableStream<Uint8Array>;
62-
yield* this._iterLines(body.getReader());
64+
return body.getReader();
6365
}
6466
}
6567

6668
export class NodeSSEDecoder extends BaseSSEDecoder {
67-
async *iterLines(response: CrossPlatformResponse): AsyncIterableIterator<string> {
68-
console.log('NodeSSEDecoder iterLines', response);
69-
70-
// const readerStream = (await import('stream/web')).ReadableStream as any;
71-
72-
try {
73-
// Try the newer stream/web API first
74-
// eslint-disable-next-line @typescript-eslint/no-explicit-any
75-
const webStream = (await import('stream/web').catch(() => null)) as any;
76-
if (webStream?.ReadableStream) {
77-
const reader = webStream.ReadableStream.from(response.body).getReader();
78-
yield* this._iterLines(reader);
79-
return;
80-
}
81-
82-
// Fallback for older Node.js versions or environments without stream/web
83-
console.log('Falling back to old stream API');
84-
let buffer = '';
85-
if (!response.body) {
86-
throw new Error('Response body is null');
87-
}
88-
89-
console.log('Starting stream', response.body);
90-
for await (const chunk of response.body as NodeJS.ReadableStream) {
91-
const text = typeof chunk === 'string' ? chunk : chunk.toString('utf-8');
92-
buffer += text;
93-
94-
const lines = buffer.split('\n');
95-
buffer = lines.pop() || '';
96-
97-
for (const line of lines) {
98-
const trimmedLine = line.trim();
99-
const decoded = this.decode(trimmedLine);
100-
if (decoded) yield decoded;
69+
protected getReader(response: CrossPlatformResponse): ReadableStreamDefaultReader<Uint8Array> {
70+
const stream = response.body as NodeJS.ReadableStream;
71+
// Convert Node readable stream to Web readable stream
72+
const webStream = new ReadableStream({
73+
async start(controller) {
74+
try {
75+
for await (const chunk of stream) {
76+
const uint8Array = typeof chunk === 'string'
77+
? new TextEncoder().encode(chunk)
78+
: chunk instanceof Uint8Array
79+
? chunk
80+
: new Uint8Array(chunk);
81+
controller.enqueue(uint8Array);
82+
}
83+
controller.close();
84+
} catch (error) {
85+
controller.error(error);
10186
}
10287
}
103-
104-
// Handle any remaining data in the buffer
105-
if (buffer.length > 0) {
106-
const decoded = this.decode(buffer.trim());
107-
if (decoded) yield decoded;
108-
}
109-
} catch (error) {
110-
console.error('Error:', error);
111-
throw error;
112-
}
113-
114-
// const reader = readerStream.from(response.body).getReader();
88+
});
89+
return webStream.getReader();
11590
}
11691
}

0 commit comments

Comments
 (0)