Skip to content

Commit fca4bef

Browse files
committed
handle new "no responders" error for JetStream consumers
Updated test cases to handle "no responders" errors consistently for streams and consumers. Added new tests to verify behavior when streams or consumers are deleted and re-created. Improved imports and error assertion patterns for better clarity and reliability. Signed-off-by: Alberto Ricart <[email protected]>
2 parents ef33ed0 + 65c2034 commit fca4bef

File tree

11 files changed

+479
-43
lines changed

11 files changed

+479
-43
lines changed

.github/workflows/test.yml

+6-14
Original file line numberDiff line numberDiff line change
@@ -29,20 +29,12 @@ jobs:
2929
with:
3030
deno-version: ${{ matrix.deno-version }}
3131

32-
- name: Set NATS Server Version
33-
run: echo "NATS_VERSION=v2.10.24" >> $GITHUB_ENV
34-
35-
# this here because dns seems to be wedged on gha
36-
# - name: Add hosts to /etc/hosts
37-
# run: |
38-
# sudo echo "145.40.102.131 demo.nats.io" | sudo tee -a /etc/hosts
39-
40-
- name: Get nats-server
41-
run: |
42-
wget "https://github.com/nats-io/nats-server/releases/download/$NATS_VERSION/nats-server-$NATS_VERSION-linux-amd64.zip" -O tmp.zip
43-
unzip tmp.zip
44-
mv nats-server-$NATS_VERSION-linux-amd64 nats-server
45-
rm nats-server/README.md LICENSE
32+
- name: Install nats-server
33+
uses: aricart/[email protected]
34+
with:
35+
repo: nats-io/nats-server
36+
name: nats-server
37+
cache: true
4638

4739
- name: Lint Deno Module
4840
run: deno fmt --check --ignore=docs/

jetstream/consumer.ts

+33-4
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,12 @@ export enum ConsumerEvents {
219219
* the consumer is recreated. The argument is the name of the newly created consumer.
220220
*/
221221
OrderedConsumerRecreated = "ordered_consumer_recreated",
222+
223+
/**
224+
* This notification means that either both the stream and consumer were not
225+
* found or that JetStream is not available.
226+
*/
227+
NoResponders = "no_responders",
222228
}
223229

