Skip to content

Commit d4665ee

Browse files
authored
Merge pull request #694 from nats-io/fix-691
[FIX] legacy order consumer subscription leak
2 parents 7fe57c3 + d1a3fe8 commit d4665ee

File tree

4 files changed

+133
-2
lines changed

4 files changed

+133
-2
lines changed

jetstream/jsclient.ts

+2
Original file line numberDiff line numberDiff line change
@@ -795,6 +795,8 @@ export class JetStreamSubscriptionImpl extends TypedSubscription<JsMsg>
795795
this.js._request(subj, req, { retries: -1 })
796796
.then((v) => {
797797
const ci = v as ConsumerInfo;
798+
const jinfo = this.sub.info as JetStreamSubscriptionInfo;
799+
jinfo.last = ci;
798800
this.info!.config = ci.config;
799801
this.info!.name = ci.name;
800802
})

nats-base-client/protocol.ts

+1
Original file line numberDiff line numberDiff line change
@@ -995,6 +995,7 @@ export class ProtocolHandler implements Dispatcher<ParserEvent> {
995995
if (!s || this.isClosed()) {
996996
return;
997997
}
998+
this.unsub(s);
998999
s.subject = subject;
9991000
this.subscriptions.resub(s);
10001001
// we don't auto-unsub here because we don't

tests/helpers/launcher.ts

+54
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,45 @@ export interface JSZ {
7272
};
7373
}
7474

75+
export interface SubDetails {
76+
subject: string;
77+
sid: string;
78+
msgs: number;
79+
cid: number;
80+
}
81+
82+
export interface Conn {
83+
cid: number;
84+
kind: string;
85+
type: string;
86+
ip: string;
87+
port: number;
88+
start: string;
89+
"last_activity": string;
90+
"rtt": string;
91+
uptime: string;
92+
idle: string;
93+
"pending_bytes": number;
94+
"in_msgs": number;
95+
"out_msgs": number;
96+
subscriptions: number;
97+
name: string;
98+
lang: string;
99+
version: string;
100+
subscriptions_list?: string[];
101+
subscriptions_list_detail?: SubDetails[];
102+
}
103+
104+
export interface ConnZ {
105+
"server_id": string;
106+
now: string;
107+
"num_connections": number;
108+
"total": number;
109+
"offset": number;
110+
"limit": number;
111+
"connections": Conn[];
112+
}
113+
75114
function parseHostport(
76115
s?: string,
77116
): { hostname: string; port: number } | undefined {
@@ -316,6 +355,21 @@ export class NatsServer implements PortInfo {
316355
return await resp.json();
317356
}
318357

358+
async connz(cid?: number, subs: boolean | "detail" = true): Promise<ConnZ> {
359+
if (!this.monitoring) {
360+
return Promise.reject(new Error("server is not monitoring"));
361+
}
362+
const args = [];
363+
args.push(`subs=${subs}`);
364+
if (cid) {
365+
args.push(`cid=${cid}`);
366+
}
367+
368+
const qs = args.length ? args.join("&") : "";
369+
const resp = await fetch(`http://127.0.0.1:${this.monitoring}/connz?${qs}`);
370+
return await resp.json();
371+
}
372+
319373
async dataDir(): Promise<string | null> {
320374
const jsz = await this.jsz();
321375
return jsz.config.store_dir;

tests/resub_test.ts

+76-2
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,14 @@
1515

1616
import { cleanup, setup } from "./helpers/mod.ts";
1717
import { NatsConnectionImpl } from "../nats-base-client/nats.ts";
18-
import { assertEquals } from "https://deno.land/[email protected]/assert/mod.ts";
19-
import { createInbox, Msg } from "../nats-base-client/core.ts";
18+
import {
19+
assert,
20+
assertEquals,
21+
assertExists,
22+
fail,
23+
} from "https://deno.land/[email protected]/assert/mod.ts";
24+
import { createInbox, Msg, NatsConnection } from "../nats-base-client/core.ts";
25+
import { NatsServer } from "./helpers/launcher.ts";
2026

2127
Deno.test("resub - iter", async () => {
2228
const { ns, nc } = await setup();
@@ -75,3 +81,71 @@ Deno.test("resub - callback", async () => {
7581
assertEquals(buf[1].subject, subjb);
7682
await cleanup(ns, nc);
7783
});
84+
85+
async function assertEqualSubs(
86+
ns: NatsServer,
87+
nc: NatsConnection,
88+
): Promise<void> {
89+
const nci = nc as NatsConnectionImpl;
90+
const cid = nc.info?.client_id || -1;
91+
if (cid === -1) {
92+
fail("client_id not found");
93+
}
94+
95+
const connz = await ns.connz(cid, "detail");
96+
97+
const conn = connz.connections.find((c) => {
98+
return c.cid === cid;
99+
});
100+
assertExists(conn);
101+
assertExists(conn.subscriptions_list_detail);
102+
103+
const subs = nci.protocol.subscriptions.all();
104+
subs.forEach((sub) => {
105+
const ssub = conn.subscriptions_list_detail?.find((d) => {
106+
return d.sid === `${sub.sid}`;
107+
});
108+
assertExists(ssub);
109+
assertEquals(ssub.subject, sub.subject);
110+
});
111+
}
112+
113+
Deno.test("resub - removes server interest", async () => {
114+
const { ns, nc } = await setup();
115+
116+
nc.subscribe("a", {
117+
callback() {
118+
// nothing
119+
},
120+
});
121+
await nc.flush();
122+
123+
const nci = nc as NatsConnectionImpl;
124+
let sub = nci.protocol.subscriptions.all().find((s) => {
125+
return s.subject === "a";
126+
});
127+
assertExists(sub);
128+
129+
// assert the server sees the same subscriptions
130+
await assertEqualSubs(ns, nc);
131+
132+
// change it
133+
nci._resub(sub, "b");
134+
await nc.flush();
135+
136+
// make sure we don't find a
137+
sub = nci.protocol.subscriptions.all().find((s) => {
138+
return s.subject === "a";
139+
});
140+
assert(sub === undefined);
141+
142+
// make sure we find b
143+
sub = nci.protocol.subscriptions.all().find((s) => {
144+
return s.subject === "b";
145+
});
146+
assertExists(sub);
147+
148+
// assert server thinks the same thing
149+
await assertEqualSubs(ns, nc);
150+
await cleanup(ns, nc);
151+
});

0 commit comments

Comments
 (0)