Skip to content

Commit 6038bbf

Browse files
committed
[fix] [broker] Part-2: Replicator can not created successfully due to an orphan replicator in the previous topic owner (apache#21948)
(cherry picked from commit b774666)
1 parent 076b55e commit 6038bbf

File tree

7 files changed

+239
-22
lines changed

7 files changed

+239
-22
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,7 @@ protected void scheduleCheckTopicActiveAndStartProducer(final long waitTimeMs) {
247247
}
248248
startProducer();
249249
}).exceptionally(ex -> {
250-
log.warn("[{}] [{}] Stop retry to create producer due to unknown error(topic create failed), and"
250+
log.error("[{}] [{}] Stop retry to create producer due to unknown error(topic create failed), and"
251251
+ " trigger a terminate. Replicator state: {}",
252252
localTopicName, replicatorId, STATE_UPDATER.get(this), ex);
253253
terminate();
@@ -376,9 +376,13 @@ public CompletableFuture<Void> terminate() {
376376
this.producer = null;
377377
// set the cursor as inactive.
378378
disableReplicatorRead();
379+
// release resources.
380+
doReleaseResources();
379381
});
380382
}
381383

384+
protected void doReleaseResources() {}
385+
382386
protected boolean tryChangeStatusToTerminating() {
383387
if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Terminating)){
384388
return true;
@@ -467,4 +471,8 @@ protected ImmutablePair<Boolean, State> compareSetAndGetState(State expect, Stat
467471
}
468472
return compareSetAndGetState(expect, update);
469473
}
474+
475+
public boolean isTerminated() {
476+
return state == State.Terminating || state == State.Terminated;
477+
}
470478
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,4 +51,6 @@ default Optional<DispatchRateLimiter> getRateLimiter() {
5151
boolean isConnected();
5252

5353
long getNumberOfEntriesInBacklog();
54+
55+
boolean isTerminated();
5456
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -449,7 +449,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
449449
long waitTimeMillis = readFailureBackoff.next();
450450

451451
if (exception instanceof CursorAlreadyClosedException) {
452-
log.error("[{}] Error reading entries because replicator is"
452+
log.warn("[{}] Error reading entries because replicator is"
453453
+ " already deleted and cursor is already closed {}, ({})",
454454
replicatorId, ctx, exception.getMessage(), exception);
455455
// replicator is already deleted and cursor is already closed so, producer should also be disconnected.
@@ -569,7 +569,7 @@ public void deleteFailed(ManagedLedgerException exception, Object ctx) {
569569
log.error("[{}] Failed to delete message at {}: {}", replicatorId, ctx,
570570
exception.getMessage(), exception);
571571
if (exception instanceof CursorAlreadyClosedException) {
572-
log.error("[{}] Asynchronous ack failure because replicator is already deleted and cursor is already"
572+
log.warn("[{}] Asynchronous ack failure because replicator is already deleted and cursor is already"
573573
+ " closed {}, ({})", replicatorId, ctx, exception.getMessage(), exception);
574574
// replicator is already deleted and cursor is already closed so, producer should also be disconnected.
575575
terminate();
@@ -694,6 +694,11 @@ public boolean isConnected() {
694694
return producer != null && producer.isConnected();
695695
}
696696

697+
@Override
698+
protected void doReleaseResources() {
699+
dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);
700+
}
701+
697702
private static final Logger log = LoggerFactory.getLogger(PersistentReplicator.class);
698703

699704
@VisibleForTesting

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java

Lines changed: 45 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1630,6 +1630,7 @@ public CompletableFuture<Void> checkReplication() {
16301630
return deleteForcefully();
16311631
}
16321632

1633+
removeTerminatedReplicators(replicators);
16331634
List<CompletableFuture<Void>> futures = new ArrayList<>();
16341635

16351636
// Check for missing replicators
@@ -1668,6 +1669,8 @@ private CompletableFuture<Void> checkShadowReplication() {
16681669
if (log.isDebugEnabled()) {
16691670
log.debug("[{}] Checking shadow replication status, shadowTopics={}", topic, configuredShadowTopics);
16701671
}
1672+
1673+
removeTerminatedReplicators(shadowReplicators);
16711674
List<CompletableFuture<Void>> futures = new ArrayList<>();
16721675

16731676
// Check for missing replicators
@@ -1818,19 +1821,30 @@ protected CompletableFuture<Void> addReplicationCluster(String remoteCluster, Ma
18181821
if (replicationClient == null) {
18191822
return;
18201823
}
1821-
Replicator replicator = replicators.computeIfAbsent(remoteCluster, r -> {
1822-
try {
1823-
return new GeoPersistentReplicator(PersistentTopic.this, cursor, localCluster,
1824-
remoteCluster, brokerService, (PulsarClientImpl) replicationClient);
1825-
} catch (PulsarServerException e) {
1826-
log.error("[{}] Replicator startup failed {}", topic, remoteCluster, e);
1824+
lock.readLock().lock();
1825+
try {
1826+
if (isClosingOrDeleting) {
1827+
// Whether is "transferring" or not, do not create new replicator.
1828+
log.info("[{}] Skip to create replicator because this topic is closing."
1829+
+ " remote cluster: {}. State of transferring : {}",
1830+
topic, remoteCluster, transferring);
1831+
return;
18271832
}
1828-
return null;
1829-
});
1830-
1831-
// clean up replicator if startup is failed
1832-
if (replicator == null) {
1833-
replicators.removeNullValue(remoteCluster);
1833+
Replicator replicator = replicators.computeIfAbsent(remoteCluster, r -> {
1834+
try {
1835+
return new GeoPersistentReplicator(PersistentTopic.this, cursor, localCluster,
1836+
remoteCluster, brokerService, (PulsarClientImpl) replicationClient);
1837+
} catch (PulsarServerException e) {
1838+
log.error("[{}] Replicator startup failed {}", topic, remoteCluster, e);
1839+
}
1840+
return null;
1841+
});
1842+
// clean up replicator if startup is failed
1843+
if (replicator == null) {
1844+
replicators.removeNullValue(remoteCluster);
1845+
}
1846+
} finally {
1847+
lock.readLock().unlock();
18341848
}
18351849
});
18361850
}
@@ -3484,9 +3498,27 @@ private void fenceTopicToCloseOrDelete() {
34843498
}
34853499

34863500
private void unfenceTopicToResume() {
3487-
subscriptions.values().forEach(sub -> sub.resumeAfterFence());
34883501
isFenced = false;
34893502
isClosingOrDeleting = false;
3503+
subscriptions.values().forEach(sub -> sub.resumeAfterFence());
3504+
unfenceReplicatorsToResume();
3505+
}
3506+
3507+
private void unfenceReplicatorsToResume() {
3508+
checkReplication();
3509+
checkShadowReplication();
3510+
}
3511+
3512+
private void removeTerminatedReplicators(ConcurrentOpenHashMap<String, Replicator> replicators) {
3513+
Map<String, Replicator> terminatedReplicators = new HashMap<>();
3514+
replicators.forEach((cluster, replicator) -> {
3515+
if (replicator.isTerminated()) {
3516+
terminatedReplicators.put(cluster, replicator);
3517+
}
3518+
});
3519+
terminatedReplicators.entrySet().forEach(entry -> {
3520+
replicators.remove(entry.getKey(), entry.getValue());
3521+
});
34903522
}
34913523

34923524
@Override

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java

Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,18 +20,21 @@
2020

2121
import static org.mockito.Mockito.any;
2222
import static org.mockito.Mockito.doAnswer;
23+
import static org.mockito.Mockito.mock;
2324
import static org.mockito.Mockito.spy;
2425
import static org.testng.Assert.assertEquals;
2526
import static org.testng.Assert.assertFalse;
2627
import static org.testng.Assert.assertNotEquals;
2728
import static org.testng.Assert.assertTrue;
2829
import static org.testng.Assert.fail;
30+
import com.google.common.collect.Sets;
2931
import io.netty.util.concurrent.FastThreadLocalThread;
3032
import java.lang.reflect.Field;
3133
import java.lang.reflect.Method;
3234
import java.time.Duration;
3335
import java.util.Arrays;
3436
import java.util.Optional;
37+
import java.util.UUID;
3538
import java.util.concurrent.CompletableFuture;
3639
import java.util.concurrent.CountDownLatch;
3740
import java.util.concurrent.TimeUnit;
@@ -48,6 +51,7 @@
4851
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
4952
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
5053
import org.apache.pulsar.broker.BrokerTestUtil;
54+
import org.apache.pulsar.broker.service.persistent.GeoPersistentReplicator;
5155
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
5256
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
5357
import org.apache.pulsar.client.api.Consumer;
@@ -492,4 +496,166 @@ public void testPartitionedTopicLevelReplicationRemoteConflictTopicExist() throw
492496
admin1.topics().deletePartitionedTopic(topicName);
493497
admin2.topics().deletePartitionedTopic(topicName);
494498
}
499+
500+
/**
501+
* See the description and execution flow: https://github.com/apache/pulsar/pull/21948.
502+
* Steps:
503+
* 1.Create topic, does not enable replication now.
504+
* - The topic will be loaded in the memory.
505+
* 2.Enable namespace level replication.
506+
* - Broker creates a replicator, and the internal producer of replicator is starting.
507+
* - We inject an error to make the internal producer fail to connect,after few seconds, it will retry to start.
508+
* 3.Unload bundle.
509+
* - Starting to close the topic.
510+
* - The replicator will be closed, but it will not close the internal producer, because the producer has not
511+
* been created successfully.
512+
* - We inject a sleeping into the progress of closing the "repl.cursor" to make it stuck. So the topic is still
513+
* in the process of being closed now.
514+
* 4.Internal producer retry to connect.
515+
* - At the next retry, it connected successful. Since the state of "repl.cursor" is not "Closed", this producer
516+
* will not be closed now.
517+
* 5.Topic closed.
518+
* - Cancel the stuck of closing the "repl.cursor".
519+
* - The topic is wholly closed.
520+
* 6.Verify: the delayed created internal producer will be closed. In other words, there is no producer is connected
521+
* to the remote cluster.
522+
*/
523+
@Test
524+
public void testConcurrencyOfUnloadBundleAndRecreateProducer2() throws Exception {
525+
final String namespaceName = defaultTenant + "/" + UUID.randomUUID().toString().replaceAll("-", "");
526+
final String topicName = BrokerTestUtil.newUniqueName("persistent://" + namespaceName + "/tp_");
527+
// 1.Create topic, does not enable replication now.
528+
admin1.namespaces().createNamespace(namespaceName);
529+
admin2.namespaces().createNamespace(namespaceName);
530+
admin1.topics().createNonPartitionedTopic(topicName);
531+
PersistentTopic persistentTopic =
532+
(PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get();
533+
534+
// We inject an error to make the internal producer fail to connect.
535+
// The delay time of next retry to create producer is below:
536+
// 0.1s, 0.2, 0.4, 0.8, 1.6s, 3.2s, 6.4s...
537+
// If the retry counter is larger than 6, the next creation will be slow enough to close Replicator.
538+
final AtomicInteger createProducerCounter = new AtomicInteger();
539+
final int failTimes = 6;
540+
injectMockReplicatorProducerBuilder((producerCnf, originalProducer) -> {
541+
if (topicName.equals(producerCnf.getTopicName())) {
542+
// There is a switch to determine create producer successfully or not.
543+
if (createProducerCounter.incrementAndGet() > failTimes) {
544+
return originalProducer;
545+
}
546+
log.info("Retry create replicator.producer count: {}", createProducerCounter);
547+
// Release producer and fail callback.
548+
originalProducer.closeAsync();
549+
throw new RuntimeException("mock error");
550+
}
551+
return originalProducer;
552+
});
553+
554+
// 2.Enable namespace level replication.
555+
admin1.namespaces().setNamespaceReplicationClusters(namespaceName, Sets.newHashSet(cluster1, cluster2));
556+
AtomicReference<PersistentReplicator> replicator = new AtomicReference<PersistentReplicator>();
557+
Awaitility.await().untilAsserted(() -> {
558+
assertFalse(persistentTopic.getReplicators().isEmpty());
559+
replicator.set(
560+
(PersistentReplicator) persistentTopic.getReplicators().values().iterator().next());
561+
// Since we inject a producer creation error, the replicator can not start successfully.
562+
assertFalse(replicator.get().isConnected());
563+
});
564+
565+
// We inject a sleeping into the progress of closing the "repl.cursor" to make it stuck, until the internal
566+
// producer of the replicator started.
567+
SpyCursor spyCursor =
568+
spyCursor(persistentTopic, "pulsar.repl." + pulsar2.getConfig().getClusterName());
569+
CursorCloseSignal cursorCloseSignal = makeCursorClosingDelay(spyCursor);
570+
571+
// 3.Unload bundle: call "topic.close(false)".
572+
// Stuck start new producer, until the state of replicator change to Stopped.
573+
// The next once of "createProducerSuccessAfterFailTimes" to create producer will be successfully.
574+
Awaitility.await().pollInterval(Duration.ofMillis(100)).atMost(Duration.ofSeconds(60)).untilAsserted(() -> {
575+
assertTrue(createProducerCounter.get() >= failTimes);
576+
});
577+
CompletableFuture<Void> topicCloseFuture = persistentTopic.close(true);
578+
Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> {
579+
String state = String.valueOf(replicator.get().getState());
580+
log.error("replicator state: {}", state);
581+
assertTrue(state.equals("Disconnected") || state.equals("Terminated"));
582+
});
583+
584+
// 5.Delay close cursor, until "replicator.producer" create successfully.
585+
// The next once retry time of create "replicator.producer" will be 3.2s.
586+
Thread.sleep(4 * 1000);
587+
log.info("Replicator.state: {}", replicator.get().getState());
588+
cursorCloseSignal.startClose();
589+
cursorCloseSignal.startCallback();
590+
// Wait for topic close successfully.
591+
topicCloseFuture.join();
592+
593+
// 6. Verify there is no orphan producer on the remote cluster.
594+
Awaitility.await().pollInterval(Duration.ofSeconds(1)).untilAsserted(() -> {
595+
PersistentTopic persistentTopic2 =
596+
(PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get();
597+
assertEquals(persistentTopic2.getProducers().size(), 0);
598+
Assert.assertFalse(replicator.get().isConnected());
599+
});
600+
601+
// cleanup.
602+
cleanupTopics(namespaceName, () -> {
603+
admin1.topics().delete(topicName);
604+
admin2.topics().delete(topicName);
605+
});
606+
admin1.namespaces().setNamespaceReplicationClusters(namespaceName, Sets.newHashSet(cluster1));
607+
admin1.namespaces().deleteNamespace(namespaceName);
608+
admin2.namespaces().deleteNamespace(namespaceName);
609+
}
610+
611+
@Test
612+
public void testUnFenceTopicToReuse() throws Exception {
613+
final String topicName = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp");
614+
// Wait for replicator started.
615+
Producer<String> producer1 = client1.newProducer(Schema.STRING).topic(topicName).create();
616+
waitReplicatorStarted(topicName);
617+
618+
// Inject an error to make topic close fails.
619+
final String mockProducerName = UUID.randomUUID().toString();
620+
final org.apache.pulsar.broker.service.Producer mockProducer =
621+
mock(org.apache.pulsar.broker.service.Producer.class);
622+
doAnswer(invocation -> CompletableFuture.failedFuture(new RuntimeException("mocked error")))
623+
.when(mockProducer).disconnect(any());
624+
doAnswer(invocation -> CompletableFuture.failedFuture(new RuntimeException("mocked error")))
625+
.when(mockProducer).disconnect();
626+
PersistentTopic persistentTopic =
627+
(PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get();
628+
persistentTopic.getProducers().put(mockProducerName, mockProducer);
629+
630+
// Do close.
631+
GeoPersistentReplicator replicator1 =
632+
(GeoPersistentReplicator) persistentTopic.getReplicators().values().iterator().next();
633+
try {
634+
persistentTopic.close(true, false).join();
635+
fail("Expected close fails due to a producer close fails");
636+
} catch (Exception ex) {
637+
log.info("Expected error: {}", ex.getMessage());
638+
}
639+
640+
// Broker will call `topic.unfenceTopicToResume` if close clients fails.
641+
// Verify: the replicator will be re-created.
642+
Awaitility.await().untilAsserted(() -> {
643+
assertTrue(producer1.isConnected());
644+
GeoPersistentReplicator replicator2 =
645+
(GeoPersistentReplicator) persistentTopic.getReplicators().values().iterator().next();
646+
assertNotEquals(replicator1, replicator2);
647+
assertFalse(replicator1.isConnected());
648+
assertFalse(replicator1.producer != null && replicator1.producer.isConnected());
649+
assertTrue(replicator2.isConnected());
650+
assertTrue(replicator2.producer != null && replicator2.producer.isConnected());
651+
});
652+
653+
// cleanup.
654+
persistentTopic.getProducers().remove(mockProducerName, mockProducer);
655+
producer1.close();
656+
cleanupTopics(() -> {
657+
admin1.topics().delete(topicName);
658+
admin2.topics().delete(topicName);
659+
});
660+
}
495661
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -150,12 +150,16 @@ protected void createDefaultTenantsAndClustersAndNamespace() throws Exception {
150150
}
151151

152152
protected void cleanupTopics(CleanupTopicAction cleanupTopicAction) throws Exception {
153-
waitChangeEventsInit(replicatedNamespace);
154-
admin1.namespaces().setNamespaceReplicationClusters(replicatedNamespace, Collections.singleton(cluster1));
155-
admin1.namespaces().unload(replicatedNamespace);
153+
cleanupTopics(replicatedNamespace, cleanupTopicAction);
154+
}
155+
156+
protected void cleanupTopics(String namespace, CleanupTopicAction cleanupTopicAction) throws Exception {
157+
waitChangeEventsInit(namespace);
158+
admin1.namespaces().setNamespaceReplicationClusters(namespace, Collections.singleton(cluster1));
159+
admin1.namespaces().unload(namespace);
156160
cleanupTopicAction.run();
157-
admin1.namespaces().setNamespaceReplicationClusters(replicatedNamespace, Sets.newHashSet(cluster1, cluster2));
158-
waitChangeEventsInit(replicatedNamespace);
161+
admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet(cluster1, cluster2));
162+
waitChangeEventsInit(namespace);
159163
}
160164

161165
protected void waitChangeEventsInit(String namespace) {

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ public Object[][] partitionedTopicProvider() {
154154
return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
155155
}
156156

157-
@Test
157+
@Test(priority = Integer.MAX_VALUE)
158158
public void testConfigChange() throws Exception {
159159
log.info("--- Starting ReplicatorTest::testConfigChange ---");
160160
// This test is to verify that the config change on global namespace is successfully applied in broker during

0 commit comments

Comments
 (0)