Skip to content

Commit 35c3e80

Browse files
committed
Ability to supply a name for an ordered consumer.
* Original Ordered Push Consumers can supply a name. * Key Value and Object Store watchers can supply a name * Simplified ordered consumers can supply a prefix in OrderedConsumerConfiguration
1 parent 090e75f commit 35c3e80

15 files changed

+117
-59
lines changed

src/main/java/io/nats/client/KeyValueManagement.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ public interface KeyValueManagement {
2828
* @param config the key value configuration
2929
* @return bucket info
3030
* @throws IOException covers various communication issues with the NATS
31-
* server such as timeout or interruption
31+
* server, such as timeout or interruption
3232
* @throws JetStreamApiException the request had an error related to the data
3333
* @throws IllegalArgumentException the server is not JetStream enabled
3434
*/
@@ -39,7 +39,7 @@ public interface KeyValueManagement {
3939
* @param config the key value configuration
4040
* @return bucket info
4141
* @throws IOException covers various communication issues with the NATS
42-
* server such as timeout or interruption
42+
* server, such as timeout or interruption
4343
* @throws JetStreamApiException the request had an error related to the data
4444
* @throws IllegalArgumentException the server is not JetStream enabled
4545
*/
@@ -49,7 +49,7 @@ public interface KeyValueManagement {
4949
* Get the list of bucket names.
5050
* @return list of bucket names
5151
* @throws IOException covers various communication issues with the NATS
52-
* server such as timeout or interruption
52+
* server, such as timeout or interruption
5353
* @throws JetStreamApiException the request had an error related to the data
5454
*/
5555
List<String> getBucketNames() throws IOException, JetStreamApiException;
@@ -59,7 +59,7 @@ public interface KeyValueManagement {
5959
* @deprecated Use {@link #getStatus(String)} instead.
6060
* @param bucketName the bucket name to use
6161
* @throws IOException covers various communication issues with the NATS
62-
* server such as timeout or interruption
62+
* server, such as timeout or interruption
6363
* @throws JetStreamApiException the request had an error related to the data
6464
* @return the bucket status object
6565
*/
@@ -70,7 +70,7 @@ public interface KeyValueManagement {
7070
* Gets the status for an existing bucket.
7171
* @param bucketName the bucket name to use
7272
* @throws IOException covers various communication issues with the NATS
73-
* server such as timeout or interruption
73+
* server, such as timeout or interruption
7474
* @throws JetStreamApiException the request had an error related to the data
7575
* @return the bucket status object
7676
*/
@@ -80,7 +80,7 @@ public interface KeyValueManagement {
8080
* Get the statuses for all buckets
8181
* @return list of statuses
8282
* @throws IOException covers various communication issues with the NATS
83-
* server such as timeout or interruption
83+
* server, such as timeout or interruption
8484
* @throws JetStreamApiException the request had an error related to the data
8585
*/
8686
List<KeyValueStatus> getStatuses() throws IOException, JetStreamApiException;
@@ -89,7 +89,7 @@ public interface KeyValueManagement {
8989
* Deletes an existing bucket. Will throw a JetStreamApiException if the delete fails.
9090
* @param bucketName the stream name to use.
9191
* @throws IOException covers various communication issues with the NATS
92-
* server such as timeout or interruption
92+
* server, such as timeout or interruption
9393
* @throws JetStreamApiException the request had an error related to the data
9494
*/
9595
void delete(String bucketName) throws IOException, JetStreamApiException;

src/main/java/io/nats/client/api/OrderedConsumerConfiguration.java

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,12 @@ public class OrderedConsumerConfiguration implements JsonSerializable {
3636
private ZonedDateTime startTime;
3737
private ReplayPolicy replayPolicy;
3838
private Boolean headersOnly;
39+
private String consumerNamePrefix;
3940

4041
/**
4142
* OrderedConsumerConfiguration creation works like a builder.
4243
* The builder supports chaining and will create a default set of options if
43-
* no methods are calls, including setting the filter subject to "&gt;"
44+
* no methods are calls, including setting the filter subject to &gt;
4445
*/
4546
public OrderedConsumerConfiguration() {
4647
startSequence = ConsumerConfiguration.LONG_UNSET;
@@ -67,7 +68,7 @@ public OrderedConsumerConfiguration(JsonValue v) throws JsonParseException {
6768

6869
/**
6970
* Returns a JSON representation of this ordered consumer configuration.
70-
* @return json ordered consumer configuration json string
71+
* @return JSON ordered consumer configuration JSON string
7172
*/
7273
public String toJson() {
7374
StringBuilder sb = beginJson();
@@ -89,7 +90,7 @@ public String toJson() {
8990
/**
9091
* Sets the filter subject of the OrderedConsumerConfiguration.
9192
* @param filterSubject the filter subject
92-
* @return Builder
93+
* @return The Builder
9394
*/
9495
public OrderedConsumerConfiguration filterSubject(String filterSubject) {
9596
return filterSubjects(Collections.singletonList(filterSubject));
@@ -98,7 +99,7 @@ public OrderedConsumerConfiguration filterSubject(String filterSubject) {
9899
/**
99100
* Sets the filter subjects of the OrderedConsumerConfiguration.
100101
* @param filterSubject the filter subject
101-
* @return Builder
102+
* @return The Builder
102103
*/
103104
public OrderedConsumerConfiguration filterSubjects(String... filterSubject) {
104105
return filterSubjects(Arrays.asList(filterSubject));
@@ -107,14 +108,14 @@ public OrderedConsumerConfiguration filterSubjects(String... filterSubject) {
107108
/**
108109
* Sets the filter subject of the OrderedConsumerConfiguration.
109110
* @param filterSubjects one or more filter subjects
110-
* @return Builder
111+
* @return The Builder
111112
*/
112113
public OrderedConsumerConfiguration filterSubjects(List<String> filterSubjects) {
113114
this.filterSubjects.clear();
114115
for (String fs : filterSubjects) {
115-
String fsean = emptyAsNull(fs);
116-
if (fsean != null) {
117-
this.filterSubjects.add(fsean);
116+
String fsEan = emptyAsNull(fs);
117+
if (fsEan != null) {
118+
this.filterSubjects.add(fsEan);
118119
}
119120
}
120121
if (this.filterSubjects.isEmpty()) {
@@ -126,7 +127,7 @@ public OrderedConsumerConfiguration filterSubjects(List<String> filterSubjects)
126127
/**
127128
* Sets the delivery policy of the OrderedConsumerConfiguration.
128129
* @param deliverPolicy the delivery policy.
129-
* @return Builder
130+
* @return The Builder
130131
*/
131132
public OrderedConsumerConfiguration deliverPolicy(DeliverPolicy deliverPolicy) {
132133
this.deliverPolicy = deliverPolicy;
@@ -136,7 +137,7 @@ public OrderedConsumerConfiguration deliverPolicy(DeliverPolicy deliverPolicy) {
136137
/**
137138
* Sets the start sequence of the OrderedConsumerConfiguration.
138139
* @param startSequence the start sequence
139-
* @return Builder
140+
* @return The Builder
140141
*/
141142
public OrderedConsumerConfiguration startSequence(long startSequence) {
142143
this.startSequence = startSequence < 1 ? ConsumerConfiguration.LONG_UNSET : startSequence;
@@ -146,7 +147,7 @@ public OrderedConsumerConfiguration startSequence(long startSequence) {
146147
/**
147148
* Sets the start time of the OrderedConsumerConfiguration.
148149
* @param startTime the start time
149-
* @return Builder
150+
* @return The Builder
150151
*/
151152
public OrderedConsumerConfiguration startTime(ZonedDateTime startTime) {
152153
this.startTime = startTime;
@@ -156,7 +157,7 @@ public OrderedConsumerConfiguration startTime(ZonedDateTime startTime) {
156157
/**
157158
* Sets the replay policy of the OrderedConsumerConfiguration.
158159
* @param replayPolicy the replay policy.
159-
* @return Builder
160+
* @return The Builder
160161
*/
161162
public OrderedConsumerConfiguration replayPolicy(ReplayPolicy replayPolicy) {
162163
this.replayPolicy = replayPolicy;
@@ -167,13 +168,23 @@ public OrderedConsumerConfiguration replayPolicy(ReplayPolicy replayPolicy) {
167168
* set the headers only flag saying to deliver only the headers of
168169
* messages in the stream and not the bodies
169170
* @param headersOnly the flag
170-
* @return Builder
171+
* @return The Builder
171172
*/
172173
public OrderedConsumerConfiguration headersOnly(Boolean headersOnly) {
173174
this.headersOnly = headersOnly != null && headersOnly ? true : null;
174175
return this;
175176
}
176177

178+
/**
179+
* Sets the consumer name prefix for consumers created by this configuration.
180+
* @param consumerNamePrefix the prefix or null to clear.
181+
* @return The Builder
182+
*/
183+
public OrderedConsumerConfiguration consumerNamePrefix(String consumerNamePrefix) {
184+
this.consumerNamePrefix = emptyAsNull(consumerNamePrefix);
185+
return this;
186+
}
187+
177188
public String getFilterSubject() {
178189
return filterSubjects == null || filterSubjects.size() != 1 ? null : filterSubjects.get(0);
179190
}
@@ -205,4 +216,8 @@ public ReplayPolicy getReplayPolicy() {
205216
public Boolean getHeadersOnly() {
206217
return headersOnly;
207218
}
219+
220+
public String getConsumerNamePrefix() {
221+
return consumerNamePrefix;
222+
}
208223
}

src/main/java/io/nats/client/api/Watcher.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,7 @@
1919
public interface Watcher<T> {
2020

2121
/**
22-
* Called when an object has been updated
23-
*
22+
* Called when an object has been updated.
2423
* @param t The watched object
2524
*/
2625
void watch(T t);
@@ -30,4 +29,13 @@ public interface Watcher<T> {
3029
* or if there is data, the first time the watch exhausts all existing data.
3130
*/
3231
void endOfData();
32+
33+
/**
34+
* The watcher can supply a consumer name to be used when creating the internal watch consumer,
35+
* improving the ability to monitor the consumer.
36+
* @return the name, or null if not needed, which is the default interface implementation.
37+
*/
38+
default String consumerName() {
39+
return null;
40+
}
3341
}

src/main/java/io/nats/client/impl/NatsConsumerContext.java

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,8 @@ public class NatsConsumerContext implements ConsumerContext, SimplifiedSubscript
3737
private final ReentrantLock stateLock;
3838
private final NatsStreamContext streamCtx;
3939
private final boolean ordered;
40-
private final ConsumerConfiguration originalOrderedCc;
41-
private final String subscribeSubject;
40+
private final ConsumerConfiguration orderedConsumerConfigTemplate;
41+
private final String orderedConsumerNamePrefix;
4242
private final PullSubscribeOptions unorderedBindPso;
4343

4444
private final AtomicReference<ConsumerInfo> cachedConsumerInfo;
@@ -47,7 +47,7 @@ public class NatsConsumerContext implements ConsumerContext, SimplifiedSubscript
4747
private final AtomicReference<Dispatcher> defaultDispatcher;
4848
private final AtomicReference<NatsMessageConsumerBase> lastConsumer;
4949

50-
NatsConsumerContext(NatsStreamContext sc, ConsumerInfo unorderedConsumerInfo, OrderedConsumerConfiguration orderedCc) {
50+
NatsConsumerContext(NatsStreamContext sc, ConsumerInfo unorderedConsumerInfo, OrderedConsumerConfiguration occ) {
5151
stateLock = new ReentrantLock();
5252
streamCtx = sc;
5353
cachedConsumerInfo = new AtomicReference<>();
@@ -57,23 +57,23 @@ public class NatsConsumerContext implements ConsumerContext, SimplifiedSubscript
5757
lastConsumer = new AtomicReference<>();
5858
if (unorderedConsumerInfo != null) {
5959
ordered = false;
60-
originalOrderedCc = null;
61-
subscribeSubject = null;
60+
orderedConsumerNamePrefix = null;
61+
orderedConsumerConfigTemplate = null;
6262
cachedConsumerInfo.set(unorderedConsumerInfo);
6363
consumerName.set(unorderedConsumerInfo.getName());
6464
unorderedBindPso = PullSubscribeOptions.fastBind(sc.streamName, unorderedConsumerInfo.getName());
6565
}
6666
else {
6767
ordered = true;
68-
originalOrderedCc = ConsumerConfiguration.builder()
69-
.filterSubjects(orderedCc.getFilterSubjects())
70-
.deliverPolicy(orderedCc.getDeliverPolicy())
71-
.startSequence(orderedCc.getStartSequence())
72-
.startTime(orderedCc.getStartTime())
73-
.replayPolicy(orderedCc.getReplayPolicy())
74-
.headersOnly(orderedCc.getHeadersOnly())
68+
orderedConsumerNamePrefix = occ.getConsumerNamePrefix();
69+
orderedConsumerConfigTemplate = ConsumerConfiguration.builder()
70+
.filterSubjects(occ.getFilterSubjects())
71+
.deliverPolicy(occ.getDeliverPolicy())
72+
.startSequence(occ.getStartSequence())
73+
.startTime(occ.getStartTime())
74+
.replayPolicy(occ.getReplayPolicy())
75+
.headersOnly(occ.getHeadersOnly())
7576
.build();
76-
subscribeSubject = Validator.validateSubject(originalOrderedCc.getFilterSubject(), false);
7777
unorderedBindPso = null;
7878
}
7979
}
@@ -86,6 +86,7 @@ static class OrderedPullSubscribeOptionsBuilder extends PullSubscribeOptions.Bui
8686
}
8787
}
8888

89+
int x = 0;
8990
@Override
9091
public NatsJetStreamPullSubscription subscribe(MessageHandler messageHandler, Dispatcher userDispatcher, PullMessageManager optionalPmm, Long optionalInactiveThreshold) throws IOException, JetStreamApiException {
9192
PullSubscribeOptions pso;
@@ -94,8 +95,9 @@ public NatsJetStreamPullSubscription subscribe(MessageHandler messageHandler, Di
9495
if (lastCon != null) {
9596
highestSeq.set(Math.max(highestSeq.get(), lastCon.pmm.lastStreamSeq));
9697
}
98+
consumerName.set(orderedConsumerNamePrefix == null ? null : orderedConsumerNamePrefix + NUID.nextGlobalSequence());
9799
ConsumerConfiguration cc = streamCtx.js.consumerConfigurationForOrdered(
98-
originalOrderedCc, highestSeq.get(), null, null, optionalInactiveThreshold);
100+
orderedConsumerConfigTemplate, highestSeq.get(), null, consumerName.get(), optionalInactiveThreshold);
99101
pso = new OrderedPullSubscribeOptionsBuilder(streamCtx.streamName, cc).build();
100102
}
101103
else {
@@ -104,7 +106,7 @@ public NatsJetStreamPullSubscription subscribe(MessageHandler messageHandler, Di
104106

105107
if (messageHandler == null) {
106108
return (NatsJetStreamPullSubscription) streamCtx.js.createSubscription(
107-
subscribeSubject, null, pso, null, null, null, false, optionalPmm);
109+
null, null, pso, null, null, null, false, optionalPmm);
108110
}
109111

110112
Dispatcher d = userDispatcher;
@@ -116,7 +118,7 @@ public NatsJetStreamPullSubscription subscribe(MessageHandler messageHandler, Di
116118
}
117119
}
118120
return (NatsJetStreamPullSubscription) streamCtx.js.createSubscription(
119-
subscribeSubject, null, pso, null, (NatsDispatcher) d, messageHandler, false, optionalPmm);
121+
null, null, pso, null, (NatsDispatcher) d, messageHandler, false, optionalPmm);
120122
}
121123

122124
private void checkState() throws IOException {
@@ -197,7 +199,7 @@ public Message next(long maxWaitMillis) throws IOException, InterruptedException
197199

198200
try {
199201
long inactiveThreshold = maxWaitMillis * 110 / 100; // 10% longer than the wait
200-
nmcb = new NatsMessageConsumerBase(cachedConsumerInfo.get());
202+
nmcb = new NatsMessageConsumerBase(consumerName.get(), cachedConsumerInfo.get());
201203
nmcb.initSub(subscribe(null, null, null, inactiveThreshold));
202204
nmcb.sub._pull(PullRequestOptions.builder(1)
203205
.expiresIn(maxWaitMillis - EXPIRE_ADJUSTMENT)
@@ -258,7 +260,7 @@ public FetchConsumer fetch(FetchConsumeOptions fetchConsumeOptions) throws IOExc
258260
stateLock.lock();
259261
checkState();
260262
Validator.required(fetchConsumeOptions, "Fetch Consume Options");
261-
return (FetchConsumer)trackConsume(new NatsFetchConsumer(this, cachedConsumerInfo.get(), fetchConsumeOptions));
263+
return (FetchConsumer)trackConsume(new NatsFetchConsumer(this, consumerName.get(), cachedConsumerInfo.get(), fetchConsumeOptions));
262264
}
263265
finally {
264266
stateLock.unlock();
@@ -282,7 +284,7 @@ public IterableConsumer iterate(ConsumeOptions consumeOptions) throws IOExceptio
282284
stateLock.lock();
283285
checkState();
284286
Validator.required(consumeOptions, "Consume Options");
285-
return (IterableConsumer) trackConsume(new NatsIterableConsumer(this, cachedConsumerInfo.get(), consumeOptions));
287+
return (IterableConsumer) trackConsume(new NatsIterableConsumer(this, consumerName.get(), cachedConsumerInfo.get(), consumeOptions));
286288
}
287289
finally {
288290
stateLock.unlock();
@@ -323,7 +325,7 @@ public MessageConsumer consume(ConsumeOptions consumeOptions, Dispatcher userDis
323325
checkState();
324326
Validator.required(handler, "Message Handler");
325327
Validator.required(consumeOptions, "Consume Options");
326-
return trackConsume(new NatsMessageConsumer(this, cachedConsumerInfo.get(), consumeOptions, userDispatcher, handler));
328+
return trackConsume(new NatsMessageConsumer(this, consumerName.get(), cachedConsumerInfo.get(), consumeOptions, userDispatcher, handler));
327329
}
328330
finally {
329331
stateLock.unlock();

src/main/java/io/nats/client/impl/NatsFetchConsumer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,11 @@ class NatsFetchConsumer extends NatsMessageConsumerBase implements FetchConsumer
2929
private long startNanos;
3030

3131
NatsFetchConsumer(SimplifiedSubscriptionMaker subscriptionMaker,
32+
String consumerName,
3233
ConsumerInfo cachedConsumerInfo,
3334
FetchConsumeOptions fetchConsumeOptions) throws IOException, JetStreamApiException
3435
{
35-
super(cachedConsumerInfo);
36+
super(consumerName, cachedConsumerInfo);
3637

3738
boolean isNoWait = fetchConsumeOptions.isNoWait();
3839
long expiresInMillis = fetchConsumeOptions.getExpiresInMillis();

src/main/java/io/nats/client/impl/NatsIterableConsumer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@
2121

2222
class NatsIterableConsumer extends NatsMessageConsumer implements IterableConsumer {
2323

24-
NatsIterableConsumer(SimplifiedSubscriptionMaker subscriptionMaker, ConsumerInfo cachedConsumerInfo, ConsumeOptions opts) throws IOException, JetStreamApiException {
25-
super(subscriptionMaker, cachedConsumerInfo, opts, null, null);
24+
NatsIterableConsumer(SimplifiedSubscriptionMaker subscriptionMaker, String consumerName, ConsumerInfo cachedConsumerInfo, ConsumeOptions opts) throws IOException, JetStreamApiException {
25+
super(subscriptionMaker, consumerName, cachedConsumerInfo, opts, null, null);
2626
}
2727

2828
/**

src/main/java/io/nats/client/impl/NatsKeyValueWatchSubscription.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,6 @@ public void onMessage(Message m) throws InterruptedException {
7070
readSubjects.add(kv.readSubject(keyPattern.trim()));
7171
}
7272

73-
finishInit(kv, readSubjects, deliverPolicy, headersOnly, fromRevision, handler);
73+
finishInit(kv, readSubjects, deliverPolicy, headersOnly, fromRevision, handler, watcher.consumerName());
7474
}
7575
}

src/main/java/io/nats/client/impl/NatsMessageConsumer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,13 @@ class NatsMessageConsumer extends NatsMessageConsumerBase implements PullManager
2727
protected final MessageHandler userMessageHandler;
2828

2929
NatsMessageConsumer(SimplifiedSubscriptionMaker subscriptionMaker,
30+
String consumerName,
3031
ConsumerInfo cachedConsumerInfo,
3132
ConsumeOptions consumeOpts,
3233
Dispatcher userDispatcher,
3334
final MessageHandler userMessageHandler) throws IOException, JetStreamApiException
3435
{
35-
super(cachedConsumerInfo);
36+
super(consumerName, cachedConsumerInfo);
3637

3738
this.subscriptionMaker = subscriptionMaker;
3839
this.consumeOpts = consumeOpts;

0 commit comments

Comments
 (0)