Skip to content

Commit b51c182

Browse files
author
sukriti sinha
committed
Created an API to fetch remote store segment data
Signed-off-by: sukriti sinha <[email protected]>
1 parent 58c281f commit b51c182

22 files changed

+1851
-0
lines changed
Lines changed: 284 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,284 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.remotestore;
10+
11+
import org.opensearch.Version;
12+
import org.opensearch.action.admin.cluster.remotestore.metadata.RemoteStoreMetadataResponse;
13+
import org.opensearch.action.admin.cluster.remotestore.metadata.RemoteStoreShardMetadata;
14+
import org.opensearch.cluster.ClusterState;
15+
import org.opensearch.cluster.metadata.IndexMetadata;
16+
import org.opensearch.cluster.node.DiscoveryNode;
17+
import org.opensearch.common.settings.Settings;
18+
import org.opensearch.plugins.Plugin;
19+
import org.opensearch.test.OpenSearchIntegTestCase;
20+
import org.opensearch.test.transport.MockTransportService.TestPlugin;
21+
22+
import java.util.Collection;
23+
import java.util.List;
24+
import java.util.Locale;
25+
import java.util.Map;
26+
import java.util.stream.Collectors;
27+
import java.util.stream.Stream;
28+
29+
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
30+
import static org.hamcrest.Matchers.allOf;
31+
import static org.hamcrest.Matchers.hasKey;
32+
33+
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
34+
public class RemoteStoreMetadataIT extends RemoteStoreBaseIntegTestCase {
35+
36+
private static final String INDEX_NAME = "remote-store-meta-api-test";
37+
38+
@Override
39+
protected Collection<Class<? extends Plugin>> nodePlugins() {
40+
return Stream.concat(super.nodePlugins().stream(), Stream.of(TestPlugin.class)).collect(Collectors.toList());
41+
}
42+
43+
public void setup() {
44+
internalCluster().startNodes(3);
45+
}
46+
47+
@SuppressWarnings("unchecked")
48+
public void testMetadataResponseFromAllNodes() {
49+
setup();
50+
51+
createIndex(INDEX_NAME, remoteStoreIndexSettings(0, 3));
52+
ensureGreen(INDEX_NAME);
53+
indexDocs();
54+
client().admin().indices().prepareRefresh(INDEX_NAME).get();
55+
56+
ClusterState state = getClusterState();
57+
List<String> nodes = state.nodes().getNodes().values().stream().map(DiscoveryNode::getName).collect(Collectors.toList());
58+
59+
for (String node : nodes) {
60+
RemoteStoreMetadataResponse response = client(node).admin().cluster().prepareRemoteStoreMetadata(INDEX_NAME, null).get();
61+
assertTrue(response.getSuccessfulShards() > 0);
62+
assertNotNull(response.groupByIndexAndShards());
63+
64+
response.groupByIndexAndShards().forEach((index, shardMap) -> {
65+
shardMap.forEach((shardId, metadataList) -> {
66+
assertFalse(metadataList.isEmpty());
67+
68+
for (RemoteStoreShardMetadata metadata : metadataList) {
69+
assertEquals(index, metadata.getIndexName());
70+
assertEquals((int) shardId, metadata.getShardId());
71+
72+
assertNotNull(metadata.getLatestSegmentMetadataFileName());
73+
assertNotNull(metadata.getLatestTranslogMetadataFileName());
74+
75+
Map<String, Map<String, Object>> segmentFiles = metadata.getSegmentMetadataFiles();
76+
assertNotNull(segmentFiles);
77+
assertFalse(segmentFiles.isEmpty());
78+
79+
for (Map<String, Object> fileMeta : segmentFiles.values()) {
80+
Map<String, Object> files = (Map<String, Object>) fileMeta.get("files");
81+
assertNotNull(files);
82+
assertFalse(files.isEmpty());
83+
for (Object value : files.values()) {
84+
Map<String, Object> meta = (Map<String, Object>) value;
85+
assertThat(meta, allOf(hasKey("original_name"), hasKey("checksum"), hasKey("length")));
86+
}
87+
88+
Map<String, Object> checkpoint = (Map<String, Object>) fileMeta.get("replication_checkpoint");
89+
assertNotNull(checkpoint);
90+
assertThat(
91+
checkpoint,
92+
allOf(
93+
hasKey("primary_term"),
94+
hasKey("segments_gen"),
95+
hasKey("segment_infos_version"),
96+
hasKey("codec"),
97+
hasKey("created_timestamp")
98+
)
99+
);
100+
}
101+
102+
Map<String, Map<String, Object>> translogFiles = metadata.getTranslogMetadataFiles();
103+
assertNotNull(translogFiles);
104+
assertFalse(translogFiles.isEmpty());
105+
for (Map<String, Object> translogMeta : translogFiles.values()) {
106+
assertThat(
107+
translogMeta,
108+
allOf(
109+
hasKey("primary_term"),
110+
hasKey("generation"),
111+
hasKey("min_translog_gen"),
112+
hasKey("generation_to_primary_term")
113+
)
114+
);
115+
}
116+
}
117+
});
118+
});
119+
}
120+
}
121+
122+
@SuppressWarnings("unchecked")
123+
public void testMetadataResponseAllShards() throws Exception {
124+
setup();
125+
126+
createIndex(INDEX_NAME, remoteStoreIndexSettings(0, 2));
127+
ensureGreen(INDEX_NAME);
128+
indexDocs();
129+
client().admin().indices().prepareRefresh(INDEX_NAME).get();
130+
131+
assertBusy(() -> { assertFalse(client().admin().cluster().prepareHealth(INDEX_NAME).get().isTimedOut()); });
132+
133+
RemoteStoreMetadataResponse response = client().admin().cluster().prepareRemoteStoreMetadata(INDEX_NAME, null).get();
134+
135+
response.groupByIndexAndShards().forEach((index, shardMap) -> {
136+
shardMap.forEach((shardId, metadataList) -> {
137+
assertFalse(metadataList.isEmpty());
138+
139+
for (RemoteStoreShardMetadata metadata : metadataList) {
140+
assertEquals(index, metadata.getIndexName());
141+
assertEquals((int) shardId, metadata.getShardId());
142+
143+
assertNotNull(metadata.getLatestSegmentMetadataFileName());
144+
assertNotNull(metadata.getLatestTranslogMetadataFileName());
145+
146+
Map<String, Map<String, Object>> segmentFiles = metadata.getSegmentMetadataFiles();
147+
assertNotNull(segmentFiles);
148+
assertFalse(segmentFiles.isEmpty());
149+
150+
for (Map<String, Object> fileMeta : segmentFiles.values()) {
151+
Map<String, Object> files = (Map<String, Object>) fileMeta.get("files");
152+
assertNotNull(files);
153+
assertFalse(files.isEmpty());
154+
for (Object value : files.values()) {
155+
Map<String, Object> meta = (Map<String, Object>) value;
156+
assertThat(meta, allOf(hasKey("original_name"), hasKey("checksum"), hasKey("length")));
157+
}
158+
159+
Map<String, Object> checkpoint = (Map<String, Object>) fileMeta.get("replication_checkpoint");
160+
assertNotNull(checkpoint);
161+
assertThat(
162+
checkpoint,
163+
allOf(
164+
hasKey("primary_term"),
165+
hasKey("segments_gen"),
166+
hasKey("segment_infos_version"),
167+
hasKey("codec"),
168+
hasKey("created_timestamp")
169+
)
170+
);
171+
}
172+
173+
Map<String, Map<String, Object>> translogFiles = metadata.getTranslogMetadataFiles();
174+
assertNotNull(translogFiles);
175+
assertFalse(translogFiles.isEmpty());
176+
for (Map<String, Object> translogMeta : translogFiles.values()) {
177+
assertThat(
178+
translogMeta,
179+
allOf(
180+
hasKey("primary_term"),
181+
hasKey("generation"),
182+
hasKey("min_translog_gen"),
183+
hasKey("generation_to_primary_term")
184+
)
185+
);
186+
}
187+
}
188+
});
189+
});
190+
}
191+
192+
public void testMultipleMetadataFilesPerShard() throws Exception {
193+
setup();
194+
195+
createIndex(INDEX_NAME, remoteStoreIndexSettings(0, 1));
196+
ensureGreen(INDEX_NAME);
197+
198+
int refreshCount = 5;
199+
for (int i = 0; i < refreshCount; i++) {
200+
indexDocs();
201+
client().admin().indices().prepareRefresh(INDEX_NAME).get();
202+
Thread.sleep(100);
203+
}
204+
205+
RemoteStoreMetadataResponse response = client().admin().cluster().prepareRemoteStoreMetadata(INDEX_NAME, null).get();
206+
207+
response.groupByIndexAndShards().forEach((index, shardMap) -> {
208+
shardMap.forEach((shardId, metadataList) -> {
209+
assertFalse(metadataList.isEmpty());
210+
211+
for (RemoteStoreShardMetadata metadata : metadataList) {
212+
assertEquals(refreshCount, metadata.getSegmentMetadataFiles().size());
213+
assertTrue(metadata.getTranslogMetadataFiles().size() >= 1);
214+
}
215+
});
216+
});
217+
}
218+
219+
public void testMetadataResponseMultipleIndicesAndShards() throws Exception {
220+
setup();
221+
222+
String index1 = INDEX_NAME + "-1";
223+
String index2 = INDEX_NAME + "-2";
224+
225+
createIndex(index1, remoteStoreIndexSettings(0, 2));
226+
createIndex(index2, remoteStoreIndexSettings(0, 3));
227+
ensureGreen(index1, index2);
228+
229+
indexDocs(index1);
230+
indexDocs(index2);
231+
232+
client().admin().indices().prepareRefresh(index1).get();
233+
client().admin().indices().prepareRefresh(index2).get();
234+
235+
RemoteStoreMetadataResponse response = client().admin().cluster().prepareRemoteStoreMetadata("*", null).get();
236+
237+
Map<String, Map<Integer, List<RemoteStoreShardMetadata>>> grouped = response.groupByIndexAndShards();
238+
239+
assertTrue(grouped.containsKey(index1));
240+
assertTrue(grouped.containsKey(index2));
241+
242+
grouped.forEach((index, shardMap) -> {
243+
shardMap.forEach((shardId, metadataList) -> {
244+
assertFalse(metadataList.isEmpty());
245+
metadataList.forEach(metadata -> {
246+
assertEquals(index, metadata.getIndexName());
247+
assertEquals((int) shardId, metadata.getShardId());
248+
assertNotNull(metadata.getSegmentMetadataFiles());
249+
assertFalse(metadata.getSegmentMetadataFiles().isEmpty());
250+
assertNotNull(metadata.getTranslogMetadataFiles());
251+
assertFalse(metadata.getTranslogMetadataFiles().isEmpty());
252+
});
253+
});
254+
});
255+
}
256+
257+
public void testShardFailureWhenRepositoryMissing() throws Exception {
258+
String indexName = "failure-case-index";
259+
260+
Settings.Builder settings = Settings.builder()
261+
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
262+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
263+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
264+
.put("index.replication.type", "SEGMENT")
265+
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
266+
.put("index.remote_store.repository", "non-existent-repo")
267+
.put("index.remote_store.translog.enabled", true)
268+
.put("index.remote_store.translog.repository", "non-existent-repo");
269+
270+
Exception exception = expectThrows(Exception.class, () -> assertAcked(prepareCreate(indexName).setSettings(settings)));
271+
272+
assertTrue(exception.getMessage().toLowerCase(Locale.ROOT).contains("repository"));
273+
}
274+
275+
private void indexDocs() {
276+
indexDocs(INDEX_NAME);
277+
}
278+
279+
private void indexDocs(String indexName) {
280+
for (int i = 0; i < randomIntBetween(10, 20); i++) {
281+
client().prepareIndex(indexName).setId("doc-" + i).setSource("field", "value-" + i).get();
282+
}
283+
}
284+
}

