Skip to content

Commit 59b625c

Browse files
authored
Additional hardening of the object store client (#256)
* Additional hardening of the object store client - made obj put publish messages sequentially - as current server can drop requests when the producer is too fast. This will degrade put performance. - changed the internal push consumer used on get, to have an idle_heartbeat, if this fires, the get has stalled, and the ordered consumer should reset. enabled flow control on the ordered consumer, this prevents slow consumers when the client is getting very large objects Signed-off-by: Alberto Ricart <[email protected]> * - hardening of the PullConsumer implementation, by default ordered consumers set flow control and idle_heartbeats. This enables the ordered consumer to self-recreate on `heartbeats_missed`, simplifying watcher/lister configurations. - objectstore doesn't require listening to heartbeats_missed, since the pull consumer handles it automagically. - objectstore watch added a check in watch for `heartbeat` notifications - if the watcher is only listing history this is a hint to stop as all records have been processed. Signed-off-by: Alberto Ricart <[email protected]> --------- Signed-off-by: Alberto Ricart <[email protected]>
1 parent 77f2f06 commit 59b625c

14 files changed

+96
-27
lines changed

jetstream/deno.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@nats-io/jetstream",
3-
"version": "3.0.0",
3+
"version": "3.0.1-2",
44
"exports": {
55
".": "./src/mod.ts",
66
"./internal": "./src/internal_mod.ts"

jetstream/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@nats-io/jetstream",
3-
"version": "3.0.0",
3+
"version": "3.0.1-2",
44
"files": [
55
"lib/",
66
"LICENSE",

jetstream/src/internal_mod.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,8 @@ export type {
9595
ThresholdMessages,
9696
} from "./types.ts";
9797

98+
export type { PushConsumerMessagesImpl } from "./pushconsumer.ts";
99+
98100
export type { StreamNames } from "./jsbaseclient_api.ts";
99101
export type {
100102
AccountLimits,

jetstream/src/jsmstream_api.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,8 @@ export class ConsumersImpl implements Consumers {
185185
cc.inactive_threshold = nanos(5 * 60 * 1000);
186186
cc.num_replicas = 1;
187187
cc.max_deliver = 1;
188+
cc.flow_control = true;
189+
cc.idle_heartbeat = nanos(30_000);
188190

189191
if (Array.isArray(filter_subjects)) {
190192
cc.filter_subjects = filter_subjects;

jetstream/src/pushconsumer.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,9 @@ export class PushConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
252252
ms,
253253
(count): boolean => {
254254
this.notify({ type: "heartbeats_missed", count });
255+
if (this.ordered) {
256+
this.reset();
257+
}
255258
return false;
256259
},
257260
{ maxOut: 2 },

jetstream/tests/pushconsumers_ordered_test.ts

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,12 @@
1313
* limitations under the License.
1414
*/
1515

16-
import { assertEquals, assertExists, assertRejects } from "jsr:@std/assert";
16+
import {
17+
assertEquals,
18+
assertExists,
19+
assertFalse,
20+
assertRejects,
21+
} from "jsr:@std/assert";
1722
import { DeliverPolicy, jetstream, jetstreamManager } from "../src/mod.ts";
1823
import type { JsMsg } from "../src/mod.ts";
1924

@@ -241,3 +246,48 @@ Deno.test("ordered push consumers - sub leak", async () => {
241246

242247
await cleanup(ns, nc);
243248
});
249+
250+
Deno.test("push consumers - flow control", async () => {
251+
const { ns, nc } = await setup(jetstreamServerConf());
252+
const jsm = await jetstreamManager(nc);
253+
await jsm.streams.add({ name: "test", subjects: ["test.*"] });
254+
const js = jetstream(nc);
255+
256+
async function checkPushConsumer(c: PushConsumerImpl) {
257+
const ci = await c.info(true);
258+
assertEquals(c.ordered, true);
259+
assertEquals(ci.config.flow_control, true);
260+
}
261+
262+
await (checkPushConsumer(
263+
await js.consumers.getPushConsumer("test") as PushConsumerImpl,
264+
));
265+
await (checkPushConsumer(
266+
await js.consumers.getPushConsumer("test", {
267+
headers_only: true,
268+
}) as PushConsumerImpl,
269+
));
270+
271+
let ci = await jsm.consumers.add("test", { deliver_subject: "foo" });
272+
const c = await js.consumers.getPushConsumer(
273+
"test",
274+
ci.name,
275+
) as PushConsumerImpl;
276+
assertFalse(c.ordered);
277+
ci = await c.info(true);
278+
assertEquals(ci.config.flow_control, undefined);
279+
assertEquals(ci.config.idle_heartbeat, undefined);
280+
281+
const buf = [];
282+
nc.subscribe(`$JS.CONSUMER.CREATE.>`, {
283+
callback: (_, msg) => {
284+
buf.push(msg.subject);
285+
},
286+
});
287+
288+
await js.consumers.getBoundPushConsumer({ deliver_subject: "foo" });
289+
await nc.flush();
290+
assertEquals(buf.length, 0);
291+
292+
await cleanup(ns, nc);
293+
});

kv/deno.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@nats-io/kv",
3-
"version": "3.0.0",
3+
"version": "3.0.1-1",
44
"exports": {
55
".": "./src/mod.ts",
66
"./internal": "./src/internal_mod.ts"
@@ -34,6 +34,6 @@
3434
},
3535
"imports": {
3636
"@nats-io/nats-core": "jsr:@nats-io/[email protected]",
37-
"@nats-io/jetstream": "jsr:@nats-io/[email protected].0"
37+
"@nats-io/jetstream": "jsr:@nats-io/[email protected].1-2"
3838
}
3939
}

kv/import_map.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22
"imports": {
33
"@nats-io/nats-core": "jsr:@nats-io/[email protected]",
44
"@nats-io/nats-core/internal": "jsr:@nats-io/nats-core@~3.0.0/internal",
5-
"@nats-io/jetstream": "jsr:@nats-io/[email protected].0",
6-
"@nats-io/jetstream/internal": "jsr:@nats-io/jetstream@~3.0.0/internal",
5+
"@nats-io/jetstream": "jsr:@nats-io/[email protected].1-2",
6+
"@nats-io/jetstream/internal": "jsr:@nats-io/[email protected].1-2/internal",
77
"test_helpers": "../test_helpers/mod.ts",
88
"@nats-io/nkeys": "jsr:@nats-io/[email protected]",
99
"@nats-io/nuid": "jsr:@nats-io/[email protected]",

kv/package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@nats-io/kv",
3-
"version": "3.0.0",
3+
"version": "3.0.1-1",
44
"files": [
55
"lib/",
66
"LICENSE",
@@ -33,7 +33,7 @@
3333
},
3434
"description": "kv library - this library implements all the base functionality for NATS KV javascript clients",
3535
"dependencies": {
36-
"@nats-io/jetstream": "3.0.0",
36+
"@nats-io/jetstream": "3.0.1-2",
3737
"@nats-io/nats-core": "3.0.0"
3838
},
3939
"devDependencies": {

obj/deno.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@nats-io/obj",
3-
"version": "3.0.1",
3+
"version": "3.0.2-2",
44
"exports": {
55
".": "./src/mod.ts",
66
"./internal": "./src/internal_mod.ts"
@@ -34,7 +34,7 @@
3434
},
3535
"imports": {
3636
"@nats-io/nats-core": "jsr:@nats-io/[email protected]",
37-
"@nats-io/jetstream": "jsr:@nats-io/[email protected].0",
37+
"@nats-io/jetstream": "jsr:@nats-io/[email protected].1-2",
3838
"js-sha256": "npm:[email protected]"
3939
}
4040
}

obj/import_map.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22
"imports": {
33
"@nats-io/nats-core": "jsr:@nats-io/[email protected]",
44
"@nats-io/nats-core/internal": "jsr:@nats-io/nats-core@~3.0.0/internal",
5-
"@nats-io/jetstream": "jsr:@nats-io/[email protected].0",
6-
"@nats-io/jetstream/internal": "jsr:@nats-io/jetstream@~3.0.0/internal",
5+
"@nats-io/jetstream": "jsr:@nats-io/[email protected].1-2",
6+
"@nats-io/jetstream/internal": "jsr:@nats-io/[email protected].1-2/internal",
77
"test_helpers": "../test_helpers/mod.ts",
88
"@nats-io/nkeys": "jsr:@nats-io/[email protected]",
99
"@nats-io/nuid": "jsr:@nats-io/[email protected]",

obj/package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@nats-io/obj",
3-
"version": "3.0.1",
3+
"version": "3.0.2-2",
44
"files": [
55
"lib/",
66
"LICENSE",
@@ -33,7 +33,7 @@
3333
},
3434
"description": "obj library - this library implements all the base functionality for NATS objectstore for javascript clients",
3535
"dependencies": {
36-
"@nats-io/jetstream": "3.0.0",
36+
"@nats-io/jetstream": "3.0.1-2",
3737
"@nats-io/nats-core": "3.0.0",
3838
"js-sha256": "^0.11.0"
3939
},

obj/src/objectstore.ts

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import {
2424
Feature,
2525
headers,
2626
MsgHdrsImpl,
27+
nanos,
2728
nuid,
2829
QueuedIteratorImpl,
2930
} from "@nats-io/nats-core/internal";
@@ -38,6 +39,7 @@ import type {
3839
ListerFieldFilter,
3940
PubAck,
4041
PurgeResponse,
42+
PushConsumerMessagesImpl,
4143
StorageType,
4244
StreamConfig,
4345
StreamInfo,
@@ -467,7 +469,6 @@ export class ObjectStoreImpl implements ObjectStore {
467469

468470
const d = deferred<ObjectInfo>();
469471

470-
const proms: Promise<unknown>[] = [];
471472
const db = new DataBuffer();
472473
try {
473474
const reader = rs ? rs.getReader() : null;
@@ -484,11 +485,8 @@ export class ObjectStoreImpl implements ObjectStore {
484485
sha.update(payload);
485486
info.chunks!++;
486487
info.size! += payload.length;
487-
proms.push(this.js.publish(chunkSubj, payload, { timeout }));
488+
await this.js.publish(chunkSubj, payload, { timeout });
488489
}
489-
// wait for all the chunks to write
490-
await Promise.all(proms);
491-
proms.length = 0;
492490

493491
// prepare the metadata
494492
info.mtime = new Date().toISOString();
@@ -541,9 +539,7 @@ export class ObjectStoreImpl implements ObjectStore {
541539
info.size! += maxChunk;
542540
const payload = db.drain(meta.options.max_chunk_size);
543541
sha.update(payload);
544-
proms.push(
545-
this.js.publish(chunkSubj, payload, { timeout }),
546-
);
542+
await this.js.publish(chunkSubj, payload, { timeout });
547543
}
548544
}
549545
}
@@ -669,8 +665,10 @@ export class ObjectStoreImpl implements ObjectStore {
669665

670666
const cc: Partial<ConsumerConfig> = {};
671667
cc.filter_subject = `$O.${this.name}.C.${info.nuid}`;
668+
cc.idle_heartbeat = nanos(30_000);
669+
cc.flow_control = true;
672670
const oc = await this.js.consumers.getPushConsumer(this.stream, cc);
673-
const iter = await oc.consume();
671+
const iter = await oc.consume() as PushConsumerMessagesImpl;
674672

675673
(async () => {
676674
for await (const jm of iter) {
@@ -876,6 +874,20 @@ export class ObjectStoreImpl implements ObjectStore {
876874
},
877875
});
878876

877+
(async () => {
878+
for await (const s of iter.status()) {
879+
switch (s.type) {
880+
case "heartbeat":
881+
if (historyOnly) {
882+
// we got all the keys...
883+
qi.push(() => {
884+
qi.stop();
885+
});
886+
}
887+
}
888+
}
889+
})().then();
890+
879891
if (historyOnly && count === 0) {
880892
iter.stop();
881893
}

transport-node/package.json

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,9 @@
6060
"@nats-io/nuid": "2.0.3"
6161
},
6262
"devDependencies": {
63-
"@nats-io/jetstream": "3.0.0",
64-
"@nats-io/kv": "3.0.0",
65-
"@nats-io/obj": "3.0.0",
63+
"@nats-io/jetstream": "3.0.1-2",
64+
"@nats-io/kv": "3.0.1-1",
65+
"@nats-io/obj": "3.0.2-2",
6666
"@types/node": "^22.10.10",
6767
"minimist": "^1.2.8",
6868
"shx": "^0.3.3",

0 commit comments

Comments
 (0)