Skip to content

Commit be08b10

Browse files
committed
[fix] [test] Fix flaky test ReplicatorTest
1 parent d475655 commit be08b10

File tree

3 files changed

+241
-235
lines changed

3 files changed

+241
-235
lines changed
Lines changed: 209 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,209 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.service;
20+
21+
import static org.testng.Assert.fail;
22+
import com.google.common.collect.Sets;
23+
import java.util.ArrayList;
24+
import java.util.List;
25+
import java.util.concurrent.Callable;
26+
import java.util.concurrent.Future;
27+
import lombok.Cleanup;
28+
import lombok.extern.slf4j.Slf4j;
29+
import org.apache.pulsar.broker.BrokerTestUtil;
30+
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
31+
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
32+
import org.apache.pulsar.client.api.MessageRoutingMode;
33+
import org.apache.pulsar.client.api.PulsarClient;
34+
import org.apache.pulsar.client.impl.ConsumerImpl;
35+
import org.apache.pulsar.client.impl.ProducerImpl;
36+
import org.apache.pulsar.common.naming.TopicName;
37+
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
38+
import org.awaitility.Awaitility;
39+
import org.testng.Assert;
40+
import org.testng.annotations.AfterClass;
41+
import org.testng.annotations.BeforeClass;
42+
import org.testng.annotations.BeforeMethod;
43+
import org.testng.annotations.DataProvider;
44+
import org.testng.annotations.Factory;
45+
import org.testng.annotations.Test;
46+
47+
import java.lang.reflect.Method;
48+
import java.util.concurrent.TimeUnit;
49+
50+
@Slf4j
51+
@Test(groups = "broker-impl")
52+
public class ReplicatorGlobalNSDangerousOperationTest extends ReplicatorTestBase {
53+
54+
protected String methodName;
55+
@DataProvider(name = "loadManagerClassName")
56+
public static Object[][] loadManagerClassName() {
57+
return new Object[][]{
58+
{ModularLoadManagerImpl.class.getName()},
59+
{ExtensibleLoadManagerImpl.class.getName()}
60+
};
61+
}
62+
63+
@Factory(dataProvider = "loadManagerClassName")
64+
public ReplicatorGlobalNSDangerousOperationTest(String loadManagerClassName) {
65+
this.loadManagerClassName = loadManagerClassName;
66+
}
67+
68+
@BeforeMethod
69+
public void beforeMethod(Method m) {
70+
methodName = m.getName();
71+
}
72+
73+
@Override
74+
@BeforeClass(timeOut = 300000)
75+
public void setup() throws Exception {
76+
super.setup();
77+
}
78+
79+
@Override
80+
@AfterClass(alwaysRun = true, timeOut = 300000)
81+
public void cleanup() throws Exception {
82+
super.cleanup();
83+
}
84+
85+
/**
86+
* If local cluster is removed from the global namespace then all topics under that namespace should be deleted from
87+
* the cluster.
88+
*
89+
* @throws Exception
90+
*/
91+
@Test(priority = Integer.MAX_VALUE)
92+
public void testRemoveLocalClusterOnGlobalNamespace() throws Exception {
93+
log.info("--- Starting ReplicatorTest::testRemoveLocalClusterOnGlobalNamespace ---");
94+
95+
final String namespace = "pulsar/global/removeClusterTest";
96+
admin1.namespaces().createNamespace(namespace);
97+
admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2", "r3"));
98+
99+
final String topicName = "persistent://" + namespace + "/topic";
100+
101+
@Cleanup
102+
PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS)
103+
.build();
104+
@Cleanup
105+
PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString()).statsInterval(0, TimeUnit.SECONDS)
106+
.build();
107+
108+
ProducerImpl<byte[]> producer1 = (ProducerImpl<byte[]>) client1.newProducer().topic(topicName)
109+
.enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
110+
ConsumerImpl<byte[]> consumer1 = (ConsumerImpl<byte[]>) client1.newConsumer().topic(topicName)
111+
.subscriptionName("sub1").subscribe();
112+
ConsumerImpl<byte[]> consumer2 = (ConsumerImpl<byte[]>) client2.newConsumer().topic(topicName)
113+
.subscriptionName("sub1").subscribe();
114+
115+
admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r2", "r3"));
116+
117+
Awaitility.await().atMost(1, TimeUnit.MINUTES).untilAsserted(() -> {
118+
Assert.assertFalse(pulsar1.getBrokerService().getTopics().containsKey(topicName));
119+
Assert.assertFalse(producer1.isConnected());
120+
Assert.assertFalse(consumer1.isConnected());
121+
Assert.assertTrue(consumer2.isConnected());
122+
});
123+
}
124+
125+
/**
126+
* This is not a formal operation and can cause serious problems if call it in a production environment.
127+
*/
128+
@Test(priority = Integer.MAX_VALUE - 1)
129+
public void testConfigChange() throws Exception {
130+
log.info("--- Starting ReplicatorTest::testConfigChange ---");
131+
// This test is to verify that the config change on global namespace is successfully applied in broker during
132+
// runtime.
133+
// Run a set of producer tasks to create the topics
134+
List<Future<Void>> results = new ArrayList<>();
135+
for (int i = 0; i < 10; i++) {
136+
final TopicName dest = TopicName.get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/topic-" + i));
137+
138+
results.add(executor.submit(new Callable<Void>() {
139+
@Override
140+
public Void call() throws Exception {
141+
142+
@Cleanup
143+
MessageProducer producer = new MessageProducer(url1, dest);
144+
log.info("--- Starting producer --- " + url1);
145+
146+
@Cleanup
147+
MessageConsumer consumer = new MessageConsumer(url1, dest);
148+
log.info("--- Starting Consumer --- " + url1);
149+
150+
producer.produce(2);
151+
consumer.receive(2);
152+
return null;
153+
}
154+
}));
155+
}
156+
157+
for (Future<Void> result : results) {
158+
try {
159+
result.get();
160+
} catch (Exception e) {
161+
log.error("exception in getting future result ", e);
162+
fail(String.format("replication test failed with %s exception", e.getMessage()));
163+
}
164+
}
165+
166+
Thread.sleep(1000L);
167+
// Make sure that the internal replicators map contains remote cluster info
168+
ConcurrentOpenHashMap<String, PulsarClient> replicationClients1 = ns1.getReplicationClients();
169+
ConcurrentOpenHashMap<String, PulsarClient> replicationClients2 = ns2.getReplicationClients();
170+
ConcurrentOpenHashMap<String, PulsarClient> replicationClients3 = ns3.getReplicationClients();
171+
172+
Assert.assertNotNull(replicationClients1.get("r2"));
173+
Assert.assertNotNull(replicationClients1.get("r3"));
174+
Assert.assertNotNull(replicationClients2.get("r1"));
175+
Assert.assertNotNull(replicationClients2.get("r3"));
176+
Assert.assertNotNull(replicationClients3.get("r1"));
177+
Assert.assertNotNull(replicationClients3.get("r2"));
178+
179+
// Case 1: Update the global namespace replication configuration to only contains the local cluster itself
180+
admin1.namespaces().setNamespaceReplicationClusters("pulsar/ns", Sets.newHashSet("r1"));
181+
182+
// Wait for config changes to be updated.
183+
Thread.sleep(1000L);
184+
185+
// Make sure that the internal replicators map still contains remote cluster info
186+
Assert.assertNotNull(replicationClients1.get("r2"));
187+
Assert.assertNotNull(replicationClients1.get("r3"));
188+
Assert.assertNotNull(replicationClients2.get("r1"));
189+
Assert.assertNotNull(replicationClients2.get("r3"));
190+
Assert.assertNotNull(replicationClients3.get("r1"));
191+
Assert.assertNotNull(replicationClients3.get("r2"));
192+
193+
// Case 2: Update the configuration back
194+
admin1.namespaces().setNamespaceReplicationClusters("pulsar/ns", Sets.newHashSet("r1", "r2", "r3"));
195+
196+
// Wait for config changes to be updated.
197+
Thread.sleep(1000L);
198+
199+
// Make sure that the internal replicators map still contains remote cluster info
200+
Assert.assertNotNull(replicationClients1.get("r2"));
201+
Assert.assertNotNull(replicationClients1.get("r3"));
202+
Assert.assertNotNull(replicationClients2.get("r1"));
203+
Assert.assertNotNull(replicationClients2.get("r3"));
204+
Assert.assertNotNull(replicationClients3.get("r1"));
205+
Assert.assertNotNull(replicationClients3.get("r2"));
206+
207+
// Case 3: TODO: Once automatic cleanup is implemented, add tests case to verify auto removal of clusters
208+
}
209+
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java

Lines changed: 0 additions & 146 deletions
This file was deleted.

0 commit comments

Comments
 (0)