Skip to content

Commit 5249349

Browse files
committed
added loadBalancerServiceUnitStateMaxConcurrentOverrides conf
1 parent 81f502a commit 5249349

File tree

3 files changed

+18
-2
lines changed

3 files changed

+18
-2
lines changed

conf/broker.conf

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1586,6 +1586,13 @@ loadManagerServiceUnitStateTableViewClassName=org.apache.pulsar.broker.loadbalan
15861586
# enable it. It accepts `None` to disable it."
15871587
loadBalancerServiceUnitTableViewSyncer=None
15881588

1589+
1590+
# Specify the maximum number of concurrent orphan bundle ownership overrides.
1591+
# The leader broker triggers these overrides upon detecting orphaned bundles.
1592+
# It identifies orphan bundle ownerships by periodically scanning ownership data
1593+
# and monitoring for broker shutdowns or inactive states.
1594+
loadBalancerServiceUnitStateMaxConcurrentOverrides = 10
1595+
15891596
### --- Replication --- ###
15901597

15911598
# Enable replication metrics

pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3023,6 +3023,16 @@ public double getLoadBalancerBandwidthOutResourceWeight() {
30233023
)
30243024
private ServiceUnitTableViewSyncerType loadBalancerServiceUnitTableViewSyncer = ServiceUnitTableViewSyncerType.None;
30253025

3026+
@FieldContext(
3027+
dynamic = true,
3028+
category = CATEGORY_LOAD_BALANCER,
3029+
doc = "Specify the maximum number of concurrent orphan bundle ownership overrides. "
3030+
+ "The leader broker triggers these overrides upon detecting orphaned bundles. "
3031+
+ "It identifies orphan bundle ownerships by periodically scanning ownership data "
3032+
+ "and monitoring for broker shutdowns or inactive states."
3033+
)
3034+
private int loadBalancerServiceUnitStateMaxConcurrentOverrides = 10;
3035+
30263036
/**** --- Replication. --- ****/
30273037
@FieldContext(
30283038
category = CATEGORY_REPLICATION,

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,6 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
106106

107107
private static final int OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS = 5000;
108108
private static final int OWNERSHIP_CLEAN_UP_WAIT_RETRY_DELAY_IN_MILLIS = 100;
109-
private static final int MAX_CONCURRENT_OWNERSHIP_OVERRIDE = 500;
110109
public static final long VERSION_ID_INIT = 1; // initial versionId
111110
public static final long MAX_CLEAN_UP_DELAY_TIME_IN_SECS = 3 * 60; // 3 mins
112111
private static final long MIN_CLEAN_UP_DELAY_TIME_IN_SECS = 0; // 0 secs to clean immediately
@@ -1503,7 +1502,7 @@ private void doHealthCheckBrokerAsyncWithRetries(String brokerId, int retry, Com
15031502
}
15041503

15051504
private void tryWaitForOverrides(List<CompletableFuture<Void>> overrideFutures, boolean force) {
1506-
if (overrideFutures.size() > MAX_CONCURRENT_OWNERSHIP_OVERRIDE || force) {
1505+
if (overrideFutures.size() >= config.getLoadBalancerServiceUnitStateMaxConcurrentOverrides() || force) {
15071506
try {
15081507
FutureUtil.waitForAll(overrideFutures)
15091508
.get(config.getMetadataStoreOperationTimeoutSeconds(), SECONDS);

0 commit comments

Comments
 (0)