Skip to content

Commit 82af299

Browse files
authored
[improve][broker] Deny removing local cluster from topic level replicated cluster policy (apache#24351)
1 parent 64bacfd commit 82af299

File tree

9 files changed

+241
-5
lines changed

9 files changed

+241
-5
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -962,6 +962,12 @@ protected static boolean isNotFoundException(Throwable ex) {
962962
== Status.NOT_FOUND.getStatusCode();
963963
}
964964

965+
protected static boolean is4xxRestException(Throwable ex) {
966+
Throwable realCause = FutureUtil.unwrapCompletionException(ex);
967+
return realCause instanceof WebApplicationException
968+
&& (((WebApplicationException) realCause).getResponse().getStatus() % 100 == 4);
969+
}
970+
965971
protected static boolean isConflictException(Throwable ex) {
966972
Throwable realCause = FutureUtil.unwrapCompletionException(ex);
967973
return realCause instanceof WebApplicationException
@@ -984,6 +990,10 @@ protected static boolean isNot307And404And400Exception(Throwable ex) {
984990
return !isRedirectException(ex) && !isNotFoundException(ex) && !isBadRequest(ex);
985991
}
986992

993+
protected static boolean isNot307And4xxException(Throwable ex) {
994+
return !isRedirectException(ex) && !is4xxRestException(ex);
995+
}
996+
987997
protected static String getTopicNotFoundErrorMessage(String topic) {
988998
return String.format("Topic %s not found", topic);
989999
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4990,7 +4990,7 @@ protected CompletableFuture<Void> internalRemoveSubscribeRate(boolean isGlobal)
49904990

49914991
protected void handleTopicPolicyException(String methodName, Throwable thr, AsyncResponse asyncResponse) {
49924992
Throwable cause = thr.getCause();
4993-
if (isNot307And404And400Exception(cause)) {
4993+
if (isNot307And4xxException(cause)) {
49944994
log.error("[{}] Failed to perform {} on topic {}",
49954995
clientAppId(), methodName, topicName, cause);
49964996
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.util.Map;
3030
import java.util.Optional;
3131
import java.util.Set;
32+
import java.util.concurrent.CompletableFuture;
3233
import javax.ws.rs.DELETE;
3334
import javax.ws.rs.DefaultValue;
3435
import javax.ws.rs.Encoded;
@@ -46,6 +47,7 @@
4647
import javax.ws.rs.core.Response;
4748
import org.apache.bookkeeper.mledger.Position;
4849
import org.apache.bookkeeper.mledger.PositionFactory;
50+
import org.apache.commons.collections4.CollectionUtils;
4951
import org.apache.pulsar.broker.admin.AdminResource;
5052
import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
5153
import org.apache.pulsar.broker.service.BrokerServiceException;
@@ -2344,7 +2346,26 @@ public void setReplicationClusters(
23442346
@ApiParam(value = "List of replication clusters", required = true) List<String> clusterIds) {
23452347
validateTopicName(tenant, namespace, encodedTopic);
23462348
validateTopicPolicyOperationAsync(topicName, PolicyName.REPLICATION, PolicyOperation.WRITE)
2347-
.thenCompose(__ -> preValidation(authoritative))
2349+
.thenCompose(__ -> preValidation(authoritative)).thenCompose(__ -> {
2350+
// Set a topic-level replicated clusters that do not contain local cluster is not meaningful, except
2351+
// the following scenario: User has two clusters, which enabled Geo-Replication through a global
2352+
// metadata store, the resources named partitioned topic metadata and the resource namespace-level
2353+
// "replicated clusters" are shared between multi clusters. Pulsar can hardly delete a specify
2354+
// partitioned topic. To support this use case, the following steps can implement it:
2355+
// 1. set a global topic-level replicated clusters that do not contain local cluster.
2356+
// 2. the local cluster will remove the subtopics automatically, and remove the schemas and local
2357+
// topic policies. Just leave the global topic policies there, which prevents the namespace level
2358+
// replicated clusters policy taking affect.
2359+
// TODO But the API "pulsar-admin topics set-replication-clusters" does not support global policy,
2360+
// to support this scenario, a PIP is needed.
2361+
boolean clustersDoesNotContainsLocal = CollectionUtils.isEmpty(clusterIds)
2362+
|| !clusterIds.contains(pulsar().getConfig().getClusterName());
2363+
if (clustersDoesNotContainsLocal) {
2364+
return FutureUtil.failedFuture(new RestException(Response.Status.PRECONDITION_FAILED,
2365+
"Can not remove local cluster from the topic-level replication clusters policy"));
2366+
}
2367+
return CompletableFuture.completedFuture(null);
2368+
})
23482369
.thenCompose(__ -> internalSetReplicationClusters(clusterIds))
23492370
.thenRun(() -> asyncResponse.resume(Response.noContent().build()))
23502371
.exceptionally(ex -> {

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2016,9 +2016,15 @@ CompletableFuture<Void> deleteSchemaAndPoliciesIfClusterRemoved() {
20162016
}
20172017

20182018
});
2019-
// TODO regarding the topic level policies, it will be deleted at a seperate PR.
2020-
// Because there is an issue related to Global policies has not been solved so
2021-
// far.
2019+
// There are only one cases that will remove local clusters: using global metadata
2020+
// store, namespaces will share policies cross multi clusters, including
2021+
// "replicated clusters" and "partitioned topic metadata", we can hardly delete
2022+
// partitioned topic from one cluster and keep it exists in another. Removing
2023+
// local cluster from the namespace level "replicated clusters" can do this.
2024+
// TODO: there is no way to delete a specify partitioned topic if users have enabled
2025+
// Geo-Replication with a global metadata store, a PIP is needed.
2026+
// Since the system topic "__change_events" under the namespace will also be
2027+
// deleted, we can skip to delete topic-level policies.
20222028
}
20232029
}
20242030
});

pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.lang.reflect.Field;
3333
import java.lang.reflect.Method;
3434
import java.util.ArrayList;
35+
import java.util.Arrays;
3536
import java.util.Collections;
3637
import java.util.HashMap;
3738
import java.util.HashSet;
@@ -3851,4 +3852,35 @@ public void testSetSubRateWithSub() throws Exception {
38513852
.ratePeriodInSecond(10)
38523853
.build());
38533854
}
3855+
3856+
@DataProvider
3857+
public Object[][] topicTypes() {
3858+
return new Object[][]{
3859+
{TopicType.PARTITIONED},
3860+
{TopicType.NON_PARTITIONED}
3861+
};
3862+
}
3863+
3864+
@Test(dataProvider = "topicTypes")
3865+
public void testRemoveLocalCluster(TopicType topicType) throws Exception {
3866+
String topic = "persistent://" + myNamespace + "/testSetSubRateWithSub";
3867+
if (TopicType.PARTITIONED.equals(topicType)) {
3868+
admin.topics().createNonPartitionedTopic(topic);
3869+
} else {
3870+
admin.topics().createPartitionedTopic(topic, 2);
3871+
}
3872+
try {
3873+
admin.topics().setReplicationClusters(topic, Arrays.asList("not-local-cluster"));
3874+
fail("Can not remove local cluster from the topic-level replication clusters policy");
3875+
} catch (PulsarAdminException.PreconditionFailedException e) {
3876+
assertTrue(e.getMessage().contains("Can not remove local cluster from the topic-level replication clusters"
3877+
+ " policy"));
3878+
}
3879+
// cleanup.
3880+
if (TopicType.PARTITIONED.equals(topicType)) {
3881+
admin.topics().delete(topic, false);
3882+
} else {
3883+
admin.topics().deletePartitionedTopic(topic, false);
3884+
}
3885+
}
38543886
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@
9191
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
9292
import org.apache.pulsar.common.policies.data.ClusterData;
9393
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
94+
import org.apache.pulsar.common.policies.data.PublishRate;
9495
import org.apache.pulsar.common.policies.data.RetentionPolicies;
9596
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
9697
import org.apache.pulsar.common.policies.data.TenantInfo;
@@ -1422,4 +1423,88 @@ public void testIncompatibleMultiVersionSchema(boolean enableDeduplication) thro
14221423
admin2.namespaces().setSchemaCompatibilityStrategy(sourceClusterAlwaysSchemaCompatibleNamespace,
14231424
SchemaCompatibilityStrategy.FORWARD);
14241425
}
1426+
1427+
/***
1428+
* Manually modifying topic policies by Rest API.
1429+
* - Global topic level policies:
1430+
* - Add: replicate
1431+
* - Update: replicate
1432+
* - Delete a single policy(it is equivalent to specify updating): delete both local and remote policies.
1433+
* - Local topic level policies:
1434+
* - Add: never replicate
1435+
* - Update: never replicate
1436+
* - Delete a single policy(it is equivalent to specify updating): delete local policies only.
1437+
* Delete Topic triggers that both local and global policies will be deleted in local cluster, but will not delete
1438+
* the remote cluster's global policies. This test case will be covered by
1439+
* "OneWayReplicatorUsingGlobalPartitionedTest.testRemoveCluster".
1440+
*/
1441+
@Test
1442+
public void testTopicPoliciesReplicationRule() throws Exception {
1443+
// Init Pulsar resources.
1444+
final String topicName = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_");
1445+
final TopicName topicNameObj = TopicName.get(topicName);
1446+
final String subscriptionName = "s1";
1447+
admin1.topics().createNonPartitionedTopic(topicName);
1448+
Producer<byte[]> producer1 = client1.newProducer(Schema.AUTO_PRODUCE_BYTES()).topic(topicName).create();
1449+
waitReplicatorStarted(topicName);
1450+
producer1.close();
1451+
assertTrue(pulsar2.getPulsarResources().getTopicResources().persistentTopicExists(topicNameObj).join());
1452+
admin1.topics().createSubscription(topicName, subscriptionName, MessageId.earliest);
1453+
admin1.topics().createSubscription(subscriptionName, topicName, MessageId.earliest);
1454+
admin2.topics().createSubscription(subscriptionName, topicName, MessageId.earliest);
1455+
1456+
// Case 1: Global topic level policies -> Add: replicate.
1457+
PublishRate publishRateAddGlobal = new PublishRate(100, 10000);
1458+
admin1.topicPolicies(true).setPublishRate(topicName, publishRateAddGlobal);
1459+
// Case 4: Local topic level policies -> Add: never replicate.
1460+
PublishRate publishRateAddLocal = new PublishRate(200, 20000);
1461+
admin1.topicPolicies(false).setPublishRate(topicName, publishRateAddLocal);
1462+
waitChangeEventsReplicated(replicatedNamespace);
1463+
Thread.sleep(2000);
1464+
Awaitility.await().untilAsserted(() -> {
1465+
PublishRate valueGlobal = admin2.topicPolicies(true).getPublishRate(topicName);
1466+
assertEquals(valueGlobal, publishRateAddGlobal);
1467+
PublishRate valueLocal = admin2.topicPolicies(false).getPublishRate(topicName);
1468+
assertNull(valueLocal);
1469+
});
1470+
1471+
// Case 2: Global topic level policies -> Update: replicate.
1472+
PublishRate publishRateUpdateGlobal = new PublishRate(300, 30000);
1473+
admin1.topicPolicies(true).setPublishRate(topicName, publishRateUpdateGlobal);
1474+
// Case 5: Local topic level policies -> Update: never replicate.
1475+
PublishRate publishRateUpdateLocal = new PublishRate(400, 40000);
1476+
admin1.topicPolicies(false).setPublishRate(topicName, publishRateUpdateLocal);
1477+
waitChangeEventsReplicated(replicatedNamespace);
1478+
Thread.sleep(2000);
1479+
Awaitility.await().untilAsserted(() -> {
1480+
PublishRate valueGlobal = admin2.topicPolicies(true).getPublishRate(topicName);
1481+
assertEquals(valueGlobal, publishRateUpdateGlobal);
1482+
PublishRate valueLocal = admin2.topicPolicies(false).getPublishRate(topicName);
1483+
assertNull(valueLocal);
1484+
});
1485+
1486+
// Case 3: Global topic level policies -> Delete: delete both local and remote policies.
1487+
admin1.topicPolicies(true).removePublishRate(topicName);
1488+
waitChangeEventsReplicated(replicatedNamespace);
1489+
Thread.sleep(2000);
1490+
Awaitility.await().untilAsserted(() -> {
1491+
PublishRate valueGlobal = admin2.topicPolicies(true).getPublishRate(topicName);
1492+
assertNull(valueGlobal);
1493+
});
1494+
1495+
// Case 6: Local topic level policies -> Delete: never replicate.
1496+
PublishRate publishRateAddLocal2 = new PublishRate(500, 50000);
1497+
admin2.topicPolicies(false).setPublishRate(topicName, publishRateAddLocal2);
1498+
Awaitility.await().untilAsserted(() -> {
1499+
PublishRate valueLocal = admin2.topicPolicies(false).getPublishRate(topicName);
1500+
assertEquals(valueLocal, publishRateAddLocal2);
1501+
});
1502+
admin1.topicPolicies(false).removePublishRate(topicName);
1503+
waitChangeEventsReplicated(replicatedNamespace);
1504+
Thread.sleep(2000);
1505+
Awaitility.await().untilAsserted(() -> {
1506+
PublishRate valueLocal = admin2.topicPolicies(false).getPublishRate(topicName);
1507+
assertEquals(valueLocal, publishRateAddLocal2);
1508+
});
1509+
}
14251510
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,12 @@
3636
import java.util.Set;
3737
import java.util.concurrent.CompletableFuture;
3838
import java.util.concurrent.TimeUnit;
39+
import java.util.function.Function;
3940
import lombok.extern.slf4j.Slf4j;
41+
import org.apache.bookkeeper.mledger.Position;
4042
import org.apache.pulsar.broker.PulsarService;
4143
import org.apache.pulsar.broker.ServiceConfiguration;
44+
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
4245
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
4346
import org.apache.pulsar.client.admin.PulsarAdmin;
4447
import org.apache.pulsar.client.api.ClientBuilder;
@@ -48,6 +51,7 @@
4851
import org.apache.pulsar.client.api.Schema;
4952
import org.apache.pulsar.common.naming.SystemTopicNames;
5053
import org.apache.pulsar.common.naming.TopicName;
54+
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
5155
import org.apache.pulsar.common.policies.data.ClusterData;
5256
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
5357
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
@@ -506,4 +510,41 @@ protected void waitReplicatorStopped(PulsarService sourceCluster, PulsarService
506510
|| !persistentTopic1.getReplicators().get(targetCluster.getConfig().getClusterName()).isConnected());
507511
});
508512
}
513+
514+
protected void waitChangeEventsReplicated(String ns) {
515+
String topicName = "persistent://" + ns + "/" + SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME;
516+
TopicName topicNameObj = TopicName.get(topicName);
517+
Optional<PartitionedTopicMetadata> metadata = pulsar1.getPulsarResources().getNamespaceResources()
518+
.getPartitionedTopicResources()
519+
.getPartitionedTopicMetadataAsync(topicNameObj).join();
520+
Function<Replicator, Boolean> ensureNoBacklog = new Function<Replicator,Boolean>() {
521+
522+
@Override
523+
public Boolean apply(Replicator replicator) {
524+
if (!replicator.getRemoteCluster().equals("c2")) {
525+
return true;
526+
}
527+
PersistentReplicator persistentReplicator = (PersistentReplicator) replicator;
528+
Position lac = persistentReplicator.getCursor().getManagedLedger().getLastConfirmedEntry();
529+
Position mdPos = persistentReplicator.getCursor().getMarkDeletedPosition();
530+
return mdPos.compareTo(lac) >= 0;
531+
}
532+
};
533+
if (metadata.isPresent()) {
534+
for (int index = 0; index < metadata.get().partitions; index++) {
535+
String partitionName = topicNameObj.getPartition(index).toString();
536+
PersistentTopic persistentTopic =
537+
(PersistentTopic) pulsar1.getBrokerService().getTopic(partitionName, false).join().get();
538+
persistentTopic.getReplicators().values().forEach(replicator -> {
539+
assertTrue(ensureNoBacklog.apply(replicator));
540+
});
541+
}
542+
} else {
543+
PersistentTopic persistentTopic =
544+
(PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get();
545+
persistentTopic.getReplicators().values().forEach(replicator -> {
546+
assertTrue(ensureNoBacklog.apply(replicator));
547+
});
548+
}
549+
}
509550
}

0 commit comments

Comments
 (0)