Skip to content

Commit 625dd5a

Browse files
authored
Fix responsibility check for existing shards allocator when timed out (#15223)
* Fix responsibility check for existing shards allocator when timed out Signed-off-by: Rishab Nahata <[email protected]>
1 parent 54c13a6 commit 625dd5a

File tree

6 files changed

+34
-31
lines changed

6 files changed

+34
-31
lines changed

server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java

+8
Original file line numberDiff line numberDiff line change
@@ -90,12 +90,20 @@ protected void allocateUnassignedBatchOnTimeout(Set<ShardId> shardIds, RoutingAl
9090
ShardRouting unassignedShard = iterator.next();
9191
AllocateUnassignedDecision allocationDecision;
9292
if (unassignedShard.primary() == primary && shardIds.contains(unassignedShard.shardId())) {
93+
if (isResponsibleFor(unassignedShard) == false) {
94+
continue;
95+
}
9396
allocationDecision = AllocateUnassignedDecision.throttle(null);
9497
executeDecision(unassignedShard, allocationDecision, allocation, iterator);
9598
}
9699
}
97100
}
98101

102+
/**
103+
* Is the allocator responsible for allocating the given {@link ShardRouting}?
104+
*/
105+
protected abstract boolean isResponsibleFor(ShardRouting shardRouting);
106+
99107
protected void executeDecision(
100108
ShardRouting shardRouting,
101109
AllocateUnassignedDecision allocateUnassignedDecision,

server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ public abstract class PrimaryShardAllocator extends BaseGatewayShardAllocator {
8282
/**
8383
* Is the allocator responsible for allocating the given {@link ShardRouting}?
8484
*/
85-
protected static boolean isResponsibleFor(final ShardRouting shard) {
85+
protected boolean isResponsibleFor(final ShardRouting shard) {
8686
return shard.primary() // must be primary
8787
&& shard.unassigned() // must be unassigned
8888
// only handle either an existing store or a snapshot recovery

server/src/main/java/org/opensearch/gateway/ReplicaShardAllocator.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ public void processExistingRecoveries(RoutingAllocation allocation) {
191191
/**
192192
* Is the allocator responsible for allocating the given {@link ShardRouting}?
193193
*/
194-
protected static boolean isResponsibleFor(final ShardRouting shard) {
194+
protected boolean isResponsibleFor(final ShardRouting shard) {
195195
return shard.primary() == false // must be a replica
196196
&& shard.unassigned() // must be unassigned
197197
// if we are allocating a replica because of index creation, no need to go and find a copy, there isn't one...

server/src/main/java/org/opensearch/gateway/ReplicaShardBatchAllocator.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ private AllocateUnassignedDecision getUnassignedShardAllocationDecision(
173173
RoutingAllocation allocation,
174174
Supplier<Map<DiscoveryNode, StoreFilesMetadata>> nodeStoreFileMetaDataMapSupplier
175175
) {
176-
if (!isResponsibleFor(shardRouting)) {
176+
if (isResponsibleFor(shardRouting) == false) {
177177
return AllocateUnassignedDecision.NOT_TAKEN;
178178
}
179179
Tuple<Decision, Map<String, NodeAllocationResult>> result = canBeAllocatedToAtLeastOneNode(shardRouting, allocation);

server/src/test/java/org/opensearch/gateway/PrimaryShardBatchAllocatorTests.java

+5-28
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import java.util.stream.Collectors;
5252

5353
import static org.opensearch.cluster.routing.UnassignedInfo.Reason.CLUSTER_RECOVERED;
54+
import static org.opensearch.cluster.routing.UnassignedInfo.Reason.INDEX_CREATED;
5455

5556
public class PrimaryShardBatchAllocatorTests extends OpenSearchAllocationTestCase {
5657

@@ -283,40 +284,16 @@ public void testAllocateUnassignedBatchOnTimeoutWithNoMatchingPrimaryShards() {
283284
assertEquals(0, ignoredShards.size());
284285
}
285286

286-
public void testAllocateUnassignedBatchOnTimeoutWithNonPrimaryShards() {
287+
public void testAllocateUnassignedBatchOnTimeoutSkipIgnoringNewPrimaryShards() {
287288
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
288289
AllocationDeciders allocationDeciders = randomAllocationDeciders(Settings.builder().build(), clusterSettings, random());
289290
setUpShards(1);
290-
final RoutingAllocation routingAllocation = routingAllocationWithOnePrimary(allocationDeciders, CLUSTER_RECOVERED, "allocId-0");
291+
final RoutingAllocation routingAllocation = routingAllocationWithOnePrimary(allocationDeciders, INDEX_CREATED);
292+
ShardRouting shardRouting = routingAllocation.routingTable().getIndicesRouting().get("test").shard(shardId.id()).primaryShard();
291293

292-
ShardRouting shardRouting = routingAllocation.routingTable()
293-
.getIndicesRouting()
294-
.get("test")
295-
.shard(shardId.id())
296-
.replicaShards()
297-
.get(0);
298294
Set<ShardId> shardIds = new HashSet<>();
299295
shardIds.add(shardRouting.shardId());
300-
batchAllocator.allocateUnassignedBatchOnTimeout(shardIds, routingAllocation, false);
301-
302-
List<ShardRouting> ignoredShards = routingAllocation.routingNodes().unassigned().ignored();
303-
assertEquals(1, ignoredShards.size());
304-
}
305-
306-
public void testAllocateUnassignedBatchOnTimeoutWithNoShards() {
307-
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
308-
AllocationDeciders allocationDeciders = randomAllocationDeciders(Settings.builder().build(), clusterSettings, random());
309-
setUpShards(1);
310-
final RoutingAllocation routingAllocation = routingAllocationWithOnePrimary(allocationDeciders, CLUSTER_RECOVERED, "allocId-0");
311-
312-
ShardRouting shardRouting = routingAllocation.routingTable()
313-
.getIndicesRouting()
314-
.get("test")
315-
.shard(shardId.id())
316-
.replicaShards()
317-
.get(0);
318-
Set<ShardId> shardIds = new HashSet<>();
319-
batchAllocator.allocateUnassignedBatchOnTimeout(shardIds, routingAllocation, false);
296+
batchAllocator.allocateUnassignedBatchOnTimeout(shardIds, routingAllocation, true);
320297

321298
List<ShardRouting> ignoredShards = routingAllocation.routingNodes().unassigned().ignored();
322299
assertEquals(0, ignoredShards.size());

server/src/test/java/org/opensearch/gateway/ReplicaShardBatchAllocatorTests.java

+18
Original file line numberDiff line numberDiff line change
@@ -744,6 +744,24 @@ public void testAllocateUnassignedBatchOnTimeoutWithAlreadyRecoveringReplicaShar
744744
assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(0));
745745
}
746746

747+
public void testAllocateUnassignedBatchOnTimeoutSkipIgnoringNewReplicaShards() {
748+
RoutingAllocation allocation = onePrimaryOnNode1And1Replica(
749+
yesAllocationDeciders(),
750+
Settings.EMPTY,
751+
UnassignedInfo.Reason.INDEX_CREATED
752+
);
753+
final RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator();
754+
Set<ShardId> shards = new HashSet<>();
755+
while (iterator.hasNext()) {
756+
ShardRouting sr = iterator.next();
757+
if (sr.primary() == false) {
758+
shards.add(sr.shardId());
759+
}
760+
}
761+
testBatchAllocator.allocateUnassignedBatchOnTimeout(shards, allocation, false);
762+
assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(0));
763+
}
764+
747765
private RoutingAllocation onePrimaryOnNode1And1Replica(AllocationDeciders deciders) {
748766
return onePrimaryOnNode1And1Replica(deciders, Settings.EMPTY, UnassignedInfo.Reason.CLUSTER_RECOVERED);
749767
}

0 commit comments

Comments
 (0)