@@ -6,6 +6,8 @@ import { ReadableStreamToAsyncIterable } from '../internal/shims';
6
6
import { isAbortError } from '../internal/errors' ;
7
7
import { safeJSON } from '../internal/utils/values' ;
8
8
import { encodeUTF8 } from '../internal/utils/bytes' ;
9
+ import { loggerFor } from '../internal/utils/log' ;
10
+ import type { BaseAnthropic } from '../client' ;
9
11
10
12
import { APIError } from './error' ;
11
13
@@ -19,16 +21,24 @@ export type ServerSentEvent = {
19
21
20
22
export class Stream < Item > implements AsyncIterable < Item > {
21
23
controller : AbortController ;
24
+ #client: BaseAnthropic | undefined ;
22
25
23
26
constructor (
24
27
private iterator : ( ) => AsyncIterator < Item > ,
25
28
controller : AbortController ,
29
+ client ?: BaseAnthropic ,
26
30
) {
27
31
this . controller = controller ;
32
+ this . #client = client ;
28
33
}
29
34
30
- static fromSSEResponse < Item > ( response : Response , controller : AbortController ) : Stream < Item > {
35
+ static fromSSEResponse < Item > (
36
+ response : Response ,
37
+ controller : AbortController ,
38
+ client ?: BaseAnthropic ,
39
+ ) : Stream < Item > {
31
40
let consumed = false ;
41
+ const logger = client ? loggerFor ( client ) : console ;
32
42
33
43
async function * iterator ( ) : AsyncIterator < Item , any , undefined > {
34
44
if ( consumed ) {
@@ -42,8 +52,8 @@ export class Stream<Item> implements AsyncIterable<Item> {
42
52
try {
43
53
yield JSON . parse ( sse . data ) ;
44
54
} catch ( e ) {
45
- console . error ( `Could not parse message into JSON:` , sse . data ) ;
46
- console . error ( `From chunk:` , sse . raw ) ;
55
+ logger . error ( `Could not parse message into JSON:` , sse . data ) ;
56
+ logger . error ( `From chunk:` , sse . raw ) ;
47
57
throw e ;
48
58
}
49
59
}
@@ -59,8 +69,8 @@ export class Stream<Item> implements AsyncIterable<Item> {
59
69
try {
60
70
yield JSON . parse ( sse . data ) ;
61
71
} catch ( e ) {
62
- console . error ( `Could not parse message into JSON:` , sse . data ) ;
63
- console . error ( `From chunk:` , sse . raw ) ;
72
+ logger . error ( `Could not parse message into JSON:` , sse . data ) ;
73
+ logger . error ( `From chunk:` , sse . raw ) ;
64
74
throw e ;
65
75
}
66
76
}
@@ -84,14 +94,18 @@ export class Stream<Item> implements AsyncIterable<Item> {
84
94
}
85
95
}
86
96
87
- return new Stream ( iterator , controller ) ;
97
+ return new Stream ( iterator , controller , client ) ;
88
98
}
89
99
90
100
/**
91
101
* Generates a Stream from a newline-separated ReadableStream
92
102
* where each item is a JSON value.
93
103
*/
94
- static fromReadableStream < Item > ( readableStream : ReadableStream , controller : AbortController ) : Stream < Item > {
104
+ static fromReadableStream < Item > (
105
+ readableStream : ReadableStream ,
106
+ controller : AbortController ,
107
+ client ?: BaseAnthropic ,
108
+ ) : Stream < Item > {
95
109
let consumed = false ;
96
110
97
111
async function * iterLines ( ) : AsyncGenerator < string , void , unknown > {
@@ -131,7 +145,7 @@ export class Stream<Item> implements AsyncIterable<Item> {
131
145
}
132
146
}
133
147
134
- return new Stream ( iterator , controller ) ;
148
+ return new Stream ( iterator , controller , client ) ;
135
149
}
136
150
137
151
[ Symbol . asyncIterator ] ( ) : AsyncIterator < Item > {
@@ -161,8 +175,8 @@ export class Stream<Item> implements AsyncIterable<Item> {
161
175
} ;
162
176
163
177
return [
164
- new Stream ( ( ) => teeIterator ( left ) , this . controller ) ,
165
- new Stream ( ( ) => teeIterator ( right ) , this . controller ) ,
178
+ new Stream ( ( ) => teeIterator ( left ) , this . controller , this . #client ) ,
179
+ new Stream ( ( ) => teeIterator ( right ) , this . controller , this . #client ) ,
166
180
] ;
167
181
}
168
182
0 commit comments