Skip to content

[fix] [test] Fix flaky test ReplicatorTest #22594

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,24 @@
*/
package org.apache.pulsar.broker.service;

import static org.testng.Assert.fail;
import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import lombok.Cleanup;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
Expand All @@ -41,6 +47,11 @@
import java.lang.reflect.Method;
import java.util.concurrent.TimeUnit;

/**
* The tests in this class should be denied in a production pulsar cluster. they are very dangerous, which leads to
* a lot of topic deletion and makes namespace policies being incorrect.
*/
@Slf4j
@Test(groups = "broker-impl")
public class ReplicatorGlobalNSTest extends ReplicatorTestBase {

Expand Down Expand Up @@ -81,7 +92,7 @@ public void cleanup() throws Exception {
*
* @throws Exception
*/
@Test
@Test(priority = Integer.MAX_VALUE)
public void testRemoveLocalClusterOnGlobalNamespace() throws Exception {
log.info("--- Starting ReplicatorTest::testRemoveLocalClusterOnGlobalNamespace ---");

Expand Down Expand Up @@ -115,32 +126,88 @@ public void testRemoveLocalClusterOnGlobalNamespace() throws Exception {
});
}

@Test
public void testForcefullyTopicDeletion() throws Exception {
log.info("--- Starting ReplicatorTest::testForcefullyTopicDeletion ---");

final String namespace = "pulsar/removeClusterTest";
admin1.namespaces().createNamespace(namespace);
admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1"));

final String topicName = "persistent://" + namespace + "/topic";

@Cleanup
PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS)
.build();

ProducerImpl<byte[]> producer1 = (ProducerImpl<byte[]>) client1.newProducer().topic(topicName)
.enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
producer1.close();

admin1.topics().delete(topicName, true);

MockedPulsarServiceBaseTest
.retryStrategically((test) -> !pulsar1.getBrokerService().getTopics().containsKey(topicName), 50, 150);

Assert.assertFalse(pulsar1.getBrokerService().getTopics().containsKey(topicName));
/**
* This is not a formal operation and can cause serious problems if call it in a production environment.
*/
@Test(priority = Integer.MAX_VALUE - 1)
public void testConfigChange() throws Exception {
log.info("--- Starting ReplicatorTest::testConfigChange ---");
// This test is to verify that the config change on global namespace is successfully applied in broker during
// runtime.
// Run a set of producer tasks to create the topics
List<Future<Void>> results = new ArrayList<>();
for (int i = 0; i < 10; i++) {
final TopicName dest = TopicName.get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/topic-" + i));

results.add(executor.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {

@Cleanup
MessageProducer producer = new MessageProducer(url1, dest);
log.info("--- Starting producer --- " + url1);

@Cleanup
MessageConsumer consumer = new MessageConsumer(url1, dest);
log.info("--- Starting Consumer --- " + url1);

producer.produce(2);
consumer.receive(2);
return null;
}
}));
}

for (Future<Void> result : results) {
try {
result.get();
} catch (Exception e) {
log.error("exception in getting future result ", e);
fail(String.format("replication test failed with %s exception", e.getMessage()));
}
}

Thread.sleep(1000L);
// Make sure that the internal replicators map contains remote cluster info
ConcurrentOpenHashMap<String, PulsarClient> replicationClients1 = ns1.getReplicationClients();
ConcurrentOpenHashMap<String, PulsarClient> replicationClients2 = ns2.getReplicationClients();
ConcurrentOpenHashMap<String, PulsarClient> replicationClients3 = ns3.getReplicationClients();

Assert.assertNotNull(replicationClients1.get("r2"));
Assert.assertNotNull(replicationClients1.get("r3"));
Assert.assertNotNull(replicationClients2.get("r1"));
Assert.assertNotNull(replicationClients2.get("r3"));
Assert.assertNotNull(replicationClients3.get("r1"));
Assert.assertNotNull(replicationClients3.get("r2"));

// Case 1: Update the global namespace replication configuration to only contains the local cluster itself
admin1.namespaces().setNamespaceReplicationClusters("pulsar/ns", Sets.newHashSet("r1"));

// Wait for config changes to be updated.
Thread.sleep(1000L);

