Skip to content

Commit 620fe9b

Browse files
committed
[fix][broker] One topic can be closed multiple times concurrently (apache#17524)
(cherry picked from commit 93afd89)
1 parent 6038bbf commit 620fe9b

File tree

4 files changed

+233
-28
lines changed

4 files changed

+233
-28
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java

Lines changed: 104 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import java.util.concurrent.TimeUnit;
4949
import java.util.concurrent.atomic.AtomicBoolean;
5050
import java.util.concurrent.atomic.AtomicLong;
51+
import java.util.concurrent.atomic.AtomicReference;
5152
import java.util.function.BiFunction;
5253
import java.util.stream.Collectors;
5354
import javax.annotation.Nonnull;
@@ -258,6 +259,37 @@ protected TopicStatsHelper initialValue() {
258259
@Getter
259260
private final ExecutorService orderedExecutor;
260261

262+
private volatile CloseFutures closeFutures;
263+
264+
/***
265+
* We use 2 futures to prevent a new closing if there is an in-progress deletion or closing. We make Pulsar return
266+
* the in-progress one when it is called the second time.
267+
*
268+
* The topic closing will be called in the below scenarios:
269+
* 1. Calling "pulsar-admin topics unload". Related to {@link CloseFutures#waitDisconnectClients}.
270+
* 2. Namespace bundle unloading. The topic unloading triggered by unloading namespace bundles will not wait for
271+
* clients disconnect. See {@link CloseFutures#notWaitDisconnectClients}.
272+
*
273+
* The two futures will be set according to the rules below:
274+
* Event: Topic close.
275+
* - If the first one closing is called by "close and not wait for clients disconnect":
276+
* - {@link CloseFutures#waitDisconnectClients} will be initialized as "waiting for clients disconnect".
277+
* - If the first one closing is called by "close and wait for clients disconnect", the two futures will be
278+
* initialized as "waiting for clients disconnect".
279+
* Event: Topic delete.
280+
* the two futures will be initialized as "waiting for clients disconnect".
281+
*/
282+
private class CloseFutures {
283+
private final CompletableFuture<Void> notWaitDisconnectClients;
284+
private final CompletableFuture<Void> waitDisconnectClients;
285+
286+
public CloseFutures(CompletableFuture<Void> waitDisconnectClients,
287+
CompletableFuture<Void> notWaitDisconnectClients) {
288+
this.waitDisconnectClients = waitDisconnectClients;
289+
this.notWaitDisconnectClients = notWaitDisconnectClients;
290+
}
291+
}
292+
261293
private static class TopicStatsHelper {
262294
public double averageMsgSize;
263295
public double aggMsgRateIn;
@@ -1349,8 +1381,10 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,
13491381
}
13501382

13511383
fenceTopicToCloseOrDelete(); // Avoid clients reconnections while deleting
1384+
// Mark the close as in progress to prevent close from being called concurrently.
1385+
this.closeFutures = new CloseFutures(new CompletableFuture(), new CompletableFuture());
13521386

1353-
return getBrokerService().getPulsar().getPulsarResources().getNamespaceResources()
1387+
CompletableFuture<Void> res = getBrokerService().getPulsar().getPulsarResources().getNamespaceResources()
13541388
.getPartitionedTopicResources().runWithMarkDeleteAsync(TopicName.get(topic), () -> {
13551389
CompletableFuture<Void> deleteFuture = new CompletableFuture<>();
13561390

@@ -1453,6 +1487,10 @@ public void deleteLedgerComplete(Object ctx) {
14531487
unfenceTopicToResume();
14541488
}
14551489
});
1490+
1491+
FutureUtil.completeAfter(closeFutures.notWaitDisconnectClients, res);
1492+
FutureUtil.completeAfter(closeFutures.waitDisconnectClients, res);
1493+
return res;
14561494
} finally {
14571495
lock.writeLock().unlock();
14581496
}
@@ -1463,6 +1501,11 @@ public CompletableFuture<Void> close() {
14631501
return close(false);
14641502
}
14651503

1504+
private enum CloseTypes {
1505+
notWaitDisconnectClients,
1506+
waitDisconnectClients;
1507+
}
1508+
14661509
/**
14671510
* Close this topic - close all producers and subscriptions associated with this topic.
14681511
*
@@ -1471,19 +1514,32 @@ public CompletableFuture<Void> close() {
14711514
*/
14721515
@Override
14731516
public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect) {
1474-
CompletableFuture<Void> closeFuture = new CompletableFuture<>();
14751517

1476-
lock.writeLock().lock();
1477-
try {
1518+
CloseTypes closeType;
1519+
if (closeWithoutWaitingClientDisconnect) {
1520+
closeType = CloseTypes.notWaitDisconnectClients;
1521+
} else {
14781522
// closing managed-ledger waits until all producers/consumers/replicators get closed. Sometimes, broker
14791523
// forcefully wants to close managed-ledger without waiting all resources to be closed.
1480-
if (!isClosingOrDeleting || closeWithoutWaitingClientDisconnect) {
1481-
fenceTopicToCloseOrDelete();
1482-
} else {
1483-
log.warn("[{}] Topic is already being closed or deleted", topic);
1484-
closeFuture.completeExceptionally(new TopicFencedException("Topic is already fenced"));
1485-
return closeFuture;
1524+
closeType = CloseTypes.waitDisconnectClients;
1525+
}
1526+
1527+
lock.writeLock().lock();
1528+
try {
1529+
// Return in-progress future if exists.
1530+
if (isClosingOrDeleting) {
1531+
switch (closeType) {
1532+
case notWaitDisconnectClients -> {
1533+
return closeFutures.notWaitDisconnectClients;
1534+
}
1535+
case waitDisconnectClients -> {
1536+
return closeFutures.waitDisconnectClients;
1537+
}
1538+
}
14861539
}
1540+
// No in-progress closing.
1541+
fenceTopicToCloseOrDelete();
1542+
this.closeFutures = new CloseFutures(new CompletableFuture(), new CompletableFuture());
14871543
} finally {
14881544
lock.writeLock().unlock();
14891545
}
@@ -1513,11 +1569,22 @@ public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect
15131569
});
15141570
}
15151571

