Skip to content

Commit 1526597

Browse files
committed
Add restriction to have a single repository with shallow snapshot v2 setting
Signed-off-by: Sachin Kale <[email protected]>
1 parent 92d7fe8 commit 1526597

File tree

3 files changed

+180
-6
lines changed

3 files changed

+180
-6
lines changed

server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
import java.io.ByteArrayInputStream;
3131
import java.io.Closeable;
3232
import java.io.IOException;
33+
import java.util.HashMap;
34+
import java.util.HashSet;
3335
import java.util.List;
3436
import java.util.Locale;
3537
import java.util.Map;
@@ -75,25 +77,42 @@ public RemoteStorePinnedTimestampService(
7577
* and starts the asynchronous update task.
7678
*/
7779
public void start() {
78-
validateRemoteStoreConfiguration();
80+
blobContainer = validateAndCreateBlobContainer(settings, repositoriesService.get());
7981
startAsyncUpdateTask(RemoteStoreSettings.getPinnedTimestampsSchedulerInterval());
8082
}
8183

82-
private void validateRemoteStoreConfiguration() {
84+
private static BlobContainer validateAndCreateBlobContainer(Settings settings, RepositoriesService repositoriesService) {
8385
final String remoteStoreRepo = settings.get(
8486
Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY
8587
);
8688
assert remoteStoreRepo != null : "Remote Segment Store repository is not configured";
87-
final Repository repository = repositoriesService.get().repository(remoteStoreRepo);
89+
final Repository repository = repositoriesService.repository(remoteStoreRepo);
8890
assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository";
8991
BlobStoreRepository blobStoreRepository = (BlobStoreRepository) repository;
90-
blobContainer = blobStoreRepository.blobStore().blobContainer(blobStoreRepository.basePath().add(PINNED_TIMESTAMPS_PATH_TOKEN));
92+
return blobStoreRepository.blobStore().blobContainer(blobStoreRepository.basePath().add(PINNED_TIMESTAMPS_PATH_TOKEN));
9193
}
9294

9395
private void startAsyncUpdateTask(TimeValue pinnedTimestampsSchedulerInterval) {
9496
asyncUpdatePinnedTimestampTask = new AsyncUpdatePinnedTimestampTask(logger, threadPool, pinnedTimestampsSchedulerInterval, true);
9597
}
9698

99+
public static Map<String, Set<Long>> fetchPinnedTimestamps(Settings settings, RepositoriesService repositoriesService)
100+
throws IOException {
101+
BlobContainer blobContainer = validateAndCreateBlobContainer(settings, repositoriesService);
102+
Set<String> pinnedTimestamps = blobContainer.listBlobs().keySet();
103+
Map<String, Set<Long>> pinningEntityTimestampMap = new HashMap<>();
104+
for (String pinnedTimestamp : pinnedTimestamps) {
105+
String[] tokens = pinnedTimestamp.split(PINNED_TIMESTAMPS_FILENAME_SEPARATOR);
106+
Long timestamp = Long.parseLong(tokens[tokens.length - 1]);
107+
String pinningEntity = pinnedTimestamp.substring(pinnedTimestamp.lastIndexOf(PINNED_TIMESTAMPS_FILENAME_SEPARATOR));
108+
if (pinningEntityTimestampMap.containsKey(pinningEntity) == false) {
109+
pinningEntityTimestampMap.put(pinningEntity, new HashSet<>());
110+
}
111+
pinningEntityTimestampMap.get(pinningEntity).add(timestamp);
112+
}
113+
return pinningEntityTimestampMap;
114+
}
115+
97116
/**
98117
* Pins a timestamp in the remote store.
99118
*

server/src/main/java/org/opensearch/repositories/RepositoriesService.java

Lines changed: 75 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,9 @@
6868
import org.opensearch.common.util.io.IOUtils;
6969
import org.opensearch.core.action.ActionListener;
7070
import org.opensearch.core.common.Strings;
71+
import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService;
7172
import org.opensearch.repositories.blobstore.MeteredBlobStoreRepository;
73+
import org.opensearch.snapshots.SnapshotsService;
7274
import org.opensearch.threadpool.ThreadPool;
7375
import org.opensearch.transport.TransportService;
7476

@@ -84,6 +86,7 @@
8486
import java.util.stream.Stream;
8587

8688
import static org.opensearch.repositories.blobstore.BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY;
89+
import static org.opensearch.repositories.blobstore.BlobStoreRepository.SHALLOW_SNAPSHOT_V2;
8790
import static org.opensearch.repositories.blobstore.BlobStoreRepository.SYSTEM_REPOSITORY_SETTING;
8891

8992
/**
@@ -123,6 +126,7 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C
123126
private final RepositoriesStatsArchive repositoriesStatsArchive;
124127
private final ClusterManagerTaskThrottler.ThrottlingKey putRepositoryTaskKey;
125128
private final ClusterManagerTaskThrottler.ThrottlingKey deleteRepositoryTaskKey;
129+
private final Settings settings;
126130

127131
public RepositoriesService(
128132
Settings settings,
@@ -132,6 +136,7 @@ public RepositoriesService(
132136
Map<String, Repository.Factory> internalTypesRegistry,
133137
ThreadPool threadPool
134138
) {
139+
this.settings = settings;
135140
this.typesRegistry = typesRegistry;
136141
this.internalTypesRegistry = internalTypesRegistry;
137142
this.clusterService = clusterService;
@@ -173,7 +178,7 @@ public void registerOrUpdateRepository(final PutRepositoryRequest request, final
173178
CryptoMetadata.fromRequest(request.cryptoSettings())
174179
);
175180
validate(request.name());
176-
validateRepositoryMetadataSettings(clusterService, request.name(), request.settings());
181+
validateRepositoryMetadataSettings(clusterService, request.name(), request.settings(), repositories, settings, this);
177182
if (newRepositoryMetadata.cryptoMetadata() != null) {
178183
validate(newRepositoryMetadata.cryptoMetadata().keyProviderName());
179184
}
@@ -684,7 +689,10 @@ public static void validate(final String identifier) {
684689
public static void validateRepositoryMetadataSettings(
685690
ClusterService clusterService,
686691
final String repositoryName,
687-
final Settings repositoryMetadataSettings
692+
final Settings repositoryMetadataSettings,
693+
Map<String, Repository> repositories,
694+
Settings settings,
695+
RepositoriesService repositoriesService
688696
) {
689697
// We can add more validations here for repository settings in the future.
690698
Version minVersionInCluster = clusterService.state().getNodes().getMinNodeVersion();
@@ -699,6 +707,41 @@ public static void validateRepositoryMetadataSettings(
699707
+ minVersionInCluster
700708
);
701709
}
710+
if (SHALLOW_SNAPSHOT_V2.get(repositoryMetadataSettings)) {
711+
if (minVersionInCluster.onOrAfter(Version.V_2_17_0) == false) {
712+
throw new RepositoryException(
713+
repositoryName,
714+
"setting "
715+
+ SHALLOW_SNAPSHOT_V2.getKey()
716+
+ " cannot be enabled as some of the nodes in cluster are on version older than "
717+
+ Version.V_2_17_0
718+
+ ". Minimum node version in cluster is: "
719+
+ minVersionInCluster
720+
);
721+
}
722+
if (repositoryWithShallowV2Exists(repositories)) {
723+
throw new RepositoryException(
724+
repositoryName,
725+
"setting "
726+
+ SHALLOW_SNAPSHOT_V2.getKey()
727+
+ " cannot be enabled as this setting can be enabled only on one repository "
728+
+ " and one or more repositories in the cluster have the setting as enabled"
729+
);
730+
}
731+
try {
732+
if (pinnedTimestampExistsWithDifferentRepository(repositoryName, settings, repositoriesService)) {
733+
throw new RepositoryException(
734+
repositoryName,
735+
"setting "
736+
+ SHALLOW_SNAPSHOT_V2.getKey()
737+
+ " cannot be enabled if there are existing snapshots created with shallow V2 "
738+
+ "setting using different repository."
739+
);
740+
}
741+
} catch (Exception e) {
742+
throw new RepositoryException(repositoryName, "Exception while fetching pinned timestamp details");
743+
}
744+
}
702745
// Validation to not allow users to create system repository via put repository call.
703746
if (isSystemRepositorySettingPresent(repositoryMetadataSettings)) {
704747
throw new RepositoryException(
@@ -710,6 +753,36 @@ public static void validateRepositoryMetadataSettings(
710753
}
711754
}
712755

756+
private static boolean repositoryWithShallowV2Exists(Map<String, Repository> repositories) {
757+
return repositories.values().stream().anyMatch(repo -> SHALLOW_SNAPSHOT_V2.get(repo.getMetadata().settings()));
758+
}
759+
760+
private static boolean pinnedTimestampExistsWithDifferentRepository(
761+
String newRepoName,
762+
Settings settings,
763+
RepositoriesService repositoriesService
764+
) throws IOException {
765+
Map<String, Set<Long>> pinningEntityTimestampMap = RemoteStorePinnedTimestampService.fetchPinnedTimestamps(
766+
settings,
767+
repositoriesService
768+
);
769+
for (String pinningEntiy : pinningEntityTimestampMap.keySet()) {
770+
String[] tokens = pinningEntiy.split(SnapshotsService.SNAPSHOT_PINNED_TIMESTAMP_DELIMITER);
771+
if (tokens.length > 2) {
772+
logger.warn(
773+
"With more than one {} in the pinning entity = {}, not able to determine if there is a pinned timestamp created with different repository",
774+
SnapshotsService.SNAPSHOT_PINNED_TIMESTAMP_DELIMITER,
775+
pinningEntiy
776+
);
777+
return true;
778+
}
779+
if (tokens[0].equals(newRepoName) == false) {
780+
return true;
781+
}
782+
}
783+
return false;
784+
}
785+
713786
private static void ensureRepositoryNotInUse(ClusterState clusterState, String repository) {
714787
if (isRepositoryInUse(clusterState, repository)) {
715788
throw new IllegalStateException("trying to modify or unregister repository that is currently used");

server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryRemoteIndexTests.java

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
package org.opensearch.repositories.blobstore;
3434

3535
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse;
36+
import org.opensearch.action.admin.cluster.repositories.verify.VerifyRepositoryResponse;
37+
import org.opensearch.action.support.master.AcknowledgedResponse;
3638
import org.opensearch.client.Client;
3739
import org.opensearch.cluster.metadata.RepositoryMetadata;
3840
import org.opensearch.common.settings.Settings;
@@ -41,10 +43,12 @@
4143
import org.opensearch.gateway.remote.RemoteClusterStateService;
4244
import org.opensearch.index.IndexSettings;
4345
import org.opensearch.index.snapshots.blobstore.RemoteStoreShardShallowCopySnapshot;
46+
import org.opensearch.indices.RemoteStoreSettings;
4447
import org.opensearch.indices.replication.common.ReplicationType;
4548
import org.opensearch.repositories.IndexId;
4649
import org.opensearch.repositories.RepositoriesService;
4750
import org.opensearch.repositories.RepositoryData;
51+
import org.opensearch.repositories.RepositoryException;
4852
import org.opensearch.repositories.fs.FsRepository;
4953
import org.opensearch.snapshots.SnapshotId;
5054
import org.opensearch.snapshots.SnapshotInfo;
@@ -64,6 +68,9 @@
6468
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT;
6569
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY;
6670
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY;
71+
import static org.opensearch.repositories.blobstore.BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY;
72+
import static org.opensearch.repositories.blobstore.BlobStoreRepository.SHALLOW_SNAPSHOT_V2;
73+
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
6774
import static org.hamcrest.Matchers.equalTo;
6875

6976
/**
@@ -81,6 +88,7 @@ protected Settings nodeSettings() {
8188
.put(Environment.PATH_HOME_SETTING.getKey(), tempDir)
8289
.put(Environment.PATH_REPO_SETTING.getKey(), tempDir.resolve("repo"))
8390
.put(Environment.PATH_SHARED_DATA_SETTING.getKey(), tempDir.getParent())
91+
.put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED.getKey(), true)
8492
.build();
8593
}
8694

@@ -373,4 +381,78 @@ public void testRetrieveShallowCopySnapshotCase2() throws IOException {
373381
assertThat(snapshotIds, equalTo(originalSnapshots));
374382
}
375383

384+
public void testRepositoryCreationShallowV2() throws Exception {
385+
Client client = client();
386+
387+
Settings snapshotRepoSettings1 = Settings.builder()
388+
.put(node().settings())
389+
.put("location", OpenSearchIntegTestCase.randomRepoPath(node().settings()))
390+
.put(REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true)
391+
.put(SHALLOW_SNAPSHOT_V2.getKey(), true)
392+
.build();
393+
394+
// Create repo with shallow snapshot V2 enabled
395+
createRepository(client, "test-repo-1", snapshotRepoSettings1);
396+
397+
logger.info("--> verify the repository");
398+
VerifyRepositoryResponse verifyRepositoryResponse = client.admin().cluster().prepareVerifyRepository("test-repo-1").get();
399+
assertNotNull(verifyRepositoryResponse.getNodes());
400+
401+
GetRepositoriesResponse getRepositoriesResponse = client.admin().cluster().prepareGetRepositories("test-repo-1").get();
402+
assertTrue(SHALLOW_SNAPSHOT_V2.get(getRepositoriesResponse.repositories().get(0).settings()));
403+
404+
Settings snapshotRepoSettings2 = Settings.builder()
405+
.put(node().settings())
406+
.put("location", OpenSearchIntegTestCase.randomRepoPath(node().settings()))
407+
.put(REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true)
408+
.put(SHALLOW_SNAPSHOT_V2.getKey(), true)
409+
.build();
410+
411+
// Create another repo with shallow snapshot V2 enabled, this should fail.
412+
assertThrows(RepositoryException.class, () -> createRepository(client, "test-repo-2", snapshotRepoSettings2));
413+
414+
// Disable shallow snapshot V2 setting on test-repo-1
415+
updateRepository(
416+
client,
417+
"test-repo-1",
418+
Settings.builder().put(snapshotRepoSettings1).put(SHALLOW_SNAPSHOT_V2.getKey(), false).build()
419+
);
420+
getRepositoriesResponse = client.admin().cluster().prepareGetRepositories("test-repo-1").get();
421+
assertFalse(SHALLOW_SNAPSHOT_V2.get(getRepositoriesResponse.repositories().get(0).settings()));
422+
423+
// Create test-repo-2 with shallow snapshot V2 enabled, this should pass now.
424+
createRepository(client, "test-repo-2", snapshotRepoSettings2);
425+
getRepositoriesResponse = client.admin().cluster().prepareGetRepositories("test-repo-2").get();
426+
assertTrue(SHALLOW_SNAPSHOT_V2.get(getRepositoriesResponse.repositories().get(0).settings()));
427+
428+
final String indexName = "test-idx";
429+
createIndex(indexName);
430+
ensureGreen();
431+
indexDocuments(client, indexName);
432+
433+
// Create pinned timestamp snapshot in test-repo-2
434+
SnapshotInfo snapshotInfo = createSnapshot("test-repo-2", "test-snap-1", new ArrayList<>());
435+
assertNotNull(snapshotInfo.snapshotId());
436+
437+
// As snapshot is present, even after disabling shallow snapshot setting in test-repo-2, we will not be able to
438+
// enable shallow snapshot v2 setting in test-repo-1
439+
updateRepository(
440+
client,
441+
"test-repo-2",
442+
Settings.builder().put(snapshotRepoSettings2).put(SHALLOW_SNAPSHOT_V2.getKey(), false).build()
443+
);
444+
getRepositoriesResponse = client.admin().cluster().prepareGetRepositories("test-repo-2").get();
445+
assertFalse(SHALLOW_SNAPSHOT_V2.get(getRepositoriesResponse.repositories().get(0).settings()));
446+
447+
assertThrows(RepositoryException.class, () -> updateRepository(client, "test-repo-1", snapshotRepoSettings1));
448+
449+
// After deleting the snapshot, we will be able to enable shallow snapshot v2 setting in test-repo-1
450+
AcknowledgedResponse deleteSnapshotResponse = client().admin().cluster().prepareDeleteSnapshot("test-repo-2", "test-snap-1").get();
451+
452+
assertAcked(deleteSnapshotResponse);
453+
454+
updateRepository(client, "test-repo-1", snapshotRepoSettings1);
455+
getRepositoriesResponse = client.admin().cluster().prepareGetRepositories("test-repo-1").get();
456+
assertTrue(SHALLOW_SNAPSHOT_V2.get(getRepositoriesResponse.repositories().get(0).settings()));
457+
}
376458
}

0 commit comments

Comments
 (0)