-
Notifications
You must be signed in to change notification settings - Fork 2.3k
Implementation of RemoteWritableEntity for objects to uploaded to remote store #13834
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,153 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.gateway.remote.model; | ||
|
||
import org.opensearch.common.io.Streams; | ||
import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity; | ||
import org.opensearch.common.remote.BlobPathParameters; | ||
import org.opensearch.core.compress.Compressor; | ||
import org.opensearch.core.xcontent.NamedXContentRegistry; | ||
import org.opensearch.gateway.remote.ClusterMetadataManifest; | ||
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata; | ||
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute; | ||
import org.opensearch.gateway.remote.RemoteClusterStateUtils; | ||
import org.opensearch.index.remote.RemoteStoreUtils; | ||
import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat; | ||
|
||
import java.io.IOException; | ||
import java.io.InputStream; | ||
import java.util.List; | ||
|
||
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; | ||
|
||
/** | ||
* Wrapper class for uploading/downloading {@link ClusterMetadataManifest} to/from remote blob store | ||
*/ | ||
public class RemoteClusterMetadataManifest extends AbstractRemoteWritableBlobEntity<ClusterMetadataManifest> { | ||
|
||
public static final String MANIFEST = "manifest"; | ||
public static final int SPLITTED_MANIFEST_FILE_LENGTH = 6; | ||
|
||
public static final String METADATA_MANIFEST_NAME_FORMAT = "%s"; | ||
public static final int MANIFEST_CURRENT_CODEC_VERSION = ClusterMetadataManifest.CODEC_V3; | ||
shwetathareja marked this conversation as resolved.
Show resolved
Hide resolved
|
||
public static final String COMMITTED = "C"; | ||
public static final String PUBLISHED = "P"; | ||
|
||
/** | ||
* Manifest format compatible with older codec v0, where codec version was missing. | ||
*/ | ||
public static final ChecksumBlobStoreFormat<ClusterMetadataManifest> CLUSTER_METADATA_MANIFEST_FORMAT_V0 = | ||
new ChecksumBlobStoreFormat<>("cluster-metadata-manifest", METADATA_MANIFEST_NAME_FORMAT, ClusterMetadataManifest::fromXContentV0); | ||
/** | ||
* Manifest format compatible with older codec v1, where global metadata was missing. | ||
*/ | ||
public static final ChecksumBlobStoreFormat<ClusterMetadataManifest> CLUSTER_METADATA_MANIFEST_FORMAT_V1 = | ||
new ChecksumBlobStoreFormat<>("cluster-metadata-manifest", METADATA_MANIFEST_NAME_FORMAT, ClusterMetadataManifest::fromXContentV1); | ||
|
||
/** | ||
* Manifest format compatible with codec v2, where we introduced codec versions/global metadata. | ||
*/ | ||
public static final ChecksumBlobStoreFormat<ClusterMetadataManifest> CLUSTER_METADATA_MANIFEST_FORMAT = new ChecksumBlobStoreFormat<>( | ||
"cluster-metadata-manifest", | ||
METADATA_MANIFEST_NAME_FORMAT, | ||
ClusterMetadataManifest::fromXContent | ||
); | ||
|
||
private ClusterMetadataManifest clusterMetadataManifest; | ||
|
||
public RemoteClusterMetadataManifest( | ||
final ClusterMetadataManifest clusterMetadataManifest, | ||
final String clusterUUID, | ||
final Compressor compressor, | ||
final NamedXContentRegistry namedXContentRegistry | ||
) { | ||
super(clusterUUID, compressor, namedXContentRegistry); | ||
this.clusterMetadataManifest = clusterMetadataManifest; | ||
} | ||
|
||
public RemoteClusterMetadataManifest( | ||
final String blobName, | ||
final String clusterUUID, | ||
final Compressor compressor, | ||
final NamedXContentRegistry namedXContentRegistry | ||
) { | ||
super(clusterUUID, compressor, namedXContentRegistry); | ||
this.blobName = blobName; | ||
} | ||
|
||
@Override | ||
public BlobPathParameters getBlobPathParameters() { | ||
return new BlobPathParameters(List.of(MANIFEST), MANIFEST); | ||
} | ||
|
||
@Override | ||
public String generateBlobFileName() { | ||
// 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/manifest/manifest__<inverted_term>__<inverted_version>__C/P__<inverted__timestamp>__ | ||
// <codec_version> | ||
String blobFileName = String.join( | ||
DELIMITER, | ||
MANIFEST, | ||
RemoteStoreUtils.invertLong(clusterMetadataManifest.getClusterTerm()), | ||
RemoteStoreUtils.invertLong(clusterMetadataManifest.getStateVersion()), | ||
(clusterMetadataManifest.isCommitted() ? COMMITTED : PUBLISHED), | ||
RemoteStoreUtils.invertLong(System.currentTimeMillis()), | ||
String.valueOf(clusterMetadataManifest.getCodecVersion()) | ||
// Keep the codec version at last place only, during we read last place to determine codec version. | ||
); | ||
this.blobFileName = blobFileName; | ||
return blobFileName; | ||
} | ||
|
||
@Override | ||
public UploadedMetadata getUploadedMetadata() { | ||
assert blobName != null; | ||
return new UploadedMetadataAttribute(MANIFEST, blobName); | ||
} | ||
|
||
@Override | ||
public InputStream serialize() throws IOException { | ||
return CLUSTER_METADATA_MANIFEST_FORMAT.serialize( | ||
clusterMetadataManifest, | ||
generateBlobFileName(), | ||
getCompressor(), | ||
RemoteClusterStateUtils.FORMAT_PARAMS | ||
).streamInput(); | ||
} | ||
|
||
@Override | ||
public ClusterMetadataManifest deserialize(final InputStream inputStream) throws IOException { | ||
ChecksumBlobStoreFormat<ClusterMetadataManifest> blobStoreFormat = getClusterMetadataManifestBlobStoreFormat(); | ||
return blobStoreFormat.deserialize(blobName, getNamedXContentRegistry(), Streams.readFully(inputStream)); | ||
} | ||
|
||
private int getManifestCodecVersion() { | ||
assert blobName != null; | ||
String[] splitName = blobName.split(DELIMITER); | ||
if (splitName.length == SPLITTED_MANIFEST_FILE_LENGTH) { | ||
return Integer.parseInt(splitName[splitName.length - 1]); // Last value would be codec version. | ||
} else if (splitName.length < SPLITTED_MANIFEST_FILE_LENGTH) { // Where codec is not part of file name, i.e. default codec version 0 | ||
// is used. | ||
return ClusterMetadataManifest.CODEC_V0; | ||
Check warning on line 136 in server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterMetadataManifest.java
|
||
} else { | ||
throw new IllegalArgumentException("Manifest file name is corrupted"); | ||
} | ||
} | ||
|
||
private ChecksumBlobStoreFormat<ClusterMetadataManifest> getClusterMetadataManifestBlobStoreFormat() { | ||
long codecVersion = getManifestCodecVersion(); | ||
if (codecVersion == MANIFEST_CURRENT_CODEC_VERSION) { | ||
return CLUSTER_METADATA_MANIFEST_FORMAT; | ||
} else if (codecVersion == ClusterMetadataManifest.CODEC_V1) { | ||
return CLUSTER_METADATA_MANIFEST_FORMAT_V1; | ||
Check warning on line 147 in server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterMetadataManifest.java
|
||
} else if (codecVersion == ClusterMetadataManifest.CODEC_V0) { | ||
return CLUSTER_METADATA_MANIFEST_FORMAT_V0; | ||
Check warning on line 149 in server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterMetadataManifest.java
|
||
} | ||
throw new IllegalArgumentException("Cluster metadata manifest file is corrupted, don't have valid codec version"); | ||
Check warning on line 151 in server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterMetadataManifest.java
|
||
soosinha marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,107 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.gateway.remote.model; | ||
|
||
import org.opensearch.cluster.coordination.CoordinationMetadata; | ||
import org.opensearch.common.io.Streams; | ||
import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity; | ||
import org.opensearch.common.remote.BlobPathParameters; | ||
import org.opensearch.core.compress.Compressor; | ||
import org.opensearch.core.xcontent.NamedXContentRegistry; | ||
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata; | ||
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute; | ||
import org.opensearch.gateway.remote.RemoteClusterStateUtils; | ||
import org.opensearch.index.remote.RemoteStoreUtils; | ||
import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat; | ||
|
||
import java.io.IOException; | ||
import java.io.InputStream; | ||
import java.util.List; | ||
|
||
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; | ||
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.GLOBAL_METADATA_CURRENT_CODEC_VERSION; | ||
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.METADATA_NAME_PLAIN_FORMAT; | ||
|
||
/** | ||
* Wrapper class for uploading/downloading {@link CoordinationMetadata} to/from remote blob store | ||
*/ | ||
public class RemoteCoordinationMetadata extends AbstractRemoteWritableBlobEntity<CoordinationMetadata> { | ||
|
||
public static final String COORDINATION_METADATA = "coordination"; | ||
public static final ChecksumBlobStoreFormat<CoordinationMetadata> COORDINATION_METADATA_FORMAT = new ChecksumBlobStoreFormat<>( | ||
"coordination", | ||
METADATA_NAME_PLAIN_FORMAT, | ||
CoordinationMetadata::fromXContent | ||
); | ||
|
||
private CoordinationMetadata coordinationMetadata; | ||
private long metadataVersion; | ||
|
||
public RemoteCoordinationMetadata( | ||
final CoordinationMetadata coordinationMetadata, | ||
final long metadataVersion, | ||
final String clusterUUID, | ||
final Compressor compressor, | ||
final NamedXContentRegistry namedXContentRegistry | ||
) { | ||
super(clusterUUID, compressor, namedXContentRegistry); | ||
this.coordinationMetadata = coordinationMetadata; | ||
this.metadataVersion = metadataVersion; | ||
} | ||
|
||
public RemoteCoordinationMetadata( | ||
final String blobName, | ||
final String clusterUUID, | ||
final Compressor compressor, | ||
final NamedXContentRegistry namedXContentRegistry | ||
) { | ||
super(clusterUUID, compressor, namedXContentRegistry); | ||
this.blobName = blobName; | ||
} | ||
|
||
@Override | ||
public BlobPathParameters getBlobPathParameters() { | ||
return new BlobPathParameters(List.of("global-metadata"), COORDINATION_METADATA); | ||
} | ||
|
||
@Override | ||
public String generateBlobFileName() { | ||
// 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/global-metadata/<componentPrefix>__<inverted_metadata_version>__<inverted__timestamp>__<codec_version> | ||
soosinha marked this conversation as resolved.
Show resolved
Hide resolved
|
||
String blobFileName = String.join( | ||
DELIMITER, | ||
getBlobPathParameters().getFilePrefix(), | ||
RemoteStoreUtils.invertLong(metadataVersion), | ||
RemoteStoreUtils.invertLong(System.currentTimeMillis()), | ||
String.valueOf(GLOBAL_METADATA_CURRENT_CODEC_VERSION) | ||
); | ||
this.blobFileName = blobFileName; | ||
return blobFileName; | ||
} | ||
|
||
@Override | ||
public InputStream serialize() throws IOException { | ||
return COORDINATION_METADATA_FORMAT.serialize( | ||
coordinationMetadata, | ||
generateBlobFileName(), | ||
getCompressor(), | ||
RemoteClusterStateUtils.FORMAT_PARAMS | ||
).streamInput(); | ||
} | ||
|
||
@Override | ||
public CoordinationMetadata deserialize(final InputStream inputStream) throws IOException { | ||
return COORDINATION_METADATA_FORMAT.deserialize(blobName, getNamedXContentRegistry(), Streams.readFully(inputStream)); | ||
} | ||
|
||
@Override | ||
public UploadedMetadata getUploadedMetadata() { | ||
assert blobName != null; | ||
return new UploadedMetadataAttribute(COORDINATION_METADATA, blobName); | ||
} | ||
} |
Uh oh!
There was an error while loading. Please reload this page.