Skip to content

Commit bc209ee

Browse files
pandeydivyansh1803Divyansh Pandey
andauthored
Add new index and cluster level settings to limit the total primary shards per node and per index (#17295)
* Added a new index level setting to limit the total primary shards per index per node. Added relevant files for unit test and integration test. Signed-off-by: Divyansh Pandey <[email protected]> * update files for code quality Signed-off-by: Divyansh Pandey <[email protected]> * moved primary shard count function to RoutingNode.java Signed-off-by: Divyansh Pandey <[email protected]> * removed unwanted files Signed-off-by: Divyansh Pandey <[email protected]> * added cluster level setting to limit total primary shards per node Signed-off-by: Divyansh Pandey <[email protected]> * allow the index level settings to be applied to both DOCUMENT and SEGMENT replication indices Signed-off-by: Divyansh Pandey <[email protected]> * Added necessary validator to restrict the index and cluster level primary shards per node settings only for remote store enabled cluster. Added relevant unit and integration tests. Signed-off-by: Divyansh Pandey <[email protected]> * refactoring changes Signed-off-by: Divyansh Pandey <[email protected]> * refactoring changes Signed-off-by: Divyansh Pandey <[email protected]> * Empty commit to rerun gradle test Signed-off-by: Divyansh Pandey <[email protected]> * optimised the calculation of total primary shards on a node Signed-off-by: Divyansh Pandey <[email protected]> * Refactoring changes Signed-off-by: Divyansh Pandey <[email protected]> * refactoring changes, added TODO to MetadataCreateIndexService Signed-off-by: Divyansh Pandey <[email protected]> * Added integration test for scenario where primary shards setting is set for cluster which is not remote store enabled Signed-off-by: Divyansh Pandey <[email protected]> --------- Signed-off-by: Divyansh Pandey <[email protected]> Signed-off-by: Divyansh Pandey <[email protected]> Co-authored-by: Divyansh Pandey <[email protected]>
1 parent 0714a1b commit bc209ee

File tree

16 files changed

+1320
-27
lines changed

16 files changed

+1320
-27
lines changed

CHANGELOG-3.0.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1717
- Add systemd configurations to strengthen OS core security ([#17107](https://github.com/opensearch-project/OpenSearch/pull/17107))
1818
- Added pull-based Ingestion (APIs, for ingestion source, a Kafka plugin, and IngestionEngine that pulls data from the ingestion source) ([#16958](https://github.com/opensearch-project/OpenSearch/pull/16958))
1919
- Added ConfigurationUtils to core for the ease of configuration parsing [#17223](https://github.com/opensearch-project/OpenSearch/pull/17223)
20+
- Add cluster and index level settings to limit the total primary shards per node and per index [#17295](https://github.com/opensearch-project/OpenSearch/pull/17295)
2021
- Add execution_hint to cardinality aggregator request (#[17312](https://github.com/opensearch-project/OpenSearch/pull/17312))
2122
- Arrow Flight RPC plugin with Flight server bootstrap logic and client for internode communication ([#16962](https://github.com/opensearch-project/OpenSearch/pull/16962))
2223
- Added offset management for the pull-based Ingestion ([#17354](https://github.com/opensearch-project/OpenSearch/pull/17354))
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,305 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.cluster.routing.allocation.decider;
10+
11+
import org.opensearch.cluster.ClusterState;
12+
import org.opensearch.cluster.routing.IndexRoutingTable;
13+
import org.opensearch.cluster.routing.IndexShardRoutingTable;
14+
import org.opensearch.cluster.routing.RoutingNode;
15+
import org.opensearch.cluster.routing.ShardRouting;
16+
import org.opensearch.cluster.routing.ShardRoutingState;
17+
import org.opensearch.common.settings.Settings;
18+
import org.opensearch.test.OpenSearchIntegTestCase;
19+
20+
import java.util.ArrayList;
21+
import java.util.Collections;
22+
import java.util.HashMap;
23+
import java.util.HashSet;
24+
import java.util.List;
25+
import java.util.Map;
26+
import java.util.Set;
27+
28+
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
29+
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
30+
import static org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider.CLUSTER_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING;
31+
import static org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider.CLUSTER_TOTAL_SHARDS_PER_NODE_SETTING;
32+
import static org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider.INDEX_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING;
33+
import static org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider.INDEX_TOTAL_SHARDS_PER_NODE_SETTING;
34+
35+
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 3)
36+
public class ShardsLimitAllocationDeciderIT extends OpenSearchIntegTestCase {
37+
38+
@Override
39+
protected Settings nodeSettings(int nodeOrdinal) {
40+
return Settings.builder().put(super.nodeSettings(nodeOrdinal)).build();
41+
}
42+
43+
public void testClusterWideShardsLimit() {
44+
// Set the cluster-wide shard limit to 2
45+
updateClusterSetting(CLUSTER_TOTAL_SHARDS_PER_NODE_SETTING.getKey(), 4);
46+
47+
// Create the first two indices with 3 shards and 1 replica each
48+
createIndex("test1", Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 3).put(SETTING_NUMBER_OF_REPLICAS, 1).build());
49+
createIndex("test2", Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 3).put(SETTING_NUMBER_OF_REPLICAS, 1).build());
50+
51+
// Create the third index with 2 shards and 1 replica
52+
createIndex("test3", Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 2).put(SETTING_NUMBER_OF_REPLICAS, 1).build());
53+
54+
// Wait for the shard limit to be applied
55+
try {
56+
assertBusy(() -> {
57+
ClusterState state = client().admin().cluster().prepareState().get().getState();
58+
59+
// Check total number of shards
60+
assertEquals(16, state.getRoutingTable().allShards().size());
61+
62+
// Check number of unassigned shards
63+
int unassignedShards = state.getRoutingTable().shardsWithState(ShardRoutingState.UNASSIGNED).size();
64+
assertEquals(4, unassignedShards);
65+
66+
// Check shard distribution across nodes
67+
for (RoutingNode routingNode : state.getRoutingNodes()) {
68+
assertTrue("Node exceeds shard limit", routingNode.numberOfOwningShards() <= 4);
69+
}
70+
});
71+
} catch (Exception e) {
72+
throw new RuntimeException(e);
73+
}
74+
75+
// Additional assertions to verify shard distribution
76+
ClusterState state = client().admin().cluster().prepareState().get().getState();
77+
int totalAssignedShards = 0;
78+
for (RoutingNode routingNode : state.getRoutingNodes()) {
79+
totalAssignedShards += routingNode.numberOfOwningShards();
80+
}
81+
assertEquals("Total assigned shards should be 12", 12, totalAssignedShards);
82+
83+
}
84+
85+
public void testIndexSpecificShardLimit() {
86+
// Set the index-specific shard limit to 2 for the first index only
87+
Settings indexSettingsWithLimit = Settings.builder()
88+
.put(SETTING_NUMBER_OF_SHARDS, 4)
89+
.put(SETTING_NUMBER_OF_REPLICAS, 1)
90+
.put(INDEX_TOTAL_SHARDS_PER_NODE_SETTING.getKey(), 2)
91+
.build();
92+
93+
Settings indexSettingsWithoutLimit = Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 4).put(SETTING_NUMBER_OF_REPLICAS, 1).build();
94+
95+
// Create the first index with 4 shards, 1 replica, and the index-specific limit
96+
createIndex("test1", indexSettingsWithLimit);
97+
98+
// Create the second index with 4 shards and 1 replica, without the index-specific limit
99+
createIndex("test2", indexSettingsWithoutLimit);
100+
101+
// Create the third index with 3 shards and 1 replica, without the index-specific limit
102+
createIndex("test3", Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 3).put(SETTING_NUMBER_OF_REPLICAS, 1).build());
103+
104+
try {
105+
// Wait for the shard limit to be applied
106+
assertBusy(() -> {
107+
ClusterState state = client().admin().cluster().prepareState().get().getState();
108+
109+
// Check total number of shards
110+
assertEquals(22, state.getRoutingTable().allShards().size());
111+
112+
// Check total number of assigned and unassigned shards
113+
int totalAssignedShards = 0;
114+
int totalUnassignedShards = 0;
115+
Map<String, Integer> unassignedShardsByIndex = new HashMap<>();
116+
117+
for (IndexRoutingTable indexRoutingTable : state.getRoutingTable()) {
118+
String index = indexRoutingTable.getIndex().getName();
119+
int indexUnassignedShards = 0;
120+
121+
for (IndexShardRoutingTable shardRoutingTable : indexRoutingTable) {
122+
for (ShardRouting shardRouting : shardRoutingTable) {
123+
if (shardRouting.unassigned()) {
124+
totalUnassignedShards++;
125+
indexUnassignedShards++;
126+
} else {
127+
totalAssignedShards++;
128+
}
129+
}
130+
}
131+
132+
unassignedShardsByIndex.put(index, indexUnassignedShards);
133+
}
134+
135+
assertEquals("Total assigned shards should be 20", 20, totalAssignedShards);
136+
assertEquals("Total unassigned shards should be 2", 2, totalUnassignedShards);
137+
138+
// Check unassigned shards for each index
139+
assertEquals("test1 should have 2 unassigned shards", 2, unassignedShardsByIndex.get("test1").intValue());
140+
assertEquals("test2 should have 0 unassigned shards", 0, unassignedShardsByIndex.get("test2").intValue());
141+
assertEquals("test3 should have 0 unassigned shards", 0, unassignedShardsByIndex.get("test3").intValue());
142+
});
143+
} catch (Exception e) {
144+
throw new RuntimeException(e);
145+
}
146+
}
147+
148+
public void testCombinedClusterAndIndexSpecificShardLimits() {
149+
// Set the cluster-wide shard limit to 6
150+
updateClusterSetting(CLUSTER_TOTAL_SHARDS_PER_NODE_SETTING.getKey(), 6);
151+
152+
// Create the first index with 3 shards, 1 replica, and index-specific limit of 1
153+
Settings indexSettingsWithLimit = Settings.builder()
154+
.put(SETTING_NUMBER_OF_SHARDS, 3)
155+
.put(SETTING_NUMBER_OF_REPLICAS, 1)
156+
.put(INDEX_TOTAL_SHARDS_PER_NODE_SETTING.getKey(), 1)
157+
.build();
158+
createIndex("test1", indexSettingsWithLimit);
159+
160+
// Create the second index with 4 shards and 1 replica
161+
createIndex("test2", Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 4).put(SETTING_NUMBER_OF_REPLICAS, 1).build());
162+
163+
// Create the third index with 3 shards and 1 replica
164+
createIndex("test3", Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 3).put(SETTING_NUMBER_OF_REPLICAS, 1).build());
165+
166+
try {
167+
assertBusy(() -> {
168+
ClusterState state = client().admin().cluster().prepareState().get().getState();
169+
170+
// Check total number of shards
171+
assertEquals("Total shards should be 20", 20, state.getRoutingTable().allShards().size());
172+
173+
int totalAssignedShards = 0;
174+
int totalUnassignedShards = 0;
175+
Map<String, Integer> unassignedShardsByIndex = new HashMap<>();
176+
Map<String, Integer> nodeShardCounts = new HashMap<>();
177+
Map<String, Set<String>> indexShardsPerNode = new HashMap<>();
178+
179+
for (RoutingNode routingNode : state.getRoutingNodes()) {
180+
String nodeName = routingNode.node().getName();
181+
nodeShardCounts.put(nodeName, routingNode.numberOfOwningShards());
182+
indexShardsPerNode.put(nodeName, new HashSet<>());
183+
184+
for (ShardRouting shardRouting : routingNode) {
185+
indexShardsPerNode.get(nodeName).add(shardRouting.getIndexName());
186+
}
187+
}
188+
189+
for (IndexRoutingTable indexRoutingTable : state.getRoutingTable()) {
190+
String index = indexRoutingTable.getIndex().getName();
191+
int indexUnassignedShards = 0;
192+
193+
for (IndexShardRoutingTable shardRoutingTable : indexRoutingTable) {
194+
for (ShardRouting shardRouting : shardRoutingTable) {
195+
if (shardRouting.unassigned()) {
196+
totalUnassignedShards++;
197+
indexUnassignedShards++;
198+
} else {
199+
totalAssignedShards++;
200+
}
201+
}
202+
}
203+
204+
unassignedShardsByIndex.put(index, indexUnassignedShards);
205+
}
206+
207+
assertEquals("Total assigned shards should be 17", 17, totalAssignedShards);
208+
assertEquals("Total unassigned shards should be 3", 3, totalUnassignedShards);
209+
assertEquals("test1 should have 3 unassigned shards", 3, unassignedShardsByIndex.get("test1").intValue());
210+
assertEquals("test2 should have 0 unassigned shards", 0, unassignedShardsByIndex.getOrDefault("test2", 0).intValue());
211+
assertEquals("test3 should have 0 unassigned shards", 0, unassignedShardsByIndex.getOrDefault("test3", 0).intValue());
212+
213+
// Check shard distribution across nodes
214+
List<Integer> shardCounts = new ArrayList<>(nodeShardCounts.values());
215+
Collections.sort(shardCounts, Collections.reverseOrder());
216+
assertEquals("Two nodes should have 6 shards", 6, shardCounts.get(0).intValue());
217+
assertEquals("Two nodes should have 6 shards", 6, shardCounts.get(1).intValue());
218+
assertEquals("One node should have 5 shards", 5, shardCounts.get(2).intValue());
219+
220+
// Check that all nodes have only one shard of the first index
221+
for (Set<String> indexesOnNode : indexShardsPerNode.values()) {
222+
assertTrue("Each node should have a shard from test1", indexesOnNode.contains("test1"));
223+
}
224+
});
225+
} catch (Exception e) {
226+
throw new RuntimeException(e);
227+
}
228+
}
229+
230+
/**
231+
* Integration test to verify the behavior of INDEX_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING
232+
* in a non-remote store environment.
233+
*
234+
* Scenario:
235+
* An end-user attempts to create an index with INDEX_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING
236+
* on a cluster where remote store is not enabled.
237+
*
238+
* Expected Outcome:
239+
* The system should reject the index creation request and throw an appropriate exception,
240+
* indicating that this setting is only applicable for remote store enabled clusters.
241+
*/
242+
public void testIndexTotalPrimaryShardsPerNodeSettingWithoutRemoteStore() {
243+
// Attempt to create an index with INDEX_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING
244+
Settings indexSettings = Settings.builder()
245+
.put(SETTING_NUMBER_OF_SHARDS, 3)
246+
.put(SETTING_NUMBER_OF_REPLICAS, 1)
247+
.put(INDEX_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING.getKey(), 1)
248+
.build();
249+
250+
// Assert that creating the index throws an exception
251+
IllegalArgumentException exception = expectThrows(
252+
IllegalArgumentException.class,
253+
() -> { createIndex("test_index", indexSettings); }
254+
);
255+
256+
// Verify the exception message
257+
assertTrue(
258+
"Exception should mention that the setting requires remote store",
259+
exception.getMessage()
260+
.contains(
261+
"Setting [index.routing.allocation.total_primary_shards_per_node] can only be used with remote store enabled clusters"
262+
)
263+
);
264+
}
265+
266+
/**
267+
* Integration test to verify the behavior of CLUSTER_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING
268+
* in a non-remote store environment.
269+
*
270+
* Scenario:
271+
* An end-user attempts to create an index with CLUSTER_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING
272+
* on a cluster where remote store is not enabled.
273+
*
274+
* Expected Outcome:
275+
* The system should reject the index creation request and throw an appropriate exception,
276+
* indicating that this setting is only applicable for remote store enabled clusters.
277+
*/
278+
public void testClusterTotalPrimaryShardsPerNodeSettingWithoutRemoteStore() {
279+
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> {
280+
updateClusterSetting(CLUSTER_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING.getKey(), 1);
281+
});
282+
283+
// Verify the exception message
284+
assertTrue(
285+
"Exception should mention that the setting requires remote store",
286+
exception.getMessage()
287+
.contains(
288+
"Setting [cluster.routing.allocation.total_primary_shards_per_node] can only be used with remote store enabled clusters"
289+
)
290+
);
291+
292+
// Attempt to create an index with INDEX_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING
293+
Settings indexSettings = Settings.builder()
294+
.put(SETTING_NUMBER_OF_SHARDS, 3)
295+
.put(SETTING_NUMBER_OF_REPLICAS, 1)
296+
.put(INDEX_TOTAL_SHARDS_PER_NODE_SETTING.getKey(), 1)
297+
.build();
298+
299+
createIndex("test_index", indexSettings);
300+
}
301+
302+
private void updateClusterSetting(String setting, int value) {
303+
client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder().put(setting, value)).get();
304+
}
305+
}

0 commit comments

Comments
 (0)