Skip to content

Commit 8300777

Browse files
author
sukriti sinha
committed
Created an API to fetch remote store segment data
Signed-off-by: sukriti sinha <[email protected]>
1 parent 58c281f commit 8300777

20 files changed

+1856
-0
lines changed
Lines changed: 261 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,261 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.remotestore;
10+
11+
import org.opensearch.action.admin.cluster.remotestore.metadata.RemoteStoreMetadataResponse;
12+
import org.opensearch.action.admin.cluster.remotestore.metadata.RemoteStoreShardMetadata;
13+
import org.opensearch.cluster.ClusterState;
14+
import org.opensearch.cluster.node.DiscoveryNode;
15+
import org.opensearch.plugins.Plugin;
16+
import org.opensearch.test.OpenSearchIntegTestCase;
17+
import org.opensearch.test.transport.MockTransportService.TestPlugin;
18+
19+
import java.util.Collection;
20+
import java.util.List;
21+
import java.util.Map;
22+
import java.util.stream.Collectors;
23+
import java.util.stream.Stream;
24+
25+
import static org.hamcrest.Matchers.allOf;
26+
import static org.hamcrest.Matchers.hasKey;
27+
28+
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
29+
public class RemoteStoreMetadataIT extends RemoteStoreBaseIntegTestCase {
30+
31+
private static final String INDEX_NAME = "remote-store-meta-api-test";
32+
33+
@Override
34+
protected Collection<Class<? extends Plugin>> nodePlugins() {
35+
return Stream.concat(super.nodePlugins().stream(), Stream.of(TestPlugin.class)).collect(Collectors.toList());
36+
}
37+
38+
public void setup() {
39+
internalCluster().startNodes(3);
40+
}
41+
42+
@SuppressWarnings("unchecked")
43+
public void testMetadataResponseFromAllNodes() {
44+
setup();
45+
46+
createIndex(INDEX_NAME, remoteStoreIndexSettings(0, 3));
47+
ensureGreen(INDEX_NAME);
48+
indexDocs();
49+
client().admin().indices().prepareRefresh(INDEX_NAME).get();
50+
51+
ClusterState state = getClusterState();
52+
List<String> nodes = state.nodes().getNodes().values().stream().map(DiscoveryNode::getName).collect(Collectors.toList());
53+
54+
for (String node : nodes) {
55+
RemoteStoreMetadataResponse response = client(node).admin().cluster().prepareRemoteStoreMetadata(INDEX_NAME, null).get();
56+
assertTrue(response.getSuccessfulShards() > 0);
57+
assertNotNull(response.groupByIndexAndShards());
58+
59+
response.groupByIndexAndShards().forEach((index, shardMap) -> {
60+
shardMap.forEach((shardId, metadataList) -> {
61+
assertFalse(metadataList.isEmpty());
62+
63+
for (RemoteStoreShardMetadata metadata : metadataList) {
64+
assertEquals(index, metadata.getIndexName());
65+
assertEquals((int) shardId, metadata.getShardId());
66+
67+
assertNotNull(metadata.getLatestSegmentMetadataFileName());
68+
assertNotNull(metadata.getLatestTranslogMetadataFileName());
69+
70+
Map<String, Map<String, Object>> segmentFiles = metadata.getSegmentMetadataFiles();
71+
assertNotNull(segmentFiles);
72+
assertFalse(segmentFiles.isEmpty());
73+
74+
for (Map<String, Object> fileMeta : segmentFiles.values()) {
75+
Map<String, Object> files = (Map<String, Object>) fileMeta.get("files");
76+
assertNotNull(files);
77+
assertFalse(files.isEmpty());
78+
for (Object value : files.values()) {
79+
Map<String, Object> meta = (Map<String, Object>) value;
80+
assertThat(meta, allOf(hasKey("original_name"), hasKey("checksum"), hasKey("length")));
81+
}
82+
83+
Map<String, Object> checkpoint = (Map<String, Object>) fileMeta.get("replication_checkpoint");
84+
assertNotNull(checkpoint);
85+
assertThat(
86+
checkpoint,
87+
allOf(
88+
hasKey("primary_term"),
89+
hasKey("segments_gen"),
90+
hasKey("segment_infos_version"),
91+
hasKey("codec"),
92+
hasKey("created_timestamp")
93+
)
94+
);
95+
}
96+
97+
Map<String, Map<String, Object>> translogFiles = metadata.getTranslogMetadataFiles();
98+
assertNotNull(translogFiles);
99+
assertFalse(translogFiles.isEmpty());
100+
for (Map<String, Object> translogMeta : translogFiles.values()) {
101+
assertThat(
102+
translogMeta,
103+
allOf(
104+
hasKey("primary_term"),
105+
hasKey("generation"),
106+
hasKey("min_translog_gen"),
107+
hasKey("generation_to_primary_term")
108+
)
109+
);
110+
}
111+
}
112+
});
113+
});
114+
}
115+
}
116+
117+
@SuppressWarnings("unchecked")
118+
public void testMetadataResponseAllShards() throws Exception {
119+
setup();
120+
121+
createIndex(INDEX_NAME, remoteStoreIndexSettings(0, 2));
122+
ensureGreen(INDEX_NAME);
123+
indexDocs();
124+
client().admin().indices().prepareRefresh(INDEX_NAME).get();
125+
126+
assertBusy(() -> { assertFalse(client().admin().cluster().prepareHealth(INDEX_NAME).get().isTimedOut()); });
127+
128+
RemoteStoreMetadataResponse response = client().admin().cluster().prepareRemoteStoreMetadata(INDEX_NAME, null).get();
129+
130+
response.groupByIndexAndShards().forEach((index, shardMap) -> {
131+
shardMap.forEach((shardId, metadataList) -> {
132+
assertFalse(metadataList.isEmpty());
133+
134+
for (RemoteStoreShardMetadata metadata : metadataList) {
135+
assertEquals(index, metadata.getIndexName());
136+
assertEquals((int) shardId, metadata.getShardId());
137+
138+
assertNotNull(metadata.getLatestSegmentMetadataFileName());
139+
assertNotNull(metadata.getLatestTranslogMetadataFileName());
140+
141+
Map<String, Map<String, Object>> segmentFiles = metadata.getSegmentMetadataFiles();
142+
assertNotNull(segmentFiles);
143+
assertFalse(segmentFiles.isEmpty());
144+
145+
for (Map<String, Object> fileMeta : segmentFiles.values()) {
146+
Map<String, Object> files = (Map<String, Object>) fileMeta.get("files");
147+
assertNotNull(files);
148+
assertFalse(files.isEmpty());
149+
for (Object value : files.values()) {
150+
Map<String, Object> meta = (Map<String, Object>) value;
151+
assertThat(meta, allOf(hasKey("original_name"), hasKey("checksum"), hasKey("length")));
152+
}
153+
154+
Map<String, Object> checkpoint = (Map<String, Object>) fileMeta.get("replication_checkpoint");
155+
assertNotNull(checkpoint);
156+
assertThat(
157+
checkpoint,
158+
allOf(
159+
hasKey("primary_term"),
160+
hasKey("segments_gen"),
161+
hasKey("segment_infos_version"),
162+
hasKey("codec"),
163+
hasKey("created_timestamp")
164+
)
165+
);
166+
}
167+
168+
Map<String, Map<String, Object>> translogFiles = metadata.getTranslogMetadataFiles();
169+
assertNotNull(translogFiles);
170+
assertFalse(translogFiles.isEmpty());
171+
for (Map<String, Object> translogMeta : translogFiles.values()) {
172+
assertThat(
173+
translogMeta,
174+
allOf(
175+
hasKey("primary_term"),
176+
hasKey("generation"),
177+
hasKey("min_translog_gen"),
178+
hasKey("generation_to_primary_term")
179+
)
180+
);
181+
}
182+
}
183+
});
184+
});
185+
}
186+
187+
public void testMultipleMetadataFilesPerShard() throws Exception {
188+
setup();
189+
190+
createIndex(INDEX_NAME, remoteStoreIndexSettings(0, 1));
191+
ensureGreen(INDEX_NAME);
192+
193+
int refreshCount = 5;
194+
for (int i = 0; i < refreshCount; i++) {
195+
indexDocs();
196+
client().admin().indices().prepareRefresh(INDEX_NAME).get();
197+
Thread.sleep(100);
198+
}
199+
200+
RemoteStoreMetadataResponse response = client().admin().cluster().prepareRemoteStoreMetadata(INDEX_NAME, null).get();
201+
202+
response.groupByIndexAndShards().forEach((index, shardMap) -> {
203+
shardMap.forEach((shardId, metadataList) -> {
204+
assertFalse(metadataList.isEmpty());
205+
206+
for (RemoteStoreShardMetadata metadata : metadataList) {
207+
assertEquals(refreshCount, metadata.getSegmentMetadataFiles().size());
208+
assertTrue(metadata.getTranslogMetadataFiles().size() >= 1);
209+
}
210+
});
211+
});
212+
}
213+
214+
public void testMetadataResponseMultipleIndicesAndShards() throws Exception {
215+
setup();
216+
217+
String index1 = INDEX_NAME + "-1";
218+
String index2 = INDEX_NAME + "-2";
219+
220+
createIndex(index1, remoteStoreIndexSettings(0, 2));
221+
createIndex(index2, remoteStoreIndexSettings(0, 3));
222+
ensureGreen(index1, index2);
223+
224+
indexDocs(index1);
225+
indexDocs(index2);
226+
227+
client().admin().indices().prepareRefresh(index1).get();
228+
client().admin().indices().prepareRefresh(index2).get();
229+
230+
RemoteStoreMetadataResponse response = client().admin().cluster().prepareRemoteStoreMetadata("*", null).get();
231+
232+
Map<String, Map<Integer, List<RemoteStoreShardMetadata>>> grouped = response.groupByIndexAndShards();
233+
234+
assertTrue(grouped.containsKey(index1));
235+
assertTrue(grouped.containsKey(index2));
236+
237+
grouped.forEach((index, shardMap) -> {
238+
shardMap.forEach((shardId, metadataList) -> {
239+
assertFalse(metadataList.isEmpty());
240+
metadataList.forEach(metadata -> {
241+
assertEquals(index, metadata.getIndexName());
242+
assertEquals((int) shardId, metadata.getShardId());
243+
assertNotNull(metadata.getSegmentMetadataFiles());
244+
assertFalse(metadata.getSegmentMetadataFiles().isEmpty());
245+
assertNotNull(metadata.getTranslogMetadataFiles());
246+
assertFalse(metadata.getTranslogMetadataFiles().isEmpty());
247+
});
248+
});
249+
});
250+
}
251+
252+
private void indexDocs() {
253+
indexDocs(INDEX_NAME);
254+
}
255+
256+
private void indexDocs(String indexName) {
257+
for (int i = 0; i < randomIntBetween(10, 20); i++) {
258+
client().prepareIndex(indexName).setId("doc-" + i).setSource("field", "value-" + i).get();
259+
}
260+
}
261+
}

