Skip to content

Commit c19cf88

Browse files
Add canRemain method to TargetPoolAllocationDecider to move shards from local to remote pool for hot to warm tiering (#15010)

Signed-off-by: Neetika Singhal <[email protected]>
1 parent a60b668 commit c19cf88

File tree

7 files changed

+275
-4
lines changed

7 files changed

+275
-4
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
4848
- Add prefix support to hashed prefix & infix path types on remote store ([#15557](https://github.com/opensearch-project/OpenSearch/pull/15557))
4949
- Optimise snapshot deletion to speed up snapshot deletion and creation ([#15568](https://github.com/opensearch-project/OpenSearch/pull/15568))
5050
- [Remote Publication] Added checksum validation for cluster state behind a cluster setting ([#15218](https://github.com/opensearch-project/OpenSearch/pull/15218))
51+
- Add canRemain method to TargetPoolAllocationDecider to move shards from local to remote pool for hot to warm tiering ([#15010](https://github.com/opensearch-project/OpenSearch/pull/15010))
5152

5253
### Dependencies
5354
- Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081))
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action.admin.indices.tiering;
10+
11+
import org.opensearch.cluster.metadata.IndexMetadata;
12+
import org.opensearch.cluster.routing.ShardRouting;
13+
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
14+
import org.opensearch.index.IndexModule;
15+
16+
/**
17+
* Utility class for tiering operations
18+
*
19+
* @opensearch.internal
20+
*/
21+
public class TieringUtils {
22+
23+
/**
24+
* Checks if the specified shard is a partial shard by
25+
* checking the INDEX_STORE_LOCALITY_SETTING for its index.
26+
* see {@link #isPartialIndex(IndexMetadata)}
27+
* @param shard ShardRouting object representing the shard
28+
* @param allocation RoutingAllocation object representing the allocation
29+
* @return true if the shard is a partial shard, false otherwise
30+
*/
31+
public static boolean isPartialShard(ShardRouting shard, RoutingAllocation allocation) {
32+
IndexMetadata indexMetadata = allocation.metadata().getIndexSafe(shard.index());
33+
return isPartialIndex(indexMetadata);
34+
}
35+
36+
/**
37+
* Checks if the specified index is a partial index by
38+
* checking the INDEX_STORE_LOCALITY_SETTING for the index.
39+
*
40+
* @param indexMetadata the metadata of the index
41+
* @return true if the index is a partial index, false otherwise
42+
*/
43+
public static boolean isPartialIndex(final IndexMetadata indexMetadata) {
44+
return IndexModule.DataLocalityType.PARTIAL.name()
45+
.equals(indexMetadata.getSettings().get(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey()));
46+
}
47+
}

server/src/main/java/org/opensearch/cluster/routing/RoutingPool.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@
1111
import org.opensearch.cluster.metadata.IndexMetadata;
1212
import org.opensearch.cluster.node.DiscoveryNode;
1313
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
14+
import org.opensearch.common.util.FeatureFlags;
15+
16+
import static org.opensearch.action.admin.indices.tiering.TieringUtils.isPartialIndex;
1417

1518
/**
1619
* {@link RoutingPool} defines the different node types based on the assigned capabilities. The methods
@@ -58,6 +61,7 @@ public static RoutingPool getShardPool(ShardRouting shard, RoutingAllocation all
5861
* @return {@link RoutingPool} for the given index.
5962
*/
6063
public static RoutingPool getIndexPool(IndexMetadata indexMetadata) {
61-
return indexMetadata.isRemoteSnapshot() ? REMOTE_CAPABLE : LOCAL_ONLY;
64+
return indexMetadata.isRemoteSnapshot()
65+
|| (FeatureFlags.isEnabled(FeatureFlags.TIERED_REMOTE_INDEX) && isPartialIndex(indexMetadata)) ? REMOTE_CAPABLE : LOCAL_ONLY;
6266
}
6367
}

server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.opensearch.cluster.routing.allocation.decider.Decision;
3131
import org.opensearch.cluster.routing.allocation.decider.DiskThresholdDecider;
3232
import org.opensearch.common.collect.Tuple;
33+
import org.opensearch.common.util.FeatureFlags;
3334
import org.opensearch.gateway.PriorityComparator;
3435

3536
import java.util.ArrayList;
@@ -45,6 +46,7 @@
4546
import java.util.stream.Stream;
4647
import java.util.stream.StreamSupport;
4748

49+
import static org.opensearch.action.admin.indices.tiering.TieringUtils.isPartialShard;
4850
import static org.opensearch.cluster.routing.ShardRoutingState.RELOCATING;
4951

5052
/**
@@ -552,6 +554,16 @@ private void checkAndAddInEligibleTargetNode(RoutingNode targetNode) {
552554
}
553555
}
554556

557+
/**
558+
* Checks if the shard can be skipped from the local shard balancer operations
559+
* @param shardRouting the shard to be checked
560+
* @return true if the shard can be skipped, false otherwise
561+
*/
562+
private boolean canShardBeSkipped(ShardRouting shardRouting) {
563+
return (RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shardRouting, allocation))
564+
&& !(FeatureFlags.isEnabled(FeatureFlags.TIERED_REMOTE_INDEX) && isPartialShard(shardRouting, allocation)));
565+
}
566+
555567
/**
556568
* Move started shards that can not be allocated to a node anymore
557569
* <p>
@@ -603,7 +615,7 @@ void moveShards() {
603615

604616
ShardRouting shardRouting = it.next();
605617

606-
if (RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shardRouting, allocation))) {
618+
if (canShardBeSkipped(shardRouting)) {
607619
continue;
608620
}
609621

@@ -669,7 +681,7 @@ void moveShards() {
669681
*/
670682
@Override
671683
MoveDecision decideMove(final ShardRouting shardRouting) {
672-
if (RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shardRouting, allocation))) {
684+
if (canShardBeSkipped(shardRouting)) {
673685
return MoveDecision.NOT_TAKEN;
674686
}
675687

@@ -758,7 +770,9 @@ private Map<String, BalancedShardsAllocator.ModelNode> buildModelFromAssigned()
758770
for (ShardRouting shard : rn) {
759771
assert rn.nodeId().equals(shard.currentNodeId());
760772
/* we skip relocating shards here since we expect an initializing shard with the same id coming in */
761-
if (RoutingPool.LOCAL_ONLY.equals(RoutingPool.getShardPool(shard, allocation)) && shard.state() != RELOCATING) {
773+
if ((RoutingPool.LOCAL_ONLY.equals(RoutingPool.getShardPool(shard, allocation))
774+
|| (FeatureFlags.isEnabled(FeatureFlags.TIERED_REMOTE_INDEX) && isPartialShard(shard, allocation)))
775+
&& shard.state() != RELOCATING) {
762776
node.addShard(shard);
763777
++totalShardCount;
764778
if (logger.isTraceEnabled()) {

server/src/main/java/org/opensearch/cluster/routing/allocation/decider/TargetPoolAllocationDecider.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,36 @@ public Decision canForceAllocatePrimary(ShardRouting shardRouting, RoutingNode n
8787
return canAllocate(shardRouting, node, allocation);
8888
}
8989

90+
@Override
91+
public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
92+
RoutingPool targetPool = RoutingPool.getShardPool(shardRouting, allocation);
93+
RoutingPool currentNodePool = RoutingPool.getNodePool(allocation.routingNodes().node(shardRouting.currentNodeId()));
94+
if (RoutingPool.REMOTE_CAPABLE.equals(targetPool) && targetPool != currentNodePool) {
95+
logger.debug(
96+
"Shard: [{}] has current pool: [{}], target pool: [{}]. Cannot remain on node: [{}]",
97+
shardRouting.shortSummary(),
98+
currentNodePool.name(),
99+
RoutingPool.REMOTE_CAPABLE.name(),
100+
node.node()
101+
);
102+
return allocation.decision(
103+
Decision.NO,
104+
NAME,
105+
"Shard %s is allocated on a different pool %s than the target pool %s",
106+
shardRouting.shortSummary(),
107+
currentNodePool,
108+
targetPool
109+
);
110+
}
111+
return allocation.decision(
112+
Decision.YES,
113+
NAME,
114+
"Routing pools are compatible. Shard pool: [%s], node pool: [%s]",
115+
currentNodePool,
116+
targetPool
117+
);
118+
}
119+
90120
public Decision shouldAutoExpandToNode(IndexMetadata indexMetadata, DiscoveryNode node, RoutingAllocation allocation) {
91121
logger.debug("Evaluating node: {} for autoExpandReplica eligibility of index: {}", node, indexMetadata.getIndex());
92122
return canAllocateInTargetPool(indexMetadata, node, allocation);
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.cluster.routing.allocation;
10+
11+
import org.opensearch.Version;
12+
import org.opensearch.cluster.ClusterState;
13+
import org.opensearch.cluster.metadata.IndexMetadata;
14+
import org.opensearch.cluster.routing.RoutingNode;
15+
import org.opensearch.cluster.routing.RoutingNodes;
16+
import org.opensearch.cluster.routing.RoutingPool;
17+
import org.opensearch.cluster.routing.ShardRouting;
18+
import org.opensearch.common.util.FeatureFlags;
19+
import org.opensearch.index.IndexModule;
20+
import org.opensearch.test.FeatureFlagSetter;
21+
import org.junit.Before;
22+
23+
import static org.opensearch.cluster.routing.RoutingPool.LOCAL_ONLY;
24+
import static org.opensearch.cluster.routing.RoutingPool.REMOTE_CAPABLE;
25+
import static org.opensearch.cluster.routing.RoutingPool.getIndexPool;
26+
import static org.opensearch.index.IndexModule.INDEX_STORE_LOCALITY_SETTING;
27+
28+
public class ShardsTieringAllocationTests extends TieringAllocationBaseTestCase {
29+
30+
@Before
31+
public void setup() {
32+
FeatureFlagSetter.set(FeatureFlags.TIERED_REMOTE_INDEX);
33+
}
34+
35+
public void testShardsInLocalPool() {
36+
int localOnlyNodes = 5;
37+
int remoteCapableNodes = 3;
38+
int localIndices = 5;
39+
int remoteIndices = 0;
40+
ClusterState clusterState = createInitialCluster(localOnlyNodes, remoteCapableNodes, localIndices, remoteIndices);
41+
AllocationService service = this.createRemoteCapableAllocationService();
42+
// assign shards to respective nodes
43+
clusterState = allocateShardsAndBalance(clusterState, service);
44+
RoutingNodes routingNodes = clusterState.getRoutingNodes();
45+
RoutingAllocation allocation = getRoutingAllocation(clusterState, routingNodes);
46+
assertEquals(0, routingNodes.unassigned().size());
47+
48+
for (ShardRouting shard : clusterState.getRoutingTable().allShards()) {
49+
assertFalse(shard.unassigned());
50+
RoutingPool shardPool = RoutingPool.getShardPool(shard, allocation);
51+
assertEquals(LOCAL_ONLY, shardPool);
52+
}
53+
}
54+
55+
public void testShardsInRemotePool() {
56+
int localOnlyNodes = 7;
57+
int remoteCapableNodes = 3;
58+
int localIndices = 0;
59+
int remoteIndices = 13;
60+
ClusterState clusterState = createInitialCluster(localOnlyNodes, remoteCapableNodes, localIndices, remoteIndices);
61+
AllocationService service = this.createRemoteCapableAllocationService();
62+
// assign shards to respective nodes
63+
clusterState = allocateShardsAndBalance(clusterState, service);
64+
RoutingNodes routingNodes = clusterState.getRoutingNodes();
65+
RoutingAllocation allocation = getRoutingAllocation(clusterState, routingNodes);
66+
assertEquals(0, routingNodes.unassigned().size());
67+
68+
for (ShardRouting shard : clusterState.getRoutingTable().allShards()) {
69+
assertFalse(shard.unassigned());
70+
RoutingPool shardPool = RoutingPool.getShardPool(shard, allocation);
71+
assertEquals(REMOTE_CAPABLE, shardPool);
72+
}
73+
}
74+
75+
public void testShardsWithTiering() {
76+
int localOnlyNodes = 15;
77+
int remoteCapableNodes = 13;
78+
int localIndices = 10;
79+
int remoteIndices = 0;
80+
ClusterState clusterState = createInitialCluster(localOnlyNodes, remoteCapableNodes, localIndices, remoteIndices);
81+
AllocationService service = this.createRemoteCapableAllocationService();
82+
// assign shards to respective nodes
83+
clusterState = allocateShardsAndBalance(clusterState, service);
84+
// put indices in the hot to warm tiering state
85+
clusterState = updateIndexMetadataForTiering(
86+
clusterState,
87+
localIndices,
88+
IndexModule.TieringState.HOT_TO_WARM.name(),
89+
IndexModule.DataLocalityType.PARTIAL.name()
90+
);
91+
// trigger shard relocation
92+
clusterState = allocateShardsAndBalance(clusterState, service);
93+
RoutingNodes routingNodes = clusterState.getRoutingNodes();
94+
RoutingAllocation allocation = getRoutingAllocation(clusterState, routingNodes);
95+
assertEquals(0, routingNodes.unassigned().size());
96+
97+
for (ShardRouting shard : clusterState.getRoutingTable().allShards()) {
98+
assertFalse(shard.unassigned());
99+
RoutingNode node = routingNodes.node(shard.currentNodeId());
100+
RoutingPool nodePool = RoutingPool.getNodePool(node);
101+
RoutingPool shardPool = RoutingPool.getShardPool(shard, allocation);
102+
assertEquals(RoutingPool.REMOTE_CAPABLE, shardPool);
103+
assertEquals(nodePool, shardPool);
104+
}
105+
}
106+
107+
public void testShardPoolForPartialIndices() {
108+
String index = "test-index";
109+
IndexMetadata indexMetadata = IndexMetadata.builder(index)
110+
.settings(settings(Version.CURRENT).put(INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.DataLocalityType.PARTIAL.name()))
111+
.numberOfShards(PRIMARIES)
112+
.numberOfReplicas(REPLICAS)
113+
.build();
114+
RoutingPool indexPool = getIndexPool(indexMetadata);
115+
assertEquals(REMOTE_CAPABLE, indexPool);
116+
}
117+
118+
public void testShardPoolForFullIndices() {
119+
String index = "test-index";
120+
IndexMetadata indexMetadata = IndexMetadata.builder(index)
121+
.settings(settings(Version.CURRENT).put(INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.DataLocalityType.FULL.name()))
122+
.numberOfShards(PRIMARIES)
123+
.numberOfReplicas(REPLICAS)
124+
.build();
125+
RoutingPool indexPool = getIndexPool(indexMetadata);
126+
assertEquals(LOCAL_ONLY, indexPool);
127+
}
128+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.cluster.routing.allocation;
10+
11+
import org.opensearch.cluster.ClusterState;
12+
import org.opensearch.cluster.metadata.IndexMetadata;
13+
import org.opensearch.cluster.metadata.Metadata;
14+
import org.opensearch.common.SuppressForbidden;
15+
import org.opensearch.common.settings.Settings;
16+
17+
import static org.opensearch.index.IndexModule.INDEX_STORE_LOCALITY_SETTING;
18+
import static org.opensearch.index.IndexModule.INDEX_TIERING_STATE;
19+
20+
@SuppressForbidden(reason = "feature flag overrides")
21+
public abstract class TieringAllocationBaseTestCase extends RemoteShardsBalancerBaseTestCase {
22+
23+
public ClusterState updateIndexMetadataForTiering(
24+
ClusterState clusterState,
25+
int localIndices,
26+
String tieringState,
27+
String dataLocality
28+
) {
29+
Metadata.Builder mb = Metadata.builder(clusterState.metadata());
30+
for (int i = 0; i < localIndices; i++) {
31+
IndexMetadata indexMetadata = clusterState.metadata().index(getIndexName(i, false));
32+
Settings settings = indexMetadata.getSettings();
33+
mb.put(
34+
IndexMetadata.builder(indexMetadata)
35+
.settings(
36+
Settings.builder()
37+
.put(settings)
38+
.put(settings)
39+
.put(INDEX_TIERING_STATE.getKey(), tieringState)
40+
.put(INDEX_STORE_LOCALITY_SETTING.getKey(), dataLocality)
41+
)
42+
);
43+
}
44+
Metadata metadata = mb.build();
45+
return ClusterState.builder(clusterState).metadata(metadata).build();
46+
}
47+
}

0 commit comments

Comments
 (0)