Skip to content

Commit 5e441c8

Browse files
committed
removed console.log, review comment
Signed-off-by: Alberto Ricart <[email protected]>
2 parents ef33ed0 + 65c2034 commit 5e441c8

File tree

11 files changed

+468
-41
lines changed

11 files changed

+468
-41
lines changed

.github/workflows/test.yml

+6-14
Original file line numberDiff line numberDiff line change
@@ -29,20 +29,12 @@ jobs:
2929
with:
3030
deno-version: ${{ matrix.deno-version }}
3131

32-
- name: Set NATS Server Version
33-
run: echo "NATS_VERSION=v2.10.24" >> $GITHUB_ENV
34-
35-
# this here because dns seems to be wedged on gha
36-
# - name: Add hosts to /etc/hosts
37-
# run: |
38-
# sudo echo "145.40.102.131 demo.nats.io" | sudo tee -a /etc/hosts
39-
40-
- name: Get nats-server
41-
run: |
42-
wget "https://github.com/nats-io/nats-server/releases/download/$NATS_VERSION/nats-server-$NATS_VERSION-linux-amd64.zip" -O tmp.zip
43-
unzip tmp.zip
44-
mv nats-server-$NATS_VERSION-linux-amd64 nats-server
45-
rm nats-server/README.md LICENSE
32+
- name: Install nats-server
33+
uses: aricart/[email protected]
34+
with:
35+
repo: nats-io/nats-server
36+
name: nats-server
37+
cache: true
4638

4739
- name: Lint Deno Module
4840
run: deno fmt --check --ignore=docs/

jetstream/consumer.ts

+29-3
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,12 @@ export enum ConsumerEvents {
219219
* the consumer is recreated. The argument is the name of the newly created consumer.
220220
*/
221221
OrderedConsumerRecreated = "ordered_consumer_recreated",
222+
223+
/**
224+
* This notification means that either both the stream and consumer were not
225+
* found or that JetStream is not available.
226+
*/
227+
NoResponders = "no_responders",
222228
}
223229

