Skip to content

Commit a630282

Browse files
pdoliflhotari
authored andcommitted
[fix][test] Fix flaky NonPersistentTopicTest.testMsgDropStat (apache#24134)
(cherry picked from commit acec48c)
1 parent 271370d commit a630282

File tree

1 file changed

+11
-4
lines changed

1 file changed

+11
-4
lines changed

pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -871,14 +871,15 @@ public void testMsgDropStat() throws Exception {
871871
.messageRoutingMode(MessageRoutingMode.SinglePartition)
872872
.create();
873873
@Cleanup("shutdownNow")
874-
ExecutorService executor = Executors.newFixedThreadPool(5);
874+
ExecutorService executor = Executors.newFixedThreadPool(10);
875875
byte[] msgData = "testData".getBytes();
876876
final int totalProduceMessages = 1000;
877877
CountDownLatch latch = new CountDownLatch(1);
878878
AtomicInteger messagesSent = new AtomicInteger(0);
879879
for (int i = 0; i < totalProduceMessages; i++) {
880880
executor.submit(() -> {
881-
producer.sendAsync(msgData).handle((msgId, e) -> {
881+
try {
882+
MessageId msgId = producer.send(msgData);
882883
int count = messagesSent.incrementAndGet();
883884
// process at least 20% of messages before signalling the latch
884885
// a non-persistent message will return entryId as -1 when it has been dropped
@@ -888,8 +889,14 @@ public void testMsgDropStat() throws Exception {
888889
&& ((MessageIdImpl) msgId).getEntryId() == -1) {
889890
latch.countDown();
890891
}
891-
return null;
892-
});
892+
893+
Thread.sleep(10);
894+
} catch (PulsarClientException e) {
895+
throw new RuntimeException(e);
896+
} catch (InterruptedException e) {
897+
Thread.currentThread().interrupt();
898+
throw new RuntimeException(e);
899+
}
893900
});
894901
}
895902
assertTrue(latch.await(5, TimeUnit.SECONDS));

0 commit comments

Comments
 (0)