224230
/**
@@ -304,6 +310,7 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
304310
resetHandler?: () => void;
305311
abortOnMissingResource?: boolean;
306312
bind: boolean;
313+
inBackOff: boolean;
307314

308315
// callback: ConsumerCallbackFn;
309316
constructor(
@@ -329,6 +336,7 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
329336
this.forOrderedConsumer = false;
330337
this.abortOnMissingResource = copts.abort_on_missing_resource === true;
331338
this.bind = copts.bind === true;
339+
this.inBackOff = false;
332340

333341
this.start();
334342
}
@@ -411,6 +419,17 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
411419
this.stop(error);
412420
return;
413421
}
422+
} else if (code === 503 && description === "unknown") {
423+
// this is a no responders - possibly the stream/consumer were
424+
// deleted from under the client
425+
this.notify(ConsumerEvents.NoResponders, `${code} No Responders`);
426+
// in cases that we are in consume, the idle heartbeat will kick in
427+
// which will do a reset, and possibly refine the error
428+
if (!this.refilling || this.abortOnMissingResource) {
429+
const error = new NatsError("no responders", `${code}`);
430+
this.stop(error);
431+
return;
432+
}
414433
} else {
415434
this.notify(
416435
ConsumerDebugEvents.DebugEvent,
@@ -563,9 +582,13 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
563582
}
564583

565584
async resetPendingWithInfo(): Promise<boolean> {
585+
if (this.inBackOff) {
586+
// already failing to get stream or consumer
587+
return false;
588+
}
566589
let notFound = 0;
567590
let streamNotFound = 0;
568-
const bo = backoff();
591+
const bo = backoff([this.opts.expires]);
569592
let attempt = 0;
570593
while (true) {
571594
if (this.done) {
@@ -578,6 +601,8 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
578601
try {
579602
// check we exist
580603
await this.consumer.info();
604+
this.inBackOff = false;
605+
581606
notFound = 0;
582607
// we exist, so effectively any pending state is gone
583608
// so reset and re-pull
@@ -616,6 +641,7 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
616641
notFound = 0;
617642
streamNotFound = 0;
618643
}
644+
this.inBackOff = true;
619645
const to = bo.backoff(attempt);
620646
// wait for delay or till the client closes
621647
const de = delay(to);
@@ -1069,6 +1095,7 @@ export class OrderedPullConsumerImpl implements Consumer {
10691095
}
10701096

10711097
async resetConsumer(seq = 0): Promise<ConsumerInfo> {
1098+
const id = nuid.next();
10721099
const isNew = this.serial === 0;
10731100
// try to delete the consumer
10741101
this.consumer?.delete().catch(() => {});
@@ -1078,7 +1105,7 @@ export class OrderedPullConsumerImpl implements Consumer {
10781105
const config = this.getConsumerOpts(seq);
10791106
config.max_deliver = 1;
10801107
config.mem_storage = true;
1081-
const bo = backoff();
1108+
const bo = backoff([this.opts?.expires || 30_000]);
10821109
let ci;
10831110
for (let i = 0;; i++) {
10841111
try {
@@ -1098,12 +1125,14 @@ export class OrderedPullConsumerImpl implements Consumer {
10981125
return Promise.reject(err);
10991126
}
11001127
}
1101-
1128+
console.log("caught: ", err.message);
11021129
if (isNew && i >= this.maxInitialReset) {
11031130
// consumer was never created, so we can fail this
11041131
throw err;
11051132
} else {
1106-
await delay(bo.backoff(i + 1));
1133+
const to = bo.backoff(i + 1);
1134+
console.log(id, "is waiting for", to);
1135+
await delay(to);
11071136
}
11081137
}
11091138
}

jetstream/tests/consume_test.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,7 @@ Deno.test("consume - stream not found request abort", async () => {
310310
}
311311
},
312312
Error,
313-
"stream not found",
313+
"no responders",
314314
);
315315

316316
await cleanup(ns, nc);
@@ -380,7 +380,7 @@ Deno.test("consume - consumer not found request abort", async () => {
380380
}
381381
},
382382
Error,
383-
"consumer not found",
383+
"no responders",
384384
);
385385

386386
await cleanup(ns, nc);

jetstream/tests/consumers_ordered_test.ts

+106
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@ import {
2323
} from "https://deno.land/[email protected]/assert/mod.ts";
2424
import {
2525
ConsumerDebugEvents,
26+
ConsumerEvents,
2627
ConsumerMessages,
28+
ConsumerStatus,
2729
DeliverPolicy,
2830
JsMsg,
2931
} from "../mod.ts";
@@ -1073,3 +1075,107 @@ Deno.test("ordered consumers - initial creation fails, consumer fails", async ()
10731075

10741076
await cleanup(ns, nc);
10751077
});
1078+
1079+
Deno.test("ordered consumers - no responders - stream deleted", async () => {
1080+
const { ns, nc } = await setup(jetstreamServerConf());
1081+
const jsm = await nc.jetstreamManager();
1082+
1083+
await jsm.streams.add({ name: "messages", subjects: ["hello"] });
1084+
1085+
// stream is deleted
1086+
const c = await jsm.jetstream().consumers.get("messages");
1087+
const iter = await c.consume({ expires: 10_000 });
1088+
// FIXME: this impl of the consumer has a bug in notification
1089+
// that unless it has pulled, it has no way of returning events.
1090+
// for a test we want to miss a few, and then recreate.
1091+
await jsm.streams.delete("messages");
1092+
1093+
const buf: ConsumerStatus[] = [];
1094+
const snfP = deferred();
1095+
(async () => {
1096+
const status = await iter.status();
1097+
for await (const s of status) {
1098+
console.log(s);
1099+
buf.push(s);
1100+
if (s.type === ConsumerEvents.HeartbeatsMissed) {
1101+
if (s.data === 5) {
1102+
snfP.resolve();
1103+
}
1104+
}
1105+
}
1106+
})().then();
1107+
1108+
const process = (async () => {
1109+
for await (const m of iter) {
1110+
if (m) {
1111+
break;
1112+
}
1113+
}
1114+
})();
1115+
1116+
await snfP;
1117+
await jsm.streams.add({ name: "messages", subjects: ["hello"] });
1118+
1119+
await nc.jetstream().publish("hello");
1120+
await deadline(process, 15_000);
1121+
await cleanup(ns, nc);
1122+
});
1123+
1124+
Deno.test("ordered consumers fetch - no responders - stream deleted", async () => {
1125+
const { ns, nc } = await setup(jetstreamServerConf());
1126+
const jsm = await nc.jetstreamManager();
1127+
1128+
await jsm.streams.add({ name: "messages", subjects: ["hello"] });
1129+
1130+
// stream is deleted
1131+
const c = await jsm.jetstream().consumers.get("messages");
1132+
await jsm.streams.delete("messages");
1133+
await assertRejects(
1134+
async () => {
1135+
const iter = await c.fetch({ expires: 10_000 });
1136+
for await (const _ of iter) {
1137+
// ignored
1138+
}
1139+
},
1140+
Error,
1141+
"stream not found",
1142+
);
1143+
1144+
await jsm.streams.add({ name: "messages", subjects: ["hello"] });
1145+
await nc.jetstream().publish("hello");
1146+
1147+
const d = deferred();
1148+
const iter = await c.fetch({ expires: 10_000, max_messages: 1 });
1149+
for await (const _ of iter) {
1150+
d.resolve();
1151+
}
1152+
await d;
1153+
1154+
await cleanup(ns, nc);
1155+
});
1156+
1157+
Deno.test("ordered consumers next - no responders - stream deleted", async () => {
1158+
const { ns, nc } = await setup(jetstreamServerConf());
1159+
const jsm = await nc.jetstreamManager();
1160+
1161+
await jsm.streams.add({ name: "messages", subjects: ["hello"] });
1162+
1163+
// stream is deleted
1164+
const c = await jsm.jetstream().consumers.get("messages");
1165+
await jsm.streams.delete("messages");
1166+
await assertRejects(
1167+
() => {
1168+
return c.next({ expires: 10_000 });
1169+
},
1170+
Error,
1171+
"stream not found",
1172+
);
1173+
1174+
await jsm.streams.add({ name: "messages", subjects: ["hello"] });
1175+
await nc.jetstream().publish("hello");
1176+
1177+
const m = await c.next({ expires: 10_000 });
1178+
assertExists(m);
1179+
1180+
await cleanup(ns, nc);
1181+
});

0 commit comments

Comments
 (0)