Skip to content

Commit 970df5f

Browse files
committed
[fix][broker] fix ExtensibleLoadManager to override the ownerships concurrently without blocking load manager thread (#24156)
(cherry picked from commit 066a20c)
1 parent 0634170 commit 970df5f

File tree

4 files changed

+101
-59
lines changed

4 files changed

+101
-59
lines changed

conf/broker.conf

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1585,6 +1585,13 @@ loadManagerServiceUnitStateTableViewClassName=org.apache.pulsar.broker.loadbalan
15851585
# enable it. It accepts `None` to disable it."
15861586
loadBalancerServiceUnitTableViewSyncer=None
15871587

1588+
1589+
# Specify the maximum number of concurrent orphan bundle ownership overrides.
1590+
# The leader broker triggers these overrides upon detecting orphaned bundles.
1591+
# It identifies orphan bundle ownerships by periodically scanning ownership data
1592+
# and monitoring for broker shutdowns or inactive states.
1593+
loadBalancerServiceUnitStateMaxConcurrentOverrides = 64
1594+
15881595
### --- Replication --- ###
15891596

15901597
# Enable replication metrics

pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3022,6 +3022,16 @@ public double getLoadBalancerBandwidthOutResourceWeight() {
30223022
)
30233023
private ServiceUnitTableViewSyncerType loadBalancerServiceUnitTableViewSyncer = ServiceUnitTableViewSyncerType.None;
30243024

3025+
@FieldContext(
3026+
dynamic = true,
3027+
category = CATEGORY_LOAD_BALANCER,
3028+
doc = "Specify the maximum number of concurrent orphan bundle ownership overrides. "
3029+
+ "The leader broker triggers these overrides upon detecting orphaned bundles. "
3030+
+ "It identifies orphan bundle ownerships by periodically scanning ownership data "
3031+
+ "and monitoring for broker shutdowns or inactive states."
3032+
)
3033+
private int loadBalancerServiceUnitStateMaxConcurrentOverrides = 64;
3034+
30253035
/**** --- Replication. --- ****/
30263036
@FieldContext(
30273037
category = CATEGORY_REPLICATION,

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java

Lines changed: 80 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
113113
private static final long MAX_OWNED_BUNDLE_COUNT_DELAY_TIME_IN_MILLIS = 10 * 60 * 1000;
114114
private static final long MAX_BROKER_HEALTH_CHECK_RETRY = 3;
115115
private static final long MAX_BROKER_HEALTH_CHECK_DELAY_IN_MILLIS = 1000;
116+
116117
private final PulsarService pulsar;
117118
private final ServiceConfiguration config;
118119
private final Schema<ServiceUnitStateData> schema;
@@ -1379,58 +1380,53 @@ private void scheduleCleanup(String broker, long delayInSecs) {
13791380
}
13801381

13811382

1382-
private void overrideOwnership(String serviceUnit, ServiceUnitStateData orphanData, String inactiveBroker,
1383-
boolean gracefully) {
1383+
private CompletableFuture<Void> overrideOwnership(String serviceUnit,
1384+
ServiceUnitStateData orphanData,
1385+
String inactiveBroker,
1386+
boolean gracefully) {
13841387

13851388
final var version = getNextVersionId(orphanData);
1386-
try {
1387-
selectBroker(serviceUnit, inactiveBroker)
1388-
.thenApply(selectedOpt ->
1389-
selectedOpt.map(selectedBroker -> {
1390-
if (orphanData.state() == Splitting) {
1391-
// if Splitting, set orphan.dstBroker() as dst to indicate where it was from.
1392-
// (The src broker runs handleSplitEvent.)
1393-
return new ServiceUnitStateData(Splitting, orphanData.dstBroker(), selectedBroker,
1394-
Map.copyOf(orphanData.splitServiceUnitToDestBroker()), true, version);
1395-
} else if (orphanData.state() == Owned) {
1396-
// if Owned, set orphan.dstBroker() as source to clean it up in case it is still
1397-
// alive.
1398-
var sourceBroker = selectedBroker.equals(orphanData.dstBroker()) ? null :
1399-
orphanData.dstBroker();
1400-
// if gracefully, try to release ownership first
1401-
var overrideState = gracefully && sourceBroker != null ? Releasing : Owned;
1402-
return new ServiceUnitStateData(
1403-
overrideState,
1404-
selectedBroker,
1405-
sourceBroker,
1406-
true, version);
1407-
} else {
1408-
// if Assigning or Releasing, set orphan.sourceBroker() as source
1409-
// to clean it up in case it is still alive.
1410-
return new ServiceUnitStateData(Owned, selectedBroker,
1411-
selectedBroker.equals(orphanData.sourceBroker()) ? null :
1412-
orphanData.sourceBroker(),
1413-
true, version);
1414-
}
1415-
// If no broker is selected(available), free the ownership.
1416-
// If the previous owner is still active, it will close the bundle(topic) ownership.
1417-
}).orElseGet(() -> new ServiceUnitStateData(Free, null,
1418-
orphanData.state() == Owned ? orphanData.dstBroker() : orphanData.sourceBroker(),
1419-
true,
1420-
version)))
1421-
.thenCompose(override -> {
1422-
log.info(
1423-
"Overriding inactiveBroker:{}, ownership serviceUnit:{} from orphanData:{} to "
1424-
+ "overrideData:{}",
1425-
inactiveBroker, serviceUnit, orphanData, override);
1426-
return publishOverrideEventAsync(serviceUnit, override);
1427-
}).get(config.getMetadataStoreOperationTimeoutSeconds(), SECONDS);
1428-
} catch (Throwable e) {
1429-
log.error(
1430-
"Failed to override inactiveBroker:{} ownership serviceUnit:{} orphanData:{}. "
1431-
+ "totalCleanupErrorCnt:{}",
1432-
inactiveBroker, serviceUnit, orphanData, totalCleanupErrorCnt.incrementAndGet(), e);
1433-
}
1389+
return selectBroker(serviceUnit, inactiveBroker)
1390+
.thenApply(selectedOpt ->
1391+
selectedOpt.map(selectedBroker -> {
1392+
if (orphanData.state() == Splitting) {
1393+
// if Splitting, set orphan.dstBroker() as dst to indicate where it was from.
1394+
// (The src broker runs handleSplitEvent.)
1395+
return new ServiceUnitStateData(Splitting, orphanData.dstBroker(), selectedBroker,
1396+
Map.copyOf(orphanData.splitServiceUnitToDestBroker()), true, version);
1397+
} else if (orphanData.state() == Owned) {
1398+
// if Owned, set orphan.dstBroker() as source to clean it up in case it is still
1399+
// alive.
1400+
var sourceBroker = selectedBroker.equals(orphanData.dstBroker()) ? null :
1401+
orphanData.dstBroker();
1402+
// if gracefully, try to release ownership first
1403+
var overrideState = gracefully && sourceBroker != null ? Releasing : Owned;
1404+
return new ServiceUnitStateData(
1405+
overrideState,
1406+
selectedBroker,
1407+
sourceBroker,
1408+
true, version);
1409+
} else {
1410+
// if Assigning or Releasing, set orphan.sourceBroker() as source
1411+
// to clean it up in case it is still alive.
1412+
return new ServiceUnitStateData(Owned, selectedBroker,
1413+
selectedBroker.equals(orphanData.sourceBroker()) ? null :
1414+
orphanData.sourceBroker(),
1415+
true, version);
1416+
}
1417+
// If no broker is selected(available), free the ownership.
1418+
// If the previous owner is still active, it will close the bundle(topic) ownership.
1419+
}).orElseGet(() -> new ServiceUnitStateData(Free, null,
1420+
orphanData.state() == Owned ? orphanData.dstBroker() : orphanData.sourceBroker(),
1421+
true,
1422+
version)))
1423+
.thenCompose(override -> {
1424+
log.info(
1425+
"Overriding inactiveBroker:{}, ownership serviceUnit:{} from orphanData:{} to "
1426+
+ "overrideData:{}",
1427+
inactiveBroker, serviceUnit, orphanData, override);
1428+
return publishOverrideEventAsync(serviceUnit, override);
1429+
});
14341430
}
14351431

14361432
private void waitForCleanups(String broker, boolean excludeSystemTopics, int maxWaitTimeInMillis) {
@@ -1505,7 +1501,21 @@ private void doHealthCheckBrokerAsyncWithRetries(String brokerId, int retry, Com
15051501
}
15061502
}
15071503

1508-
private synchronized void doCleanup(String broker, boolean gracefully) {
1504+
private void tryWaitForOverrides(List<CompletableFuture<Void>> overrideFutures, boolean force) {
1505+
if (overrideFutures.size() >= config.getLoadBalancerServiceUnitStateMaxConcurrentOverrides() || force) {
1506+
try {
1507+
FutureUtil.waitForAll(overrideFutures)
1508+
.get(config.getMetadataStoreOperationTimeoutSeconds(), SECONDS);
1509+
} catch (Throwable e) {
1510+
log.error("Failed to override ownership: totalCleanupErrorCnt:{}",
1511+
totalCleanupErrorCnt.incrementAndGet(), e);
1512+
} finally {
1513+
overrideFutures.clear();
1514+
}
1515+
}
1516+
}
1517+
1518+
private void doCleanup(String broker, boolean gracefully) {
15091519
try {
15101520
if (getChannelOwnerAsync().get(MAX_CHANNEL_OWNER_ELECTION_WAITING_TIME_IN_SECS, TimeUnit.SECONDS)
15111521
.isEmpty()) {
@@ -1549,8 +1559,11 @@ private synchronized void doCleanup(String broker, boolean gracefully) {
15491559
} catch (Exception e) {
15501560
log.error("Failed to flush", e);
15511561
}
1562+
List<CompletableFuture<Void>> overrideFutures = new ArrayList<>();
15521563
Map<String, ServiceUnitStateData> orphanSystemServiceUnits = new HashMap<>();
1553-
for (var etr : tableview.entrySet()) {
1564+
var iter = tableview.entrySet().iterator();
1565+
while (iter.hasNext()) {
1566+
var etr = iter.next();
15541567
var stateData = etr.getValue();
15551568
var serviceUnit = etr.getKey();
15561569
var state = state(stateData);
@@ -1559,7 +1572,8 @@ private synchronized void doCleanup(String broker, boolean gracefully) {
15591572
if (serviceUnit.startsWith(SYSTEM_NAMESPACE.toString())) {
15601573
orphanSystemServiceUnits.put(serviceUnit, stateData);
15611574
} else {
1562-
overrideOwnership(serviceUnit, stateData, broker, gracefully);
1575+
overrideFutures.add(overrideOwnership(serviceUnit, stateData, broker, gracefully));
1576+
tryWaitForOverrides(overrideFutures, !iter.hasNext());
15631577
}
15641578
orphanServiceUnitCleanupCnt++;
15651579
}
@@ -1584,9 +1598,14 @@ private synchronized void doCleanup(String broker, boolean gracefully) {
15841598
}
15851599

15861600
// clean system bundles in the end
1587-
for (var orphanSystemServiceUnit : orphanSystemServiceUnits.entrySet()) {
1601+
var orphanSystemServiceUnitIter = orphanSystemServiceUnits.entrySet().iterator();
1602+
while (orphanSystemServiceUnitIter.hasNext()) {
1603+
// Advance the iterator this loop is guarded by; `iter` belongs to the tableview scan above.
var orphanSystemServiceUnit = orphanSystemServiceUnitIter.next();
15881604
log.info("Overriding orphan system service unit:{}", orphanSystemServiceUnit.getKey());
1589-
overrideOwnership(orphanSystemServiceUnit.getKey(), orphanSystemServiceUnit.getValue(), broker, gracefully);
1605+
overrideFutures.add(
1606+
overrideOwnership(orphanSystemServiceUnit.getKey(), orphanSystemServiceUnit.getValue(), broker,
1607+
gracefully));
1608+
tryWaitForOverrides(overrideFutures, !orphanSystemServiceUnitIter.hasNext());
15901609
}
15911610

15921611
try {
@@ -1710,10 +1729,14 @@ protected void monitorOwnerships(List<String> brokers) {
17101729
// timedOutInFlightStateServiceUnits are the in-flight ones although their src and dst brokers are known to
17111730
// be active.
17121731
if (!timedOutInFlightStateServiceUnits.isEmpty()) {
1713-
for (var etr : timedOutInFlightStateServiceUnits.entrySet()) {
1732+
List<CompletableFuture<Void>> overrideFutures = new ArrayList<>();
1733+
var iter = timedOutInFlightStateServiceUnits.entrySet().iterator();
1734+
while (iter.hasNext()) {
1735+
var etr = iter.next();
17141736
var orphanServiceUnit = etr.getKey();
17151737
var orphanData = etr.getValue();
1716-
overrideOwnership(orphanServiceUnit, orphanData, null, false);
1738+
overrideFutures.add(overrideOwnership(orphanServiceUnit, orphanData, null, false));
1739+
tryWaitForOverrides(overrideFutures, !iter.hasNext());
17171740
orphanServiceUnitCleanupCnt++;
17181741
}
17191742
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -463,8 +463,10 @@ public Set<UnloadDecision> findBundlesForUnloading(LoadManagerContext context,
463463

464464
Optional<TopBundlesLoadData> bundlesLoadData = context.topBundleLoadDataStore().get(maxBroker);
465465
if (bundlesLoadData.isEmpty() || bundlesLoadData.get().getTopBundlesLoadData().isEmpty()) {
466-
log.error(String.format(CANNOT_UNLOAD_BROKER_MSG
467-
+ " TopBundlesLoadData is empty.", maxBroker));
466+
if (debugMode) {
467+
log.info(String.format(CANNOT_UNLOAD_BROKER_MSG
468+
+ " TopBundlesLoadData is empty.", maxBroker));
469+
}
468470
numOfBrokersWithEmptyLoadData++;
469471
continue;
470472
}

0 commit comments

Comments
 (0)