Skip to content

Commit 8b427b3

Browse files
committed
test: add test cases about the last sent position and individually sent positions
1 parent b46221b commit 8b427b3

File tree

4 files changed

+933
-21
lines changed

4 files changed

+933
-21
lines changed

pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java

Lines changed: 180 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
*/
1919
package org.apache.pulsar.broker.admin;
2020

21+
import static org.mockito.ArgumentMatchers.anyInt;
22+
import static org.mockito.Mockito.doAnswer;
2123
import static org.mockito.Mockito.doReturn;
2224
import static org.mockito.Mockito.spy;
2325
import static org.mockito.Mockito.times;
@@ -56,6 +58,7 @@
5658
import java.util.concurrent.Future;
5759
import java.util.concurrent.TimeUnit;
5860
import java.util.concurrent.atomic.AtomicInteger;
61+
import java.util.function.Function;
5962
import javax.ws.rs.client.InvocationCallback;
6063
import javax.ws.rs.client.WebTarget;
6164
import javax.ws.rs.core.Response.Status;
@@ -65,6 +68,7 @@
6568
import lombok.Value;
6669
import lombok.extern.slf4j.Slf4j;
6770
import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
71+
import org.apache.bookkeeper.mledger.Position;
6872
import org.apache.bookkeeper.mledger.PositionFactory;
6973
import org.apache.pulsar.broker.BrokerTestUtil;
7074
import org.apache.pulsar.broker.PulsarServerException;
@@ -75,6 +79,8 @@
7579
import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
7680
import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
7781
import org.apache.pulsar.broker.namespace.NamespaceService;
82+
import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
83+
import org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers;
7884
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
7985
import org.apache.pulsar.broker.testcontext.SpyConfig;
8086
import org.apache.pulsar.client.admin.GetStatsOptions;
@@ -139,7 +145,10 @@
139145
import org.apache.pulsar.common.policies.data.TopicHashPositions;
140146
import org.apache.pulsar.common.policies.data.TopicStats;
141147
import org.apache.pulsar.common.util.Codec;
148+
import org.apache.pulsar.common.util.Murmur3_32Hash;
142149
import org.apache.pulsar.common.util.ObjectMapperFactory;
150+
import org.apache.pulsar.common.util.collections.ConcurrentOpenLongPairRangeSet;
151+
import org.apache.pulsar.common.util.collections.LongPairRangeSet;
143152
import org.apache.pulsar.compaction.Compactor;
144153
import org.apache.pulsar.compaction.PulsarCompactionServiceFactory;
145154
import org.awaitility.Awaitility;
@@ -3449,43 +3458,198 @@ public void testGetTtlDurationDefaultInSeconds() throws Exception {
34493458
}
34503459