1516-
CompletableFuture<Void> clientCloseFuture = closeWithoutWaitingClientDisconnect
1517-
? CompletableFuture.completedFuture(null)
1518-
: FutureUtil.waitForAll(futures);
1572+
CompletableFuture<Void> disconnectClientsInCurrentCall = null;
1573+
AtomicReference<CompletableFuture<Void>> disconnectClientsToCache = new AtomicReference<>();
1574+
switch (closeType) {
1575+
case notWaitDisconnectClients -> {
1576+
disconnectClientsInCurrentCall = CompletableFuture.completedFuture(null);
1577+
disconnectClientsToCache.set(FutureUtil.waitForAll(futures));
1578+
break;
1579+
}
1580+
case waitDisconnectClients -> {
1581+
disconnectClientsInCurrentCall = FutureUtil.waitForAll(futures);
1582+
disconnectClientsToCache.set(disconnectClientsInCurrentCall);
1583+
}
1584+
}
1585+
CompletableFuture<Void> closeFuture = new CompletableFuture<>();
15191586

1520-
clientCloseFuture.thenRun(() -> {
1587+
Runnable closeLedgerAfterCloseClients = () -> {
15211588
// After having disconnected all producers/consumers, close the managed ledger
15221589
ledger.asyncClose(new CloseCallback() {
15231590
@Override
@@ -1532,13 +1599,32 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
15321599
disposeTopic(closeFuture);
15331600
}
15341601
}, null);
1535-
}).exceptionally(exception -> {
1602+
};
1603+
disconnectClientsInCurrentCall.thenRun(closeLedgerAfterCloseClients).exceptionally(exception -> {
15361604
log.error("[{}] Error closing topic", topic, exception);
15371605
unfenceTopicToResume();
15381606
closeFuture.completeExceptionally(exception);
15391607
return null;
15401608
});
15411609

1610+
switch (closeType) {
1611+
case notWaitDisconnectClients -> {
1612+
FutureUtil.completeAfter(closeFutures.notWaitDisconnectClients, closeFuture);
1613+
FutureUtil.completeAfterAll(closeFutures.waitDisconnectClients,
1614+
closeFuture.thenCompose(ignore -> disconnectClientsToCache.get().exceptionally(ex -> {
1615+
// Since the managed ledger has been closed, eat the error of clients disconnection.
1616+
log.error("[{}] Closed managed ledger, but disconnect clients failed,"
1617+
+ " this topic will be marked closed", topic, ex);
1618+
return null;
1619+
})));
1620+
break;
1621+
}
1622+
case waitDisconnectClients -> {
1623+
FutureUtil.completeAfter(closeFutures.notWaitDisconnectClients, closeFuture);
1624+
FutureUtil.completeAfterAll(closeFutures.waitDisconnectClients, closeFuture);
1625+
}
1626+
}
1627+
15421628
return closeFuture;
15431629
}
15441630

