Skip to content

Commit 5168575

Browse files
committed
changes
1 parent 1c061a9 commit 5168575

File tree

3 files changed

+79
-9
lines changed

3 files changed

+79
-9
lines changed

jetstream/kv.ts

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2021-2023 The NATS Authors
2+
* Copyright 2021-2024 The NATS Authors
33
* Licensed under the Apache License, Version 2.0 (the "License");
44
* you may not use this file except in compliance with the License.
55
* You may obtain a copy of the License at
@@ -639,12 +639,16 @@ export class Bucket implements KV, KvRemove {
639639
}
640640

641641
_buildCC(
642-
k: string,
642+
k: string | string[],
643643
content: KvWatchInclude,
644644
opts: Partial<ConsumerConfig> = {},
645645
): Partial<ConsumerConfig> {
646-
const ek = this.encodeKey(k);
647-
this.validateSearchKey(k);
646+
const a = !Array.isArray(k) ? [k] : k;
647+
let filter_subjects: string[] | undefined = a.map((k) => {
648+
const ek = this.encodeKey(k);
649+
this.validateSearchKey(k);
650+
return this.fullKeyName(ek);
651+
});
648652

649653
let deliver_policy = DeliverPolicy.LastPerSubject;
650654
if (content === KvWatchInclude.AllHistory) {
@@ -654,10 +658,17 @@ export class Bucket implements KV, KvRemove {
654658
deliver_policy = DeliverPolicy.New;
655659
}
656660

661+
let filter_subject: undefined | string = undefined;
662+
if (filter_subjects.length === 1) {
663+
filter_subject = filter_subjects[0];
664+
filter_subjects = undefined;
665+
}
666+
657667
return Object.assign({
658668
deliver_policy,
659669
"ack_policy": AckPolicy.None,
660-
"filter_subject": this.fullKeyName(ek),
670+
filter_subjects,
671+
filter_subject,
661672
"flow_control": true,
662673
"idle_heartbeat": nanos(5 * 1000),
663674
}, opts) as Partial<ConsumerConfig>;
@@ -668,7 +679,7 @@ export class Bucket implements KV, KvRemove {
668679
}
669680

670681
async history(
671-
opts: { key?: string; headers_only?: boolean } = {},
682+
opts: { key?: string | string[]; headers_only?: boolean } = {},
672683
): Promise<QueuedIterator<KvEntry>> {
673684
const k = opts.key ?? ">";
674685
const qi = new QueuedIteratorImpl<KvEntry>();

jetstream/tests/kv_test.ts

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,31 @@ Deno.test("kv - history", async () => {
301301
await cleanup(ns, nc);
302302
});
303303

304+
Deno.test("kv - history multiple keys", async () => {
305+
const { ns, nc } = await setup(
306+
jetstreamServerConf({}),
307+
);
308+
const n = nuid.next();
309+
const js = nc.jetstream();
310+
const bucket = await js.views.kv(n, { history: 2 });
311+
312+
await bucket.put("A", Empty);
313+
await bucket.put("B", Empty);
314+
await bucket.put("C", Empty);
315+
await bucket.put("D", Empty);
316+
317+
const iter = await bucket.history({ key: ["A", "D"] });
318+
const buf = [];
319+
for await (const e of iter) {
320+
buf.push(e.key);
321+
}
322+
323+
assertEquals(buf.length, 2);
324+
assertArrayIncludes(buf, ["A", "D"]);
325+
326+
await cleanup(ns, nc);
327+
});
328+
304329
Deno.test("kv - cleanups/empty", async () => {
305330
const { ns, nc } = await setup(
306331
jetstreamServerConf({}),
@@ -1808,6 +1833,38 @@ Deno.test("kv - watch updates only", async () => {
18081833
await cleanup(ns, nc);
18091834
});
18101835

1836+
Deno.test("kv - watch multiple keys", async () => {
1837+
const { ns, nc } = await setup(jetstreamServerConf({}));
1838+
1839+
const js = nc.jetstream();
1840+
const kv = await js.views.kv("K");
1841+
1842+
await kv.put("a", "a");
1843+
await kv.put("b", "b");
1844+
await kv.put("c", "c");
1845+
1846+
const d = deferred();
1847+
const iter = await kv.watch({
1848+
key: ["a", "c"],
1849+
initializedFn: () => {
1850+
d.resolve();
1851+
},
1852+
});
1853+
1854+
const notifications: string[] = [];
1855+
(async () => {
1856+
for await (const e of iter) {
1857+
notifications.push(e.key);
1858+
}
1859+
})().then();
1860+
await d;
1861+
1862+
assertEquals(notifications.length, 2);
1863+
assertArrayIncludes(notifications, ["a", "c"]);
1864+
1865+
await cleanup(ns, nc);
1866+
});
1867+
18111868
Deno.test("kv - watch history", async () => {
18121869
const { ns, nc } = await setup(jetstreamServerConf({}));
18131870

jetstream/types.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2023 The NATS Authors
2+
* Copyright 2023-2024 The NATS Authors
33
* Licensed under the Apache License, Version 2.0 (the "License");
44
* you may not use this file except in compliance with the License.
55
* You may obtain a copy of the License at
@@ -1132,8 +1132,9 @@ export enum KvWatchInclude {
11321132
export type KvWatchOptions = {
11331133
/**
11341134
* A key or wildcarded key following keys as if they were NATS subject names.
1135+
* Note you can specify multiple keys if running on server 2.10.x or better.
11351136
*/
1136-
key?: string;
1137+
key?: string | string[];
11371138
/**
11381139
* Notification should only include entry headers
11391140
*/
@@ -1173,9 +1174,10 @@ export interface RoKV {
11731174

11741175
/**
11751176
* Returns an iterator of the specified key's history (or all keys).
1177+
* Note you can specify multiple keys if running on server 2.10.x or better.
11761178
* @param opts
11771179
*/
1178-
history(opts?: { key?: string }): Promise<QueuedIterator<KvEntry>>;
1180+
history(opts?: { key?: string | string[] }): Promise<QueuedIterator<KvEntry>>;
11791181

11801182
/**
11811183
* Returns an iterator that will yield KvEntry updates as they happen.

0 commit comments

Comments
 (0)