Skip to content

Commit 3b6caa7

Browse files
stainless-botRobertCraigie
authored andcommitted
fix: correctly decode multi-byte characters over multiple chunks
1 parent 845cd5d commit 3b6caa7

File tree

3 files changed

+126
-40
lines changed

3 files changed

+126
-40
lines changed

src/internal/decoders/line.ts

Lines changed: 69 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -13,56 +13,62 @@ export class LineDecoder {
1313
static NEWLINE_CHARS = new Set(['\n', '\r']);
1414
static NEWLINE_REGEXP = /\r\n|[\n\r]/g;
1515

16-
buffer: string[];
17-
trailingCR: boolean;
16+
buffer: Uint8Array;
17+
#carriageReturnIndex: number | null;
1818
textDecoder:
1919
| undefined
2020
| {
2121
decode(buffer: Uint8Array | ArrayBuffer): string;
2222
};
2323

2424
constructor() {
25-
this.buffer = [];
26-
this.trailingCR = false;
25+
this.buffer = new Uint8Array();
26+
this.#carriageReturnIndex = null;
2727
}
2828

2929
decode(chunk: Bytes): string[] {
30-
let text = this.decodeText(chunk);
31-
32-
if (this.trailingCR) {
33-
text = '\r' + text;
34-
this.trailingCR = false;
35-
}
36-
if (text.endsWith('\r')) {
37-
this.trailingCR = true;
38-
text = text.slice(0, -1);
39-
}
40-
41-
if (!text) {
30+
if (chunk == null) {
4231
return [];
4332
}
4433

45-
const trailingNewline = LineDecoder.NEWLINE_CHARS.has(text[text.length - 1] || '');
46-
let lines = text.split(LineDecoder.NEWLINE_REGEXP);
34+
const binaryChunk =
35+
chunk instanceof ArrayBuffer ? new Uint8Array(chunk)
36+
: typeof chunk === 'string' ? new TextEncoder().encode(chunk)
37+
: chunk;
38+
39+
let newData = new Uint8Array(this.buffer.length + binaryChunk.length);
40+
newData.set(this.buffer);
41+
newData.set(binaryChunk, this.buffer.length);
42+
this.buffer = newData;
43+
44+
const lines: string[] = [];
45+
let patternIndex;
46+
while ((patternIndex = findNewlineIndex(this.buffer, this.#carriageReturnIndex)) != null) {
47+
if (patternIndex.carriage && this.#carriageReturnIndex == null) {
48+
// skip until we either get a corresponding `\n`, a new `\r` or nothing
49+
this.#carriageReturnIndex = patternIndex.index;
50+
continue;
51+
}
4752

48-
// if there is a trailing new line then the last entry will be an empty
49-
// string which we don't care about
50-
if (trailingNewline) {
51-
lines.pop();
52-
}
53+
// we got double \r or \rtext\n
54+
if (
55+
this.#carriageReturnIndex != null &&
56+
(patternIndex.index !== this.#carriageReturnIndex + 1 || patternIndex.carriage)
57+
) {
58+
lines.push(this.decodeText(this.buffer.slice(0, this.#carriageReturnIndex - 1)));
59+
this.buffer = this.buffer.slice(this.#carriageReturnIndex);
60+
this.#carriageReturnIndex = null;
61+
continue;
62+
}
5363

54-
if (lines.length === 1 && !trailingNewline) {
55-
this.buffer.push(lines[0]!);
56-
return [];
57-
}
64+
const endIndex =
65+
this.#carriageReturnIndex !== null ? patternIndex.preceding - 1 : patternIndex.preceding;
5866

59-
if (this.buffer.length > 0) {
60-
lines = [this.buffer.join('') + lines[0], ...lines.slice(1)];
61-
this.buffer = [];
62-
}
67+
const line = this.decodeText(this.buffer.slice(0, endIndex));
68+
lines.push(line);
6369

64-
if (!trailingNewline) {
65-
this.buffer = [lines.pop() || ''];
70+
this.buffer = this.buffer.slice(patternIndex.index);
71+
this.#carriageReturnIndex = null;
6672
}
6773

6874
return lines;
@@ -106,13 +112,38 @@ export class LineDecoder {
106112
}
107113

108114
flush(): string[] {
109-
if (!this.buffer.length && !this.trailingCR) {
115+
if (!this.buffer.length) {
110116
return [];
111117
}
118+
return this.decode('\n');
119+
}
120+
}
112121

113-
const lines = [this.buffer.join('')];
114-
this.buffer = [];
115-
this.trailingCR = false;
116-
return lines;
122+
/**
123+
* This function searches the buffer for the end patterns, (\r or \n)
124+
* and returns an object with the index preceding the matched newline and the
125+
* index after the newline char. `null` is returned if no new line is found.
126+
*
127+
* ```ts
128+
* findNewLineIndex('abc\ndef') -> { preceding: 2, index: 3 }
129+
* ```
130+
*/
131+
function findNewlineIndex(
132+
buffer: Uint8Array,
133+
startIndex: number | null,
134+
): { preceding: number; index: number; carriage: boolean } | null {
135+
const newline = 0x0a; // \n
136+
const carriage = 0x0d; // \r
137+
138+
for (let i = startIndex ?? 0; i < buffer.length; i++) {
139+
if (buffer[i] === newline) {
140+
return { preceding: i, index: i + 1, carriage: false };
141+
}
142+
143+
if (buffer[i] === carriage) {
144+
return { preceding: i, index: i + 1, carriage: true };
145+
}
117146
}
147+
148+
return null;
118149
}

src/streaming.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -354,13 +354,17 @@ class SSEDecoder {
354354
}
355355

356356
/** This is an internal helper function that's just used for testing */
357-
export function _decodeChunks(chunks: string[]): string[] {
357+
export function _decodeChunks(chunks: string[], { flush }: { flush: boolean } = { flush: false }): string[] {
358358
const decoder = new LineDecoder();
359359
const lines: string[] = [];
360360
for (const chunk of chunks) {
361361
lines.push(...decoder.decode(chunk));
362362
}
363363

364+
if (flush) {
365+
lines.push(...decoder.flush());
366+
}
367+
364368
return lines;
365369
}
366370

tests/streaming.test.ts

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { PassThrough } from 'stream';
22
import assert from 'assert';
33
import { Stream, _iterSSEMessages, _decodeChunks as decodeChunks } from '@anthropic-ai/sdk/streaming';
44
import { APIConnectionError } from '@anthropic-ai/sdk/error';
5+
import { LineDecoder } from '@anthropic-ai/sdk/internal/decoders/line';
56

67
describe('line decoder', () => {
78
test('basic', () => {
@@ -10,8 +11,8 @@ describe('line decoder', () => {
1011
});
1112

1213
test('basic with \\r', () => {
13-
// baz is not included because the line hasn't ended yet
1414
expect(decodeChunks(['foo', ' bar\r\nbaz'])).toEqual(['foo bar']);
15+
expect(decodeChunks(['foo', ' bar\r\nbaz'], { flush: true })).toEqual(['foo bar', 'baz']);
1516
});
1617

1718
test('trailing new lines', () => {
@@ -29,6 +30,56 @@ describe('line decoder', () => {
2930
test('escaped new lines with \\r', () => {
3031
expect(decodeChunks(['foo', ' bar\\r\\nbaz\n'])).toEqual(['foo bar\\r\\nbaz']);
3132
});
33+
34+
test('\\r & \\n split across multiple chunks', () => {
35+
expect(decodeChunks(['foo\r', '\n', 'bar'], { flush: true })).toEqual(['foo', 'bar']);
36+
});
37+
38+
test('single \\r', () => {
39+
expect(decodeChunks(['foo\r', 'bar'], { flush: true })).toEqual(['foo', 'bar']);
40+
});
41+
42+
test('double \\r', () => {
43+
expect(decodeChunks(['foo\r', 'bar\r'], { flush: true })).toEqual(['foo', 'bar']);
44+
expect(decodeChunks(['foo\r', '\r', 'bar'], { flush: true })).toEqual(['foo', '', 'bar']);
45+
// implementation detail that we don't yield the single \r line until a new \r or \n is encountered
46+
expect(decodeChunks(['foo\r', '\r', 'bar'], { flush: false })).toEqual(['foo']);
47+
});
48+
49+
test('double \\r then \\r\\n', () => {
50+
expect(decodeChunks(['foo\r', '\r', '\r', '\n', 'bar', '\n'])).toEqual(['foo', '', '', 'bar']);
51+
expect(decodeChunks(['foo\n', '\n', '\n', 'bar', '\n'])).toEqual(['foo', '', '', 'bar']);
52+
});
53+
54+
test('double newline', () => {
55+
expect(decodeChunks(['foo\n\nbar'], { flush: true })).toEqual(['foo', '', 'bar']);
56+
expect(decodeChunks(['foo', '\n', '\nbar'], { flush: true })).toEqual(['foo', '', 'bar']);
57+
expect(decodeChunks(['foo\n', '\n', 'bar'], { flush: true })).toEqual(['foo', '', 'bar']);
58+
expect(decodeChunks(['foo', '\n', '\n', 'bar'], { flush: true })).toEqual(['foo', '', 'bar']);
59+
});
60+
61+
test('multi-byte characters across chunks', () => {
62+
const decoder = new LineDecoder();
63+
64+
// bytes taken from the string 'известни' and arbitrarily split
65+
// so that some multi-byte characters span multiple chunks
66+
expect(decoder.decode(new Uint8Array([0xd0]))).toHaveLength(0);
67+
expect(decoder.decode(new Uint8Array([0xb8, 0xd0, 0xb7, 0xd0]))).toHaveLength(0);
68+
expect(
69+
decoder.decode(new Uint8Array([0xb2, 0xd0, 0xb5, 0xd1, 0x81, 0xd1, 0x82, 0xd0, 0xbd, 0xd0, 0xb8])),
70+
).toHaveLength(0);
71+
72+
const decoded = decoder.decode(new Uint8Array([0xa]));
73+
expect(decoded).toEqual(['известни']);
74+
});
75+
76+
test('flushing trailing newlines', () => {
77+
expect(decodeChunks(['foo\n', '\nbar'], { flush: true })).toEqual(['foo', '', 'bar']);
78+
});
79+
80+
test('flushing empty buffer', () => {
81+
expect(decodeChunks([], { flush: true })).toEqual([]);
82+
});
3283
});
3384

3485
describe('streaming decoding', () => {

0 commit comments

Comments
 (0)