Skip to content

Commit fa4e1e5

Browse files
authored
Merge pull request #750 from nats-io/fix-no-responders
breaking: nats-server 2.10.26 emits no responder if stream/consumer are not found
2 parents feb7cd0 + 5e441c8 commit fa4e1e5

File tree

8 files changed

+460
-25
lines changed

8 files changed

+460
-25
lines changed

jetstream/consumer.ts

+29-3
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,12 @@ export enum ConsumerEvents {
219219
* the consumer is recreated. The argument is the name of the newly created consumer.
220220
*/
221221
OrderedConsumerRecreated = "ordered_consumer_recreated",
222+
223+
/**
224+
* This notification means that either both the stream and consumer were not
225+
* found or that JetStream is not available.
226+
*/
227+
NoResponders = "no_responders",
222228
}
223229

224230
/**
@@ -304,6 +310,7 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
304310
resetHandler?: () => void;
305311
abortOnMissingResource?: boolean;
306312
bind: boolean;
313+
inBackOff: boolean;
307314

308315
// callback: ConsumerCallbackFn;
309316
constructor(
@@ -329,6 +336,7 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
329336
this.forOrderedConsumer = false;
330337
this.abortOnMissingResource = copts.abort_on_missing_resource === true;
331338
this.bind = copts.bind === true;
339+
this.inBackOff = false;
332340

333341
this.start();
334342
}
@@ -411,6 +419,17 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
411419
this.stop(error);
412420
return;
413421
}
422+
} else if (code === 503) {
423+
// this is a no responders - possibly the stream/consumer were
424+
// deleted from under the client
425+
this.notify(ConsumerEvents.NoResponders, `${code} No Responders`);
426+
// in cases that we are in consume, the idle heartbeat will kick in
427+
// which will do a reset, and possibly refine the error
428+
if (!this.refilling || this.abortOnMissingResource) {
429+
const error = new NatsError("no responders", `${code}`);
430+
this.stop(error);
431+
return;
432+
}
414433
} else {
415434
this.notify(
416435
ConsumerDebugEvents.DebugEvent,
@@ -563,9 +582,13 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
563582
}
564583

565584
async resetPendingWithInfo(): Promise<boolean> {
585+
if (this.inBackOff) {
586+
// already failing to get stream or consumer
587+
return false;
588+
}
566589
let notFound = 0;
567590
let streamNotFound = 0;
568-
const bo = backoff();
591+
const bo = backoff([this.opts.expires]);
569592
let attempt = 0;
570593
while (true) {
571594
if (this.done) {
@@ -578,6 +601,8 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
578601
try {
579602
// check we exist
580603
await this.consumer.info();
604+
this.inBackOff = false;
605+
581606
notFound = 0;
582607
// we exist, so effectively any pending state is gone
583608
// so reset and re-pull
@@ -616,6 +641,7 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
616641
notFound = 0;
617642
streamNotFound = 0;
618643
}
644+
this.inBackOff = true;
619645
const to = bo.backoff(attempt);
620646
// wait for delay or till the client closes
621647
const de = delay(to);
@@ -1069,6 +1095,7 @@ export class OrderedPullConsumerImpl implements Consumer {
10691095
}
10701096

10711097
async resetConsumer(seq = 0): Promise<ConsumerInfo> {
1098+
const id = nuid.next();
10721099
const isNew = this.serial === 0;
10731100
// try to delete the consumer
10741101
this.consumer?.delete().catch(() => {});
@@ -1078,7 +1105,7 @@ export class OrderedPullConsumerImpl implements Consumer {
10781105
const config = this.getConsumerOpts(seq);
10791106
config.max_deliver = 1;
10801107
config.mem_storage = true;
1081-
const bo = backoff();
1108+
const bo = backoff([this.opts?.expires || 30_000]);
10821109
let ci;
10831110
for (let i = 0;; i++) {
10841111
try {
@@ -1098,7 +1125,6 @@ export class OrderedPullConsumerImpl implements Consumer {
10981125
return Promise.reject(err);
10991126
}
11001127
}
1101-
11021128
if (isNew && i >= this.maxInitialReset) {
11031129
// consumer was never created, so we can fail this
11041130
throw err;

jetstream/tests/consume_test.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,7 @@ Deno.test("consume - stream not found request abort", async () => {
310310
}
311311
},
312312
Error,
313-
"stream not found",
313+
"no responders",
314314
);
315315

316316
await cleanup(ns, nc);
@@ -380,7 +380,7 @@ Deno.test("consume - consumer not found request abort", async () => {
380380
}
381381
},
382382
Error,
383-
"consumer not found",
383+
"no responders",
384384
);
385385

386386
await cleanup(ns, nc);

jetstream/tests/consumers_ordered_test.ts

+106
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@ import {
2323
} from "https://deno.land/[email protected]/assert/mod.ts";
2424
import {
2525
ConsumerDebugEvents,
26+
ConsumerEvents,
2627
ConsumerMessages,
28+
ConsumerStatus,
2729
DeliverPolicy,
2830
JsMsg,
2931
} from "../mod.ts";
@@ -1073,3 +1075,107 @@ Deno.test("ordered consumers - initial creation fails, consumer fails", async ()
10731075

10741076
await cleanup(ns, nc);
10751077
});
1078+
1079+
Deno.test("ordered consumers - no responders - stream deleted", async () => {
1080+
const { ns, nc } = await setup(jetstreamServerConf());
1081+
const jsm = await nc.jetstreamManager();
1082+
1083+
await jsm.streams.add({ name: "messages", subjects: ["hello"] });
1084+
1085+
// stream is deleted
1086+
const c = await jsm.jetstream().consumers.get("messages");
1087+
const iter = await c.consume({ expires: 10_000 });
1088+
// FIXME: this impl of the consumer has a bug in notification
1089+
// that unless it has pulled, it has no way of returning events.
1090+
// for a test we want to miss a few, and then recreate.
1091+
await jsm.streams.delete("messages");
1092+
1093+
const buf: ConsumerStatus[] = [];
1094+
const snfP = deferred();
1095+
(async () => {
1096+
const status = await iter.status();
1097+
for await (const s of status) {
1098+
console.log(s);
1099+
buf.push(s);
1100+
if (s.type === ConsumerEvents.HeartbeatsMissed) {
1101+
if (s.data === 5) {
1102+
snfP.resolve();
1103+
}
1104+
}
1105+
}
1106+
})().then();
1107+
1108+
const process = (async () => {
1109+
for await (const m of iter) {
1110+
if (m) {
1111+
break;
1112+
}
1113+
}
1114+
})();
1115+
1116+
await snfP;
1117+
await jsm.streams.add({ name: "messages", subjects: ["hello"] });
1118+
1119+
await nc.jetstream().publish("hello");
1120+
await deadline(process, 15_000);
1121+
await cleanup(ns, nc);
1122+
});
1123+
1124+
Deno.test("ordered consumers fetch - no responders - stream deleted", async () => {
1125+
const { ns, nc } = await setup(jetstreamServerConf());
1126+
const jsm = await nc.jetstreamManager();
1127+
1128+
await jsm.streams.add({ name: "messages", subjects: ["hello"] });
1129+
1130+
// stream is deleted
1131+
const c = await jsm.jetstream().consumers.get("messages");
1132+
await jsm.streams.delete("messages");
1133+
await assertRejects(
1134+
async () => {
1135+
const iter = await c.fetch({ expires: 10_000 });
1136+
for await (const _ of iter) {
1137+
// ignored
1138+
}
1139+
},
1140+
Error,
1141+
"stream not found",
1142+
);
1143+
1144+
await jsm.streams.add({ name: "messages", subjects: ["hello"] });
1145+
await nc.jetstream().publish("hello");
1146+
1147+
const d = deferred();
1148+
const iter = await c.fetch({ expires: 10_000, max_messages: 1 });
1149+
for await (const _ of iter) {
1150+
d.resolve();
1151+
}
1152+
await d;
1153+
1154+
await cleanup(ns, nc);
1155+
});
1156+
1157+
Deno.test("ordered consumers next - no responders - stream deleted", async () => {
1158+
const { ns, nc } = await setup(jetstreamServerConf());
1159+
const jsm = await nc.jetstreamManager();
1160+
1161+
await jsm.streams.add({ name: "messages", subjects: ["hello"] });
1162+
1163+
// stream is deleted
1164+
const c = await jsm.jetstream().consumers.get("messages");
1165+
await jsm.streams.delete("messages");
1166+
await assertRejects(
1167+
() => {
1168+
return c.next({ expires: 10_000 });
1169+
},
1170+
Error,
1171+
"stream not found",
1172+
);
1173+
1174+
await jsm.streams.add({ name: "messages", subjects: ["hello"] });
1175+
await nc.jetstream().publish("hello");
1176+
1177+
const m = await c.next({ expires: 10_000 });
1178+
assertExists(m);
1179+
1180+
await cleanup(ns, nc);
1181+
});

jetstream/tests/consumers_test.ts

+126-1
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,18 @@
1414
*/
1515
import { initStream } from "./jstest_util.ts";
1616
import {
17+
assert,
1718
assertEquals,
1819
assertExists,
1920
assertRejects,
2021
assertStringIncludes,
2122
} from "https://deno.land/[email protected]/assert/mod.ts";
22-
import { deferred, delay, nanos } from "../../nats-base-client/mod.ts";
23+
import {
24+
deadline,
25+
deferred,
26+
delay,
27+
nanos,
28+
} from "../../nats-base-client/mod.ts";
2329
import {
2430
AckPolicy,
2531
Consumer,
@@ -645,3 +651,122 @@ Deno.test("consumers - getPullConsumerFor non existing misses heartbeats", async
645651

646652
await cleanup(ns, nc);
647653
});
654+
655+
Deno.test("consumers - no responders - stream deleted", async () => {
656+
const { ns, nc } = await setup(jetstreamServerConf());
657+
const jsm = await nc.jetstreamManager();
658+
659+
await jsm.streams.add({ name: "messages", subjects: ["hello"] });
660+
await jsm.consumers.add("messages", {
661+
name: "c",
662+
deliver_policy: DeliverPolicy.All,
663+
});
664+
665+
// stream is deleted
666+
const c = await jsm.jetstream().consumers.get("messages", "c");
667+
await jsm.streams.delete("messages");
668+
const iter = await c.consume({ expires: 10_000 });
669+
const buf: ConsumerStatus[] = [];
670+
const hbmP = deferred();
671+
(async () => {
672+
const status = await iter.status();
673+
for await (const s of status) {
674+
console.log(s);
675+
buf.push(s);
676+
if (s.type === ConsumerEvents.HeartbeatsMissed) {
677+
if (s.data === 3) {
678+
hbmP.resolve();
679+
}
680+
}
681+
}
682+
})().then();
683+
684+
const process = (async () => {
685+
for await (const m of iter) {
686+
if (m) {
687+
break;
688+
}
689+
}
690+
})();
691+
692+
await hbmP;
693+
694+
const snfs = buf.filter((s) => {
695+
return s.type === ConsumerEvents.StreamNotFound;
696+
}).length;
697+
assert(snfs > 0);
698+
699+
const nrs = buf.filter((s) => {
700+
return s.type === ConsumerEvents.NoResponders;
701+
}).length;
702+
assertEquals(nrs, 1);
703+
704+
await jsm.streams.add({ name: "messages", subjects: ["hello"] });
705+
await jsm.consumers.add("messages", {
706+
name: "c",
707+
deliver_policy: DeliverPolicy.All,
708+
});
709+
710+
await nc.jetstream().publish("hello");
711+
await deadline(process, 5_000);
712+
await cleanup(ns, nc);
713+
});
714+
715+
Deno.test("consumers - no responders - consumer deleted", async () => {
716+
const { ns, nc } = await setup(jetstreamServerConf());
717+
const jsm = await nc.jetstreamManager();
718+
719+
await jsm.streams.add({ name: "messages", subjects: ["hello"] });
720+
await jsm.consumers.add("messages", {
721+
name: "c",
722+
deliver_policy: DeliverPolicy.All,
723+
});
724+
725+
// stream is deleted
726+
const c = await jsm.jetstream().consumers.get("messages", "c");
727+
await jsm.consumers.delete("messages", "c");
728+
const iter = await c.consume({ expires: 10_000 });
729+
const buf: ConsumerStatus[] = [];
730+
const hbmP = deferred();
731+
(async () => {
732+
const status = await iter.status();
733+
for await (const s of status) {
734+
console.log(s);
735+
buf.push(s);
736+
if (s.type === ConsumerEvents.HeartbeatsMissed) {
737+
if (s.data === 3) {
738+
hbmP.resolve();
739+
}
740+
}
741+
}
742+
})().then();
743+
744+
const process = (async () => {
745+
for await (const m of iter) {
746+
if (m) {
747+
break;
748+
}
749+
}
750+
})();
751+
752+
await hbmP;
753+
754+
const snfs = buf.filter((s) => {
755+
return s.type === ConsumerEvents.ConsumerNotFound;
756+
}).length;
757+
assert(snfs > 0);
758+
759+
const nrs = buf.filter((s) => {
760+
return s.type === ConsumerEvents.NoResponders;
761+
}).length;
762+
assertEquals(nrs, 1);
763+
764+
await jsm.consumers.add("messages", {
765+
name: "c",
766+
deliver_policy: DeliverPolicy.All,
767+
});
768+
769+
await nc.jetstream().publish("hello");
770+
await deadline(process, 5_000);
771+
await cleanup(ns, nc);
772+
});

0 commit comments

Comments
 (0)