Skip to content

Commit e0e4d88

Browse files
poorbarcodelhotari
authored andcommitted
[improve][broker] Deny removing local cluster from topic level replicated cluster policy (apache#24351)
(cherry picked from commit 82af299)
1 parent 67e6238 commit e0e4d88

File tree

9 files changed

+243
-6
lines changed

9 files changed

+243
-6
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -938,6 +938,12 @@ protected static boolean isNotFoundException(Throwable ex) {
938938
== Status.NOT_FOUND.getStatusCode();
939939
}
940940

941+
protected static boolean is4xxRestException(Throwable ex) {
942+
Throwable realCause = FutureUtil.unwrapCompletionException(ex);
943+
return realCause instanceof WebApplicationException
944+
&& (((WebApplicationException) realCause).getResponse().getStatus() % 100 == 4);
945+
}
946+
941947
protected static boolean isConflictException(Throwable ex) {
942948
Throwable realCause = FutureUtil.unwrapCompletionException(ex);
943949
return realCause instanceof WebApplicationException
@@ -960,6 +966,10 @@ protected static boolean isNot307And404And400Exception(Throwable ex) {
960966
return !isRedirectException(ex) && !isNotFoundException(ex) && !isBadRequest(ex);
961967
}
962968

969+
protected static boolean isNot307And4xxException(Throwable ex) {
970+
return !isRedirectException(ex) && !is4xxRestException(ex);
971+
}
972+
963973
protected static String getTopicNotFoundErrorMessage(String topic) {
964974
return String.format("Topic %s not found", topic);
965975
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5201,7 +5201,7 @@ protected CompletableFuture<Void> internalRemoveSubscribeRate(boolean isGlobal)
52015201

52025202
protected void handleTopicPolicyException(String methodName, Throwable thr, AsyncResponse asyncResponse) {
52035203
Throwable cause = thr.getCause();
5204-
if (isNot307And404And400Exception(cause)) {
5204+
if (isNot307And4xxException(cause)) {
52055205
log.error("[{}] Failed to perform {} on topic {}",
52065206
clientAppId(), methodName, topicName, cause);
52075207
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.util.Map;
3030
import java.util.Optional;
3131
import java.util.Set;
32+
import java.util.concurrent.CompletableFuture;
3233
import javax.ws.rs.DELETE;
3334
import javax.ws.rs.DefaultValue;
3435
import javax.ws.rs.Encoded;
@@ -46,6 +47,7 @@
4647
import javax.ws.rs.core.Response;
4748
import org.apache.bookkeeper.mledger.Position;
4849
import org.apache.bookkeeper.mledger.impl.PositionImpl;
50+
import org.apache.commons.collections4.CollectionUtils;
4951
import org.apache.pulsar.broker.admin.AdminResource;
5052
import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
5153
import org.apache.pulsar.broker.service.BrokerServiceException;
@@ -2321,6 +2323,26 @@ public void setReplicationClusters(
23212323
@ApiParam(value = "List of replication clusters", required = true) List<String> clusterIds) {
23222324
validateTopicName(tenant, namespace, encodedTopic);
23232325
preValidation(authoritative)
2326+
.thenCompose(__ -> {
2327+
// Set a topic-level replicated clusters that do not contain local cluster is not meaningful, except
2328+
// the following scenario: User has two clusters, which enabled Geo-Replication through a global
2329+
// metadata store, the resources named partitioned topic metadata and the resource namespace-level
2330+
// "replicated clusters" are shared between multi clusters. Pulsar can hardly delete a specify
2331+
// partitioned topic. To support this use case, the following steps can implement it:
2332+
// 1. set a global topic-level replicated clusters that do not contain local cluster.
2333+
// 2. the local cluster will remove the subtopics automatically, and remove the schemas and local
2334+
// topic policies. Just leave the global topic policies there, which prevents the namespace level
2335+
// replicated clusters policy taking affect.
2336+
// TODO But the API "pulsar-admin topics set-replication-clusters" does not support global policy,
2337+
// to support this scenario, a PIP is needed.
2338+
boolean clustersDoesNotContainsLocal = CollectionUtils.isEmpty(clusterIds)
2339+
|| !clusterIds.contains(pulsar().getConfig().getClusterName());
2340+
if (clustersDoesNotContainsLocal) {
2341+
return FutureUtil.failedFuture(new RestException(Response.Status.PRECONDITION_FAILED,
2342+
"Can not remove local cluster from the topic-level replication clusters policy"));
2343+
}
2344+
return CompletableFuture.completedFuture(null);
2345+
})
23242346
.thenCompose(__ -> internalSetReplicationClusters(clusterIds))
23252347
.thenRun(() -> asyncResponse.resume(Response.noContent().build()))
23262348
.exceptionally(ex -> {

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1914,9 +1914,15 @@ CompletableFuture<Void> deleteSchemaAndPoliciesIfClusterRemoved() {
19141914
}
19151915

19161916
});
1917-
// TODO regarding the topic level policies, it will be deleted at a seperate PR.
1918-
// Because there is an issue related to Global policies has not been solved so
1919-
// far.
1917+
// There are only one cases that will remove local clusters: using global metadata
1918+
// store, namespaces will share policies cross multi clusters, including
1919+
// "replicated clusters" and "partitioned topic metadata", we can hardly delete
1920+
// partitioned topic from one cluster and keep it exists in another. Removing
1921+
// local cluster from the namespace level "replicated clusters" can do this.
1922+
// TODO: there is no way to delete a specify partitioned topic if users have enabled
1923+
// Geo-Replication with a global metadata store, a PIP is needed.
1924+
// Since the system topic "__change_events" under the namespace will also be
1925+
// deleted, we can skip to delete topic-level policies.
19201926
}
19211927
}
19221928
});

pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.lang.reflect.Field;
3131
import java.lang.reflect.Method;
3232
import java.util.ArrayList;
33+
import java.util.Arrays;
3334
import java.util.Collections;
3435
import java.util.HashMap;
3536
import java.util.HashSet;
@@ -3831,4 +3832,35 @@ public void testSetSubRateWithSub() throws Exception {
38313832
.build());
38323833
}
38333834

3835+
@DataProvider
3836+
public Object[][] topicTypes() {
3837+
return new Object[][]{
3838+
{TopicType.PARTITIONED},
3839+
{TopicType.NON_PARTITIONED}
3840+
};
3841+
}
3842+
3843+
@Test(dataProvider = "topicTypes")
3844+
public void testRemoveLocalCluster(TopicType topicType) throws Exception {
3845+
String topic = "persistent://" + myNamespace + "/testSetSubRateWithSub";
3846+
if (TopicType.PARTITIONED.equals(topicType)) {
3847+
admin.topics().createNonPartitionedTopic(topic);
3848+
} else {
3849+
admin.topics().createPartitionedTopic(topic, 2);
3850+
}
3851+
try {
3852+
admin.topics().setReplicationClusters(topic, Arrays.asList("not-local-cluster"));
3853+
fail("Can not remove local cluster from the topic-level replication clusters policy");
3854+
} catch (PulsarAdminException.PreconditionFailedException e) {
3855+
assertTrue(e.getMessage().contains("Can not remove local cluster from the topic-level replication clusters"
3856+
+ " policy"));
3857+
}
3858+
// cleanup.
3859+
if (TopicType.PARTITIONED.equals(topicType)) {
3860+
admin.topics().delete(topic, false);
3861+
} else {
3862+
admin.topics().deletePartitionedTopic(topic, false);
3863+
}
3864+
}
3865+
38343866
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@
9090
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
9191
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
9292
import org.apache.pulsar.common.policies.data.ClusterData;
93+
import org.apache.pulsar.common.policies.data.PublishRate;
9394
import org.apache.pulsar.common.policies.data.RetentionPolicies;
9495
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
9596
import org.apache.pulsar.common.policies.data.TenantInfo;
@@ -1399,4 +1400,88 @@ public void testIncompatibleMultiVersionSchema(boolean enableDeduplication) thro
13991400
static PositionImpl transferToPosImpl(Position position) {
14001401
return new PositionImpl(position.getLedgerId(), position.getEntryId());
14011402
}
1403+
1404+
/***
1405+
* Manually modifying topic policies by Rest API.
1406+
* - Global topic level policies:
1407+
* - Add: replicate
1408+
* - Update: replicate
1409+
* - Delete a single policy(it is equivalent to specify updating): delete both local and remote policies.
1410+
* - Local topic level policies:
1411+
* - Add: never replicate
1412+
* - Update: never replicate
1413+
* - Delete a single policy(it is equivalent to specify updating): delete local policies only.
1414+
* Delete Topic triggers that both local and global policies will be deleted in local cluster, but will not delete
1415+
* the remote cluster's global policies. This test case will be covered by
1416+
* "OneWayReplicatorUsingGlobalPartitionedTest.testRemoveCluster".
1417+
*/
1418+
@Test
1419+
public void testTopicPoliciesReplicationRule() throws Exception {
1420+
// Init Pulsar resources.
1421+
final String topicName = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_");
1422+
final TopicName topicNameObj = TopicName.get(topicName);
1423+
final String subscriptionName = "s1";
1424+
admin1.topics().createNonPartitionedTopic(topicName);
1425+
Producer<byte[]> producer1 = client1.newProducer(Schema.AUTO_PRODUCE_BYTES()).topic(topicName).create();
1426+
waitReplicatorStarted(topicName);
1427+
producer1.close();
1428+
assertTrue(pulsar2.getPulsarResources().getTopicResources().persistentTopicExists(topicNameObj).join());
1429+
admin1.topics().createSubscription(topicName, subscriptionName, MessageId.earliest);
1430+
admin1.topics().createSubscription(subscriptionName, topicName, MessageId.earliest);
1431+
admin2.topics().createSubscription(subscriptionName, topicName, MessageId.earliest);
1432+
1433+
// Case 1: Global topic level policies -> Add: replicate.
1434+
PublishRate publishRateAddGlobal = new PublishRate(100, 10000);
1435+
admin1.topicPolicies(true).setPublishRate(topicName, publishRateAddGlobal);
1436+
// Case 4: Local topic level policies -> Add: never replicate.
1437+
PublishRate publishRateAddLocal = new PublishRate(200, 20000);
1438+
admin1.topicPolicies(false).setPublishRate(topicName, publishRateAddLocal);
1439+
waitChangeEventsReplicated(replicatedNamespace);
1440+
Thread.sleep(2000);
1441+
Awaitility.await().untilAsserted(() -> {
1442+
PublishRate valueGlobal = admin2.topicPolicies(true).getPublishRate(topicName);
1443+
assertEquals(valueGlobal, publishRateAddGlobal);
1444+
PublishRate valueLocal = admin2.topicPolicies(false).getPublishRate(topicName);
1445+
assertNull(valueLocal);
1446+
});
1447+
1448+
// Case 2: Global topic level policies -> Update: replicate.
1449+
PublishRate publishRateUpdateGlobal = new PublishRate(300, 30000);
1450+
admin1.topicPolicies(true).setPublishRate(topicName, publishRateUpdateGlobal);
1451+
// Case 5: Local topic level policies -> Update: never replicate.
1452+
PublishRate publishRateUpdateLocal = new PublishRate(400, 40000);
1453+
admin1.topicPolicies(false).setPublishRate(topicName, publishRateUpdateLocal);
1454+
waitChangeEventsReplicated(replicatedNamespace);
1455+
Thread.sleep(2000);
1456+
Awaitility.await().untilAsserted(() -> {
1457+
PublishRate valueGlobal = admin2.topicPolicies(true).getPublishRate(topicName);
1458+
assertEquals(valueGlobal, publishRateUpdateGlobal);
1459+
PublishRate valueLocal = admin2.topicPolicies(false).getPublishRate(topicName);
1460+
assertNull(valueLocal);
1461+
});
1462+
1463+
// Case 3: Global topic level policies -> Delete: delete both local and remote policies.
1464+
admin1.topicPolicies(true).removePublishRate(topicName);
1465+
waitChangeEventsReplicated(replicatedNamespace);
1466+
Thread.sleep(2000);
1467+
Awaitility.await().untilAsserted(() -> {
1468+
PublishRate valueGlobal = admin2.topicPolicies(true).getPublishRate(topicName);
1469+
assertNull(valueGlobal);
1470+
});
1471+
1472+
// Case 6: Local topic level policies -> Delete: never replicate.
1473+
PublishRate publishRateAddLocal2 = new PublishRate(500, 50000);
1474+
admin2.topicPolicies(false).setPublishRate(topicName, publishRateAddLocal2);
1475+
Awaitility.await().untilAsserted(() -> {
1476+
PublishRate valueLocal = admin2.topicPolicies(false).getPublishRate(topicName);
1477+
assertEquals(valueLocal, publishRateAddLocal2);
1478+
});
1479+
admin1.topicPolicies(false).removePublishRate(topicName);
1480+
waitChangeEventsReplicated(replicatedNamespace);
1481+
Thread.sleep(2000);
1482+
Awaitility.await().untilAsserted(() -> {
1483+
PublishRate valueLocal = admin2.topicPolicies(false).getPublishRate(topicName);
1484+
assertEquals(valueLocal, publishRateAddLocal2);
1485+
});
1486+
}
14021487
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,9 @@
2222
import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.BROKER_KEY_FILE_PATH;
2323
import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.CA_CERT_FILE_PATH;
2424
import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION;
25+
import static org.testng.Assert.assertEquals;
2526
import static org.testng.Assert.assertFalse;
2627
import static org.testng.Assert.assertTrue;
27-
import static org.testng.Assert.assertEquals;
2828
import com.google.common.collect.Sets;
2929
import java.net.URL;
3030
import java.time.Duration;
@@ -36,9 +36,12 @@
3636
import java.util.Set;
3737
import java.util.concurrent.CompletableFuture;
3838
import java.util.concurrent.TimeUnit;
39+
import java.util.function.Function;
3940
import lombok.extern.slf4j.Slf4j;
41+
import org.apache.bookkeeper.mledger.impl.PositionImpl;
4042
import org.apache.pulsar.broker.PulsarService;
4143
import org.apache.pulsar.broker.ServiceConfiguration;
44+
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
4245
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
4346
import org.apache.pulsar.client.admin.PulsarAdmin;
4447
import org.apache.pulsar.client.api.ClientBuilder;
@@ -48,6 +51,7 @@
4851
import org.apache.pulsar.client.api.Schema;
4952
import org.apache.pulsar.common.naming.SystemTopicNames;
5053
import org.apache.pulsar.common.naming.TopicName;
54+
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
5155
import org.apache.pulsar.common.policies.data.ClusterData;
5256
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
5357
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
@@ -506,4 +510,42 @@ protected void waitReplicatorStopped(PulsarService sourceCluster, PulsarService
506510
|| !persistentTopic1.getReplicators().get(targetCluster.getConfig().getClusterName()).isConnected());
507511
});
508512
}
513+
514+
protected void waitChangeEventsReplicated(String ns) {
515+
String topicName = "persistent://" + ns + "/" + SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME;
516+
TopicName topicNameObj = TopicName.get(topicName);
517+
Optional<PartitionedTopicMetadata> metadata = pulsar1.getPulsarResources().getNamespaceResources()
518+
.getPartitionedTopicResources()
519+
.getPartitionedTopicMetadataAsync(topicNameObj).join();
520+
Function<Replicator, Boolean> ensureNoBacklog = new Function<Replicator,Boolean>() {
521+
522+
@Override
523+
public Boolean apply(Replicator replicator) {
524+
if (!replicator.getRemoteCluster().equals("c2")) {
525+
return true;
526+
}
527+
PersistentReplicator persistentReplicator = (PersistentReplicator) replicator;
528+
PositionImpl lac =
529+
(PositionImpl) persistentReplicator.getCursor().getManagedLedger().getLastConfirmedEntry();
530+
PositionImpl mdPos = (PositionImpl) persistentReplicator.getCursor().getMarkDeletedPosition();
531+
return mdPos.compareTo(lac) >= 0;
532+
}
533+
};
534+
if (metadata.isPresent()) {
535+
for (int index = 0; index < metadata.get().partitions; index++) {
536+
String partitionName = topicNameObj.getPartition(index).toString();
537+
PersistentTopic persistentTopic =
538+
(PersistentTopic) pulsar1.getBrokerService().getTopic(partitionName, false).join().get();
539+
persistentTopic.getReplicators().values().forEach(replicator -> {
540+
assertTrue(ensureNoBacklog.apply(replicator));
541+
});
542+
}
543+
} else {
544+
PersistentTopic persistentTopic =
545+
(PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get();
546+
persistentTopic.getReplicators().values().forEach(replicator -> {
547+
assertTrue(ensureNoBacklog.apply(replicator));
548+
});
549+
}
550+
}
509551
}

0 commit comments

Comments
 (0)