Skip to content

Commit dc0176e

Browse files
rdhabaliasrinath-ctds
authored andcommitted
[fix][broker] Fix Metadata Event Synchronizer producer creation retry so that the producer gets created eventually (apache#24081)
(cherry picked from commit 6622ff7) (cherry picked from commit 8b0112d)
1 parent 4a5f398 commit dc0176e

File tree

2 files changed

+29
-4
lines changed

2 files changed

+29
-4
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarMetadataEventSynchronizer.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.pulsar.broker.service;
2020

2121
import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS;
22+
import com.google.common.annotations.VisibleForTesting;
2223
import java.util.Arrays;
2324
import java.util.concurrent.CompletableFuture;
2425
import java.util.concurrent.CopyOnWriteArrayList;
@@ -52,13 +53,13 @@ public class PulsarMetadataEventSynchronizer implements MetadataEventSynchronize
5253
protected BrokerService brokerService;
5354
@Getter
5455
protected String topicName;
55-
protected PulsarClientImpl client;
56+
protected volatile PulsarClientImpl client;
5657
protected volatile Producer<MetadataEvent> producer;
5758
protected volatile Consumer<MetadataEvent> consumer;
5859
private final CopyOnWriteArrayList<Function<MetadataEvent, CompletableFuture<Void>>>
5960
listeners = new CopyOnWriteArrayList<>();
6061

61-
static final AtomicReferenceFieldUpdater<PulsarMetadataEventSynchronizer, State> STATE_UPDATER =
62+
protected static final AtomicReferenceFieldUpdater<PulsarMetadataEventSynchronizer, State> STATE_UPDATER =
6263
AtomicReferenceFieldUpdater.newUpdater(PulsarMetadataEventSynchronizer.class, State.class, "state");
6364
@Getter
6465
private volatile State state;
@@ -133,7 +134,7 @@ private void publishAsync(MetadataEvent event, CompletableFuture<Void> future) {
133134
});
134135
}
135136

136-
private void startProducer() {
137+
protected void startProducer() {
137138
if (isClosingOrClosed()) {
138139
log.info("[{}] Skip to start new producer because the synchronizer is closed", topicName);
139140
return;
@@ -169,11 +170,16 @@ private void startProducer() {
169170
log.warn("[{}] Failed to create producer ({}), retrying in {} s", topicName, ex.getMessage(),
170171
waitTimeMs / 1000.0);
171172
// BackOff before retrying
172-
brokerService.executor().schedule(this::startProducer, waitTimeMs, TimeUnit.MILLISECONDS);
173+
pulsar.getExecutor().schedule(this::startProducer, waitTimeMs, TimeUnit.MILLISECONDS);
173174
return null;
174175
});
175176
}
176177

178+
@VisibleForTesting
179+
public Producer<MetadataEvent> getProducer() {
180+
return producer;
181+
}
182+
177183
private void startConsumer() {
178184
if (isClosingOrClosed()) {
179185
log.info("[{}] Skip to start new consumer because the synchronizer is closed", topicName);

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@
8585
import org.apache.pulsar.broker.PulsarService;
8686
import org.apache.pulsar.broker.namespace.NamespaceService;
8787
import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException;
88+
import org.apache.pulsar.broker.service.PulsarMetadataEventSynchronizer.State;
8889
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
8990
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
9091
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient;
@@ -107,6 +108,7 @@
107108
import org.apache.pulsar.client.impl.ClientBuilderImpl;
108109
import org.apache.pulsar.client.impl.ClientCnx;
109110
import org.apache.pulsar.client.impl.ConnectionPool;
111+
import org.apache.pulsar.client.impl.PulsarClientImpl;
110112
import org.apache.pulsar.client.impl.PulsarServiceNameResolver;
111113
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
112114
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
@@ -2005,5 +2007,22 @@ public void testTlsWithAuthParams() throws Exception {
20052007
}
20062008
}
20072009

2010+
@Test
2011+
public void testPulsarMetadataEventSyncProducerCreation() throws Exception {
2012+
final String topicName = "persistent://prop/usw/my-ns/syncTopic";
2013+
pulsar.getConfiguration().setMetadataSyncEventTopic(topicName);
2014+
PulsarMetadataEventSynchronizer sync = new PulsarMetadataEventSynchronizer(pulsar, topicName);
2015+
// set invalid client for retry
2016+
PulsarClientImpl client = (PulsarClientImpl) PulsarClient.builder().serviceUrl("http://invalidhost:8080")
2017+
.operationTimeout(1000, TimeUnit.MILLISECONDS).build();
2018+
sync.client = client;
2019+
sync.STATE_UPDATER.set(sync, State.Starting_Producer);
2020+
sync.startProducer();
2021+
assertNull(sync.getProducer());
2022+
// update valid client which will set the producer
2023+
sync.client = (PulsarClientImpl) pulsarClient;
2024+
retryStrategically((test) -> sync.getProducer() != null, 1000, 10);
2025+
assertNotNull(sync.getProducer());
2026+
}
20082027
}
20092028

0 commit comments

Comments
 (0)