Skip to content

Commit 6df6490

Browse files
committed
feat: Decoder per implemtation
1 parent 6b92fcd commit 6df6490

File tree

8 files changed

+88
-46
lines changed

8 files changed

+88
-46
lines changed

src/APIClient.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ export abstract class APIClient {
8585
};
8686

8787
return this.performRequest(options as FinalRequestOptions).then(
88-
(response) => handleAPIResponse<Rsp>(response) as Rsp,
88+
(response) => this.fetch.handleResponse<Rsp>(response) as Rsp,
8989
);
9090
}
9191

src/Streaming/SSEDecoder.ts

Lines changed: 46 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
import { SSE_DATA_PREFIX } from './Consts';
22
import { StreamingDecodeError } from '../errors';
3-
import { getReadableStream } from 'envFetch';
3+
import { UnifiedResponse } from 'types';
44

55
export interface SSEDecoder {
66
decode(line: string): string | null;
7-
iterLines(response: Response): AsyncIterableIterator<string>;
7+
iterLines(response: UnifiedResponse): AsyncIterableIterator<string>;
88
}
99

10-
export class DefaultSSEDecoder implements SSEDecoder {
10+
abstract class BaseSSEDecoder implements SSEDecoder {
1111
decode(line: string): string | null {
1212
if (!line) return null;
1313

@@ -18,38 +18,54 @@ export class DefaultSSEDecoder implements SSEDecoder {
1818
throw new StreamingDecodeError(`Invalid SSE line: ${line}`);
1919
}
2020

21-
async *iterLines(response: Response): AsyncIterableIterator<string> {
22-
if (!response.body) {
23-
throw new Error('Response body is null');
24-
}
25-
26-
const reader = await getReadableStream(response.body);
27-
let buffer = '';
28-
29-
30-
try {
31-
while (true) {
32-
const { done, value } = await reader.read();
21+
abstract iterLines(response: UnifiedResponse): AsyncIterableIterator<string>;
3322

34-
if (done) {
35-
if (buffer.length > 0) {
36-
const decoded = this.decode(buffer.trim());
23+
async *_iterLines(reader: ReadableStreamDefaultReader<Uint8Array>): AsyncIterableIterator<string> {
24+
25+
let buffer = '';
26+
27+
try {
28+
while (true) {
29+
const { done, value } = await reader.read();
30+
31+
if (done) {
32+
if (buffer.length > 0) {
33+
const decoded = this.decode(buffer.trim());
34+
if (decoded) yield decoded;
35+
}
36+
break;
37+
}
38+
39+
buffer += new TextDecoder().decode(value);
40+
const lines = buffer.split('\n');
41+
buffer = lines.pop() || '';
42+
43+
for (const line of lines) {
44+
const decoded = this.decode(line.trim());
3745
if (decoded) yield decoded;
3846
}
39-
break;
40-
}
41-
42-
buffer += new TextDecoder().decode(value);
43-
const lines = buffer.split('\n');
44-
buffer = lines.pop() || '';
45-
46-
for (const line of lines) {
47-
const decoded = this.decode(line.trim());
48-
if (decoded) yield decoded;
4947
}
48+
} finally {
49+
reader.releaseLock();
5050
}
51-
} finally {
52-
reader.releaseLock();
51+
}
52+
}
53+
54+
export class BrowserSSEDecoder extends BaseSSEDecoder {
55+
async *iterLines(response: UnifiedResponse): AsyncIterableIterator<string> {
56+
if (!response.body) {
57+
throw new Error('Response body is null');
5358
}
59+
60+
const body = response.body as ReadableStream<Uint8Array>;
61+
yield* this._iterLines(body.getReader());
5462
}
5563
}
64+
65+
export class NodeSSEDecoder extends BaseSSEDecoder {
66+
async *iterLines(response: UnifiedResponse): AsyncIterableIterator<string> {
67+
const readerStream = (await import("stream/web")).ReadableStream as any;
68+
const reader = readerStream.from(response.body).getReader();
69+
yield* this._iterLines(reader);
70+
}
71+
}

src/Streaming/Stream.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { DefaultSSEDecoder } from './SSEDecoder';
1+
import { BrowserSSEDecoder } from './SSEDecoder';
22
import { SSEDecoder } from './SSEDecoder';
33
import { SSE_DONE_MSG } from './Consts';
44
import { StreamingDecodeError } from '../errors';
@@ -20,7 +20,7 @@ export class Stream<T> implements AsyncIterableIterator<T> {
2020
private response: UnifiedResponse,
2121
decoder?: SSEDecoder,
2222
) {
23-
this.decoder = decoder || new DefaultSSEDecoder();
23+
this.decoder = decoder || new BrowserSSEDecoder();
2424
this.iterator = this.stream();
2525
}
2626

src/envFetch.ts

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ import { BrowserFetch, Fetch, NodeFetch } from 'fetch';
33
export const isBrowser = typeof window !== "undefined" && typeof window.document !== "undefined";
44
export const isWebWorker =
55
typeof self === "object" &&
6-
// @ts-ignore
76
typeof self?.importScripts === "function" &&
87
(self.constructor?.name === "DedicatedWorkerGlobalScope" ||
98
self.constructor?.name === "ServiceWorkerGlobalScope" ||
@@ -12,16 +11,6 @@ export const isWebWorker =
1211

1312
export const isNode = typeof process !== "undefined" && Boolean(process.version) && Boolean(process.versions?.node);
1413

15-
export const getReadableStream = async (responseBody: ReadableStream<Uint8Array>): Promise<ReadableStreamDefaultReader<Uint8Array>> => {
16-
if (isBrowser || isWebWorker) {
17-
return responseBody.getReader();
18-
} else {
19-
// @ts-ignore - ReadableStream.from() is available in Node.js but TypeScript doesn't recognize it
20-
return (await import("stream/web")).ReadableStream.from(responseBody).getReader();
21-
}
22-
};
23-
24-
2514
export function createFetchInstance(): Fetch {
2615
if (isBrowser || isWebWorker) {
2716
return new BrowserFetch();

src/fetch/BaseFetch.ts

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,29 @@
1+
import { AI21Error } from "errors";
2+
import { Stream } from "Streaming";
13
import { FinalRequestOptions, UnifiedResponse } from "types";
4+
import { APIResponseProps, UnifiedReadableStream } from "types/API";
25

36

7+
export type APIResponse<T> = {
8+
data?: T;
9+
response: UnifiedResponse;
10+
};
411
export abstract class Fetch {
512
abstract call(url: string, options: FinalRequestOptions): Promise<UnifiedResponse>;
13+
async handleResponse<T>({
14+
response,
15+
options
16+
}: APIResponseProps) {
17+
if (options.stream) {
18+
if (!response.body) {
19+
throw new AI21Error('Response body is null');
20+
}
21+
22+
return this.handleStream<T>(response);
23+
}
24+
25+
const contentType = response.headers.get('content-type');
26+
return contentType?.includes('application/json') ? await response.json() : null;
27+
}
28+
abstract handleStream<T>(response: UnifiedResponse): Stream<T>;
629
}

src/fetch/BrowserFetch.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import { FinalRequestOptions, UnifiedResponse } from 'types';
22
import { Fetch } from './BaseFetch';
3+
import { Stream } from 'Streaming';
4+
import { BrowserSSEDecoder } from 'Streaming/SSEDecoder';
35

46
export class BrowserFetch extends Fetch {
57
call(url: string, options: FinalRequestOptions): Promise<UnifiedResponse> {
@@ -12,4 +14,8 @@ export class BrowserFetch extends Fetch {
1214
signal: controller.signal,
1315
});
1416
}
17+
18+
handleStream<T>(response: UnifiedResponse): Stream<T> {
19+
return new Stream<T>(response as Response, new BrowserSSEDecoder());
20+
}
1521
}

src/fetch/NodeFetch.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import { FinalRequestOptions, UnifiedResponse } from "types";
22
import { Fetch } from "./BaseFetch";
3+
import { NodeSSEDecoder } from "Streaming/SSEDecoder";
4+
import { Stream } from "Streaming";
35

46
export class NodeFetch extends Fetch {
57
async call(url: string, options: FinalRequestOptions): Promise<UnifiedResponse> {
@@ -12,4 +14,9 @@ export class NodeFetch extends Fetch {
1214
body: options?.body ? JSON.stringify(options.body) : undefined,
1315
});
1416
}
15-
}
17+
18+
handleStream<T>(response: UnifiedResponse): Stream<T> {
19+
type NodeRespose = import('node-fetch').Response
20+
return new Stream<T>(response as NodeRespose, new NodeSSEDecoder());
21+
}
22+
}

src/types/API.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,4 +27,5 @@ export type FinalRequestOptions = RequestOptions & {
2727

2828
export type DefaultQuery = Record<string, unknown>;
2929
export type Headers = Record<string, string | null | undefined>;
30-
export type UnifiedResponse = Response | import('node-fetch').Response;
30+
export type UnifiedResponse = Response | import('node-fetch').Response;
31+
export type UnifiedReadableStream = ReadableStream<Uint8Array> | import('stream/web').ReadableStream;

0 commit comments

Comments
 (0)