Skip to content

Commit bec3be2

Browse files
committed
[fix] [broker] fix mismatch between dispatcher.consumerList and dispatcher.consumerSet (#22283)
(cherry picked from commit a52945b)
1 parent 94edfe4 commit bec3be2

File tree

4 files changed

+131
-6
lines changed

4 files changed

+131
-6
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1194,10 +1194,20 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
11941194
commandSender.sendErrorResponse(requestId, ServerError.ServiceNotReady,
11951195
"Consumer is already present on the connection");
11961196
} else if (existingConsumerFuture.isCompletedExceptionally()){
1197+
log.warn("[{}][{}][{}] A failed consumer with id is already present on the connection,"
1198+
+ " consumerId={}", remoteAddress, topicName, subscriptionName, consumerId);
11971199
ServerError error = getErrorCodeWithErrorLog(existingConsumerFuture, true,
1198-
String.format("Consumer subscribe failure. remoteAddress: %s, subscription: %s",
1199-
remoteAddress, subscriptionName));
1200-
consumers.remove(consumerId, existingConsumerFuture);
1200+
String.format("A failed consumer with id is already present on the connection."
1201+
+ " consumerId: %s, remoteAddress: %s, subscription: %s",
1202+
consumerId, remoteAddress, subscriptionName));
1203+
/**
1204+
* This future may have failed because the client closed an in-progress subscribe request.
1205+
* See {@link #handleCloseConsumer(CommandCloseConsumer)}
1206+
* Do not remove the failed future here; it will be removed after the progress of
1207+
* the previous subscribing is done.
1208+
* Before the previous subscribing is done, the new subscribe request will always fail.
1209+
* This mechanism avoids the more complex logic that would otherwise be needed to handle the race conditions.
1210+
*/
12011211
commandSender.sendErrorResponse(requestId, error,
12021212
"Consumer that failed is already present on the connection");
12031213
} else {

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,9 +177,15 @@ public synchronized CompletableFuture<Void> addConsumer(Consumer consumer) {
177177
}
178178

179179
if (isConsumersExceededOnSubscription()) {
180-
log.warn("[{}] Attempting to add consumer to subscription which reached max consumers limit", name);
180+
log.warn("[{}] Attempting to add consumer to subscription which reached max consumers limit {}",
181+
name, consumer);
181182
return FutureUtil.failedFuture(new ConsumerBusyException("Subscription reached max consumers limit"));
182183
}
184+
// This scenario is not expected and should never happen. Just print a warn log if the unexpected
185+
// scenario happens. See more detail: https://github.com/apache/pulsar/pull/22283.
186+
if (consumerSet.contains(consumer)) {
187+
log.warn("[{}] Attempting to add a consumer that already registered {}", name, consumer);
188+
}
183189

184190
consumerList.add(consumer);
185191
if (consumerList.size() > 1

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3380,8 +3380,9 @@ public boolean isCompletedExceptionally() {
33803380
};
33813381
// assert error response
33823382
assertTrue(responseAssert.test(responseAssert));
3383-
// assert consumer-delete event occur
3384-
assertEquals(1L,
3383+
// The delete event will only occur after the future is completed.
3384+
// assert consumer-delete event will not occur.
3385+
assertEquals(0L,
33853386
deleteTimesMark.getAllValues().stream().filter(f -> f == existingConsumerFuture).count());
33863387
// Server will not close the connection
33873388
assertTrue(channel.isOpen());
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.client.api;
20+
21+
import com.carrotsearch.hppc.ObjectSet;
22+
import java.time.Duration;
23+
import java.util.List;
24+
import java.util.concurrent.TimeUnit;
25+
import java.util.concurrent.atomic.AtomicInteger;
26+
import lombok.extern.slf4j.Slf4j;
27+
import org.apache.pulsar.broker.BrokerTestUtil;
28+
import org.apache.pulsar.broker.service.Dispatcher;
29+
import org.apache.pulsar.common.naming.TopicName;
30+
import org.awaitility.Awaitility;
31+
import org.awaitility.reflect.WhiteboxImpl;
32+
import org.testng.Assert;
33+
import org.testng.annotations.AfterClass;
34+
import org.testng.annotations.BeforeClass;
35+
import org.testng.annotations.Test;
36+
37+
@Slf4j
38+
@Test(groups = "broker-api")
39+
public class SimpleProducerConsumerMLInitializeDelayTest extends ProducerConsumerBase {
40+
41+
@BeforeClass(alwaysRun = true)
42+
@Override
43+
protected void setup() throws Exception {
44+
super.internalSetup();
45+
super.producerBaseSetup();
46+
}
47+
48+
@AfterClass(alwaysRun = true)
49+
@Override
50+
protected void cleanup() throws Exception {
51+
super.internalCleanup();
52+
}
53+
54+
@Override
55+
protected void doInitConf() throws Exception {
56+
super.doInitConf();
57+
conf.setTopicLoadTimeoutSeconds(60 * 5);
58+
}
59+
60+
@Test(timeOut = 30 * 1000)
61+
public void testConsumerListMatchesConsumerSet() throws Exception {
62+
final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
63+
final String subName = "sub";
64+
final int clientOperationTimeout = 3;
65+
final int loadMLDelayMillis = clientOperationTimeout * 3 * 1000;
66+
final int clientMaxBackoffSeconds = clientOperationTimeout * 2;
67+
admin.topics().createNonPartitionedTopic(topicName);
68+
// Create a client with a low operation timeout.
69+
PulsarClient client = PulsarClient.builder()
70+
.serviceUrl(lookupUrl.toString())
71+
.operationTimeout(clientOperationTimeout, TimeUnit.SECONDS)
72+
.maxBackoffInterval(clientMaxBackoffSeconds, TimeUnit.SECONDS)
73+
.build();
74+
Consumer consumer = client.newConsumer()
75+
.topic(topicName)
76+
.subscriptionName(subName)
77+
.subscriptionType(SubscriptionType.Shared)
78+
.subscribe();
79+
// Inject a delay for the initialization of ML, to make the consumer to register twice.
80+
// Consumer register twice: the first will be timeout, and try again.
81+
AtomicInteger delayTimes = new AtomicInteger();
82+
mockZooKeeper.delay(loadMLDelayMillis, (op, s) -> {
83+
if (op.toString().equals("GET") && s.contains(TopicName.get(topicName).getPersistenceNamingEncoding())) {
84+
return delayTimes.incrementAndGet() == 1;
85+
}
86+
return false;
87+
});
88+
admin.topics().unload(topicName);
89+
// Verify: at last, "dispatcher.consumers.size" equals "dispatcher.consumerList.size".
90+
Awaitility.await().atMost(Duration.ofSeconds(loadMLDelayMillis * 3))
91+
.ignoreExceptions().untilAsserted(() -> {
92+
Dispatcher dispatcher = pulsar.getBrokerService()
93+
.getTopic(topicName, false).join().get()
94+
.getSubscription(subName).getDispatcher();
95+
ObjectSet consumerSet = WhiteboxImpl.getInternalState(dispatcher, "consumerSet");
96+
List consumerList = WhiteboxImpl.getInternalState(dispatcher, "consumerList");
97+
log.info("consumerSet_size: {}, consumerList_size: {}", consumerSet.size(), consumerList.size());
98+
Assert.assertEquals(consumerList.size(), 1);
99+
Assert.assertEquals(consumerSet.size(), 1);
100+
});
101+
102+
// Verify: the topic can be deleted.
103+
consumer.close();
104+
admin.topics().delete(topicName);
105+
// cleanup.
106+
client.close();
107+
}
108+
}

0 commit comments

Comments
 (0)