Skip to content

Commit eb06fad

Browse files
authored
Merge pull request #715 from nats-io/mux-leaks
fixed noMux (request/requestMany) leaked subscriptions
2 parents f5a4365 + abb35f7 commit eb06fad

File tree

3 files changed

+163
-0
lines changed

3 files changed

+163
-0
lines changed

nats-base-client/nats.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,7 @@ export class NatsConnectionImpl implements NatsConnection {
266266
}
267267
},
268268
});
269+
(sub as SubscriptionImpl).requestSubject = subject;
269270

270271
sub.closed
271272
.then(() => {
@@ -389,6 +390,7 @@ export class NatsConnectionImpl implements NatsConnection {
389390
if (errCtx && err.code !== ErrorCode.Timeout) {
390391
err.stack += `\n\n${errCtx.stack}`;
391392
}
393+
sub.unsubscribe();
392394
d.reject(err);
393395
} else {
394396
err = isRequestError(msg);

tests/basics_test.ts

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -621,6 +621,85 @@ Deno.test("basics - no mux requests timeout", async () => {
621621
await cleanup(ns, nc);
622622
});
623623

624+
Deno.test("basics - no mux request timeout doesn't leak subs", async () => {
625+
const { ns, nc } = await setup();
626+
627+
nc.subscribe("q", { callback: () => {} });
628+
const nci = nc as NatsConnectionImpl;
629+
assertEquals(nci.protocol.subscriptions.size(), 1);
630+
631+
await assertRejects(
632+
async () => {
633+
await nc.request("q", Empty, { noMux: true, timeout: 1000 });
634+
},
635+
Error,
636+
"TIMEOUT",
637+
);
638+
639+
assertEquals(nci.protocol.subscriptions.size(), 1);
640+
await cleanup(ns, nc);
641+
});
642+
643+
Deno.test("basics - no mux request no responders doesn't leak subs", async () => {
644+
const { ns, nc } = await setup();
645+
646+
const nci = nc as NatsConnectionImpl;
647+
assertEquals(nci.protocol.subscriptions.size(), 0);
648+
649+
await assertRejects(
650+
async () => {
651+
await nc.request("q", Empty, { noMux: true, timeout: 1000 });
652+
},
653+
Error,
654+
"503",
655+
);
656+
657+
assertEquals(nci.protocol.subscriptions.size(), 0);
658+
await cleanup(ns, nc);
659+
});
660+
661+
Deno.test("basics - no mux request no perms doesn't leak subs", async () => {
662+
const { ns, nc } = await setup({
663+
authorization: {
664+
users: [{
665+
user: "s",
666+
password: "s",
667+
permission: {
668+
publish: "q",
669+
subscribe: "response",
670+
allow_responses: true,
671+
},
672+
}],
673+
},
674+
}, { user: "s", pass: "s" });
675+
676+
const nci = nc as NatsConnectionImpl;
677+
assertEquals(nci.protocol.subscriptions.size(), 0);
678+
679+
await assertRejects(
680+
async () => {
681+
await nc.request("qq", Empty, {
682+
noMux: true,
683+
reply: "response",
684+
timeout: 1000,
685+
});
686+
},
687+
Error,
688+
"Permissions Violation for Publish",
689+
);
690+
691+
await assertRejects(
692+
async () => {
693+
await nc.request("q", Empty, { noMux: true, reply: "r", timeout: 1000 });
694+
},
695+
Error,
696+
"Permissions Violation for Subscription",
697+
);
698+
699+
assertEquals(nci.protocol.subscriptions.size(), 0);
700+
await cleanup(ns, nc);
701+
});
702+
624703
Deno.test("basics - no mux requests", async () => {
625704
const { ns, nc } = await setup({ max_payload: 2048 });
626705
const subj = createInbox();

tests/mrequest_test.ts

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -424,3 +424,85 @@ Deno.test("mreq - lost sub permission", async () => {
424424
await d;
425425
await cleanup(ns, nc);
426426
});
427+
428+
Deno.test("mreq - timeout doesn't leak subs", async () => {
429+
const { ns, nc } = await setup();
430+
431+
nc.subscribe("q", { callback: () => {} });
432+
const nci = nc as NatsConnectionImpl;
433+
assertEquals(nci.protocol.subscriptions.size(), 1);
434+
435+
// there's no error here - the empty response is the timeout
436+
const iter = await nc.requestMany("q", Empty, {
437+
maxWait: 1000,
438+
maxMessages: 10,
439+
noMux: true,
440+
});
441+
for await (const _ of iter) {
442+
// nothing
443+
}
444+
445+
assertEquals(nci.protocol.subscriptions.size(), 1);
446+
await cleanup(ns, nc);
447+
});
448+
449+
Deno.test("mreq - no responder doesn't leak subs", async () => {
450+
const { ns, nc } = await setup();
451+
452+
const nci = nc as NatsConnectionImpl;
453+
assertEquals(nci.protocol.subscriptions.size(), 0);
454+
455+
await assertRejects(
456+
async () => {
457+
const iter = await nc.requestMany("q", Empty, {
458+
noMux: true,
459+
maxWait: 1000,
460+
maxMessages: 10,
461+
});
462+
for await (const _ of iter) {
463+
// nothing
464+
}
465+
},
466+
Error,
467+
"503",
468+
);
469+
470+
// the mux subscription
471+
assertEquals(nci.protocol.subscriptions.size(), 0);
472+
await cleanup(ns, nc);
473+
});
474+
475+
Deno.test("mreq - no mux request no perms doesn't leak subs", async () => {
476+
const { ns, nc } = await setup({
477+
authorization: {
478+
users: [{
479+
user: "s",
480+
password: "s",
481+
permission: {
482+
publish: "q",
483+
subscribe: ">",
484+
allow_responses: true,
485+
},
486+
}],
487+
},
488+
}, { user: "s", pass: "s" });
489+
490+
const nci = nc as NatsConnectionImpl;
491+
assertEquals(nci.protocol.subscriptions.size(), 0);
492+
493+
await assertRejects(
494+
async () => {
495+
const iter = await nc.requestMany("qq", Empty, {
496+
noMux: true,
497+
maxWait: 1000,
498+
});
499+
for await (const _ of iter) {
500+
// nothing
501+
}
502+
},
503+
Error,
504+
"Permissions Violation for Publish",
505+
);
506+
507+
await cleanup(ns, nc);
508+
});

0 commit comments

Comments
 (0)