// Make sure that the internal replicators map still contains remote cluster info
Assert.assertNotNull(replicationClients1.get("r2"));
Assert.assertNotNull(replicationClients1.get("r3"));
Assert.assertNotNull(replicationClients2.get("r1"));
Assert.assertNotNull(replicationClients2.get("r3"));
Assert.assertNotNull(replicationClients3.get("r1"));
Assert.assertNotNull(replicationClients3.get("r2"));

// Case 2: Update the configuration back
admin1.namespaces().setNamespaceReplicationClusters("pulsar/ns", Sets.newHashSet("r1", "r2", "r3"));

// Wait for config changes to be updated.
Thread.sleep(1000L);

// Make sure that the internal replicators map still contains remote cluster info
Assert.assertNotNull(replicationClients1.get("r2"));
Assert.assertNotNull(replicationClients1.get("r3"));
Assert.assertNotNull(replicationClients2.get("r1"));
Assert.assertNotNull(replicationClients2.get("r3"));
Assert.assertNotNull(replicationClients3.get("r1"));
Assert.assertNotNull(replicationClients3.get("r2"));

// Case 3: TODO: Once automatic cleanup is implemented, add tests case to verify auto removal of clusters
}

private static final Logger log = LoggerFactory.getLogger(ReplicatorGlobalNSTest.class);

}
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,11 @@
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
Expand All @@ -68,6 +66,7 @@
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
Expand Down Expand Up @@ -154,88 +153,6 @@ public Object[][] partitionedTopicProvider() {
return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
}

@Test(priority = Integer.MAX_VALUE)
public void testConfigChange() throws Exception {
log.info("--- Starting ReplicatorTest::testConfigChange ---");
// This test is to verify that the config change on global namespace is successfully applied in broker during
// runtime.
// Run a set of producer tasks to create the topics
List<Future<Void>> results = new ArrayList<>();
for (int i = 0; i < 10; i++) {
final TopicName dest = TopicName.get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/topic-" + i));

results.add(executor.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {

@Cleanup
MessageProducer producer = new MessageProducer(url1, dest);
log.info("--- Starting producer --- " + url1);

@Cleanup
MessageConsumer consumer = new MessageConsumer(url1, dest);
log.info("--- Starting Consumer --- " + url1);

producer.produce(2);
consumer.receive(2);
return null;
}
}));
}

for (Future<Void> result : results) {
try {
result.get();
} catch (Exception e) {
log.error("exception in getting future result ", e);
fail(String.format("replication test failed with %s exception", e.getMessage()));
}
}

Thread.sleep(1000L);
// Make sure that the internal replicators map contains remote cluster info
ConcurrentOpenHashMap<String, PulsarClient> replicationClients1 = ns1.getReplicationClients();
ConcurrentOpenHashMap<String, PulsarClient> replicationClients2 = ns2.getReplicationClients();
ConcurrentOpenHashMap<String, PulsarClient> replicationClients3 = ns3.getReplicationClients();

Assert.assertNotNull(replicationClients1.get("r2"));
Assert.assertNotNull(replicationClients1.get("r3"));
Assert.assertNotNull(replicationClients2.get("r1"));
Assert.assertNotNull(replicationClients2.get("r3"));
Assert.assertNotNull(replicationClients3.get("r1"));
Assert.assertNotNull(replicationClients3.get("r2"));

// Case 1: Update the global namespace replication configuration to only contains the local cluster itself
admin1.namespaces().setNamespaceReplicationClusters("pulsar/ns", Sets.newHashSet("r1"));

// Wait for config changes to be updated.
Thread.sleep(1000L);

// Make sure that the internal replicators map still contains remote cluster info
Assert.assertNotNull(replicationClients1.get("r2"));
Assert.assertNotNull(replicationClients1.get("r3"));
Assert.assertNotNull(replicationClients2.get("r1"));
Assert.assertNotNull(replicationClients2.get("r3"));
Assert.assertNotNull(replicationClients3.get("r1"));
Assert.assertNotNull(replicationClients3.get("r2"));

// Case 2: Update the configuration back
admin1.namespaces().setNamespaceReplicationClusters("pulsar/ns", Sets.newHashSet("r1", "r2", "r3"));

// Wait for config changes to be updated.
Thread.sleep(1000L);