224230
/**
@@ -304,6 +310,7 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
304310
resetHandler?: () => void;
305311
abortOnMissingResource?: boolean;
306312
bind: boolean;
313+
inBackOff: boolean;
307314

308315
// callback: ConsumerCallbackFn;
309316
constructor(
@@ -329,6 +336,7 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
329336
this.forOrderedConsumer = false;
330337
this.abortOnMissingResource = copts.abort_on_missing_resource === true;
331338
this.bind = copts.bind === true;
339+
this.inBackOff = false;
332340

333341
this.start();
334342
}
@@ -411,6 +419,17 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
411419
this.stop(error);
412420
return;
413421
}
422+
} else if (code === 503) {
423+
// this is a no responders - possibly the stream/consumer were
424+
// deleted from under the client
425+
this.notify(ConsumerEvents.NoResponders, `${code} No Responders`);
426+
// in cases that we are in consume, the idle heartbeat will kick in
427+
// which will do a reset, and possibly refine the error
428+
if (!this.refilling || this.abortOnMissingResource) {
429+
const error = new NatsError("no responders", `${code}`);
430+
this.stop(error);
431+
return;
432+
}
414433
} else {
415434
this.notify(
416435
ConsumerDebugEvents.DebugEvent,
@@ -563,9 +582,13 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
563582
}
564583

565584
async resetPendingWithInfo(): Promise<boolean> {
585+
if (this.inBackOff) {
586+
// already failing to get stream or consumer
587+
return false;
588+
}
566589
let notFound = 0;
567590
let streamNotFound = 0;
568-
const bo = backoff();
591+
const bo = backoff([this.opts.expires]);
569592
let attempt = 0;
570593
while (true) {
571594
if (this.done) {
@@ -578,6 +601,8 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
578601
try {
579602
// check we exist
580603
await this.consumer.info();
604+
this.inBackOff = false;
605+
581606
notFound = 0;
582607
// we exist, so effectively any pending state is gone
583608
// so reset and re-pull
@@ -616,6 +641,7 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
616641
notFound = 0;
617642
streamNotFound = 0;
618643
}
644+
this.inBackOff = true;
619645
const to = bo.backoff(attempt);
620646
// wait for delay or till the client closes
621647
const de = delay(to);
@@ -1069,6 +1095,7 @@ export class OrderedPullConsumerImpl implements Consumer {
10691095
}
10701096

10711097
async resetConsumer(seq = 0): Promise<ConsumerInfo> {
1098+
const id = nuid.next();
10721099
const isNew = this.serial === 0;
10731100
// try to delete the consumer
10741101
this.consumer?.delete().catch(() => {});
@@ -1078,7 +1105,7 @@ export class OrderedPullConsumerImpl implements Consumer {
10781105
const config = this.getConsumerOpts(seq);
10791106
config.max_deliver = 1;
10801107
config.mem_storage = true;
1081-
const bo = backoff();
1108+
const bo = backoff([this.opts?.expires || 30_000]);
10821109
let ci;
10831110
for (let i = 0;; i++) {
10841111
try {
@@ -1098,7 +1125,6 @@ export class OrderedPullConsumerImpl implements Consumer {
10981125
return Promise.reject(err);
10991126
}
11001127
}
1101-
11021128
if (isNew && i >= this.maxInitialReset) {
11031129
// consumer was never created, so we can fail this
11041130
throw err;

jetstream/tests/consume_test.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,7 @@ Deno.test("consume - stream not found request abort", async () => {
310310
}
311311
},
312312
Error,
313-
"stream not found",
313+
"no responders",
314314
);
315315

316316
await cleanup(ns, nc);
@@ -380,7 +380,7 @@ Deno.test("consume - consumer not found request abort", async () => {
380380
}
381381
},
382382
Error,
383-
"consumer not found",
383+
"no responders",
384384
);
385385

386386
await cleanup(ns, nc);

jetstream/tests/consumers_ordered_test.ts

+106
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@ import {
2323
} from "https://deno.land/[email protected]/assert/mod.ts";
2424
import {
2525
ConsumerDebugEvents,
26+
ConsumerEvents,
2627
ConsumerMessages,
28+
ConsumerStatus,
2729
DeliverPolicy,
2830
JsMsg,
2931
} from "../mod.ts";
@@ -1073,3 +1075,107 @@ Deno.test("ordered consumers - initial creation fails, consumer fails", async ()
10731075

10741076
await cleanup(ns, nc);
10751077
});
1078+
1079+
Deno.test("ordered consumers - no responders - stream deleted", async () => {
1080+
const { ns, nc } = await setup(jetstreamServerConf());
1081+
const jsm = await nc.jetstreamManager();
1082+
1083+
await jsm.streams.add({ name: "messages", subjects: ["hello"] });
1084+
1085+
// stream is deleted
1086+
const c = await jsm.jetstream().consumers.get("messages");
1087+
const iter = await c.consume({ expires: 10_000 });
1088+
// FIXME: this impl of the consumer has a bug in notification
1089+
// that unless it has pulled, it has no way of returning events.
1090+
// for a test we want to miss a few, and then recreate.
1091+
await jsm.streams.delete("messages");
1092+
1093+
const buf: ConsumerStatus[] = [];
1094+
const snfP = deferred();
1095+
(async () => {
1096+
const status = await iter.status();
1097+
for await (const s of status) {
1098+
console.log(s);
1099+
buf.push(s);
1100+
if (s.type === ConsumerEvents.HeartbeatsMissed) {
1101+
if (s.data === 5) {
1102+
snfP.resolve();
1103+
}
1104+
}
1105+
}
1106+
})().then();
1107+
1108+
const process = (async () => {
1109+
for await (const m of iter) {
1110+
if (m) {
1111+
break;
1112+
}
1113+
}
1114+
})();
1115+
1116+
await snfP;
1117+
await jsm.streams.add({ name: "messages", subjects: ["hello"] });
1118+
1119+
await nc.jetstream().publish("hello");
1120+
await deadline(process, 15_000);
1121+
await cleanup(ns, nc);
1122+
});
1123+
1124+
Deno.test("ordered consumers fetch - no responders - stream deleted", async () => {
1125+
const { ns, nc } = await setup(jetstreamServerConf());
1126+
const jsm = await nc.jetstreamManager();
1127+
1128+
await jsm.streams.add({ name: "messages", subjects: ["hello"] });
1129+
1130+
// stream is deleted
1131+
const c = await jsm.jetstream().consumers.get("messages");
1132+
await jsm.streams.delete("messages");
1133+
await assertRejects(
1134+
async () => {
1135+
const iter = await c.fetch({ expires: 10_000 });
1136+
for await (const _ of iter) {
1137+
// ignored
1138+
}
1139+
},
1140+
Error,
1141+
"stream not found",
1142+
);
1143+
1144+
await jsm.streams.add({ name: "messages", subjects: ["hello"] });
1145+
await nc.jetstream().publish("hello");
1146+
1147+
const d = deferred();
1148+
const iter = await c.fetch({ expires: 10_000, max_messages: 1 });
1149+
for await (const _ of iter) {
1150+
d.resolve();
1151+
}
1152+
await d;
1153+
1154+
await cleanup(ns, nc);
1155+
});
1156+
1157+
Deno.test("ordered consumers next - no responders - stream deleted", async () => {
1158+
const { ns, nc } = await setup(jetstreamServerConf());
1159+
const jsm = await nc.jetstreamManager();
1160+
1161+
await jsm.streams.add({ name: "messages", subjects: ["hello"] });
1162+
1163+
// stream is deleted
1164+
const c = await jsm.jetstream().consumers.get("messages");
1165+
await jsm.streams.delete("messages");
1166+
await assertRejects(
1167+
() => {
1168+
return c.next({ expires: 10_000 });
1169+
},
1170+
Error,
1171+
"stream not found",
1172+
);
1173+
1174+
await jsm.streams.add({ name: "messages", subjects: ["hello"] });
1175+
await nc.jetstream().publish("hello");
1176+
1177+
const m = await c.next({ expires: 10_000 });
1178+
assertExists(m);
1179+
1180+
await cleanup(ns, nc);
1181+
});

0 commit comments

Comments
 (0)