|
8 | 8 |
|
9 | 9 | package org.opensearch.gateway.remote.model;
|
10 | 10 |
|
| 11 | +import org.opensearch.Version; |
| 12 | +import org.opensearch.cluster.ClusterModule; |
11 | 13 | import org.opensearch.cluster.metadata.IndexGraveyard;
|
12 | 14 | import org.opensearch.cluster.metadata.Metadata.Custom;
|
13 | 15 | import org.opensearch.common.blobstore.BlobPath;
|
|
16 | 18 | import org.opensearch.common.settings.ClusterSettings;
|
17 | 19 | import org.opensearch.common.settings.Settings;
|
18 | 20 | import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
|
| 21 | +import org.opensearch.core.common.io.stream.NamedWriteableRegistry.Entry; |
| 22 | +import org.opensearch.core.common.io.stream.StreamInput; |
| 23 | +import org.opensearch.core.common.io.stream.StreamOutput; |
19 | 24 | import org.opensearch.core.compress.Compressor;
|
20 | 25 | import org.opensearch.core.compress.NoneCompressor;
|
21 | 26 | import org.opensearch.core.index.Index;
|
| 27 | +import org.opensearch.core.xcontent.XContentBuilder; |
22 | 28 | import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata;
|
23 | 29 | import org.opensearch.gateway.remote.RemoteClusterStateUtils;
|
24 | 30 | import org.opensearch.index.remote.RemoteStoreUtils;
|
25 | 31 | import org.opensearch.index.translog.transfer.BlobStoreTransferService;
|
| 32 | +import org.opensearch.persistent.PersistentTaskParams; |
| 33 | +import org.opensearch.persistent.PersistentTasksCustomMetadata; |
| 34 | +import org.opensearch.persistent.PersistentTasksCustomMetadata.Assignment; |
26 | 35 | import org.opensearch.repositories.blobstore.BlobStoreRepository;
|
27 | 36 | import org.opensearch.test.OpenSearchTestCase;
|
28 | 37 | import org.opensearch.threadpool.TestThreadPool;
|
|
33 | 42 | import java.io.IOException;
|
34 | 43 | import java.io.InputStream;
|
35 | 44 | import java.util.List;
|
| 45 | +import java.util.Objects; |
36 | 46 |
|
37 | 47 | import static org.opensearch.gateway.remote.RemoteClusterStateUtils.GLOBAL_METADATA_CURRENT_CODEC_VERSION;
|
38 | 48 | import static org.opensearch.gateway.remote.model.RemoteCustomMetadata.CUSTOM_DELIMITER;
|
@@ -216,24 +226,88 @@ public void testGetUploadedMetadata() throws IOException {
|
216 | 226 |
|
217 | 227 | public void testSerDe() throws IOException {
|
218 | 228 | Custom customMetadata = getCustomMetadata();
|
| 229 | + verifySerDe(customMetadata, IndexGraveyard.TYPE); |
| 230 | + } |
| 231 | + |
| 232 | + public void testSerDeForPersistentTasks() throws IOException { |
| 233 | + Custom customMetadata = getPersistentTasksMetadata(); |
| 234 | + verifySerDe(customMetadata, PersistentTasksCustomMetadata.TYPE); |
| 235 | + } |
| 236 | + |
| 237 | + private void verifySerDe(Custom objectToUpload, String objectType) throws IOException { |
219 | 238 | RemoteCustomMetadata remoteObjectForUpload = new RemoteCustomMetadata(
|
220 |
| - customMetadata, |
221 |
| - IndexGraveyard.TYPE, |
| 239 | + objectToUpload, |
| 240 | + objectType, |
222 | 241 | METADATA_VERSION,
|
223 | 242 | clusterUUID,
|
224 | 243 | compressor,
|
225 |
| - namedWriteableRegistry |
| 244 | + customWritableRegistry() |
226 | 245 | );
|
227 | 246 | try (InputStream inputStream = remoteObjectForUpload.serialize()) {
|
228 | 247 | remoteObjectForUpload.setFullBlobName(BlobPath.cleanPath());
|
229 | 248 | assertThat(inputStream.available(), greaterThan(0));
|
230 | 249 | Custom readCustomMetadata = remoteObjectForUpload.deserialize(inputStream);
|
231 |
| - assertThat(readCustomMetadata, is(customMetadata)); |
| 250 | + assertThat(readCustomMetadata, is(objectToUpload)); |
232 | 251 | }
|
233 | 252 | }
|
234 | 253 |
|
| 254 | + private NamedWriteableRegistry customWritableRegistry() { |
| 255 | + List<Entry> entries = ClusterModule.getNamedWriteables(); |
| 256 | + entries.add(new Entry(PersistentTaskParams.class, TestPersistentTaskParams.PARAM_NAME, TestPersistentTaskParams::new)); |
| 257 | + return new NamedWriteableRegistry(entries); |
| 258 | + } |
| 259 | + |
235 | 260 | public static Custom getCustomMetadata() {
|
236 | 261 | return IndexGraveyard.builder().addTombstone(new Index("test-index", "3q2423")).build();
|
237 | 262 | }
|
238 | 263 |
|
| 264 | + private static Custom getPersistentTasksMetadata() { |
| 265 | + return PersistentTasksCustomMetadata.builder() |
| 266 | + .addTask("_task_1", "testTaskName", new TestPersistentTaskParams("task param data"), new Assignment(null, "_reason")) |
| 267 | + .build(); |
| 268 | + } |
| 269 | + |
| 270 | + public static class TestPersistentTaskParams implements PersistentTaskParams { |
| 271 | + |
| 272 | + private static final String PARAM_NAME = "testTaskName"; |
| 273 | + |
| 274 | + private final String data; |
| 275 | + |
| 276 | + public TestPersistentTaskParams(String data) { |
| 277 | + this.data = data; |
| 278 | + } |
| 279 | + |
| 280 | + public TestPersistentTaskParams(StreamInput in) throws IOException { |
| 281 | + this(in.readString()); |
| 282 | + } |
| 283 | + |
| 284 | + @Override |
| 285 | + public String getWriteableName() { |
| 286 | + return PARAM_NAME; |
| 287 | + } |
| 288 | + |
| 289 | + @Override |
| 290 | + public Version getMinimalSupportedVersion() { |
| 291 | + return Version.V_2_13_0; |
| 292 | + } |
| 293 | + |
| 294 | + @Override |
| 295 | + public void writeTo(StreamOutput out) throws IOException { |
| 296 | + out.writeString(data); |
| 297 | + } |
| 298 | + |
| 299 | + @Override |
| 300 | + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { |
| 301 | + return builder.startObject().field("data_field", data); |
| 302 | + } |
| 303 | + |
| 304 | + @Override |
| 305 | + public boolean equals(Object o) { |
| 306 | + if (this == o) return true; |
| 307 | + if (o == null || getClass() != o.getClass()) return false; |
| 308 | + TestPersistentTaskParams that = (TestPersistentTaskParams) o; |
| 309 | + return Objects.equals(data, that.data); |
| 310 | + } |
| 311 | + } |
| 312 | + |
239 | 313 | }
|
0 commit comments