server/src/main/java/org/opensearch/action/ActionModule.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@
6767
import org.opensearch.action.admin.cluster.node.usage.TransportNodesUsageAction;
6868
import org.opensearch.action.admin.cluster.remote.RemoteInfoAction;
6969
import org.opensearch.action.admin.cluster.remote.TransportRemoteInfoAction;
70+
import org.opensearch.action.admin.cluster.remotestore.metadata.RemoteStoreMetadataAction;
71+
import org.opensearch.action.admin.cluster.remotestore.metadata.TransportRemoteStoreMetadataAction;
7072
import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreAction;
7173
import org.opensearch.action.admin.cluster.remotestore.restore.TransportRestoreRemoteStoreAction;
7274
import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsAction;
@@ -378,6 +380,7 @@
378380
import org.opensearch.rest.action.admin.cluster.RestPutStoredScriptAction;
379381
import org.opensearch.rest.action.admin.cluster.RestReloadSecureSettingsAction;
380382
import org.opensearch.rest.action.admin.cluster.RestRemoteClusterInfoAction;
383+
import org.opensearch.rest.action.admin.cluster.RestRemoteStoreMetadataAction;
381384
import org.opensearch.rest.action.admin.cluster.RestRemoteStoreStatsAction;
382385
import org.opensearch.rest.action.admin.cluster.RestRestoreRemoteStoreAction;
383386
import org.opensearch.rest.action.admin.cluster.RestRestoreSnapshotAction;
@@ -638,6 +641,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
638641
actions.register(NodesStatsAction.INSTANCE, TransportNodesStatsAction.class);
639642
actions.register(WlmStatsAction.INSTANCE, TransportWlmStatsAction.class);
640643
actions.register(RemoteStoreStatsAction.INSTANCE, TransportRemoteStoreStatsAction.class);
644+
actions.register(RemoteStoreMetadataAction.INSTANCE, TransportRemoteStoreMetadataAction.class);
641645
actions.register(NodesUsageAction.INSTANCE, TransportNodesUsageAction.class);
642646
actions.register(NodesHotThreadsAction.INSTANCE, TransportNodesHotThreadsAction.class);
643647
actions.register(ListTasksAction.INSTANCE, TransportListTasksAction.class);
@@ -1053,6 +1057,7 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
10531057
registerHandler.accept(new RestGetDecommissionStateAction());
10541058
registerHandler.accept(new RestRemoteStoreStatsAction());
10551059
registerHandler.accept(new RestRestoreRemoteStoreAction());
1060+
registerHandler.accept(new RestRemoteStoreMetadataAction());
10561061

