Skip to content

Commit 66df930

Browse files
authored
de-duping shards in ShardsBatchGatewayAllocator based on ShardId instead of ShardRouting#equals (opensearch-project#13710)
Signed-off-by: Rajiv Kumar Vaidyanathan <[email protected]>
1 parent 199a03e commit 66df930

File tree

4 files changed

+92
-5
lines changed

4 files changed

+92
-5
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
3535
- Fix negative RequestStats metric issue ([#13553](https://github.com/opensearch-project/OpenSearch/pull/13553))
3636
- Fix get field mapping API returns 404 error in mixed cluster with multiple versions ([#13624](https://github.com/opensearch-project/OpenSearch/pull/13624))
3737
- Allow clearing `remote_store.compatibility_mode` setting ([#13646](https://github.com/opensearch-project/OpenSearch/pull/13646))
38+
- Fix ReplicaShardBatchAllocator to batch shards without duplicates ([#13710](https://github.com/opensearch-project/OpenSearch/pull/13710))
3839

3940
### Security
4041

server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -117,14 +117,17 @@ public void cleanCaches() {
117117

118118
// for tests
119119
protected ShardsBatchGatewayAllocator() {
120+
this(DEFAULT_SHARD_BATCH_SIZE);
121+
}
122+
123+
protected ShardsBatchGatewayAllocator(long batchSize) {
120124
this.rerouteService = null;
121125
this.batchStartedAction = null;
122126
this.primaryShardBatchAllocator = null;
123127
this.batchStoreAction = null;
124128
this.replicaShardBatchAllocator = null;
125-
this.maxBatchSize = DEFAULT_SHARD_BATCH_SIZE;
129+
this.maxBatchSize = batchSize;
126130
}
127-
128131
// for tests
129132

130133
@Override
@@ -228,13 +231,13 @@ protected Set<String> createAndUpdateBatches(RoutingAllocation allocation, boole
228231
batchEntry.getValue().getBatchedShards().forEach(shardId -> currentBatchedShards.put(shardId, batchEntry.getKey()));
229232
}
230233

231-
Set<ShardRouting> newShardsToBatch = Sets.newHashSet();
234+
Map<ShardId, ShardRouting> newShardsToBatch = new HashMap<>();
232235
Set<ShardId> batchedShardsToAssign = Sets.newHashSet();
233236
// add all unassigned shards to the batch if they are not already in a batch
234237
unassigned.forEach(shardRouting -> {
235238
if ((currentBatchedShards.containsKey(shardRouting.shardId()) == false) && (shardRouting.primary() == primary)) {
236239
assert shardRouting.unassigned();
237-
newShardsToBatch.add(shardRouting);
240+
newShardsToBatch.put(shardRouting.shardId(), shardRouting);
238241
}
239242
// if shard is already batched update to latest shardRouting information in the batches
240243
// Replica shard assignment can be cancelled if we get a better match. These ShardRouting objects also
@@ -262,7 +265,7 @@ else if (shardRouting.primary() == primary) {
262265

263266
refreshShardBatches(currentBatches, batchedShardsToAssign, primary);
264267

265-
Iterator<ShardRouting> iterator = newShardsToBatch.iterator();
268+
Iterator<ShardRouting> iterator = newShardsToBatch.values().iterator();
266269
assert maxBatchSize > 0 : "Shards batch size must be greater than 0";
267270

268271
long batchSize = maxBatchSize;

server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,21 @@
1818
import org.opensearch.cluster.OpenSearchAllocationTestCase;
1919
import org.opensearch.cluster.metadata.IndexMetadata;
2020
import org.opensearch.cluster.metadata.Metadata;
21+
import org.opensearch.cluster.node.DiscoveryNode;
22+
import org.opensearch.cluster.routing.IndexRoutingTable;
2123
import org.opensearch.cluster.routing.IndexShardRoutingTable;
24+
import org.opensearch.cluster.routing.RecoverySource;
2225
import org.opensearch.cluster.routing.RoutingNodes;
2326
import org.opensearch.cluster.routing.RoutingTable;
2427
import org.opensearch.cluster.routing.ShardRouting;
28+
import org.opensearch.cluster.routing.ShardRoutingState;
29+
import org.opensearch.cluster.routing.TestShardRouting;
30+
import org.opensearch.cluster.routing.UnassignedInfo;
2531
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
2632
import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders;
2733
import org.opensearch.common.collect.Tuple;
2834
import org.opensearch.common.settings.Settings;
35+
import org.opensearch.common.util.set.Sets;
2936
import org.opensearch.core.index.shard.ShardId;
3037
import org.opensearch.snapshots.SnapshotShardSizeInfo;
3138
import org.opensearch.test.gateway.TestShardBatchGatewayAllocator;
@@ -222,6 +229,21 @@ public void testSafelyRemoveShardFromBothBatch() {
222229
assertEquals(0, testShardsBatchGatewayAllocator.getBatchIdToStoreShardBatch().size());
223230
}
224231

232+
public void testDeDuplicationOfReplicaShardsAcrossBatch() {
233+
final ShardId shardId = new ShardId("test", "_na_", 0);
234+
final DiscoveryNode node = newNode("node1");
235+
// number of replicas is greater than batch size - to ensure shardRouting gets de-duped across batch
236+
createRoutingWithDifferentUnAssignedInfo(shardId, node, 50);
237+
testShardsBatchGatewayAllocator = new TestShardBatchGatewayAllocator(10);
238+
239+
// only replica shard should be in the batch
240+
Set<String> replicaBatches = testShardsBatchGatewayAllocator.createAndUpdateBatches(testAllocation, false);
241+
assertEquals(1, replicaBatches.size());
242+
ShardsBatchGatewayAllocator.ShardsBatch shardsBatch = testShardsBatchGatewayAllocator.getBatchIdToStoreShardBatch()
243+
.get(replicaBatches.iterator().next());
244+
assertEquals(1, shardsBatch.getBatchedShards().size());
245+
}
246+
225247
public void testGetBatchIdExisting() {
226248
createIndexAndUpdateClusterState(2, 1020, 1);
227249
// get all shardsRoutings for test index
@@ -345,6 +367,59 @@ private void createIndexAndUpdateClusterState(int count, int numberOfShards, int
345367
);
346368
}
347369

370+
private void createRoutingWithDifferentUnAssignedInfo(ShardId primaryShardId, DiscoveryNode node, int numberOfReplicas) {
371+
372+
ShardRouting primaryShard = TestShardRouting.newShardRouting(primaryShardId, node.getId(), true, ShardRoutingState.STARTED);
373+
Metadata metadata = Metadata.builder()
374+
.put(
375+
IndexMetadata.builder(primaryShardId.getIndexName())
376+
.settings(settings(Version.CURRENT))
377+
.numberOfShards(1)
378+
.numberOfReplicas(numberOfReplicas)
379+
.putInSyncAllocationIds(0, Sets.newHashSet(primaryShard.allocationId().getId()))
380+
)
381+
.build();
382+
383+
IndexRoutingTable.Builder isd = IndexRoutingTable.builder(primaryShardId.getIndex())
384+
.addIndexShard(new IndexShardRoutingTable.Builder(primaryShardId).addShard(primaryShard).build());
385+
386+
for (int i = 0; i < numberOfReplicas; i++) {
387+
isd.addShard(
388+
ShardRouting.newUnassigned(
389+
primaryShardId,
390+
false,
391+
RecoverySource.PeerRecoverySource.INSTANCE,
392+
new UnassignedInfo(
393+
UnassignedInfo.Reason.REPLICA_ADDED,
394+
"message for replica-copy " + i,
395+
null,
396+
0,
397+
System.nanoTime(),
398+
System.currentTimeMillis(),
399+
false,
400+
UnassignedInfo.AllocationStatus.NO_ATTEMPT,
401+
Collections.emptySet()
402+
)
403+
)
404+
);
405+
}
406+
407+
RoutingTable routingTable = RoutingTable.builder().add(isd).build();
408+
clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
409+
.metadata(metadata)
410+
.routingTable(routingTable)
411+
.build();
412+
testAllocation = new RoutingAllocation(
413+
new AllocationDeciders(Collections.emptyList()),
414+
new RoutingNodes(clusterState, false),
415+
clusterState,
416+
ClusterInfo.EMPTY,
417+
SnapshotShardSizeInfo.EMPTY,
418+
System.nanoTime()
419+
);
420+
421+
}
422+
348423
// call this after index creation and update cluster state
349424
private Tuple<Set<String>, Set<String>> createBatchesAndAssert(int expectedBatchSize) {
350425
Set<String> primaryBatches = testShardsBatchGatewayAllocator.createAndUpdateBatches(testAllocation, true);

test/framework/src/main/java/org/opensearch/test/gateway/TestShardBatchGatewayAllocator.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,14 @@
3131

3232
public class TestShardBatchGatewayAllocator extends ShardsBatchGatewayAllocator {
3333

34+
public TestShardBatchGatewayAllocator() {
35+
36+
}
37+
38+
public TestShardBatchGatewayAllocator(long maxBatchSize) {
39+
super(maxBatchSize);
40+
}
41+
3442
Map<String /* node id */, Map<ShardId, ShardRouting>> knownAllocations = new HashMap<>();
3543
DiscoveryNodes currentNodes = DiscoveryNodes.EMPTY_NODES;
3644
Map<String, ReplicationCheckpoint> shardIdNodeToReplicationCheckPointMap = new HashMap<>();

0 commit comments

Comments
 (0)