Skip to content

Commit 354198c

Browse files
committed
fix(kv): fixed an issue where kv sources were not properly processed
1 parent 0664232 commit 354198c

File tree

3 files changed

+39
-1
lines changed

3 files changed

+39
-1
lines changed

jetstream/jsmstream_api.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,7 @@ export class StreamAPIImpl extends BaseApiClient implements StreamAPI {
254254
context: string,
255255
src: Partial<StreamSource>,
256256
): void {
257-
const count = src.subject_transforms?.length || 0;
257+
const count = src?.subject_transforms?.length || 0;
258258
if (count > 0) {
259259
const { min, ok } = nci.features.get(
260260
Feature.JS_STREAM_SOURCE_SUBJECT_TRANSFORM,

jetstream/kv.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -267,14 +267,25 @@ export class Bucket implements KV, KvRemove {
267267
} else if (opts.sources) {
268268
const sources = opts.sources.map((s) => {
269269
const c = Object.assign({}, s) as StreamSource;
270+
const srcBucketName = c.name.startsWith(kvPrefix)
271+
? c.name.substring(kvPrefix.length)
272+
: c.name;
270273
if (!c.name.startsWith(kvPrefix)) {
271274
c.name = `${kvPrefix}${c.name}`;
272275
}
276+
if (!s.external && srcBucketName !== this.bucket) {
277+
c.subject_transforms = [
278+
{ src: `$KV.${srcBucketName}.>`, dest: `$KV.${this.bucket}.>` },
279+
];
280+
}
281+
return c;
273282
});
274283
sc.sources = sources as unknown[] as StreamSource[];
284+
sc.subjects = [this.subjectForBucket()];
275285
} else {
276286
sc.subjects = [this.subjectForBucket()];
277287
}
288+
278289
if (opts.metadata) {
279290
sc.metadata = opts.metadata;
280291
}

jetstream/tests/kv_test.ts

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2210,3 +2210,30 @@ Deno.test("kv - keys filter", async () => {
22102210

22112211
await cleanup(ns, nc);
22122212
});
2213+
2214+
Deno.test("kv - sourced", async () => {
2215+
const { ns, nc } = await setup(
2216+
jetstreamServerConf({}),
2217+
);
2218+
if (await notCompatible(ns, nc, "2.6.3")) {
2219+
return;
2220+
}
2221+
2222+
const js = nc.jetstream();
2223+
const source = await js.views.kv("source");
2224+
const target = await js.views.kv("target", {
2225+
sources: [{ name: "source" }],
2226+
});
2227+
2228+
await source.put("hello", "world");
2229+
for (let i = 0; i < 10; i++) {
2230+
const v = await target.get("hello");
2231+
if (v === null) {
2232+
await delay(250);
2233+
continue;
2234+
}
2235+
assertEquals(v.string(), "world");
2236+
}
2237+
2238+
await cleanup(ns, nc);
2239+
});

0 commit comments

Comments
 (0)