Skip to content

Commit ef81591

Browse files
chore(internal): move LineDecoder to a separate file (#541)
1 parent d864a5f commit ef81591

File tree

2 files changed

+115
-111
lines changed

2 files changed

+115
-111
lines changed

src/internal/decoders/line.ts

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
import { AnthropicError } from '../../error';
2+
3+
type Bytes = string | ArrayBuffer | Uint8Array | Buffer | null | undefined;
4+
5+
/**
6+
* A re-implementation of httpx's `LineDecoder` in Python that handles incrementally
7+
* reading lines from text.
8+
*
9+
* https://github.com/encode/httpx/blob/920333ea98118e9cf617f246905d7b202510941c/httpx/_decoders.py#L258
10+
*/
11+
export class LineDecoder {
12+
// prettier-ignore
13+
static NEWLINE_CHARS = new Set(['\n', '\r']);
14+
static NEWLINE_REGEXP = /\r\n|[\n\r]/g;
15+
16+
buffer: string[];
17+
trailingCR: boolean;
18+
textDecoder: any; // TextDecoder found in browsers; not typed to avoid pulling in either "dom" or "node" types.
19+
20+
constructor() {
21+
this.buffer = [];
22+
this.trailingCR = false;
23+
}
24+
25+
decode(chunk: Bytes): string[] {
26+
let text = this.decodeText(chunk);
27+
28+
if (this.trailingCR) {
29+
text = '\r' + text;
30+
this.trailingCR = false;
31+
}
32+
if (text.endsWith('\r')) {
33+
this.trailingCR = true;
34+
text = text.slice(0, -1);
35+
}
36+
37+
if (!text) {
38+
return [];
39+
}
40+
41+
const trailingNewline = LineDecoder.NEWLINE_CHARS.has(text[text.length - 1] || '');
42+
let lines = text.split(LineDecoder.NEWLINE_REGEXP);
43+
44+
// if there is a trailing new line then the last entry will be an empty
45+
// string which we don't care about
46+
if (trailingNewline) {
47+
lines.pop();
48+
}
49+
50+
if (lines.length === 1 && !trailingNewline) {
51+
this.buffer.push(lines[0]!);
52+
return [];
53+
}
54+
55+
if (this.buffer.length > 0) {
56+
lines = [this.buffer.join('') + lines[0], ...lines.slice(1)];
57+
this.buffer = [];
58+
}
59+
60+
if (!trailingNewline) {
61+
this.buffer = [lines.pop() || ''];
62+
}
63+
64+
return lines;
65+
}
66+
67+
decodeText(bytes: Bytes): string {
68+
if (bytes == null) return '';
69+
if (typeof bytes === 'string') return bytes;
70+
71+
// Node:
72+
if (typeof Buffer !== 'undefined') {
73+
if (bytes instanceof Buffer) {
74+
return bytes.toString();
75+
}
76+
if (bytes instanceof Uint8Array) {
77+
return Buffer.from(bytes).toString();
78+
}
79+
80+
throw new AnthropicError(
81+
`Unexpected: received non-Uint8Array (${bytes.constructor.name}) stream chunk in an environment with a global "Buffer" defined, which this library assumes to be Node. Please report this error.`,
82+
);
83+
}
84+
85+
// Browser
86+
if (typeof TextDecoder !== 'undefined') {
87+
if (bytes instanceof Uint8Array || bytes instanceof ArrayBuffer) {
88+
this.textDecoder ??= new TextDecoder('utf8');
89+
return this.textDecoder.decode(bytes);
90+
}
91+
92+
throw new AnthropicError(
93+
`Unexpected: received non-Uint8Array/ArrayBuffer (${
94+
(bytes as any).constructor.name
95+
}) in a web platform. Please report this error.`,
96+
);
97+
}
98+
99+
throw new AnthropicError(
100+
`Unexpected: neither Buffer nor TextDecoder are available as globals. Please report this error.`,
101+
);
102+
}
103+
104+
flush(): string[] {
105+
if (!this.buffer.length && !this.trailingCR) {
106+
return [];
107+
}
108+
109+
const lines = [this.buffer.join('')];
110+
this.buffer = [];
111+
this.trailingCR = false;
112+
return lines;
113+
}
114+
}

src/streaming.ts

Lines changed: 1 addition & 111 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { ReadableStream, type Response } from './_shims/index';
22
import { AnthropicError } from './error';
3+
import { LineDecoder } from './internal/decoders/line';
34

45
import { createResponseHeaders } from '@anthropic-ai/sdk/core';
56
import { APIError } from '@anthropic-ai/sdk/error';
@@ -345,117 +346,6 @@ class SSEDecoder {
345346
}
346347
}
347348

