Skip to content

Commit dd95246

Browse files
tjiumingdao-jun
authored andcommitted
[improve][broker] Add topic_load_failed metric (apache#19236)
Co-authored-by: daojun <[email protected]> (cherry picked from commit 43b3622)
1 parent f421c45 commit dd95246

File tree

4 files changed

+26
-1
lines changed

4 files changed

+26
-1
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1225,6 +1225,10 @@ public void deleteTopicAuthenticationWithRetry(String topic, CompletableFuture<V
12251225

12261226
private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String topic) {
12271227
CompletableFuture<Optional<Topic>> topicFuture = new CompletableFuture<>();
1228+
topicFuture.exceptionally(t -> {
1229+
pulsarStats.recordTopicLoadFailed();
1230+
return null;
1231+
});
12281232
if (!pulsar.getConfiguration().isEnableNonPersistentTopics()) {
12291233
if (log.isDebugEnabled()) {
12301234
log.debug("Broker is unable to load non-persistent topic {}", topic);
@@ -1539,6 +1543,11 @@ private void createPersistentTopic(final String topic, boolean createIfMissing,
15391543
TopicName topicName = TopicName.get(topic);
15401544
final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
15411545

1546+
topicFuture.exceptionally(t -> {
1547+
pulsarStats.recordTopicLoadFailed();
1548+
return null;
1549+
});
1550+
15421551
if (isTransactionSystemTopic(topicName)) {
15431552
String msg = String.format("Can not create transaction system topic %s", topic);
15441553
log.warn(msg);

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,10 @@ public void recordTopicLoadTimeValue(String topic, long topicLoadLatencyMs) {
240240
}
241241
}
242242

243+
public void recordTopicLoadFailed() {
244+
brokerOperabilityMetrics.recordTopicLoadFailed();
245+
}
246+
243247
public void recordConnectionCreate() {
244248
brokerOperabilityMetrics.recordConnectionCreate();
245249
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.pulsar.broker.stats;
2020

2121
import com.google.common.collect.Maps;
22+
import io.prometheus.client.Counter;
2223
import java.util.ArrayList;
2324
import java.util.List;
2425
import java.util.Map;
@@ -29,6 +30,7 @@
2930
/**
3031
*/
3132
public class BrokerOperabilityMetrics {
33+
private static final Counter TOPIC_LOAD_FAILED = Counter.build("topic_load_failed", "-").register();
3234
private final List<Metrics> metricsList;
3335
private final String localCluster;
3436
private final DimensionStats topicLoadStats;
@@ -90,7 +92,9 @@ Map<String, String> getDimensionMap(String metricsName) {
9092
}
9193

9294
Metrics getTopicLoadMetrics() {
93-
return getDimensionMetrics("topic_load_times", "topic_load", topicLoadStats);
95+
Metrics metrics = getDimensionMetrics("topic_load_times", "topic_load", topicLoadStats);
96+
metrics.put("brk_topic_load_failed_count", TOPIC_LOAD_FAILED.get());
97+
return metrics;
9498
}
9599

96100
Metrics getZkWriteLatencyMetrics() {
@@ -136,6 +140,10 @@ public void recordZkReadLatencyTimeValue(long topicLoadLatencyMs) {
136140
zkReadLatencyStats.recordDimensionTimeValue(topicLoadLatencyMs, TimeUnit.MILLISECONDS);
137141
}
138142

143+
public void recordTopicLoadFailed() {
144+
this.TOPIC_LOAD_FAILED.inc();
145+
}
146+
139147
public void recordConnectionCreate() {
140148
this.connectionTotalCreatedCount.increment();
141149
this.connectionActive.increment();

pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -316,6 +316,10 @@ public void testPerTopicStats() throws Exception {
316316
assertEquals(cm.size(), 1);
317317
assertEquals(cm.get(0).tags.get("cluster"), "test");
318318

319+
cm = (List<Metric>) metrics.get("topic_load_failed_total");
320+
assertEquals(cm.size(), 1);
321+
assertEquals(cm.get(0).tags.get("cluster"), "test");
322+
319323
cm = (List<Metric>) metrics.get("pulsar_in_bytes_total");
320324
assertEquals(cm.size(), 2);
321325
assertEquals(cm.get(0).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic2");

0 commit comments

Comments
 (0)