Skip to content

Commit 6622ff7

Browse files
authored
[fix][broker] Fix Metadata Event Synchronizer producer creation retry so that the producer gets created eventually (apache#24081)
1 parent 9f38a5c commit 6622ff7

File tree

2 files changed

+29
-4
lines changed

2 files changed

+29
-4
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarMetadataEventSynchronizer.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.pulsar.broker.service;
2020

2121
import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS;
22+
import com.google.common.annotations.VisibleForTesting;
2223
import java.util.Arrays;
2324
import java.util.concurrent.CompletableFuture;
2425
import java.util.concurrent.CopyOnWriteArrayList;
@@ -52,13 +53,13 @@ public class PulsarMetadataEventSynchronizer implements MetadataEventSynchronize
5253
protected BrokerService brokerService;
5354
@Getter
5455
protected String topicName;
55-
protected PulsarClientImpl client;
56+
protected volatile PulsarClientImpl client;
5657
protected volatile Producer<MetadataEvent> producer;
5758
protected volatile Consumer<MetadataEvent> consumer;
5859
private final CopyOnWriteArrayList<Function<MetadataEvent, CompletableFuture<Void>>>
5960
listeners = new CopyOnWriteArrayList<>();
6061

61-
static final AtomicReferenceFieldUpdater<PulsarMetadataEventSynchronizer, State> STATE_UPDATER =
62+
protected static final AtomicReferenceFieldUpdater<PulsarMetadataEventSynchronizer, State> STATE_UPDATER =
6263
AtomicReferenceFieldUpdater.newUpdater(PulsarMetadataEventSynchronizer.class, State.class, "state");
6364
@Getter
6465
private volatile State state;
@@ -132,7 +133,7 @@ private void publishAsync(MetadataEvent event, CompletableFuture<Void> future) {
132133
});
133134
}
134135

135-
private void startProducer() {
136+
protected void startProducer() {
136137
if (isClosingOrClosed()) {
137138
log.info("[{}] Skip to start new producer because the synchronizer is closed", topicName);
138139
}
@@ -167,11 +168,16 @@ private void startProducer() {
167168
log.warn("[{}] Failed to create producer ({}), retrying in {} s", topicName, ex.getMessage(),
168169
waitTimeMs / 1000.0);
169170
// BackOff before retrying
170-
brokerService.executor().schedule(this::startProducer, waitTimeMs, TimeUnit.MILLISECONDS);
171+
pulsar.getExecutor().schedule(this::startProducer, waitTimeMs, TimeUnit.MILLISECONDS);
171172
return null;
172173
});
173174
}
174175

176+
@VisibleForTesting
177+
public Producer<MetadataEvent> getProducer() {
178+
return producer;
179+
}
180+
175181
private void startConsumer() {
176182
if (isClosingOrClosed()) {
177183
log.info("[{}] Skip to start new consumer because the synchronizer is closed", topicName);

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@
8585
import org.apache.pulsar.broker.PulsarService;
8686
import org.apache.pulsar.broker.namespace.NamespaceService;
8787
import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException;
88+
import org.apache.pulsar.broker.service.PulsarMetadataEventSynchronizer.State;
8889
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
8990
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
9091
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient;
@@ -107,6 +108,7 @@
107108
import org.apache.pulsar.client.impl.ClientBuilderImpl;
108109
import org.apache.pulsar.client.impl.ClientCnx;
109110
import org.apache.pulsar.client.impl.ConnectionPool;
111+
import org.apache.pulsar.client.impl.PulsarClientImpl;
110112
import org.apache.pulsar.client.impl.PulsarServiceNameResolver;
111113
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
112114
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
@@ -2005,5 +2007,22 @@ public void testTlsWithAuthParams() throws Exception {
20052007
}
20062008
}
20072009

2010+
@Test
2011+
public void testPulsarMetadataEventSyncProducerCreation() throws Exception {
2012+
final String topicName = "persistent://prop/usw/my-ns/syncTopic";
2013+
pulsar.getConfiguration().setMetadataSyncEventTopic(topicName);
2014+
PulsarMetadataEventSynchronizer sync = new PulsarMetadataEventSynchronizer(pulsar, topicName);
2015+
// set invalid client for retry
2016+
PulsarClientImpl client = (PulsarClientImpl) PulsarClient.builder().serviceUrl("http://invalidhost:8080")
2017+
.operationTimeout(1000, TimeUnit.MILLISECONDS).build();
2018+
sync.client = client;
2019+
sync.STATE_UPDATER.set(sync, State.Starting_Producer);
2020+
sync.startProducer();
2021+
assertNull(sync.getProducer());
2022+
// update valid client which will set the producer
2023+
sync.client = (PulsarClientImpl) pulsarClient;
2024+
retryStrategically((test) -> sync.getProducer() != null, 1000, 10);
2025+
assertNotNull(sync.getProducer());
2026+
}
20082027
}
20092028

0 commit comments

Comments
 (0)