diff --git a/jetstream/src/jsapi_types.ts b/jetstream/src/jsapi_types.ts index 830c9b36..80e93a9b 100644 --- a/jetstream/src/jsapi_types.ts +++ b/jetstream/src/jsapi_types.ts @@ -979,7 +979,8 @@ export type ConsumerUpdateConfig = PriorityGroups & { */ "ack_wait"?: Nanos; /** - * The number of times a message will be redelivered to consumers if not acknowledged in time + * The maximum number of times a message will be delivered to consumers if not acknowledged in time. + * Default is -1 (will redeliver until acknowledged). */ "max_deliver"?: number; /** @@ -1163,9 +1164,9 @@ export type DeliveryInfo = { */ consumer: string; /** - * The number of times the message has been redelivered. + * The number of times the message has been delivered. */ - redeliveryCount: number; + deliveryCount: number; /** * The sequence number of the message in the stream */ diff --git a/jetstream/src/jsmsg.ts b/jetstream/src/jsmsg.ts index d0601785..3d28ecc4 100644 --- a/jetstream/src/jsmsg.ts +++ b/jetstream/src/jsmsg.ts @@ -162,17 +162,17 @@ export function parseInfo(s: string): DeliveryInfo { } // old - // "$JS.ACK....." + // "$JS.ACK....." // new - // $JS.ACK.......... + // $JS.ACK.......... const di = {} as DeliveryInfo; // if domain is "_", replace with blank di.domain = tokens[2] === "_" ? "" : tokens[2]; di.account_hash = tokens[3]; di.stream = tokens[4]; di.consumer = tokens[5]; - di.redeliveryCount = parseInt(tokens[6], 10); - di.redelivered = di.redeliveryCount > 1; + di.deliveryCount = parseInt(tokens[6], 10); + di.redelivered = di.deliveryCount > 1; di.streamSequence = parseInt(tokens[7], 10); di.deliverySequence = parseInt(tokens[8], 10); di.timestampNanos = parseInt(tokens[9], 10); @@ -216,7 +216,7 @@ export class JsMsgImpl implements JsMsg { } get redelivered(): boolean { - return this.info.redeliveryCount > 1; + return this.info.deliveryCount > 1; } get reply(): string { diff --git a/jetstream/tests/jetstream_test.ts b/jetstream/tests/jetstream_test.ts index 4b219204..31f236ad 100644 --- a/jetstream/tests/jetstream_test.ts +++ b/jetstream/tests/jetstream_test.ts @@ -730,7 +730,7 @@ Deno.test("jetstream - backoff", async () => { const iter = await c.consume({ callback: (m) => { when.push(Date.now()); - if (m.info.redeliveryCount === 4) { + if (m.info.deliveryCount === 4) { iter.stop(); } }, @@ -779,7 +779,7 @@ Deno.test("jetstream - redelivery", async () => { if (m.redelivered) { redeliveries++; } - if (m.info.redeliveryCount === 4) { + if (m.info.deliveryCount === 4) { setTimeout(() => { iter.stop(); }, 2000); diff --git a/jetstream/tests/jsmsg_test.ts b/jetstream/tests/jsmsg_test.ts index 6294330f..63d08d4b 100644 --- a/jetstream/tests/jsmsg_test.ts +++ b/jetstream/tests/jsmsg_test.ts @@ -43,18 +43,18 @@ import type { JetStreamManagerImpl } from "../src/jsclient.ts"; import { errors } from "../../core/src/mod.ts"; Deno.test("jsmsg - parse", () => { - // "$JS.ACK....." + // "$JS.ACK....." const rs = `$JS.ACK.streamname.consumername.2.3.4.${nanos(Date.now())}.100`; const info = parseInfo(rs); assertEquals(info.stream, "streamname"); assertEquals(info.consumer, "consumername"); - assertEquals(info.redeliveryCount, 2); + assertEquals(info.deliveryCount, 2); assertEquals(info.streamSequence, 3); assertEquals(info.pending, 100); }); Deno.test("jsmsg - parse long", () => { - // $JS.ACK.......... + // $JS.ACK.......... const rs = `$JS.ACK.domain.account.streamname.consumername.2.3.4.${ nanos(Date.now()) }.100.rand`; @@ -63,7 +63,7 @@ Deno.test("jsmsg - parse long", () => { assertEquals(info.account_hash, "account"); assertEquals(info.stream, "streamname"); assertEquals(info.consumer, "consumername"); - assertEquals(info.redeliveryCount, 2); + assertEquals(info.deliveryCount, 2); assertEquals(info.streamSequence, 3); assertEquals(info.pending, 100); }); @@ -100,7 +100,7 @@ Deno.test("jsmsg - acks", async () => { fail(err.message); } msg.respond(Empty, { - // "$JS.ACK....." + // "$JS.ACK....." reply: `MY.TEST.streamname.consumername.1.${counter}.${counter}.${Date.now()}.0`, }); diff --git a/jetstream/tests/next_test.ts b/jetstream/tests/next_test.ts index d12313f0..9f7bd72d 100644 --- a/jetstream/tests/next_test.ts +++ b/jetstream/tests/next_test.ts @@ -107,7 +107,7 @@ Deno.test("next - listener leaks", async () => { const m = await consumer.next(); if (m) { m.nak(); - if (m.info?.redeliveryCount > 100) { + if (m.info?.deliveryCount > 100) { break; } } @@ -203,3 +203,33 @@ Deno.test("next - consumer bind", async () => { await cleanup(ns, nc); }); + +Deno.test("next - delivery count", async () => { + const { ns, nc } = await setup(jetstreamServerConf()); + + const jsm = await jetstreamManager(nc); + await jsm.streams.add({ name: "A", subjects: ["hello"] }); + + await jsm.consumers.add("A", { + durable_name: "a", + deliver_policy: DeliverPolicy.All, + ack_policy: AckPolicy.Explicit, + max_deliver: 2, + ack_wait: nanos(1000), + }); + + const js = jetstream(nc); + await js.publish("hello"); + + const c = await js.consumers.get("A", "a"); + let m = await c.next(); + assertEquals(m?.info.deliveryCount, 1); + await delay(1500); + m = await c.next(); + await delay(1500); + assertEquals(m?.info.deliveryCount, 2); + m = await c.next({ expires: 1000 }); + assertEquals(m, null); + + await cleanup(ns, nc); +}); diff --git a/migration.md b/migration.md index a92b7c4b..52ea276e 100644 --- a/migration.md +++ b/migration.md @@ -144,6 +144,8 @@ To use JetStream, you must install and import `@nats/jetstream`. - The `ConsumerEvents` and `ConsumerDebugEvents` enum has been removed and replaced with `ConsumerNotification` which have a discriminating field `type`. The status objects provide a more specific API for querying those events. +- `JsMsg.info.redeliveryCount` was renamed to `JsMsg.info.deliveryCount` as it + tracks all delivery attempts, not just redeliveries ## Changes to KV