Skip to content

Commit d7e8ea1

Browse files
authored
[fix][broker] Introduce the last sent position to fix message ordering issues in Key_Shared (PIP-282) (#21953)
1 parent 59136a0 commit d7e8ea1

File tree

13 files changed

+1158
-61
lines changed

13 files changed

+1158
-61
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3472,6 +3472,19 @@ public LongPairRangeSet<Position> getIndividuallyDeletedMessagesSet() {
34723472
return individualDeletedMessages;
34733473
}
34743474

3475+
public Position processIndividuallyDeletedMessagesAndGetMarkDeletedPosition(
3476+
LongPairRangeSet.RangeProcessor<Position> processor) {
3477+
final Position mdp;
3478+
lock.readLock().lock();
3479+
try {
3480+
mdp = markDeletePosition;
3481+
individualDeletedMessages.forEach(processor);
3482+
} finally {
3483+
lock.readLock().unlock();
3484+
}
3485+
return mdp;
3486+
}
3487+
34753488
public boolean isMessageDeleted(Position position) {
34763489
lock.readLock().lock();
34773490
try {

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3497,7 +3497,7 @@ private CompletableFuture<Void> completeLedgerInfoForOffloaded(long ledgerId, UU
34973497
* the position range
34983498
* @return the count of entries
34993499
*/
3500-
long getNumberOfEntries(Range<Position> range) {
3500+
public long getNumberOfEntries(Range<Position> range) {
35013501
Position fromPosition = range.lowerEndpoint();
35023502
boolean fromIncluded = range.lowerBoundType() == BoundType.CLOSED;
35033503
Position toPosition = range.upperEndpoint();

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ public class Consumer {
145145

146146
private static final double avgPercent = 0.9;
147147
private boolean preciseDispatcherFlowControl;
148-
private Position readPositionWhenJoining;
148+
private Position lastSentPositionWhenJoining;
149149
private final String clientAddress; // IP address only, no port number included
150150
private final MessageId startMessageId;
151151
private final boolean isAcknowledgmentAtBatchIndexLevelEnabled;
@@ -931,8 +931,8 @@ public ConsumerStatsImpl getStats() {
931931
stats.unackedMessages = unackedMessages;
932932
stats.blockedConsumerOnUnackedMsgs = blockedConsumerOnUnackedMsgs;
933933
stats.avgMessagesPerEntry = getAvgMessagesPerEntry();
934-
if (readPositionWhenJoining != null) {
935-
stats.readPositionWhenJoining = readPositionWhenJoining.toString();
934+
if (lastSentPositionWhenJoining != null) {
935+
stats.lastSentPositionWhenJoining = lastSentPositionWhenJoining.toString();
936936
}
937937
return stats;
938938
}
@@ -1166,8 +1166,8 @@ public boolean isPreciseDispatcherFlowControl() {
11661166
return preciseDispatcherFlowControl;
11671167
}
11681168

1169-
public void setReadPositionWhenJoining(Position readPositionWhenJoining) {
1170-
this.readPositionWhenJoining = readPositionWhenJoining;
1169+
public void setLastSentPositionWhenJoining(Position lastSentPositionWhenJoining) {
1170+
this.lastSentPositionWhenJoining = lastSentPositionWhenJoining;
11711171
}
11721172

11731173
public int getMaxUnackedMessages() {

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java

Lines changed: 169 additions & 26 deletions
Large diffs are not rendered by default.

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1305,9 +1305,26 @@ public SubscriptionStatsImpl getStats(GetStatsOptions getStatsOptions) {
13051305
.getRecentlyJoinedConsumers();
13061306
if (recentlyJoinedConsumers != null && recentlyJoinedConsumers.size() > 0) {
13071307
recentlyJoinedConsumers.forEach((k, v) -> {
1308-
subStats.consumersAfterMarkDeletePosition.put(k.consumerName(), v.toString());
1308+
// The dispatcher allows same name consumers
1309+
final StringBuilder stringBuilder = new StringBuilder();
1310+
stringBuilder.append("consumerName=").append(k.consumerName())
1311+
.append(", consumerId=").append(k.consumerId());
1312+
if (k.cnx() != null) {
1313+
stringBuilder.append(", address=").append(k.cnx().clientAddress());
1314+
}
1315+
subStats.consumersAfterMarkDeletePosition.put(stringBuilder.toString(), v.toString());
13091316
});
13101317
}
1318+
final String lastSentPosition = ((PersistentStickyKeyDispatcherMultipleConsumers) dispatcher)
1319+
.getLastSentPosition();
1320+
if (lastSentPosition != null) {
1321+
subStats.lastSentPosition = lastSentPosition;
1322+
}
1323+
final String individuallySentPositions = ((PersistentStickyKeyDispatcherMultipleConsumers) dispatcher)
1324+
.getIndividuallySentPositions();
1325+
if (individuallySentPositions != null) {
1326+
subStats.individuallySentPositions = individuallySentPositions;
1327+
}
13111328
}
13121329
subStats.nonContiguousDeletedMessagesRanges = cursor.getTotalNonContiguousDeletedMessagesRange();
13131330
subStats.nonContiguousDeletedMessagesRangesSerializedSize =

pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java

Lines changed: 180 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
*/
1919
package org.apache.pulsar.broker.admin;
2020

21+
import static org.mockito.ArgumentMatchers.anyInt;
22+
import static org.mockito.Mockito.doAnswer;
2123
import static org.mockito.Mockito.doReturn;
2224
import static org.mockito.Mockito.spy;
2325
import static org.mockito.Mockito.times;
@@ -56,6 +58,7 @@
5658
import java.util.concurrent.Future;
5759
import java.util.concurrent.TimeUnit;
5860
import java.util.concurrent.atomic.AtomicInteger;
61+
import java.util.function.Function;
5962
import javax.ws.rs.client.InvocationCallback;
6063
import javax.ws.rs.client.WebTarget;
6164
import javax.ws.rs.core.Response.Status;
@@ -65,6 +68,7 @@
6568
import lombok.Value;
6669
import lombok.extern.slf4j.Slf4j;
6770
import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
71+
import org.apache.bookkeeper.mledger.Position;
6872
import org.apache.bookkeeper.mledger.PositionFactory;
6973
import org.apache.pulsar.broker.BrokerTestUtil;
7074
import org.apache.pulsar.broker.PulsarServerException;
@@ -75,6 +79,8 @@
7579
import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
7680
import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
7781
import org.apache.pulsar.broker.namespace.NamespaceService;
82+
import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
83+
import org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers;
7884
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
7985
import org.apache.pulsar.broker.testcontext.SpyConfig;
8086
import org.apache.pulsar.client.admin.GetStatsOptions;
@@ -139,7 +145,10 @@
139145
import org.apache.pulsar.common.policies.data.TopicHashPositions;
140146
import org.apache.pulsar.common.policies.data.TopicStats;
141147
import org.apache.pulsar.common.util.Codec;
148+
import org.apache.pulsar.common.util.Murmur3_32Hash;
142149
import org.apache.pulsar.common.util.ObjectMapperFactory;
150+
import org.apache.pulsar.common.util.collections.ConcurrentOpenLongPairRangeSet;
151+
import org.apache.pulsar.common.util.collections.LongPairRangeSet;
143152
import org.apache.pulsar.compaction.Compactor;
144153
import org.apache.pulsar.compaction.PulsarCompactionServiceFactory;
145154
import org.awaitility.Awaitility;
@@ -3449,43 +3458,198 @@ public void testGetTtlDurationDefaultInSeconds() throws Exception {
34493458
}
34503459

34513460
@Test
3452-
public void testGetReadPositionWhenJoining() throws Exception {
3453-
final String topic = "persistent://prop-xyz/ns1/testGetReadPositionWhenJoining-" + UUID.randomUUID().toString();
3461+
public void testGetLastSentPositionWhenJoining() throws Exception {
3462+
final String topic = "persistent://prop-xyz/ns1/testGetLastSentPositionWhenJoining-" + UUID.randomUUID().toString();
34543463
final String subName = "my-sub";
34553464
@Cleanup
34563465
Producer<byte[]> producer = pulsarClient.newProducer()
34573466
.topic(topic)
34583467
.enableBatching(false)
34593468
.create();
34603469

3470+
@Cleanup
3471+
final Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
3472+
.topic(topic)
3473+
.subscriptionType(SubscriptionType.Key_Shared)
3474+
.subscriptionName(subName)
3475+
.subscribe();
3476+
34613477
final int messages = 10;
34623478
MessageIdImpl messageId = null;
34633479
for (int i = 0; i < messages; i++) {
34643480
messageId = (MessageIdImpl) producer.send(("Hello Pulsar - " + i).getBytes());
3481+
consumer1.receive();
34653482
}
34663483

3467-
List<Consumer<byte[]>> consumers = new ArrayList<>();
3468-
for (int i = 0; i < 2; i++) {
3469-
Consumer<byte[]> consumer = pulsarClient.newConsumer()
3470-
.topic(topic)
3471-
.subscriptionType(SubscriptionType.Key_Shared)
3472-
.subscriptionName(subName)
3473-
.subscribe();
3474-
consumers.add(consumer);
3475-
}
3484+
@Cleanup
3485+
final Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
3486+
.topic(topic)
3487+
.subscriptionType(SubscriptionType.Key_Shared)
3488+
.subscriptionName(subName)
3489+
.subscribe();
34763490

34773491
TopicStats stats = admin.topics().getStats(topic);
34783492
Assert.assertEquals(stats.getSubscriptions().size(), 1);
34793493
SubscriptionStats subStats = stats.getSubscriptions().get(subName);
34803494
Assert.assertNotNull(subStats);
34813495
Assert.assertEquals(subStats.getConsumers().size(), 2);
3482-
ConsumerStats consumerStats = subStats.getConsumers().get(0);
3483-
Assert.assertEquals(consumerStats.getReadPositionWhenJoining(),
3484-
PositionFactory.create(messageId.getLedgerId(), messageId.getEntryId() + 1).toString());
3496+
ConsumerStats consumerStats = subStats.getConsumers().stream()
3497+
.filter(s -> s.getConsumerName().equals(consumer2.getConsumerName())).findFirst().get();
3498+
Assert.assertEquals(consumerStats.getLastSentPositionWhenJoining(),
3499+
PositionFactory.create(messageId.getLedgerId(), messageId.getEntryId()).toString());
3500+
}
3501+
3502+
@Test
3503+
public void testGetLastSentPosition() throws Exception {
3504+
final String topic = "persistent://prop-xyz/ns1/testGetLastSentPosition-" + UUID.randomUUID().toString();
3505+
final String subName = "my-sub";
3506+
@Cleanup
3507+
final Producer<byte[]> producer = pulsarClient.newProducer()
3508+
.topic(topic)
3509+
.enableBatching(false)
3510+
.create();
3511+
final AtomicInteger counter = new AtomicInteger();
3512+
@Cleanup
3513+
final Consumer<byte[]> consumer = pulsarClient.newConsumer()
3514+
.topic(topic)
3515+
.subscriptionType(SubscriptionType.Key_Shared)
3516+
.subscriptionName(subName)
3517+
.messageListener((c, msg) -> {
3518+
try {
3519+
c.acknowledge(msg);
3520+
counter.getAndIncrement();
3521+
} catch (Exception e) {
3522+
throw new RuntimeException(e);
3523+
}
3524+
})
3525+
.subscribe();
3526+
3527+
TopicStats stats = admin.topics().getStats(topic);
3528+
Assert.assertEquals(stats.getSubscriptions().size(), 1);
3529+
SubscriptionStats subStats = stats.getSubscriptions().get(subName);
3530+
Assert.assertNotNull(subStats);
3531+
Assert.assertNull(subStats.getLastSentPosition());
34853532

3486-
for (Consumer<byte[]> consumer : consumers) {
3487-
consumer.close();
3533+
final int messages = 10;
3534+
MessageIdImpl messageId = null;
3535+
for (int i = 0; i < messages; i++) {
3536+
messageId = (MessageIdImpl) producer.send(("Hello Pulsar - " + i).getBytes());
34883537
}
3538+
3539+
Awaitility.await().untilAsserted(() -> assertEquals(counter.get(), messages));
3540+
3541+
stats = admin.topics().getStats(topic);
3542+
Assert.assertEquals(stats.getSubscriptions().size(), 1);
3543+
subStats = stats.getSubscriptions().get(subName);
3544+
Assert.assertNotNull(subStats);
3545+
Assert.assertEquals(subStats.getLastSentPosition(), PositionFactory.create(messageId.getLedgerId(), messageId.getEntryId()).toString());
3546+
}
3547+
3548+
@Test
3549+
public void testGetIndividuallySentPositions() throws Exception {
3550+
// The producer sends messages with two types of keys.
3551+
// The dispatcher sends keyA messages to consumer1.
3552+
// Consumer1 will not receive any messages. Its receiver queue size is 1.
3553+
// Consumer2 will receive and ack any messages immediately.
3554+
3555+
final String topic = "persistent://prop-xyz/ns1/testGetIndividuallySentPositions-" + UUID.randomUUID().toString();
3556+
final String subName = "my-sub";
3557+
@Cleanup
3558+
final Producer<byte[]> producer = pulsarClient.newProducer()
3559+
.topic(topic)
3560+
.enableBatching(false)
3561+
.create();
3562+
3563+
final String consumer1Name = "c1";
3564+
final String consumer2Name = "c2";
3565+
3566+
@Cleanup
3567+
final Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
3568+
.topic(topic)
3569+
.consumerName(consumer1Name)
3570+
.receiverQueueSize(1)
3571+
.subscriptionType(SubscriptionType.Key_Shared)
3572+
.subscriptionName(subName)
3573+
.subscribe();
3574+
3575+
final PersistentStickyKeyDispatcherMultipleConsumers dispatcher =
3576+
(PersistentStickyKeyDispatcherMultipleConsumers) pulsar.getBrokerService().getTopicIfExists(topic).get().get().getSubscription(subName).getDispatcher();
3577+
final String keyA = "key-a";
3578+
final String keyB = "key-b";
3579+
final int hashA = Murmur3_32Hash.getInstance().makeHash(keyA.getBytes());
3580+
3581+
final Field selectorField = PersistentStickyKeyDispatcherMultipleConsumers.class.getDeclaredField("selector");
3582+
selectorField.setAccessible(true);
3583+
final StickyKeyConsumerSelector selector = spy((StickyKeyConsumerSelector) selectorField.get(dispatcher));
3584+
selectorField.set(dispatcher, selector);
3585+
3586+
// the selector returns consumer1 if keyA
3587+
doAnswer((invocationOnMock -> {
3588+
final int hash = invocationOnMock.getArgument(0);
3589+
3590+
final String consumerName = hash == hashA ? consumer1Name : consumer2Name;
3591+
return dispatcher.getConsumers().stream().filter(consumer -> consumer.consumerName().equals(consumerName)).findFirst().get();
3592+
})).when(selector).select(anyInt());
3593+
3594+
final AtomicInteger consumer2AckCounter = new AtomicInteger();
3595+
@Cleanup
3596+
final Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
3597+
.topic(topic)
3598+
.consumerName(consumer2Name)
3599+
.subscriptionType(SubscriptionType.Key_Shared)
3600+
.subscriptionName(subName)
3601+
.messageListener((c, msg) -> {
3602+
try {
3603+
c.acknowledge(msg);
3604+
consumer2AckCounter.getAndIncrement();
3605+
} catch (Exception e) {
3606+
throw new RuntimeException(e);
3607+
}
3608+
})
3609+
.subscribe();
3610+
3611+
final LongPairRangeSet.LongPairConsumer<Position> positionRangeConverter = PositionFactory::create;
3612+
final LongPairRangeSet<Position> expectedIndividuallySentPositions = new ConcurrentOpenLongPairRangeSet<>(4096, positionRangeConverter);
3613+
3614+
TopicStats stats = admin.topics().getStats(topic);
3615+
Assert.assertEquals(stats.getSubscriptions().size(), 1);
3616+
SubscriptionStats subStats = stats.getSubscriptions().get(subName);
3617+
Assert.assertNotNull(subStats);
3618+
Assert.assertEquals(subStats.getIndividuallySentPositions(), expectedIndividuallySentPositions.toString());
3619+
3620+
final Function<String, MessageIdImpl> sendFn = (key) -> {
3621+
try {
3622+
return (MessageIdImpl) producer.newMessage().key(key).value(("msg").getBytes()).send();
3623+
} catch (PulsarClientException e) {
3624+
throw new RuntimeException(e);
3625+
}
3626+
};
3627+
final List<MessageIdImpl> messageIdList = new ArrayList<>();
3628+
3629+
// the dispatcher can send keyA message, but then consumer1's receiver queue will be full
3630+
messageIdList.add(sendFn.apply(keyA));
3631+
3632+
// the dispatcher can send messages other than keyA
3633+
messageIdList.add(sendFn.apply(keyA));
3634+
messageIdList.add(sendFn.apply(keyB));
3635+
messageIdList.add(sendFn.apply(keyA));
3636+
messageIdList.add(sendFn.apply(keyB));
3637+
messageIdList.add(sendFn.apply(keyB));
3638+
3639+
assertEquals(messageIdList.size(), 6);
3640+
Awaitility.await().untilAsserted(() -> assertEquals(consumer2AckCounter.get(), 3));
3641+
3642+
// set expected value
3643+
expectedIndividuallySentPositions.addOpenClosed(messageIdList.get(1).getLedgerId(), messageIdList.get(1).getEntryId(),
3644+
messageIdList.get(2).getLedgerId(), messageIdList.get(2).getEntryId());
3645+
expectedIndividuallySentPositions.addOpenClosed(messageIdList.get(3).getLedgerId(), messageIdList.get(3).getEntryId(),
3646+
messageIdList.get(5).getLedgerId(), messageIdList.get(5).getEntryId());
3647+
3648+
stats = admin.topics().getStats(topic);
3649+
Assert.assertEquals(stats.getSubscriptions().size(), 1);
3650+
subStats = stats.getSubscriptions().get(subName);
3651+
Assert.assertNotNull(subStats);
3652+
Assert.assertEquals(subStats.getIndividuallySentPositions(), expectedIndividuallySentPositions.toString());
34893653
}
34903654

34913655
@Test

0 commit comments

Comments
 (0)