Skip to content

Commit 1e0bc90

Browse files
authored
KV Watch Multiple Filters Additional Test (#1114)
1 parent 9661dfc commit 1e0bc90

File tree

6 files changed

+53
-46
lines changed

6 files changed

+53
-46
lines changed

src/main/java/io/nats/client/KeyValue.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -168,8 +168,8 @@ public interface KeyValue {
168168
void purge(String key, long expectedRevision) throws IOException, JetStreamApiException;
169169

170170
/**
171-
* Watch updates for a specific key.
172-
* @param key the key. Also accepts a comma delimited list.
171+
* Watch updates for a specific key or keys.
172+
* @param key the key or a comma delimited list of keys.
173173
* @param watcher the watcher the implementation to receive changes
174174
* @param watchOptions the watch options to apply. If multiple conflicting options are supplied, the last options wins.
175175
* @return The KeyValueWatchSubscription
@@ -181,8 +181,8 @@ public interface KeyValue {
181181
NatsKeyValueWatchSubscription watch(String key, KeyValueWatcher watcher, KeyValueWatchOption... watchOptions) throws IOException, JetStreamApiException, InterruptedException;
182182

183183
/**
184-
* Watch updates for a specific key, starting at a specific revision.
185-
* @param key the key. Also accepts a comma delimited list.
184+
* Watch updates for a specific key or keys, starting at a specific revision.
185+
* @param key the key or a comma delimited list of keys.
186186
* @param watcher the watcher the implementation to receive changes
187187
* @param fromRevision the revision to start from
188188
* @param watchOptions the watch options to apply. If multiple conflicting options are supplied, the last options wins.

src/main/java/io/nats/client/impl/NatsKeyValue.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -260,8 +260,8 @@ public NatsKeyValueWatchSubscription watch(List<String> keys, KeyValueWatcher wa
260260

261261
@Override
262262
public NatsKeyValueWatchSubscription watch(List<String> keys, KeyValueWatcher watcher, long fromRevision, KeyValueWatchOption... watchOptions) throws IOException, JetStreamApiException, InterruptedException {
263-
// all watch methods (watch, watch all) delegate to here
264-
validateKvKeyWildcardAllowedRequired(keys);
263+
// all watch methods (watch, watchAll) delegate to here
264+
validateKvKeysWildcardAllowedRequired(keys);
265265
validateNotNull(watcher, "Watcher is required");
266266
return new NatsKeyValueWatchSubscription(this, keys, watcher, fromRevision, watchOptions);
267267
}

src/main/java/io/nats/client/support/Validator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ public static String validatePrefixOrDomain(String s, String label, boolean requ
124124
});
125125
}
126126

127-
public static List<String> validateKvKeyWildcardAllowedRequired(List<String> keys) {
127+
public static List<String> validateKvKeysWildcardAllowedRequired(List<String> keys) {
128128
required(keys, "Key");
129129
for (String key : keys) {
130130
validateWildcardKvKey(key, "Key", true);

src/test/java/io/nats/client/impl/KeyValueTests.java

Lines changed: 40 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -914,6 +914,8 @@ public void testWatch() throws Exception {
914914
TestKeyValueWatcher gtMetaWatcher = new TestKeyValueWatcher("gtMetaWatcher", true, META_ONLY);
915915
TestKeyValueWatcher multipleFullWatcher = new TestKeyValueWatcher("multipleFullWatcher", true);
916916
TestKeyValueWatcher multipleMetaWatcher = new TestKeyValueWatcher("multipleMetaWatcher", true, META_ONLY);
917+
TestKeyValueWatcher multipleFullWatcher2 = new TestKeyValueWatcher("multipleFullWatcher2", true);
918+
TestKeyValueWatcher multipleMetaWatcher2 = new TestKeyValueWatcher("multipleMetaWatcher2", true, META_ONLY);
917919
TestKeyValueWatcher key1AfterWatcher = new TestKeyValueWatcher("key1AfterWatcher", false, META_ONLY);
918920
TestKeyValueWatcher key1AfterIgDelWatcher = new TestKeyValueWatcher("key1AfterIgDelWatcher", false, META_ONLY, IGNORE_DELETE);
919921
TestKeyValueWatcher key1AfterStartNewWatcher = new TestKeyValueWatcher("key1AfterStartNewWatcher", false, META_ONLY, UPDATES_ONLY);
@@ -926,7 +928,7 @@ public void testWatch() throws Exception {
926928

927929
List<String> allKeys = Arrays.asList(TEST_WATCH_KEY_1, TEST_WATCH_KEY_2, TEST_WATCH_KEY_NULL);
928930

929-
runInJsServer(nc -> {
931+
jsServer.run(nc -> {
930932
_testWatch(nc, key1FullWatcher, key1AllExpecteds, -1, kv -> kv.watch(TEST_WATCH_KEY_1, key1FullWatcher, key1FullWatcher.watchOptions));
931933
_testWatch(nc, key1MetaWatcher, key1AllExpecteds, -1, kv -> kv.watch(TEST_WATCH_KEY_1, key1MetaWatcher, key1MetaWatcher.watchOptions));
932934
_testWatch(nc, key1StartNewWatcher, key1AllExpecteds, -1, kv -> kv.watch(TEST_WATCH_KEY_1, key1StartNewWatcher, key1StartNewWatcher.watchOptions));
@@ -943,6 +945,8 @@ public void testWatch() throws Exception {
943945
_testWatch(nc, gtMetaWatcher, allExpecteds, -1, kv -> kv.watch("key.>", gtMetaWatcher, gtMetaWatcher.watchOptions));
944946
_testWatch(nc, multipleFullWatcher, allExpecteds, -1, kv -> kv.watch(allKeys, multipleFullWatcher, multipleFullWatcher.watchOptions));
945947
_testWatch(nc, multipleMetaWatcher, allExpecteds, -1, kv -> kv.watch(allKeys, multipleMetaWatcher, multipleMetaWatcher.watchOptions));
948+
_testWatch(nc, multipleFullWatcher2, allExpecteds, -1, kv -> kv.watch(String.join(",", allKeys), multipleFullWatcher2, multipleFullWatcher.watchOptions));
949+
_testWatch(nc, multipleMetaWatcher2, allExpecteds, -1, kv -> kv.watch(String.join(",", allKeys), multipleMetaWatcher2, multipleMetaWatcher.watchOptions));
946950
_testWatch(nc, key1AfterWatcher, purgeOnlyExpecteds, -1, kv -> kv.watch(TEST_WATCH_KEY_1, key1AfterWatcher, key1AfterWatcher.watchOptions));
947951
_testWatch(nc, key1AfterIgDelWatcher, noExpecteds, -1, kv -> kv.watch(TEST_WATCH_KEY_1, key1AfterIgDelWatcher, key1AfterIgDelWatcher.watchOptions));
948952
_testWatch(nc, key1AfterStartNewWatcher, noExpecteds, -1, kv -> kv.watch(TEST_WATCH_KEY_1, key1AfterStartNewWatcher, key1AfterStartNewWatcher.watchOptions));
@@ -958,7 +962,7 @@ public void testWatch() throws Exception {
958962
private void _testWatch(Connection nc, TestKeyValueWatcher watcher, Object[] expectedKves, long fromRevision, TestWatchSubSupplier supplier) throws Exception {
959963
KeyValueManagement kvm = nc.keyValueManagement();
960964

961-
String bucket = watcher.name + "Bucket";
965+
String bucket = variant() + watcher.name + "Bucket";
962966
kvm.create(KeyValueConfiguration.builder()
963967
.name(bucket)
964968
.maxHistoryPerKey(10)
@@ -1022,7 +1026,6 @@ private void validateWatcher(Object[] expectedKves, TestKeyValueWatcher watcher)
10221026
long lastRevision = -1;
10231027

10241028
for (KeyValueEntry kve : watcher.entries) {
1025-
10261029
assertTrue(kve.getCreated().isAfter(lastCreated) || kve.getCreated().isEqual(lastCreated));
10271030
lastCreated = kve.getCreated();
10281031

@@ -1056,12 +1059,8 @@ else if (expected instanceof String) {
10561059
}
10571060
}
10581061

1059-
static final String BUCKET_CREATED_BY_USER_A = "bucketA";
1060-
static final String BUCKET_CREATED_BY_USER_I = "bucketI";
1061-
10621062
@Test
10631063
public void testWithAccount() throws Exception {
1064-
10651064
try (NatsTestServer ts = new NatsTestServer("src/test/resources/kv_account.conf", false)) {
10661065
Options acctA = new Options.Builder().server(ts.getURI()).userInfo("a", "a").build();
10671066
Options acctI = new Options.Builder().server(ts.getURI()).userInfo("i", "i").inboxPrefix("ForI").build();
@@ -1083,35 +1082,37 @@ public void testWithAccount() throws Exception {
10831082
KeyValueManagement kvmUserIBcktA = connUserI.keyValueManagement(jsOpt_UserI_BucketA_WithPrefix);
10841083
KeyValueManagement kvmUserIBcktI = connUserI.keyValueManagement(jsOpt_UserI_BucketI_WithPrefix);
10851084

1085+
String bucketA = bucket();
10861086
KeyValueConfiguration kvcA = KeyValueConfiguration.builder()
1087-
.name(BUCKET_CREATED_BY_USER_A).storageType(StorageType.Memory).maxHistoryPerKey(64).build();
1087+
.name(bucketA).storageType(StorageType.Memory).maxHistoryPerKey(64).build();
10881088

1089+
String bucketI = bucket();
10891090
KeyValueConfiguration kvcI = KeyValueConfiguration.builder()
1090-
.name(BUCKET_CREATED_BY_USER_I).storageType(StorageType.Memory).maxHistoryPerKey(64).build();
1091+
.name(bucketI).storageType(StorageType.Memory).maxHistoryPerKey(64).build();
10911092

10921093
// testing KVM API
1093-
assertEquals(BUCKET_CREATED_BY_USER_A, kvmUserA.create(kvcA).getBucketName());
1094-
assertEquals(BUCKET_CREATED_BY_USER_I, kvmUserIBcktI.create(kvcI).getBucketName());
1094+
assertEquals(bucketA, kvmUserA.create(kvcA).getBucketName());
1095+
assertEquals(bucketI, kvmUserIBcktI.create(kvcI).getBucketName());
10951096

1096-
assertKvAccountBucketNames(kvmUserA.getBucketNames());
1097-
assertKvAccountBucketNames(kvmUserIBcktI.getBucketNames());
1097+
assertKvAccountBucketNames(kvmUserA.getBucketNames(), bucketA, bucketI);
1098+
assertKvAccountBucketNames(kvmUserIBcktI.getBucketNames(), bucketA, bucketI);
10981099

1099-
assertEquals(BUCKET_CREATED_BY_USER_A, kvmUserA.getStatus(BUCKET_CREATED_BY_USER_A).getBucketName());
1100-
assertEquals(BUCKET_CREATED_BY_USER_A, kvmUserIBcktA.getStatus(BUCKET_CREATED_BY_USER_A).getBucketName());
1101-
assertEquals(BUCKET_CREATED_BY_USER_I, kvmUserA.getStatus(BUCKET_CREATED_BY_USER_I).getBucketName());
1102-
assertEquals(BUCKET_CREATED_BY_USER_I, kvmUserIBcktI.getStatus(BUCKET_CREATED_BY_USER_I).getBucketName());
1100+
assertEquals(bucketA, kvmUserA.getStatus(bucketA).getBucketName());
1101+
assertEquals(bucketA, kvmUserIBcktA.getStatus(bucketA).getBucketName());
1102+
assertEquals(bucketI, kvmUserA.getStatus(bucketI).getBucketName());
1103+
assertEquals(bucketI, kvmUserIBcktI.getStatus(bucketI).getBucketName());
11031104

11041105
// some more prep
1105-
KeyValue kv_connA_bucketA = connUserA.keyValue(BUCKET_CREATED_BY_USER_A);
1106-
KeyValue kv_connA_bucketI = connUserA.keyValue(BUCKET_CREATED_BY_USER_I);
1107-
KeyValue kv_connI_bucketA = connUserI.keyValue(BUCKET_CREATED_BY_USER_A, jsOpt_UserI_BucketA_WithPrefix);
1108-
KeyValue kv_connI_bucketI = connUserI.keyValue(BUCKET_CREATED_BY_USER_I, jsOpt_UserI_BucketI_WithPrefix);
1106+
KeyValue kv_connA_bucketA = connUserA.keyValue(bucketA);
1107+
KeyValue kv_connA_bucketI = connUserA.keyValue(bucketI);
1108+
KeyValue kv_connI_bucketA = connUserI.keyValue(bucketA, jsOpt_UserI_BucketA_WithPrefix);
1109+
KeyValue kv_connI_bucketI = connUserI.keyValue(bucketI, jsOpt_UserI_BucketI_WithPrefix);
11091110

11101111
// check the names
1111-
assertEquals(BUCKET_CREATED_BY_USER_A, kv_connA_bucketA.getBucketName());
1112-
assertEquals(BUCKET_CREATED_BY_USER_A, kv_connI_bucketA.getBucketName());
1113-
assertEquals(BUCKET_CREATED_BY_USER_I, kv_connA_bucketI.getBucketName());
1114-
assertEquals(BUCKET_CREATED_BY_USER_I, kv_connI_bucketI.getBucketName());
1112+
assertEquals(bucketA, kv_connA_bucketA.getBucketName());
1113+
assertEquals(bucketA, kv_connI_bucketA.getBucketName());
1114+
assertEquals(bucketI, kv_connA_bucketI.getBucketName());
1115+
assertEquals(bucketI, kv_connI_bucketI.getBucketName());
11151116

11161117
TestKeyValueWatcher watcher_connA_BucketA = new TestKeyValueWatcher("watcher_connA_BucketA", true);
11171118
TestKeyValueWatcher watcher_connA_BucketI = new TestKeyValueWatcher("watcher_connA_BucketI", true);
@@ -1154,10 +1155,10 @@ public void testWithAccount() throws Exception {
11541155
}
11551156
}
11561157

1157-
private void assertKvAccountBucketNames(List<String> bnames) {
1158+
private void assertKvAccountBucketNames(List<String> bnames, String bucketA, String bucketI) {
11581159
assertEquals(2, bnames.size());
1159-
assertTrue(bnames.contains(BUCKET_CREATED_BY_USER_A));
1160-
assertTrue(bnames.contains(BUCKET_CREATED_BY_USER_I));
1160+
assertTrue(bnames.contains(bucketA));
1161+
assertTrue(bnames.contains(bucketI));
11611162
}
11621163

11631164
private void assertKvAccountKeys(List<String> keys, String key1, String key2) {
@@ -1457,39 +1458,42 @@ public void testKeyValueMirrorCrossDomains() throws Exception {
14571458
KeyValueManagement leafKvm = leaf.keyValueManagement();
14581459

14591460
// Create main KV on HUB
1461+
String hubBucket = variant();
14601462
KeyValueStatus hubStatus = hubKvm.create(KeyValueConfiguration.builder()
1461-
.name("TEST")
1463+
.name(hubBucket)
14621464
.storageType(StorageType.Memory)
14631465
.build());
14641466

1465-
KeyValue hubKv = hub.keyValue("TEST");
1467+
KeyValue hubKv = hub.keyValue(hubBucket);
14661468
hubKv.put("key1", "aaa0");
14671469
hubKv.put("key2", "bb0");
14681470
hubKv.put("key3", "c0");
14691471
hubKv.delete("key3");
14701472

1473+
String leafBucket = variant();
1474+
String leafStream = "KV_" + leafBucket;
14711475
leafKvm.create(KeyValueConfiguration.builder()
1472-
.name("MIRROR")
1476+
.name(leafBucket)
14731477
.mirror(Mirror.builder()
1474-
.sourceName("TEST")
1478+
.sourceName(hubBucket)
14751479
.domain(null) // just for coverage!
1476-
.domain("HUB") // it will take this since it comes last
1480+
.domain(HUB_DOMAIN) // it will take this since it comes last
14771481
.build())
14781482
.build());
14791483

14801484
sleep(200); // make sure things get a chance to propagate
1481-
StreamInfo si = leaf.jetStreamManagement().getStreamInfo("KV_MIRROR");
1485+
StreamInfo si = leaf.jetStreamManagement().getStreamInfo(leafStream);
14821486
if (hub.getServerInfo().isSameOrNewerThanVersion("2.9")) {
14831487
assertTrue(si.getConfiguration().getMirrorDirect());
14841488
}
14851489
assertEquals(3, si.getStreamState().getMsgCount());
14861490

1487-
KeyValue leafKv = leaf.keyValue("MIRROR");
1491+
KeyValue leafKv = leaf.keyValue(leafBucket);
14881492
_testMirror(hubKv, leafKv, 1);
14891493

14901494
// Bind through leafnode connection but to origin KV.
14911495
KeyValue hubViaLeafKv =
1492-
leaf.keyValue("TEST", KeyValueOptions.builder().jsDomain("HUB").build());
1496+
leaf.keyValue(hubBucket, KeyValueOptions.builder().jsDomain(HUB_DOMAIN).build());
14931497
_testMirror(hubKv, hubViaLeafKv, 2);
14941498
});
14951499
}

src/test/java/io/nats/client/support/ValidatorTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -289,9 +289,9 @@ public void testValidateKvKeyWildcardAllowedRequired() {
289289

290290
List<String> nullList = null;
291291
//noinspection ConstantValue
292-
assertThrows(IllegalArgumentException.class, () -> validateKvKeyWildcardAllowedRequired(nullList));
293-
assertThrows(IllegalArgumentException.class, () -> validateKvKeyWildcardAllowedRequired(Collections.singletonList(null)));
294-
assertThrows(IllegalArgumentException.class, () -> validateKvKeyWildcardAllowedRequired(Collections.singletonList(HAS_SPACE)));
292+
assertThrows(IllegalArgumentException.class, () -> validateKvKeysWildcardAllowedRequired(nullList));
293+
assertThrows(IllegalArgumentException.class, () -> validateKvKeysWildcardAllowedRequired(Collections.singletonList(null)));
294+
assertThrows(IllegalArgumentException.class, () -> validateKvKeysWildcardAllowedRequired(Collections.singletonList(HAS_SPACE)));
295295
}
296296

297297
@Test

src/test/java/io/nats/client/utils/TestBase.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,9 @@ public static void runInExternalServer(String url, InServerTest inServerTest) th
277277
}
278278
}
279279

280+
public static String HUB_DOMAIN = "HUB";
281+
public static String LEAF_DOMAIN = "LEAF";
282+
280283
public static void runInJsHubLeaf(TwoServerTest twoServerTest) throws Exception {
281284
int hubPort = NatsTestServer.nextPort();
282285
int hubLeafPort = NatsTestServer.nextPort();

0 commit comments

Comments
 (0)