Skip to content

Commit 43b3622

Browse files
tjiumingdao-jun
andauthored
[improve][broker] Add topic_load_failed metric (apache#19236)
Co-authored-by: daojun <[email protected]>
1 parent 510744c commit 43b3622

File tree

5 files changed

+106
-1
lines changed

5 files changed

+106
-1
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1226,6 +1226,10 @@ public void deleteTopicAuthenticationWithRetry(String topic, CompletableFuture<V
12261226

12271227
private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String topic) {
12281228
CompletableFuture<Optional<Topic>> topicFuture = new CompletableFuture<>();
1229+
topicFuture.exceptionally(t -> {
1230+
pulsarStats.recordTopicLoadFailed();
1231+
return null;
1232+
});
12291233
if (!pulsar.getConfiguration().isEnableNonPersistentTopics()) {
12301234
if (log.isDebugEnabled()) {
12311235
log.debug("Broker is unable to load non-persistent topic {}", topic);
@@ -1618,6 +1622,11 @@ private void createPersistentTopic(final String topic, boolean createIfMissing,
16181622
TopicName topicName = TopicName.get(topic);
16191623
final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
16201624

1625+
topicFuture.exceptionally(t -> {
1626+
pulsarStats.recordTopicLoadFailed();
1627+
return null;
1628+
});
1629+
16211630
if (isTransactionInternalName(topicName)) {
16221631
String msg = String.format("Can not create transaction system topic %s", topic);
16231632
log.warn(msg);

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,10 @@ public void recordTopicLoadTimeValue(String topic, long topicLoadLatencyMs) {
265265
}
266266
}
267267

268+
public void recordTopicLoadFailed() {
269+
brokerOperabilityMetrics.recordTopicLoadFailed();
270+
}
271+
268272
public void recordConnectionCreate() {
269273
brokerOperabilityMetrics.recordConnectionCreate();
270274
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.pulsar.broker.stats;
2020

21+
import io.prometheus.client.Counter;
2122
import java.util.ArrayList;
2223
import java.util.HashMap;
2324
import java.util.List;
@@ -29,6 +30,7 @@
2930
/**
3031
*/
3132
public class BrokerOperabilityMetrics {
33+
private static final Counter TOPIC_LOAD_FAILED = Counter.build("topic_load_failed", "-").register();
3234
private final List<Metrics> metricsList;
3335
private final String localCluster;
3436
private final DimensionStats topicLoadStats;
@@ -84,7 +86,9 @@ Map<String, String> getDimensionMap(String metricsName) {
8486
}
8587

8688
Metrics getTopicLoadMetrics() {
87-
return getDimensionMetrics("topic_load_times", "topic_load", topicLoadStats);
89+
Metrics metrics = getDimensionMetrics("topic_load_times", "topic_load", topicLoadStats);
90+
metrics.put("brk_topic_load_failed_count", TOPIC_LOAD_FAILED.get());
91+
return metrics;
8892
}
8993

9094
Metrics getDimensionMetrics(String metricsName, String dimensionName, DimensionStats stats) {
@@ -112,6 +116,10 @@ public void recordTopicLoadTimeValue(long topicLoadLatencyMs) {
112116
topicLoadStats.recordDimensionTimeValue(topicLoadLatencyMs, TimeUnit.MILLISECONDS);
113117
}
114118

119+
public void recordTopicLoadFailed() {
120+
this.TOPIC_LOAD_FAILED.inc();
121+
}
122+
115123
public void recordConnectionCreate() {
116124
this.connectionTotalCreatedCount.increment();
117125
this.connectionActive.increment();

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import com.google.gson.Gson;
3434
import com.google.gson.JsonArray;
3535
import com.google.gson.JsonObject;
36+
import com.google.gson.JsonPrimitive;
3637
import io.netty.buffer.ByteBuf;
3738
import io.netty.channel.EventLoopGroup;
3839
import io.netty.util.concurrent.DefaultThreadFactory;
@@ -107,6 +108,7 @@
107108
import org.apache.pulsar.common.protocol.Commands;
108109
import org.apache.pulsar.common.util.netty.EventLoopUtil;
109110
import org.awaitility.Awaitility;
111+
import org.mockito.Mockito;
110112
import org.testng.Assert;
111113
import org.testng.annotations.AfterClass;
112114
import org.testng.annotations.BeforeClass;
@@ -1513,4 +1515,82 @@ public void testDynamicConfigurationsForceDeleteTenantAllowed() throws Exception
15131515
assertTrue(conf.isForceDeleteTenantAllowed());
15141516
});
15151517
}
1518+
1519+
1520+
@Test
1521+
public void testBrokerStatsTopicLoadFailed() throws Exception {
1522+
admin.namespaces().createNamespace("prop/ns-test");
1523+
1524+
String persistentTopic = "persistent://prop/ns-test/topic1_" + UUID.randomUUID();
1525+
String nonPersistentTopic = "non-persistent://prop/ns-test/topic2_" + UUID.randomUUID();
1526+
1527+
BrokerService brokerService = pulsar.getBrokerService();
1528+
brokerService = Mockito.spy(brokerService);
1529+
// mock create persistent topic failed
1530+
Mockito
1531+
.doAnswer(invocation -> {
1532+
CompletableFuture<ManagedLedgerConfig> f = new CompletableFuture<>();
1533+
f.completeExceptionally(new RuntimeException("This is an exception"));
1534+
return f;
1535+
})
1536+
.when(brokerService).getManagedLedgerConfig(Mockito.eq(TopicName.get(persistentTopic)));
1537+
1538+
// mock create non-persistent topic failed
1539+
Mockito
1540+
.doAnswer(inv -> {
1541+
CompletableFuture<Void> f = new CompletableFuture<>();
1542+
f.completeExceptionally(new RuntimeException("This is an exception"));
1543+
return f;
1544+
})
1545+
.when(brokerService).checkTopicNsOwnership(Mockito.eq(nonPersistentTopic));
1546+
1547+
1548+
PulsarService pulsarService = pulsar;
1549+
Field field = PulsarService.class.getDeclaredField("brokerService");
1550+
field.setAccessible(true);
1551+
field.set(pulsarService, brokerService);
1552+
1553+
CompletableFuture<Producer<String>> producer = pulsarClient.newProducer(Schema.STRING)
1554+
.topic(persistentTopic)
1555+
.createAsync();
1556+
CompletableFuture<Producer<String>> producer1 = pulsarClient.newProducer(Schema.STRING)
1557+
.topic(nonPersistentTopic)
1558+
.createAsync();
1559+
1560+
producer.whenComplete((v, t) -> {
1561+
if (t == null) {
1562+
try {
1563+
v.close();
1564+
} catch (PulsarClientException e) {
1565+
// ignore
1566+
}
1567+
}
1568+
});
1569+
producer1.whenComplete((v, t) -> {
1570+
if (t == null) {
1571+
try {
1572+
v.close();
1573+
} catch (PulsarClientException e) {
1574+
// ignore
1575+
}
1576+
}
1577+
});
1578+
1579+
Awaitility.waitAtMost(2, TimeUnit.MINUTES).until(() -> {
1580+
String json = admin.brokerStats().getMetrics();
1581+
JsonArray metrics = new Gson().fromJson(json, JsonArray.class);
1582+
AtomicBoolean flag = new AtomicBoolean(false);
1583+
1584+
metrics.forEach(ele -> {
1585+
JsonObject obj = ((JsonObject) ele);
1586+
JsonObject metrics0 = (JsonObject) obj.get("metrics");
1587+
JsonPrimitive v = (JsonPrimitive) metrics0.get("brk_topic_load_failed_count");
1588+
if (null != v && v.getAsDouble() >= 2D) {
1589+
flag.set(true);
1590+
}
1591+
});
1592+
1593+
return flag.get();
1594+
});
1595+
}
15161596
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -333,6 +333,10 @@ public void testPerTopicStats() throws Exception {
333333
assertEquals(cm.size(), 1);
334334
assertEquals(cm.get(0).tags.get("cluster"), "test");
335335

336+
cm = (List<Metric>) metrics.get("topic_load_failed_total");
337+
assertEquals(cm.size(), 1);
338+
assertEquals(cm.get(0).tags.get("cluster"), "test");
339+
336340
cm = (List<Metric>) metrics.get("pulsar_in_bytes_total");
337341
assertEquals(cm.size(), 2);
338342
assertEquals(cm.get(0).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic2");

0 commit comments

Comments
 (0)