Skip to content

Commit 93afd89

Browse files
authored
[fix][broker] One topic can be closed multiple times concurrently (apache#17524)
1 parent 264722f commit 93afd89

File tree

4 files changed

+290
-44
lines changed

4 files changed

+290
-44
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java

Lines changed: 162 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import java.util.concurrent.TimeUnit;
5151
import java.util.concurrent.atomic.AtomicBoolean;
5252
import java.util.concurrent.atomic.AtomicLong;
53+
import java.util.concurrent.atomic.AtomicReference;
5354
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
5455
import java.util.function.BiFunction;
5556
import java.util.stream.Collectors;
@@ -276,6 +277,8 @@ protected TopicStatsHelper initialValue() {
276277
@Getter
277278
private final ExecutorService orderedExecutor;
278279

280+
private volatile CloseFutures closeFutures;
281+
279282
@Getter
280283
private final PersistentTopicMetrics persistentTopicMetrics = new PersistentTopicMetrics();
281284

@@ -299,6 +302,50 @@ private static class EstimateTimeBasedBacklogQuotaCheckResult {
299302
Long estimatedOldestUnacknowledgedMessageTimestamp;
300303
}
301304

305+
/***
306+
* We use 3 futures to prevent a new closing if there is an in-progress deletion or closing. We make Pulsar return
307+
* the in-progress one when it is called the second time.
308+
*
309+
* The topic closing will be called the below scenarios:
310+
* 1. Calling "pulsar-admin topics unload". Relate to {@link CloseFutures#waitDisconnectClients}.
311+
* 2. Namespace bundle transfer or unloading.
312+
* a. The unloading topic triggered by unloading namespace bundles will not wait for clients disconnect. Relate
313+
* to {@link CloseFutures#notWaitDisconnectClients}.
314+
* b. The unloading topic triggered by unloading namespace bundles was seperated to two steps when using
315+
* {@link ExtensibleLoadManagerImpl}.
316+
* b-1. step-1: fence the topic on the original Broker, and do not trigger reconnections of clients. Relate
317+
* to {@link CloseFutures#transferring}. This step is a half closing.
318+
* b-2. step-2: send the owner broker information to clients and disconnect clients. Relate
319+
* to {@link CloseFutures#notWaitDisconnectClients}.
320+
*
321+
* The three futures will be setting as the below rule:
322+
* Event: Topic close.
323+
* - If the first one closing is called by "close and not disconnect clients":
324+
* - {@link CloseFutures#transferring} will be initialized as "close and not disconnect clients".
325+
* - {@link CloseFutures#waitDisconnectClients} ang {@link CloseFutures#notWaitDisconnectClients} will be empty,
326+
* the second closing will do a new close after {@link CloseFutures#transferring} is completed.
327+
* - If the first one closing is called by "close and not wait for clients disconnect":
328+
* - {@link CloseFutures#waitDisconnectClients} will be initialized as "waiting for clients disconnect".
329+
* - {@link CloseFutures#notWaitDisconnectClients} ang {@link CloseFutures#transferring} will be
330+
* initialized as "not waiting for clients disconnect" .
331+
* - If the first one closing is called by "close and wait for clients disconnect", the three futures will be
332+
* initialized as "waiting for clients disconnect".
333+
* Event: Topic delete.
334+
* the three futures will be initialized as "waiting for clients disconnect".
335+
*/
336+
private class CloseFutures {
337+
private final CompletableFuture<Void> transferring;
338+
private final CompletableFuture<Void> notWaitDisconnectClients;
339+
private final CompletableFuture<Void> waitDisconnectClients;
340+
341+
public CloseFutures(CompletableFuture<Void> transferring, CompletableFuture<Void> waitDisconnectClients,
342+
CompletableFuture<Void> notWaitDisconnectClients) {
343+
this.transferring = transferring;
344+
this.waitDisconnectClients = waitDisconnectClients;
345+
this.notWaitDisconnectClients = notWaitDisconnectClients;
346+
}
347+
}
348+
302349
private static class TopicStatsHelper {
303350
public double averageMsgSize;
304351
public double aggMsgRateIn;
@@ -1417,8 +1464,11 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,
14171464
}
14181465

14191466
fenceTopicToCloseOrDelete(); // Avoid clients reconnections while deleting
1467+
// Mark the progress of close to prevent close calling concurrently.
1468+
this.closeFutures =
1469+
new CloseFutures(new CompletableFuture(), new CompletableFuture(), new CompletableFuture());
14201470

1421-
return getBrokerService().getPulsar().getPulsarResources().getNamespaceResources()
1471+
CompletableFuture<Void> res = getBrokerService().getPulsar().getPulsarResources().getNamespaceResources()
14221472
.getPartitionedTopicResources().runWithMarkDeleteAsync(TopicName.get(topic), () -> {
14231473
CompletableFuture<Void> deleteFuture = new CompletableFuture<>();
14241474

@@ -1528,6 +1578,11 @@ public void deleteLedgerComplete(Object ctx) {
15281578
unfenceTopicToResume();
15291579
}
15301580
});
1581+
1582+
FutureUtil.completeAfter(closeFutures.transferring, res);
1583+
FutureUtil.completeAfter(closeFutures.notWaitDisconnectClients, res);
1584+
FutureUtil.completeAfter(closeFutures.waitDisconnectClients, res);
1585+
return res;
15311586
} finally {
15321587
lock.writeLock().unlock();
15331588
}
@@ -1543,6 +1598,12 @@ public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect
15431598
return close(true, closeWithoutWaitingClientDisconnect);
15441599
}
15451600

