   ReturnTypeBasedOnParams
 } from "@/types"
 import OpenAI from "openai"
+import { Stream } from "openai/streaming.mjs"
 import { z, ZodError } from "zod"
 import ZodStream, { OAIResponseParser, OAIStream, withResponseModel, type Mode } from "zod-stream"
 import { fromZodError } from "zod-validation-error"
@@ -266,10 +267,10 @@ class Instructor<C extends GenericClient | OpenAI> {
     return makeCompletionCallWithRetries()
   }
 
-  private async chatCompletionStream<T extends z.AnyZodObject>(
+  private async *chatCompletionStream<T extends z.AnyZodObject>(
     { max_retries, response_model, ...params }: ChatCompletionCreateParamsWithModel<T>,
     requestOptions?: ClientTypeChatCompletionRequestOptions<C>
-  ): Promise<AsyncGenerator<Partial<T> & { _meta?: CompletionMeta }, void, unknown>> {
+  ): AsyncGenerator<Partial<T> & { _meta?: CompletionMeta }, void, unknown> {
     if (max_retries) {
       this.log("warn", "max_retries is not supported for streaming completions")
     }
@@ -293,7 +294,16 @@ class Instructor<C extends GenericClient | OpenAI> {
       debug: this.debug ?? false
     })
 
-    return streamClient.create({
+    async function checkForUsage(reader: Stream<OpenAI.ChatCompletionChunk>) {
+      for await (const chunk of reader) {
+        if ("usage" in chunk) {
+          streamUsage = chunk.usage as CompletionMeta["usage"]
+        }
+      }
+    }
+
+    let streamUsage: CompletionMeta["usage"] | undefined
+    const structuredStream = await streamClient.create({
       completionPromise: async () => {
         if (this.client.chat?.completions?.create) {
           const completion = await this.client.chat.completions.create(
@@ -306,6 +316,21 @@ class Instructor<C extends GenericClient | OpenAI> {
 
           this.log("debug", "raw stream completion response: ", completion)
 
+          if (
+            this.provider === "OAI" &&
+            completionParams?.stream &&
+            "stream_options" in completionParams &&
+            completion instanceof Stream
+          ) {
+            const [completion1, completion2] = completion.tee()
+
+            checkForUsage(completion1)
+
+            return OAIStream({
+              res: completion2
+            })
+          }
+
           return OAIStream({
             res: completion as unknown as AsyncIterable<OpenAI.ChatCompletionChunk>
           })
@@ -315,6 +340,16 @@ class Instructor<C extends GenericClient | OpenAI> {
       },
       response_model
     })
+
+    for await (const chunk of structuredStream) {
+      yield {
+        ...chunk,
+        _meta: {
+          usage: streamUsage ?? undefined,
+          ...(chunk?._meta ?? {})
+        }
+      }
+    }
   }
 
   private isChatCompletionCreateParamsWithModel<T extends z.AnyZodObject>(
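
Note (not part of the diff): a minimal consumer-side sketch of how the streamed `_meta.usage` wired up above might be read. The package import, model name, and schema below are illustrative assumptions; the only piece the diff itself implies is that an OpenAI streaming call which opts in via `stream_options` should surface token usage on the yielded chunks' `_meta.usage`.

```ts
// Illustrative sketch only; import path, model, and schema are assumptions.
import Instructor from "@instructor-ai/instructor"
import OpenAI from "openai"
import { z } from "zod"

const oai = new OpenAI()
const client = Instructor({ client: oai, mode: "TOOLS" })

const UserSchema = z.object({ name: z.string(), age: z.number() })

const stream = await client.chat.completions.create({
  model: "gpt-4o",
  messages: [{ role: "user", content: "Jason is 30 years old" }],
  response_model: { schema: UserSchema, name: "User" },
  stream: true,
  // OpenAI's include_usage flag asks for a usage chunk at the end of the stream;
  // the change above tees the raw stream and copies that usage into _meta.
  stream_options: { include_usage: true }
})

let usage: unknown
for await (const partial of stream) {
  // usage is typically only populated on the final chunk(s)
  usage = partial._meta?.usage ?? usage
}
console.log(usage)
```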