Skip to content

Commit 4726883

Browse files
committed
Prefix only and comprehensive unit testing.
1 parent 35c3e80 commit 4726883

15 files changed

+283
-178
lines changed

src/main/java/io/nats/client/BaseConsumerContext.java

Lines changed: 26 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@ public interface BaseConsumerContext {
3131
* Read the next message with max wait set to {@value BaseConsumeOptions#DEFAULT_EXPIRES_IN_MILLIS} ms
3232
* @return the next message or null if the max wait expires
3333
* @throws IOException covers various communication issues with the NATS
34-
* server such as timeout or interruption
35-
* @throws InterruptedException if one is thrown, in order to propagate it up
34+
* server, such as timeout or interruption
35+
* @throws InterruptedException if one is thrown, to propagate it up
3636
* @throws JetStreamStatusCheckedException an exception representing a status that requires attention,
3737
* such as the consumer was deleted on the server in the middle of use.
3838
* @throws JetStreamApiException the request had an error related to the data
@@ -44,8 +44,8 @@ public interface BaseConsumerContext {
4444
* @param maxWait duration of max wait. Cannot be less than {@value BaseConsumeOptions#MIN_EXPIRES_MILLS} milliseconds.
4545
* @return the next message or null if the max wait expires
4646
* @throws IOException covers various communication issues with the NATS
47-
* server such as timeout or interruption
48-
* @throws InterruptedException if one is thrown, in order to propagate it up
47+
* server, such as timeout or interruption
48+
* @throws InterruptedException if one is thrown, to propagate it up
4949
* @throws JetStreamStatusCheckedException an exception representing a status that requires attention,
5050
* such as the consumer was deleted on the server in the middle of use.
5151
* @throws JetStreamApiException the request had an error related to the data
@@ -57,8 +57,8 @@ public interface BaseConsumerContext {
5757
* @param maxWaitMillis the max wait value in milliseconds. Cannot be less than {@value BaseConsumeOptions#MIN_EXPIRES_MILLS} milliseconds.
5858
* @return the next message or null if the max wait expires
5959
* @throws IOException covers various communication issues with the NATS
60-
* server such as timeout or interruption
61-
* @throws InterruptedException if one is thrown, in order to propagate it up
60+
* server, such as timeout or interruption
61+
* @throws InterruptedException if one is thrown, to propagate it up
6262
* @throws JetStreamStatusCheckedException an exception representing a status that requires attention,
6363
* such as the consumer was deleted on the server in the middle of use.
6464
* @throws JetStreamApiException the request had an error related to the data
@@ -67,10 +67,10 @@ public interface BaseConsumerContext {
6767

6868
/**
6969
* Start a one use Fetch Consumer using all defaults other than the number of messages. See {@link FetchConsumer}
70-
* @param maxMessages the maximum number of message to consume
70+
* @param maxMessages the maximum number of messages to consume
7171
* @return the FetchConsumer instance
7272
* @throws IOException covers various communication issues with the NATS
73-
* server such as timeout or interruption
73+
* server, such as timeout or interruption
7474
* @throws JetStreamApiException the request had an error related to the data
7575
*/
7676
FetchConsumer fetchMessages(int maxMessages) throws IOException, JetStreamApiException;
@@ -80,38 +80,38 @@ public interface BaseConsumerContext {
8080
* @param maxBytes the maximum number of bytes to consume
8181
* @return the FetchConsumer instance
8282
* @throws IOException covers various communication issues with the NATS
83-
* server such as timeout or interruption
83+
* server, such as timeout or interruption
8484
* @throws JetStreamApiException the request had an error related to the data
8585
*/
8686
FetchConsumer fetchBytes(int maxBytes) throws IOException, JetStreamApiException;
8787

8888
/**
89-
* Start a one use Fetch Consumer with complete custom consume options. See {@link FetchConsumer}
89+
* Start a one-use Fetch Consumer with complete custom consume options. See {@link FetchConsumer}
9090
* @param fetchConsumeOptions the custom fetch consume options. See {@link FetchConsumeOptions}
9191
* @return the FetchConsumer instance
9292
* @throws IOException covers various communication issues with the NATS
93-
* server such as timeout or interruption
93+
* server, such as timeout or interruption
9494
* @throws JetStreamApiException the request had an error related to the data
9595
*/
9696
FetchConsumer fetch(FetchConsumeOptions fetchConsumeOptions) throws IOException, JetStreamApiException;
9797

9898
/**
9999
* Start a long-running IterableConsumer with default ConsumeOptions. See {@link IterableConsumer} and {@link ConsumeOptions}
100-
* IterableConsumer require the developer call nextMessage.
100+
* IterableConsumer require the developer calls nextMessage.
101101
* @return the IterableConsumer instance
102102
* @throws IOException covers various communication issues with the NATS
103-
* server such as timeout or interruption
103+
* server, such as timeout or interruption
104104
* @throws JetStreamApiException the request had an error related to the data
105105
*/
106106
IterableConsumer iterate() throws IOException, JetStreamApiException;
107107

108108
/**
109109
* Start a long-running IterableConsumer with custom ConsumeOptions. See {@link IterableConsumer} and {@link ConsumeOptions}
110-
* IterableConsumer requires the developer call nextMessage.
110+
* IterableConsumer requires the developer calls nextMessage.
111111
* @param consumeOptions the custom consume options
112112
* @return the IterableConsumer instance
113113
* @throws IOException covers various communication issues with the NATS
114-
* server such as timeout or interruption
114+
* server, such as timeout or interruption
115115
* @throws JetStreamApiException the request had an error related to the data
116116
*/
117117
IterableConsumer iterate(ConsumeOptions consumeOptions) throws IOException, JetStreamApiException;
@@ -121,7 +121,7 @@ public interface BaseConsumerContext {
121121
* @param handler the MessageHandler used for receiving messages.
122122
* @return the MessageConsumer instance
123123
* @throws IOException covers various communication issues with the NATS
124-
* server such as timeout or interruption
124+
* server, such as timeout or interruption
125125
* @throws JetStreamApiException the request had an error related to the data
126126
*/
127127
MessageConsumer consume(MessageHandler handler) throws IOException, JetStreamApiException;
@@ -130,10 +130,10 @@ public interface BaseConsumerContext {
130130
* Start a long-running MessageConsumer with default ConsumeOptions. See {@link MessageConsumer} and {@link ConsumeOptions}
131131
*
132132
* @param dispatcher The dispatcher to handle this subscription
133-
* @param handler the MessageHandler used for receiving messages.
133+
* @param handler the MessageHandler used for receiving messages.
134134
* @return the MessageConsumer instance
135-
* @throws IOException covers various communication issues with the NATS
136-
* server such as timeout or interruption
135+
* @throws IOException covers various communication issues with the NATS
136+
* server, such as timeout or interruption
137137
* @throws JetStreamApiException the request had an error related to the data
138138
*/
139139
MessageConsumer consume(Dispatcher dispatcher, MessageHandler handler) throws IOException, JetStreamApiException;
@@ -142,10 +142,10 @@ public interface BaseConsumerContext {
142142
* Start a long-running MessageConsumer with custom ConsumeOptions. See {@link MessageConsumer} and {@link ConsumeOptions}
143143
*
144144
* @param consumeOptions the custom consume options
145-
* @param handler the MessageHandler used for receiving messages.
145+
* @param handler the MessageHandler used for receiving messages.
146146
* @return the MessageConsumer instance
147-
* @throws IOException covers various communication issues with the NATS
148-
* server such as timeout or interruption
147+
* @throws IOException covers various communication issues with the NATS
148+
* server, such as timeout or interruption
149149
* @throws JetStreamApiException the request had an error related to the data
150150
*/
151151
MessageConsumer consume(ConsumeOptions consumeOptions, MessageHandler handler) throws IOException, JetStreamApiException;
@@ -154,11 +154,11 @@ public interface BaseConsumerContext {
154154
* Start a long-running MessageConsumer with custom ConsumeOptions. See {@link MessageConsumer} and {@link ConsumeOptions}
155155
*
156156
* @param consumeOptions the custom consume options
157-
* @param dispatcher The dispatcher to handle this subscription
158-
* @param handler the MessageHandler used for receiving messages.
157+
* @param dispatcher the dispatcher to handle this subscription
158+
* @param handler the MessageHandler used for receiving messages.
159159
* @return the MessageConsumer instance
160-
* @throws IOException covers various communication issues with the NATS
161-
* server such as timeout or interruption
160+
* @throws IOException covers various communication issues with the NATS
161+
* server, such as timeout or interruption
162162
* @throws JetStreamApiException the request had an error related to the data
163163
*/
164164
MessageConsumer consume(ConsumeOptions consumeOptions, Dispatcher dispatcher, MessageHandler handler) throws IOException, JetStreamApiException;

src/main/java/io/nats/client/MessageConsumer.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@
2424
public interface MessageConsumer extends AutoCloseable {
2525
/**
2626
* Gets the consumer name associated with the subscription.
27-
* Some simplified consumer types do not support this, so it might be null.
27+
* For simplified consumers, this value can be null unless
28+
* the consumer info was manually read via {@link #getConsumerInfo()}.
2829
* @return the consumer name
2930
*/
3031
String getConsumerName();
@@ -33,21 +34,22 @@ public interface MessageConsumer extends AutoCloseable {
3334
* Gets information about the consumer behind this subscription.
3435
* @return consumer information
3536
* @throws IOException covers various communication issues with the NATS
36-
* server such as timeout or interruption
37+
* server, such as timeout or interruption
3738
* @throws JetStreamApiException the request had an error related to the data
3839
*/
3940
ConsumerInfo getConsumerInfo() throws IOException, JetStreamApiException;
4041

4142
/**
4243
* Gets information about the consumer behind this subscription.
43-
* This returns the last read version of Consumer Info, which could technically be out of date.
44+
* This returns the last read version of Consumer Info,
45+
* which could be null or out of date.
4446
* @return consumer information
4547
*/
4648
ConsumerInfo getCachedConsumerInfo();
4749

4850
/**
49-
* Use {@link close()} to unsubscribe. Stop will not unsubcribe or clean up resources.
50-
* The consumer will finish all pull request already in progress, but will not start any new ones.
51+
* Use {@link #close()} to unsubscribe. Stop will not unsubcribe or clean up resources.
52+
* The consumer will finish all pull requests already in progress, but will not start any new ones.
5153
*/
5254
void stop();
5355

src/main/java/io/nats/client/api/Watcher.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,12 @@ public interface Watcher<T> {
3131
void endOfData();
3232

3333
/**
34-
* The watcher can supply a consumer name to be used when creating the internal watch consumer,
35-
* improving the ability to monitor the consumer.
34+
* The watcher can supply a prefix to use on the consumer name
35+
* that is generated when creating the internal watch consumer.
36+
* This can be useful for monitoring the consumer.
3637
* @return the name, or null if not needed, which is the default interface implementation.
3738
*/
38-
default String consumerName() {
39+
default String getConsumerNamePrefix() {
3940
return null;
4041
}
4142
}

src/main/java/io/nats/client/impl/NatsConsumerContext.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,6 @@ static class OrderedPullSubscribeOptionsBuilder extends PullSubscribeOptions.Bui
8686
}
8787
}
8888

89-
int x = 0;
9089
@Override
9190
public NatsJetStreamPullSubscription subscribe(MessageHandler messageHandler, Dispatcher userDispatcher, PullMessageManager optionalPmm, Long optionalInactiveThreshold) throws IOException, JetStreamApiException {
9291
PullSubscribeOptions pso;
@@ -199,12 +198,13 @@ public Message next(long maxWaitMillis) throws IOException, InterruptedException
199198

200199
try {
201200
long inactiveThreshold = maxWaitMillis * 110 / 100; // 10% longer than the wait
202-
nmcb = new NatsMessageConsumerBase(consumerName.get(), cachedConsumerInfo.get());
201+
nmcb = new NatsMessageConsumerBase(cachedConsumerInfo.get());
203202
nmcb.initSub(subscribe(null, null, null, inactiveThreshold));
203+
nmcb.setConsumerName(consumerName.get()); // the call to subscribe sets this
204+
trackConsume(nmcb); // this has to be done after the nmcb is fully set up
204205
nmcb.sub._pull(PullRequestOptions.builder(1)
205206
.expiresIn(maxWaitMillis - EXPIRE_ADJUSTMENT)
206207
.build(), false, null);
207-
trackConsume(nmcb);
208208
}
209209
catch (Exception e) {
210210
if (nmcb != null) {
@@ -220,7 +220,7 @@ public Message next(long maxWaitMillis) throws IOException, InterruptedException
220220
stateLock.unlock();
221221
}
222222

223-
// intentionally outside of lock
223+
// intentionally outside the lock
224224
try {
225225
return nmcb.sub.nextMessage(maxWaitMillis);
226226
}
@@ -260,7 +260,7 @@ public FetchConsumer fetch(FetchConsumeOptions fetchConsumeOptions) throws IOExc
260260
stateLock.lock();
261261
checkState();
262262
Validator.required(fetchConsumeOptions, "Fetch Consume Options");
263-
return (FetchConsumer)trackConsume(new NatsFetchConsumer(this, consumerName.get(), cachedConsumerInfo.get(), fetchConsumeOptions));
263+
return (FetchConsumer)trackConsume(new NatsFetchConsumer(this, cachedConsumerInfo.get(), fetchConsumeOptions));
264264
}
265265
finally {
266266
stateLock.unlock();
@@ -284,7 +284,7 @@ public IterableConsumer iterate(ConsumeOptions consumeOptions) throws IOExceptio
284284
stateLock.lock();
285285
checkState();
286286
Validator.required(consumeOptions, "Consume Options");
287-
return (IterableConsumer) trackConsume(new NatsIterableConsumer(this, consumerName.get(), cachedConsumerInfo.get(), consumeOptions));
287+
return (IterableConsumer) trackConsume(new NatsIterableConsumer(this, cachedConsumerInfo.get(), consumeOptions));
288288
}
289289
finally {
290290
stateLock.unlock();
@@ -325,7 +325,7 @@ public MessageConsumer consume(ConsumeOptions consumeOptions, Dispatcher userDis
325325
checkState();
326326
Validator.required(handler, "Message Handler");
327327
Validator.required(consumeOptions, "Consume Options");
328-
return trackConsume(new NatsMessageConsumer(this, consumerName.get(), cachedConsumerInfo.get(), consumeOptions, userDispatcher, handler));
328+
return trackConsume(new NatsMessageConsumer(this, cachedConsumerInfo.get(), consumeOptions, userDispatcher, handler));
329329
}
330330
finally {
331331
stateLock.unlock();

src/main/java/io/nats/client/impl/NatsFetchConsumer.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,10 @@ class NatsFetchConsumer extends NatsMessageConsumerBase implements FetchConsumer
2929
private long startNanos;
3030

3131
NatsFetchConsumer(SimplifiedSubscriptionMaker subscriptionMaker,
32-
String consumerName,
3332
ConsumerInfo cachedConsumerInfo,
3433
FetchConsumeOptions fetchConsumeOptions) throws IOException, JetStreamApiException
3534
{
36-
super(consumerName, cachedConsumerInfo);
35+
super(cachedConsumerInfo);
3736

3837
boolean isNoWait = fetchConsumeOptions.isNoWait();
3938
long expiresInMillis = fetchConsumeOptions.getExpiresInMillis();

src/main/java/io/nats/client/impl/NatsIterableConsumer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@
2121

2222
class NatsIterableConsumer extends NatsMessageConsumer implements IterableConsumer {
2323

24-
NatsIterableConsumer(SimplifiedSubscriptionMaker subscriptionMaker, String consumerName, ConsumerInfo cachedConsumerInfo, ConsumeOptions opts) throws IOException, JetStreamApiException {
25-
super(subscriptionMaker, consumerName, cachedConsumerInfo, opts, null, null);
24+
NatsIterableConsumer(SimplifiedSubscriptionMaker subscriptionMaker, ConsumerInfo cachedConsumerInfo, ConsumeOptions opts) throws IOException, JetStreamApiException {
25+
super(subscriptionMaker, cachedConsumerInfo, opts, null, null);
2626
}
2727

2828
/**

src/main/java/io/nats/client/impl/NatsKeyValueWatchSubscription.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,6 @@ public void onMessage(Message m) throws InterruptedException {
7070
readSubjects.add(kv.readSubject(keyPattern.trim()));
7171
}
7272

73-
finishInit(kv, readSubjects, deliverPolicy, headersOnly, fromRevision, handler, watcher.consumerName());
73+
finishInit(kv, readSubjects, deliverPolicy, headersOnly, fromRevision, handler, watcher.getConsumerNamePrefix());
7474
}
7575
}

src/main/java/io/nats/client/impl/NatsMessageConsumer.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,12 @@ class NatsMessageConsumer extends NatsMessageConsumerBase implements PullManager
2727
protected final MessageHandler userMessageHandler;
2828

2929
NatsMessageConsumer(SimplifiedSubscriptionMaker subscriptionMaker,
30-
String consumerName,
3130
ConsumerInfo cachedConsumerInfo,
3231
ConsumeOptions consumeOpts,
3332
Dispatcher userDispatcher,
3433
final MessageHandler userMessageHandler) throws IOException, JetStreamApiException
3534
{
36-
super(consumerName, cachedConsumerInfo);
35+
super(cachedConsumerInfo);
3736

3837
this.subscriptionMaker = subscriptionMaker;
3938
this.consumeOpts = consumeOpts;

src/main/java/io/nats/client/impl/NatsMessageConsumerBase.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,19 @@ class NatsMessageConsumerBase implements MessageConsumer {
2828
protected ConsumerInfo cachedConsumerInfo;
2929
protected String consumerName;
3030

31-
NatsMessageConsumerBase(String consumerName, ConsumerInfo cachedConsumerInfo) {
32-
this.consumerName = consumerName;
31+
NatsMessageConsumerBase(ConsumerInfo cachedConsumerInfo) {
3332
this.cachedConsumerInfo = cachedConsumerInfo;
33+
if (cachedConsumerInfo != null) {
34+
this.consumerName = cachedConsumerInfo.getName();
35+
}
3436
this.stopped = new AtomicBoolean(false);
3537
this.finished = new AtomicBoolean(false);
3638
}
3739

40+
void setConsumerName(String consumerName) {
41+
this.consumerName = consumerName;
42+
}
43+
3844
void initSub(NatsJetStreamPullSubscription sub) {
3945
this.sub = sub;
4046
pmm = (PullMessageManager)sub.manager;
@@ -59,6 +65,9 @@ public boolean isFinished() {
5965
*/
6066
@Override
6167
public String getConsumerName() {
68+
if (consumerName == null) {
69+
consumerName = cachedConsumerInfo.getName();
70+
}
6271
return consumerName;
6372
}
6473

@@ -67,9 +76,9 @@ public String getConsumerName() {
6776
*/
6877
@Override
6978
public ConsumerInfo getConsumerInfo() throws IOException, JetStreamApiException {
70-
// don't look up consumer info if it was never set - this check is for ordered consumer
71-
if (cachedConsumerInfo != null) {
79+
if (cachedConsumerInfo == null) {
7280
cachedConsumerInfo = sub.getConsumerInfo();
81+
consumerName = cachedConsumerInfo.getName();
7382
}
7483
return cachedConsumerInfo;
7584
}

src/main/java/io/nats/client/impl/NatsObjectStoreWatchSubscription.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,6 @@ public void onMessage(Message m) throws InterruptedException {
5959
}
6060
};
6161

62-
finishInit(os, Collections.singletonList(os.rawAllMetaSubject()), deliverPolicy, headersOnly, ULONG_UNSET, handler, watcher.consumerName());
62+
finishInit(os, Collections.singletonList(os.rawAllMetaSubject()), deliverPolicy, headersOnly, ULONG_UNSET, handler, watcher.getConsumerNamePrefix());
6363
}
6464
}

0 commit comments

Comments
 (0)