Skip to content

Commit 959d17e

Browse files
committed
Implementation of RemoteWritableEntity for objects to uploaded to remote store (opensearch-project#13834)
* Implementation of RemoteWritableEntity for objects to uploaded to remote store Signed-off-by: Sooraj Sinha <[email protected]>
1 parent 8cf895b commit 959d17e

17 files changed

+2408
-0
lines changed

server/src/main/java/org/opensearch/common/remote/RemoteWritableEntityStore.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
package org.opensearch.common.remote;
1010

11+
import org.opensearch.common.annotation.ExperimentalApi;
1112
import org.opensearch.core.action.ActionListener;
1213

1314
import java.io.IOException;
@@ -18,6 +19,7 @@
1819
* @param <T> The object type which can be uploaded to or downloaded from remote storage.
1920
* @param <U> The wrapper entity which provides methods for serializing/deserializing entity T.
2021
*/
22+
@ExperimentalApi
2123
public interface RemoteWritableEntityStore<T, U extends RemoteWriteableEntity<T>> {
2224

2325
public void writeAsync(U entity, ActionListener<Void> listener);

server/src/main/java/org/opensearch/common/remote/RemoteWriteableEntity.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88

99
package org.opensearch.common.remote;
1010

11+
import org.opensearch.common.annotation.ExperimentalApi;
12+
1113
import java.io.IOException;
1214
import java.io.InputStream;
1315

@@ -17,6 +19,7 @@
1719
*
1820
* @param <T> The object type which can be uploaded to or downloaded from remote storage.
1921
*/
22+
@ExperimentalApi
2023
public interface RemoteWriteableEntity<T> {
2124
/**
2225
* @return An InputStream created by serializing the entity T

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateUtils.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,30 @@
88

99
package org.opensearch.gateway.remote;
1010

11+
import org.opensearch.cluster.metadata.Metadata;
12+
import org.opensearch.core.xcontent.ToXContent;
13+
1114
import java.nio.charset.StandardCharsets;
1215
import java.util.Base64;
16+
import java.util.Map;
1317

1418
/**
1519
* Utility class for Remote Cluster State
1620
*/
1721
public class RemoteClusterStateUtils {
22+
23+
public static final String DELIMITER = "__";
1824
public static final String PATH_DELIMITER = "/";
25+
public static final String GLOBAL_METADATA_PATH_TOKEN = "global-metadata";
26+
public static final String METADATA_NAME_FORMAT = "%s.dat";
27+
public static final String METADATA_NAME_PLAIN_FORMAT = "%s";
28+
public static final int GLOBAL_METADATA_CURRENT_CODEC_VERSION = 1;
29+
30+
// ToXContent Params with gateway mode.
31+
// We are using gateway context mode to persist all custom metadata.
32+
public static final ToXContent.Params FORMAT_PARAMS = new ToXContent.MapParams(
33+
Map.of(Metadata.CONTEXT_MODE_PARAM, Metadata.CONTEXT_MODE_GATEWAY)
34+
);
1935

2036
public static String encodeString(String content) {
2137
return Base64.getUrlEncoder().withoutPadding().encodeToString(content.getBytes(StandardCharsets.UTF_8));
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.gateway.remote.model;
10+
11+
import org.opensearch.common.io.Streams;
12+
import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity;
13+
import org.opensearch.common.remote.BlobPathParameters;
14+
import org.opensearch.core.compress.Compressor;
15+
import org.opensearch.core.xcontent.NamedXContentRegistry;
16+
import org.opensearch.gateway.remote.ClusterMetadataManifest;
17+
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata;
18+
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute;
19+
import org.opensearch.gateway.remote.RemoteClusterStateUtils;
20+
import org.opensearch.index.remote.RemoteStoreUtils;
21+
import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat;
22+
23+
import java.io.IOException;
24+
import java.io.InputStream;
25+
import java.util.List;
26+
27+
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER;
28+
29+
/**
30+
* Wrapper class for uploading/downloading {@link ClusterMetadataManifest} to/from remote blob store
31+
*/
32+
public class RemoteClusterMetadataManifest extends AbstractRemoteWritableBlobEntity<ClusterMetadataManifest> {
33+
34+
public static final String MANIFEST = "manifest";
35+
public static final int SPLITTED_MANIFEST_FILE_LENGTH = 6;
36+
37+
public static final String METADATA_MANIFEST_NAME_FORMAT = "%s";
38+
public static final int MANIFEST_CURRENT_CODEC_VERSION = ClusterMetadataManifest.CODEC_V3;
39+
public static final String COMMITTED = "C";
40+
public static final String PUBLISHED = "P";
41+
42+
/**
43+
* Manifest format compatible with older codec v0, where codec version was missing.
44+
*/
45+
public static final ChecksumBlobStoreFormat<ClusterMetadataManifest> CLUSTER_METADATA_MANIFEST_FORMAT_V0 =
46+
new ChecksumBlobStoreFormat<>("cluster-metadata-manifest", METADATA_MANIFEST_NAME_FORMAT, ClusterMetadataManifest::fromXContentV0);
47+
/**
48+
* Manifest format compatible with older codec v1, where global metadata was missing.
49+
*/
50+
public static final ChecksumBlobStoreFormat<ClusterMetadataManifest> CLUSTER_METADATA_MANIFEST_FORMAT_V1 =
51+
new ChecksumBlobStoreFormat<>("cluster-metadata-manifest", METADATA_MANIFEST_NAME_FORMAT, ClusterMetadataManifest::fromXContentV1);
52+
53+
/**
54+
* Manifest format compatible with codec v2, where we introduced codec versions/global metadata.
55+
*/
56+
public static final ChecksumBlobStoreFormat<ClusterMetadataManifest> CLUSTER_METADATA_MANIFEST_FORMAT = new ChecksumBlobStoreFormat<>(
57+
"cluster-metadata-manifest",
58+
METADATA_MANIFEST_NAME_FORMAT,
59+
ClusterMetadataManifest::fromXContent
60+
);
61+
62+
private ClusterMetadataManifest clusterMetadataManifest;
63+
64+
public RemoteClusterMetadataManifest(
65+
final ClusterMetadataManifest clusterMetadataManifest,
66+
final String clusterUUID,
67+
final Compressor compressor,
68+
final NamedXContentRegistry namedXContentRegistry
69+
) {
70+
super(clusterUUID, compressor, namedXContentRegistry);
71+
this.clusterMetadataManifest = clusterMetadataManifest;
72+
}
73+
74+
public RemoteClusterMetadataManifest(
75+
final String blobName,
76+
final String clusterUUID,
77+
final Compressor compressor,
78+
final NamedXContentRegistry namedXContentRegistry
79+
) {
80+
super(clusterUUID, compressor, namedXContentRegistry);
81+
this.blobName = blobName;
82+
}
83+
84+
@Override
85+
public BlobPathParameters getBlobPathParameters() {
86+
return new BlobPathParameters(List.of(MANIFEST), MANIFEST);
87+
}
88+
89+
@Override
90+
public String generateBlobFileName() {
91+
// 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/manifest/manifest__<inverted_term>__<inverted_version>__C/P__<inverted__timestamp>__
92+
// <codec_version>
93+
String blobFileName = String.join(
94+
DELIMITER,
95+
MANIFEST,
96+
RemoteStoreUtils.invertLong(clusterMetadataManifest.getClusterTerm()),
97+
RemoteStoreUtils.invertLong(clusterMetadataManifest.getStateVersion()),
98+
(clusterMetadataManifest.isCommitted() ? COMMITTED : PUBLISHED),
99+
RemoteStoreUtils.invertLong(System.currentTimeMillis()),
100+
String.valueOf(clusterMetadataManifest.getCodecVersion())
101+
// Keep the codec version at last place only, during we read last place to determine codec version.
102+
);
103+
this.blobFileName = blobFileName;
104+
return blobFileName;
105+
}
106+
107+
@Override
108+
public UploadedMetadata getUploadedMetadata() {
109+
assert blobName != null;
110+
return new UploadedMetadataAttribute(MANIFEST, blobName);
111+
}
112+
113+
@Override
114+
public InputStream serialize() throws IOException {
115+
return CLUSTER_METADATA_MANIFEST_FORMAT.serialize(
116+
clusterMetadataManifest,
117+
generateBlobFileName(),
118+
getCompressor(),
119+
RemoteClusterStateUtils.FORMAT_PARAMS
120+
).streamInput();
121+
}
122+
123+
@Override
124+
public ClusterMetadataManifest deserialize(final InputStream inputStream) throws IOException {
125+
ChecksumBlobStoreFormat<ClusterMetadataManifest> blobStoreFormat = getClusterMetadataManifestBlobStoreFormat();
126+
return blobStoreFormat.deserialize(blobName, getNamedXContentRegistry(), Streams.readFully(inputStream));
127+
}
128+
129+
private int getManifestCodecVersion() {
130+
assert blobName != null;
131+
String[] splitName = blobName.split(DELIMITER);
132+
if (splitName.length == SPLITTED_MANIFEST_FILE_LENGTH) {
133+
return Integer.parseInt(splitName[splitName.length - 1]); // Last value would be codec version.
134+
} else if (splitName.length < SPLITTED_MANIFEST_FILE_LENGTH) { // Where codec is not part of file name, i.e. default codec version 0
135+
// is used.
136+
return ClusterMetadataManifest.CODEC_V0;
137+
} else {
138+
throw new IllegalArgumentException("Manifest file name is corrupted");
139+
}
140+
}
141+
142+
private ChecksumBlobStoreFormat<ClusterMetadataManifest> getClusterMetadataManifestBlobStoreFormat() {
143+
long codecVersion = getManifestCodecVersion();
144+
if (codecVersion == MANIFEST_CURRENT_CODEC_VERSION) {
145+
return CLUSTER_METADATA_MANIFEST_FORMAT;
146+
} else if (codecVersion == ClusterMetadataManifest.CODEC_V1) {
147+
return CLUSTER_METADATA_MANIFEST_FORMAT_V1;
148+
} else if (codecVersion == ClusterMetadataManifest.CODEC_V0) {
149+
return CLUSTER_METADATA_MANIFEST_FORMAT_V0;
150+
}
151+
throw new IllegalArgumentException("Cluster metadata manifest file is corrupted, don't have valid codec version");
152+
}
153+
}
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.gateway.remote.model;
10+
11+
import org.opensearch.cluster.coordination.CoordinationMetadata;
12+
import org.opensearch.common.io.Streams;
13+
import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity;
14+
import org.opensearch.common.remote.BlobPathParameters;
15+
import org.opensearch.core.compress.Compressor;
16+
import org.opensearch.core.xcontent.NamedXContentRegistry;
17+
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata;
18+
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute;
19+
import org.opensearch.gateway.remote.RemoteClusterStateUtils;
20+
import org.opensearch.index.remote.RemoteStoreUtils;
21+
import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat;
22+
23+
import java.io.IOException;
24+
import java.io.InputStream;
25+
import java.util.List;
26+
27+
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER;
28+
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.GLOBAL_METADATA_CURRENT_CODEC_VERSION;
29+
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.METADATA_NAME_PLAIN_FORMAT;
30+
31+
/**
32+
* Wrapper class for uploading/downloading {@link CoordinationMetadata} to/from remote blob store
33+
*/
34+
public class RemoteCoordinationMetadata extends AbstractRemoteWritableBlobEntity<CoordinationMetadata> {
35+
36+
public static final String COORDINATION_METADATA = "coordination";
37+
public static final ChecksumBlobStoreFormat<CoordinationMetadata> COORDINATION_METADATA_FORMAT = new ChecksumBlobStoreFormat<>(
38+
"coordination",
39+
METADATA_NAME_PLAIN_FORMAT,
40+
CoordinationMetadata::fromXContent
41+
);
42+
43+
private CoordinationMetadata coordinationMetadata;
44+
private long metadataVersion;
45+
46+
public RemoteCoordinationMetadata(
47+
final CoordinationMetadata coordinationMetadata,
48+
final long metadataVersion,
49+
final String clusterUUID,
50+
final Compressor compressor,
51+
final NamedXContentRegistry namedXContentRegistry
52+
) {
53+
super(clusterUUID, compressor, namedXContentRegistry);
54+
this.coordinationMetadata = coordinationMetadata;
55+
this.metadataVersion = metadataVersion;
56+
}
57+
58+
public RemoteCoordinationMetadata(
59+
final String blobName,
60+
final String clusterUUID,
61+
final Compressor compressor,
62+
final NamedXContentRegistry namedXContentRegistry
63+
) {
64+
super(clusterUUID, compressor, namedXContentRegistry);
65+
this.blobName = blobName;
66+
}
67+
68+
@Override
69+
public BlobPathParameters getBlobPathParameters() {
70+
return new BlobPathParameters(List.of("global-metadata"), COORDINATION_METADATA);
71+
}
72+
73+
@Override
74+
public String generateBlobFileName() {
75+
// 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/global-metadata/<componentPrefix>__<inverted_metadata_version>__<inverted__timestamp>__<codec_version>
76+
String blobFileName = String.join(
77+
DELIMITER,
78+
getBlobPathParameters().getFilePrefix(),
79+
RemoteStoreUtils.invertLong(metadataVersion),
80+
RemoteStoreUtils.invertLong(System.currentTimeMillis()),
81+
String.valueOf(GLOBAL_METADATA_CURRENT_CODEC_VERSION)
82+
);
83+
this.blobFileName = blobFileName;
84+
return blobFileName;
85+
}
86+
87+
@Override
88+
public InputStream serialize() throws IOException {
89+
return COORDINATION_METADATA_FORMAT.serialize(
90+
coordinationMetadata,
91+
generateBlobFileName(),
92+
getCompressor(),
93+
RemoteClusterStateUtils.FORMAT_PARAMS
94+
).streamInput();
95+
}
96+
97+
@Override
98+
public CoordinationMetadata deserialize(final InputStream inputStream) throws IOException {
99+
return COORDINATION_METADATA_FORMAT.deserialize(blobName, getNamedXContentRegistry(), Streams.readFully(inputStream));
100+
}
101+
102+
@Override
103+
public UploadedMetadata getUploadedMetadata() {
104+
assert blobName != null;
105+
return new UploadedMetadataAttribute(COORDINATION_METADATA, blobName);
106+
}
107+
}

0 commit comments

Comments
 (0)