// Make sure that the internal replicators map still contains remote cluster info
Assert.assertNotNull(replicationClients1.get("r2"));
Assert.assertNotNull(replicationClients1.get("r3"));
Assert.assertNotNull(replicationClients2.get("r1"));
Assert.assertNotNull(replicationClients2.get("r3"));
Assert.assertNotNull(replicationClients3.get("r1"));
Assert.assertNotNull(replicationClients3.get("r2"));

// Case 3: TODO: Once automatic cleanup is implemented, add tests case to verify auto removal of clusters
}

@Test(timeOut = 10000)
public void activeBrokerParse() throws Exception {
pulsar1.getConfiguration().setAuthorizationEnabled(true);
Expand All @@ -253,6 +170,32 @@ public void activeBrokerParse() throws Exception {
pulsar1.getConfiguration().setAuthorizationEnabled(false);
}

@Test
public void testForcefullyTopicDeletion() throws Exception {
log.info("--- Starting ReplicatorTest::testForcefullyTopicDeletion ---");

final String namespace = BrokerTestUtil.newUniqueName("pulsar/removeClusterTest");
admin1.namespaces().createNamespace(namespace);
admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1"));

final String topicName = "persistent://" + namespace + "/topic";

@Cleanup
PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS)
.build();

ProducerImpl<byte[]> producer1 = (ProducerImpl<byte[]>) client1.newProducer().topic(topicName)
.enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
producer1.close();

admin1.topics().delete(topicName, true);

MockedPulsarServiceBaseTest
.retryStrategically((test) -> !pulsar1.getBrokerService().getTopics().containsKey(topicName), 50, 150);

Assert.assertFalse(pulsar1.getBrokerService().getTopics().containsKey(topicName));
}

@SuppressWarnings("unchecked")
@Test(timeOut = 30000)
public void testConcurrentReplicator() throws Exception {
Expand Down Expand Up @@ -1270,7 +1213,7 @@ public void testReplicatedCluster() throws Exception {

log.info("--- Starting ReplicatorTest::testReplicatedCluster ---");

final String namespace = "pulsar/global/repl";
final String namespace = BrokerTestUtil.newUniqueName("pulsar/global/repl");
final String topicName = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/topic1");
admin1.namespaces().createNamespace(namespace);
admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2", "r3"));
Expand Down Expand Up @@ -1677,7 +1620,7 @@ public void testReplicatorWithFailedAck() throws Exception {

log.info("--- Starting ReplicatorTest::testReplication ---");

String namespace = "pulsar/global/ns2";
String namespace = BrokerTestUtil.newUniqueName("pulsar/global/ns");
admin1.namespaces().createNamespace(namespace, Sets.newHashSet("r1"));
final TopicName dest = TopicName
.get(BrokerTestUtil.newUniqueName("persistent://" + namespace + "/ackFailedTopic"));
Expand Down Expand Up @@ -1749,7 +1692,7 @@ public void testReplicatorWithFailedAck() throws Exception {
@Test
public void testWhenUpdateReplicationCluster() throws Exception {
log.info("--- testWhenUpdateReplicationCluster ---");
String namespace = "pulsar/ns2";
String namespace = BrokerTestUtil.newUniqueName("pulsar/ns");;
admin1.namespaces().createNamespace(namespace);
admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));
final TopicName dest = TopicName.get(
Expand Down Expand Up @@ -1778,12 +1721,12 @@ public void testWhenUpdateReplicationCluster() throws Exception {
@Test
public void testReplicatorProducerNotExceed() throws Exception {
log.info("--- testReplicatorProducerNotExceed ---");
String namespace1 = "pulsar/ns11";
String namespace1 = BrokerTestUtil.newUniqueName("pulsar/ns1");
admin1.namespaces().createNamespace(namespace1);
admin1.namespaces().setNamespaceReplicationClusters(namespace1, Sets.newHashSet("r1", "r2"));
final TopicName dest1 = TopicName.get(
BrokerTestUtil.newUniqueName("persistent://" + namespace1 + "/testReplicatorProducerNotExceed1"));
String namespace2 = "pulsar/ns22";
String namespace2 = BrokerTestUtil.newUniqueName("pulsar/ns2");
admin2.namespaces().createNamespace(namespace2);
admin2.namespaces().setNamespaceReplicationClusters(namespace2, Sets.newHashSet("r1", "r2"));
final TopicName dest2 = TopicName.get(
Expand Down