Skip to content

Commit 1bf06d5

Browse files
authored
Fix simplified ordered consuming when a delivery policy was set. (#1251)
* Fix simplified ordered consuming when a delivery policy was set. * more testing for ordered with start time or start sequence * more testing for ordered with start time or start sequence
1 parent 87cdd45 commit 1bf06d5

File tree

3 files changed

+170
-71
lines changed

3 files changed

+170
-71
lines changed

src/main/java/io/nats/client/impl/NatsJetStreamImpl.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -196,14 +196,13 @@ ConsumerConfiguration consumerConfigurationForOrdered(
196196
String consumerName,
197197
Long inactiveThreshold)
198198
{
199-
ConsumerConfiguration.Builder builder =
200-
ConsumerConfiguration.builder(originalCc)
201-
.deliverSubject(newDeliverSubject)
202-
.startTime(null); // clear start time in case it was originally set
199+
ConsumerConfiguration.Builder builder = ConsumerConfiguration.builder(originalCc).deliverSubject(newDeliverSubject);
203200

204201
if (lastStreamSeq > 0) {
205-
builder.deliverPolicy(DeliverPolicy.ByStartSequence)
206-
.startSequence(Math.max(1, lastStreamSeq + 1));
202+
builder
203+
.deliverPolicy(DeliverPolicy.ByStartSequence)
204+
.startSequence(Math.max(1, lastStreamSeq + 1))
205+
.startTime(null); // clear start time in case it was originally set
207206
}
208207

209208
if (consumerName != null && consumerCreate290Available) {
@@ -213,6 +212,7 @@ ConsumerConfiguration consumerConfigurationForOrdered(
213212
if (inactiveThreshold != null) {
214213
builder.inactiveThreshold(inactiveThreshold);
215214
}
215+
216216
return builder.build();
217217
}
218218

src/test/java/io/nats/client/impl/JetStreamTestBase.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,13 @@ public static void jsPublish(JetStream js, String subject, String prefix, int st
205205
}
206206
}
207207

208+
public static void jsPublish(JetStream js, String subject, int startId, int count, long sleep) throws IOException, JetStreamApiException {
209+
for (int x = 0; x < count; x++) {
210+
js.publish(NatsMessage.builder().subject(subject).data((dataBytes(startId++))).build());
211+
sleep(sleep);
212+
}
213+
}
214+
208215
public static void jsPublish(JetStream js, String subject, int startId, int count) throws IOException, JetStreamApiException {
209216
for (int x = 0; x < count; x++) {
210217
js.publish(NatsMessage.builder().subject(subject).data((dataBytes(startId++))).build());

src/test/java/io/nats/client/impl/SimplificationTests.java

Lines changed: 157 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.io.*;
2323
import java.time.Duration;
2424
import java.time.ZonedDateTime;
25+
import java.time.temporal.ChronoUnit;
2526
import java.util.concurrent.CountDownLatch;
2627
import java.util.concurrent.TimeUnit;
2728
import java.util.concurrent.atomic.AtomicInteger;
@@ -265,7 +266,7 @@ public void testIterableConsumer() throws Exception {
265266
int stopCount = 500;
266267
// create the consumer then use it
267268
try (IterableConsumer consumer = consumerContext.iterate()) {
268-
_testIterable(js, stopCount, consumer, tsc.subject());
269+
_testIterableBasic(js, stopCount, consumer, tsc.subject());
269270
}
270271

271272
// coverage
@@ -275,6 +276,44 @@ public void testIterableConsumer() throws Exception {
275276
});
276277
}
277278

279+
@Test
280+
public void testOrderedConsumerDeliverPolices() throws Exception {
281+
jsServer.run(TestBase::atLeast2_9_1, nc -> {
282+
// Setup
283+
JetStream js = nc.jetStream();
284+
JetStreamManagement jsm = nc.jetStreamManagement();
285+
286+
TestingStreamContainer tsc = new TestingStreamContainer(jsm);
287+
288+
jsPublish(js, tsc.subject(), 101, 3, 100);
289+
ZonedDateTime startTime = getStartTimeFirstMessage(js, tsc);
290+
291+
StreamContext sctx = nc.getStreamContext(tsc.stream);
292+
293+
// test a start time
294+
OrderedConsumerConfiguration occ = new OrderedConsumerConfiguration()
295+
.filterSubject(tsc.subject())
296+
.deliverPolicy(DeliverPolicy.ByStartTime)
297+
.startTime(startTime);
298+
OrderedConsumerContext occtx = sctx.createOrderedConsumer(occ);
299+
try (IterableConsumer consumer = occtx.iterate()) {
300+
Message m = consumer.nextMessage(1000);
301+
assertEquals(2, m.metaData().streamSequence());
302+
}
303+
304+
// test a start sequence
305+
occ = new OrderedConsumerConfiguration()
306+
.filterSubject(tsc.subject())
307+
.deliverPolicy(DeliverPolicy.ByStartSequence)
308+
.startSequence(2);
309+
occtx = sctx.createOrderedConsumer(occ);
310+
try (IterableConsumer consumer = occtx.iterate()) {
311+
Message m = consumer.nextMessage(1000);
312+
assertEquals(2, m.metaData().streamSequence());
313+
}
314+
});
315+
}
316+
278317
@Test
279318
public void testOrderedIterableConsumerBasic() throws Exception {
280319
jsServer.run(TestBase::atLeast2_9_1, nc -> {
@@ -288,12 +327,12 @@ public void testOrderedIterableConsumerBasic() throws Exception {
288327
OrderedConsumerConfiguration occ = new OrderedConsumerConfiguration().filterSubject(tsc.subject());
289328
OrderedConsumerContext occtx = sctx.createOrderedConsumer(occ);
290329
try (IterableConsumer consumer = occtx.iterate()) {
291-
_testIterable(js, stopCount, consumer, tsc.subject());
330+
_testIterableBasic(js, stopCount, consumer, tsc.subject());
292331
}
293332
});
294333
}
295334

296-
private static void _testIterable(JetStream js, int stopCount, IterableConsumer consumer, String subject) throws InterruptedException {
335+
private static void _testIterableBasic(JetStream js, int stopCount, IterableConsumer consumer, String subject) throws InterruptedException {
297336
AtomicInteger count = new AtomicInteger();
298337
Thread consumeThread = new Thread(() -> {
299338
try {
@@ -615,28 +654,49 @@ public void testOrderedBehaviorNext() throws Exception {
615654
JetStream js = nc.jetStream();
616655
JetStreamManagement jsm = nc.jetStreamManagement();
617656

618-
// Get this in place before subscriptions are made
619-
((NatsJetStream)js)._pullOrderedMessageManagerFactory = PullOrderedNextTestDropSimulator::new;
620-
621657
TestingStreamContainer tsc = new TestingStreamContainer(jsm);
622658
StreamContext sctx = js.getStreamContext(tsc.stream);
623-
jsPublish(js, tsc.subject(), 101, 6);
624659

625-
OrderedConsumerConfiguration occ = new OrderedConsumerConfiguration().filterSubject(tsc.subject());
626-
OrderedConsumerContext occtx = sctx.createOrderedConsumer(occ);
627-
// Loop through the messages to make sure I get stream sequence 1 to 6
628-
int expectedStreamSeq = 1;
629-
while (expectedStreamSeq <= 6) {
630-
Message m = occtx.next(1000);
631-
if (m != null) {
632-
assertEquals(expectedStreamSeq, m.metaData().streamSequence());
633-
assertEquals(1, m.metaData().consumerSequence());
634-
++expectedStreamSeq;
635-
}
636-
}
660+
jsPublish(js, tsc.subject(), 101, 6, 100);
661+
ZonedDateTime startTime = getStartTimeFirstMessage(js, tsc);
662+
663+
// New pomm factory in place before each subscription is made
664+
((NatsJetStream)js)._pullOrderedMessageManagerFactory = PullOrderedNextTestDropSimulator::new;
665+
_testOrderedNext(sctx, 1, new OrderedConsumerConfiguration().filterSubject(tsc.subject()));
666+
667+
((NatsJetStream)js)._pullOrderedMessageManagerFactory = PullOrderedNextTestDropSimulator::new;
668+
_testOrderedNext(sctx, 2, new OrderedConsumerConfiguration().filterSubject(tsc.subject())
669+
.deliverPolicy(DeliverPolicy.ByStartTime).startTime(startTime));
670+
671+
((NatsJetStream)js)._pullOrderedMessageManagerFactory = PullOrderedNextTestDropSimulator::new;
672+
_testOrderedNext(sctx, 2, new OrderedConsumerConfiguration().filterSubject(tsc.subject())
673+
.deliverPolicy(DeliverPolicy.ByStartSequence).startSequence(2));
637674
});
638675
}
639676

677+
private static ZonedDateTime getStartTimeFirstMessage(JetStream js, TestingStreamContainer tsc) throws IOException, JetStreamApiException, InterruptedException {
678+
ZonedDateTime startTime;
679+
JetStreamSubscription sub = js.subscribe(tsc.subject());
680+
Message mt = sub.nextMessage(1000);
681+
startTime = mt.metaData().timestamp().plus(30, ChronoUnit.MILLIS);
682+
sub.unsubscribe();
683+
return startTime;
684+
}
685+
686+
private static void _testOrderedNext(StreamContext sctx, int expectedStreamSeq, OrderedConsumerConfiguration occ) throws IOException, JetStreamApiException, InterruptedException, JetStreamStatusCheckedException {
687+
OrderedConsumerContext occtx = sctx.createOrderedConsumer(occ);
688+
// Loop through the messages to make sure I get stream sequence 1 to 6
689+
while (expectedStreamSeq <= 6) {
690+
Message m = occtx.next(1000);
691+
if (m != null) {
692+
assertEquals(expectedStreamSeq, m.metaData().streamSequence());
693+
assertEquals(1, m.metaData().consumerSequence());
694+
++expectedStreamSeq;
695+
}
696+
}
697+
}
698+
699+
public static long CS_FOR_SS_3 = 3;
640700
public static class PullOrderedTestDropSimulator extends PullOrderedMessageManager {
641701
@SuppressWarnings("ClassEscapesDefinedScope")
642702
public PullOrderedTestDropSimulator(NatsConnection conn, NatsJetStream js, String stream, SubscribeOptions so, ConsumerConfiguration serverCC, boolean queueMode, boolean syncMode) {
@@ -646,8 +706,8 @@ public PullOrderedTestDropSimulator(NatsConnection conn, NatsJetStream js, Strin
646706
@Override
647707
protected Boolean beforeQueueProcessorImpl(NatsMessage msg) {
648708
if (msg.isJetStream()
649-
&& msg.metaData().streamSequence() == 2
650-
&& msg.metaData().consumerSequence() == 2)
709+
&& msg.metaData().streamSequence() == 3
710+
&& msg.metaData().consumerSequence() == CS_FOR_SS_3)
651711
{
652712
return false;
653713
}
@@ -663,39 +723,55 @@ public void testOrderedBehaviorFetch() throws Exception {
663723
JetStream js = nc.jetStream();
664724
JetStreamManagement jsm = nc.jetStreamManagement();
665725

666-
// Get this in place before subscriptions are made
667-
((NatsJetStream)js)._pullOrderedMessageManagerFactory = PullOrderedTestDropSimulator::new;
668-
669726
TestingStreamContainer tsc = new TestingStreamContainer(jsm);
670727
StreamContext sctx = js.getStreamContext(tsc.stream);
671-
jsPublish(js, tsc.subject(), 101, 5);
672-
OrderedConsumerConfiguration occ = new OrderedConsumerConfiguration().filterSubject(tsc.subject());
673-
OrderedConsumerContext occtx = sctx.createOrderedConsumer(occ);
674-
int expectedStreamSeq = 1;
675-
FetchConsumeOptions fco = FetchConsumeOptions.builder().maxMessages(6).expiresIn(1000).build();
676-
try (FetchConsumer fcon = occtx.fetch(fco)) {
677-
Message m = fcon.nextMessage();
678-
while (m != null) {
679-
assertEquals(expectedStreamSeq++, m.metaData().streamSequence());
680-
m = fcon.nextMessage();
681-
}
682-
// we know this because the simulator is designed to fail the first time at the second message
683-
assertEquals(2, expectedStreamSeq);
684-
// fetch failure will stop the consumer, but make sure it's done b/c with ordered
685-
// I can't have more than one consuming at a time.
686-
while (!fcon.isFinished()) {
687-
sleep(1);
688-
}
728+
729+
jsPublish(js, tsc.subject(), 101, 6, 100);
730+
ZonedDateTime startTime = getStartTimeFirstMessage(js, tsc);
731+
732+
// New pomm factory in place before each subscription is made
733+
// Set the Consumer Sequence For Stream Sequence 3 statically for ease
734+
CS_FOR_SS_3 = 3;
735+
((NatsJetStream)js)._pullOrderedMessageManagerFactory = PullOrderedTestDropSimulator::new;
736+
_testOrderedFetch(sctx, 1, new OrderedConsumerConfiguration().filterSubject(tsc.subject()));
737+
738+
CS_FOR_SS_3 = 2;
739+
((NatsJetStream)js)._pullOrderedMessageManagerFactory = PullOrderedTestDropSimulator::new;
740+
_testOrderedFetch(sctx, 2, new OrderedConsumerConfiguration().filterSubject(tsc.subject())
741+
.deliverPolicy(DeliverPolicy.ByStartTime).startTime(startTime));
742+
743+
CS_FOR_SS_3 = 2;
744+
((NatsJetStream)js)._pullOrderedMessageManagerFactory = PullOrderedTestDropSimulator::new;
745+
_testOrderedFetch(sctx, 2, new OrderedConsumerConfiguration().filterSubject(tsc.subject())
746+
.deliverPolicy(DeliverPolicy.ByStartSequence).startSequence(2));
747+
});
748+
}
749+
750+
private static void _testOrderedFetch(StreamContext sctx, int expectedStreamSeq, OrderedConsumerConfiguration occ) throws Exception {
751+
OrderedConsumerContext occtx = sctx.createOrderedConsumer(occ);
752+
FetchConsumeOptions fco = FetchConsumeOptions.builder().maxMessages(6).expiresIn(1000).build();
753+
try (FetchConsumer fcon = occtx.fetch(fco)) {
754+
Message m = fcon.nextMessage();
755+
while (m != null) {
756+
assertEquals(expectedStreamSeq++, m.metaData().streamSequence());
757+
m = fcon.nextMessage();
689758
}
690-
// this should finish without error
691-
try (FetchConsumer fcon = occtx.fetch(fco)) {
692-
Message m = fcon.nextMessage();
693-
while (expectedStreamSeq <= 5) {
694-
assertEquals(expectedStreamSeq++, m.metaData().streamSequence());
695-
m = fcon.nextMessage();
696-
}
759+
// we know this because the simulator is designed to fail the first time at the third message
760+
assertEquals(3, expectedStreamSeq);
761+
// fetch failure will stop the consumer, but make sure it's done b/c with ordered
762+
// I can't have more than one consuming at a time.
763+
while (!fcon.isFinished()) {
764+
sleep(1);
697765
}
698-
});
766+
}
767+
// this should finish without error
768+
try (FetchConsumer fcon = occtx.fetch(fco)) {
769+
Message m = fcon.nextMessage();
770+
while (expectedStreamSeq <= 6) {
771+
assertEquals(expectedStreamSeq++, m.metaData().streamSequence());
772+
m = fcon.nextMessage();
773+
}
774+
}
699775
}
700776

701777
@Test
@@ -705,25 +781,41 @@ public void testOrderedBehaviorIterable() throws Exception {
705781
JetStream js = nc.jetStream();
706782
JetStreamManagement jsm = nc.jetStreamManagement();
707783

708-
// Get this in place before subscriptions are made
709-
((NatsJetStream)js)._pullOrderedMessageManagerFactory = PullOrderedTestDropSimulator::new;
710-
711784
TestingStreamContainer tsc = new TestingStreamContainer(jsm);
712785
StreamContext sctx = js.getStreamContext(tsc.stream);
713-
jsPublish(js, tsc.subject(), 101, 5);
714-
OrderedConsumerConfiguration occ = new OrderedConsumerConfiguration().filterSubject(tsc.subject());
715-
OrderedConsumerContext occtx = sctx.createOrderedConsumer(occ);
716-
try (IterableConsumer icon = occtx.iterate()) {
717-
// Loop through the messages to make sure I get stream sequence 1 to 5
718-
int expectedStreamSeq = 1;
719-
while (expectedStreamSeq <= 5) {
720-
Message m = icon.nextMessage(Duration.ofSeconds(1)); // use duration version here for coverage
721-
if (m != null) {
722-
assertEquals(expectedStreamSeq++, m.metaData().streamSequence());
723-
}
786+
787+
jsPublish(js, tsc.subject(), 101, 6, 100);
788+
ZonedDateTime startTime = getStartTimeFirstMessage(js, tsc);
789+
790+
// New pomm factory in place before each subscription is made
791+
// Set the Consumer Sequence For Stream Sequence 3 statically for ease
792+
CS_FOR_SS_3 = 3;
793+
((NatsJetStream)js)._pullOrderedMessageManagerFactory = PullOrderedTestDropSimulator::new;
794+
_testOrderedIterate(sctx, 1, new OrderedConsumerConfiguration().filterSubject(tsc.subject()));
795+
796+
CS_FOR_SS_3 = 2;
797+
((NatsJetStream)js)._pullOrderedMessageManagerFactory = PullOrderedTestDropSimulator::new;
798+
_testOrderedIterate(sctx, 2, new OrderedConsumerConfiguration().filterSubject(tsc.subject())
799+
.deliverPolicy(DeliverPolicy.ByStartTime).startTime(startTime));
800+
801+
CS_FOR_SS_3 = 2;
802+
((NatsJetStream)js)._pullOrderedMessageManagerFactory = PullOrderedTestDropSimulator::new;
803+
_testOrderedIterate(sctx, 2, new OrderedConsumerConfiguration().filterSubject(tsc.subject())
804+
.deliverPolicy(DeliverPolicy.ByStartSequence).startSequence(2));
805+
});
806+
}
807+
808+
private static void _testOrderedIterate(StreamContext sctx, int expectedStreamSeq, OrderedConsumerConfiguration occ) throws Exception {
809+
OrderedConsumerContext occtx = sctx.createOrderedConsumer(occ);
810+
try (IterableConsumer icon = occtx.iterate()) {
811+
// Loop through the messages to make sure I get stream sequence 1 to 5
812+
while (expectedStreamSeq <= 5) {
813+
Message m = icon.nextMessage(Duration.ofSeconds(1)); // use duration version here for coverage
814+
if (m != null) {
815+
assertEquals(expectedStreamSeq++, m.metaData().streamSequence());
724816
}
725817
}
726-
});
818+
}
727819
}
728820

729821
@Test

0 commit comments

Comments
 (0)