Skip to content

Commit 8dc31a7

Browse files
authored
Add connection close listeners and update package versions (#258)
Introduced `ConnectionClosedListener` and related methods for improved resource management on connection closes. Fix #253 Signed-off-by: Alberto Ricart <[email protected]>
1 parent 59b625c commit 8dc31a7

27 files changed

+286
-57
lines changed

core/deno.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@nats-io/nats-core",
3-
"version": "3.0.0",
3+
"version": "3.0.2-1",
44
"exports": {
55
".": "./src/mod.ts",
66
"./internal": "./src/internal_mod.ts"

core/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@nats-io/nats-core",
3-
"version": "3.0.0",
3+
"version": "3.0.2-1",
44
"files": [
55
"lib/",
66
"LICENSE",

core/src/core.ts

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,10 @@ export type SlowConsumerStatus = {
6565
pending: number;
6666
};
6767

68+
export type CloseStatus = {
69+
type: "close";
70+
};
71+
6872
export type Status =
6973
| DisconnectStatus
7074
| ReconnectStatus
@@ -75,7 +79,8 @@ export type Status =
7579
| ClientPingStatus
7680
| StaleConnectionStatus
7781
| SlowConsumerStatus
78-
| ForceReconnectStatus;
82+
| ForceReconnectStatus
83+
| CloseStatus;
7984

8085
export type MsgCallback<T> = (
8186
err: Error | null,
@@ -1042,3 +1047,9 @@ export type RequestInfo = {
10421047
client_id?: string;
10431048
nonce?: string;
10441049
};
1050+
1051+
export type CallbackOptionalErrorFn = (err: Error | void) => void;
1052+
1053+
export type ConnectionClosedListener = {
1054+
connectionClosedCallback: CallbackOptionalErrorFn;
1055+
};

core/src/internal_mod.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,8 +87,10 @@ export type {
8787
Auth,
8888
Authenticator,
8989
CallbackFn,
90+
CallbackOptionalErrorFn,
9091
ClientPingStatus,
9192
ClusterUpdateStatus,
93+
ConnectionClosedListener,
9294
ConnectionOptions,
9395
DisconnectStatus,
9496
Dispatcher,
@@ -129,6 +131,7 @@ export type {
129131
TokenAuth,
130132
UserPass,
131133
} from "./core.ts";
134+
132135
export { createInbox, Match, syncIterator } from "./core.ts";
133136
export { SubscriptionImpl, Subscriptions } from "./protocol.ts";
134137

core/src/nats.ts

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import type { RequestManyOptionsInternal } from "./request.ts";
2626
import { RequestMany, RequestOne } from "./request.ts";
2727

2828
import type {
29+
ConnectionClosedListener,
2930
ConnectionOptions,
3031
Context,
3132
Msg,
@@ -48,6 +49,7 @@ export class NatsConnectionImpl implements NatsConnection {
4849
options: ConnectionOptions;
4950
protocol!: ProtocolHandler;
5051
draining: boolean;
52+
closeListeners?: CloseListeners;
5153

5254
private constructor(opts: ConnectionOptions) {
5355
this.draining = false;
@@ -534,4 +536,50 @@ export class NatsConnectionImpl implements NatsConnection {
534536
}
535537
return this.protocol.reconnect();
536538
}
539+
540+
// internal
541+
addCloseListener(listener: ConnectionClosedListener) {
542+
if (this.closeListeners === undefined) {
543+
this.closeListeners = new CloseListeners(this.closed());
544+
}
545+
this.closeListeners.add(listener);
546+
}
547+
// internal
548+
removeCloseListener(listener: ConnectionClosedListener) {
549+
if (this.closeListeners) {
550+
this.closeListeners.remove(listener);
551+
}
552+
}
553+
}
554+
555+
class CloseListeners {
556+
listeners: ConnectionClosedListener[];
557+
558+
constructor(closed: Promise<void | Error>) {
559+
this.listeners = [];
560+
closed.then((err) => {
561+
this.notify(err);
562+
});
563+
}
564+
565+
add(listener: ConnectionClosedListener) {
566+
this.listeners.push(listener);
567+
}
568+
569+
remove(listener: ConnectionClosedListener) {
570+
this.listeners = this.listeners.filter((l) => l !== listener);
571+
}
572+
573+
notify(err: void | Error) {
574+
this.listeners.forEach((l) => {
575+
if (typeof l.connectionClosedCallback === "function") {
576+
try {
577+
l.connectionClosedCallback(err);
578+
} catch (_) {
579+
// ignored
580+
}
581+
}
582+
});
583+
this.listeners = [];
584+
}
537585
}

core/src/version.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
11
// This file is generated - do not edit
2-
export const version = "3.0.0";
2+
export const version = "3.0.2-1";

core/tests/basics_test.ts

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ import {
3535
syncIterator,
3636
} from "../src/internal_mod.ts";
3737
import type {
38+
ConnectionClosedListener,
39+
Deferred,
3840
Msg,
3941
MsgHdrs,
4042
MsgImpl,
@@ -1680,3 +1682,47 @@ class MM implements Msg {
16801682
return "";
16811683
}
16821684
}
1685+
1686+
Deno.test("basics - internal close listener", async () => {
1687+
const ns = await NatsServer.start();
1688+
const port = ns.port;
1689+
1690+
// nothing bad should happen if none registered
1691+
let nc = await connect({ port }) as NatsConnectionImpl;
1692+
await nc.close();
1693+
1694+
function makeListener(d: Deferred<unknown>): ConnectionClosedListener {
1695+
return {
1696+
connectionClosedCallback: () => {
1697+
d.resolve();
1698+
},
1699+
};
1700+
}
1701+
1702+
// can add and remove
1703+
nc = await connect({ port }) as NatsConnectionImpl;
1704+
let done = deferred();
1705+
let listener = makeListener(done);
1706+
1707+
(nc as NatsConnectionImpl).addCloseListener(listener);
1708+
// @ts-ignore: internal
1709+
assertEquals((nc as NatsConnectionImpl).closeListeners.listeners.length, 1);
1710+
(nc as NatsConnectionImpl).removeCloseListener(listener);
1711+
// @ts-ignore: internal
1712+
assertEquals((nc as NatsConnectionImpl).closeListeners.listeners.length, 0);
1713+
await nc.close();
1714+
done.resolve();
1715+
await done;
1716+
1717+
// closed called
1718+
nc = await connect({ port }) as NatsConnectionImpl;
1719+
done = deferred();
1720+
listener = makeListener(done);
1721+
(nc as NatsConnectionImpl).addCloseListener(listener);
1722+
await nc.close();
1723+
await done;
1724+
// @ts-ignore: internal
1725+
assertEquals((nc as NatsConnectionImpl).closeListeners.listeners.length, 0);
1726+
1727+
await ns.stop();
1728+
});

jetstream/deno.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@nats-io/jetstream",
3-
"version": "3.0.1-2",
3+
"version": "3.0.2-1",
44
"exports": {
55
".": "./src/mod.ts",
66
"./internal": "./src/internal_mod.ts"
@@ -33,6 +33,6 @@
3333
"test": "deno test -A --parallel --reload --trace-leaks --quiet tests/ --import-map=import_map.json"
3434
},
3535
"imports": {
36-
"@nats-io/nats-core": "jsr:@nats-io/[email protected].0"
36+
"@nats-io/nats-core": "jsr:@nats-io/[email protected].2-1"
3737
}
3838
}

jetstream/import_map.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22
"imports": {
33
"@nats-io/nkeys": "jsr:@nats-io/[email protected]",
44
"@nats-io/nuid": "jsr:@nats-io/[email protected]",
5-
"@nats-io/nats-core": "jsr:@nats-io/[email protected].0",
6-
"@nats-io/nats-core/internal": "jsr:@nats-io/[email protected].0/internal",
5+
"@nats-io/nats-core": "jsr:@nats-io/[email protected].2-1",
6+
"@nats-io/nats-core/internal": "jsr:@nats-io/[email protected].2-1/internal",
77
"test_helpers": "../test_helpers/mod.ts",
88
"@std/io": "jsr:@std/[email protected]"
99
}

jetstream/package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@nats-io/jetstream",
3-
"version": "3.0.1-2",
3+
"version": "3.0.2-1",
44
"files": [
55
"lib/",
66
"LICENSE",
@@ -33,7 +33,7 @@
3333
},
3434
"description": "jetstream library - this library implements all the base functionality for NATS JetStream for javascript clients",
3535
"dependencies": {
36-
"@nats-io/nats-core": "3.0.0"
36+
"@nats-io/nats-core": "3.0.2-1"
3737
},
3838
"devDependencies": {
3939
"@types/node": "^22.13.10",

jetstream/src/consumer.ts

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
import type {
1717
CallbackFn,
18+
ConnectionClosedListener,
1819
Delay,
1920
MsgImpl,
2021
QueuedIterator,
@@ -129,6 +130,7 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
129130
inbox!: string;
130131
cancelables: Delay[];
131132
inReset: boolean;
133+
closeListener: ConnectionClosedListener;
132134

133135
// callback: ConsumerCallbackFn;
134136
constructor(
@@ -176,11 +178,15 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
176178
this.abortOnMissingResource = copts.abort_on_missing_resource === true;
177179
this.bind = copts.bind === true;
178180

179-
this.consumer.api.nc.closed().then(() => {
180-
this._push(() => {
181-
this.stop();
182-
});
183-
});
181+
this.closeListener = {
182+
// we don't propagate the error here
183+
connectionClosedCallback: () => {
184+
this._push(() => {
185+
this.stop();
186+
});
187+
},
188+
};
189+
this.consumer.api.nc.addCloseListener(this.closeListener);
184190

185191
this.start();
186192
}
@@ -683,6 +689,8 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
683689
if (this.done) {
684690
return;
685691
}
692+
this.consumer.api.nc.removeCloseListener(this.closeListener);
693+
686694
this.sub?.unsubscribe();
687695
this.clearTimers();
688696
this.statusIterator?.stop();

jetstream/tests/consume_test.ts

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -611,3 +611,74 @@ Deno.test("consume - consumer not found and no responders", async () => {
611611

612612
await cleanup(ns, nc);
613613
});
614+
615+
Deno.test("consumer - internal close listener", async () => {
616+
const { ns, nc } = await setup(jetstreamServerConf());
617+
const nci = nc as NatsConnectionImpl;
618+
//@ts-ignore: internal
619+
assertEquals(nci.closeListeners, undefined);
620+
621+
const jsm = await jetstreamManager(nc);
622+
await jsm.streams.add({ name: "leak", subjects: ["leak.*"] });
623+
const ci = await jsm.consumers.add("leak", {});
624+
const js = jsm.jetstream();
625+
await js.publish("leak.a");
626+
await js.publish("leak.a");
627+
await js.publish("leak.a");
628+
629+
// next
630+
const c = await js.consumers.get("leak", ci.name);
631+
await c.next();
632+
//@ts-ignore: internal
633+
assertEquals(nci.closeListeners.listeners.length, 0);
634+
635+
// fetch
636+
let iter = await c.fetch({ max_messages: 1 });
637+
await (async () => {
638+
for await (const _ of iter) {
639+
// nothing
640+
}
641+
})();
642+
//@ts-ignore: internal
643+
assertEquals(nci.closeListeners.listeners.length, 0);
644+
645+
iter = await c.consume({ max_messages: 1 });
646+
await (async () => {
647+
for await (const _ of iter) {
648+
break;
649+
}
650+
})();
651+
//@ts-ignore: internal
652+
assertEquals(nci.closeListeners.listeners.length, 0);
653+
654+
// long
655+
iter = await c.consume({ max_messages: 1 });
656+
let done = (async () => {
657+
for await (const _ of iter) {
658+
// nothing
659+
}
660+
})();
661+
662+
await nc.flush();
663+
//@ts-ignore: internal
664+
assertEquals(nci.closeListeners.listeners.length, 1);
665+
iter.stop();
666+
await done;
667+
//@ts-ignore: internal
668+
assertEquals(nci.closeListeners.listeners.length, 0);
669+
670+
iter = await c.consume({ max_messages: 1 });
671+
done = (async () => {
672+
for await (const _ of iter) {
673+
// nothing
674+
}
675+
})();
676+
// @ts-ignore: internal
677+
assertEquals(nci.closeListeners.listeners.length, 1);
678+
await nc.close();
679+
await done;
680+
// @ts-ignore: internal
681+
assertEquals(nci.closeListeners.listeners.length, 0);
682+
683+
await cleanup(ns, nc);
684+
});

kv/deno.json

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@nats-io/kv",
3-
"version": "3.0.1-1",
3+
"version": "3.0.2-1",
44
"exports": {
55
".": "./src/mod.ts",
66
"./internal": "./src/internal_mod.ts"
@@ -33,7 +33,7 @@
3333
"test": "deno test -A --parallel --reload --quiet tests/ --import-map=import_map.json"
3434
},
3535
"imports": {
36-
"@nats-io/nats-core": "jsr:@nats-io/[email protected].0",
37-
"@nats-io/jetstream": "jsr:@nats-io/[email protected].1-2"
36+
"@nats-io/nats-core": "jsr:@nats-io/[email protected].2-1",
37+
"@nats-io/jetstream": "jsr:@nats-io/[email protected].2-1"
3838
}
3939
}

kv/import_map.json

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
{
22
"imports": {
3-
"@nats-io/nats-core": "jsr:@nats-io/[email protected].0",
4-
"@nats-io/nats-core/internal": "jsr:@nats-io/nats-core@~3.0.0/internal",
5-
"@nats-io/jetstream": "jsr:@nats-io/[email protected].1-2",
6-
"@nats-io/jetstream/internal": "jsr:@nats-io/[email protected].1-2/internal",
3+
"@nats-io/nats-core": "jsr:@nats-io/[email protected].2-1",
4+
"@nats-io/nats-core/internal": "jsr:@nats-io/[email protected].2-1/internal",
5+
"@nats-io/jetstream": "jsr:@nats-io/[email protected].2-1",
6+
"@nats-io/jetstream/internal": "jsr:@nats-io/[email protected].2-1/internal",
77
"test_helpers": "../test_helpers/mod.ts",
88
"@nats-io/nkeys": "jsr:@nats-io/[email protected]",
99
"@nats-io/nuid": "jsr:@nats-io/[email protected]",

0 commit comments

Comments
 (0)