Skip to content

Commit 691803c

Browse files
author
Harsh Garg
committed
Fixing _list/shards API for closed indices
Signed-off-by: Harsh Garg <[email protected]>
1 parent 9f790ee commit 691803c

File tree

4 files changed

+161
-25
lines changed

4 files changed

+161
-25
lines changed

server/src/internalClusterTest/java/org/opensearch/action/admin/cluster/shards/TransportCatShardsActionIT.java

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,10 @@
88

99
package org.opensearch.action.admin.cluster.shards;
1010

11+
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
1112
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
13+
import org.opensearch.action.pagination.PageParams;
14+
import org.opensearch.client.Requests;
1215
import org.opensearch.cluster.metadata.IndexMetadata;
1316
import org.opensearch.cluster.routing.ShardRouting;
1417
import org.opensearch.common.settings.Settings;
@@ -22,7 +25,10 @@
2225
import java.io.IOException;
2326
import java.util.List;
2427
import java.util.concurrent.CountDownLatch;
28+
import java.util.concurrent.ExecutionException;
2529

30+
import static org.hamcrest.Matchers.equalTo;
31+
import static org.hamcrest.Matchers.is;
2632
import static org.opensearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING;
2733
import static org.opensearch.common.unit.TimeValue.timeValueMillis;
2834
import static org.opensearch.search.SearchService.NO_TIMEOUT;
@@ -125,4 +131,72 @@ public void onFailure(Exception e) {
125131
latch.await();
126132
}
127133

134+
public void testCatShardsSuccessWithPaginationWithClosedIndices() throws InterruptedException, ExecutionException {
135+
internalCluster().startClusterManagerOnlyNodes(1);
136+
List<String> nodes = internalCluster().startDataOnlyNodes(3);
137+
final int numIndices = 3;
138+
final int numShards = 5;
139+
final int numReplicas = 2;
140+
final int pageSize = numIndices * numReplicas * (numShards + 1);
141+
createIndex(
142+
"test-1",
143+
Settings.builder()
144+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 5)
145+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2)
146+
.put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "60m")
147+
.build()
148+
);
149+
createIndex(
150+
"test-2",
151+
Settings.builder()
152+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 5)
153+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2)
154+
.put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "60m")
155+
.build()
156+
);
157+
createIndex(
158+
"test-3",
159+
Settings.builder()
160+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
161+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2)
162+
.put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "60m")
163+
.build()
164+
);
165+
ensureGreen();
166+
167+
// close index test-3
168+
client().admin().indices().close(Requests.closeIndexRequest("test-3")).get();
169+
170+
ClusterStateResponse clusterStateResponse = client().admin()
171+
.cluster()
172+
.prepareState()
173+
.clear()
174+
.setMetadata(true)
175+
.setIndices("test-3")
176+
.get();
177+
assertThat(clusterStateResponse.getState().metadata().index("test-3").getState(), equalTo(IndexMetadata.State.CLOSE));
178+
179+
180+
final CatShardsRequest shardsRequest = new CatShardsRequest();
181+
shardsRequest.setCancelAfterTimeInterval(NO_TIMEOUT);
182+
shardsRequest.setIndices(Strings.EMPTY_ARRAY);
183+
shardsRequest.setPageParams(new PageParams(null, PageParams.PARAM_ASC_SORT_VALUE, pageSize));
184+
CountDownLatch latch = new CountDownLatch(1);
185+
client().execute(CatShardsAction.INSTANCE, shardsRequest, new ActionListener<CatShardsResponse>() {
186+
@Override
187+
public void onResponse(CatShardsResponse catShardsResponse) {
188+
List<ShardRouting> shardRoutings = catShardsResponse.getResponseShards();
189+
assertFalse(shardRoutings.stream().anyMatch(shard -> shard.getIndexName().equals("test-3")));
190+
latch.countDown();
191+
}
192+
193+
@Override
194+
public void onFailure(Exception e) {
195+
fail();
196+
latch.countDown();
197+
}
198+
});
199+
latch.await();
200+
}
201+
128202
}

server/src/main/java/org/opensearch/action/admin/cluster/shards/TransportCatShardsAction.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.opensearch.action.pagination.ShardPaginationStrategy;
1717
import org.opensearch.action.support.ActionFilters;
1818
import org.opensearch.action.support.HandledTransportAction;
19+
import org.opensearch.action.support.IndicesOptions;
1920
import org.opensearch.action.support.TimeoutTaskCancellationUtility;
2021
import org.opensearch.client.node.NodeClient;
2122
import org.opensearch.common.breaker.ResponseLimitBreachedException;
@@ -148,7 +149,9 @@ public void onFailure(Exception e) {
148149
}
149150