@@ -1824,10 +1910,10 @@ protected CompletableFuture<Void> addReplicationCluster(String remoteCluster, Ma
18241910
lock.readLock().lock();
18251911
try {
18261912
if (isClosingOrDeleting) {
1827-
// Whether is "transferring" or not, do not create new replicator.
1913+
// Do not create new replicator.
18281914
log.info("[{}] Skip to create replicator because this topic is closing."
1829-
+ " remote cluster: {}. State of transferring : {}",
1830-
topic, remoteCluster, transferring);
1915+
+ " remote cluster: {}.",
1916+
topic, remoteCluster);
18311917
return;
18321918
}
18331919
Replicator replicator = replicators.computeIfAbsent(remoteCluster, r -> {

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import static org.testng.Assert.assertEquals;
2626
import static org.testng.Assert.assertFalse;
2727
import static org.testng.Assert.assertNotEquals;
28+
import static org.testng.Assert.assertNull;
2829
import static org.testng.Assert.assertTrue;
2930
import static org.testng.Assert.fail;
3031
import com.google.common.collect.Sets;
@@ -232,7 +233,7 @@ public void testTopicCloseWhenInternalProducerCloseErrorOnce() throws Exception
232233
});
233234
}
234235

235-
private void injectMockReplicatorProducerBuilder(
236+
private Runnable injectMockReplicatorProducerBuilder(
236237
BiFunction<ProducerConfigurationData, ProducerImpl, ProducerImpl> producerDecorator)
237238
throws Exception {
238239
String cluster2 = pulsar2.getConfig().getClusterName();
@@ -252,7 +253,8 @@ private void injectMockReplicatorProducerBuilder(
252253
replicationClients = WhiteboxImpl.getInternalState(brokerService, "replicationClients");
253254
PulsarClientImpl internalClient = (PulsarClientImpl) replicationClients.get(cluster2);
254255
PulsarClient spyClient = spy(internalClient);
255-
replicationClients.put(cluster2, spyClient);
256+
assertTrue(replicationClients.remove(cluster2, internalClient));
257+
assertNull(replicationClients.putIfAbsent(cluster2, spyClient));
256258

257259
// Inject producer decorator.
258260
doAnswer(invocation -> {
@@ -281,6 +283,12 @@ private void injectMockReplicatorProducerBuilder(
281283
}).when(spyProducerBuilder).createAsync();
282284
return spyProducerBuilder;
283285
}).when(spyClient).newProducer(any(Schema.class));
286+
287+
// Return a task that cleans up the injection.
288+
return () -> {
289+
assertTrue(replicationClients.remove(cluster2, spyClient));
290+
assertNull(replicationClients.putIfAbsent(cluster2, internalClient));
291+
};
284292
}
285293

286294
private SpyCursor spyCursor(PersistentTopic persistentTopic, String cursorName) throws Exception {
@@ -374,7 +382,7 @@ public void testConcurrencyOfUnloadBundleAndRecreateProducer() throws Exception
374382
// If the retry counter is larger than 6, the next creation will be slow enough to close Replicator.
375383
final AtomicInteger createProducerCounter = new AtomicInteger();
376384
final int failTimes = 6;
377-
injectMockReplicatorProducerBuilder((producerCnf, originalProducer) -> {
385+
Runnable taskToClearInjection = injectMockReplicatorProducerBuilder((producerCnf, originalProducer) -> {
378386
if (topicName.equals(producerCnf.getTopicName())) {
379387
// There is a switch to determine create producer successfully or not.
380388
if (createProducerCounter.incrementAndGet() > failTimes) {
@@ -433,6 +441,7 @@ public void testConcurrencyOfUnloadBundleAndRecreateProducer() throws Exception
433441
});
434442

435443
// cleanup.
444+
taskToClearInjection.run();
436445
cleanupTopics(() -> {
437446
admin1.topics().delete(topicName);
438447
admin2.topics().delete(topicName);
@@ -537,7 +546,7 @@ public void testConcurrencyOfUnloadBundleAndRecreateProducer2() throws Exception
537546
// If the retry counter is larger than 6, the next creation will be slow enough to close Replicator.
538547
final AtomicInteger createProducerCounter = new AtomicInteger();
539548
final int failTimes = 6;
540-
injectMockReplicatorProducerBuilder((producerCnf, originalProducer) -> {
549+
Runnable taskToClearInjection = injectMockReplicatorProducerBuilder((producerCnf, originalProducer) -> {
541550
if (topicName.equals(producerCnf.getTopicName())) {
542551
// There is a switch to determine create producer successfully or not.
543552
if (createProducerCounter.incrementAndGet() > failTimes) {
@@ -599,6 +608,7 @@ public void testConcurrencyOfUnloadBundleAndRecreateProducer2() throws Exception
599608
});
600609

601610
// cleanup.
611+
taskToClearInjection.run();
602612
cleanupTopics(namespaceName, () -> {
603613
admin1.topics().delete(topicName);
604614
admin2.topics().delete(topicName);
@@ -619,8 +629,6 @@ public void testUnFenceTopicToReuse() throws Exception {
619629
final String mockProducerName = UUID.randomUUID().toString();
620630
final org.apache.pulsar.broker.service.Producer mockProducer =
621631
mock(org.apache.pulsar.broker.service.Producer.class);
622-
doAnswer(invocation -> CompletableFuture.failedFuture(new RuntimeException("mocked error")))
623-
.when(mockProducer).disconnect(any());
624632
doAnswer(invocation -> CompletableFuture.failedFuture(new RuntimeException("mocked error")))
625633
.when(mockProducer).disconnect();
626634
PersistentTopic persistentTopic =
@@ -631,7 +639,7 @@ public void testUnFenceTopicToReuse() throws Exception {
631639
GeoPersistentReplicator replicator1 =
632640
(GeoPersistentReplicator) persistentTopic.getReplicators().values().iterator().next();
633641
try {
634-
persistentTopic.close(true, false).join();
642+
persistentTopic.close(false).join();
635643
fail("Expected close fails due to a producer close fails");
636644
} catch (Exception ex) {
637645
log.info("Expected error: {}", ex.getMessage());
@@ -650,8 +658,9 @@ public void testUnFenceTopicToReuse() throws Exception {
650658
assertTrue(replicator2.producer != null && replicator2.producer.isConnected());
651659
});
652660

653-
// cleanup.
661+
// cleanup the injection.
654662
persistentTopic.getProducers().remove(mockProducerName, mockProducer);
663+
// cleanup.
655664
producer1.close();
656665
cleanupTopics(() -> {
657666
admin1.topics().delete(topicName);

0 commit comments

Comments
 (0)