348-
/**
349-
* A re-implementation of httpx's `LineDecoder` in Python that handles incrementally
350-
* reading lines from text.
351-
*
352-
* https://github.com/encode/httpx/blob/920333ea98118e9cf617f246905d7b202510941c/httpx/_decoders.py#L258
353-
*/
354-
class LineDecoder {
355-
// prettier-ignore
356-
static NEWLINE_CHARS = new Set(['\n', '\r']);
357-
static NEWLINE_REGEXP = /\r\n|[\n\r]/g;
358-
359-
buffer: string[];
360-
trailingCR: boolean;
361-
textDecoder: any; // TextDecoder found in browsers; not typed to avoid pulling in either "dom" or "node" types.
362-
363-
constructor() {
364-
this.buffer = [];
365-
this.trailingCR = false;
366-
}
367-
368-
decode(chunk: Bytes): string[] {
369-
let text = this.decodeText(chunk);
370-
371-
if (this.trailingCR) {
372-
text = '\r' + text;
373-
this.trailingCR = false;
374-
}
375-
if (text.endsWith('\r')) {
376-
this.trailingCR = true;
377-
text = text.slice(0, -1);
378-
}
379-
380-
if (!text) {
381-
return [];
382-
}
383-
384-
const trailingNewline = LineDecoder.NEWLINE_CHARS.has(text[text.length - 1] || '');
385-
let lines = text.split(LineDecoder.NEWLINE_REGEXP);
386-
387-
// if there is a trailing new line then the last entry will be an empty
388-
// string which we don't care about
389-
if (trailingNewline) {
390-
lines.pop();
391-
}
392-
393-
if (lines.length === 1 && !trailingNewline) {
394-
this.buffer.push(lines[0]!);
395-
return [];
396-
}
397-
398-
if (this.buffer.length > 0) {
399-
lines = [this.buffer.join('') + lines[0], ...lines.slice(1)];
400-
this.buffer = [];
401-
}
402-
403-
if (!trailingNewline) {
404-
this.buffer = [lines.pop() || ''];
405-
}
406-
407-
return lines;
408-
}
409-
410-
decodeText(bytes: Bytes): string {
411-
if (bytes == null) return '';
412-
if (typeof bytes === 'string') return bytes;
413-
414-
// Node:
415-
if (typeof Buffer !== 'undefined') {
416-
if (bytes instanceof Buffer) {
417-
return bytes.toString();
418-
}
419-
if (bytes instanceof Uint8Array) {
420-
return Buffer.from(bytes).toString();
421-
}
422-
423-
throw new AnthropicError(
424-
`Unexpected: received non-Uint8Array (${bytes.constructor.name}) stream chunk in an environment with a global "Buffer" defined, which this library assumes to be Node. Please report this error.`,
425-
);
426-
}
427-
428-
// Browser
429-
if (typeof TextDecoder !== 'undefined') {
430-
if (bytes instanceof Uint8Array || bytes instanceof ArrayBuffer) {
431-
this.textDecoder ??= new TextDecoder('utf8');
432-
return this.textDecoder.decode(bytes);
433-
}
434-
435-
throw new AnthropicError(
436-
`Unexpected: received non-Uint8Array/ArrayBuffer (${
437-
(bytes as any).constructor.name
438-
}) in a web platform. Please report this error.`,
439-
);
440-
}
441-
442-
throw new AnthropicError(
443-
`Unexpected: neither Buffer nor TextDecoder are available as globals. Please report this error.`,
444-
);
445-
}
446-
447-
flush(): string[] {
448-
if (!this.buffer.length && !this.trailingCR) {
449-
return [];
450-
}
451-
452-
const lines = [this.buffer.join('')];
453-
this.buffer = [];
454-
this.trailingCR = false;
455-
return lines;
456-
}
457-
}
458-
459349
/** This is an internal helper function that's just used for testing */
460350
export function _decodeChunks(chunks: string[]): string[] {
461351
const decoder = new LineDecoder();

0 commit comments

Comments
 (0)