Skip to content

Commit 29108c1

Browse files
poorbarcodenodece
authored andcommitted
[fix] [broker] Fix metrics pulsar_topic_load_failed_count is 0 when load non-persistent topic fails and fix the flaky test testBrokerStatsTopicLoadFailed (apache#22580)
(cherry picked from commit 340d60d)
1 parent dd95246 commit 29108c1

File tree

2 files changed

+121
-2
lines changed

2 files changed

+121
-2
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1233,8 +1233,8 @@ private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String topic
12331233
if (log.isDebugEnabled()) {
12341234
log.debug("Broker is unable to load non-persistent topic {}", topic);
12351235
}
1236-
return FutureUtil.failedFuture(
1237-
new NotAllowedException("Broker is not unable to load non-persistent topic"));
1236+
topicFuture.completeExceptionally(new NotAllowedException("Broker is not unable to load non-persistent topic"));
1237+
return topicFuture;
12381238
}
12391239
final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
12401240
NonPersistentTopic nonPersistentTopic = new NonPersistentTopic(topic, this);

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import java.io.IOException;
4242
import java.io.InputStream;
4343
import java.io.InputStreamReader;
44+
import java.io.StringReader;
4445
import java.lang.reflect.Array;
4546
import java.lang.reflect.Field;
4647
import java.nio.charset.StandardCharsets;
@@ -67,6 +68,7 @@
6768
import org.apache.bookkeeper.mledger.ManagedLedgerException;
6869
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
6970
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
71+
import org.apache.commons.lang3.StringUtils;
7072
import org.apache.http.HttpResponse;
7173
import org.apache.http.client.HttpClient;
7274
import org.apache.http.client.methods.HttpGet;
@@ -102,6 +104,10 @@
102104
import org.apache.pulsar.common.protocol.Commands;
103105
import org.apache.pulsar.common.util.netty.EventLoopUtil;
104106
import org.awaitility.Awaitility;
107+
import org.apache.zookeeper.KeeperException;
108+
import org.apache.zookeeper.MockZooKeeper;
109+
import org.glassfish.jersey.client.JerseyClient;
110+
import org.glassfish.jersey.client.JerseyClientBuilder;
105111
import org.testng.Assert;
106112
import org.testng.annotations.AfterClass;
107113
import org.testng.annotations.BeforeClass;
@@ -1461,4 +1467,117 @@ public void testDuplicateAcknowledgement() throws Exception {
14611467
assertEquals(admin.topics().getStats(topicName).getSubscriptions()
14621468
.get("sub-1").getUnackedMessages(), 0);
14631469
}
1470+
1471+
@Test
1472+
public void testMetricsPersistentTopicLoadFails() throws Exception {
1473+
final String namespace = "prop/" + UUID.randomUUID().toString().replaceAll("-", "");
1474+
String topic = "persistent://" + namespace + "/topic1_" + UUID.randomUUID();
1475+
admin.namespaces().createNamespace(namespace);
1476+
admin.topics().createNonPartitionedTopic(topic);
1477+
admin.topics().unload(topic);
1478+
1479+
// Inject an error that makes the topic load fails.
1480+
AtomicBoolean failMarker = new AtomicBoolean(true);
1481+
mockZooKeeper.failConditional(KeeperException.Code.NODEEXISTS, (op, path) -> {
1482+
if (failMarker.get() && op.equals(MockZooKeeper.Op.SET) &&
1483+
path.endsWith(TopicName.get(topic).getPersistenceNamingEncoding())) {
1484+
return true;
1485+
}
1486+
return false;
1487+
});
1488+
1489+
// Do test
1490+
Thread.sleep(1000 * 3);
1491+
CompletableFuture<Producer<byte[]>> producer = pulsarClient.newProducer().topic(topic).createAsync();
1492+
JerseyClient httpClient = JerseyClientBuilder.createClient();
1493+
Awaitility.await().until(() -> {
1494+
String response = httpClient.target(pulsar.getWebServiceAddress()).path("/metrics/")
1495+
.request().get(String.class);
1496+
BufferedReader reader = new BufferedReader(new StringReader(response));
1497+
String line;
1498+
String metricsLine = null;
1499+
while ((line = reader.readLine()) != null) {
1500+
if (StringUtils.isBlank(line)) {
1501+
continue;
1502+
}
1503+
if (line.startsWith("#")) {
1504+
continue;
1505+
}
1506+
if (line.contains("topic_load_failed")) {
1507+
metricsLine = line;
1508+
break;
1509+
}
1510+
}
1511+
log.info("topic_load_failed: {}", metricsLine);
1512+
if (metricsLine == null) {
1513+
return false;
1514+
}
1515+
reader.close();
1516+
String[] parts = metricsLine.split(" ");
1517+
Double value = Double.valueOf(parts[parts.length - 1]);
1518+
return value >= 1D;
1519+
});
1520+
1521+
// Remove the injection.
1522+
failMarker.set(false);
1523+
// cleanup.
1524+
httpClient.close();
1525+
producer.join().close();
1526+
admin.topics().delete(topic);
1527+
admin.namespaces().deleteNamespace(namespace);
1528+
}
1529+
1530+
@Test
1531+
public void testMetricsNonPersistentTopicLoadFails() throws Exception {
1532+
final String namespace = "prop/" + UUID.randomUUID().toString().replaceAll("-", "");
1533+
String topic = "non-persistent://" + namespace + "/topic1_" + UUID.randomUUID();
1534+
admin.namespaces().createNamespace(namespace);
1535+
1536+
// Inject an error that makes the topic load fails.
1537+
pulsar.getConfiguration().setEnableNonPersistentTopics(false);
1538+
1539+
// Do test.
1540+
CompletableFuture<Producer<byte[]>> producer = pulsarClient.newProducer().topic(topic).createAsync();
1541+
JerseyClient httpClient = JerseyClientBuilder.createClient();
1542+
Awaitility.await().until(() -> {
1543+
String response = httpClient.target(pulsar.getWebServiceAddress()).path("/metrics/")
1544+
.request().get(String.class);
1545+
BufferedReader reader = new BufferedReader(new StringReader(response));
1546+
String line;
1547+
String metricsLine = null;
1548+
while ((line = reader.readLine()) != null) {
1549+
if (StringUtils.isBlank(line)) {
1550+
continue;
1551+
}
1552+
if (line.startsWith("#")) {
1553+
continue;
1554+
}
1555+
if (line.contains("topic_load_failed")) {
1556+
metricsLine = line;
1557+
break;
1558+
}
1559+
}
1560+
log.info("topic_load_failed: {}", metricsLine);
1561+
if (metricsLine == null) {
1562+
return false;
1563+
}
1564+
reader.close();
1565+
String[] parts = metricsLine.split(" ");
1566+
Double value = Double.valueOf(parts[parts.length - 1]);
1567+
return value >= 1D;
1568+
});
1569+
1570+
// Remove the injection.
1571+
pulsar.getConfiguration().setEnableNonPersistentTopics(true);
1572+
1573+
// cleanup.
1574+
httpClient.close();
1575+
try {
1576+
producer.join().close();
1577+
} catch (Exception ex) {
1578+
// The producer creation failed, so skip to close it.
1579+
}
1580+
admin.topics().delete(topic);
1581+
admin.namespaces().deleteNamespace(namespace);
1582+
}
14641583
}

0 commit comments

Comments
 (0)