Skip to content

Commit 7e7ab9f

Browse files
poorbarcodesrinath-ctds
authored andcommitted
[fix][broker] Orphan schema after disabled a cluster for a namespace (apache#24223)
(cherry picked from commit 2d78cbd) (cherry picked from commit af3c821)
1 parent 9d93892 commit 7e7ab9f

File tree

2 files changed

+90
-9
lines changed

2 files changed

+90
-9
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import java.util.concurrent.ScheduledFuture;
5050
import java.util.concurrent.TimeUnit;
5151
import java.util.concurrent.atomic.AtomicBoolean;
52+
import java.util.concurrent.atomic.AtomicInteger;
5253
import java.util.concurrent.atomic.AtomicLong;
5354
import java.util.concurrent.atomic.AtomicReference;
5455
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
@@ -1842,7 +1843,9 @@ public CompletableFuture<Void> checkReplication() {
18421843
if (!success) {
18431844
// if local cluster is removed from global namespace cluster-list : then delete topic forcefully
18441845
// because pulsar doesn't serve global topic without local repl-cluster configured.
1845-
return deleteForcefully();
1846+
return deleteForcefully().thenCompose(ignore -> {
1847+
return deleteSchemaAndPoliciesIfClusterRemoved();
1848+
});
18461849
}
18471850

18481851
int newMessageTTLInSeconds = topicPolicies.getMessageTTLInSeconds().get();
@@ -1885,6 +1888,55 @@ public CompletableFuture<Void> checkReplication() {
18851888
});
18861889
}
18871890

