Skip to content

Commit 9242f33

Browse files
committed
[fix] [test] Fix flaky test ReplicatorTest (apache#22594)
(cherry picked from commit 6fdc0e3)
1 parent 620fe9b commit 9242f33

File tree

2 files changed

+137
-124
lines changed

2 files changed

+137
-124
lines changed

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java

Lines changed: 98 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,22 @@
1818
*/
1919
package org.apache.pulsar.broker.service;
2020

21+
import static org.testng.Assert.fail;
2122
import com.google.common.collect.Sets;
23+
import java.util.ArrayList;
24+
import java.util.List;
25+
import java.util.concurrent.Callable;
26+
import java.util.concurrent.Future;
2227
import lombok.Cleanup;
2328
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
29+
import lombok.extern.slf4j.Slf4j;
30+
import org.apache.pulsar.broker.BrokerTestUtil;
2431
import org.apache.pulsar.client.api.MessageRoutingMode;
2532
import org.apache.pulsar.client.api.PulsarClient;
2633
import org.apache.pulsar.client.impl.ConsumerImpl;
2734
import org.apache.pulsar.client.impl.ProducerImpl;
28-
import org.slf4j.Logger;
29-
import org.slf4j.LoggerFactory;
35+
import org.apache.pulsar.common.naming.TopicName;
36+
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
3037
import org.testng.Assert;
3138
import org.testng.annotations.AfterClass;
3239
import org.testng.annotations.BeforeClass;
@@ -36,6 +43,11 @@
3643
import java.lang.reflect.Method;
3744
import java.util.concurrent.TimeUnit;
3845

46+
/**
47+
* The tests in this class should be denied in a production pulsar cluster. they are very dangerous, which leads to
48+
* a lot of topic deletion and makes namespace policies being incorrect.
49+
*/
50+
@Slf4j
3951
@Test(groups = "broker-impl")
4052
public class ReplicatorGlobalNSTest extends ReplicatorTestBase {
4153

@@ -64,7 +76,7 @@ public void cleanup() throws Exception {
6476
*
6577
* @throws Exception
6678
*/
67-
@Test
79+
@Test(priority = Integer.MAX_VALUE)
6880
public void testRemoveLocalClusterOnGlobalNamespace() throws Exception {
6981
log.info("--- Starting ReplicatorTest::testRemoveLocalClusterOnGlobalNamespace ---");
7082

@@ -100,32 +112,88 @@ public void testRemoveLocalClusterOnGlobalNamespace() throws Exception {
100112

101113
}
102114

103-
@Test
104-
public void testForcefullyTopicDeletion() throws Exception {
105-
log.info("--- Starting ReplicatorTest::testForcefullyTopicDeletion ---");
106-
107-
final String namespace = "pulsar/removeClusterTest";
108-
admin1.namespaces().createNamespace(namespace);
109-
admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1"));
110-
111-
final String topicName = "persistent://" + namespace + "/topic";
112-
113-
@Cleanup
114-
PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS)
115-
.build();
116-
117-
ProducerImpl<byte[]> producer1 = (ProducerImpl<byte[]>) client1.newProducer().topic(topicName)
118-
.enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
119-
producer1.close();
120-
121-
admin1.topics().delete(topicName, true);
122-
123-
MockedPulsarServiceBaseTest
124-
.retryStrategically((test) -> !pulsar1.getBrokerService().getTopics().containsKey(topicName), 50, 150);
125-
126-
Assert.assertFalse(pulsar1.getBrokerService().getTopics().containsKey(topicName));
115+
/**
116+
* This is not a formal operation and can cause serious problems if call it in a production environment.
117+
*/
118+
@Test(priority = Integer.MAX_VALUE - 1)
119+
public void testConfigChange() throws Exception {
120+
log.info("--- Starting ReplicatorTest::testConfigChange ---");
121+
// This test is to verify that the config change on global namespace is successfully applied in broker during
122+
// runtime.
123+
// Run a set of producer tasks to create the topics
124+
List<Future<Void>> results = new ArrayList<>();
125+
for (int i = 0; i < 10; i++) {
126+
final TopicName dest = TopicName.get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/topic-" + i));
127+
128+
results.add(executor.submit(new Callable<Void>() {
129+
@Override
130+
public Void call() throws Exception {
131+
132+
@Cleanup
133+
MessageProducer producer = new MessageProducer(url1, dest);
134+
log.info("--- Starting producer --- " + url1);
135+
136+
@Cleanup
137+
MessageConsumer consumer = new MessageConsumer(url1, dest);
138+
log.info("--- Starting Consumer --- " + url1);
139+
140+
producer.produce(2);
141+
consumer.receive(2);
142+
return null;
143+
}
144+
}));
145+
}
146+
147+
for (Future<Void> result : results) {
148+
try {
149+
result.get();
150+
} catch (Exception e) {
151+
log.error("exception in getting future result ", e);
152+
fail(String.format("replication test failed with %s exception", e.getMessage()));
153+
}
154+
}
155+
156+
Thread.sleep(1000L);
157+
// Make sure that the internal replicators map contains remote cluster info
158+
ConcurrentOpenHashMap<String, PulsarClient> replicationClients1 = ns1.getReplicationClients();
159+
ConcurrentOpenHashMap<String, PulsarClient> replicationClients2 = ns2.getReplicationClients();
160+
ConcurrentOpenHashMap<String, PulsarClient> replicationClients3 = ns3.getReplicationClients();
161+
162+
Assert.assertNotNull(replicationClients1.get("r2"));
163+
Assert.assertNotNull(replicationClients1.get("r3"));
164+
Assert.assertNotNull(replicationClients2.get("r1"));
165+
Assert.assertNotNull(replicationClients2.get("r3"));
166+
Assert.assertNotNull(replicationClients3.get("r1"));
167+
Assert.assertNotNull(replicationClients3.get("r2"));
168+
169+
// Case 1: Update the global namespace replication configuration to only contains the local cluster itself
170+
admin1.namespaces().setNamespaceReplicationClusters("pulsar/ns", Sets.newHashSet("r1"));
171+
172+
// Wait for config changes to be updated.
173+
Thread.sleep(1000L);
174+
175+
// Make sure that the internal replicators map still contains remote cluster info
176+
Assert.assertNotNull(replicationClients1.get("r2"));
177+
Assert.assertNotNull(replicationClients1.get("r3"));
178+
Assert.assertNotNull(replicationClients2.get("r1"));
179+
Assert.assertNotNull(replicationClients2.get("r3"));
180+
Assert.assertNotNull(replicationClients3.get("r1"));
181+
Assert.assertNotNull(replicationClients3.get("r2"));
182+
183+
// Case 2: Update the configuration back
184+
admin1.namespaces().setNamespaceReplicationClusters("pulsar/ns", Sets.newHashSet("r1", "r2", "r3"));
185+
186+
// Wait for config changes to be updated.
187+
Thread.sleep(1000L);
188+
189+
// Make sure that the internal replicators map still contains remote cluster info
190+
Assert.assertNotNull(replicationClients1.get("r2"));
191+
Assert.assertNotNull(replicationClients1.get("r3"));
192+
Assert.assertNotNull(replicationClients2.get("r1"));
193+
Assert.assertNotNull(replicationClients2.get("r3"));
194+
Assert.assertNotNull(replicationClients3.get("r1"));
195+
Assert.assertNotNull(replicationClients3.get("r2"));
196+
197+
// Case 3: TODO: Once automatic cleanup is implemented, add tests case to verify auto removal of clusters
127198
}
128-
129-
private static final Logger log = LoggerFactory.getLogger(ReplicatorGlobalNSTest.class);
130-
131199
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java

Lines changed: 39 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,11 @@
4444
import java.util.SortedSet;
4545
import java.util.TreeSet;
4646
import java.util.UUID;
47-
import java.util.concurrent.Callable;
4847
import java.util.concurrent.CompletableFuture;
4948
import java.util.concurrent.ConcurrentHashMap;
5049
import java.util.concurrent.CountDownLatch;
5150
import java.util.concurrent.ExecutorService;
5251
import java.util.concurrent.Executors;
53-
import java.util.concurrent.Future;
5452
import java.util.concurrent.TimeUnit;
5553
import java.util.concurrent.atomic.AtomicBoolean;
5654
import java.util.stream.Collectors;
@@ -68,6 +66,7 @@
6866
import org.apache.bookkeeper.mledger.impl.PositionImpl;
6967
import org.apache.pulsar.broker.BrokerTestUtil;
7068
import org.apache.pulsar.broker.PulsarService;
69+
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
7170
import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
7271
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
7372
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -154,88 +153,6 @@ public Object[][] partitionedTopicProvider() {
154153
return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
155154
}
156155

157-
@Test(priority = Integer.MAX_VALUE)
158-
public void testConfigChange() throws Exception {
159-
log.info("--- Starting ReplicatorTest::testConfigChange ---");
160-
// This test is to verify that the config change on global namespace is successfully applied in broker during
161-
// runtime.
162-
// Run a set of producer tasks to create the topics
163-
List<Future<Void>> results = new ArrayList<>();
164-
for (int i = 0; i < 10; i++) {
165-
final TopicName dest = TopicName.get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/topic-" + i));
166-
167-
results.add(executor.submit(new Callable<Void>() {
168-
@Override
169-
public Void call() throws Exception {
170-
171-
@Cleanup
172-
MessageProducer producer = new MessageProducer(url1, dest);
173-
log.info("--- Starting producer --- " + url1);
174-
175-
@Cleanup
176-
MessageConsumer consumer = new MessageConsumer(url1, dest);
177-
log.info("--- Starting Consumer --- " + url1);
178-
179-
producer.produce(2);
180-
consumer.receive(2);
181-
return null;
182-
}
183-
}));
184-
}
185-
186-
for (Future<Void> result : results) {
187-
try {
188-
result.get();
189-
} catch (Exception e) {
190-
log.error("exception in getting future result ", e);
191-
fail(String.format("replication test failed with %s exception", e.getMessage()));
192-
}
193-
}
194-
195-
Thread.sleep(1000L);
196-
// Make sure that the internal replicators map contains remote cluster info
197-
ConcurrentOpenHashMap<String, PulsarClient> replicationClients1 = ns1.getReplicationClients();
198-
ConcurrentOpenHashMap<String, PulsarClient> replicationClients2 = ns2.getReplicationClients();
199-
ConcurrentOpenHashMap<String, PulsarClient> replicationClients3 = ns3.getReplicationClients();
200-
201-
Assert.assertNotNull(replicationClients1.get("r2"));
202-
Assert.assertNotNull(replicationClients1.get("r3"));
203-
Assert.assertNotNull(replicationClients2.get("r1"));
204-
Assert.assertNotNull(replicationClients2.get("r3"));
205-
Assert.assertNotNull(replicationClients3.get("r1"));
206-
Assert.assertNotNull(replicationClients3.get("r2"));
207-
208-
// Case 1: Update the global namespace replication configuration to only contains the local cluster itself
209-
admin1.namespaces().setNamespaceReplicationClusters("pulsar/ns", Sets.newHashSet("r1"));
210-
211-
// Wait for config changes to be updated.
212-
Thread.sleep(1000L);
213-
214-
// Make sure that the internal replicators map still contains remote cluster info
215-
Assert.assertNotNull(replicationClients1.get("r2"));
216-
Assert.assertNotNull(replicationClients1.get("r3"));
217-
Assert.assertNotNull(replicationClients2.get("r1"));
218-
Assert.assertNotNull(replicationClients2.get("r3"));
219-
Assert.assertNotNull(replicationClients3.get("r1"));
220-
Assert.assertNotNull(replicationClients3.get("r2"));
221-
222-
// Case 2: Update the configuration back
223-
admin1.namespaces().setNamespaceReplicationClusters("pulsar/ns", Sets.newHashSet("r1", "r2", "r3"));
224-
225-
// Wait for config changes to be updated.
226-
Thread.sleep(1000L);
227-
228-
// Make sure that the internal replicators map still contains remote cluster info
229-
Assert.assertNotNull(replicationClients1.get("r2"));
230-
Assert.assertNotNull(replicationClients1.get("r3"));
231-
Assert.assertNotNull(replicationClients2.get("r1"));
232-
Assert.assertNotNull(replicationClients2.get("r3"));
233-
Assert.assertNotNull(replicationClients3.get("r1"));
234-
Assert.assertNotNull(replicationClients3.get("r2"));
235-
236-
// Case 3: TODO: Once automatic cleanup is implemented, add tests case to verify auto removal of clusters
237-
}
238-
239156
@Test(timeOut = 10000)
240157
public void activeBrokerParse() throws Exception {
241158
pulsar1.getConfiguration().setAuthorizationEnabled(true);
@@ -253,6 +170,32 @@ public void activeBrokerParse() throws Exception {
253170
pulsar1.getConfiguration().setAuthorizationEnabled(false);
254171
}
255172

173+
@Test
174+
public void testForcefullyTopicDeletion() throws Exception {
175+
log.info("--- Starting ReplicatorTest::testForcefullyTopicDeletion ---");
176+
177+
final String namespace = BrokerTestUtil.newUniqueName("pulsar/removeClusterTest");
178+
admin1.namespaces().createNamespace(namespace);
179+
admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1"));
180+
181+
final String topicName = "persistent://" + namespace + "/topic";
182+
183+
@Cleanup
184+
PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS)
185+
.build();
186+
187+
ProducerImpl<byte[]> producer1 = (ProducerImpl<byte[]>) client1.newProducer().topic(topicName)
188+
.enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
189+
producer1.close();
190+
191+
admin1.topics().delete(topicName, true);
192+
193+
MockedPulsarServiceBaseTest
194+
.retryStrategically((test) -> !pulsar1.getBrokerService().getTopics().containsKey(topicName), 50, 150);
195+
196+
Assert.assertFalse(pulsar1.getBrokerService().getTopics().containsKey(topicName));
197+
}
198+
256199
@SuppressWarnings("unchecked")
257200
@Test(timeOut = 30000)
258201
public void testConcurrentReplicator() throws Exception {
@@ -1240,7 +1183,7 @@ public void testReplicatedCluster() throws Exception {
12401183

12411184
log.info("--- Starting ReplicatorTest::testReplicatedCluster ---");
12421185

1243-
final String namespace = "pulsar/global/repl";
1186+
final String namespace = BrokerTestUtil.newUniqueName("pulsar/global/repl");
12441187
final String topicName = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/topic1");
12451188
admin1.namespaces().createNamespace(namespace);
12461189
admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2", "r3"));
@@ -1647,7 +1590,7 @@ public void testReplicatorWithFailedAck() throws Exception {
16471590

16481591
log.info("--- Starting ReplicatorTest::testReplication ---");
16491592

1650-
String namespace = "pulsar/global/ns2";
1593+
String namespace = BrokerTestUtil.newUniqueName("pulsar/global/ns");
16511594
admin1.namespaces().createNamespace(namespace, Sets.newHashSet("r1"));
16521595
final TopicName dest = TopicName
16531596
.get(BrokerTestUtil.newUniqueName("persistent://" + namespace + "/ackFailedTopic"));
@@ -1685,13 +1628,15 @@ public void testReplicatorWithFailedAck() throws Exception {
16851628

16861629
MessageIdImpl lastMessageId = (MessageIdImpl) topic.getLastMessageId().get();
16871630
Position lastPosition = PositionImpl.get(lastMessageId.getLedgerId(), lastMessageId.getEntryId());
1688-
ConcurrentOpenHashMap<String, Replicator> replicators = topic.getReplicators();
1689-
PersistentReplicator replicator = (PersistentReplicator) replicators.get("r2");
16901631

16911632
Awaitility.await().pollInterval(1, TimeUnit.SECONDS).timeout(30, TimeUnit.SECONDS)
1692-
.untilAsserted(() -> assertEquals(org.apache.pulsar.broker.service.AbstractReplicator.State.Started,
1693-
replicator.getState()));
1694-
assertEquals(replicator.getState(), org.apache.pulsar.broker.service.AbstractReplicator.State.Started);
1633+
.ignoreExceptions()
1634+
.untilAsserted(() -> {
1635+
ConcurrentOpenHashMap<String, Replicator> replicators = topic.getReplicators();
1636+
PersistentReplicator replicator = (PersistentReplicator) replicators.get("r2");
1637+
assertEquals(org.apache.pulsar.broker.service.AbstractReplicator.State.Started,
1638+
replicator.getState());
1639+
});
16951640

16961641
// Make sure all the data has replicated to the remote cluster before close the cursor.
16971642
Awaitility.await().untilAsserted(() -> assertEquals(cursor.getMarkDeletedPosition(), lastPosition));
@@ -1717,7 +1662,7 @@ public void testReplicatorWithFailedAck() throws Exception {
17171662
@Test
17181663
public void testWhenUpdateReplicationCluster() throws Exception {
17191664
log.info("--- testWhenUpdateReplicationCluster ---");
1720-
String namespace = "pulsar/ns2";
1665+
String namespace = BrokerTestUtil.newUniqueName("pulsar/ns");;
17211666
admin1.namespaces().createNamespace(namespace);
17221667
admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));
17231668
final TopicName dest = TopicName.get(
@@ -1746,12 +1691,12 @@ public void testWhenUpdateReplicationCluster() throws Exception {
17461691
@Test
17471692
public void testReplicatorProducerNotExceed() throws Exception {
17481693
log.info("--- testReplicatorProducerNotExceed ---");
1749-
String namespace1 = "pulsar/ns11";
1694+
String namespace1 = BrokerTestUtil.newUniqueName("pulsar/ns1");
17501695
admin1.namespaces().createNamespace(namespace1);
17511696
admin1.namespaces().setNamespaceReplicationClusters(namespace1, Sets.newHashSet("r1", "r2"));
17521697
final TopicName dest1 = TopicName.get(
17531698
BrokerTestUtil.newUniqueName("persistent://" + namespace1 + "/testReplicatorProducerNotExceed1"));
1754-
String namespace2 = "pulsar/ns22";
1699+
String namespace2 = BrokerTestUtil.newUniqueName("pulsar/ns2");
17551700
admin2.namespaces().createNamespace(namespace2);
17561701
admin2.namespaces().setNamespaceReplicationClusters(namespace2, Sets.newHashSet("r1", "r2"));
17571702
final TopicName dest2 = TopicName.get(

0 commit comments

Comments
 (0)