Skip to content

Commit 843e74b

Browse files
committed
fix: Simplified iterlines
1 parent c8a4cc6 commit 843e74b

File tree

1 file changed

+44
-42
lines changed

1 file changed

+44
-42
lines changed

src/streaming/SSEDecoder.ts

Lines changed: 44 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -18,73 +18,75 @@ abstract class BaseSSEDecoder implements SSEDecoder {
1818
throw new StreamingDecodeError(`Invalid SSE line: ${line}`);
1919
}
2020

21-
protected abstract getReader(response: CrossPlatformResponse): ReadableStreamDefaultReader<Uint8Array>;
21+
abstract iterLines(response: CrossPlatformResponse): AsyncIterableIterator<string>;
2222

23+
protected processLine(line: string): string {
24+
if (line.startsWith('data: ')) {
25+
return line.slice(6);
26+
}
27+
return line;
28+
}
29+
}
30+
31+
export class BrowserSSEDecoder extends BaseSSEDecoder {
2332
async *iterLines(response: CrossPlatformResponse): AsyncIterableIterator<string> {
2433
if (!response.body) {
2534
throw new Error('Response body is null');
2635
}
2736

28-
yield* this._iterLines(this.getReader(response));
29-
}
30-
31-
async *_iterLines(reader: ReadableStreamDefaultReader<Uint8Array>): AsyncIterableIterator<string> {
37+
const reader = (response.body as ReadableStream<Uint8Array>).getReader();
38+
const decoder = new TextDecoder();
3239
let buffer = '';
3340

3441
try {
3542
while (true) {
3643
const { done, value } = await reader.read();
44+
if (done) break;
3745

38-
if (done) {
39-
if (buffer.length > 0) {
40-
const decoded = this.decode(buffer.trim());
41-
if (decoded) yield decoded;
42-
}
43-
break;
44-
}
45-
46-
buffer += new TextDecoder().decode(value);
47-
const lines = buffer.split('\n');
46+
buffer += decoder.decode(value, { stream: true });
47+
const lines = buffer.split(/\r\n|\n/);
4848
buffer = lines.pop() || '';
4949

5050
for (const line of lines) {
51-
const decoded = this.decode(line.trim());
52-
if (decoded) yield decoded;
51+
if (line.trim()) {
52+
yield this.processLine(line);
53+
}
5354
}
5455
}
56+
57+
if (buffer.trim()) {
58+
yield this.processLine(buffer);
59+
}
5560
} finally {
5661
reader.releaseLock();
5762
}
5863
}
5964
}
6065

61-
export class BrowserSSEDecoder extends BaseSSEDecoder {
62-
protected getReader(response: CrossPlatformResponse): ReadableStreamDefaultReader<Uint8Array> {
63-
const body = response.body as ReadableStream<Uint8Array>;
64-
return body.getReader();
65-
}
66-
}
67-
6866
export class NodeSSEDecoder extends BaseSSEDecoder {
69-
protected getReader(response: CrossPlatformResponse): ReadableStreamDefaultReader<Uint8Array> {
67+
async *iterLines(response: CrossPlatformResponse): AsyncIterableIterator<string> {
68+
if (!response.body) {
69+
throw new Error('Response body is null');
70+
}
71+
7072
const stream = response.body as NodeJS.ReadableStream;
71-
// Convert Node readable stream to Web readable stream
72-
const webStream = new ReadableStream({
73-
async start(controller) {
74-
try {
75-
for await (const chunk of stream) {
76-
const uint8Array =
77-
typeof chunk === 'string' ? new TextEncoder().encode(chunk)
78-
: chunk instanceof Uint8Array ? chunk
79-
: new Uint8Array(chunk);
80-
controller.enqueue(uint8Array);
81-
}
82-
controller.close();
83-
} catch (error) {
84-
controller.error(error);
73+
let buffer = '';
74+
75+
for await (const chunk of stream) {
76+
const text = typeof chunk === 'string' ? chunk : chunk.toString('utf-8');
77+
buffer += text;
78+
const lines = buffer.split(/\r\n|\n/);
79+
buffer = lines.pop() || '';
80+
81+
for (const line of lines) {
82+
if (line.trim()) {
83+
yield this.processLine(line);
8584
}
86-
},
87-
});
88-
return webStream.getReader();
85+
}
86+
}
87+
88+
if (buffer.trim()) {
89+
yield this.processLine(buffer);
90+
}
8991
}
9092
}

0 commit comments

Comments
 (0)