Skip to content

Commit e6d1688

Browse files
committed
merge of #715
1 parent f7c823d commit e6d1688

File tree

3 files changed

+163
-0
lines changed

3 files changed

+163
-0
lines changed

core/src/nats.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,7 @@ export class NatsConnectionImpl implements NatsConnection {
253253
}
254254
},
255255
});
256+
(sub as SubscriptionImpl).requestSubject = subject;
256257

257258
sub.closed
258259
.then(() => {
@@ -375,6 +376,7 @@ export class NatsConnectionImpl implements NatsConnection {
375376
if (errCtx && err.code !== ErrorCode.Timeout) {
376377
err.stack += `\n\n${errCtx.stack}`;
377378
}
379+
sub.unsubscribe();
378380
d.reject(err);
379381
} else {
380382
err = isRequestError(msg);

core/tests/basics_test.ts

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -635,6 +635,85 @@ Deno.test("basics - no mux requests", async () => {
635635
await cleanup(ns, nc);
636636
});
637637

638+
Deno.test("basics - no mux request timeout doesn't leak subs", async () => {
639+
const { ns, nc } = await _setup(connect);
640+
641+
nc.subscribe("q", { callback: () => {} });
642+
const nci = nc as NatsConnectionImpl;
643+
assertEquals(nci.protocol.subscriptions.size(), 1);
644+
645+
await assertRejects(
646+
async () => {
647+
await nc.request("q", Empty, { noMux: true, timeout: 1000 });
648+
},
649+
Error,
650+
"TIMEOUT",
651+
);
652+
653+
assertEquals(nci.protocol.subscriptions.size(), 1);
654+
await cleanup(ns, nc);
655+
});
656+
657+
Deno.test("basics - no mux request no responders doesn't leak subs", async () => {
658+
const { ns, nc } = await _setup(connect);
659+
660+
const nci = nc as NatsConnectionImpl;
661+
assertEquals(nci.protocol.subscriptions.size(), 0);
662+
663+
await assertRejects(
664+
async () => {
665+
await nc.request("q", Empty, { noMux: true, timeout: 1000 });
666+
},
667+
Error,
668+
"503",
669+
);
670+
671+
assertEquals(nci.protocol.subscriptions.size(), 0);
672+
await cleanup(ns, nc);
673+
});
674+
675+
Deno.test("basics - no mux request no perms doesn't leak subs", async () => {
676+
const { ns, nc } = await _setup(connect, {
677+
authorization: {
678+
users: [{
679+
user: "s",
680+
password: "s",
681+
permission: {
682+
publish: "q",
683+
subscribe: "response",
684+
allow_responses: true,
685+
},
686+
}],
687+
},
688+
}, { user: "s", pass: "s" });
689+
690+
const nci = nc as NatsConnectionImpl;
691+
assertEquals(nci.protocol.subscriptions.size(), 0);
692+
693+
await assertRejects(
694+
async () => {
695+
await nc.request("qq", Empty, {
696+
noMux: true,
697+
reply: "response",
698+
timeout: 1000,
699+
});
700+
},
701+
Error,
702+
"Permissions Violation for Publish",
703+
);
704+
705+
await assertRejects(
706+
async () => {
707+
await nc.request("q", Empty, { noMux: true, reply: "r", timeout: 1000 });
708+
},
709+
Error,
710+
"Permissions Violation for Subscription",
711+
);
712+
713+
assertEquals(nci.protocol.subscriptions.size(), 0);
714+
await cleanup(ns, nc);
715+
});
716+
638717
Deno.test("basics - no max_payload messages", async () => {
639718
const { ns, nc } = await _setup(connect, { max_payload: 2048 });
640719
const nci = nc as NatsConnectionImpl;

core/tests/mrequest_test.ts

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -420,3 +420,85 @@ Deno.test("mreq - lost sub permission", async () => {
420420
await d;
421421
await cleanup(ns, nc);
422422
});
423+
424+
Deno.test("mreq - timeout doesn't leak subs", async () => {
425+
const { ns, nc } = await _setup(connect);
426+
427+
nc.subscribe("q", { callback: () => {} });
428+
const nci = nc as NatsConnectionImpl;
429+
assertEquals(nci.protocol.subscriptions.size(), 1);
430+
431+
// there's no error here - the empty response is the timeout
432+
const iter = await nc.requestMany("q", Empty, {
433+
maxWait: 1000,
434+
maxMessages: 10,
435+
noMux: true,
436+
});
437+
for await (const _ of iter) {
438+
// nothing
439+
}
440+
441+
assertEquals(nci.protocol.subscriptions.size(), 1);
442+
await cleanup(ns, nc);
443+
});
444+
445+
Deno.test("mreq - no responder doesn't leak subs", async () => {
446+
const { ns, nc } = await _setup(connect);
447+
448+
const nci = nc as NatsConnectionImpl;
449+
assertEquals(nci.protocol.subscriptions.size(), 0);
450+
451+
await assertRejects(
452+
async () => {
453+
const iter = await nc.requestMany("q", Empty, {
454+
noMux: true,
455+
maxWait: 1000,
456+
maxMessages: 10,
457+
});
458+
for await (const _ of iter) {
459+
// nothing
460+
}
461+
},
462+
Error,
463+
"503",
464+
);
465+
466+
// the mux subscription
467+
assertEquals(nci.protocol.subscriptions.size(), 0);
468+
await cleanup(ns, nc);
469+
});
470+
471+
Deno.test("mreq - no mux request no perms doesn't leak subs", async () => {
472+
const { ns, nc } = await _setup(connect, {
473+
authorization: {
474+
users: [{
475+
user: "s",
476+
password: "s",
477+
permission: {
478+
publish: "q",
479+
subscribe: ">",
480+
allow_responses: true,
481+
},
482+
}],
483+
},
484+
}, { user: "s", pass: "s" });
485+
486+
const nci = nc as NatsConnectionImpl;
487+
assertEquals(nci.protocol.subscriptions.size(), 0);
488+
489+
await assertRejects(
490+
async () => {
491+
const iter = await nc.requestMany("qq", Empty, {
492+
noMux: true,
493+
maxWait: 1000,
494+
});
495+
for await (const _ of iter) {
496+
// nothing
497+
}
498+
},
499+
Error,
500+
"Permissions Violation for Publish",
501+
);
502+
503+
await cleanup(ns, nc);
504+
});

0 commit comments

Comments
 (0)