Skip to content

Commit 9661dfc

Browse files
authored
KV Watch Multiple Filters (#1113)
1 parent bb8a5b3 commit 9661dfc

9 files changed

+105
-24
lines changed

src/main/java/io/nats/client/KeyValue.java

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ public interface KeyValue {
169169

170170
/**
171171
* Watch updates for a specific key.
172-
* @param key the key
172+
* @param key the key. Also accepts a comma delimited list.
173173
* @param watcher the watcher the implementation to receive changes
174174
* @param watchOptions the watch options to apply. If multiple conflicting options are supplied, the last options wins.
175175
* @return The KeyValueWatchSubscription
@@ -182,7 +182,7 @@ public interface KeyValue {
182182

183183
/**
184184
* Watch updates for a specific key, starting at a specific revision.
185-
* @param key the key
185+
* @param key the key. Also accepts a comma delimited list.
186186
* @param watcher the watcher the implementation to receive changes
187187
* @param fromRevision the revision to start from
188188
* @param watchOptions the watch options to apply. If multiple conflicting options are supplied, the last options wins.
@@ -194,6 +194,33 @@ public interface KeyValue {
194194
*/
195195
NatsKeyValueWatchSubscription watch(String key, KeyValueWatcher watcher, long fromRevision, KeyValueWatchOption... watchOptions) throws IOException, JetStreamApiException, InterruptedException;
196196

197+
/**
198+
* Watch updates for specific keys.
199+
* @param keys the keys
200+
* @param watcher the watcher the implementation to receive changes
201+
* @param watchOptions the watch options to apply. If multiple conflicting options are supplied, the last options wins.
202+
* @return The KeyValueWatchSubscription
203+
* @throws IOException covers various communication issues with the NATS
204+
* server such as timeout or interruption
205+
* @throws JetStreamApiException the request had an error related to the data
206+
* @throws InterruptedException if the thread is interrupted
207+
*/
208+
NatsKeyValueWatchSubscription watch(List<String> keys, KeyValueWatcher watcher, KeyValueWatchOption... watchOptions) throws IOException, JetStreamApiException, InterruptedException;
209+
210+
/**
211+
* Watch updates for specific keys, starting at a specific revision.
212+
* @param keys the keys
213+
* @param watcher the watcher the implementation to receive changes
214+
* @param fromRevision the revision to start from
215+
* @param watchOptions the watch options to apply. If multiple conflicting options are supplied, the last options wins.
216+
* @return The KeyValueWatchSubscription
217+
* @throws IOException covers various communication issues with the NATS
218+
* server such as timeout or interruption
219+
* @throws JetStreamApiException the request had an error related to the data
220+
* @throws InterruptedException if the thread is interrupted
221+
*/
222+
NatsKeyValueWatchSubscription watch(List<String> keys, KeyValueWatcher watcher, long fromRevision, KeyValueWatchOption... watchOptions) throws IOException, JetStreamApiException, InterruptedException;
223+
197224
/**
198225
* Watch updates for all keys.
199226
* @param watcher the watcher the implementation to receive changes

src/main/java/io/nats/client/impl/NatsKeyValue.java

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import java.nio.charset.StandardCharsets;
2626
import java.time.ZonedDateTime;
2727
import java.util.ArrayList;
28+
import java.util.Arrays;
29+
import java.util.Collections;
2830
import java.util.List;
2931

3032
import static io.nats.client.support.NatsConstants.DOT;
@@ -237,28 +239,41 @@ private PublishAck _write(String key, byte[] data, Headers h) throws IOException
237239

238240
@Override
239241
public NatsKeyValueWatchSubscription watch(String key, KeyValueWatcher watcher, KeyValueWatchOption... watchOptions) throws IOException, JetStreamApiException, InterruptedException {
240-
validateKvKeyWildcardAllowedRequired(key);
241-
validateNotNull(watcher, "Watcher is required");
242-
return new NatsKeyValueWatchSubscription(this, key, watcher, -1, watchOptions);
242+
if (key.contains(",")) {
243+
return watch(Arrays.asList(key.split(",")), watcher, -1, watchOptions);
244+
}
245+
return watch(Collections.singletonList(key), watcher, -1, watchOptions);
243246
}
244247

245248
@Override
246249
public NatsKeyValueWatchSubscription watch(String key, KeyValueWatcher watcher, long fromRevision, KeyValueWatchOption... watchOptions) throws IOException, JetStreamApiException, InterruptedException {
247-
validateKvKeyWildcardAllowedRequired(key);
250+
if (key.contains(",")) {
251+
return watch(Arrays.asList(key.split(",")), watcher, fromRevision, watchOptions);
252+
}
253+
return watch(Collections.singletonList(key), watcher, fromRevision, watchOptions);
254+
}
255+
256+
@Override
257+
public NatsKeyValueWatchSubscription watch(List<String> keys, KeyValueWatcher watcher, KeyValueWatchOption... watchOptions) throws IOException, JetStreamApiException, InterruptedException {
258+
return watch(keys, watcher, -1, watchOptions);
259+
}
260+
261+
@Override
262+
public NatsKeyValueWatchSubscription watch(List<String> keys, KeyValueWatcher watcher, long fromRevision, KeyValueWatchOption... watchOptions) throws IOException, JetStreamApiException, InterruptedException {
263+
// all watch methods (watch, watch all) delegate to here
264+
validateKvKeyWildcardAllowedRequired(keys);
248265
validateNotNull(watcher, "Watcher is required");
249-
return new NatsKeyValueWatchSubscription(this, key, watcher, fromRevision, watchOptions);
266+
return new NatsKeyValueWatchSubscription(this, keys, watcher, fromRevision, watchOptions);
250267
}
251268

252269
@Override
253270
public NatsKeyValueWatchSubscription watchAll(KeyValueWatcher watcher, KeyValueWatchOption... watchOptions) throws IOException, JetStreamApiException, InterruptedException {
254-
validateNotNull(watcher, "Watcher is required");
255-
return new NatsKeyValueWatchSubscription(this, ">", watcher, -1, watchOptions);
271+
return new NatsKeyValueWatchSubscription(this, Collections.singletonList(">"), watcher, -1, watchOptions);
256272
}
257273

258274
@Override
259275
public NatsKeyValueWatchSubscription watchAll(KeyValueWatcher watcher, long fromRevision, KeyValueWatchOption... watchOptions) throws IOException, JetStreamApiException, InterruptedException {
260-
validateNotNull(watcher, "Watcher is required");
261-
return new NatsKeyValueWatchSubscription(this, ">", watcher, fromRevision, watchOptions);
276+
return new NatsKeyValueWatchSubscription(this, Collections.singletonList(">"), watcher, fromRevision, watchOptions);
262277
}
263278

264279
/**

src/main/java/io/nats/client/impl/NatsKeyValueWatchSubscription.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,22 @@
1818
import io.nats.client.api.*;
1919

2020
import java.io.IOException;
21+
import java.util.ArrayList;
22+
import java.util.Collections;
23+
import java.util.List;
2124

2225
public class NatsKeyValueWatchSubscription extends NatsWatchSubscription<KeyValueEntry> {
2326

2427
public NatsKeyValueWatchSubscription(NatsKeyValue kv, String keyPattern, KeyValueWatcher watcher, long fromRevision, KeyValueWatchOption... watchOptions) throws IOException, JetStreamApiException {
28+
this(kv, Collections.singletonList(keyPattern), watcher, fromRevision, watchOptions);
29+
}
30+
31+
public NatsKeyValueWatchSubscription(NatsKeyValue kv, List<String> keyPatterns, KeyValueWatcher watcher, long fromRevision, KeyValueWatchOption... watchOptions) throws IOException, JetStreamApiException {
2532
super(kv.js);
33+
kwWatchInit(kv, keyPatterns, watcher, fromRevision, watchOptions);
34+
}
2635

36+
private void kwWatchInit(NatsKeyValue kv, List<String> keyPatterns, KeyValueWatcher watcher, long fromRevision, KeyValueWatchOption[] watchOptions) throws IOException, JetStreamApiException {
2737
// figure out the result options
2838
boolean headersOnly = false;
2939
boolean ignoreDeletes = false;
@@ -54,6 +64,12 @@ public void onMessage(Message m) throws InterruptedException {
5464
}
5565
};
5666

57-
finishInit(kv, kv.readSubject(keyPattern), deliverPolicy, headersOnly, fromRevision, handler);
67+
// convert each key to a read subject
68+
List<String> readSubjects = new ArrayList<>();
69+
for (String keyPattern : keyPatterns) {
70+
readSubjects.add(kv.readSubject(keyPattern.trim()));
71+
}
72+
73+
finishInit(kv, readSubjects, deliverPolicy, headersOnly, fromRevision, handler);
5874
}
5975
}

src/main/java/io/nats/client/impl/NatsObjectStoreWatchSubscription.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import io.nats.client.api.ObjectStoreWatcher;
2222

2323
import java.io.IOException;
24+
import java.util.Collections;
2425

2526
import static io.nats.client.api.ConsumerConfiguration.ULONG_UNSET;
2627

@@ -58,6 +59,6 @@ public void onMessage(Message m) throws InterruptedException {
5859
}
5960
};
6061

61-
finishInit(os, os.rawAllMetaSubject(), deliverPolicy, headersOnly, ULONG_UNSET, handler);
62+
finishInit(os, Collections.singletonList(os.rawAllMetaSubject()), deliverPolicy, headersOnly, ULONG_UNSET, handler);
6263
}
6364
}

src/main/java/io/nats/client/impl/NatsWatchSubscription.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import io.nats.client.api.Watcher;
2121

2222
import java.io.IOException;
23+
import java.util.List;
2324

2425
import static io.nats.client.api.ConsumerConfiguration.ULONG_UNSET;
2526

@@ -32,15 +33,15 @@ public NatsWatchSubscription(JetStream js) {
3233
this.js = js;
3334
}
3435

35-
protected void finishInit(NatsFeatureBase fb, String subscribeSubject, DeliverPolicy deliverPolicy, boolean headersOnly, long fromRevision, WatchMessageHandler<T> handler)
36+
protected void finishInit(NatsFeatureBase fb, List<String> subscribeSubjects, DeliverPolicy deliverPolicy, boolean headersOnly, long fromRevision, WatchMessageHandler<T> handler)
3637
throws IOException, JetStreamApiException
3738
{
3839
if (fromRevision > ULONG_UNSET) {
3940
deliverPolicy = DeliverPolicy.ByStartSequence;
4041
}
4142
else {
4243
fromRevision = ULONG_UNSET; // easier on the builder since we aren't starting at a fromRevision
43-
if (deliverPolicy == DeliverPolicy.New || fb._getLast(subscribeSubject) == null) {
44+
if (deliverPolicy == DeliverPolicy.New) {
4445
handler.sendEndOfData();
4546
}
4647
}
@@ -53,12 +54,12 @@ protected void finishInit(NatsFeatureBase fb, String subscribeSubject, DeliverPo
5354
.deliverPolicy(deliverPolicy)
5455
.startSequence(fromRevision)
5556
.headersOnly(headersOnly)
56-
.filterSubject(subscribeSubject)
57+
.filterSubjects(subscribeSubjects)
5758
.build())
5859
.build();
5960

6061
dispatcher = (NatsDispatcher) ((NatsJetStream) js).conn.createDispatcher();
61-
sub = js.subscribe(subscribeSubject, dispatcher, handler, false, pso);
62+
sub = js.subscribe(null, dispatcher, handler, false, pso);
6263
if (!handler.endOfDataSent) {
6364
long pending = sub.getConsumerInfo().getCalculatedPending();
6465
if (pending == 0) {

src/main/java/io/nats/client/support/Validator.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,14 @@ public static String validatePrefixOrDomain(String s, String label, boolean requ
124124
});
125125
}
126126

127+
public static List<String> validateKvKeyWildcardAllowedRequired(List<String> keys) {
128+
required(keys, "Key");
129+
for (String key : keys) {
130+
validateWildcardKvKey(key, "Key", true);
131+
}
132+
return keys;
133+
}
134+
127135
public static String validateKvKeyWildcardAllowedRequired(String s) {
128136
return validateWildcardKvKey(s, "Key", true);
129137
}

src/test/java/io/nats/client/impl/KeyValueTests.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -912,6 +912,8 @@ public void testWatch() throws Exception {
912912
TestKeyValueWatcher starMetaWatcher = new TestKeyValueWatcher("starMetaWatcher", true, META_ONLY);
913913
TestKeyValueWatcher gtFullWatcher = new TestKeyValueWatcher("gtFullWatcher", true);
914914
TestKeyValueWatcher gtMetaWatcher = new TestKeyValueWatcher("gtMetaWatcher", true, META_ONLY);
915+
TestKeyValueWatcher multipleFullWatcher = new TestKeyValueWatcher("multipleFullWatcher", true);
916+
TestKeyValueWatcher multipleMetaWatcher = new TestKeyValueWatcher("multipleMetaWatcher", true, META_ONLY);
915917
TestKeyValueWatcher key1AfterWatcher = new TestKeyValueWatcher("key1AfterWatcher", false, META_ONLY);
916918
TestKeyValueWatcher key1AfterIgDelWatcher = new TestKeyValueWatcher("key1AfterIgDelWatcher", false, META_ONLY, IGNORE_DELETE);
917919
TestKeyValueWatcher key1AfterStartNewWatcher = new TestKeyValueWatcher("key1AfterStartNewWatcher", false, META_ONLY, UPDATES_ONLY);
@@ -922,6 +924,8 @@ public void testWatch() throws Exception {
922924
TestKeyValueWatcher key1FromRevisionAfterWatcher = new TestKeyValueWatcher("key1FromRevisionAfterWatcher", false);
923925
TestKeyValueWatcher allFromRevisionAfterWatcher = new TestKeyValueWatcher("allFromRevisionAfterWatcher", false);
924926

927+
List<String> allKeys = Arrays.asList(TEST_WATCH_KEY_1, TEST_WATCH_KEY_2, TEST_WATCH_KEY_NULL);
928+
925929
runInJsServer(nc -> {
926930
_testWatch(nc, key1FullWatcher, key1AllExpecteds, -1, kv -> kv.watch(TEST_WATCH_KEY_1, key1FullWatcher, key1FullWatcher.watchOptions));
927931
_testWatch(nc, key1MetaWatcher, key1AllExpecteds, -1, kv -> kv.watch(TEST_WATCH_KEY_1, key1MetaWatcher, key1MetaWatcher.watchOptions));
@@ -937,6 +941,8 @@ public void testWatch() throws Exception {
937941
_testWatch(nc, starMetaWatcher, allExpecteds, -1, kv -> kv.watch("key.*", starMetaWatcher, starMetaWatcher.watchOptions));
938942
_testWatch(nc, gtFullWatcher, allExpecteds, -1, kv -> kv.watch("key.>", gtFullWatcher, gtFullWatcher.watchOptions));
939943
_testWatch(nc, gtMetaWatcher, allExpecteds, -1, kv -> kv.watch("key.>", gtMetaWatcher, gtMetaWatcher.watchOptions));
944+
_testWatch(nc, multipleFullWatcher, allExpecteds, -1, kv -> kv.watch(allKeys, multipleFullWatcher, multipleFullWatcher.watchOptions));
945+
_testWatch(nc, multipleMetaWatcher, allExpecteds, -1, kv -> kv.watch(allKeys, multipleMetaWatcher, multipleMetaWatcher.watchOptions));
940946
_testWatch(nc, key1AfterWatcher, purgeOnlyExpecteds, -1, kv -> kv.watch(TEST_WATCH_KEY_1, key1AfterWatcher, key1AfterWatcher.watchOptions));
941947
_testWatch(nc, key1AfterIgDelWatcher, noExpecteds, -1, kv -> kv.watch(TEST_WATCH_KEY_1, key1AfterIgDelWatcher, key1AfterIgDelWatcher.watchOptions));
942948
_testWatch(nc, key1AfterStartNewWatcher, noExpecteds, -1, kv -> kv.watch(TEST_WATCH_KEY_1, key1AfterStartNewWatcher, key1AfterStartNewWatcher.watchOptions));

src/test/java/io/nats/client/impl/ObjectStoreTests.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,8 @@ public void testWorkflow() throws Exception {
5454
.build();
5555

5656
ObjectStoreStatus status = osm.create(osc);
57-
validateStatus(status, bucket, desc, metadata);
58-
validateStatus(osm.getStatus(bucket), bucket, desc, metadata);
57+
validateStatus(status, bucket, desc);
58+
validateStatus(osm.getStatus(bucket), bucket, desc);
5959

6060
JetStreamManagement jsm = nc.jetStreamManagement();
6161
assertNotNull(jsm.getStreamInfo("OBJ_" + bucket));
@@ -68,7 +68,7 @@ public void testWorkflow() throws Exception {
6868
ObjectStore os = nc.objectStore(bucket);
6969
nc.objectStore(bucket, ObjectStoreOptions.builder(DEFAULT_JS_OPTIONS).build()); // coverage;
7070

71-
validateStatus(os.getStatus(), bucket, desc, metadata);
71+
validateStatus(os.getStatus(), bucket, desc);
7272

7373
// object not found errors
7474
assertClientError(OsObjectNotFound, () -> os.get("notFound", new ByteArrayOutputStream()));
@@ -159,7 +159,7 @@ public void testWorkflow() throws Exception {
159159
});
160160
}
161161

162-
private static void validateStatus(ObjectStoreStatus status, String bucket, String desc, Map<String, String> metadata) {
162+
private static void validateStatus(ObjectStoreStatus status, String bucket, String desc) {
163163
assertEquals(bucket, status.getBucketName());
164164
assertEquals(desc, status.getDescription());
165165
assertFalse(status.isSealed());
@@ -229,7 +229,7 @@ private static Object[] getInput(int size) throws IOException {
229229
File found = null;
230230
long foundLen = Long.MAX_VALUE;
231231
final String classPath = System.getProperty("java.class.path", ".");
232-
final String[] classPathElements = classPath.split(System.getProperty("path.separator"));
232+
final String[] classPathElements = classPath.split(File.pathSeparator);
233233
for(final String element : classPathElements){
234234
File f = new File(element);
235235
if (f.isFile()) {

src/test/java/io/nats/client/support/ValidatorTests.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,7 @@ public void testValidateBucketName() {
266266
}
267267

268268
@Test
269-
public void testValidateWildcardKeyRequired() {
269+
public void testValidateKvKeyWildcardAllowedRequired() {
270270
validateKvKeyWildcardAllowedRequired(PLAIN);
271271
validateKvKeyWildcardAllowedRequired(PLAIN.toUpperCase());
272272
validateKvKeyWildcardAllowedRequired(HAS_DASH);
@@ -277,14 +277,21 @@ public void testValidateWildcardKeyRequired() {
277277
validateKvKeyWildcardAllowedRequired(STAR_NOT_SEGMENT);
278278
validateKvKeyWildcardAllowedRequired(GT_NOT_SEGMENT);
279279
validateKvKeyWildcardAllowedRequired("numbers9ok");
280-
assertThrows(IllegalArgumentException.class, () -> validateKvKeyWildcardAllowedRequired(null));
280+
String nullKey = null;
281+
assertThrows(IllegalArgumentException.class, () -> validateKvKeyWildcardAllowedRequired(nullKey));
281282
assertThrows(IllegalArgumentException.class, () -> validateKvKeyWildcardAllowedRequired(HAS_SPACE));
282283
assertThrows(IllegalArgumentException.class, () -> validateKvKeyWildcardAllowedRequired(HAS_DOLLAR));
283284
assertThrows(IllegalArgumentException.class, () -> validateKvKeyWildcardAllowedRequired(HAS_LOW));
284285
assertThrows(IllegalArgumentException.class, () -> validateKvKeyWildcardAllowedRequired(HAS_127));
285286
assertThrows(IllegalArgumentException.class, () -> validateKvKeyWildcardAllowedRequired(HAS_TIC));
286287
assertThrows(IllegalArgumentException.class, () -> validateKvKeyWildcardAllowedRequired("colon:isbetween9andA"));
287288
assertThrows(IllegalArgumentException.class, () -> validateKvKeyWildcardAllowedRequired(".starts.with.dot.not.allowed"));
289+
290+
List<String> nullList = null;
291+
//noinspection ConstantValue
292+
assertThrows(IllegalArgumentException.class, () -> validateKvKeyWildcardAllowedRequired(nullList));
293+
assertThrows(IllegalArgumentException.class, () -> validateKvKeyWildcardAllowedRequired(Collections.singletonList(null)));
294+
assertThrows(IllegalArgumentException.class, () -> validateKvKeyWildcardAllowedRequired(Collections.singletonList(HAS_SPACE)));
288295
}
289296

290297
@Test

0 commit comments

Comments
 (0)