Skip to content

Commit 1f20a4d

Browse files
committed
moved js cluster tests to be in the same file to prevent collisions
1 parent c7b7b4a commit 1f20a4d

File tree

5 files changed

+197
-181
lines changed

5 files changed

+197
-181
lines changed

jetstream/tests/jetstream_test.ts

Lines changed: 0 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1025,36 +1025,6 @@ Deno.test("jetstream - bind without consumer should fail", async () => {
10251025
await cleanup(ns, nc);
10261026
});
10271027

1028-
Deno.test("jetstream - mirror alternates", async () => {
1029-
const servers = await NatsServer.jetstreamCluster(3);
1030-
const nc = await connect({ port: servers[0].port });
1031-
if (await notCompatible(servers[0], nc, "2.8.2")) {
1032-
await NatsServer.stopAll(servers, true);
1033-
return;
1034-
}
1035-
1036-
const jsm = await jetstreamManager(nc);
1037-
await jsm.streams.add({ name: "src", subjects: ["A", "B"] });
1038-
1039-
const nc1 = await connect({ port: servers[1].port });
1040-
const jsm1 = await jetstreamManager(nc1);
1041-
1042-
await jsm1.streams.add({
1043-
name: "mirror",
1044-
mirror: {
1045-
name: "src",
1046-
},
1047-
});
1048-
1049-
const n = await jsm1.streams.find("A");
1050-
const si = await jsm1.streams.info(n);
1051-
assertEquals(si.alternates?.length, 2);
1052-
1053-
await nc.close();
1054-
await nc1.close();
1055-
await NatsServer.stopAll(servers, true);
1056-
});
1057-
10581028
Deno.test("jetstream - backoff", async () => {
10591029
const { ns, nc } = await _setup(connect, jetstreamServerConf({}));
10601030
if (await notCompatible(ns, nc, "2.7.2")) {

jetstream/tests/jscluster_test.ts

Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
import { jetstream, jetstreamManager } from "../jsclient.ts";
2+
import { connect, NatsServer, notCompatible } from "test_helpers";
3+
import {
4+
DiscardPolicy,
5+
RetentionPolicy,
6+
StorageType,
7+
StreamConfig,
8+
StreamUpdateConfig,
9+
} from "../jsapi_types.ts";
10+
import { nanos } from "@nats-io/nats-core/internal";
11+
import { Kvm } from "../../kv/kv.ts";
12+
13+
import {
14+
assertArrayIncludes,
15+
assertEquals,
16+
assertExists,
17+
assertRejects,
18+
fail,
19+
} from "jsr:@std/assert";
20+
21+
Deno.test("jetstream - mirror alternates", async () => {
22+
const servers = await NatsServer.jetstreamCluster(3);
23+
const nc = await connect({ port: servers[0].port });
24+
if (await notCompatible(servers[0], nc, "2.8.2")) {
25+
await NatsServer.stopAll(servers, true);
26+
return;
27+
}
28+
29+
const jsm = await jetstreamManager(nc);
30+
await jsm.streams.add({ name: "src", subjects: ["A", "B"] });
31+
32+
const nc1 = await connect({ port: servers[1].port });
33+
const jsm1 = await jetstreamManager(nc1);
34+
35+
await jsm1.streams.add({
36+
name: "mirror",
37+
mirror: {
38+
name: "src",
39+
},
40+
});
41+
42+
const n = await jsm1.streams.find("A");
43+
const si = await jsm1.streams.info(n);
44+
assertEquals(si.alternates?.length, 2);
45+
46+
await nc.close();
47+
await nc1.close();
48+
await NatsServer.stopAll(servers, true);
49+
});
50+
51+
Deno.test("jsm - stream update properties", async () => {
52+
const servers = await NatsServer.jetstreamCluster(3);
53+
const nc = await connect({ port: servers[0].port });
54+
55+
const jsm = await jetstreamManager(nc, { timeout: 3000 });
56+
57+
await jsm.streams.add({
58+
name: "a",
59+
storage: StorageType.File,
60+
subjects: ["x"],
61+
});
62+
63+
let sn = "n";
64+
await jsm.streams.add({
65+
name: sn,
66+
storage: StorageType.File,
67+
subjects: ["subj"],
68+
duplicate_window: nanos(30 * 1000),
69+
});
70+
71+
async function updateOption(
72+
opt: Partial<StreamUpdateConfig | StreamConfig>,
73+
shouldFail = false,
74+
): Promise<void> {
75+
try {
76+
const si = await jsm.streams.update(sn, opt);
77+
for (const v of Object.keys(opt)) {
78+
const sc = si.config;
79+
//@ts-ignore: test
80+
assertEquals(sc[v], opt[v]);
81+
}
82+
if (shouldFail) {
83+
fail("expected to fail with update: " + JSON.stringify(opt));
84+
}
85+
} catch (err) {
86+
if (!shouldFail) {
87+
fail(err.message);
88+
}
89+
}
90+
}
91+
92+
await updateOption({ name: "nn" }, true);
93+
await updateOption({ retention: RetentionPolicy.Interest }, true);
94+
await updateOption({ storage: StorageType.Memory }, true);
95+
await updateOption({ max_consumers: 5 }, true);
96+
97+
await updateOption({ subjects: ["subj", "a"] });
98+
await updateOption({ description: "xx" });
99+
await updateOption({ max_msgs_per_subject: 5 });
100+
await updateOption({ max_msgs: 100 });
101+
await updateOption({ max_age: nanos(45 * 1000) });
102+
await updateOption({ max_bytes: 10240 });
103+
await updateOption({ max_msg_size: 10240 });
104+
await updateOption({ discard: DiscardPolicy.New });
105+
await updateOption({ no_ack: true });
106+
await updateOption({ duplicate_window: nanos(15 * 1000) });
107+
await updateOption({ allow_rollup_hdrs: true });
108+
await updateOption({ allow_rollup_hdrs: false });
109+
await updateOption({ num_replicas: 3 });
110+
await updateOption({ num_replicas: 1 });
111+
await updateOption({ deny_delete: true });
112+
await updateOption({ deny_purge: true });
113+
await updateOption({ sources: [{ name: "a" }] });
114+
await updateOption({ sealed: true });
115+
await updateOption({ sealed: false }, true);
116+
117+
await jsm.streams.add({ name: "m", mirror: { name: "a" } });
118+
sn = "m";
119+
await updateOption({ mirror: { name: "nn" } }, true);
120+
121+
await nc.close();
122+
await NatsServer.stopAll(servers, true);
123+
});
124+
125+
Deno.test("streams - mirrors", async () => {
126+
const cluster = await NatsServer.jetstreamCluster(3);
127+
const nc = await connect({ port: cluster[0].port });
128+
const jsm = await jetstreamManager(nc);
129+
130+
// create a stream in a different server in the cluster
131+
await jsm.streams.add({
132+
name: "src",
133+
subjects: ["src.*"],
134+
placement: {
135+
cluster: cluster[1].config.cluster.name,
136+
tags: cluster[1].config.server_tags,
137+
},
138+
});
139+
140+
// create a mirror in the server we connected
141+
await jsm.streams.add({
142+
name: "mirror",
143+
placement: {
144+
cluster: cluster[2].config.cluster.name,
145+
tags: cluster[2].config.server_tags,
146+
},
147+
mirror: {
148+
name: "src",
149+
},
150+
});
151+
152+
const js = jetstream(nc);
153+
const s = await js.streams.get("src");
154+
assertExists(s);
155+
assertEquals(s.name, "src");
156+
157+
const alternates = await s.alternates();
158+
assertEquals(2, alternates.length);
159+
assertArrayIncludes(alternates.map((a) => a.name), ["src", "mirror"]);
160+
161+
await assertRejects(
162+
async () => {
163+
await js.streams.get("another");
164+
},
165+
Error,
166+
"stream not found",
167+
);
168+
169+
const s2 = await s.best();
170+
const selected = (await s.info(true)).alternates?.[0]?.name ?? "";
171+
assertEquals(s2.name, selected);
172+
173+
await nc.close();
174+
await NatsServer.stopAll(cluster, true);
175+
});
176+
177+
Deno.test("kv - replicas", async () => {
178+
const servers = await NatsServer.jetstreamCluster(3);
179+
const nc = await connect({ port: servers[0].port });
180+
const js = jetstream(nc);
181+
182+
const b = await new Kvm(js).create("a", { replicas: 3 });
183+
const status = await b.status();
184+
185+
const jsm = await jetstreamManager(nc);
186+
let si = await jsm.streams.info(status.streamInfo.config.name);
187+
assertEquals(si.config.num_replicas, 3);
188+
189+
si = await jsm.streams.update(status.streamInfo.config.name, {
190+
num_replicas: 1,
191+
});
192+
assertEquals(si.config.num_replicas, 1);
193+
194+
await nc.close();
195+
await NatsServer.stopAll(servers, true);
196+
});

jetstream/tests/jsm_test.ts

Lines changed: 0 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@ import {
4343
DiscardPolicy,
4444
jetstream,
4545
jetstreamManager,
46-
RetentionPolicy,
4746
StorageType,
4847
} from "../mod.ts";
4948

@@ -55,7 +54,6 @@ import type {
5554
StreamConfig,
5655
StreamInfo,
5756
StreamSource,
58-
StreamUpdateConfig,
5957
} from "../mod.ts";
6058
import { initStream } from "./jstest_util.ts";
6159
import {
@@ -1180,80 +1178,6 @@ Deno.test("jsm - stream update preserves other value", async () => {
11801178
await cleanup(ns, nc);
11811179
});
11821180

1183-
Deno.test("jsm - stream update properties", async () => {
1184-
const servers = await NatsServer.jetstreamCluster(3);
1185-
const nc = await connect({ port: servers[0].port });
1186-
1187-
const jsm = await jetstreamManager(nc, { timeout: 3000 });
1188-
1189-
await jsm.streams.add({
1190-
name: "a",
1191-
storage: StorageType.File,
1192-
subjects: ["x"],
1193-
});
1194-
1195-
let sn = "n";
1196-
await jsm.streams.add({
1197-
name: sn,
1198-
storage: StorageType.File,
1199-
subjects: ["subj"],
1200-
duplicate_window: nanos(30 * 1000),
1201-
});
1202-
1203-
async function updateOption(
1204-
opt: Partial<StreamUpdateConfig | StreamConfig>,
1205-
shouldFail = false,
1206-
): Promise<void> {
1207-
try {
1208-
const si = await jsm.streams.update(sn, opt);
1209-
for (const v of Object.keys(opt)) {
1210-
const sc = si.config;
1211-
//@ts-ignore: test
1212-
assertEquals(sc[v], opt[v]);
1213-
}
1214-
if (shouldFail) {
1215-
fail("expected to fail with update: " + JSON.stringify(opt));
1216-
}
1217-
} catch (err) {
1218-
if (!shouldFail) {
1219-
fail(err.message);
1220-
}
1221-
}
1222-
}
1223-
1224-
await updateOption({ name: "nn" }, true);
1225-
await updateOption({ retention: RetentionPolicy.Interest }, true);
1226-
await updateOption({ storage: StorageType.Memory }, true);
1227-
await updateOption({ max_consumers: 5 }, true);
1228-
1229-
await updateOption({ subjects: ["subj", "a"] });
1230-
await updateOption({ description: "xx" });
1231-
await updateOption({ max_msgs_per_subject: 5 });
1232-
await updateOption({ max_msgs: 100 });
1233-
await updateOption({ max_age: nanos(45 * 1000) });
1234-
await updateOption({ max_bytes: 10240 });
1235-
await updateOption({ max_msg_size: 10240 });
1236-
await updateOption({ discard: DiscardPolicy.New });
1237-
await updateOption({ no_ack: true });
1238-
await updateOption({ duplicate_window: nanos(15 * 1000) });
1239-
await updateOption({ allow_rollup_hdrs: true });
1240-
await updateOption({ allow_rollup_hdrs: false });
1241-
await updateOption({ num_replicas: 3 });
1242-
await updateOption({ num_replicas: 1 });
1243-
await updateOption({ deny_delete: true });
1244-
await updateOption({ deny_purge: true });
1245-
await updateOption({ sources: [{ name: "a" }] });
1246-
await updateOption({ sealed: true });
1247-
await updateOption({ sealed: false }, true);
1248-
1249-
await jsm.streams.add({ name: "m", mirror: { name: "a" } });
1250-
sn = "m";
1251-
await updateOption({ mirror: { name: "nn" } }, true);
1252-
1253-
await nc.close();
1254-
await NatsServer.stopAll(servers, true);
1255-
});
1256-
12571181
Deno.test("jsm - direct getMessage", async () => {
12581182
const { ns, nc } = await _setup(connect, jetstreamServerConf({}));
12591183

0 commit comments

Comments
 (0)