Skip to content

Commit 1fbee9b

Browse files
authored
Extract non 211 Part 2 (#1242)
1 parent dd8a3a1 commit 1fbee9b

9 files changed

+62
-64
lines changed

src/main/java/io/nats/client/FeatureOptions.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,7 @@ public abstract class FeatureOptions {
2222

2323
private final JetStreamOptions jso;
2424

25-
@SuppressWarnings("rawtypes") // Don't need the type of the builder to get its vars
26-
protected FeatureOptions(Builder b) {
25+
protected FeatureOptions(Builder<?, ?> b) {
2726
jso = b.jsoBuilder.build();
2827
}
2928

src/main/java/io/nats/client/SubscribeOptions.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,7 @@ public abstract class SubscribeOptions {
3838
protected final long pendingByteLimit; // Only applicable for non dispatched (sync) push consumers.
3939
protected final String name;
4040

41-
@SuppressWarnings("rawtypes") // Don't need the type of the builder to get its vars
42-
protected SubscribeOptions(Builder builder, boolean isPull,
41+
protected SubscribeOptions(Builder<?, ?> builder, boolean isPull,
4342
String deliverSubject, String deliverGroup,
4443
long pendingMessageLimit, long pendingByteLimit) {
4544

src/test/java/io/nats/client/impl/JetStreamGeneralTests.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -462,7 +462,7 @@ public void testBindPush() throws Exception {
462462

463463
jsPublish(js, tsc.subject(), 1, 1);
464464
PushSubscribeOptions pso = PushSubscribeOptions.builder()
465-
.durable(tsc.name())
465+
.durable(tsc.consumerName())
466466
.build();
467467
JetStreamSubscription s = js.subscribe(tsc.subject(), pso);
468468
Message m = s.nextMessage(DEFAULT_TIMEOUT);
@@ -474,7 +474,7 @@ public void testBindPush() throws Exception {
474474
jsPublish(js, tsc.subject(), 2, 1);
475475
pso = PushSubscribeOptions.builder()
476476
.stream(tsc.stream)
477-
.durable(tsc.name())
477+
.durable(tsc.consumerName())
478478
.bind(true)
479479
.build();
480480
s = js.subscribe(tsc.subject(), pso);
@@ -485,7 +485,7 @@ public void testBindPush() throws Exception {
485485
unsubscribeEnsureNotBound(s);
486486

487487
jsPublish(js, tsc.subject(), 3, 1);
488-
pso = PushSubscribeOptions.bind(tsc.stream, tsc.name());
488+
pso = PushSubscribeOptions.bind(tsc.stream, tsc.consumerName());
489489
s = js.subscribe(tsc.subject(), pso);
490490
m = s.nextMessage(DEFAULT_TIMEOUT);
491491
assertNotNull(m);
@@ -495,7 +495,7 @@ public void testBindPush() throws Exception {
495495
() -> PushSubscribeOptions.builder().stream(tsc.stream).bind(true).build());
496496

497497
assertThrows(IllegalArgumentException.class,
498-
() -> PushSubscribeOptions.builder().durable(tsc.name()).bind(true).build());
498+
() -> PushSubscribeOptions.builder().durable(tsc.consumerName()).bind(true).build());
499499

500500
assertThrows(IllegalArgumentException.class,
501501
() -> PushSubscribeOptions.builder().stream(EMPTY).bind(true).build());
@@ -514,7 +514,7 @@ public void testBindPull() throws Exception {
514514
jsPublish(js, tsc.subject(), 1, 1);
515515

516516
PullSubscribeOptions pso = PullSubscribeOptions.builder()
517-
.durable(tsc.name())
517+
.durable(tsc.consumerName())
518518
.build();
519519
JetStreamSubscription s = js.subscribe(tsc.subject(), pso);
520520
s.pull(1);
@@ -527,7 +527,7 @@ public void testBindPull() throws Exception {
527527
jsPublish(js, tsc.subject(), 2, 1);
528528
pso = PullSubscribeOptions.builder()
529529
.stream(tsc.stream)
530-
.durable(tsc.name())
530+
.durable(tsc.consumerName())
531531
.bind(true)
532532
.build();
533533
s = js.subscribe(tsc.subject(), pso);
@@ -539,7 +539,7 @@ public void testBindPull() throws Exception {
539539
unsubscribeEnsureNotBound(s);
540540

541541
jsPublish(js, tsc.subject(), 3, 1);
542-
pso = PullSubscribeOptions.bind(tsc.stream, tsc.name());
542+
pso = PullSubscribeOptions.bind(tsc.stream, tsc.consumerName());
543543
s = js.subscribe(tsc.subject(), pso);
544544
s.pull(1);
545545
m = s.nextMessage(DEFAULT_TIMEOUT);
@@ -960,9 +960,9 @@ public void testInternalLookupConsumerInfoCoverage() throws Exception {
960960
// - consumer not found
961961
// - stream does not exist
962962
JetStreamSubscription sub = js.subscribe(tsc.subject());
963-
assertNull(((NatsJetStream)js).lookupConsumerInfo(tsc.stream, tsc.name()));
963+
assertNull(((NatsJetStream)js).lookupConsumerInfo(tsc.stream, tsc.consumerName()));
964964
assertThrows(JetStreamApiException.class,
965-
() -> ((NatsJetStream)js).lookupConsumerInfo(stream(999), tsc.name()));
965+
() -> ((NatsJetStream)js).lookupConsumerInfo(stream(999), tsc.consumerName()));
966966
});
967967
}
968968

src/test/java/io/nats/client/impl/JetStreamManagementTests.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -827,7 +827,7 @@ public void testAddPausedConsumer() throws Exception {
827827

828828
ZonedDateTime pauseUntil = ZonedDateTime.now(ZONE_ID_GMT).plusMinutes(2);
829829
ConsumerConfiguration cc = ConsumerConfiguration.builder()
830-
.durable(tsc.name())
830+
.durable(tsc.consumerName())
831831
.pauseUntil(pauseUntil)
832832
.build();
833833

@@ -849,7 +849,7 @@ public void testPauseResumeConsumer() throws Exception {
849849
assertEquals(0, list.size());
850850

851851
ConsumerConfiguration cc = ConsumerConfiguration.builder()
852-
.durable(tsc.name())
852+
.durable(tsc.consumerName())
853853
.build();
854854

855855
// durable and name can both be null
@@ -886,9 +886,9 @@ public void testPauseResumeConsumer() throws Exception {
886886
ci = jsm.getConsumerInfo(tsc.stream, ci.getName());
887887
assertFalse(ci.getPaused());
888888

889-
assertThrows(JetStreamApiException.class, () -> jsm.pauseConsumer(stream(), tsc.name(), pauseUntil));
889+
assertThrows(JetStreamApiException.class, () -> jsm.pauseConsumer(stream(), tsc.consumerName(), pauseUntil));
890890
assertThrows(JetStreamApiException.class, () -> jsm.pauseConsumer(tsc.stream, name(), pauseUntil));
891-
assertThrows(JetStreamApiException.class, () -> jsm.resumeConsumer(stream(), tsc.name()));
891+
assertThrows(JetStreamApiException.class, () -> jsm.resumeConsumer(stream(), tsc.consumerName()));
892892
assertThrows(JetStreamApiException.class, () -> jsm.resumeConsumer(tsc.stream, name()));
893893
});
894894
}
@@ -1009,7 +1009,7 @@ public void testConsumerMetadata() throws Exception {
10091009
TestingStreamContainer tsc = new TestingStreamContainer(jsm);
10101010

10111011
ConsumerConfiguration cc = ConsumerConfiguration.builder()
1012-
.durable(tsc.name())
1012+
.durable(tsc.consumerName())
10131013
.metadata(metaData)
10141014
.build();
10151015

@@ -1065,14 +1065,14 @@ public void testGetConsumerInfo() throws Exception {
10651065
jsServer.run(nc -> {
10661066
JetStreamManagement jsm = nc.jetStreamManagement();
10671067
TestingStreamContainer tsc = new TestingStreamContainer(jsm);
1068-
assertThrows(JetStreamApiException.class, () -> jsm.getConsumerInfo(tsc.stream, tsc.name()));
1069-
ConsumerConfiguration cc = ConsumerConfiguration.builder().durable(tsc.name()).build();
1068+
assertThrows(JetStreamApiException.class, () -> jsm.getConsumerInfo(tsc.stream, tsc.consumerName()));
1069+
ConsumerConfiguration cc = ConsumerConfiguration.builder().durable(tsc.consumerName()).build();
10701070
ConsumerInfo ci = jsm.addOrUpdateConsumer(tsc.stream, cc);
10711071
assertEquals(tsc.stream, ci.getStreamName());
1072-
assertEquals(tsc.name(), ci.getName());
1073-
ci = jsm.getConsumerInfo(tsc.stream, tsc.name());
1072+
assertEquals(tsc.consumerName(), ci.getName());
1073+
ci = jsm.getConsumerInfo(tsc.stream, tsc.consumerName());
10741074
assertEquals(tsc.stream, ci.getStreamName());
1075-
assertEquals(tsc.name(), ci.getName());
1075+
assertEquals(tsc.consumerName(), ci.getName());
10761076
assertThrows(JetStreamApiException.class, () -> jsm.getConsumerInfo(tsc.stream, durable(999)));
10771077
if (nc.getServerInfo().isSameOrNewerThanVersion("2.10")) {
10781078
assertNotNull(ci.getTimestamp());
@@ -1228,14 +1228,14 @@ public void testConsumerReplica() throws Exception {
12281228
TestingStreamContainer tsc = new TestingStreamContainer(nc);
12291229

12301230
final ConsumerConfiguration cc0 = ConsumerConfiguration.builder()
1231-
.durable(tsc.name())
1231+
.durable(tsc.consumerName())
12321232
.build();
12331233
ConsumerInfo ci = jsm.addOrUpdateConsumer(tsc.stream, cc0);
12341234
// server returns 0 when value is not set
12351235
assertEquals(0, ci.getConsumerConfiguration().getNumReplicas());
12361236

12371237
final ConsumerConfiguration cc1 = ConsumerConfiguration.builder()
1238-
.durable(tsc.name())
1238+
.durable(tsc.consumerName())
12391239
.numReplicas(1)
12401240
.build();
12411241
ci = jsm.addOrUpdateConsumer(tsc.stream, cc1);

src/test/java/io/nats/client/impl/JetStreamPullTests.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -64,12 +64,12 @@ public void testFetch() throws Exception {
6464
.build();
6565

6666
PullSubscribeOptions options = PullSubscribeOptions.builder()
67-
.durable(tsc.name())
67+
.durable(tsc.consumerName())
6868
.configuration(cc)
6969
.build();
7070

7171
JetStreamSubscription sub = js.subscribe(tsc.subject(), options);
72-
assertSubscription(sub, tsc.stream, tsc.name(), null, true);
72+
assertSubscription(sub, tsc.stream, tsc.consumerName(), null, true);
7373
nc.flush(Duration.ofSeconds(1)); // flush outgoing communication with/to the server
7474

7575
List<Message> messages = sub.fetch(10, fetchDur);
@@ -139,12 +139,12 @@ public void testIterate() throws Exception {
139139
.build();
140140

141141
PullSubscribeOptions options = PullSubscribeOptions.builder()
142-
.durable(tsc.name())
142+
.durable(tsc.consumerName())
143143
.configuration(cc)
144144
.build();
145145

146146
JetStreamSubscription sub = js.subscribe(tsc.subject(), options);
147-
assertSubscription(sub, tsc.stream, tsc.name(), null, true);
147+
assertSubscription(sub, tsc.stream, tsc.consumerName(), null, true);
148148
nc.flush(Duration.ofSeconds(1)); // flush outgoing communication with/to the server
149149

150150
Iterator<Message> iterator = sub.iterate(10, fetchDur);
@@ -218,11 +218,11 @@ public void testBasic() throws Exception {
218218
TestingStreamContainer tsc = new TestingStreamContainer(nc);
219219

220220
// Build our subscription options.
221-
PullSubscribeOptions options = PullSubscribeOptions.builder().durable(tsc.name()).build();
221+
PullSubscribeOptions options = PullSubscribeOptions.builder().durable(tsc.consumerName()).build();
222222

223223
// Subscribe synchronously.
224224
JetStreamSubscription sub = js.subscribe(tsc.subject(), options);
225-
assertSubscription(sub, tsc.stream, tsc.name(), null, true);
225+
assertSubscription(sub, tsc.stream, tsc.consumerName(), null, true);
226226
nc.flush(Duration.ofSeconds(1)); // flush outgoing communication with/to the server
227227

228228
// publish some amount of messages, but not entire pull size
@@ -317,11 +317,11 @@ public void testNoWait() throws Exception {
317317
TestingStreamContainer tsc = new TestingStreamContainer(nc);
318318

319319
// Build our subscription options.
320-
PullSubscribeOptions options = PullSubscribeOptions.builder().durable(tsc.name()).build();
320+
PullSubscribeOptions options = PullSubscribeOptions.builder().durable(tsc.consumerName()).build();
321321

322322
// Subscribe synchronously.
323323
JetStreamSubscription sub = js.subscribe(tsc.subject(), options);
324-
assertSubscription(sub, tsc.stream, tsc.name(), null, true);
324+
assertSubscription(sub, tsc.stream, tsc.consumerName(), null, true);
325325
nc.flush(Duration.ofSeconds(1)); // flush outgoing communication with/to the server
326326

327327
// publish 10 messages
@@ -391,11 +391,11 @@ public void testPullExpires() throws Exception {
391391
TestingStreamContainer tsc = new TestingStreamContainer(nc);
392392

393393
// Build our subscription options.
394-
PullSubscribeOptions options = PullSubscribeOptions.builder().durable(tsc.name()).build();
394+
PullSubscribeOptions options = PullSubscribeOptions.builder().durable(tsc.consumerName()).build();
395395

396396
// Subscribe synchronously.
397397
JetStreamSubscription sub = js.subscribe(tsc.subject(), options);
398-
assertSubscription(sub, tsc.stream, tsc.name(), null, true);
398+
assertSubscription(sub, tsc.stream, tsc.consumerName(), null, true);
399399
nc.flush(Duration.ofSeconds(1)); // flush outgoing communication with/to the server
400400

401401
long expires = 500; // millis
@@ -574,7 +574,7 @@ public void testAckWaitTimeout() throws Exception {
574574
.ackWait(1500)
575575
.build();
576576
PullSubscribeOptions pso = PullSubscribeOptions.builder()
577-
.durable(tsc.name())
577+
.durable(tsc.consumerName())
578578
.configuration(cc)
579579
.build();
580580

@@ -1108,10 +1108,10 @@ public void testReader() throws Exception {
11081108
JetStream js = nc.jetStream();
11091109

11101110
// Pre define a consumer
1111-
ConsumerConfiguration cc = ConsumerConfiguration.builder().durable(tsc.name()).filterSubjects(tsc.subject()).build();
1111+
ConsumerConfiguration cc = ConsumerConfiguration.builder().durable(tsc.consumerName()).filterSubjects(tsc.subject()).build();
11121112
jsm.addOrUpdateConsumer(tsc.stream, cc);
11131113

1114-
PullSubscribeOptions so = PullSubscribeOptions.bind(tsc.stream, tsc.name());
1114+
PullSubscribeOptions so = PullSubscribeOptions.bind(tsc.stream, tsc.consumerName());
11151115
JetStreamSubscription sub = js.subscribe(tsc.subject(), so);
11161116
JetStreamReader reader = sub.reader(500, 125);
11171117

src/test/java/io/nats/client/impl/JetStreamPushQueueTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public void testQueueSubWorkflow() throws Exception {
4343
// - the PushSubscribeOptions can be re-used since all the subscribers are the same
4444
// - use a concurrent integer to track all the messages received
4545
// - have a list of subscribers and threads so I can track them
46-
PushSubscribeOptions pso = PushSubscribeOptions.builder().durable(tsc.name()).build();
46+
PushSubscribeOptions pso = PushSubscribeOptions.builder().durable(tsc.consumerName()).build();
4747
AtomicInteger allReceived = new AtomicInteger();
4848
List<JsQueueSubscriber> subscribers = new ArrayList<>();
4949
List<Thread> subThreads = new ArrayList<>();

src/test/java/io/nats/client/impl/JetStreamTestBase.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -141,11 +141,11 @@ public String subject(Object variant) {
141141
return subjects.computeIfAbsent(variant, TestBase::subject);
142142
}
143143

144-
public String name() {
145-
return name(defaultNameVariant);
144+
public String consumerName() {
145+
return consumerName(defaultNameVariant);
146146
}
147147

148-
public String name(Object variant) {
148+
public String consumerName(Object variant) {
149149
return names.computeIfAbsent(variant, TestBase::name);
150150
}
151151
}

src/test/java/io/nats/client/impl/SimplificationTests.java

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -268,11 +268,11 @@ public void testIterableConsumer() throws Exception {
268268
JetStream js = nc.jetStream();
269269

270270
// Pre define a consumer
271-
ConsumerConfiguration cc = ConsumerConfiguration.builder().durable(tsc.name()).build();
271+
ConsumerConfiguration cc = ConsumerConfiguration.builder().durable(tsc.consumerName()).build();
272272
jsm.addOrUpdateConsumer(tsc.stream, cc);
273273

274274
// Consumer[Context]
275-
ConsumerContext consumerContext = js.getConsumerContext(tsc.stream, tsc.name());
275+
ConsumerContext consumerContext = js.getConsumerContext(tsc.stream, tsc.consumerName());
276276

277277
int stopCount = 500;
278278
// create the consumer then use it
@@ -355,11 +355,11 @@ public void testConsumeWithHandler() throws Exception {
355355
jsPublish(js, tsc.subject(), 2500);
356356

357357
// Pre define a consumer
358-
ConsumerConfiguration cc = ConsumerConfiguration.builder().durable(tsc.name()).build();
358+
ConsumerConfiguration cc = ConsumerConfiguration.builder().durable(tsc.consumerName()).build();
359359
jsm.addOrUpdateConsumer(tsc.stream, cc);
360360

361361
// Consumer[Context]
362-
ConsumerContext consumerContext = js.getConsumerContext(tsc.stream, tsc.name());
362+
ConsumerContext consumerContext = js.getConsumerContext(tsc.stream, tsc.consumerName());
363363

364364
int stopCount = 500;
365365

@@ -428,30 +428,30 @@ public void testCoverage() throws Exception {
428428
JetStream js = nc.jetStream();
429429

430430
// Pre define a consumer
431-
jsm.addOrUpdateConsumer(tsc.stream, ConsumerConfiguration.builder().durable(tsc.name(1)).build());
432-
jsm.addOrUpdateConsumer(tsc.stream, ConsumerConfiguration.builder().durable(tsc.name(2)).build());
433-
jsm.addOrUpdateConsumer(tsc.stream, ConsumerConfiguration.builder().durable(tsc.name(3)).build());
434-
jsm.addOrUpdateConsumer(tsc.stream, ConsumerConfiguration.builder().durable(tsc.name(4)).build());
431+
jsm.addOrUpdateConsumer(tsc.stream, ConsumerConfiguration.builder().durable(tsc.consumerName(1)).build());
432+
jsm.addOrUpdateConsumer(tsc.stream, ConsumerConfiguration.builder().durable(tsc.consumerName(2)).build());
433+
jsm.addOrUpdateConsumer(tsc.stream, ConsumerConfiguration.builder().durable(tsc.consumerName(3)).build());
434+
jsm.addOrUpdateConsumer(tsc.stream, ConsumerConfiguration.builder().durable(tsc.consumerName(4)).build());
435435

436436
// Stream[Context]
437437
StreamContext sctx1 = nc.getStreamContext(tsc.stream);
438438
nc.getStreamContext(tsc.stream, JetStreamOptions.DEFAULT_JS_OPTIONS);
439439
js.getStreamContext(tsc.stream);
440440

441441
// Consumer[Context]
442-
ConsumerContext cctx1 = nc.getConsumerContext(tsc.stream, tsc.name(1));
443-
ConsumerContext cctx2 = nc.getConsumerContext(tsc.stream, tsc.name(2), JetStreamOptions.DEFAULT_JS_OPTIONS);
444-
ConsumerContext cctx3 = js.getConsumerContext(tsc.stream, tsc.name(3));
445-
ConsumerContext cctx4 = sctx1.getConsumerContext(tsc.name(4));
446-
ConsumerContext cctx5 = sctx1.createOrUpdateConsumer(ConsumerConfiguration.builder().durable(tsc.name(5)).build());
447-
ConsumerContext cctx6 = sctx1.createOrUpdateConsumer(ConsumerConfiguration.builder().durable(tsc.name(6)).build());
448-
449-
after(cctx1.iterate(), tsc.name(1), true);
450-
after(cctx2.iterate(ConsumeOptions.DEFAULT_CONSUME_OPTIONS), tsc.name(2), true);
451-
after(cctx3.consume(m -> {}), tsc.name(3), true);
452-
after(cctx4.consume(ConsumeOptions.DEFAULT_CONSUME_OPTIONS, m -> {}), tsc.name(4), true);
453-
after(cctx5.fetchMessages(1), tsc.name(5), false);
454-
after(cctx6.fetchBytes(1000), tsc.name(6), false);
442+
ConsumerContext cctx1 = nc.getConsumerContext(tsc.stream, tsc.consumerName(1));
443+
ConsumerContext cctx2 = nc.getConsumerContext(tsc.stream, tsc.consumerName(2), JetStreamOptions.DEFAULT_JS_OPTIONS);
444+
ConsumerContext cctx3 = js.getConsumerContext(tsc.stream, tsc.consumerName(3));
445+
ConsumerContext cctx4 = sctx1.getConsumerContext(tsc.consumerName(4));
446+
ConsumerContext cctx5 = sctx1.createOrUpdateConsumer(ConsumerConfiguration.builder().durable(tsc.consumerName(5)).build());
447+
ConsumerContext cctx6 = sctx1.createOrUpdateConsumer(ConsumerConfiguration.builder().durable(tsc.consumerName(6)).build());
448+
449+
after(cctx1.iterate(), tsc.consumerName(1), true);
450+
after(cctx2.iterate(ConsumeOptions.DEFAULT_CONSUME_OPTIONS), tsc.consumerName(2), true);
451+
after(cctx3.consume(m -> {}), tsc.consumerName(3), true);
452+
after(cctx4.consume(ConsumeOptions.DEFAULT_CONSUME_OPTIONS, m -> {}), tsc.consumerName(4), true);
453+
after(cctx5.fetchMessages(1), tsc.consumerName(5), false);
454+
after(cctx6.fetchBytes(1000), tsc.consumerName(6), false);
455455
});
456456
}
457457

0 commit comments

Comments
 (0)