Skip to content

fix(jetstream): ordered pull consumer would never fail to reset on initial creation if there was an error #726

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions jetstream/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -977,6 +977,7 @@ export class OrderedPullConsumerImpl implements Consumer {
iter: OrderedConsumerMessages | null;
type: PullConsumerType;
startSeq: number;
maxInitialReset: number;

constructor(
api: ConsumerAPI,
Expand All @@ -998,6 +999,7 @@ export class OrderedPullConsumerImpl implements Consumer {
this.iter = null;
this.type = PullConsumerType.Unset;
this.consumerOpts = opts;
this.maxInitialReset = 30;

// to support a random start sequence we need to update the cursor
this.startSeq = this.consumerOpts.opt_start_seq || 0;
Expand Down Expand Up @@ -1067,6 +1069,7 @@ export class OrderedPullConsumerImpl implements Consumer {
}

async resetConsumer(seq = 0): Promise<ConsumerInfo> {
const isNew = this.serial === 0;
// try to delete the consumer
this.consumer?.delete().catch(() => {});
seq = seq === 0 ? 1 : seq;
Expand Down Expand Up @@ -1096,7 +1099,7 @@ export class OrderedPullConsumerImpl implements Consumer {
}
}

if (seq === 0 && i >= 30) {
if (isNew && i >= this.maxInitialReset) {
// consumer was never created, so we can fail this
throw err;
} else {
Expand Down Expand Up @@ -1294,7 +1297,7 @@ export class OrderedPullConsumerImpl implements Consumer {

async info(cached?: boolean): Promise<ConsumerInfo> {
if (this.currentConsumer == null) {
this.currentConsumer = await this.resetConsumer(this.serial);
this.currentConsumer = await this.resetConsumer(this.startSeq);
return Promise.resolve(this.currentConsumer);
}
if (cached && this.currentConsumer) {
Expand Down
21 changes: 21 additions & 0 deletions jetstream/tests/consumers_ordered_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1052,3 +1052,24 @@ Deno.test("ordered consumers - next reset", async () => {

await cleanup(ns, nc);
});

Deno.test("ordered consumers - initial creation fails, consumer fails", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
const jsm = await nc.jetstreamManager();

await jsm.streams.add({ name: "A", subjects: ["a"] });
const js = nc.jetstream();

const c = await js.consumers.get("A") as OrderedPullConsumerImpl;
await jsm.streams.delete("A");
c.maxInitialReset = 3;
await assertRejects(
() => {
return c.consume();
},
Error,
"stream not found",
);

await cleanup(ns, nc);
});
Loading