Skip to content

Commit 53669af

Browse files
stainless-app[bot] and stainless-bot
authored and committed
fix: optimize sse chunk reading off-by-one error (#686)
1 parent a8862b9 commit 53669af

File tree

4 files changed

+161
-127
lines changed

4 files changed

+161
-127
lines changed

src/internal/decoders/line.ts

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,3 +143,34 @@ function findNewlineIndex(
143143

144144
return null;
145145
}
146+
147+
export function findDoubleNewlineIndex(buffer: Uint8Array): number {
148+
// This function searches the buffer for the end patterns (\r\r, \n\n, \r\n\r\n)
149+
// and returns the index right after the first occurrence of any pattern,
150+
// or -1 if none of the patterns are found.
151+
const newline = 0x0a; // \n
152+
const carriage = 0x0d; // \r
153+
154+
for (let i = 0; i < buffer.length - 1; i++) {
155+
if (buffer[i] === newline && buffer[i + 1] === newline) {
156+
// \n\n
157+
return i + 2;
158+
}
159+
if (buffer[i] === carriage && buffer[i + 1] === carriage) {
160+
// \r\r
161+
return i + 2;
162+
}
163+
if (
164+
buffer[i] === carriage &&
165+
buffer[i + 1] === newline &&
166+
i + 3 < buffer.length &&
167+
buffer[i + 2] === carriage &&
168+
buffer[i + 3] === newline
169+
) {
170+
// \r\n\r\n
171+
return i + 4;
172+
}
173+
}
174+
175+
return -1;
176+
}

src/streaming.ts

Lines changed: 1 addition & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { ReadableStream, type Response } from './_shims/index';
22
import { AnthropicError } from './error';
3-
import { LineDecoder } from './internal/decoders/line';
3+
import { findDoubleNewlineIndex, LineDecoder } from './internal/decoders/line';
44
import { ReadableStreamToAsyncIterable } from './internal/stream-utils';
55

66
import { createResponseHeaders } from './core';
@@ -261,37 +261,6 @@ async function* iterSSEChunks(iterator: AsyncIterableIterator<Bytes>): AsyncGene
261261
}
262262
}
263263

264-
function findDoubleNewlineIndex(buffer: Uint8Array): number {
265-
// This function searches the buffer for the end patterns (\r\r, \n\n, \r\n\r\n)
266-
// and returns the index right after the first occurrence of any pattern,
267-
// or -1 if none of the patterns are found.
268-
const newline = 0x0a; // \n
269-
const carriage = 0x0d; // \r
270-
271-
for (let i = 0; i < buffer.length - 2; i++) {
272-
if (buffer[i] === newline && buffer[i + 1] === newline) {
273-
// \n\n
274-
return i + 2;
275-
}
276-
if (buffer[i] === carriage && buffer[i + 1] === carriage) {
277-
// \r\r
278-
return i + 2;
279-
}
280-
if (
281-
buffer[i] === carriage &&
282-
buffer[i + 1] === newline &&
283-
i + 3 < buffer.length &&
284-
buffer[i + 2] === carriage &&
285-
buffer[i + 3] === newline
286-
) {
287-
// \r\n\r\n
288-
return i + 4;
289-
}
290-
}
291-
292-
return -1;
293-
}
294-
295264
class SSEDecoder {
296265
private data: string[];
297266
private event: string | null;
@@ -347,21 +316,6 @@ class SSEDecoder {
347316
}
348317
}
349318