server/src/main/java/org/opensearch/action/ActionModule.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@
6767
import org.opensearch.action.admin.cluster.node.usage.TransportNodesUsageAction;
6868
import org.opensearch.action.admin.cluster.remote.RemoteInfoAction;
6969
import org.opensearch.action.admin.cluster.remote.TransportRemoteInfoAction;
70+
import org.opensearch.action.admin.cluster.remotestore.metadata.RemoteStoreMetadataAction;
71+
import org.opensearch.action.admin.cluster.remotestore.metadata.TransportRemoteStoreMetadataAction;
7072
import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreAction;
7173
import org.opensearch.action.admin.cluster.remotestore.restore.TransportRestoreRemoteStoreAction;
7274
import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsAction;
@@ -378,6 +380,7 @@
378380
import org.opensearch.rest.action.admin.cluster.RestPutStoredScriptAction;
379381
import org.opensearch.rest.action.admin.cluster.RestReloadSecureSettingsAction;
380382
import org.opensearch.rest.action.admin.cluster.RestRemoteClusterInfoAction;
383+
import org.opensearch.rest.action.admin.cluster.RestRemoteStoreMetadataAction;
381384
import org.opensearch.rest.action.admin.cluster.RestRemoteStoreStatsAction;
382385
import org.opensearch.rest.action.admin.cluster.RestRestoreRemoteStoreAction;
383386
import org.opensearch.rest.action.admin.cluster.RestRestoreSnapshotAction;
@@ -638,6 +641,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
638641
actions.register(NodesStatsAction.INSTANCE, TransportNodesStatsAction.class);
639642
actions.register(WlmStatsAction.INSTANCE, TransportWlmStatsAction.class);
640643
actions.register(RemoteStoreStatsAction.INSTANCE, TransportRemoteStoreStatsAction.class);
644+
actions.register(RemoteStoreMetadataAction.INSTANCE, TransportRemoteStoreMetadataAction.class);
641645
actions.register(NodesUsageAction.INSTANCE, TransportNodesUsageAction.class);
642646
actions.register(NodesHotThreadsAction.INSTANCE, TransportNodesHotThreadsAction.class);
643647
actions.register(ListTasksAction.INSTANCE, TransportListTasksAction.class);
@@ -1053,6 +1057,7 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
10531057
registerHandler.accept(new RestGetDecommissionStateAction());
10541058
registerHandler.accept(new RestRemoteStoreStatsAction());
10551059
registerHandler.accept(new RestRestoreRemoteStoreAction());
1060+
registerHandler.accept(new RestRemoteStoreMetadataAction());
10561061