34513460
@Test
3452-
public void testGetReadPositionWhenJoining() throws Exception {
3453-
final String topic = "persistent://prop-xyz/ns1/testGetReadPositionWhenJoining-" + UUID.randomUUID().toString();
3461+
public void testGetLastSentPositionWhenJoining() throws Exception {
3462+
final String topic = "persistent://prop-xyz/ns1/testGetLastSentPositionWhenJoining-" + UUID.randomUUID().toString();
34543463
final String subName = "my-sub";
34553464
@Cleanup
34563465
Producer<byte[]> producer = pulsarClient.newProducer()
34573466
.topic(topic)
34583467
.enableBatching(false)
34593468
.create();
34603469

3470+
@Cleanup
3471+
final Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
3472+
.topic(topic)
3473+
.subscriptionType(SubscriptionType.Key_Shared)
3474+
.subscriptionName(subName)
3475+
.subscribe();
3476+
34613477
final int messages = 10;
34623478
MessageIdImpl messageId = null;
34633479
for (int i = 0; i < messages; i++) {
34643480
messageId = (MessageIdImpl) producer.send(("Hello Pulsar - " + i).getBytes());
3481+
consumer1.receive();
34653482
}
34663483

3467-
List<Consumer<byte[]>> consumers = new ArrayList<>();
3468-
for (int i = 0; i < 2; i++) {
3469-
Consumer<byte[]> consumer = pulsarClient.newConsumer()
3470-
.topic(topic)
3471-
.subscriptionType(SubscriptionType.Key_Shared)
3472-
.subscriptionName(subName)
3473-
.subscribe();
3474-
consumers.add(consumer);
3475-
}
3484+
@Cleanup
3485+
final Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
3486+
.topic(topic)
3487+
.subscriptionType(SubscriptionType.Key_Shared)
3488+
.subscriptionName(subName)
3489+
.subscribe();
34763490

34773491
TopicStats stats = admin.topics().getStats(topic);
34783492
Assert.assertEquals(stats.getSubscriptions().size(), 1);
34793493
SubscriptionStats subStats = stats.getSubscriptions().get(subName);
34803494
Assert.assertNotNull(subStats);
34813495
Assert.assertEquals(subStats.getConsumers().size(), 2);
3482-
ConsumerStats consumerStats = subStats.getConsumers().get(0);
3483-
Assert.assertEquals(consumerStats.getReadPositionWhenJoining(),
3484-
PositionFactory.create(messageId.getLedgerId(), messageId.getEntryId() + 1).toString());
3496+
ConsumerStats consumerStats = subStats.getConsumers().stream()
3497+
.filter(s -> s.getConsumerName().equals(consumer2.getConsumerName())).findFirst().get();
3498+
Assert.assertEquals(consumerStats.getLastSentPositionWhenJoining(),
3499+
PositionFactory.create(messageId.getLedgerId(), messageId.getEntryId()).toString());
3500+
}
3501+
3502+
@Test
3503+
public void testGetLastSentPosition() throws Exception {
3504+
final String topic = "persistent://prop-xyz/ns1/testGetLastSentPosition-" + UUID.randomUUID().toString();
3505+
final String subName = "my-sub";
3506+
@Cleanup
3507+
final Producer<byte[]> producer = pulsarClient.newProducer()
3508+
.topic(topic)
3509+
.enableBatching(false)
3510+
.create();
3511+
final AtomicInteger counter = new AtomicInteger();
3512+
@Cleanup
3513+
final Consumer<byte[]> consumer = pulsarClient.newConsumer()
3514+
.topic(topic)
3515+
.subscriptionType(SubscriptionType.Key_Shared)
3516+
.subscriptionName(subName)
3517+
.messageListener((c, msg) -> {
3518+
try {
3519+
c.acknowledge(msg);
3520+
counter.getAndIncrement();
3521+
} catch (Exception e) {
3522+
throw new RuntimeException(e);
3523+
}
3524+
})
3525+
.subscribe();
3526+
3527+
TopicStats stats = admin.topics().getStats(topic);
3528+
Assert.assertEquals(stats.getSubscriptions().size(), 1);
3529+
SubscriptionStats subStats = stats.getSubscriptions().get(subName);
3530+
Assert.assertNotNull(subStats);
3531+
Assert.assertNull(subStats.getLastSentPosition());
34853532

3486-
for (Consumer<byte[]> consumer : consumers) {
3487-
consumer.close();
3533+
final int messages = 10;
3534+
MessageIdImpl messageId = null;
3535+
for (int i = 0; i < messages; i++) {
3536+
messageId = (MessageIdImpl) producer.send(("Hello Pulsar - " + i).getBytes());
34883537
}
3538+
3539+
Awaitility.await().untilAsserted(() -> assertEquals(counter.get(), messages));
3540+
3541+
stats = admin.topics().getStats(topic);
3542+
Assert.assertEquals(stats.getSubscriptions().size(), 1);
3543+
subStats = stats.getSubscriptions().get(subName);
3544+
Assert.assertNotNull(subStats);
3545+
Assert.assertEquals(subStats.getLastSentPosition(), PositionFactory.create(messageId.getLedgerId(), messageId.getEntryId()).toString());
3546+
}
3547+
3548+
@Test
3549+
public void testGetIndividuallySentPositions() throws Exception {
3550+
// The producer sends messages with two types of keys.
3551+
// The dispatcher sends keyA messages to consumer1.
3552+
// Consumer1 will not receive any messages. Its receiver queue size is 1.
3553+
// Consumer2 will receive and ack any messages immediately.
3554+
3555+
final String topic = "persistent://prop-xyz/ns1/testGetIndividuallySentPositions-" + UUID.randomUUID().toString();
3556+
final String subName = "my-sub";
3557+
@Cleanup
3558+
final Producer<byte[]> producer = pulsarClient.newProducer()
3559+
.topic(topic)
3560+
.enableBatching(false)
3561+
.create();
3562+
3563+
final String consumer1Name = "c1";
3564+
final String consumer2Name = "c2";
3565+
3566+
@Cleanup
3567+
final Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
3568+
.topic(topic)
3569+
.consumerName(consumer1Name)
3570+
.receiverQueueSize(1)
3571+
.subscriptionType(SubscriptionType.Key_Shared)
3572+
.subscriptionName(subName)
3573+
.subscribe();
3574+
3575+
final PersistentStickyKeyDispatcherMultipleConsumers dispatcher =
3576+
(PersistentStickyKeyDispatcherMultipleConsumers) pulsar.getBrokerService().getTopicIfExists(topic).get().get().getSubscription(subName).getDispatcher();
3577+
final String keyA = "key-a";
3578+
final String keyB = "key-b";
3579+
final int hashA = Murmur3_32Hash.getInstance().makeHash(keyA.getBytes());
3580+
3581+
final Field selectorField = PersistentStickyKeyDispatcherMultipleConsumers.class.getDeclaredField("selector");
3582+
selectorField.setAccessible(true);
3583+
final StickyKeyConsumerSelector selector = spy((StickyKeyConsumerSelector) selectorField.get(dispatcher));
3584+
selectorField.set(dispatcher, selector);
3585+
3586+
// the selector returns consumer1 if keyA
3587+
doAnswer((invocationOnMock -> {
3588+
final int hash = invocationOnMock.getArgument(0);
3589+
3590+
final String consumerName = hash == hashA ? consumer1Name : consumer2Name;
3591+
return dispatcher.getConsumers().stream().filter(consumer -> consumer.consumerName().equals(consumerName)).findFirst().get();
3592+
})).when(selector).select(anyInt());
3593+
3594+
final AtomicInteger consumer2AckCounter = new AtomicInteger();
3595+
@Cleanup
3596+
final Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
3597+
.topic(topic)
3598+
.consumerName(consumer2Name)
3599+
.subscriptionType(SubscriptionType.Key_Shared)
3600+
.subscriptionName(subName)
3601+
.messageListener((c, msg) -> {
3602+
try {
3603+
c.acknowledge(msg);
3604+
consumer2AckCounter.getAndIncrement();
3605+
} catch (Exception e) {
3606+
throw new RuntimeException(e);
3607+
}
3608+
})
3609+
.subscribe();
3610+
3611+
final LongPairRangeSet.LongPairConsumer<Position> positionRangeConverter = PositionFactory::create;
3612+
final LongPairRangeSet<Position> expectedIndividuallySentPositions = new ConcurrentOpenLongPairRangeSet<>(4096, positionRangeConverter);
3613+
3614+
TopicStats stats = admin.topics().getStats(topic);
3615+
Assert.assertEquals(stats.getSubscriptions().size(), 1);
3616+
SubscriptionStats subStats = stats.getSubscriptions().get(subName);
3617+
Assert.assertNotNull(subStats);
3618+
Assert.assertEquals(subStats.getIndividuallySentPositions(), expectedIndividuallySentPositions.toString());
3619+
3620+
final Function<String, MessageIdImpl> sendFn = (key) -> {
3621+
try {
3622+
return (MessageIdImpl) producer.newMessage().key(key).value(("msg").getBytes()).send();
3623+
} catch (PulsarClientException e) {
3624+
throw new RuntimeException(e);
3625+
}
3626+
};
3627+
final List<MessageIdImpl> messageIdList = new ArrayList<>();
3628+
3629+
// the dispatcher can send keyA message, but then consumer1's receiver queue will be full
3630+
messageIdList.add(sendFn.apply(keyA));
3631+
3632+
// the dispatcher can send messages other than keyA
3633+
messageIdList.add(sendFn.apply(keyA));
3634+
messageIdList.add(sendFn.apply(keyB));
3635+
messageIdList.add(sendFn.apply(keyA));
3636+
messageIdList.add(sendFn.apply(keyB));
3637+
messageIdList.add(sendFn.apply(keyB));
3638+
3639+
assertEquals(messageIdList.size(), 6);
3640+
Awaitility.await().untilAsserted(() -> assertEquals(consumer2AckCounter.get(), 3));
3641+
3642+
// set expected value
3643+
expectedIndividuallySentPositions.addOpenClosed(messageIdList.get(1).getLedgerId(), messageIdList.get(1).getEntryId(),
3644+
messageIdList.get(2).getLedgerId(), messageIdList.get(2).getEntryId());
3645+
expectedIndividuallySentPositions.addOpenClosed(messageIdList.get(3).getLedgerId(), messageIdList.get(3).getEntryId(),
3646+
messageIdList.get(5).getLedgerId(), messageIdList.get(5).getEntryId());
3647+
3648+
stats = admin.topics().getStats(topic);
3649+
Assert.assertEquals(stats.getSubscriptions().size(), 1);
3650+
subStats = stats.getSubscriptions().get(subName);
3651+
Assert.assertNotNull(subStats);
3652+
Assert.assertEquals(subStats.getIndividuallySentPositions(), expectedIndividuallySentPositions.toString());
34893653
}
34903654

34913655
@Test

0 commit comments

Comments
 (0)