350-
/** This is an internal helper function that's just used for testing */
351-
export function _decodeChunks(chunks: string[], { flush }: { flush: boolean } = { flush: false }): string[] {
352-
const decoder = new LineDecoder();
353-
const lines: string[] = [];
354-
for (const chunk of chunks) {
355-
lines.push(...decoder.decode(chunk));
356-
}
357-
358-
if (flush) {
359-
lines.push(...decoder.flush());
360-
}
361-
362-
return lines;
363-
}
364-
365319
function partition(str: string, delimiter: string): [string, string, string] {
366320
const index = str.indexOf(delimiter);
367321
if (index !== -1) {

tests/internal/decoders/line.test.ts

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
import { findDoubleNewlineIndex, LineDecoder } from '@anthropic-ai/sdk/internal/decoders/line';
2+
3+
function decodeChunks(chunks: string[], { flush }: { flush: boolean } = { flush: false }): string[] {
4+
const decoder = new LineDecoder();
5+
const lines: string[] = [];
6+
for (const chunk of chunks) {
7+
lines.push(...decoder.decode(chunk));
8+
}
9+
10+
if (flush) {
11+
lines.push(...decoder.flush());
12+
}
13+
14+
return lines;
15+
}
16+
17+
describe('line decoder', () => {
18+
test('basic', () => {
19+
// baz is not included because the line hasn't ended yet
20+
expect(decodeChunks(['foo', ' bar\nbaz'])).toEqual(['foo bar']);
21+
});
22+
23+
test('basic with \\r', () => {
24+
expect(decodeChunks(['foo', ' bar\r\nbaz'])).toEqual(['foo bar']);
25+
expect(decodeChunks(['foo', ' bar\r\nbaz'], { flush: true })).toEqual(['foo bar', 'baz']);
26+
});
27+
28+
test('trailing new lines', () => {
29+
expect(decodeChunks(['foo', ' bar', 'baz\n', 'thing\n'])).toEqual(['foo barbaz', 'thing']);
30+
});
31+
32+
test('trailing new lines with \\r', () => {
33+
expect(decodeChunks(['foo', ' bar', 'baz\r\n', 'thing\r\n'])).toEqual(['foo barbaz', 'thing']);
34+
});
35+
36+
test('escaped new lines', () => {
37+
expect(decodeChunks(['foo', ' bar\\nbaz\n'])).toEqual(['foo bar\\nbaz']);
38+
});
39+
40+
test('escaped new lines with \\r', () => {
41+
expect(decodeChunks(['foo', ' bar\\r\\nbaz\n'])).toEqual(['foo bar\\r\\nbaz']);
42+
});
43+
44+
test('\\r & \\n split across multiple chunks', () => {
45+
expect(decodeChunks(['foo\r', '\n', 'bar'], { flush: true })).toEqual(['foo', 'bar']);
46+
});
47+
48+
test('single \\r', () => {
49+
expect(decodeChunks(['foo\r', 'bar'], { flush: true })).toEqual(['foo', 'bar']);
50+
});
51+
52+
test('double \\r', () => {
53+
expect(decodeChunks(['foo\r', 'bar\r'], { flush: true })).toEqual(['foo', 'bar']);
54+
expect(decodeChunks(['foo\r', '\r', 'bar'], { flush: true })).toEqual(['foo', '', 'bar']);
55+
// implementation detail that we don't yield the single \r line until a new \r or \n is encountered
56+
expect(decodeChunks(['foo\r', '\r', 'bar'], { flush: false })).toEqual(['foo']);
57+
});
58+
59+
test('double \\r then \\r\\n', () => {
60+
expect(decodeChunks(['foo\r', '\r', '\r', '\n', 'bar', '\n'])).toEqual(['foo', '', '', 'bar']);
61+
expect(decodeChunks(['foo\n', '\n', '\n', 'bar', '\n'])).toEqual(['foo', '', '', 'bar']);
62+
});
63+
64+
test('double newline', () => {
65+
expect(decodeChunks(['foo\n\nbar'], { flush: true })).toEqual(['foo', '', 'bar']);
66+
expect(decodeChunks(['foo', '\n', '\nbar'], { flush: true })).toEqual(['foo', '', 'bar']);
67+
expect(decodeChunks(['foo\n', '\n', 'bar'], { flush: true })).toEqual(['foo', '', 'bar']);
68+
expect(decodeChunks(['foo', '\n', '\n', 'bar'], { flush: true })).toEqual(['foo', '', 'bar']);
69+
});
70+
71+
test('multi-byte characters across chunks', () => {
72+
const decoder = new LineDecoder();
73+
74+
// bytes taken from the string 'известни' and arbitrarily split
75+
// so that some multi-byte characters span multiple chunks
76+
expect(decoder.decode(new Uint8Array([0xd0]))).toHaveLength(0);
77+
expect(decoder.decode(new Uint8Array([0xb8, 0xd0, 0xb7, 0xd0]))).toHaveLength(0);
78+
expect(
79+
decoder.decode(new Uint8Array([0xb2, 0xd0, 0xb5, 0xd1, 0x81, 0xd1, 0x82, 0xd0, 0xbd, 0xd0, 0xb8])),
80+
).toHaveLength(0);
81+
82+
const decoded = decoder.decode(new Uint8Array([0xa]));
83+
expect(decoded).toEqual(['известни']);
84+
});
85+
86+
test('flushing trailing newlines', () => {
87+
expect(decodeChunks(['foo\n', '\nbar'], { flush: true })).toEqual(['foo', '', 'bar']);
88+
});
89+
90+
test('flushing empty buffer', () => {
91+
expect(decodeChunks([], { flush: true })).toEqual([]);
92+
});
93+
});
94+
95+
describe('findDoubleNewlineIndex', () => {
96+
test('finds \\n\\n', () => {
97+
expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\n\nbar'))).toBe(5);
98+
expect(findDoubleNewlineIndex(new TextEncoder().encode('\n\nbar'))).toBe(2);
99+
expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\n\n'))).toBe(5);
100+
expect(findDoubleNewlineIndex(new TextEncoder().encode('\n\n'))).toBe(2);
101+
});
102+
103+
test('finds \\r\\r', () => {
104+
expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\r\rbar'))).toBe(5);
105+
expect(findDoubleNewlineIndex(new TextEncoder().encode('\r\rbar'))).toBe(2);
106+
expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\r\r'))).toBe(5);
107+
expect(findDoubleNewlineIndex(new TextEncoder().encode('\r\r'))).toBe(2);
108+
});
109+
110+
test('finds \\r\\n\\r\\n', () => {
111+
expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\r\n\r\nbar'))).toBe(7);
112+
expect(findDoubleNewlineIndex(new TextEncoder().encode('\r\n\r\nbar'))).toBe(4);
113+
expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\r\n\r\n'))).toBe(7);
114+
expect(findDoubleNewlineIndex(new TextEncoder().encode('\r\n\r\n'))).toBe(4);
115+
});
116+
117+
test('returns -1 when no double newline found', () => {
118+
expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\nbar'))).toBe(-1);
119+
expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\rbar'))).toBe(-1);
120+
expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\r\nbar'))).toBe(-1);
121+
expect(findDoubleNewlineIndex(new TextEncoder().encode(''))).toBe(-1);
122+
});
123+
124+
test('handles incomplete patterns', () => {
125+
expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\r\n\r'))).toBe(-1);
126+
expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\r\n'))).toBe(-1);
127+
});
128+
});

tests/streaming.test.ts

Lines changed: 1 addition & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -1,87 +1,8 @@
11
import { Response } from 'node-fetch';
22
import { PassThrough } from 'stream';
33
import assert from 'assert';
4-
import { Stream, _iterSSEMessages, _decodeChunks as decodeChunks } from '@anthropic-ai/sdk/streaming';
4+
import { Stream, _iterSSEMessages } from '@anthropic-ai/sdk/streaming';
55
import { APIConnectionError } from '@anthropic-ai/sdk/error';
6-
import { LineDecoder } from '@anthropic-ai/sdk/internal/decoders/line';
7-
8-
describe('line decoder', () => {
9-
test('basic', () => {
10-
// baz is not included because the line hasn't ended yet
11-
expect(decodeChunks(['foo', ' bar\nbaz'])).toEqual(['foo bar']);
12-
});
13-
14-
test('basic with \\r', () => {
15-
expect(decodeChunks(['foo', ' bar\r\nbaz'])).toEqual(['foo bar']);
16-
expect(decodeChunks(['foo', ' bar\r\nbaz'], { flush: true })).toEqual(['foo bar', 'baz']);
17-
});
18-
19-
test('trailing new lines', () => {
20-
expect(decodeChunks(['foo', ' bar', 'baz\n', 'thing\n'])).toEqual(['foo barbaz', 'thing']);
21-
});
22-
23-
test('trailing new lines with \\r', () => {
24-
expect(decodeChunks(['foo', ' bar', 'baz\r\n', 'thing\r\n'])).toEqual(['foo barbaz', 'thing']);
25-
});
26-
27-
test('escaped new lines', () => {
28-
expect(decodeChunks(['foo', ' bar\\nbaz\n'])).toEqual(['foo bar\\nbaz']);
29-
});
30-
31-
test('escaped new lines with \\r', () => {
32-
expect(decodeChunks(['foo', ' bar\\r\\nbaz\n'])).toEqual(['foo bar\\r\\nbaz']);
33-
});
34-
35-
test('\\r & \\n split across multiple chunks', () => {
36-
expect(decodeChunks(['foo\r', '\n', 'bar'], { flush: true })).toEqual(['foo', 'bar']);
37-
});
38-
39-
test('single \\r', () => {
40-
expect(decodeChunks(['foo\r', 'bar'], { flush: true })).toEqual(['foo', 'bar']);
41-
});
42-
43-
test('double \\r', () => {
44-
expect(decodeChunks(['foo\r', 'bar\r'], { flush: true })).toEqual(['foo', 'bar']);
45-
expect(decodeChunks(['foo\r', '\r', 'bar'], { flush: true })).toEqual(['foo', '', 'bar']);
46-
// implementation detail that we don't yield the single \r line until a new \r or \n is encountered
47-
expect(decodeChunks(['foo\r', '\r', 'bar'], { flush: false })).toEqual(['foo']);
48-
});
49-
50-
test('double \\r then \\r\\n', () => {
51-
expect(decodeChunks(['foo\r', '\r', '\r', '\n', 'bar', '\n'])).toEqual(['foo', '', '', 'bar']);
52-
expect(decodeChunks(['foo\n', '\n', '\n', 'bar', '\n'])).toEqual(['foo', '', '', 'bar']);
53-
});
54-
55-
test('double newline', () => {
56-
expect(decodeChunks(['foo\n\nbar'], { flush: true })).toEqual(['foo', '', 'bar']);
57-
expect(decodeChunks(['foo', '\n', '\nbar'], { flush: true })).toEqual(['foo', '', 'bar']);
58-
expect(decodeChunks(['foo\n', '\n', 'bar'], { flush: true })).toEqual(['foo', '', 'bar']);
59-
expect(decodeChunks(['foo', '\n', '\n', 'bar'], { flush: true })).toEqual(['foo', '', 'bar']);
60-
});
61-
62-
test('multi-byte characters across chunks', () => {
63-
const decoder = new LineDecoder();
64-
65-
// bytes taken from the string 'известни' and arbitrarily split
66-
// so that some multi-byte characters span multiple chunks
67-
expect(decoder.decode(new Uint8Array([0xd0]))).toHaveLength(0);
68-
expect(decoder.decode(new Uint8Array([0xb8, 0xd0, 0xb7, 0xd0]))).toHaveLength(0);
69-
expect(
70-
decoder.decode(new Uint8Array([0xb2, 0xd0, 0xb5, 0xd1, 0x81, 0xd1, 0x82, 0xd0, 0xbd, 0xd0, 0xb8])),
71-
).toHaveLength(0);
72-
73-
const decoded = decoder.decode(new Uint8Array([0xa]));
74-
expect(decoded).toEqual(['известни']);
75-
});
76-
77-
test('flushing trailing newlines', () => {
78-
expect(decodeChunks(['foo\n', '\nbar'], { flush: true })).toEqual(['foo', '', 'bar']);
79-
});
80-
81-
test('flushing empty buffer', () => {
82-
expect(decodeChunks([], { flush: true })).toEqual([]);
83-
});
84-
});
856

867
describe('streaming decoding', () => {
878
test('basic', async () => {

0 commit comments

Comments (0)