Skip to content

Commit c38dfeb

Browse files
refactor: improve streaming implementation (#34)
1 parent ed7650c commit c38dfeb

File tree

1 file changed

+106
-105
lines changed

1 file changed

+106
-105
lines changed

src/streaming.ts

Lines changed: 106 additions & 105 deletions
Original file line numberDiff line numberDiff line change
@@ -13,61 +13,6 @@ type ServerSentEvent = {
1313
raw: string[];
1414
};
1515

16-
class SSEDecoder {
17-
private data: string[];
18-
private event: string | null;
19-
private chunks: string[];
20-
21-
constructor() {
22-
this.event = null;
23-
this.data = [];
24-
this.chunks = [];
25-
}
26-
27-
decode(line: string) {
28-
if (line.endsWith('\r')) {
29-
line = line.substring(0, line.length - 1);
30-
}
31-
32-
if (!line) {
33-
// empty line and we didn't previously encounter any messages
34-
if (!this.event && !this.data.length) return null;
35-
36-
const sse: ServerSentEvent = {
37-
event: this.event,
38-
data: this.data.join('\n'),
39-
raw: this.chunks,
40-
};
41-
42-
this.event = null;
43-
this.data = [];
44-
this.chunks = [];
45-
46-
return sse;
47-
}
48-
49-
this.chunks.push(line);
50-
51-
if (line.startsWith(':')) {
52-
return null;
53-
}
54-
55-
let [fieldname, _, value] = partition(line, ':');
56-
57-
if (value.startsWith(' ')) {
58-
value = value.substring(1);
59-
}
60-
61-
if (fieldname === 'event') {
62-
this.event = value;
63-
} else if (fieldname === 'data') {
64-
this.data.push(value);
65-
}
66-
67-
return null;
68-
}
69-
}
70-
7116
export class Stream<Item> implements AsyncIterable<Item>, APIResponse<Stream<Item>> {
7217
/** @deprecated - please use the async iterator instead. We plan to add additional helper methods shortly. */
7318
response: Response;
@@ -93,9 +38,7 @@ export class Stream<Item> implements AsyncIterable<Item>, APIResponse<Stream<Ite
9338

9439
const iter = readableStreamAsyncIterable<Bytes>(this.response.body);
9540
for await (const chunk of iter) {
96-
const text = decodeText(chunk);
97-
98-
for (const line of lineDecoder.decode(text)) {
41+
for (const line of lineDecoder.decode(chunk)) {
9942
const sse = this.decoder.decode(line);
10043
if (sse) yield sse;
10144
}
@@ -143,7 +86,60 @@ export class Stream<Item> implements AsyncIterable<Item>, APIResponse<Stream<Ite
14386
}
14487
}
14588

146-
const NEWLINE_CHARS = '\n\r\x0b\x0c\x1c\x1d\x1e\x85\u2028\u2029';
89+
class SSEDecoder {
90+
private data: string[];
91+
private event: string | null;
92+
private chunks: string[];
93+
94+
constructor() {
95+
this.event = null;
96+
this.data = [];
97+
this.chunks = [];
98+
}
99+
100+
decode(line: string) {
101+
if (line.endsWith('\r')) {
102+
line = line.substring(0, line.length - 1);
103+
}
104+
105+
if (!line) {
106+
// empty line and we didn't previously encounter any messages
107+
if (!this.event && !this.data.length) return null;
108+
109+
const sse: ServerSentEvent = {
110+
event: this.event,
111+
data: this.data.join('\n'),
112+
raw: this.chunks,
113+
};
114+
115+
this.event = null;
116+
this.data = [];
117+
this.chunks = [];
118+
119+
return sse;
120+
}
121+
122+
this.chunks.push(line);
123+
124+
if (line.startsWith(':')) {
125+
return null;
126+
}
127+
128+
let [fieldname, _, value] = partition(line, ':');
129+
130+
if (value.startsWith(' ')) {
131+
value = value.substring(1);
132+
}
133+
134+
if (fieldname === 'event') {
135+
this.event = value;
136+
} else if (fieldname === 'data') {
137+
this.data.push(value);
138+
}
139+
140+
return null;
141+
}
142+
}
147143

148144
/**
149145
* A re-implementation of httpx's `LineDecoder` in Python that handles incrementally
@@ -152,15 +148,22 @@ const NEWLINE_CHARS = '\n\r\x0b\x0c\x1c\x1d\x1e\x85\u2028\u2029';
152148
* https://github.com/encode/httpx/blob/920333ea98118e9cf617f246905d7b202510941c/httpx/_decoders.py#L258
153149
*/
154150
class LineDecoder {
151+
// prettier-ignore
152+
static NEWLINE_CHARS = new Set(['\n', '\r', '\x0b', '\x0c', '\x1c', '\x1d', '\x1e', '\x85', '\u2028', '\u2029']);
153+
static NEWLINE_REGEXP = /\r\n|[\n\r\x0b\x0c\x1c\x1d\x1e\x85\u2028\u2029]/g;
154+
155155
buffer: string[];
156156
trailingCR: boolean;
157+
textDecoder: any; // TextDecoder found in browsers; not typed to avoid pulling in either "dom" or "node" types.
157158

158159
constructor() {
159160
this.buffer = [];
160161
this.trailingCR = false;
161162
}
162163

163-
decode(text: string): string[] {
164+
decode(chunk: Bytes): string[] {
165+
let text = this.decodeText(chunk);
166+
164167
if (this.trailingCR) {
165168
text = '\r' + text;
166169
this.trailingCR = false;
@@ -174,10 +177,10 @@ class LineDecoder {
174177
return [];
175178
}
176179

177-
const trailing_newline = NEWLINE_CHARS.includes(text.slice(-1));
178-
let lines = text.split(/\r\n|[\n\r\x0b\x0c\x1c\x1d\x1e\x85\u2028\u2029]/g);
180+
const trailingNewline = LineDecoder.NEWLINE_CHARS.has(text[text.length - 1] || '');
181+
let lines = text.split(LineDecoder.NEWLINE_REGEXP);
179182

180-
if (lines.length === 1 && !trailing_newline) {
183+
if (lines.length === 1 && !trailingNewline) {
181184
this.buffer.push(lines[0]!);
182185
return [];
183186
}
@@ -187,13 +190,50 @@ class LineDecoder {
187190
this.buffer = [];
188191
}
189192

190-
if (!trailing_newline) {
193+
if (!trailingNewline) {
191194
this.buffer = [lines.pop() || ''];
192195
}
193196

194197
return lines;
195198
}
196199

200+
decodeText(bytes: Bytes): string {
201+
if (bytes == null) return '';
202+
if (typeof bytes === 'string') return bytes;
203+
204+
// Node:
205+
if (typeof Buffer !== 'undefined') {
206+
if (bytes instanceof Buffer) {
207+
return bytes.toString();
208+
}
209+
if (bytes instanceof Uint8Array) {
210+
return Buffer.from(bytes).toString();
211+
}
212+
213+
throw new Error(
214+
`Unexpected: received non-Uint8Array (${bytes.constructor.name}) stream chunk in an environment with a global "Buffer" defined, which this library assumes to be Node. Please report this error.`,
215+
);
216+
}
217+
218+
// Browser
219+
if (typeof TextDecoder !== 'undefined') {
220+
if (bytes instanceof Uint8Array || bytes instanceof ArrayBuffer) {
221+
this.textDecoder ??= new TextDecoder('utf8');
222+
return this.textDecoder.decode(bytes);
223+
}
224+
225+
throw new Error(
226+
`Unexpected: received non-Uint8Array/ArrayBuffer (${
227+
(bytes as any).constructor.name
228+
}) in a web platform. Please report this error.`,
229+
);
230+
}
231+
232+
throw new Error(
233+
`Unexpected: neither Buffer nor TextDecoder are available as globals. Please report this error.`,
234+
);
235+
}
236+
197237
flush(): string[] {
198238
if (!this.buffer.length && !this.trailingCR) {
199239
return [];
@@ -215,61 +255,22 @@ function partition(str: string, delimiter: string): [string, string, string] {
215255
return [str, '', ''];
216256
}
217257

218-
let _textDecoder;
219-
function decodeText(bytes: Bytes): string {
220-
if (bytes == null) return '';
221-
if (typeof bytes === 'string') return bytes;
222-
223-
// Node:
224-
if (typeof Buffer !== 'undefined') {
225-
if (bytes instanceof Buffer) {
226-
return bytes.toString();
227-
}
228-
if (bytes instanceof Uint8Array) {
229-
return Buffer.from(bytes).toString();
230-
}
231-
232-
throw new Error(`Unexpected: received non-Uint8Array (${bytes.constructor.name}) in Node.`);
233-
}
234-
235-
// Browser
236-
if (typeof TextDecoder !== 'undefined') {
237-
if (bytes instanceof Uint8Array || bytes instanceof ArrayBuffer) {
238-
_textDecoder ??= new TextDecoder('utf8');
239-
return _textDecoder.decode(bytes);
240-
}
241-
242-
throw new Error(
243-
`Unexpected: received non-Uint8Array/ArrayBuffer (${
244-
(bytes as any).constructor.name
245-
}) in a web platform.`,
246-
);
247-
}
248-
249-
throw new Error(`Unexpected: neither Buffer nor TextDecoder are available as globals.`);
250-
}
251-
252258
/**
253259
* Most browsers don't yet have async iterable support for ReadableStream,
254260
* and Node has a very different way of reading bytes from its "ReadableStream".
255261
*
256262
* This polyfill was pulled from https://github.com/MattiasBuelens/web-streams-polyfill/pull/122#issuecomment-1624185965
257-
*
258-
* We make extensive use of "any" here to avoid pulling in either "node" or "dom" types
259-
* to library users' type scopes.
260263
*/
261264
function readableStreamAsyncIterable<T>(stream: any): AsyncIterableIterator<T> {
262-
if (stream[Symbol.asyncIterator]) {
263-
return stream[Symbol.asyncIterator];
264-
}
265+
if (stream[Symbol.asyncIterator]) return stream[Symbol.asyncIterator];
265266

266267
const reader = stream.getReader();
267-
268268
return {
269269
next() {
270270
return reader.read();
271271
},
272272
async return() {
273+
reader.cancel();
273274
reader.releaseLock();
274275
return { done: true, value: undefined };
275276
},

0 commit comments

Comments
 (0)