107
107
import org .apache .pulsar .broker .intercept .ManagedLedgerInterceptorImpl ;
108
108
import org .apache .pulsar .broker .loadbalance .LoadManager ;
109
109
import org .apache .pulsar .broker .namespace .NamespaceService ;
110
+ import org .apache .pulsar .broker .resources .DynamicConfigurationResources ;
110
111
import org .apache .pulsar .broker .resources .LocalPoliciesResources ;
111
112
import org .apache .pulsar .broker .resources .NamespaceResources ;
112
113
import org .apache .pulsar .broker .resources .NamespaceResources .PartitionedTopicResources ;
@@ -222,8 +223,7 @@ public class BrokerService implements Closeable {
222
223
private final OrderedExecutor topicOrderedExecutor ;
223
224
// offline topic backlog cache
224
225
private final ConcurrentOpenHashMap <TopicName , PersistentOfflineTopicStats > offlineTopicStatCache ;
225
- private static final ConcurrentOpenHashMap <String , ConfigField > dynamicConfigurationMap =
226
- prepareDynamicConfigurationMap ();
226
+ private final ConcurrentOpenHashMap <String , ConfigField > dynamicConfigurationMap ;
227
227
private final ConcurrentOpenHashMap <String , Consumer <?>> configRegisteredListeners ;
228
228
229
229
private final ConcurrentLinkedQueue <TopicLoadingContext > pendingTopicLoadingQueue ;
@@ -293,6 +293,7 @@ public class BrokerService implements Closeable {
293
293
294
294
public BrokerService (PulsarService pulsar , EventLoopGroup eventLoopGroup ) throws Exception {
295
295
this .pulsar = pulsar ;
296
+ this .dynamicConfigurationMap = prepareDynamicConfigurationMap ();
296
297
this .preciseTopicPublishRateLimitingEnable =
297
298
pulsar .getConfiguration ().isPreciseTopicPublishRateLimiterEnable ();
298
299
this .managedLedgerFactory = pulsar .getManagedLedgerFactory ();
@@ -2253,38 +2254,85 @@ private void handlePoliciesUpdates(NamespaceName namespace) {
2253
2254
}
2254
2255
2255
2256
private void handleDynamicConfigurationUpdates () {
2256
- pulsar ().getPulsarResources ().getDynamicConfigResources ().getDynamicConfigurationAsync ()
2257
+ DynamicConfigurationResources dynamicConfigResources = null ;
2258
+ try {
2259
+ dynamicConfigResources = pulsar ()
2260
+ .getPulsarResources ()
2261
+ .getDynamicConfigResources ();
2262
+ } catch (Exception e ) {
2263
+ log .warn ("Failed to read dynamic broker configuration" , e );
2264
+ }
2265
+
2266
+ if (dynamicConfigResources != null ) {
2267
+ dynamicConfigResources .getDynamicConfigurationAsync ()
2257
2268
.thenAccept (optMap -> {
2269
+ // Case some dynamic configs have been removed.
2270
+ dynamicConfigurationMap .forEach ((configKey , fieldWrapper ) -> {
2271
+ boolean configRemoved = !optMap .isPresent () || !optMap .get ().containsKey (configKey );
2272
+ if (fieldWrapper .lastDynamicValue != null && configRemoved ) {
2273
+ configValueChanged (configKey , null );
2274
+ }
2275
+ });
2276
+ // Some configs have been changed.
2258
2277
if (!optMap .isPresent ()) {
2259
2278
return ;
2260
2279
}
2261
2280
Map <String , String > data = optMap .get ();
2262
2281
data .forEach ((configKey , value ) -> {
2263
- ConfigField configFieldWrapper = dynamicConfigurationMap .get (configKey );
2264
- if (configFieldWrapper == null ) {
2265
- log .warn ("{} does not exist in dynamicConfigurationMap, skip this config." , configKey );
2266
- return ;
2267
- }
2268
- Field configField = configFieldWrapper .field ;
2269
- Object newValue = FieldParser .value (data .get (configKey ), configField );
2270
- if (configField != null ) {
2271
- Consumer listener = configRegisteredListeners .get (configKey );
2272
- try {
2273
- Object existingValue = configField .get (pulsar .getConfiguration ());
2274
- configField .set (pulsar .getConfiguration (), newValue );
2275
- log .info ("Successfully updated configuration {}/{}" , configKey ,
2276
- data .get (configKey ));
2277
- if (listener != null && !existingValue .equals (newValue )) {
2278
- listener .accept (newValue );
2279
- }
2280
- } catch (Exception e ) {
2281
- log .error ("Failed to update config {}/{}" , configKey , newValue );
2282
- }
2283
- } else {
2284
- log .error ("Found non-dynamic field in dynamicConfigMap {}/{}" , configKey , newValue );
2285
- }
2282
+ configValueChanged (configKey , value );
2286
2283
});
2287
2284
});
2285
+ }
2286
+ }
2287
+
2288
+ private void configValueChanged (String configKey , String newValueStr ) {
2289
+ ConfigField configFieldWrapper = dynamicConfigurationMap .get (configKey );
2290
+ if (configFieldWrapper == null ) {
2291
+ log .warn ("{} does not exist in dynamicConfigurationMap, skip this config." , configKey );
2292
+ return ;
2293
+ }
2294
+ Consumer listener = configRegisteredListeners .get (configKey );
2295
+ try {
2296
+ // Convert existingValue and newValue.
2297
+ final Object existingValue ;
2298
+ final Object newValue ;
2299
+ if (configFieldWrapper .field != null ) {
2300
+ if (StringUtils .isBlank (newValueStr )) {
2301
+ newValue = configFieldWrapper .defaultValue ;
2302
+ } else {
2303
+ newValue = FieldParser .value (newValueStr , configFieldWrapper .field );
2304
+ }
2305
+ existingValue = configFieldWrapper .field .get (pulsar .getConfiguration ());
2306
+ configFieldWrapper .field .set (pulsar .getConfiguration (), newValue );
2307
+ } else {
2308
+ // This case only occurs when it is a customized item.
2309
+ // Since https://github.com/apache/pulsar/blob/master/pip/pip-300.md has not been cherry-picked, this
2310
+ // case should never occur.
2311
+ log .error ("Skip update customized dynamic configuration {}/{} in memory, only trigger an event"
2312
+ + " listeners. Since PIP-300 has net been cherry-picked, this case should never occur" ,
2313
+ configKey , newValueStr );
2314
+ existingValue = configFieldWrapper .lastDynamicValue ;
2315
+ newValue = newValueStr == null ? configFieldWrapper .defaultValue : newValueStr ;
2316
+ }
2317
+ // Record the latest dynamic config.
2318
+ configFieldWrapper .lastDynamicValue = newValueStr ;
2319
+
2320
+ if (newValueStr == null ) {
2321
+ log .info ("Successfully remove the dynamic configuration {}, and revert to the default value" ,
2322
+ configKey );
2323
+ } else {
2324
+ log .info ("Successfully updated configuration {}/{}" , configKey , newValueStr );
2325
+ }
2326
+
2327
+ if (listener != null && !Objects .equals (existingValue , newValue )) {
2328
+ // So far, all config items that related to configuration listeners, their default value is not null.
2329
+ // And the customized config can be null before.
2330
+ // So call "listener.accept(null)" is okay.
2331
+ listener .accept (newValue );
2332
+ }
2333
+ } catch (Exception e ) {
2334
+ log .error ("Failed to update config {}" , configKey , e );
2335
+ }
2288
2336
}
2289
2337
2290
2338
/**
@@ -2654,6 +2702,9 @@ private void updateManagedLedgerConfig() {
2654
2702
* On notification, listener should first check if config value has been changed and after taking appropriate
2655
2703
* action, listener should update config value with new value if it has been changed (so, next time listener can
2656
2704
* compare values on configMap change).
2705
+ *
2706
+ * Note: The new value that the {@param listener} may accept could be a null value.
2707
+ *
2657
2708
* @param <T>
2658
2709
*
2659
2710
* @param configKey
@@ -2729,7 +2780,7 @@ public DelayedDeliveryTrackerFactory getDelayedDeliveryTrackerFactory() {
2729
2780
return delayedDeliveryTrackerFactory ;
2730
2781
}
2731
2782
2732
- public static List <String > getDynamicConfiguration () {
2783
+ public List <String > getDynamicConfiguration () {
2733
2784
return dynamicConfigurationMap .keys ();
2734
2785
}
2735
2786
@@ -2742,27 +2793,34 @@ public Map<String, String> getRuntimeConfiguration() {
2742
2793
return configMap ;
2743
2794
}
2744
2795
2745
- public static boolean isDynamicConfiguration (String key ) {
2796
+ public boolean isDynamicConfiguration (String key ) {
2746
2797
return dynamicConfigurationMap .containsKey (key );
2747
2798
}
2748
2799
2749
- public static boolean validateDynamicConfiguration (String key , String value ) {
2800
+ public boolean validateDynamicConfiguration (String key , String value ) {
2750
2801
if (dynamicConfigurationMap .containsKey (key ) && dynamicConfigurationMap .get (key ).validator != null ) {
2751
2802
return dynamicConfigurationMap .get (key ).validator .test (value );
2752
2803
}
2753
2804
return true ;
2754
2805
}
2755
2806
2756
- private static ConcurrentOpenHashMap <String , ConfigField > prepareDynamicConfigurationMap () {
2807
+ private ConcurrentOpenHashMap <String , ConfigField > prepareDynamicConfigurationMap () {
2757
2808
ConcurrentOpenHashMap <String , ConfigField > dynamicConfigurationMap =
2758
2809
ConcurrentOpenHashMap .<String , ConfigField >newBuilder ().build ();
2759
- for (Field field : ServiceConfiguration .class .getDeclaredFields ()) {
2760
- if (field != null && field .isAnnotationPresent (FieldContext .class )) {
2761
- field .setAccessible (true );
2762
- if (field .getAnnotation (FieldContext .class ).dynamic ()) {
2763
- dynamicConfigurationMap .put (field .getName (), new ConfigField (field ));
2810
+ try {
2811
+ for (Field field : ServiceConfiguration .class .getDeclaredFields ()) {
2812
+ if (field != null && field .isAnnotationPresent (FieldContext .class )) {
2813
+ field .setAccessible (true );
2814
+ if (field .getAnnotation (FieldContext .class ).dynamic ()) {
2815
+ Object defaultValue = field .get (pulsar .getConfiguration ());
2816
+ dynamicConfigurationMap .put (field .getName (), new ConfigField (field , defaultValue ));
2817
+ }
2764
2818
}
2765
2819
}
2820
+ } catch (IllegalArgumentException | IllegalAccessException ex ) {
2821
+ // This error never occurs.
2822
+ log .error ("Failed to initialize dynamic configuration map" , ex );
2823
+ throw new RuntimeException (ex );
2766
2824
}
2767
2825
return dynamicConfigurationMap ;
2768
2826
}
@@ -3036,11 +3094,21 @@ public void unblockDispatchersOnUnAckMessages(List<PersistentDispatcherMultipleC
3036
3094
3037
3095
private static class ConfigField {
3038
3096
final Field field ;
3097
+
3098
+ // It is the dynamic config value if set.
3099
+ // It is null if has does not set a dynamic config, even if the value of "pulsar.config" is present.
3100
+ volatile String lastDynamicValue ;
3101
+
3102
+ // The default value of "pulsar.config", which is initialized when the broker is starting.
3103
+ // After the dynamic config has been removed, revert the config to this default value.
3104
+ final Object defaultValue ;
3105
+
3039
3106
Predicate <String > validator ;
3040
3107
3041
- public ConfigField (Field field ) {
3108
+ public ConfigField (Field field , Object defaultValue ) {
3042
3109
super ();
3043
3110
this .field = field ;
3111
+ this .defaultValue = defaultValue ;
3044
3112
}
3045
3113
}
3046
3114
0 commit comments