1601+
private enum CloseTypes {
1602+
transferring,
1603+
notWaitDisconnectClients,
1604+
waitDisconnectClients;
1605+
}
1606+
15461607
/**
15471608
* Close this topic - close all producers and subscriptions associated with this topic.
15481609
*
@@ -1553,32 +1614,57 @@ public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect
15531614
@Override
15541615
public CompletableFuture<Void> close(
15551616
boolean disconnectClients, boolean closeWithoutWaitingClientDisconnect) {
1556-
CompletableFuture<Void> closeFuture = new CompletableFuture<>();
1557-
15581617
lock.writeLock().lock();
1559-
try {
1560-
if (!disconnectClients) {
1561-
transferring = true;
1562-
}
1618+
// Choose the close type.
1619+
CloseTypes closeType;
1620+
if (!disconnectClients) {
1621+
closeType = CloseTypes.transferring;
1622+
} else if (closeWithoutWaitingClientDisconnect) {
1623+
closeType = CloseTypes.notWaitDisconnectClients;
1624+
} else {
15631625
// closing managed-ledger waits until all producers/consumers/replicators get closed. Sometimes, broker
15641626
// forcefully wants to close managed-ledger without waiting all resources to be closed.
1565-
if (!isClosingOrDeleting || closeWithoutWaitingClientDisconnect) {
1566-
fenceTopicToCloseOrDelete();
1627+
closeType = CloseTypes.waitDisconnectClients;
1628+
}
1629+
/** Maybe there is a in-progress half closing task. see the section 2-b-1 of {@link CloseFutures}. **/
1630+
CompletableFuture<Void> inProgressTransferCloseTask = null;
1631+
try {
1632+
// Return in-progress future if exists.
1633+
if (isClosingOrDeleting) {
1634+
if (closeType == CloseTypes.transferring) {
1635+
return closeFutures.transferring;
1636+
}
1637+
if (closeType == CloseTypes.notWaitDisconnectClients && closeFutures.notWaitDisconnectClients != null) {
1638+
return closeFutures.notWaitDisconnectClients;
1639+
}
1640+
if (closeType == CloseTypes.waitDisconnectClients && closeFutures.waitDisconnectClients != null) {
1641+
return closeFutures.waitDisconnectClients;
1642+
}
1643+
if (transferring) {
1644+
inProgressTransferCloseTask = closeFutures.transferring;
1645+
}
1646+
}
1647+
fenceTopicToCloseOrDelete();
1648+
if (closeType == CloseTypes.transferring) {
1649+
transferring = true;
1650+
this.closeFutures = new CloseFutures(new CompletableFuture(), null, null);
15671651
} else {
1568-
log.warn("[{}] Topic is already being closed or deleted", topic);
1569-
closeFuture.completeExceptionally(new TopicFencedException("Topic is already fenced"));
1570-
return closeFuture;
1652+
this.closeFutures =
1653+
new CloseFutures(new CompletableFuture(), new CompletableFuture(), new CompletableFuture());
15711654
}
15721655
} finally {
15731656
lock.writeLock().unlock();
15741657
}
15751658

15761659
List<CompletableFuture<Void>> futures = new ArrayList<>();
1660+
if (inProgressTransferCloseTask != null) {
1661+
futures.add(inProgressTransferCloseTask);
1662+
}
15771663

