Skip to content

Commit 67e6238

Browse files
poorbarcodelhotari
authored andcommitted
[fix][broker] Once the cluster is configured incorrectly, the broker maintains the incorrect cluster configuration even if you removed it (apache#24419)
(cherry picked from commit e157cac)
1 parent 23b1f7b commit 67e6238

File tree

2 files changed

+21
-1
lines changed

2 files changed

+21
-1
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -812,6 +812,12 @@ public CompletableFuture<Void> closeAndRemoveReplicationClient(String clusterNam
812812
return CompletableFuture.completedFuture(null);
813813
}
814814
return client.closeAsync();
815+
}).thenCompose(__ -> {
816+
PulsarAdmin pulsarAdmin = clusterAdmins.remove(clusterName);
817+
if (pulsarAdmin != null) {
818+
pulsarAdmin.close();
819+
}
820+
return CompletableFuture.completedFuture(null);
815821
});
816822
}
817823

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRemoveClusterTest.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,10 @@
2323
import java.util.UUID;
2424
import java.util.concurrent.TimeUnit;
2525
import lombok.Cleanup;
26+
import org.apache.pulsar.client.admin.PulsarAdmin;
2627
import org.apache.pulsar.client.api.Producer;
2728
import org.apache.pulsar.client.api.PulsarClient;
29+
import org.apache.pulsar.common.naming.TopicName;
2830
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
2931
import org.awaitility.Awaitility;
3032
import org.testng.Assert;
@@ -88,6 +90,7 @@ public void testRemoveClusterFromNamespace() throws Exception {
8890
.build();
8991

9092
final String topicName = "persistent://pulsar1/ns1/testRemoveClusterFromNamespace-" + UUID.randomUUID();
93+
admin1.topics().createPartitionedTopic(topicName, 1);
9194

9295
Producer<byte[]> producer = client.newProducer()
9396
.topic(topicName)
@@ -98,9 +101,12 @@ public void testRemoveClusterFromNamespace() throws Exception {
98101
producer.close();
99102
client.close();
100103

101-
Replicator replicator = pulsar1.getBrokerService().getTopicReference(topicName)
104+
Replicator replicator = pulsar1.getBrokerService()
105+
.getTopicReference(TopicName.get(topicName).getPartition(0).toString())
102106
.get().getReplicators().get("r3");
103107

108+
PulsarAdmin replicatorAdmin = pulsar1.getBrokerService().getClusterAdmins().get("r3");
109+
104110
Awaitility.await().untilAsserted(() -> Assert.assertTrue(replicator.isConnected()));
105111

106112
admin1.clusters().deleteCluster("r3");
@@ -110,5 +116,13 @@ public void testRemoveClusterFromNamespace() throws Exception {
110116

111117
Awaitility.await().untilAsserted(() -> Assert.assertNull(
112118
pulsar1.getBrokerService().getReplicationClients().get("r3")));
119+
Awaitility.await().untilAsserted(() -> Assert.assertNull(
120+
pulsar1.getBrokerService().getClusterAdmins().get("r3")));
121+
try {
122+
replicatorAdmin.clusters().getClusters();
123+
Assert.fail("Should get an error that pulsarAdmin has been closed");
124+
} catch (Exception e) {
125+
Assert.assertTrue(e.getMessage().contains("has been closed"));
126+
}
113127
}
114128
}

0 commit comments

Comments
 (0)