1891+
CompletableFuture<Void> deleteSchemaAndPoliciesIfClusterRemoved() {
1892+
TopicName tName = TopicName.get(topic);
1893+
if (!tName.isPartitioned()) {
1894+
return CompletableFuture.completedFuture(null);
1895+
}
1896+
TopicName partitionedName = TopicName.get(tName.getPartitionedTopicName());
1897+
return brokerService.getPulsar().getPulsarResources().getNamespaceResources()
1898+
.getPartitionedTopicResources()
1899+
.getPartitionedTopicMetadataAsync(partitionedName)
1900+
.thenApply(metadataOp -> {
1901+
if (metadataOp.isEmpty()) {
1902+
return null;
1903+
}
1904+
AtomicInteger checkedCounter = new AtomicInteger(metadataOp.get().partitions);
1905+
for (int i = 0; i < metadataOp.get().partitions; i++) {
1906+
brokerService.getPulsar().getPulsarResources().getTopicResources()
1907+
.persistentTopicExists(partitionedName.getPartition(i)).thenAccept(b -> {
1908+
if (!b) {
1909+
int leftPartitions = checkedCounter.decrementAndGet();
1910+
log.info("[{}] partitions: {}, left: {}", tName, metadataOp.get().partitions,
1911+
leftPartitions);
1912+
if (leftPartitions == 0) {
1913+
brokerService.getPulsar().getSchemaStorage()
1914+
.delete(partitionedName.getSchemaName())
1915+
.whenComplete((schemaVersion, ex) -> {
1916+
if (ex == null) {
1917+
log.info("Deleted schema[{}] after all partitions[{}] were removed"
1918+
+ " because the current cluster has bee removed from"
1919+
+ " topic/namespace policies",
1920+
partitionedName, metadataOp.get().partitions);
1921+
} else {
1922+
log.error("Failed to delete schema[{}] after all partitions[{}] were"
1923+
+ " removed, when the current cluster has bee removed from"
1924+
+ " topic/namespace policies",
1925+
partitionedName, metadataOp.get().partitions, ex);
1926+
}
1927+
1928+
});
1929+
// TODO regarding the topic level policies, it will be deleted at a seperate PR.
1930+
// Because there is an issue related to Global policies has not been solved so
1931+
// far.
1932+
}
1933+
}
1934+
});
1935+
}
1936+
return null;
1937+
});
1938+
}
1939+
18881940
private CompletableFuture<Boolean> checkAllowedCluster(String localCluster) {
18891941
List<String> replicationClusters = topicPolicies.getReplicationClusters().get();
18901942
return brokerService.pulsar().getPulsarResources().getNamespaceResources()

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java

Lines changed: 37 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,13 @@
1818
*/
1919
package org.apache.pulsar.broker.service;
2020

21+
import static org.testng.Assert.assertEquals;
2122
import static org.testng.Assert.assertFalse;
2223
import static org.testng.Assert.assertTrue;
2324
import java.time.Duration;
2425
import java.util.Arrays;
2526
import java.util.HashSet;
27+
import java.util.List;
2628
import java.util.Optional;
2729
import java.util.concurrent.CompletableFuture;
2830
import java.util.concurrent.TimeUnit;
@@ -33,6 +35,7 @@
3335
import org.apache.pulsar.common.naming.TopicName;
3436
import org.apache.pulsar.common.policies.data.TopicType;
3537
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
38+
import org.apache.pulsar.common.protocol.schema.StoredSchema;
3639
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
3740
import org.apache.pulsar.zookeeper.ZookeeperServerTest;
3841
import org.awaitility.Awaitility;
@@ -172,18 +175,39 @@ public void testRemoveCluster() throws Exception {
172175
// Initialize.
173176
final String ns1 = defaultTenant + "/" + "ns_73b1a31afce34671a5ddc48fe5ad7fc8";
174177
final String topic = "persistent://" + ns1 + "/___tp-5dd50794-7af8-4a34-8a0b-06188052c66a";
178+
final String topicP0 = TopicName.get(topic).getPartition(0).toString();
179+
final String topicP1 = TopicName.get(topic).getPartition(1).toString();
175180
final String topicChangeEvents = "persistent://" + ns1 + "/__change_events-partition-0";
176181
admin1.namespaces().createNamespace(ns1);
177182
admin1.namespaces().setNamespaceReplicationClusters(ns1, new HashSet<>(Arrays.asList(cluster1, cluster2)));
178-
admin1.topics().createNonPartitionedTopic(topic);
183+
admin1.topics().createPartitionedTopic(topic, 2);
184+
Awaitility.await().untilAsserted(() -> {
185+
assertTrue(pulsar2.getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
186+
.partitionedTopicExists(TopicName.get(topic)));
187+
List<CompletableFuture<StoredSchema>> schemaList11
188+
= pulsar1.getSchemaStorage().getAll(TopicName.get(topic).getSchemaName()).get();
189+
assertEquals(schemaList11.size(), 0);
190+
List<CompletableFuture<StoredSchema>> schemaList21
191+
= pulsar2.getSchemaStorage().getAll(TopicName.get(topic).getSchemaName()).get();
192+
assertEquals(schemaList21.size(), 0);
193+
});
179194

180-
// Wait for loading topic up.
195+
// Wait for copying messages.
181196
Producer<String> p = client1.newProducer(Schema.STRING).topic(topic).create();
197+
p.send("msg-1");
198+
p.close();
182199
Awaitility.await().untilAsserted(() -> {
183200
ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> tps =
184201
pulsar1.getBrokerService().getTopics();
185-
assertTrue(tps.containsKey(topic));
202+
assertTrue(tps.containsKey(topicP0));
203+
assertTrue(tps.containsKey(topicP1));
186204
assertTrue(tps.containsKey(topicChangeEvents));
205+
List<CompletableFuture<StoredSchema>> schemaList12
206+
= pulsar1.getSchemaStorage().getAll(TopicName.get(topic).getSchemaName()).get();
207+
assertEquals(schemaList12.size(), 1);
208+
List<CompletableFuture<StoredSchema>> schemaList22
209+
= pulsar2.getSchemaStorage().getAll(TopicName.get(topic).getSchemaName()).get();
210+
assertEquals(schemaList12.size(), 1);
187211
});
188212

189213
// The topics under the namespace of the cluster-1 will be deleted.
@@ -192,18 +216,23 @@ public void testRemoveCluster() throws Exception {
192216
Awaitility.await().atMost(Duration.ofSeconds(60)).ignoreExceptions().untilAsserted(() -> {
193217
ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> tps =
194218
pulsar1.getBrokerService().getTopics();
195-
assertFalse(tps.containsKey(topic));
219+
assertFalse(tps.containsKey(topicP0));
220+
assertFalse(tps.containsKey(topicP1));
196221
assertFalse(tps.containsKey(topicChangeEvents));
197-
assertFalse(pulsar1.getNamespaceService().checkTopicExists(TopicName.get(topic))
198-
.get(5, TimeUnit.SECONDS).isExists());
199222
assertFalse(pulsar1.getNamespaceService()
200223
.checkTopicExists(TopicName.get(topicChangeEvents))
201224
.get(5, TimeUnit.SECONDS).isExists());
225+
// Verify: schema will be removed in local cluster, and remote cluster will not.
226+
List<CompletableFuture<StoredSchema>> schemaList13
227+
= pulsar1.getSchemaStorage().getAll(TopicName.get(topic).getSchemaName()).get();
228+
assertEquals(schemaList13.size(), 0);
229+
List<CompletableFuture<StoredSchema>> schemaList23
230+
= pulsar2.getSchemaStorage().getAll(TopicName.get(topic).getSchemaName()).get();
231+
assertEquals(schemaList23.size(), 1);
202232
});
203233

204234
// cleanup.
205-
p.close();
206-
admin2.topics().delete(topic);
235+
admin2.topics().deletePartitionedTopic(topic);
207236
admin2.namespaces().deleteNamespace(ns1);
208237
}
209238
}

0 commit comments

Comments
 (0)