Skip to content

Commit 9f81479

Browse files
authored
Support cancellation for admin apis (opensearch-project#13966)
* Support cancellation for admin apis - add implementation for _cat/shards Signed-off-by: Somesh Gupta <[email protected]>
1 parent bcd09ab commit 9f81479

File tree

17 files changed

+601
-42
lines changed

17 files changed

+601
-42
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
2727
- Add allowlist setting for ingest-geoip and ingest-useragent ([#15325](https://github.com/opensearch-project/OpenSearch/pull/15325))
2828
- Adding access to noSubMatches and noOverlappingMatches in Hyphenation ([#13895](https://github.com/opensearch-project/OpenSearch/pull/13895))
2929
- Add support for index level max slice count setting for concurrent segment search ([#15336](https://github.com/opensearch-project/OpenSearch/pull/15336))
30+
- Support cancellation for cat shards and node stats API.([#13966](https://github.com/opensearch-project/OpenSearch/pull/13966))
3031
- [Streaming Indexing] Introduce bulk HTTP API streaming flavor ([#15381](https://github.com/opensearch-project/OpenSearch/pull/15381))
3132
- Add support for centralize snapshot creation with pinned timestamp ([#15124](https://github.com/opensearch-project/OpenSearch/pull/15124))
3233
- Add concurrent search support for Derived Fields ([#15326](https://github.com/opensearch-project/OpenSearch/pull/15326))
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action.admin.cluster.shards;
10+
11+
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
12+
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
13+
import org.opensearch.cluster.metadata.IndexMetadata;
14+
import org.opensearch.cluster.routing.ShardRouting;
15+
import org.opensearch.common.settings.Settings;
16+
import org.opensearch.common.unit.TimeValue;
17+
import org.opensearch.core.action.ActionListener;
18+
import org.opensearch.core.common.Strings;
19+
import org.opensearch.core.tasks.TaskCancelledException;
20+
import org.opensearch.test.InternalTestCluster;
21+
import org.opensearch.test.OpenSearchIntegTestCase;
22+
23+
import java.io.IOException;
24+
import java.util.List;
25+
import java.util.concurrent.CountDownLatch;
26+
27+
import static org.opensearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING;
28+
import static org.opensearch.common.unit.TimeValue.timeValueMillis;
29+
import static org.opensearch.search.SearchService.NO_TIMEOUT;
30+
31+
@OpenSearchIntegTestCase.ClusterScope(numDataNodes = 0, scope = OpenSearchIntegTestCase.Scope.TEST)
32+
public class TransportCatShardsActionIT extends OpenSearchIntegTestCase {
33+
34+
public void testCatShardsWithSuccessResponse() throws InterruptedException {
35+
internalCluster().startClusterManagerOnlyNodes(1);
36+
List<String> nodes = internalCluster().startDataOnlyNodes(3);
37+
createIndex(
38+
"test",
39+
Settings.builder()
40+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
41+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2)
42+
.put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "60m")
43+
.build()
44+
);
45+
ensureGreen("test");
46+
47+
final CatShardsRequest shardsRequest = new CatShardsRequest();
48+
shardsRequest.setCancelAfterTimeInterval(NO_TIMEOUT);
49+
shardsRequest.setIndices(Strings.EMPTY_ARRAY);
50+
CountDownLatch latch = new CountDownLatch(1);
51+
client().execute(CatShardsAction.INSTANCE, shardsRequest, new ActionListener<CatShardsResponse>() {
52+
@Override
53+
public void onResponse(CatShardsResponse catShardsResponse) {
54+
ClusterStateResponse clusterStateResponse = catShardsResponse.getClusterStateResponse();
55+
IndicesStatsResponse indicesStatsResponse = catShardsResponse.getIndicesStatsResponse();
56+
for (ShardRouting shard : clusterStateResponse.getState().routingTable().allShards()) {
57+
assertEquals("test", shard.getIndexName());
58+
assertNotNull(indicesStatsResponse.asMap().get(shard));
59+
}
60+
latch.countDown();
61+
}
62+
63+
@Override
64+
public void onFailure(Exception e) {
65+
fail();
66+
latch.countDown();
67+
}
68+
});
69+
latch.await();
70+
}
71+
72+
public void testCatShardsWithTimeoutException() throws IOException, AssertionError, InterruptedException {
73+
List<String> masterNodes = internalCluster().startClusterManagerOnlyNodes(1);
74+
List<String> nodes = internalCluster().startDataOnlyNodes(3);
75+
createIndex(
76+
"test",
77+
Settings.builder()
78+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
79+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2)
80+
.put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "60m")
81+
.build()
82+
);
83+
ensureGreen("test");
84+
85+
Settings clusterManagerDataPathSettings = internalCluster().dataPathSettings(masterNodes.get(0));
86+
// Dropping master node to delay in cluster state call.
87+
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(masterNodes.get(0)));
88+
89+
CountDownLatch latch = new CountDownLatch(2);
90+
new Thread(() -> {
91+
try {
92+
// Ensures the cancellation timeout expires.
93+
Thread.sleep(2000);
94+
// Starting master node to proceed in cluster state call.
95+
internalCluster().startClusterManagerOnlyNode(
96+
Settings.builder().put("node.name", masterNodes.get(0)).put(clusterManagerDataPathSettings).build()
97+
);
98+
latch.countDown();
99+
} catch (InterruptedException e) {
100+
throw new RuntimeException(e);
101+
}
102+
}).start();
103+
104+
final CatShardsRequest shardsRequest = new CatShardsRequest();
105+
TimeValue timeoutInterval = timeValueMillis(1000);
106+
shardsRequest.setCancelAfterTimeInterval(timeoutInterval);
107+
shardsRequest.clusterManagerNodeTimeout(timeValueMillis(2500));
108+
shardsRequest.setIndices(Strings.EMPTY_ARRAY);
109+
client().execute(CatShardsAction.INSTANCE, shardsRequest, new ActionListener<CatShardsResponse>() {
110+
@Override
111+
public void onResponse(CatShardsResponse catShardsResponse) {
112+
// onResponse should not be called.
113+
latch.countDown();
114+
throw new AssertionError(
115+
"The cat shards action is expected to fail with a TaskCancelledException, but it received a successful response instead."
116+
);
117+
}
118+
119+
@Override
120+
public void onFailure(Exception e) {
121+
assertSame(e.getClass(), TaskCancelledException.class);
122+
assertEquals(e.getMessage(), "Cancellation timeout of " + timeoutInterval + " is expired");
123+
latch.countDown();
124+
}
125+
});
126+
latch.await();
127+
}
128+
129+
}

server/src/main/java/org/opensearch/action/ActionModule.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,9 @@
8585
import org.opensearch.action.admin.cluster.reroute.TransportClusterRerouteAction;
8686
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsAction;
8787
import org.opensearch.action.admin.cluster.settings.TransportClusterUpdateSettingsAction;
88+
import org.opensearch.action.admin.cluster.shards.CatShardsAction;
8889
import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsAction;
90+
import org.opensearch.action.admin.cluster.shards.TransportCatShardsAction;
8991
import org.opensearch.action.admin.cluster.shards.TransportClusterSearchShardsAction;
9092
import org.opensearch.action.admin.cluster.shards.routing.weighted.delete.ClusterDeleteWeightedRoutingAction;
9193
import org.opensearch.action.admin.cluster.shards.routing.weighted.delete.TransportDeleteWeightedRoutingAction;
@@ -646,6 +648,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
646648
actions.register(ClusterGetWeightedRoutingAction.INSTANCE, TransportGetWeightedRoutingAction.class);
647649
actions.register(ClusterDeleteWeightedRoutingAction.INSTANCE, TransportDeleteWeightedRoutingAction.class);
648650
actions.register(IndicesStatsAction.INSTANCE, TransportIndicesStatsAction.class);
651+
actions.register(CatShardsAction.INSTANCE, TransportCatShardsAction.class);
649652
actions.register(IndicesSegmentsAction.INSTANCE, TransportIndicesSegmentsAction.class);
650653
actions.register(IndicesShardStoresAction.INSTANCE, TransportIndicesShardStoresAction.class);
651654
actions.register(CreateIndexAction.INSTANCE, TransportCreateIndexAction.class);
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action.admin.cluster.shards;
10+
11+
import org.opensearch.action.ActionType;
12+
13+
/**
14+
* Transport action for cat shards
15+
*
16+
* @opensearch.internal
17+
*/
18+
public class CatShardsAction extends ActionType<CatShardsResponse> {
19+
public static final CatShardsAction INSTANCE = new CatShardsAction();
20+
public static final String NAME = "cluster:monitor/shards";
21+
22+
private CatShardsAction() {
23+
super(NAME, CatShardsResponse::new);
24+
}
25+
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action.admin.cluster.shards;
10+
11+
import org.opensearch.action.ActionRequestValidationException;
12+
import org.opensearch.action.support.clustermanager.ClusterManagerNodeReadRequest;
13+
import org.opensearch.common.unit.TimeValue;
14+
import org.opensearch.core.common.io.stream.StreamInput;
15+
import org.opensearch.core.tasks.TaskId;
16+
import org.opensearch.rest.action.admin.cluster.ClusterAdminTask;
17+
18+
import java.io.IOException;
19+
import java.util.Map;
20+
21+
/**
22+
* A request of _cat/shards.
23+
*
24+
* @opensearch.api
25+
*/
26+
public class CatShardsRequest extends ClusterManagerNodeReadRequest<CatShardsRequest> {
27+
28+
private String[] indices;
29+
private TimeValue cancelAfterTimeInterval;
30+
31+
public CatShardsRequest() {}
32+
33+
public CatShardsRequest(StreamInput in) throws IOException {
34+
super(in);
35+
}
36+
37+
@Override
38+
public ActionRequestValidationException validate() {
39+
return null;
40+
}
41+
42+
public void setIndices(String[] indices) {
43+
this.indices = indices;
44+
}
45+
46+
public String[] getIndices() {
47+
return this.indices;
48+
}
49+
50+
public void setCancelAfterTimeInterval(TimeValue timeout) {
51+
this.cancelAfterTimeInterval = timeout;
52+
}
53+
54+
public TimeValue getCancelAfterTimeInterval() {
55+
return this.cancelAfterTimeInterval;
56+
}
57+
58+
@Override
59+
public ClusterAdminTask createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
60+
return new ClusterAdminTask(id, type, action, parentTaskId, headers, this.cancelAfterTimeInterval);
61+
}
62+
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action.admin.cluster.shards;
10+
11+
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
12+
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
13+
import org.opensearch.core.action.ActionResponse;
14+
import org.opensearch.core.common.io.stream.StreamInput;
15+
import org.opensearch.core.common.io.stream.StreamOutput;
16+
17+
import java.io.IOException;
18+
19+
/**
20+
* A response of a cat shards request.
21+
*
22+
* @opensearch.api
23+
*/
24+
public class CatShardsResponse extends ActionResponse {
25+
26+
private ClusterStateResponse clusterStateResponse = null;
27+
28+
private IndicesStatsResponse indicesStatsResponse = null;
29+
30+
public CatShardsResponse() {}
31+
32+
public CatShardsResponse(StreamInput in) throws IOException {
33+
super(in);
34+
}
35+
36+
@Override
37+
public void writeTo(StreamOutput out) throws IOException {
38+
clusterStateResponse.writeTo(out);
39+
indicesStatsResponse.writeTo(out);
40+
}
41+
42+
public void setClusterStateResponse(ClusterStateResponse clusterStateResponse) {
43+
this.clusterStateResponse = clusterStateResponse;
44+
}
45+
46+
public ClusterStateResponse getClusterStateResponse() {
47+
return this.clusterStateResponse;
48+
}
49+
50+
public void setIndicesStatsResponse(IndicesStatsResponse indicesStatsResponse) {
51+
this.indicesStatsResponse = indicesStatsResponse;
52+
}
53+
54+
public IndicesStatsResponse getIndicesStatsResponse() {
55+
return this.indicesStatsResponse;
56+
}
57+
}

0 commit comments

Comments
 (0)