10571062
// pull-based ingestion API
10581063
registerHandler.accept(new RestPauseIngestionAction());
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action.admin.cluster.remotestore.metadata;
10+
11+
import org.opensearch.action.ActionType;
12+
13+
/**
14+
* Action to fetch metadata from remote store
15+
*
16+
* @opensearch.internal
17+
*/
18+
public class RemoteStoreMetadataAction extends ActionType<RemoteStoreMetadataResponse> {
19+
public static final RemoteStoreMetadataAction INSTANCE = new RemoteStoreMetadataAction();
20+
public static final String NAME = "cluster:admin/remote_store/metadata";
21+
22+
private RemoteStoreMetadataAction() {
23+
super(NAME, RemoteStoreMetadataResponse::new);
24+
}
25+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action.admin.cluster.remotestore.metadata;
10+
11+
import org.opensearch.action.support.broadcast.BroadcastRequest;
12+
import org.opensearch.common.annotation.ExperimentalApi;
13+
import org.opensearch.core.common.io.stream.StreamInput;
14+
import org.opensearch.core.common.io.stream.StreamOutput;
15+
16+
import java.io.IOException;
17+
18+
/**
19+
* Request object for fetching remote store metadata of shards across one or more indices.
20+
*
21+
* @opensearch.internal
22+
*/
23+
@ExperimentalApi
24+
public class RemoteStoreMetadataRequest extends BroadcastRequest<RemoteStoreMetadataRequest> {
25+
private String[] shards;
26+
27+
public RemoteStoreMetadataRequest() {
28+
super((String[]) null);
29+
shards = new String[0];
30+
}
31+
32+
public RemoteStoreMetadataRequest(StreamInput in) throws IOException {
33+
super(in);
34+
shards = in.readStringArray();
35+
}
36+
37+
@Override
38+
public void writeTo(StreamOutput out) throws IOException {
39+
super.writeTo(out);
40+
out.writeStringArray(shards);
41+
}
42+
43+
public RemoteStoreMetadataRequest shards(String... shards) {
44+
this.shards = shards;
45+
return this;
46+
}
47+
48+
public String[] shards() {
49+
return this.shards;
50+
}
51+
}

0 commit comments

Comments
 (0)