150151
private ShardPaginationStrategy getPaginationStrategy(PageParams pageParams, ClusterStateResponse clusterStateResponse) {
151-
return Objects.isNull(pageParams) ? null : new ShardPaginationStrategy(pageParams, clusterStateResponse.getState());
152+
return Objects.isNull(pageParams)
153+
? null
154+
: new ShardPaginationStrategy(pageParams, clusterStateResponse.getState(), IndicesOptions.strictExpandOpenAndForbidClosed());
152155
}
153156

154157
private void validateRequestLimit(

server/src/main/java/org/opensearch/action/pagination/ShardPaginationStrategy.java

Lines changed: 26 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
package org.opensearch.action.pagination;
1010

1111
import org.opensearch.OpenSearchParseException;
12+
import org.opensearch.action.support.IndicesOptions;
1213
import org.opensearch.cluster.ClusterState;
1314
import org.opensearch.cluster.metadata.IndexMetadata;
1415
import org.opensearch.cluster.routing.IndexRoutingTable;
@@ -37,14 +38,23 @@ public class ShardPaginationStrategy implements PaginationStrategy<ShardRouting>
3738
private PageData pageData;
3839

3940
public ShardPaginationStrategy(PageParams pageParams, ClusterState clusterState) {
41+
this(pageParams, clusterState, null);
42+
}
43+
44+
public ShardPaginationStrategy(PageParams pageParams, ClusterState clusterState, IndicesOptions indicesOptions) {
4045
ShardStrategyToken shardStrategyToken = getShardStrategyToken(pageParams.getRequestedToken());
4146
// Get list of indices metadata sorted by their creation time and filtered by the last sent index
42-
List<IndexMetadata> filteredIndices = getEligibleIndices(
47+
List<IndexMetadata> filteredIndices = PaginationStrategy.getSortedIndexMetadata(
4348
clusterState,
44-
pageParams.getSort(),
45-
Objects.isNull(shardStrategyToken) ? null : shardStrategyToken.lastIndexName,
46-
Objects.isNull(shardStrategyToken) ? null : shardStrategyToken.lastIndexCreationTime
49+
getMetadataFilter(
50+
pageParams.getSort(),
51+
Objects.isNull(shardStrategyToken) ? null : shardStrategyToken.lastIndexName,
52+
Objects.isNull(shardStrategyToken) ? null : shardStrategyToken.lastIndexCreationTime,
53+
indicesOptions
54+
),
55+
PageParams.PARAM_ASC_SORT_VALUE.equals(pageParams.getSort()) ? ASC_COMPARATOR : DESC_COMPARATOR
4756
);
57+
4858
// Get the list of shards and indices belonging to current page.
4959
this.pageData = getPageData(
5060
filteredIndices,
@@ -54,39 +64,31 @@ public ShardPaginationStrategy(PageParams pageParams, ClusterState clusterState)
5464
);
5565
}
5666

57-
private static List<IndexMetadata> getEligibleIndices(
58-
ClusterState clusterState,
67+
private static Predicate<IndexMetadata> getMetadataFilter(
5968
String sortOrder,
6069
String lastIndexName,
61-
Long lastIndexCreationTime
70+
Long lastIndexCreationTime,
71+
IndicesOptions indicesOptions
6272
) {
6373
if (Objects.isNull(lastIndexName) || Objects.isNull(lastIndexCreationTime)) {
64-
return PaginationStrategy.getSortedIndexMetadata(
65-
clusterState,
66-
PageParams.PARAM_ASC_SORT_VALUE.equals(sortOrder) ? ASC_COMPARATOR : DESC_COMPARATOR
67-
);
68-
} else {
69-
return PaginationStrategy.getSortedIndexMetadata(
70-
clusterState,
71-
getMetadataFilter(sortOrder, lastIndexName, lastIndexCreationTime),
72-
PageParams.PARAM_ASC_SORT_VALUE.equals(sortOrder) ? ASC_COMPARATOR : DESC_COMPARATOR
73-
);
74-
}
75-
}
76-
77-
private static Predicate<IndexMetadata> getMetadataFilter(String sortOrder, String lastIndexName, Long lastIndexCreationTime) {
78-
if (Objects.isNull(lastIndexName) || Objects.isNull(lastIndexCreationTime)) {
79-
return indexMetadata -> true;
74+
return indexStateFilter(indicesOptions);
8075
}
8176
return indexNameFilter(lastIndexName).or(
8277
IndexPaginationStrategy.getIndexCreateTimeFilter(sortOrder, lastIndexName, lastIndexCreationTime)
83-
);
78+
).and(indexStateFilter(indicesOptions));
8479
}
8580

8681
private static Predicate<IndexMetadata> indexNameFilter(String lastIndexName) {
8782
return metadata -> metadata.getIndex().getName().equals(lastIndexName);
8883
}
8984

85+
private static Predicate<IndexMetadata> indexStateFilter(IndicesOptions indicesOptions) {
86+
if (Objects.isNull(indicesOptions) || !indicesOptions.forbidClosedIndices()) {
87+
return metadata -> true;
88+
}
89+
return metadata -> metadata.getState().equals(IndexMetadata.State.OPEN);
90+
}
91+
9092
/**
9193
* Will be used to get the list of shards and respective indices to which they belong,
9294
* which are to be displayed in a page.

server/src/test/java/org/opensearch/action/pagination/ShardPaginationStrategyTests.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
import org.opensearch.OpenSearchParseException;
1212
import org.opensearch.Version;
13+
import org.opensearch.action.support.IndicesOptions;
1314
import org.opensearch.cluster.ClusterName;
1415
import org.opensearch.cluster.ClusterState;
1516
import org.opensearch.cluster.metadata.IndexMetadata;
@@ -393,6 +394,33 @@ public void testRetrieveShardsWhenLastIndexGetsDeletedAndReCreated() {
393394
assertNull(strategy.getResponseToken().getNextToken());
394395
}
395396

397+
/**
398+
* Validates strategy filters out CLOSED indices, if forbidClosed() indices options are provided.
399+
*/
400+
public void testNoClosedIndicesReturnedByStrategy() {
401+
final int pageSize = DEFAULT_NUMBER_OF_SHARDS * (DEFAULT_NUMBER_OF_REPLICAS + 1);
402+
ClusterState clusterState = getRandomClusterState(List.of(0, 1, 2, 3, 4, 5));
403+
// Add 2 closed indices to cluster state
404+
clusterState = addIndexToClusterState(clusterState, 6, DEFAULT_NUMBER_OF_SHARDS, DEFAULT_NUMBER_OF_REPLICAS, IndexMetadata.State.CLOSE);
405+
clusterState = addIndexToClusterState(clusterState, 7, DEFAULT_NUMBER_OF_SHARDS, DEFAULT_NUMBER_OF_REPLICAS, IndexMetadata.State.CLOSE);
406+
List<ShardRouting> shardRoutings = new ArrayList<>();
407+
List<String> indices = new ArrayList<>();
408+
String requestedToken = null;
409+
ShardPaginationStrategy strategy;
410+
do {
411+
PageParams pageParams = new PageParams(requestedToken, PARAM_ASC_SORT_VALUE, pageSize);
412+
strategy = new ShardPaginationStrategy(pageParams, clusterState, IndicesOptions.strictExpandOpenAndForbidClosed());
413+
requestedToken = strategy.getResponseToken().getNextToken();
414+
shardRoutings.addAll(strategy.getRequestedEntities());
415+
indices.addAll(strategy.getRequestedIndices());
416+
} while (requestedToken != null);
417+
// assert that the closed indices do not appear in the response
418+
assertFalse(indices.contains(TEST_INDEX_PREFIX + 6));
419+
assertFalse(shardRoutings.stream().anyMatch(shard -> shard.getIndexName().equals(TEST_INDEX_PREFIX + 6)));
420+
assertFalse(indices.contains(TEST_INDEX_PREFIX + 7));
421+
assertFalse(shardRoutings.stream().anyMatch(shard -> shard.getIndexName().equals(TEST_INDEX_PREFIX + 7)));
422+
}
423+
396424
public void testCreatingShardStrategyPageTokenWithRequestedTokenNull() {
397425
try {
398426
new ShardPaginationStrategy.ShardStrategyToken(null);
@@ -478,11 +506,40 @@ private ClusterState addIndexToClusterState(
478506
final int numShards,
479507
final int numReplicas,
480508
final long creationTime
509+
) {
510+
return addIndexToClusterState(clusterState, indexNumber, numShards, numReplicas, creationTime, IndexMetadata.State.OPEN);
511+
}
512+
513+
private ClusterState addIndexToClusterState(
514+
ClusterState clusterState,
515+
final int indexNumber,
516+
final int numShards,
517+
final int numReplicas,
518+
final IndexMetadata.State state
519+
) {
520+
return addIndexToClusterState(
521+
clusterState,
522+
indexNumber,
523+
numShards,
524+
numReplicas,
525+
Instant.now().plus(indexNumber, ChronoUnit.SECONDS).toEpochMilli(),
526+
state
527+
);
528+
}
529+
530+
private ClusterState addIndexToClusterState(
531+
ClusterState clusterState,
532+
final int indexNumber,
533+
final int numShards,
534+
final int numReplicas,
535+
final long creationTime,
536+
final IndexMetadata.State state
481537
) {
482538
IndexMetadata indexMetadata = IndexMetadata.builder(TEST_INDEX_PREFIX + indexNumber)
483539
.settings(settings(Version.CURRENT).put(SETTING_CREATION_DATE, creationTime))
484540
.numberOfShards(numShards)
485541
.numberOfReplicas(numReplicas)
542+
.state(state)
486543
.build();
487544
IndexRoutingTable.Builder indexRoutingTableBuilder = new IndexRoutingTable.Builder(indexMetadata.getIndex()).initializeAsNew(
488545
indexMetadata

0 commit comments

Comments
 (0)