Skip to content

Commit 9f46065

Browse files
committed
fix(jetstream): ordered pull consumer would never fail to reset on initial creation if there was an error
Fix #725
1 parent 9c09cd6 commit 9f46065

File tree

2 files changed

+26
-2
lines changed

2 files changed

+26
-2
lines changed

jetstream/consumer.ts

+5-2
Original file line numberDiff line numberDiff line change
@@ -977,6 +977,7 @@ export class OrderedPullConsumerImpl implements Consumer {
977977
iter: OrderedConsumerMessages | null;
978978
type: PullConsumerType;
979979
startSeq: number;
980+
maxInitialReset: number;
980981

981982
constructor(
982983
api: ConsumerAPI,
@@ -998,6 +999,7 @@ export class OrderedPullConsumerImpl implements Consumer {
998999
this.iter = null;
9991000
this.type = PullConsumerType.Unset;
10001001
this.consumerOpts = opts;
1002+
this.maxInitialReset = 30;
10011003

10021004
// to support a random start sequence we need to update the cursor
10031005
this.startSeq = this.consumerOpts.opt_start_seq || 0;
@@ -1067,6 +1069,7 @@ export class OrderedPullConsumerImpl implements Consumer {
10671069
}
10681070

10691071
async resetConsumer(seq = 0): Promise<ConsumerInfo> {
1072+
const isNew = this.serial === 0;
10701073
// try to delete the consumer
10711074
this.consumer?.delete().catch(() => {});
10721075
seq = seq === 0 ? 1 : seq;
@@ -1096,7 +1099,7 @@ export class OrderedPullConsumerImpl implements Consumer {
10961099
}
10971100
}
10981101

1099-
if (seq === 0 && i >= 30) {
1102+
if (isNew && i >= this.maxInitialReset) {
11001103
// consumer was never created, so we can fail this
11011104
throw err;
11021105
} else {
@@ -1294,7 +1297,7 @@ export class OrderedPullConsumerImpl implements Consumer {
12941297

12951298
async info(cached?: boolean): Promise<ConsumerInfo> {
12961299
if (this.currentConsumer == null) {
1297-
this.currentConsumer = await this.resetConsumer(this.serial);
1300+
this.currentConsumer = await this.resetConsumer(this.startSeq);
12981301
return Promise.resolve(this.currentConsumer);
12991302
}
13001303
if (cached && this.currentConsumer) {

jetstream/tests/consumers_ordered_test.ts

+21
Original file line numberDiff line numberDiff line change
@@ -1052,3 +1052,24 @@ Deno.test("ordered consumers - next reset", async () => {
10521052

10531053
await cleanup(ns, nc);
10541054
});
1055+
1056+
Deno.test("ordered consumers - initial creation fails, consumer fails", async () => {
1057+
const { ns, nc } = await setup(jetstreamServerConf());
1058+
const jsm = await nc.jetstreamManager();
1059+
1060+
await jsm.streams.add({ name: "A", subjects: ["a"] });
1061+
const js = nc.jetstream();
1062+
1063+
const c = await js.consumers.get("A") as OrderedPullConsumerImpl;
1064+
await jsm.streams.delete("A");
1065+
c.maxInitialReset = 3;
1066+
await assertRejects(
1067+
() => {
1068+
return c.consume();
1069+
},
1070+
Error,
1071+
"stream not found",
1072+
);
1073+
1074+
await cleanup(ns, nc);
1075+
});

0 commit comments

Comments
 (0)