10571062
// pull-based ingestion API
10581063
registerHandler.accept(new RestPauseIngestionAction());
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action.admin.cluster.remotestore.metadata;
10+
11+
import org.opensearch.action.ActionType;
12+
13+
/**
14+
* Action to fetch metadata from remote store
15+
*
16+
* @opensearch.internal
17+
*/
18+
public class RemoteStoreMetadataAction extends ActionType<RemoteStoreMetadataResponse> {
19+
public static final RemoteStoreMetadataAction INSTANCE = new RemoteStoreMetadataAction();
20+
public static final String NAME = "cluster:admin/remote_store/metadata";
21+
22+
private RemoteStoreMetadataAction() {
23+
super(NAME, RemoteStoreMetadataResponse::new);
24+
}
25+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action.admin.cluster.remotestore.metadata;
10+
11+
import org.opensearch.action.support.broadcast.BroadcastRequest;
12+
import org.opensearch.common.annotation.ExperimentalApi;
13+
import org.opensearch.core.common.io.stream.StreamInput;
14+
import org.opensearch.core.common.io.stream.StreamOutput;
15+
16+
import java.io.IOException;
17+
18+
/**
19+
* Request object for fetching remote store metadata of shards across one or more indices.
20+
*
21+
* @opensearch.internal
22+
*/
23+
@ExperimentalApi
24+
public class RemoteStoreMetadataRequest extends BroadcastRequest<RemoteStoreMetadataRequest> {
25+
private String[] shards;
26+
27+
public RemoteStoreMetadataRequest() {
28+
super((String[]) null);
29+
shards = new String[0];
30+
}
31+
32+
public RemoteStoreMetadataRequest(StreamInput in) throws IOException {
33+
super(in);
34+
shards = in.readStringArray();
35+
}
36+
37+
@Override
38+
public void writeTo(StreamOutput out) throws IOException {
39+
super.writeTo(out);
40+
out.writeStringArray(shards);
41+
}
42+
43+
public RemoteStoreMetadataRequest shards(String... shards) {
44+
this.shards = shards;
45+
return this;
46+
}
47+
48+
public String[] shards() {
49+
return this.shards;
50+
}
51+
}

0 commit comments

Comments
 (0)