Skip to content

Commit 7d6dfa6

Browse files
committed
refactor: Moved files
1 parent 89f4f61 commit 7d6dfa6

File tree

6 files changed

+111
-105
lines changed

6 files changed

+111
-105
lines changed

src/Streaming.ts

Lines changed: 0 additions & 105 deletions
This file was deleted.

src/Streaming/Consts.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
export const SSE_DATA_PREFIX = 'data: ';
2+
export const SSE_DONE_MSG = '[DONE]';

src/Streaming/SSEDecoder.ts

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
import { Response as NodeResponse } from 'node-fetch';
2+
import { Readable } from 'stream';
3+
import { SSE_DATA_PREFIX } from './Consts';
4+
import { StreamingDecodeError } from '../errors';
5+
6+
7+
8+
export interface SSEDecoder {
9+
decode(line: string): string | null;
10+
iterLines(response: NodeResponse): AsyncIterableIterator<string>;
11+
}
12+
13+
export class DefaultSSEDecoder implements SSEDecoder {
14+
decode(line: string): string | null {
15+
if (!line) return null;
16+
17+
if (line.startsWith(SSE_DATA_PREFIX)) {
18+
return line.slice(SSE_DATA_PREFIX.length);
19+
}
20+
21+
throw new StreamingDecodeError(`Invalid SSE line: ${line}`);
22+
}
23+
24+
async *iterLines(response: NodeResponse): AsyncIterableIterator<string> {
25+
if (!response.body) {
26+
throw new Error('Response body is null');
27+
}
28+
29+
const webReadableStream = Readable.toWeb(response.body as any) as ReadableStream;
30+
const reader = webReadableStream.getReader();
31+
32+
let buffer = '';
33+
34+
try {
35+
while (true) {
36+
const { done, value } = await reader.read();
37+
38+
if (done) {
39+
if (buffer.length > 0) {
40+
const decoded = this.decode(buffer.trim());
41+
if (decoded) yield decoded;
42+
}
43+
break;
44+
}
45+
46+
buffer += new TextDecoder().decode(value);
47+
const lines = buffer.split('\n');
48+
buffer = lines.pop() || '';
49+
50+
for (const line of lines) {
51+
const decoded = this.decode(line.trim());
52+
if (decoded) yield decoded;
53+
}
54+
}
55+
} finally {
56+
reader.releaseLock();
57+
}
58+
}
59+
}

src/Streaming/Stream.ts

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
import { Response as NodeResponse } from 'node-fetch';
2+
import { Readable } from 'stream';
3+
import { DefaultSSEDecoder } from './SSEDecoder';
4+
import { SSEDecoder } from './SSEDecoder';
5+
import { SSE_DONE_MSG } from './Consts';
6+
import { StreamingDecodeError } from '../errors';
7+
8+
9+
function getStreamMessage<T>(chunk: string): T {
10+
try {
11+
return JSON.parse(chunk);
12+
} catch (e) {
13+
throw new StreamingDecodeError(chunk);
14+
}
15+
}
16+
17+
export class Stream<T> implements AsyncIterableIterator<T> {
18+
private decoder: SSEDecoder;
19+
private iterator: AsyncIterableIterator<T>;
20+
21+
constructor(
22+
private response: NodeResponse,
23+
decoder?: SSEDecoder,
24+
) {
25+
this.decoder = decoder || new DefaultSSEDecoder();
26+
this.iterator = this.stream();
27+
}
28+
29+
private async *stream(): AsyncIterableIterator<T> {
30+
for await (const chunk of this.decoder.iterLines(this.response)) {
31+
if (chunk === SSE_DONE_MSG) break;
32+
yield getStreamMessage(chunk);
33+
}
34+
}
35+
async next(): Promise<IteratorResult<T>> {
36+
return this.iterator.next();
37+
}
38+
39+
[Symbol.asyncIterator](): AsyncIterableIterator<T> {
40+
return this;
41+
}
42+
}

src/Streaming/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
export { Stream } from './Stream';

src/errors.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,3 +5,10 @@ export class MissingAPIKeyError extends AI21Error {
55
super('API key is required');
66
}
77
}
8+
9+
export class StreamingDecodeError extends Error {
10+
constructor(chunk: string) {
11+
super(`Failed to decode chunk: ${chunk}`);
12+
}
13+
}
14+

0 commit comments

Comments
 (0)