Skip to content

Commit 8db6a3b

Browse files
authored
[fix][broker] Fix issue that topic policies was deleted after a sub topic deleted, even if the partitioned topic still exists (apache#24350)
1 parent 76c6f6a commit 8db6a3b

File tree

5 files changed

+82
-9
lines changed

5 files changed

+82
-9
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -787,8 +787,10 @@ protected void internalDeletePartitionedTopic(AsyncResponse asyncResponse,
787787
})
788788
// Only tries to delete the znode for partitioned topic when all its partitions are successfully deleted
789789
).thenCompose(ignore ->
790-
pulsar().getBrokerService().deleteSchema(topicName).exceptionally(ex -> null))
791-
.thenCompose(__ -> getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
790+
pulsar().getBrokerService().deleteSchema(topicName).exceptionally(ex -> null)
791+
).thenCompose(ignore ->
792+
pulsar().getTopicPoliciesService().deleteTopicPoliciesAsync(topicName).exceptionally(ex -> null)
793+
).thenCompose(__ -> getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
792794
.runWithMarkDeleteAsync(topicName, () -> namespaceResources()
793795
.getPartitionedTopicResources().deletePartitionedTopicAsync(topicName)))
794796
.thenAccept(__ -> {

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1286,6 +1286,9 @@ public InactiveTopicPolicies getInactiveTopicPolicies() {
12861286
}
12871287

12881288
public CompletableFuture<Void> deleteTopicPolicies() {
1289+
if (TopicName.get(getName()).isPartitioned()) {
1290+
return CompletableFuture.completedFuture(null);
1291+
}
12891292
return brokerService.pulsar().getTopicPoliciesService().deleteTopicPoliciesAsync(TopicName.get(topic));
12901293
}
12911294

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1282,7 +1282,13 @@ private CompletableFuture<Void> deleteTopicInternal(String topic, boolean forceD
12821282
deleteTopicAuthenticationWithRetry(topic, deleteTopicAuthenticationFuture, 5);
12831283
deleteTopicAuthenticationFuture
12841284
.thenCompose(__ -> deleteSchema(tn))
1285-
.thenCompose(__ -> pulsar.getTopicPoliciesService().deleteTopicPoliciesAsync(tn)).whenComplete((v, ex) -> {
1285+
.thenCompose(__ -> {
1286+
if (tn.isPartitioned()) {
1287+
return CompletableFuture.completedFuture(null);
1288+
}
1289+
return pulsar.getTopicPoliciesService().deleteTopicPoliciesAsync(tn);
1290+
})
1291+
.whenComplete((v, ex) -> {
12861292
if (ex != null) {
12871293
future.completeExceptionally(ex);
12881294
return;

pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@
9696
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
9797
import org.apache.pulsar.common.policies.data.TopicPolicies;
9898
import org.apache.pulsar.common.policies.data.TopicStats;
99+
import org.apache.pulsar.common.policies.data.TopicType;
99100
import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
100101
import org.assertj.core.api.Assertions;
101102
import org.awaitility.Awaitility;
@@ -3525,6 +3526,59 @@ public void testDeleteGlobalPolicy() throws Exception {
35253526
admin.topics().delete(tpName, false);
35263527
}
35273528

3529+
@DataProvider
3530+
public Object[][] partitionedTypes() {
3531+
return new Object[][]{
3532+
{TopicType.NON_PARTITIONED},
3533+
{TopicType.PARTITIONED}
3534+
};
3535+
}
3536+
3537+
@Test(dataProvider = "partitionedTypes")
3538+
public void testCleanupPoliciesAfterDeletedTopic(TopicType topicType) throws Exception {
3539+
final String tpName = BrokerTestUtil.newUniqueName("persistent://" + myNamespace + "/tp");
3540+
final TopicName tpNameP0 = TopicName.get(tpName).getPartition(0);
3541+
final String subscriptionName = "s1";
3542+
final int rateMsgGlobal = 1000;
3543+
final int rateMsgLocal = 1000;
3544+
if (TopicType.PARTITIONED.equals(topicType)) {
3545+
admin.topics().createPartitionedTopic(tpName, 2);
3546+
} else {
3547+
admin.topics().createNonPartitionedTopic(tpName);
3548+
}
3549+
3550+
admin.topics().createSubscription(tpName, subscriptionName, MessageId.earliest);
3551+
PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService()
3552+
.getTopicIfExists(TopicType.PARTITIONED.equals(topicType) ? tpNameP0.toString(): tpName).get().get();
3553+
3554+
// Set global policy.
3555+
// Verify: policies was affected.
3556+
DispatchRate dispatchRateGlobal = new DispatchRateImpl(rateMsgGlobal, 1, false, 1);
3557+
admin.topicPolicies(true).setDispatchRate(tpName, dispatchRateGlobal);
3558+
DispatchRate dispatchRateLocal = new DispatchRateImpl(rateMsgLocal, 2, false, 2);
3559+
admin.topicPolicies(true).setDispatchRate(tpName, dispatchRateLocal);
3560+
Awaitility.await().untilAsserted(() -> {
3561+
assertEquals(persistentTopic.getHierarchyTopicPolicies().getDispatchRate().get(), dispatchRateLocal);
3562+
});
3563+
3564+
// cleanup.
3565+
if (TopicType.PARTITIONED.equals(topicType)) {
3566+
admin.topics().deletePartitionedTopic(tpName, false);
3567+
} else {
3568+
admin.topics().delete(tpName, false);
3569+
}
3570+
3571+
// Verify: the topic-level policies will be removed after the topic is deleted.
3572+
Awaitility.await().untilAsserted(() -> {
3573+
Optional<TopicPolicies> topicPoliciesOptional = pulsar.getTopicPoliciesService()
3574+
.getTopicPoliciesAsync(TopicName.get(tpName), LOCAL_ONLY).join();
3575+
Optional<TopicPolicies> topicPoliciesOptionalGlobal = pulsar.getTopicPoliciesService()
3576+
.getTopicPoliciesAsync(TopicName.get(tpName), GLOBAL_ONLY).join();
3577+
assertTrue(topicPoliciesOptional.isEmpty());
3578+
assertTrue(topicPoliciesOptionalGlobal.isEmpty());
3579+
});
3580+
}
3581+
35283582
@Test
35293583
public void testGlobalTopicPolicies() throws Exception {
35303584
final String topic = testTopic + UUID.randomUUID();

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicGCTest.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.apache.pulsar.client.impl.ConsumerImpl;
4242
import org.apache.pulsar.client.impl.PatternMultiTopicsConsumerImpl;
4343
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
44+
import org.apache.pulsar.common.policies.data.PublishRate;
4445
import org.awaitility.Awaitility;
4546
import org.awaitility.reflect.WhiteboxImpl;
4647
import org.testng.annotations.AfterClass;
@@ -162,6 +163,8 @@ public void testAppendCreateConsumerAfterOnePartGc(SubscribeTopicType subscribeT
162163
final String subscription = "s1";
163164
admin.topics().createPartitionedTopic(topic, 2);
164165
admin.topics().createSubscription(topic, subscription, MessageId.earliest);
166+
final PublishRate publishRate = new PublishRate(100, 1000);
167+
admin.topicPolicies().setPublishRate(topic, publishRate);
165168

166169
// create consumers and producers.
167170
Producer<String> producer0 = pulsarClient.newProducer(Schema.STRING).topic(partition0)
@@ -185,12 +188,17 @@ public void testAppendCreateConsumerAfterOnePartGc(SubscribeTopicType subscribeT
185188
// Wait for topic GC.
186189
// Partition 0 will be deleted about 20s later, left 2min to avoid flaky.
187190
producer0.close();
188-
Awaitility.await().atMost(2, TimeUnit.MINUTES).untilAsserted(() -> {
189-
CompletableFuture<Optional<Topic>> tp1 = pulsar.getBrokerService().getTopic(partition0, false);
190-
CompletableFuture<Optional<Topic>> tp2 = pulsar.getBrokerService().getTopic(partition1, false);
191-
assertTrue(tp1 == null || !tp1.get().isPresent());
192-
assertTrue(tp2 != null && tp2.get().isPresent());
193-
});
191+
if (!subscribeTopicType.equals(SubscribeTopicType.MULTI_PARTITIONED_TOPIC)) {
192+
Awaitility.await().atMost(2, TimeUnit.MINUTES).untilAsserted(() -> {
193+
CompletableFuture<Optional<Topic>> tp1 = pulsar.getBrokerService().getTopic(partition0, false);
194+
CompletableFuture<Optional<Topic>> tp2 = pulsar.getBrokerService().getTopic(partition1, false);
195+
assertTrue(tp1 == null || !tp1.get().isPresent());
196+
assertTrue(tp2 != null && tp2.get().isPresent());
197+
// Verify: topic policies will not be removed after a sub-topic GC.
198+
PublishRate publishRateGot = admin.topicPolicies().getPublishRate(topic);
199+
assertEquals(publishRateGot, publishRate);
200+
});
201+
}
194202

195203
// Verify that the messages under "partition-1" still can be ack.
196204
for (int i = 0; i < 2; i++) {

0 commit comments

Comments
 (0)