Skip to content

Commit f421c45

Browse files
poorbarcodenodece
authored andcommitted
[fix][broker] Fix NPE causing dispatching to stop when using Key_Shared mode and allowOutOfOrderDelivery=true (apache#22533)
(cherry picked from commit 2badcf6)
1 parent 8d48096 commit f421c45

File tree

2 files changed

+26
-2
lines changed

2 files changed

+26
-2
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -480,6 +480,11 @@ private int getAvailablePermits(Consumer c) {
480480

481481
@Override
482482
protected synchronized NavigableSet<PositionImpl> filterOutEntriesWillBeDiscarded(NavigableSet<PositionImpl> src) {
483+
// The variable "hashesToBeBlocked" and "recentlyJoinedConsumers" will be null if "isAllowOutOfOrderDelivery()",
484+
// So skip this filter out.
485+
if (isAllowOutOfOrderDelivery()) {
486+
return src;
487+
}
483488
if (src.isEmpty()) {
484489
return src;
485490
}
@@ -524,6 +529,11 @@ protected synchronized NavigableSet<PositionImpl> filterOutEntriesWillBeDiscarde
524529
*/
525530
@Override
526531
protected boolean hasConsumersNeededNormalRead() {
532+
// The variable "hashesToBeBlocked" and "recentlyJoinedConsumers" will be null if "isAllowOutOfOrderDelivery()",
533+
// So the method "filterOutEntriesWillBeDiscarded" will filter out nothing, just return "true" here.
534+
if (isAllowOutOfOrderDelivery()) {
535+
return true;
536+
}
527537
for (Consumer consumer : consumerList) {
528538
if (consumer == null || consumer.isBlocked()) {
529539
continue;

pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1725,6 +1725,14 @@ public void testNoRepeatedReadAndDiscard() throws Exception {
17251725
admin.topics().delete(topic, false);
17261726
}
17271727

1728+
@DataProvider(name = "allowKeySharedOutOfOrder")
1729+
public Object[][] allowKeySharedOutOfOrder() {
1730+
return new Object[][]{
1731+
{true},
1732+
{false}
1733+
};
1734+
}
1735+
17281736
/**
17291737
* This test is in order to guarantee the feature added by https://github.com/apache/pulsar/pull/7105.
17301738
* 1. Start 3 consumers:
@@ -1739,8 +1747,8 @@ public void testNoRepeatedReadAndDiscard() throws Exception {
17391747
* - no repeated Read-and-discard.
17401748
* - at last, all messages will be received.
17411749
*/
1742-
@Test(timeOut = 180 * 1000) // the test will be finished in 60s.
1743-
public void testRecentJoinedPosWillNotStuckOtherConsumer() throws Exception {
1750+
@Test(timeOut = 180 * 1000, dataProvider = "allowKeySharedOutOfOrder") // the test will be finished in 60s.
1751+
public void testRecentJoinedPosWillNotStuckOtherConsumer(boolean allowKeySharedOutOfOrder) throws Exception {
17441752
final int messagesSentPerTime = 100;
17451753
final Set<Integer> totalReceivedMessages = new TreeSet<>();
17461754
final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
@@ -1759,6 +1767,8 @@ public void testRecentJoinedPosWillNotStuckOtherConsumer() throws Exception {
17591767
log.info("Published message :{}", messageId);
17601768
}
17611769

1770+
KeySharedPolicy keySharedPolicy = KeySharedPolicy.autoSplitHashRange()
1771+
.setAllowOutOfOrderDelivery(allowKeySharedOutOfOrder);
17621772
// 1. Start 3 consumers and make ack holes.
17631773
// - one consumer will be closed and trigger a messages redeliver.
17641774
// - one consumer will not ack any messages to make the new consumer joined late will be stuck due to the
@@ -1769,18 +1779,21 @@ public void testRecentJoinedPosWillNotStuckOtherConsumer() throws Exception {
17691779
.subscriptionName(subName)
17701780
.receiverQueueSize(10)
17711781
.subscriptionType(SubscriptionType.Key_Shared)
1782+
.keySharedPolicy(keySharedPolicy)
17721783
.subscribe();
17731784
Consumer<Integer> consumer2 = pulsarClient.newConsumer(Schema.INT32)
17741785
.topic(topic)
17751786
.subscriptionName(subName)
17761787
.receiverQueueSize(10)
17771788
.subscriptionType(SubscriptionType.Key_Shared)
1789+
.keySharedPolicy(keySharedPolicy)
17781790
.subscribe();
17791791
Consumer<Integer> consumer3 = pulsarClient.newConsumer(Schema.INT32)
17801792
.topic(topic)
17811793
.subscriptionName(subName)
17821794
.receiverQueueSize(10)
17831795
.subscriptionType(SubscriptionType.Key_Shared)
1796+
.keySharedPolicy(keySharedPolicy)
17841797
.subscribe();
17851798
List<Message> msgList1 = new ArrayList<>();
17861799
List<Message> msgList2 = new ArrayList<>();
@@ -1829,6 +1842,7 @@ public void testRecentJoinedPosWillNotStuckOtherConsumer() throws Exception {
18291842
.subscriptionName(subName)
18301843
.receiverQueueSize(1000)
18311844
.subscriptionType(SubscriptionType.Key_Shared)
1845+
.keySharedPolicy(keySharedPolicy)
18321846
.subscribe();
18331847
consumerWillBeClose.close();
18341848

0 commit comments

Comments
 (0)