Skip to content

Commit eb272af

Browse files
committed
fix(stream): avoid event listener leak
1 parent 3a5909b commit eb272af

File tree

2 files changed

+76
-44
lines changed

2 files changed

+76
-44
lines changed

src/lib/BetaMessageStream.ts

Lines changed: 38 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -181,22 +181,30 @@ export class BetaMessageStream implements AsyncIterable<BetaMessageStreamEvent>
181181
options?: RequestOptions,
182182
): Promise<void> {
183183
const signal = options?.signal;
184+
let abortHandler: (() => void) | undefined;
184185
if (signal) {
185186
if (signal.aborted) this.controller.abort();
186-
signal.addEventListener('abort', () => this.controller.abort());
187+
abortHandler = this.controller.abort.bind(this.controller);
188+
signal.addEventListener('abort', abortHandler);
187189
}
188-
this.#beginRequest();
189-
const { response, data: stream } = await messages
190-
.create({ ...params, stream: true }, { ...options, signal: this.controller.signal })
191-
.withResponse();
192-
this._connected(response);
193-
for await (const event of stream) {
194-
this.#addStreamEvent(event);
195-
}
196-
if (stream.controller.signal?.aborted) {
197-
throw new APIUserAbortError();
190+
try {
191+
this.#beginRequest();
192+
const { response, data: stream } = await messages
193+
.create({ ...params, stream: true }, { ...options, signal: this.controller.signal })
194+
.withResponse();
195+
this._connected(response);
196+
for await (const event of stream) {
197+
this.#addStreamEvent(event);
198+
}
199+
if (stream.controller.signal?.aborted) {
200+
throw new APIUserAbortError();
201+
}
202+
this.#endRequest();
203+
} finally {
204+
if (signal && abortHandler) {
205+
signal.removeEventListener('abort', abortHandler);
206+
}
198207
}
199-
this.#endRequest();
200208
}
201209

202210
protected _connected(response: Response | null) {
@@ -497,20 +505,28 @@ export class BetaMessageStream implements AsyncIterable<BetaMessageStreamEvent>
497505
options?: RequestOptions,
498506
): Promise<void> {
499507
const signal = options?.signal;
508+
let abortHandler: (() => void) | undefined;
500509
if (signal) {
501510
if (signal.aborted) this.controller.abort();
502-
signal.addEventListener('abort', () => this.controller.abort());
511+
abortHandler = this.controller.abort.bind(this.controller);
512+
signal.addEventListener('abort', abortHandler);
503513
}
504-
this.#beginRequest();
505-
this._connected(null);
506-
const stream = Stream.fromReadableStream<BetaMessageStreamEvent>(readableStream, this.controller);
507-
for await (const event of stream) {
508-
this.#addStreamEvent(event);
509-
}
510-
if (stream.controller.signal?.aborted) {
511-
throw new APIUserAbortError();
514+
try {
515+
this.#beginRequest();
516+
this._connected(null);
517+
const stream = Stream.fromReadableStream<BetaMessageStreamEvent>(readableStream, this.controller);
518+
for await (const event of stream) {
519+
this.#addStreamEvent(event);
520+
}
521+
if (stream.controller.signal?.aborted) {
522+
throw new APIUserAbortError();
523+
}
524+
this.#endRequest();
525+
} finally {
526+
if (signal && abortHandler) {
527+
signal.removeEventListener('abort', abortHandler);
528+
}
512529
}
513-
this.#endRequest();
514530
}
515531

516532
/**

src/lib/MessageStream.ts

Lines changed: 38 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -180,22 +180,30 @@ export class MessageStream implements AsyncIterable<MessageStreamEvent> {
180180
options?: RequestOptions,
181181
): Promise<void> {
182182
const signal = options?.signal;
183+
let abortHandler: (() => void) | undefined;
183184
if (signal) {
184185
if (signal.aborted) this.controller.abort();
185-
signal.addEventListener('abort', () => this.controller.abort());
186+
abortHandler = this.controller.abort.bind(this.controller);
187+
signal.addEventListener('abort', abortHandler);
186188
}
187-
this.#beginRequest();
188-
const { response, data: stream } = await messages
189-
.create({ ...params, stream: true }, { ...options, signal: this.controller.signal })
190-
.withResponse();
191-
this._connected(response);
192-
for await (const event of stream) {
193-
this.#addStreamEvent(event);
194-
}
195-
if (stream.controller.signal?.aborted) {
196-
throw new APIUserAbortError();
189+
try {
190+
this.#beginRequest();
191+
const { response, data: stream } = await messages
192+
.create({ ...params, stream: true }, { ...options, signal: this.controller.signal })
193+
.withResponse();
194+
this._connected(response);
195+
for await (const event of stream) {
196+
this.#addStreamEvent(event);
197+
}
198+
if (stream.controller.signal?.aborted) {
199+
throw new APIUserAbortError();
200+
}
201+
this.#endRequest();
202+
} finally {
203+
if (signal && abortHandler) {
204+
signal.removeEventListener('abort', abortHandler);
205+
}
197206
}
198-
this.#endRequest();
199207
}
200208

201209
protected _connected(response: Response | null) {
@@ -496,20 +504,28 @@ export class MessageStream implements AsyncIterable<MessageStreamEvent> {
496504
options?: RequestOptions,
497505
): Promise<void> {
498506
const signal = options?.signal;
507+
let abortHandler: (() => void) | undefined;
499508
if (signal) {
500509
if (signal.aborted) this.controller.abort();
501-
signal.addEventListener('abort', () => this.controller.abort());
510+
abortHandler = this.controller.abort.bind(this.controller);
511+
signal.addEventListener('abort', abortHandler);
502512
}
503-
this.#beginRequest();
504-
this._connected(null);
505-
const stream = Stream.fromReadableStream<MessageStreamEvent>(readableStream, this.controller);
506-
for await (const event of stream) {
507-
this.#addStreamEvent(event);
508-
}
509-
if (stream.controller.signal?.aborted) {
510-
throw new APIUserAbortError();
513+
try {
514+
this.#beginRequest();
515+
this._connected(null);
516+
const stream = Stream.fromReadableStream<MessageStreamEvent>(readableStream, this.controller);
517+
for await (const event of stream) {
518+
this.#addStreamEvent(event);
519+
}
520+
if (stream.controller.signal?.aborted) {
521+
throw new APIUserAbortError();
522+
}
523+
this.#endRequest();
524+
} finally {
525+
if (signal && abortHandler) {
526+
signal.removeEventListener('abort', abortHandler);
527+
}
511528
}
512-
this.#endRequest();
513529
}
514530

515531
/**

0 commit comments

Comments
 (0)