|
19 | 19 | package org.apache.pulsar.broker.service;
|
20 | 20 |
|
21 | 21 | import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS;
|
| 22 | +import com.google.common.annotations.VisibleForTesting; |
22 | 23 | import java.util.Arrays;
|
23 | 24 | import java.util.concurrent.CompletableFuture;
|
24 | 25 | import java.util.concurrent.CopyOnWriteArrayList;
|
@@ -52,13 +53,13 @@ public class PulsarMetadataEventSynchronizer implements MetadataEventSynchronize
|
52 | 53 | protected BrokerService brokerService;
|
53 | 54 | @Getter
|
54 | 55 | protected String topicName;
|
55 |
| - protected PulsarClientImpl client; |
| 56 | + protected volatile PulsarClientImpl client; |
56 | 57 | protected volatile Producer<MetadataEvent> producer;
|
57 | 58 | protected volatile Consumer<MetadataEvent> consumer;
|
58 | 59 | private final CopyOnWriteArrayList<Function<MetadataEvent, CompletableFuture<Void>>>
|
59 | 60 | listeners = new CopyOnWriteArrayList<>();
|
60 | 61 |
|
61 |
| - static final AtomicReferenceFieldUpdater<PulsarMetadataEventSynchronizer, State> STATE_UPDATER = |
| 62 | + protected static final AtomicReferenceFieldUpdater<PulsarMetadataEventSynchronizer, State> STATE_UPDATER = |
62 | 63 | AtomicReferenceFieldUpdater.newUpdater(PulsarMetadataEventSynchronizer.class, State.class, "state");
|
63 | 64 | @Getter
|
64 | 65 | private volatile State state;
|
@@ -133,7 +134,7 @@ private void publishAsync(MetadataEvent event, CompletableFuture<Void> future) {
|
133 | 134 | });
|
134 | 135 | }
|
135 | 136 |
|
136 |
| - private void startProducer() { |
| 137 | + protected void startProducer() { |
137 | 138 | if (isClosingOrClosed()) {
|
138 | 139 | log.info("[{}] Skip to start new producer because the synchronizer is closed", topicName);
|
139 | 140 | return;
|
@@ -169,11 +170,16 @@ private void startProducer() {
|
169 | 170 | log.warn("[{}] Failed to create producer ({}), retrying in {} s", topicName, ex.getMessage(),
|
170 | 171 | waitTimeMs / 1000.0);
|
171 | 172 | // BackOff before retrying
|
172 |
| - brokerService.executor().schedule(this::startProducer, waitTimeMs, TimeUnit.MILLISECONDS); |
| 173 | + pulsar.getExecutor().schedule(this::startProducer, waitTimeMs, TimeUnit.MILLISECONDS); |
173 | 174 | return null;
|
174 | 175 | });
|
175 | 176 | }
|
176 | 177 |
|
| 178 | + @VisibleForTesting |
| 179 | + public Producer<MetadataEvent> getProducer() { |
| 180 | + return producer; |
| 181 | + } |
| 182 | + |
177 | 183 | private void startConsumer() {
|
178 | 184 | if (isClosingOrClosed()) {
|
179 | 185 | log.info("[{}] Skip to start new consumer because the synchronizer is closed", topicName);
|
|
0 commit comments