Skip to content

Commit 340d60d

Browse files
authored
[fix] [broker] Fix metrics pulsar_topic_load_failed_count is 0 when load non-persistent topic fails and fix the flaky test testBrokerStatsTopicLoadFailed (apache#22580)
1 parent 6fdc0e3 commit 340d60d

File tree

2 files changed

+91
-71
lines changed

2 files changed

+91
-71
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1265,7 +1265,8 @@ private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String topic
12651265
nonPersistentTopic = newTopic(topic, null, this, NonPersistentTopic.class);
12661266
} catch (Throwable e) {
12671267
log.warn("Failed to create topic {}", topic, e);
1268-
return FutureUtil.failedFuture(e);
1268+
topicFuture.completeExceptionally(e);
1269+
return topicFuture;
12691270
}
12701271
CompletableFuture<Void> isOwner = checkTopicNsOwnership(topic);
12711272
isOwner.thenRun(() -> {

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java

Lines changed: 89 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -20,20 +20,23 @@
2020

2121
import static org.apache.pulsar.common.naming.SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN;
2222
import static org.apache.pulsar.common.naming.SystemTopicNames.TRANSACTION_COORDINATOR_LOG;
23+
import static org.mockito.ArgumentMatchers.anyString;
2324
import static org.mockito.Mockito.any;
2425
import static org.mockito.Mockito.doReturn;
26+
import static org.mockito.Mockito.mock;
2527
import static org.mockito.Mockito.spy;
28+
import static org.mockito.Mockito.when;
2629
import static org.testng.Assert.assertEquals;
2730
import static org.testng.Assert.assertFalse;
2831
import static org.testng.Assert.assertNotNull;
2932
import static org.testng.Assert.assertNull;
3033
import static org.testng.Assert.assertTrue;
3134
import static org.testng.Assert.fail;
35+
import com.google.common.collect.Multimap;
3236
import com.google.common.collect.Sets;
3337
import com.google.gson.Gson;
3438
import com.google.gson.JsonArray;
3539
import com.google.gson.JsonObject;
36-
import com.google.gson.JsonPrimitive;
3740
import io.netty.buffer.ByteBuf;
3841
import io.netty.channel.EventLoopGroup;
3942
import io.netty.util.concurrent.DefaultThreadFactory;
@@ -79,6 +82,7 @@
7982
import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException;
8083
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
8184
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
85+
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient;
8286
import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
8387
import org.apache.pulsar.client.admin.BrokerStats;
8488
import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -113,7 +117,11 @@
113117
import org.apache.pulsar.common.protocol.Commands;
114118
import org.apache.pulsar.common.util.netty.EventLoopUtil;
115119
import org.apache.pulsar.compaction.Compactor;
120+
import org.apache.zookeeper.KeeperException;
121+
import org.apache.zookeeper.MockZooKeeper;
116122
import org.awaitility.Awaitility;
123+
import org.glassfish.jersey.client.JerseyClient;
124+
import org.glassfish.jersey.client.JerseyClientBuilder;
117125
import org.mockito.Mockito;
118126
import org.testng.Assert;
119127
import org.testng.annotations.AfterClass;
@@ -1589,82 +1597,93 @@ public void testDynamicConfigurationsForceDeleteTenantAllowed() throws Exception
15891597
});
15901598
}
15911599

