Skip to content

Commit ada31a9

Browse files
authored
[fix] [broker] Fix nothing changed after removing dynamic configs (apache#22673)
1 parent 5ab0512 commit ada31a9

File tree

2 files changed

+160
-45
lines changed

2 files changed

+160
-45
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java

Lines changed: 92 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -229,8 +229,7 @@ public class BrokerService implements Closeable {
229229
private final OrderedExecutor topicOrderedExecutor;
230230
// offline topic backlog cache
231231
private final ConcurrentOpenHashMap<TopicName, PersistentOfflineTopicStats> offlineTopicStatCache;
232-
private final ConcurrentOpenHashMap<String, ConfigField> dynamicConfigurationMap =
233-
prepareDynamicConfigurationMap();
232+
private final ConcurrentOpenHashMap<String, ConfigField> dynamicConfigurationMap;
234233
private final ConcurrentOpenHashMap<String, Consumer<?>> configRegisteredListeners;
235234

236235
private final ConcurrentLinkedQueue<TopicLoadingContext> pendingTopicLoadingQueue;
@@ -313,6 +312,7 @@ public class BrokerService implements Closeable {
313312

314313
public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws Exception {
315314
this.pulsar = pulsar;
315+
this.dynamicConfigurationMap = prepareDynamicConfigurationMap();
316316
this.brokerPublishRateLimiter = new PublishRateLimiterImpl(pulsar.getMonotonicSnapshotClock());
317317
this.preciseTopicPublishRateLimitingEnable =
318318
pulsar.getConfiguration().isPreciseTopicPublishRateLimiterEnable();
@@ -2496,40 +2496,71 @@ private void handleDynamicConfigurationUpdates() {
24962496

24972497
if (dynamicConfigResources != null) {
24982498
dynamicConfigResources.getDynamicConfigurationAsync()
2499-
.thenAccept(optMap -> {
2500-
if (!optMap.isPresent()) {
2501-
return;
2499+
.thenAccept(optMap -> {
2500+
// Case some dynamic configs have been removed.
2501+
dynamicConfigurationMap.forEach((configKey, fieldWrapper) -> {
2502+
boolean configRemoved = optMap.isEmpty() || !optMap.get().containsKey(configKey);
2503+
if (fieldWrapper.lastDynamicValue != null && configRemoved) {
2504+
configValueChanged(configKey, null);
25022505
}
2503-
Map<String, String> data = optMap.get();
2504-
data.forEach((configKey, value) -> {
2505-
ConfigField configFieldWrapper = dynamicConfigurationMap.get(configKey);
2506-
if (configFieldWrapper == null) {
2507-
log.warn("{} does not exist in dynamicConfigurationMap, skip this config.", configKey);
2508-
return;
2509-
}
2510-
Field configField = configFieldWrapper.field;
2511-
Consumer listener = configRegisteredListeners.get(configKey);
2512-
try {
2513-
final Object existingValue;
2514-
final Object newValue;
2515-
if (configField != null) {
2516-
newValue = FieldParser.value(data.get(configKey), configField);
2517-
existingValue = configField.get(pulsar.getConfiguration());
2518-
configField.set(pulsar.getConfiguration(), newValue);
2519-
} else {
2520-
newValue = value;
2521-
existingValue = configFieldWrapper.customValue;
2522-
configFieldWrapper.customValue = newValue == null ? null : String.valueOf(newValue);
2523-
}
2524-
log.info("Successfully updated configuration {}/{}", configKey, data.get(configKey));
2525-
if (listener != null && !Objects.equals(existingValue, newValue)) {
2526-
listener.accept(newValue);
2527-
}
2528-
} catch (Exception e) {
2529-
log.error("Failed to update config {}", configKey, e);
2530-
}
2531-
});
25322506
});
2507+
// Some configs have been changed.
2508+
if (!optMap.isPresent()) {
2509+
return;
2510+
}
2511+
Map<String, String> data = optMap.get();
2512+
data.forEach((configKey, value) -> {
2513+
configValueChanged(configKey, value);
2514+
});
2515+
});
2516+
}
2517+
}
2518+
2519+
private void configValueChanged(String configKey, String newValueStr) {
2520+
ConfigField configFieldWrapper = dynamicConfigurationMap.get(configKey);
2521+
if (configFieldWrapper == null) {
2522+
log.warn("{} does not exist in dynamicConfigurationMap, skip this config.", configKey);
2523+
return;
2524+
}
2525+
Consumer listener = configRegisteredListeners.get(configKey);
2526+
try {
2527+
// Convert existingValue and newValue.
2528+
final Object existingValue;
2529+
final Object newValue;
2530+
if (configFieldWrapper.field != null) {
2531+
if (StringUtils.isBlank(newValueStr)) {
2532+
newValue = configFieldWrapper.defaultValue;
2533+
} else {
2534+
newValue = FieldParser.value(newValueStr, configFieldWrapper.field);
2535+
}
2536+
existingValue = configFieldWrapper.field.get(pulsar.getConfiguration());
2537+
configFieldWrapper.field.set(pulsar.getConfiguration(), newValue);
2538+
} else {
2539+
// This case only occurs when it is a customized item.
2540+
// See: https://github.com/apache/pulsar/blob/master/pip/pip-300.md.
2541+
log.info("Skip update customized dynamic configuration {}/{} in memory, only trigger an event"
2542+
+ " listeners.", configKey, newValueStr);
2543+
existingValue = configFieldWrapper.lastDynamicValue;
2544+
newValue = newValueStr == null ? configFieldWrapper.defaultValue : newValueStr;
2545+
}
2546+
// Record the latest dynamic config.
2547+
configFieldWrapper.lastDynamicValue = newValueStr;
2548+
2549+
if (newValueStr == null) {
2550+
log.info("Successfully remove the dynamic configuration {}, and revert to the default value",
2551+
configKey);
2552+
} else {
2553+
log.info("Successfully updated configuration {}/{}", configKey, newValueStr);
2554+
}
2555+
2556+
if (listener != null && !Objects.equals(existingValue, newValue)) {
2557+
// So far, all config items that related to configuration listeners, their default value is not null.
2558+
// And the customized config can be null before.
2559+
// So call "listener.accept(null)" is okay.
2560+
listener.accept(newValue);
2561+
}
2562+
} catch (Exception e) {
2563+
log.error("Failed to update config {}", configKey, e);
25332564
}
25342565
}
25352566

@@ -2936,6 +2967,9 @@ private void updateManagedLedgerConfig() {
29362967
* On notification, listener should first check if config value has been changed and after taking appropriate
29372968
* action, listener should update config value with new value if it has been changed (so, next time listener can
29382969
* compare values on configMap change).
2970+
*
2971+
* Note: The new value that the {@param listener} may accept could be a null value.
2972+
*
29392973
* @param <T>
29402974
*
29412975
* @param configKey
@@ -3057,16 +3091,23 @@ public boolean validateDynamicConfiguration(String key, String value) {
30573091
return true;
30583092
}
30593093

3060-
private static ConcurrentOpenHashMap<String, ConfigField> prepareDynamicConfigurationMap() {
3094+
private ConcurrentOpenHashMap<String, ConfigField> prepareDynamicConfigurationMap() {
30613095
ConcurrentOpenHashMap<String, ConfigField> dynamicConfigurationMap =
30623096
ConcurrentOpenHashMap.<String, ConfigField>newBuilder().build();
3063-
for (Field field : ServiceConfiguration.class.getDeclaredFields()) {
3064-
if (field != null && field.isAnnotationPresent(FieldContext.class)) {
3065-
field.setAccessible(true);
3066-
if (field.getAnnotation(FieldContext.class).dynamic()) {
3067-
dynamicConfigurationMap.put(field.getName(), new ConfigField(field));
3097+
try {
3098+
for (Field field : ServiceConfiguration.class.getDeclaredFields()) {
3099+
if (field != null && field.isAnnotationPresent(FieldContext.class)) {
3100+
field.setAccessible(true);
3101+
if (field.getAnnotation(FieldContext.class).dynamic()) {
3102+
Object defaultValue = field.get(pulsar.getConfiguration());
3103+
dynamicConfigurationMap.put(field.getName(), new ConfigField(field, defaultValue));
3104+
}
30683105
}
30693106
}
3107+
} catch (IllegalArgumentException | IllegalAccessException ex) {
3108+
// This error never occurs.
3109+
log.error("Failed to initialize dynamic configuration map", ex);
3110+
throw new RuntimeException(ex);
30703111
}
30713112
return dynamicConfigurationMap;
30723113
}
@@ -3348,19 +3389,25 @@ private static class ConfigField {
33483389
// field holds the pulsar dynamic configuration.
33493390
final Field field;
33503391

3351-
// customValue holds the external dynamic configuration.
3352-
volatile String customValue;
3392+
// It is the dynamic config value if set.
3393+
// It is null if has does not set a dynamic config, even if the value of "pulsar.config" is present.
3394+
volatile String lastDynamicValue;
3395+
3396+
// The default value of "pulsar.config", which is initialized when the broker is starting.
3397+
// After the dynamic config has been removed, revert the config to this default value.
3398+
final Object defaultValue;
33533399

33543400
Predicate<String> validator;
33553401

3356-
public ConfigField(Field field) {
3402+
public ConfigField(Field field, Object defaultValue) {
33573403
super();
33583404
this.field = field;
3405+
this.defaultValue = defaultValue;
33593406
}
33603407

33613408
public static ConfigField newCustomConfigField(String customValue) {
3362-
ConfigField configField = new ConfigField(null);
3363-
configField.customValue = customValue;
3409+
ConfigField configField = new ConfigField(null, null);
3410+
configField.lastDynamicValue = customValue;
33643411
return configField;
33653412
}
33663413
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiDynamicConfigurationsTest.java

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,18 @@
1818
*/
1919
package org.apache.pulsar.broker.admin;
2020

21+
import static org.apache.pulsar.common.naming.NamespaceName.SYSTEM_NAMESPACE;
2122
import static org.assertj.core.api.Assertions.assertThat;
2223
import static org.testng.Assert.assertEquals;
2324
import static org.testng.Assert.assertNotNull;
25+
import static org.testng.Assert.assertNull;
2426
import static org.testng.Assert.assertThrows;
2527
import static org.testng.Assert.fail;
2628
import java.util.Map;
2729
import java.util.concurrent.atomic.AtomicReference;
2830
import javax.ws.rs.core.Response;
2931
import lombok.extern.slf4j.Slf4j;
32+
import org.apache.pulsar.broker.BrokerTestUtil;
3033
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
3134
import org.apache.pulsar.client.admin.PulsarAdminException;
3235
import org.awaitility.Awaitility;
@@ -107,4 +110,69 @@ public void testRegisterCustomDynamicConfiguration() throws PulsarAdminException
107110
allDynamicConfigurations = admin.brokers().getAllDynamicConfigurations();
108111
assertThat(allDynamicConfigurations).doesNotContainKey(key);
109112
}
113+
114+
@Test
115+
public void testDeleteStringDynamicConfig() throws PulsarAdminException {
116+
String syncEventTopic = BrokerTestUtil.newUniqueName(SYSTEM_NAMESPACE + "/tp");
117+
// The default value is null;
118+
Awaitility.await().untilAsserted(() -> {
119+
assertNull(pulsar.getConfig().getConfigurationMetadataSyncEventTopic());
120+
});
121+
// Set dynamic config.
122+
admin.brokers().updateDynamicConfiguration("configurationMetadataSyncEventTopic", syncEventTopic);
123+
Awaitility.await().untilAsserted(() -> {
124+
assertEquals(pulsar.getConfig().getConfigurationMetadataSyncEventTopic(), syncEventTopic);
125+
});
126+
// Remove dynamic config.
127+
admin.brokers().deleteDynamicConfiguration("configurationMetadataSyncEventTopic");
128+
Awaitility.await().untilAsserted(() -> {
129+
assertNull(pulsar.getConfig().getConfigurationMetadataSyncEventTopic());
130+
});
131+
}
132+
133+
@Test
134+
public void testDeleteIntDynamicConfig() throws PulsarAdminException {
135+
// Record the default value;
136+
int defaultValue = pulsar.getConfig().getMaxConcurrentTopicLoadRequest();
137+
// Set dynamic config.
138+
int newValue = defaultValue + 1000;
139+
admin.brokers().updateDynamicConfiguration("maxConcurrentTopicLoadRequest", newValue + "");
140+
Awaitility.await().untilAsserted(() -> {
141+
assertEquals(pulsar.getConfig().getMaxConcurrentTopicLoadRequest(), newValue);
142+
});
143+
// Verify: it has been reverted to the default value.
144+
admin.brokers().deleteDynamicConfiguration("maxConcurrentTopicLoadRequest");
145+
Awaitility.await().untilAsserted(() -> {
146+
assertEquals(pulsar.getConfig().getMaxConcurrentTopicLoadRequest(), defaultValue);
147+
});
148+
}
149+
150+
@Test
151+
public void testDeleteCustomizedDynamicConfig() throws PulsarAdminException {
152+
// Record the default value;
153+
String customizedConfigName = "a123";
154+
pulsar.getBrokerService().registerCustomDynamicConfiguration(customizedConfigName, v -> true);
155+
156+
AtomicReference<Object> currentValue = new AtomicReference<>();
157+
pulsar.getBrokerService().registerConfigurationListener(customizedConfigName, v -> {
158+
currentValue.set(v);
159+
});
160+
161+
// The default value is null;
162+
Awaitility.await().untilAsserted(() -> {
163+
assertNull(currentValue.get());
164+
});
165+
166+
// Set dynamic config.
167+
admin.brokers().updateDynamicConfiguration(customizedConfigName, "xxx");
168+
Awaitility.await().untilAsserted(() -> {
169+
assertEquals(currentValue.get(), "xxx");
170+
});
171+
172+
// Remove dynamic config.
173+
admin.brokers().deleteDynamicConfiguration(customizedConfigName);
174+
Awaitility.await().untilAsserted(() -> {
175+
assertNull(currentValue.get());
176+
});
177+
}
110178
}

0 commit comments

Comments
 (0)