Skip to content

Commit 6fdc0e3

Browse files
authored
[fix] [test] Fix flaky test ReplicatorTest (apache#22594)
1 parent 93afd89 commit 6fdc0e3

File tree

2 files changed

+130
-120
lines changed

2 files changed

+130
-120
lines changed

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java

Lines changed: 98 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,24 @@
1818
*/
1919
package org.apache.pulsar.broker.service;
2020

21+
import static org.testng.Assert.fail;
2122
import com.google.common.collect.Sets;
23+
import java.util.ArrayList;
24+
import java.util.List;
25+
import java.util.concurrent.Callable;
26+
import java.util.concurrent.Future;
2227
import lombok.Cleanup;
23-
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
28+
import lombok.extern.slf4j.Slf4j;
29+
import org.apache.pulsar.broker.BrokerTestUtil;
2430
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
2531
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
2632
import org.apache.pulsar.client.api.MessageRoutingMode;
2733
import org.apache.pulsar.client.api.PulsarClient;
2834
import org.apache.pulsar.client.impl.ConsumerImpl;
2935
import org.apache.pulsar.client.impl.ProducerImpl;
36+
import org.apache.pulsar.common.naming.TopicName;
37+
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
3038
import org.awaitility.Awaitility;
31-
import org.slf4j.Logger;
32-
import org.slf4j.LoggerFactory;
3339
import org.testng.Assert;
3440
import org.testng.annotations.AfterClass;
3541
import org.testng.annotations.BeforeClass;
@@ -41,6 +47,11 @@
4147
import java.lang.reflect.Method;
4248
import java.util.concurrent.TimeUnit;
4349

50+
/**
51+
* The tests in this class should be denied in a production pulsar cluster. they are very dangerous, which leads to
52+
* a lot of topic deletion and makes namespace policies being incorrect.
53+
*/
54+
@Slf4j
4455
@Test(groups = "broker-impl")
4556
public class ReplicatorGlobalNSTest extends ReplicatorTestBase {
4657

@@ -81,7 +92,7 @@ public void cleanup() throws Exception {
8192
*
8293
* @throws Exception
8394
*/
84-
@Test
95+
@Test(priority = Integer.MAX_VALUE)
8596
public void testRemoveLocalClusterOnGlobalNamespace() throws Exception {
8697
log.info("--- Starting ReplicatorTest::testRemoveLocalClusterOnGlobalNamespace ---");
8798

@@ -115,32 +126,88 @@ public void testRemoveLocalClusterOnGlobalNamespace() throws Exception {
115126
});
116127
}
117128

118-
@Test
119-
public void testForcefullyTopicDeletion() throws Exception {
120-
log.info("--- Starting ReplicatorTest::testForcefullyTopicDeletion ---");
121-
122-
final String namespace = "pulsar/removeClusterTest";
123-
admin1.namespaces().createNamespace(namespace);
124-
admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1"));
125-
126-
final String topicName = "persistent://" + namespace + "/topic";
127-
128-
@Cleanup
129-
PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS)
130-
.build();
131-
132-
ProducerImpl<byte[]> producer1 = (ProducerImpl<byte[]>) client1.newProducer().topic(topicName)
133-
.enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
134-
producer1.close();
135-
136-
admin1.topics().delete(topicName, true);
137-
138-
MockedPulsarServiceBaseTest
139-
.retryStrategically((test) -> !pulsar1.getBrokerService().getTopics().containsKey(topicName), 50, 150);
140-
141-
Assert.assertFalse(pulsar1.getBrokerService().getTopics().containsKey(topicName));
129+
/**
130+
* This is not a formal operation and can cause serious problems if call it in a production environment.
131+
*/
132+
@Test(priority = Integer.MAX_VALUE - 1)
133+
public void testConfigChange() throws Exception {
134+
log.info("--- Starting ReplicatorTest::testConfigChange ---");
135+
// This test is to verify that the config change on global namespace is successfully applied in broker during
136+
// runtime.
137+
// Run a set of producer tasks to create the topics
138+
List<Future<Void>> results = new ArrayList<>();
139+
for (int i = 0; i < 10; i++) {
140+
final TopicName dest = TopicName.get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/topic-" + i));
141+
142+
results.add(executor.submit(new Callable<Void>() {
143+
@Override
144+
public Void call() throws Exception {
145+
146+
@Cleanup
147+
MessageProducer producer = new MessageProducer(url1, dest);
148+
log.info("--- Starting producer --- " + url1);
149+
150+
@Cleanup
151+
MessageConsumer consumer = new MessageConsumer(url1, dest);
152+
log.info("--- Starting Consumer --- " + url1);
153+
154+
producer.produce(2);
155+
consumer.receive(2);
156+
return null;
157+
}
158+
}));
159+
}
160+
161+
for (Future<Void> result : results) {
162+
try {
163+
result.get();
164+
} catch (Exception e) {
165+
log.error("exception in getting future result ", e);
166+
fail(String.format("replication test failed with %s exception", e.getMessage()));
167+
}
168+
}
169+
170+
Thread.sleep(1000L);
171+
// Make sure that the internal replicators map contains remote cluster info
172+
ConcurrentOpenHashMap<String, PulsarClient> replicationClients1 = ns1.getReplicationClients();
173+
ConcurrentOpenHashMap<String, PulsarClient> replicationClients2 = ns2.getReplicationClients();
174+
ConcurrentOpenHashMap<String, PulsarClient> replicationClients3 = ns3.getReplicationClients();
175+
176+
Assert.assertNotNull(replicationClients1.get("r2"));
177+
Assert.assertNotNull(replicationClients1.get("r3"));
178+
Assert.assertNotNull(replicationClients2.get("r1"));
179+
Assert.assertNotNull(replicationClients2.get("r3"));
180+
Assert.assertNotNull(replicationClients3.get("r1"));
181+
Assert.assertNotNull(replicationClients3.get("r2"));
182+
183+
// Case 1: Update the global namespace replication configuration to only contains the local cluster itself
184+
admin1.namespaces().setNamespaceReplicationClusters("pulsar/ns", Sets.newHashSet("r1"));
185+
186+
// Wait for config changes to be updated.
187+
Thread.sleep(1000L);
188+
189+
// Make sure that the internal replicators map still contains remote cluster info
190+
Assert.assertNotNull(replicationClients1.get("r2"));
191+
Assert.assertNotNull(replicationClients1.get("r3"));
192+
Assert.assertNotNull(replicationClients2.get("r1"));
193+
Assert.assertNotNull(replicationClients2.get("r3"));
194+
Assert.assertNotNull(replicationClients3.get("r1"));
195+
Assert.assertNotNull(replicationClients3.get("r2"));
196+
197+
// Case 2: Update the configuration back
198+
admin1.namespaces().setNamespaceReplicationClusters("pulsar/ns", Sets.newHashSet("r1", "r2", "r3"));
199+
200+
// Wait for config changes to be updated.
201+
Thread.sleep(1000L);
202+
203+
// Make sure that the internal replicators map still contains remote cluster info
204+
Assert.assertNotNull(replicationClients1.get("r2"));
205+
Assert.assertNotNull(replicationClients1.get("r3"));
206+
Assert.assertNotNull(replicationClients2.get("r1"));
207+
Assert.assertNotNull(replicationClients2.get("r3"));
208+
Assert.assertNotNull(replicationClients3.get("r1"));
209+
Assert.assertNotNull(replicationClients3.get("r2"));
210+
211+
// Case 3: TODO: Once automatic cleanup is implemented, add tests case to verify auto removal of clusters
142212
}
143-
144-
private static final Logger log = LoggerFactory.getLogger(ReplicatorGlobalNSTest.class);
145-
146213
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java

Lines changed: 32 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,11 @@
4444
import java.util.SortedSet;
4545
import java.util.TreeSet;
4646
import java.util.UUID;
47-
import java.util.concurrent.Callable;
4847
import java.util.concurrent.CompletableFuture;
4948
import java.util.concurrent.ConcurrentHashMap;
5049
import java.util.concurrent.CountDownLatch;
5150
import java.util.concurrent.ExecutorService;
5251
import java.util.concurrent.Executors;
53-
import java.util.concurrent.Future;
5452
import java.util.concurrent.TimeUnit;
5553
import java.util.concurrent.atomic.AtomicBoolean;
5654
import java.util.stream.Collectors;
@@ -68,6 +66,7 @@
6866
import org.apache.bookkeeper.mledger.impl.PositionImpl;
6967
import org.apache.pulsar.broker.BrokerTestUtil;
7068
import org.apache.pulsar.broker.PulsarService;
69+
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
7170
import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
7271
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
7372
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -154,88 +153,6 @@ public Object[][] partitionedTopicProvider() {
154153
return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
155154
}
156155

157-
@Test(priority = Integer.MAX_VALUE)
158-
public void testConfigChange() throws Exception {
159-
log.info("--- Starting ReplicatorTest::testConfigChange ---");
160-
// This test is to verify that the config change on global namespace is successfully applied in broker during
161-
// runtime.
162-
// Run a set of producer tasks to create the topics
163-
List<Future<Void>> results = new ArrayList<>();
164-
for (int i = 0; i < 10; i++) {
165-
final TopicName dest = TopicName.get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/topic-" + i));
166-
167-
results.add(executor.submit(new Callable<Void>() {
168-
@Override
169-
public Void call() throws Exception {
170-
171-
@Cleanup
172-
MessageProducer producer = new MessageProducer(url1, dest);
173-
log.info("--- Starting producer --- " + url1);
174-
175-
@Cleanup
176-
MessageConsumer consumer = new MessageConsumer(url1, dest);
177-
log.info("--- Starting Consumer --- " + url1);
178-
179-
producer.produce(2);
180-
consumer.receive(2);
181-
return null;
182-
}
183-
}));
184-
}
185-
186-
for (Future<Void> result : results) {
187-
try {
188-
result.get();
189-
} catch (Exception e) {
190-
log.error("exception in getting future result ", e);
191-
fail(String.format("replication test failed with %s exception", e.getMessage()));
192-
}
193-
}
194-
195-
Thread.sleep(1000L);
196-
// Make sure that the internal replicators map contains remote cluster info
197-
ConcurrentOpenHashMap<String, PulsarClient> replicationClients1 = ns1.getReplicationClients();
198-
ConcurrentOpenHashMap<String, PulsarClient> replicationClients2 = ns2.getReplicationClients();
199-
ConcurrentOpenHashMap<String, PulsarClient> replicationClients3 = ns3.getReplicationClients();
200-
201-
Assert.assertNotNull(replicationClients1.get("r2"));
202-
Assert.assertNotNull(replicationClients1.get("r3"));
203-
Assert.assertNotNull(replicationClients2.get("r1"));
204-
Assert.assertNotNull(replicationClients2.get("r3"));
205-
Assert.assertNotNull(replicationClients3.get("r1"));
206-
Assert.assertNotNull(replicationClients3.get("r2"));
207-
208-
// Case 1: Update the global namespace replication configuration to only contains the local cluster itself
209-
admin1.namespaces().setNamespaceReplicationClusters("pulsar/ns", Sets.newHashSet("r1"));
210-
211-
// Wait for config changes to be updated.
212-
Thread.sleep(1000L);
213-
214-
// Make sure that the internal replicators map still contains remote cluster info
215-
Assert.assertNotNull(replicationClients1.get("r2"));
216-
Assert.assertNotNull(replicationClients1.get("r3"));
217-
Assert.assertNotNull(replicationClients2.get("r1"));
218-
Assert.assertNotNull(replicationClients2.get("r3"));
219-
Assert.assertNotNull(replicationClients3.get("r1"));
220-
Assert.assertNotNull(replicationClients3.get("r2"));
221-
222-
// Case 2: Update the configuration back
223-
admin1.namespaces().setNamespaceReplicationClusters("pulsar/ns", Sets.newHashSet("r1", "r2", "r3"));
224-
225-
// Wait for config changes to be updated.
226-
Thread.sleep(1000L);
227-
228-
// Make sure that the internal replicators map still contains remote cluster info
229-
Assert.assertNotNull(replicationClients1.get("r2"));
230-
Assert.assertNotNull(replicationClients1.get("r3"));
231-
Assert.assertNotNull(replicationClients2.get("r1"));
232-
Assert.assertNotNull(replicationClients2.get("r3"));
233-
Assert.assertNotNull(replicationClients3.get("r1"));
234-
Assert.assertNotNull(replicationClients3.get("r2"));
235-
236-
// Case 3: TODO: Once automatic cleanup is implemented, add tests case to verify auto removal of clusters
237-
}
238-
239156
@Test(timeOut = 10000)
240157
public void activeBrokerParse() throws Exception {
241158
pulsar1.getConfiguration().setAuthorizationEnabled(true);
@@ -253,6 +170,32 @@ public void activeBrokerParse() throws Exception {
253170
pulsar1.getConfiguration().setAuthorizationEnabled(false);
254171
}
255172

173+
@Test
174+
public void testForcefullyTopicDeletion() throws Exception {
175+
log.info("--- Starting ReplicatorTest::testForcefullyTopicDeletion ---");
176+
177+
final String namespace = BrokerTestUtil.newUniqueName("pulsar/removeClusterTest");
178+
admin1.namespaces().createNamespace(namespace);
179+
admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1"));
180+
181+
final String topicName = "persistent://" + namespace + "/topic";
182+
183+
@Cleanup
184+
PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS)
185+
.build();
186+
187+
ProducerImpl<byte[]> producer1 = (ProducerImpl<byte[]>) client1.newProducer().topic(topicName)
188+
.enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
189+
producer1.close();
190+
191+
admin1.topics().delete(topicName, true);
192+
193+
MockedPulsarServiceBaseTest
194+
.retryStrategically((test) -> !pulsar1.getBrokerService().getTopics().containsKey(topicName), 50, 150);
195+
196+
Assert.assertFalse(pulsar1.getBrokerService().getTopics().containsKey(topicName));
197+
}
198+
256199
@SuppressWarnings("unchecked")
257200
@Test(timeOut = 30000)
258201
public void testConcurrentReplicator() throws Exception {
@@ -1270,7 +1213,7 @@ public void testReplicatedCluster() throws Exception {
12701213

12711214
log.info("--- Starting ReplicatorTest::testReplicatedCluster ---");
12721215

1273-
final String namespace = "pulsar/global/repl";
1216+
final String namespace = BrokerTestUtil.newUniqueName("pulsar/global/repl");
12741217
final String topicName = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/topic1");
12751218
admin1.namespaces().createNamespace(namespace);
12761219
admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2", "r3"));
@@ -1677,7 +1620,7 @@ public void testReplicatorWithFailedAck() throws Exception {
16771620

16781621
log.info("--- Starting ReplicatorTest::testReplication ---");
16791622

1680-
String namespace = "pulsar/global/ns2";
1623+
String namespace = BrokerTestUtil.newUniqueName("pulsar/global/ns");
16811624
admin1.namespaces().createNamespace(namespace, Sets.newHashSet("r1"));
16821625
final TopicName dest = TopicName
16831626
.get(BrokerTestUtil.newUniqueName("persistent://" + namespace + "/ackFailedTopic"));
@@ -1749,7 +1692,7 @@ public void testReplicatorWithFailedAck() throws Exception {
17491692
@Test
17501693
public void testWhenUpdateReplicationCluster() throws Exception {
17511694
log.info("--- testWhenUpdateReplicationCluster ---");
1752-
String namespace = "pulsar/ns2";
1695+
String namespace = BrokerTestUtil.newUniqueName("pulsar/ns");;
17531696
admin1.namespaces().createNamespace(namespace);
17541697
admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));
17551698
final TopicName dest = TopicName.get(
@@ -1778,12 +1721,12 @@ public void testWhenUpdateReplicationCluster() throws Exception {
17781721
@Test
17791722
public void testReplicatorProducerNotExceed() throws Exception {
17801723
log.info("--- testReplicatorProducerNotExceed ---");
1781-
String namespace1 = "pulsar/ns11";
1724+
String namespace1 = BrokerTestUtil.newUniqueName("pulsar/ns1");
17821725
admin1.namespaces().createNamespace(namespace1);
17831726
admin1.namespaces().setNamespaceReplicationClusters(namespace1, Sets.newHashSet("r1", "r2"));
17841727
final TopicName dest1 = TopicName.get(
17851728
BrokerTestUtil.newUniqueName("persistent://" + namespace1 + "/testReplicatorProducerNotExceed1"));
1786-
String namespace2 = "pulsar/ns22";
1729+
String namespace2 = BrokerTestUtil.newUniqueName("pulsar/ns2");
17871730
admin2.namespaces().createNamespace(namespace2);
17881731
admin2.namespaces().setNamespaceReplicationClusters(namespace2, Sets.newHashSet("r1", "r2"));
17891732
final TopicName dest2 = TopicName.get(

0 commit comments

Comments
 (0)