Skip to content

Commit bb45f03

Browse files
authored
Add Setting to adjust the primary constraint weights (#16471)
Add Setting to adjust the primary constraint weights (#16471) Signed-off-by: Arpit Bandejiya <[email protected]>
1 parent 4ad1be3 commit bb45f03

File tree

8 files changed

+71
-25
lines changed

8 files changed

+71
-25
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
3131
- URI path filtering support in cluster stats API ([#15938](https://github.com/opensearch-project/OpenSearch/pull/15938))
3232
- [Star Tree - Search] Add support for metric aggregations with/without term query ([15289](https://github.com/opensearch-project/OpenSearch/pull/15289))
3333
- Add support for restoring from snapshot with search replicas ([#16111](https://github.com/opensearch-project/OpenSearch/pull/16111))
34+
- Add Setting to adjust the primary constraint weights ([#16471](https://github.com/opensearch-project/OpenSearch/pull/16471))
3435

3536
### Dependencies
3637
- Bump `com.azure:azure-identity` from 1.13.0 to 1.13.2 ([#15578](https://github.com/opensearch-project/OpenSearch/pull/15578))

server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationConstraints.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,8 @@ public void updateAllocationConstraint(String constraint, boolean enable) {
3939
this.constraints.get(constraint).setEnable(enable);
4040
}
4141

42-
public long weight(ShardsBalancer balancer, BalancedShardsAllocator.ModelNode node, String index) {
43-
Constraint.ConstraintParams params = new Constraint.ConstraintParams(balancer, node, index);
42+
public long weight(ShardsBalancer balancer, BalancedShardsAllocator.ModelNode node, String index, long primaryThresholdWeight) {
43+
Constraint.ConstraintParams params = new Constraint.ConstraintParams(balancer, node, index, primaryThresholdWeight);
4444
return params.weight(constraints);
4545
}
4646
}

server/src/main/java/org/opensearch/cluster/routing/allocation/Constraint.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
import java.util.Map;
1515
import java.util.function.Predicate;
1616

17-
import static org.opensearch.cluster.routing.allocation.ConstraintTypes.CONSTRAINT_WEIGHT;
17+
import static org.opensearch.cluster.routing.allocation.ConstraintTypes.predicateKeyToWeightMap;
1818

1919
/**
2020
* Defines a constraint useful to de-prioritize certain nodes as target of unassigned shards used in {@link AllocationConstraints} or
@@ -44,11 +44,13 @@ static class ConstraintParams {
4444
private ShardsBalancer balancer;
4545
private BalancedShardsAllocator.ModelNode node;
4646
private String index;
47+
private long PrimaryConstraintThreshold;
4748

48-
ConstraintParams(ShardsBalancer balancer, BalancedShardsAllocator.ModelNode node, String index) {
49+
ConstraintParams(ShardsBalancer balancer, BalancedShardsAllocator.ModelNode node, String index, long primaryConstraintThreshold) {
4950
this.balancer = balancer;
5051
this.node = node;
5152
this.index = index;
53+
this.PrimaryConstraintThreshold = primaryConstraintThreshold;
5254
}
5355

5456
public ShardsBalancer getBalancer() {
@@ -75,9 +77,12 @@ public String getIndex() {
7577
*/
7678
public long weight(Map<String, Constraint> constraints) {
7779
long totalConstraintWeight = 0;
78-
for (Constraint constraint : constraints.values()) {
80+
for (Map.Entry<String, Constraint> entry : constraints.entrySet()) {
81+
String key = entry.getKey();
82+
Constraint constraint = entry.getValue();
7983
if (constraint.test(this)) {
80-
totalConstraintWeight += CONSTRAINT_WEIGHT;
84+
double weight = predicateKeyToWeightMap(key, PrimaryConstraintThreshold);
85+
totalConstraintWeight += weight;
8186
}
8287
}
8388
return totalConstraintWeight;

server/src/main/java/org/opensearch/cluster/routing/allocation/ConstraintTypes.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,4 +86,14 @@ public static Predicate<Constraint.ConstraintParams> isPrimaryShardsPerNodeBreac
8686
return primaryShardCount >= allowedPrimaryShardCount;
8787
};
8888
}
89+
90+
public static long predicateKeyToWeightMap(String key, long primaryConstraintWeight) {
91+
switch (key) {
92+
case CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID:
93+
case CLUSTER_PRIMARY_SHARD_REBALANCE_CONSTRAINT_ID:
94+
return primaryConstraintWeight;
95+
default:
96+
return CONSTRAINT_WEIGHT;
97+
}
98+
}
8999
}

server/src/main/java/org/opensearch/cluster/routing/allocation/RebalanceConstraints.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,8 @@ public void updateRebalanceConstraint(String constraint, boolean enable) {
4242
this.constraints.get(constraint).setEnable(enable);
4343
}
4444

45-
public long weight(ShardsBalancer balancer, BalancedShardsAllocator.ModelNode node, String index) {
46-
Constraint.ConstraintParams params = new Constraint.ConstraintParams(balancer, node, index);
45+
public long weight(ShardsBalancer balancer, BalancedShardsAllocator.ModelNode node, String index, long primaryConstraintThreshold) {
46+
Constraint.ConstraintParams params = new Constraint.ConstraintParams(balancer, node, index, primaryConstraintThreshold);
4747
return params.weight(constraints);
4848
}
4949
}

server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,14 @@ public class BalancedShardsAllocator implements ShardsAllocator {
139139
Property.NodeScope
140140
);
141141

142+
public static final Setting<Long> PRIMARY_CONSTRAINT_THRESHOLD_SETTING = Setting.longSetting(
143+
"cluster.routing.allocation.primary_constraint.threshold",
144+
10,
145+
0,
146+
Property.Dynamic,
147+
Property.NodeScope
148+
);
149+
142150
/**
143151
* This setting governs whether primary shards balance is desired during allocation. This is used by {@link ConstraintTypes#isPerIndexPrimaryShardsPerNodeBreached()}
144152
* and {@link ConstraintTypes#isPrimaryShardsPerNodeBreached} which is used during unassigned shard allocation
@@ -201,6 +209,7 @@ public class BalancedShardsAllocator implements ShardsAllocator {
201209
private volatile float shardBalanceFactor;
202210
private volatile WeightFunction weightFunction;
203211
private volatile float threshold;
212+
private volatile long primaryConstraintThreshold;
204213

205214
private volatile boolean ignoreThrottleInRestore;
206215
private volatile TimeValue allocatorTimeout;
@@ -219,6 +228,7 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting
219228
setIgnoreThrottleInRestore(IGNORE_THROTTLE_FOR_REMOTE_RESTORE.get(settings));
220229
updateWeightFunction();
221230
setThreshold(THRESHOLD_SETTING.get(settings));
231+
setPrimaryConstraintThresholdSetting(PRIMARY_CONSTRAINT_THRESHOLD_SETTING.get(settings));
222232
setPreferPrimaryShardBalance(PREFER_PRIMARY_SHARD_BALANCE.get(settings));
223233
setPreferPrimaryShardRebalance(PREFER_PRIMARY_SHARD_REBALANCE.get(settings));
224234
setShardMovementStrategy(SHARD_MOVEMENT_STRATEGY_SETTING.get(settings));
@@ -231,6 +241,7 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting
231241
clusterSettings.addSettingsUpdateConsumer(PRIMARY_SHARD_REBALANCE_BUFFER, this::updatePreferPrimaryShardBalanceBuffer);
232242
clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_REBALANCE, this::setPreferPrimaryShardRebalance);
233243
clusterSettings.addSettingsUpdateConsumer(THRESHOLD_SETTING, this::setThreshold);
244+
clusterSettings.addSettingsUpdateConsumer(PRIMARY_CONSTRAINT_THRESHOLD_SETTING, this::setPrimaryConstraintThresholdSetting);
234245
clusterSettings.addSettingsUpdateConsumer(IGNORE_THROTTLE_FOR_REMOTE_RESTORE, this::setIgnoreThrottleInRestore);
235246
clusterSettings.addSettingsUpdateConsumer(ALLOCATOR_TIMEOUT_SETTING, this::setAllocatorTimeout);
236247
}
@@ -294,7 +305,12 @@ private void updatePreferPrimaryShardBalanceBuffer(float preferPrimaryShardBalan
294305
}
295306

296307
private void updateWeightFunction() {
297-
weightFunction = new WeightFunction(this.indexBalanceFactor, this.shardBalanceFactor, this.preferPrimaryShardRebalanceBuffer);
308+
weightFunction = new WeightFunction(
309+
this.indexBalanceFactor,
310+
this.shardBalanceFactor,
311+
this.preferPrimaryShardRebalanceBuffer,
312+
this.primaryConstraintThreshold
313+
);
298314
}
299315

300316
/**
@@ -317,6 +333,11 @@ private void setThreshold(float threshold) {
317333
this.threshold = threshold;
318334
}
319335

336+
private void setPrimaryConstraintThresholdSetting(long threshold) {
337+
this.primaryConstraintThreshold = threshold;
338+
this.weightFunction.updatePrimaryConstraintThreshold(threshold);
339+
}
340+
320341
private void setAllocatorTimeout(TimeValue allocatorTimeout) {
321342
this.allocatorTimeout = allocatorTimeout;
322343
}
@@ -489,10 +510,11 @@ static class WeightFunction {
489510
private final float shardBalance;
490511
private final float theta0;
491512
private final float theta1;
513+
private long primaryConstraintThreshold;
492514
private AllocationConstraints constraints;
493515
private RebalanceConstraints rebalanceConstraints;
494516

495-
WeightFunction(float indexBalance, float shardBalance, float preferPrimaryBalanceBuffer) {
517+
WeightFunction(float indexBalance, float shardBalance, float preferPrimaryBalanceBuffer, long primaryConstraintThreshold) {
496518
float sum = indexBalance + shardBalance;
497519
if (sum <= 0.0f) {
498520
throw new IllegalArgumentException("Balance factors must sum to a value > 0 but was: " + sum);
@@ -501,6 +523,7 @@ static class WeightFunction {
501523
theta1 = indexBalance / sum;
502524
this.indexBalance = indexBalance;
503525
this.shardBalance = shardBalance;
526+
this.primaryConstraintThreshold = primaryConstraintThreshold;
504527
RebalanceParameter rebalanceParameter = new RebalanceParameter(preferPrimaryBalanceBuffer);
505528
this.constraints = new AllocationConstraints();
506529
this.rebalanceConstraints = new RebalanceConstraints(rebalanceParameter);
@@ -510,12 +533,12 @@ static class WeightFunction {
510533

511534
public float weightWithAllocationConstraints(ShardsBalancer balancer, ModelNode node, String index) {
512535
float balancerWeight = weight(balancer, node, index);
513-
return balancerWeight + constraints.weight(balancer, node, index);
536+
return balancerWeight + constraints.weight(balancer, node, index, primaryConstraintThreshold);
514537
}
515538

516539
public float weightWithRebalanceConstraints(ShardsBalancer balancer, ModelNode node, String index) {
517540
float balancerWeight = weight(balancer, node, index);
518-
return balancerWeight + rebalanceConstraints.weight(balancer, node, index);
541+
return balancerWeight + rebalanceConstraints.weight(balancer, node, index, primaryConstraintThreshold);
519542
}
520543

521544
float weight(ShardsBalancer balancer, ModelNode node, String index) {
@@ -531,6 +554,10 @@ void updateAllocationConstraint(String constraint, boolean enable) {
531554
void updateRebalanceConstraint(String constraint, boolean add) {
532555
this.rebalanceConstraints.updateRebalanceConstraint(constraint, add);
533556
}
557+
558+
void updatePrimaryConstraintThreshold(long primaryConstraintThreshold) {
559+
this.primaryConstraintThreshold = primaryConstraintThreshold;
560+
}
534561
}
535562

536563
/**

server/src/main/java/org/opensearch/common/settings/ClusterSettings.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,7 @@ public void apply(Settings value, Settings current, Settings previous) {
276276
BalancedShardsAllocator.THRESHOLD_SETTING,
277277
BalancedShardsAllocator.IGNORE_THROTTLE_FOR_REMOTE_RESTORE,
278278
BalancedShardsAllocator.ALLOCATOR_TIMEOUT_SETTING,
279+
BalancedShardsAllocator.PRIMARY_CONSTRAINT_THRESHOLD_SETTING,
279280
BreakerSettings.CIRCUIT_BREAKER_LIMIT_SETTING,
280281
BreakerSettings.CIRCUIT_BREAKER_OVERHEAD_SETTING,
281282
BreakerSettings.CIRCUIT_BREAKER_TYPE,

server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationConstraintsTests.java

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525

2626
public class AllocationConstraintsTests extends OpenSearchAllocationTestCase {
2727

28+
long constraintWeight = 20L;
29+
2830
public void testSettings() {
2931
Settings.Builder settings = Settings.builder();
3032
ClusterSettings service = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
@@ -69,7 +71,7 @@ public void testIndexShardsPerNodeConstraint() {
6971
when(node.getNodeId()).thenReturn("test-node");
7072

7173
long expectedWeight = (shardCount >= avgShardsPerNode) ? CONSTRAINT_WEIGHT : 0;
72-
assertEquals(expectedWeight, constraints.weight(balancer, node, "index"));
74+
assertEquals(expectedWeight, constraints.weight(balancer, node, "index", constraintWeight));
7375

7476
}
7577

@@ -91,14 +93,14 @@ public void testPerIndexPrimaryShardsConstraint() {
9193
when(node.numPrimaryShards(anyString())).thenReturn(perIndexPrimaryShardCount);
9294
when(node.getNodeId()).thenReturn("test-node");
9395

94-
assertEquals(0, constraints.weight(balancer, node, indexName));
96+
assertEquals(0, constraints.weight(balancer, node, indexName, constraintWeight));
9597

9698
perIndexPrimaryShardCount = 2;
9799
when(node.numPrimaryShards(anyString())).thenReturn(perIndexPrimaryShardCount);
98-
assertEquals(CONSTRAINT_WEIGHT, constraints.weight(balancer, node, indexName));
100+
assertEquals(CONSTRAINT_WEIGHT, constraints.weight(balancer, node, indexName, constraintWeight));
99101

100102
constraints.updateAllocationConstraint(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, false);
101-
assertEquals(0, constraints.weight(balancer, node, indexName));
103+
assertEquals(0, constraints.weight(balancer, node, indexName, constraintWeight));
102104
}
103105

104106
/**
@@ -118,14 +120,14 @@ public void testGlobalPrimaryShardsConstraint() {
118120
when(node.numPrimaryShards()).thenReturn(primaryShardCount);
119121
when(node.getNodeId()).thenReturn("test-node");
120122

121-
assertEquals(0, constraints.weight(balancer, node, indexName));
123+
assertEquals(0, constraints.weight(balancer, node, indexName, constraintWeight));
122124

123125
primaryShardCount = 3;
124126
when(node.numPrimaryShards()).thenReturn(primaryShardCount);
125-
assertEquals(CONSTRAINT_WEIGHT, constraints.weight(balancer, node, indexName));
127+
assertEquals(constraintWeight, constraints.weight(balancer, node, indexName, constraintWeight));
126128

127129
constraints.updateAllocationConstraint(CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, false);
128-
assertEquals(0, constraints.weight(balancer, node, indexName));
130+
assertEquals(0, constraints.weight(balancer, node, indexName, constraintWeight));
129131
}
130132

131133
/**
@@ -150,22 +152,22 @@ public void testPrimaryShardsConstraints() {
150152
when(node.numPrimaryShards()).thenReturn(primaryShardCount);
151153
when(node.getNodeId()).thenReturn("test-node");
152154

153-
assertEquals(0, constraints.weight(balancer, node, indexName));
155+
assertEquals(0, constraints.weight(balancer, node, indexName, constraintWeight));
154156

155157
// breaching global primary shard count but not per index primary shard count
156158
primaryShardCount = 5;
157159
when(node.numPrimaryShards()).thenReturn(primaryShardCount);
158-
assertEquals(CONSTRAINT_WEIGHT, constraints.weight(balancer, node, indexName));
160+
assertEquals(constraintWeight, constraints.weight(balancer, node, indexName, constraintWeight));
159161

160162
// when per index primary shard count constraint is also breached
161163
perIndexPrimaryShardCount = 3;
162164
when(node.numPrimaryShards(indexName)).thenReturn(perIndexPrimaryShardCount);
163-
assertEquals(2 * CONSTRAINT_WEIGHT, constraints.weight(balancer, node, indexName));
165+
assertEquals(CONSTRAINT_WEIGHT + constraintWeight, constraints.weight(balancer, node, indexName, constraintWeight));
164166

165167
// disable both constraints
166168
constraints.updateAllocationConstraint(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, false);
167169
constraints.updateAllocationConstraint(CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, false);
168-
assertEquals(0, constraints.weight(balancer, node, indexName));
170+
assertEquals(0, constraints.weight(balancer, node, indexName, constraintWeight));
169171
}
170172

171173
/**
@@ -202,8 +204,8 @@ public void testAllConstraints() {
202204

203205
long expectedWeight = (shardCount >= (int) Math.ceil(avgPerIndexShardsPerNode)) ? CONSTRAINT_WEIGHT : 0;
204206
expectedWeight += perIndexPrimaryShardCount > (int) Math.ceil(avgPerIndexPrimaryShardsPerNode) ? CONSTRAINT_WEIGHT : 0;
205-
expectedWeight += primaryShardsPerNode >= (int) Math.ceil(avgPrimaryShardsPerNode) ? CONSTRAINT_WEIGHT : 0;
206-
assertEquals(expectedWeight, constraints.weight(balancer, node, indexName));
207+
expectedWeight += primaryShardsPerNode >= (int) Math.ceil(avgPrimaryShardsPerNode) ? constraintWeight : 0;
208+
assertEquals(expectedWeight, constraints.weight(balancer, node, indexName, constraintWeight));
207209
}
208210

209211
}

0 commit comments

Comments
 (0)