1592-
// this test is disabled since it is flaky
1593-
@Test(enabled = false)
1594-
public void testBrokerStatsTopicLoadFailed() throws Exception {
1595-
admin.namespaces().createNamespace("prop/ns-test");
1596-
1597-
String persistentTopic = "persistent://prop/ns-test/topic1_" + UUID.randomUUID();
1598-
String nonPersistentTopic = "non-persistent://prop/ns-test/topic2_" + UUID.randomUUID();
1599-
1600-
BrokerService brokerService = pulsar.getBrokerService();
1601-
brokerService = Mockito.spy(brokerService);
1602-
// mock create persistent topic failed
1603-
Mockito
1604-
.doAnswer(invocation -> {
1605-
CompletableFuture<ManagedLedgerConfig> f = new CompletableFuture<>();
1606-
f.completeExceptionally(new RuntimeException("This is an exception"));
1607-
return f;
1608-
})
1609-
.when(brokerService).getManagedLedgerConfig(Mockito.eq(TopicName.get(persistentTopic)));
1610-
1611-
// mock create non-persistent topic failed
1612-
Mockito
1613-
.doAnswer(inv -> {
1614-
CompletableFuture<Void> f = new CompletableFuture<>();
1615-
f.completeExceptionally(new RuntimeException("This is an exception"));
1616-
return f;
1617-
})
1618-
.when(brokerService).checkTopicNsOwnership(Mockito.eq(nonPersistentTopic));
1619-
1620-
1621-
PulsarService pulsarService = pulsar;
1622-
Field field = PulsarService.class.getDeclaredField("brokerService");
1623-
field.setAccessible(true);
1624-
field.set(pulsarService, brokerService);
1625-
1626-
CompletableFuture<Producer<String>> producer = pulsarClient.newProducer(Schema.STRING)
1627-
.topic(persistentTopic)
1628-
.createAsync();
1629-
CompletableFuture<Producer<String>> producer1 = pulsarClient.newProducer(Schema.STRING)
1630-
.topic(nonPersistentTopic)
1631-
.createAsync();
1632-
1633-
producer.whenComplete((v, t) -> {
1634-
if (t == null) {
1635-
try {
1636-
v.close();
1637-
} catch (PulsarClientException e) {
1638-
// ignore
1639-
}
1600+
@Test
1601+
public void testMetricsPersistentTopicLoadFails() throws Exception {
1602+
final String namespace = "prop/" + UUID.randomUUID().toString().replaceAll("-", "");
1603+
String topic = "persistent://" + namespace + "/topic1_" + UUID.randomUUID();
1604+
admin.namespaces().createNamespace(namespace);
1605+
admin.topics().createNonPartitionedTopic(topic);
1606+
admin.topics().unload(topic);
1607+
1608+
// Inject an error that makes the topic load fails.
1609+
AtomicBoolean failMarker = new AtomicBoolean(true);
1610+
mockZooKeeper.failConditional(KeeperException.Code.NODEEXISTS, (op, path) -> {
1611+
if (failMarker.get() && op.equals(MockZooKeeper.Op.SET) &&
1612+
path.endsWith(TopicName.get(topic).getPersistenceNamingEncoding())) {
1613+
return true;
16401614
}
1615+
return false;
16411616
});
1642-
producer1.whenComplete((v, t) -> {
1643-
if (t == null) {
1644-
try {
1645-
v.close();
1646-
} catch (PulsarClientException e) {
1647-
// ignore
1648-
}
1617+
1618+
// Do test
1619+
CompletableFuture<Producer<byte[]>> producer = pulsarClient.newProducer().topic(topic).createAsync();
1620+
JerseyClient httpClient = JerseyClientBuilder.createClient();
1621+
Awaitility.await().until(() -> {
1622+
String response = httpClient.target(pulsar.getWebServiceAddress()).path("/metrics/")
1623+
.request().get(String.class);
1624+
Multimap<String, PrometheusMetricsClient.Metric> metricMap = PrometheusMetricsClient.parseMetrics(response);
1625+
if (!metricMap.containsKey("pulsar_topic_load_failed_count")) {
1626+
return false;
1627+
}
1628+
double topic_load_failed_count = 0;
1629+
for (PrometheusMetricsClient.Metric metric : metricMap.get("pulsar_topic_load_failed_count")) {
1630+
topic_load_failed_count += metric.value;
16491631
}
1632+
return topic_load_failed_count >= 1D;
16501633
});
16511634

1652-
Awaitility.waitAtMost(2, TimeUnit.MINUTES).until(() -> {
1653-
String json = admin.brokerStats().getMetrics();
1654-
JsonArray metrics = new Gson().fromJson(json, JsonArray.class);
1655-
AtomicBoolean flag = new AtomicBoolean(false);
1656-
1657-
metrics.forEach(ele -> {
1658-
JsonObject obj = ((JsonObject) ele);
1659-
JsonObject metrics0 = (JsonObject) obj.get("metrics");
1660-
JsonPrimitive v = (JsonPrimitive) metrics0.get("brk_topic_load_failed_count");
1661-
if (null != v && v.getAsDouble() >= 2D) {
1662-
flag.set(true);
1663-
}
1664-
});
1635+
// Remove the injection.
1636+
failMarker.set(false);
1637+
// cleanup.
1638+
httpClient.close();
1639+
producer.join().close();
1640+
admin.topics().delete(topic);
1641+
admin.namespaces().deleteNamespace(namespace);
1642+
}
16651643

1666-
return flag.get();
1644+
@Test
1645+
public void testMetricsNonPersistentTopicLoadFails() throws Exception {
1646+
final String namespace = "prop/" + UUID.randomUUID().toString().replaceAll("-", "");
1647+
String topic = "non-persistent://" + namespace + "/topic1_" + UUID.randomUUID();
1648+
admin.namespaces().createNamespace(namespace);
1649+
1650+
// Inject an error that makes the topic load fails.
1651+
// Since we did not set a topic factory name, the "topicFactory" variable is null, inject a mocked
1652+
// "topicFactory".
1653+
Field fieldTopicFactory = BrokerService.class.getDeclaredField("topicFactory");
1654+
fieldTopicFactory.setAccessible(true);
1655+
TopicFactory originalTopicFactory = (TopicFactory) fieldTopicFactory.get(pulsar.getBrokerService());
1656+
assertNull(originalTopicFactory);
1657+
TopicFactory mockedTopicFactory = mock(TopicFactory.class);
1658+
when(mockedTopicFactory.create(anyString(), any(), any(), any()))
1659+
.thenThrow(new RuntimeException("mocked error"));
1660+
fieldTopicFactory.set(pulsar.getBrokerService(), mockedTopicFactory);
1661+
1662+
// Do test.
1663+
CompletableFuture<Producer<byte[]>> producer = pulsarClient.newProducer().topic(topic).createAsync();
1664+
JerseyClient httpClient = JerseyClientBuilder.createClient();
1665+
Awaitility.await().until(() -> {
1666+
String response = httpClient.target(pulsar.getWebServiceAddress()).path("/metrics/")
1667+
.request().get(String.class);
1668+
Multimap<String, PrometheusMetricsClient.Metric> metricMap = PrometheusMetricsClient.parseMetrics(response);
1669+
if (!metricMap.containsKey("pulsar_topic_load_failed_count")) {
1670+
return false;
1671+
}
1672+
double topic_load_failed_count = 0;
1673+
for (PrometheusMetricsClient.Metric metric : metricMap.get("pulsar_topic_load_failed_count")) {
1674+
topic_load_failed_count += metric.value;
1675+
}
1676+
return topic_load_failed_count >= 1D;
16671677
});
1678+
1679+
// Remove the injection.
1680+
fieldTopicFactory.set(pulsar.getBrokerService(), null);
1681+
1682+
// cleanup.
1683+
httpClient.close();
1684+
producer.join().close();
1685+
admin.topics().delete(topic);
1686+
admin.namespaces().deleteNamespace(namespace);
16681687
}
16691688

16701689
@Test

0 commit comments

Comments
 (0)