15781664
futures.add(transactionBuffer.closeAsync());
15791665
replicators.forEach((cluster, replicator) -> futures.add(replicator.terminate()));
15801666
shadowReplicators.forEach((__, replicator) -> futures.add(replicator.terminate()));
1581-
if (disconnectClients) {
1667+
if (closeType != CloseTypes.transferring) {
15821668
futures.add(ExtensibleLoadManagerImpl.getAssignedBrokerLookupData(
15831669
brokerService.getPulsar(), topic).thenAccept(lookupData -> {
15841670
producers.values().forEach(producer -> futures.add(producer.disconnect(lookupData)));
@@ -1616,40 +1702,79 @@ public CompletableFuture<Void> close(
16161702
}
16171703
}
16181704

1619-
CompletableFuture<Void> clientCloseFuture = closeWithoutWaitingClientDisconnect
1620-
? CompletableFuture.completedFuture(null)
1621-
: FutureUtil.waitForAll(futures);
1705+
CompletableFuture<Void> disconnectClientsInCurrentCall = null;
1706+
// Note: "disconnectClientsToCache" is a non-able value, it is null when close type is transferring.
1707+
AtomicReference<CompletableFuture<Void>> disconnectClientsToCache = new AtomicReference<>();
1708+
switch (closeType) {
1709+
case transferring -> {
1710+
disconnectClientsInCurrentCall = FutureUtil.waitForAll(futures);
1711+
break;
1712+
}
1713+
case notWaitDisconnectClients -> {
1714+
disconnectClientsInCurrentCall = CompletableFuture.completedFuture(null);
1715+
disconnectClientsToCache.set(FutureUtil.waitForAll(futures));
1716+
break;
1717+
}
1718+
case waitDisconnectClients -> {
1719+
disconnectClientsInCurrentCall = FutureUtil.waitForAll(futures);
1720+
disconnectClientsToCache.set(disconnectClientsInCurrentCall);
1721+
}
1722+
}
16221723

1623-
clientCloseFuture.thenRun(() -> {
1624-
// After having disconnected all producers/consumers, close the managed ledger
1625-
ledger.asyncClose(new CloseCallback() {
1626-
@Override
1627-
public void closeComplete(Object ctx) {
1628-
if (disconnectClients) {
1629-
// Everything is now closed, remove the topic from map
1630-
disposeTopic(closeFuture);
1631-
} else {
1632-
closeFuture.complete(null);
1633-
}
1724+
CompletableFuture<Void> closeFuture = new CompletableFuture<>();
1725+
Runnable closeLedgerAfterCloseClients = (() -> ledger.asyncClose(new CloseCallback() {
1726+
@Override
1727+
public void closeComplete(Object ctx) {
1728+
if (closeType != CloseTypes.transferring) {
1729+
// Everything is now closed, remove the topic from map
1730+
disposeTopic(closeFuture);
1731+
} else {
1732+
closeFuture.complete(null);
16341733
}
1734+
}
16351735

1636-
@Override
1637-
public void closeFailed(ManagedLedgerException exception, Object ctx) {
1638-
log.error("[{}] Failed to close managed ledger, proceeding anyway.", topic, exception);
1639-
if (disconnectClients) {
1640-
disposeTopic(closeFuture);
1641-
} else {
1642-
closeFuture.complete(null);
1643-
}
1736+
@Override
1737+
public void closeFailed(ManagedLedgerException exception, Object ctx) {
1738+
log.error("[{}] Failed to close managed ledger, proceeding anyway.", topic, exception);
1739+
if (closeType != CloseTypes.transferring) {
1740+
disposeTopic(closeFuture);
1741+
} else {
1742+
closeFuture.complete(null);
16441743
}
1645-
}, null);
1646-
}).exceptionally(exception -> {
1744+
}
1745+
}, null));
1746+
1747+
disconnectClientsInCurrentCall.thenRun(closeLedgerAfterCloseClients).exceptionally(exception -> {
16471748
log.error("[{}] Error closing topic", topic, exception);
16481749
unfenceTopicToResume();
16491750
closeFuture.completeExceptionally(exception);
16501751
return null;
16511752
});
16521753

1754+
switch (closeType) {
1755+
case transferring -> {
1756+
FutureUtil.completeAfterAll(closeFutures.transferring, closeFuture);
1757+
break;
1758+
}
1759+
case notWaitDisconnectClients -> {
1760+
FutureUtil.completeAfterAll(closeFutures.transferring, closeFuture);
1761+
FutureUtil.completeAfter(closeFutures.notWaitDisconnectClients, closeFuture);
1762+
FutureUtil.completeAfterAll(closeFutures.waitDisconnectClients,
1763+
closeFuture.thenCompose(ignore -> disconnectClientsToCache.get().exceptionally(ex -> {
1764+
// Since the managed ledger has been closed, eat the error of clients disconnection.
1765+
log.error("[{}] Closed managed ledger, but disconnect clients failed,"
1766+
+ " this topic will be marked closed", topic, ex);
1767+
return null;
1768+
})));
1769+
break;
1770+
}
1771+
case waitDisconnectClients -> {
1772+
FutureUtil.completeAfterAll(closeFutures.transferring, closeFuture);
1773+
FutureUtil.completeAfter(closeFutures.notWaitDisconnectClients, closeFuture);
1774+
FutureUtil.completeAfterAll(closeFutures.waitDisconnectClients, closeFuture);
1775+
}
1776+
}
1777+
16531778
return closeFuture;
16541779
}
16551780

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import static org.testng.Assert.assertEquals;
2626
import static org.testng.Assert.assertFalse;
2727
import static org.testng.Assert.assertNotEquals;
28+
import static org.testng.Assert.assertNull;
2829
import static org.testng.Assert.assertTrue;
2930
import static org.testng.Assert.fail;
3031
import com.google.common.collect.Sets;
@@ -226,7 +227,7 @@ public void testTopicCloseWhenInternalProducerCloseErrorOnce() throws Exception
226227
});
227228
}
228229

229-
private void injectMockReplicatorProducerBuilder(
230+
private Runnable injectMockReplicatorProducerBuilder(
230231
BiFunction<ProducerConfigurationData, ProducerImpl, ProducerImpl> producerDecorator)
231232
throws Exception {
232233
String cluster2 = pulsar2.getConfig().getClusterName();
@@ -246,7 +247,8 @@ private void injectMockReplicatorProducerBuilder(
246247
replicationClients = WhiteboxImpl.getInternalState(brokerService, "replicationClients");
247248
PulsarClientImpl internalClient = (PulsarClientImpl) replicationClients.get(cluster2);
248249
PulsarClient spyClient = spy(internalClient);
249-
replicationClients.put(cluster2, spyClient);
250+
assertTrue(replicationClients.remove(cluster2, internalClient));
251+
assertNull(replicationClients.putIfAbsent(cluster2, spyClient));
250252

251253
// Inject producer decorator.
252254
doAnswer(invocation -> {
@@ -275,6 +277,12 @@ private void injectMockReplicatorProducerBuilder(
275277
}).when(spyProducerBuilder).createAsync();
276278
return spyProducerBuilder;
277279
}).when(spyClient).newProducer(any(Schema.class));
280+
281+
// Return a cleanup injection task;
282+
return () -> {
283+
assertTrue(replicationClients.remove(cluster2, spyClient));
284+
assertNull(replicationClients.putIfAbsent(cluster2, internalClient));
285+
};
278286
}
279287

280288
private SpyCursor spyCursor(PersistentTopic persistentTopic, String cursorName) throws Exception {
@@ -368,7 +376,7 @@ public void testConcurrencyOfUnloadBundleAndRecreateProducer() throws Exception
368376
// If the retry counter is larger than 6, the next creation will be slow enough to close Replicator.
369377
final AtomicInteger createProducerCounter = new AtomicInteger();
370378
final int failTimes = 6;
371-
injectMockReplicatorProducerBuilder((producerCnf, originalProducer) -> {
379+
Runnable taskToClearInjection = injectMockReplicatorProducerBuilder((producerCnf, originalProducer) -> {
372380
if (topicName.equals(producerCnf.getTopicName())) {
373381
// There is a switch to determine create producer successfully or not.
374382
if (createProducerCounter.incrementAndGet() > failTimes) {
@@ -427,6 +435,7 @@ public void testConcurrencyOfUnloadBundleAndRecreateProducer() throws Exception
427435
});
428436

429437
// cleanup.
438+
taskToClearInjection.run();
430439
cleanupTopics(() -> {
431440
admin1.topics().delete(topicName);
432441
admin2.topics().delete(topicName);
@@ -531,7 +540,7 @@ public void testConcurrencyOfUnloadBundleAndRecreateProducer2() throws Exception
531540
// If the retry counter is larger than 6, the next creation will be slow enough to close Replicator.
532541
final AtomicInteger createProducerCounter = new AtomicInteger();
533542
final int failTimes = 6;
534-
injectMockReplicatorProducerBuilder((producerCnf, originalProducer) -> {
543+
Runnable taskToClearInjection = injectMockReplicatorProducerBuilder((producerCnf, originalProducer) -> {
535544
if (topicName.equals(producerCnf.getTopicName())) {
536545
// There is a switch to determine create producer successfully or not.
537546
if (createProducerCounter.incrementAndGet() > failTimes) {
@@ -593,6 +602,7 @@ public void testConcurrencyOfUnloadBundleAndRecreateProducer2() throws Exception
593602
});
594603

595604
// cleanup.
605+
taskToClearInjection.run();
596606
cleanupTopics(namespaceName, () -> {
597607
admin1.topics().delete(topicName);
598608
admin2.topics().delete(topicName);
@@ -644,8 +654,9 @@ public void testUnFenceTopicToReuse() throws Exception {
644654
assertTrue(replicator2.producer != null && replicator2.producer.isConnected());
645655
});
646656

647-
// cleanup.
657+
// cleanup the injection.
648658
persistentTopic.getProducers().remove(mockProducerName, mockProducer);
659+
// cleanup.
649660
producer1.close();
650661
cleanupTopics(() -> {
651662
admin1.topics().delete(topicName);

0 commit comments

Comments
 (0)