Skip to content

Commit 62e943c

Browse files
anshu1106Anshu Agarwal
andauthored
[Backport 2.x] Support centralize snapshot creation (#15569)
* [SnapshotV2] Support centralize snapshot creation (#15124) * Initial Commit to support centralize snapshot creation and implicit locking mechanism Signed-off-by: Anshu Agarwal <[email protected]> * Fix deserilization error Signed-off-by: Anshu Agarwal <[email protected]> * Fix gradle spotless check Signed-off-by: Anshu Agarwal <[email protected]> * Fix listener Signed-off-by: Anshu Agarwal <[email protected]> * Fix test Signed-off-by: Anshu Agarwal <[email protected]> * Fix snapshot generation Signed-off-by: Anshu Agarwal <[email protected]> * Modify cluster setting name Signed-off-by: Anshu Agarwal <[email protected]> * Add more tests Signed-off-by: Anshu Agarwal <[email protected]> * Uncomment pin timestamp code Signed-off-by: Anshu Agarwal <[email protected]> * Modify log messages Signed-off-by: Anshu Agarwal <[email protected]> * Add spotless check failure fix Signed-off-by: Anshu Agarwal <[email protected]> * Fix completion listener for snapshot v2 Signed-off-by: Anshu Agarwal <[email protected]> * Elevate cluster state update priority for repository metadata update task Signed-off-by: Anshu Agarwal <[email protected]> * Add more integ tests Signed-off-by: Anshu Agarwal <[email protected]> * Add priority as IMMEDIATE for cluster state repo update task only for v2 snapshots Signed-off-by: Anshu Agarwal <[email protected]> * Fix build error Signed-off-by: Anshu Agarwal <[email protected]> * Fix spotless error Signed-off-by: Anshu Agarwal <[email protected]> * Add repository setting for snapshot v2 Signed-off-by: Anshu Agarwal <[email protected]> * Address review comments Signed-off-by: Anshu Agarwal <[email protected]> * Add integ test to verify snapshot creation if shallow copy repo setting is disabled Signed-off-by: Anshu Agarwal <[email protected]> * Fix spotless vilation error Signed-off-by: Anshu Agarwal <[email protected]> * Address review comment Signed-off-by: Anshu Agarwal <[email protected]> * Address review comments Signed-off-by: Anshu Agarwal <[email protected]> * Add min version check for backward compatibility Signed-off-by: Anshu Agarwal <[email protected]> * address review comments Signed-off-by: Anshu Agarwal <[email protected]> * add integ test for master failover scenario Signed-off-by: Anshu Agarwal <[email protected]> * Add more integ tests Signed-off-by: Anshu Agarwal <[email protected]> * refactor code Signed-off-by: Anshu Agarwal <[email protected]> * add changelog Signed-off-by: Anshu Agarwal <[email protected]> * Add pinned timestamp setting in integ tests Signed-off-by: Anshu Agarwal <[email protected]> --------- Signed-off-by: Anshu Agarwal <[email protected]> Signed-off-by: Anshu Agarwal <[email protected]> Co-authored-by: Anshu Agarwal <[email protected]> (cherry picked from commit 23cba28) Signed-off-by: Anshu Agarwal <[email protected]> * Fix spotless check failure Signed-off-by: Anshu Agarwal <[email protected]> * fix :server:japicmp failure Signed-off-by: Anshu Agarwal <[email protected]> * backort PR#15602 Signed-off-by: Anshu Agarwal <[email protected]> --------- Signed-off-by: Anshu Agarwal <[email protected]> Signed-off-by: Anshu Agarwal <[email protected]> Co-authored-by: Anshu Agarwal <[email protected]>
1 parent 41e1075 commit 62e943c

File tree

21 files changed

+1196
-111
lines changed

21 files changed

+1196
-111
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
2929
- Add support for index level max slice count setting for concurrent segment search ([#15336](https://github.com/opensearch-project/OpenSearch/pull/15336))
3030
- Support cancellation for cat shards and node stats API.([#13966](https://github.com/opensearch-project/OpenSearch/pull/13966))
3131
- [Streaming Indexing] Introduce bulk HTTP API streaming flavor ([#15381](https://github.com/opensearch-project/OpenSearch/pull/15381))
32+
- Add support for centralize snapshot creation with pinned timestamp ([#15124](https://github.com/opensearch-project/OpenSearch/pull/15124))
3233
- Add concurrent search support for Derived Fields ([#15326](https://github.com/opensearch-project/OpenSearch/pull/15326))
3334
- [Workload Management] Add query group stats constructs ([#15343](https://github.com/opensearch-project/OpenSearch/pull/15343)))
3435
- Add limit on number of processors for Ingest pipeline([#15460](https://github.com/opensearch-project/OpenSearch/pull/15465)).

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.opensearch.cluster.metadata.Metadata;
4545
import org.opensearch.cluster.metadata.RepositoryMetadata;
4646
import org.opensearch.cluster.service.ClusterService;
47+
import org.opensearch.common.Priority;
4748
import org.opensearch.common.blobstore.BlobPath;
4849
import org.opensearch.common.blobstore.BlobStore;
4950
import org.opensearch.common.blobstore.BlobStoreException;
@@ -424,6 +425,7 @@ public void finalizeSnapshot(
424425
SnapshotInfo snapshotInfo,
425426
Version repositoryMetaVersion,
426427
Function<ClusterState, ClusterState> stateTransformer,
428+
Priority repositoryUpdatePriority,
427429
ActionListener<RepositoryData> listener
428430
) {
429431
if (SnapshotsService.useShardGenerations(repositoryMetaVersion) == false) {
@@ -436,6 +438,7 @@ public void finalizeSnapshot(
436438
snapshotInfo,
437439
repositoryMetaVersion,
438440
stateTransformer,
441+
repositoryUpdatePriority,
439442
listener
440443
);
441444
}

server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteRestoreSnapshotIT.java

Lines changed: 601 additions & 0 deletions
Large diffs are not rendered by default.

server/src/internalClusterTest/java/org/opensearch/snapshots/RepositoryFilterUserMetadataIT.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.opensearch.cluster.ClusterState;
3737
import org.opensearch.cluster.metadata.Metadata;
3838
import org.opensearch.cluster.service.ClusterService;
39+
import org.opensearch.common.Priority;
3940
import org.opensearch.common.settings.Settings;
4041
import org.opensearch.core.action.ActionListener;
4142
import org.opensearch.core.xcontent.NamedXContentRegistry;
@@ -127,6 +128,7 @@ public void finalizeSnapshot(
127128
SnapshotInfo snapshotInfo,
128129
Version repositoryMetaVersion,
129130
Function<ClusterState, ClusterState> stateTransformer,
131+
Priority repositoryUpdatePriority,
130132
ActionListener<RepositoryData> listener
131133
) {
132134
super.finalizeSnapshot(
@@ -136,6 +138,7 @@ public void finalizeSnapshot(
136138
snapshotInfo,
137139
repositoryMetaVersion,
138140
stateTransformer,
141+
repositoryUpdatePriority,
139142
listener
140143
);
141144
}

server/src/main/java/org/opensearch/action/admin/cluster/snapshots/create/TransportCreateSnapshotAction.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,12 +42,16 @@
4242
import org.opensearch.common.inject.Inject;
4343
import org.opensearch.core.action.ActionListener;
4444
import org.opensearch.core.common.io.stream.StreamInput;
45+
import org.opensearch.repositories.RepositoriesService;
46+
import org.opensearch.repositories.Repository;
4547
import org.opensearch.snapshots.SnapshotsService;
4648
import org.opensearch.threadpool.ThreadPool;
4749
import org.opensearch.transport.TransportService;
4850

4951
import java.io.IOException;
5052

53+
import static org.opensearch.repositories.blobstore.BlobStoreRepository.SHALLOW_SNAPSHOT_V2;
54+
5155
/**
5256
* Transport action for create snapshot operation
5357
*
@@ -56,12 +60,15 @@
5660
public class TransportCreateSnapshotAction extends TransportClusterManagerNodeAction<CreateSnapshotRequest, CreateSnapshotResponse> {
5761
private final SnapshotsService snapshotsService;
5862

63+
private final RepositoriesService repositoriesService;
64+
5965
@Inject
6066
public TransportCreateSnapshotAction(
6167
TransportService transportService,
6268
ClusterService clusterService,
6369
ThreadPool threadPool,
6470
SnapshotsService snapshotsService,
71+
RepositoriesService repositoriesService,
6572
ActionFilters actionFilters,
6673
IndexNameExpressionResolver indexNameExpressionResolver
6774
) {
@@ -75,6 +82,7 @@ public TransportCreateSnapshotAction(
7582
indexNameExpressionResolver
7683
);
7784
this.snapshotsService = snapshotsService;
85+
this.repositoriesService = repositoriesService;
7886
}
7987

8088
@Override
@@ -110,7 +118,10 @@ protected void clusterManagerOperation(
110118
snapshotsService.createSnapshotLegacy(request, ActionListener.map(listener, snapshot -> new CreateSnapshotResponse()));
111119
}
112120
} else {
113-
if (request.waitForCompletion()) {
121+
Repository repository = repositoriesService.repository(request.repository());
122+
boolean isSnapshotV2 = SHALLOW_SNAPSHOT_V2.get(repository.getMetadata().settings());
123+
124+
if (request.waitForCompletion() || isSnapshotV2) {
114125
snapshotsService.executeSnapshot(request, ActionListener.map(listener, CreateSnapshotResponse::new));
115126
} else {
116127
snapshotsService.createSnapshot(request, ActionListener.map(listener, snapshot -> new CreateSnapshotResponse()));

server/src/main/java/org/opensearch/repositories/FilterRepository.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.opensearch.cluster.metadata.Metadata;
4040
import org.opensearch.cluster.metadata.RepositoryMetadata;
4141
import org.opensearch.cluster.node.DiscoveryNode;
42+
import org.opensearch.common.Priority;
4243
import org.opensearch.common.lifecycle.Lifecycle;
4344
import org.opensearch.common.lifecycle.LifecycleListener;
4445
import org.opensearch.core.action.ActionListener;
@@ -123,6 +124,29 @@ public void finalizeSnapshot(
123124
);
124125
}
125126

127+
@Override
128+
public void finalizeSnapshot(
129+
ShardGenerations shardGenerations,
130+
long repositoryStateId,
131+
Metadata clusterMetadata,
132+
SnapshotInfo snapshotInfo,
133+
Version repositoryMetaVersion,
134+
Function<ClusterState, ClusterState> stateTransformer,
135+
Priority repositoryUpdatePriority,
136+
ActionListener<RepositoryData> listener
137+
) {
138+
in.finalizeSnapshot(
139+
shardGenerations,
140+
repositoryStateId,
141+
clusterMetadata,
142+
snapshotInfo,
143+
repositoryMetaVersion,
144+
stateTransformer,
145+
repositoryUpdatePriority,
146+
listener
147+
);
148+
}
149+
126150
@Override
127151
public void deleteSnapshots(
128152
Collection<SnapshotId> snapshotIds,

server/src/main/java/org/opensearch/repositories/Repository.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.opensearch.cluster.metadata.RepositoryMetadata;
4242
import org.opensearch.cluster.node.DiscoveryNode;
4343
import org.opensearch.common.Nullable;
44+
import org.opensearch.common.Priority;
4445
import org.opensearch.common.annotation.PublicApi;
4546
import org.opensearch.common.lifecycle.LifecycleComponent;
4647
import org.opensearch.common.settings.Setting;
@@ -175,6 +176,32 @@ void finalizeSnapshot(
175176
ActionListener<RepositoryData> listener
176177
);
177178

179+
/**
180+
* Finalizes snapshotting process
181+
* <p>
182+
* This method is called on cluster-manager after all shards are snapshotted.
183+
*
184+
* @param shardGenerations updated shard generations
185+
* @param repositoryStateId the unique id identifying the state of the repository when the snapshot began
186+
* @param clusterMetadata cluster metadata
187+
* @param snapshotInfo SnapshotInfo instance to write for this snapshot
188+
* @param repositoryMetaVersion version of the updated repository metadata to write
189+
* @param stateTransformer a function that filters the last cluster state update that the snapshot finalization will execute and
190+
* is used to remove any state tracked for the in-progress snapshot from the cluster state
191+
* @param repositoryUpdatePriority priority for the cluster state update task
192+
* @param listener listener to be invoked with the new {@link RepositoryData} after completing the snapshot
193+
*/
194+
void finalizeSnapshot(
195+
ShardGenerations shardGenerations,
196+
long repositoryStateId,
197+
Metadata clusterMetadata,
198+
SnapshotInfo snapshotInfo,
199+
Version repositoryMetaVersion,
200+
Function<ClusterState, ClusterState> stateTransformer,
201+
Priority repositoryUpdatePriority,
202+
ActionListener<RepositoryData> listener
203+
);
204+
178205
/**
179206
* Deletes snapshots
180207
*

server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java

Lines changed: 74 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
import org.opensearch.cluster.service.ClusterService;
6666
import org.opensearch.common.Nullable;
6767
import org.opensearch.common.Numbers;
68+
import org.opensearch.common.Priority;
6869
import org.opensearch.common.SetOnce;
6970
import org.opensearch.common.UUIDs;
7071
import org.opensearch.common.blobstore.BlobContainer;
@@ -267,6 +268,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
267268

268269
public static final Setting<Boolean> REMOTE_STORE_INDEX_SHALLOW_COPY = Setting.boolSetting("remote_store_index_shallow_copy", false);
269270

271+
public static final Setting<Boolean> SHALLOW_SNAPSHOT_V2 = Setting.boolSetting("shallow_snapshot_v2", false);
272+
270273
/**
271274
* Setting to set batch size of stale snapshot shard blobs that will be deleted by snapshot workers as part of snapshot deletion.
272275
* For optimal performance the value of the setting should be equal to or close to repository's max # of keys that can be deleted in single operation
@@ -1072,6 +1075,7 @@ private void doDeleteShardSnapshots(
10721075
repositoryStateId,
10731076
repoMetaVersion,
10741077
Function.identity(),
1078+
Priority.NORMAL,
10751079
ActionListener.wrap(writeUpdatedRepoDataStep::onResponse, listener::onFailure)
10761080
);
10771081
}, listener::onFailure);
@@ -1101,39 +1105,46 @@ private void doDeleteShardSnapshots(
11011105
} else {
11021106
// Write the new repository data first (with the removed snapshot), using no shard generations
11031107
final RepositoryData updatedRepoData = repositoryData.removeSnapshots(snapshotIds, ShardGenerations.EMPTY);
1104-
writeIndexGen(updatedRepoData, repositoryStateId, repoMetaVersion, Function.identity(), ActionListener.wrap(newRepoData -> {
1105-
// Run unreferenced blobs cleanup in parallel to shard-level snapshot deletion
1106-
final ActionListener<Void> afterCleanupsListener = new GroupedActionListener<>(
1107-
ActionListener.wrap(() -> listener.onResponse(newRepoData)),
1108-
2
1109-
);
1110-
cleanupUnlinkedRootAndIndicesBlobs(
1111-
snapshotIds,
1112-
foundIndices,
1113-
rootBlobs,
1114-
newRepoData,
1115-
remoteStoreLockManagerFactory,
1116-
afterCleanupsListener
1117-
);
1118-
final StepListener<Collection<ShardSnapshotMetaDeleteResult>> writeMetaAndComputeDeletesStep = new StepListener<>();
1119-
writeUpdatedShardMetaDataAndComputeDeletes(
1120-
snapshotIds,
1121-
repositoryData,
1122-
false,
1123-
remoteStoreLockManagerFactory,
1124-
writeMetaAndComputeDeletesStep
1125-
);
1126-
writeMetaAndComputeDeletesStep.whenComplete(
1127-
deleteResults -> asyncCleanupUnlinkedShardLevelBlobs(
1128-
repositoryData,
1108+
writeIndexGen(
1109+
updatedRepoData,
1110+
repositoryStateId,
1111+
repoMetaVersion,
1112+
Function.identity(),
1113+
Priority.NORMAL,
1114+
ActionListener.wrap(newRepoData -> {
1115+
// Run unreferenced blobs cleanup in parallel to shard-level snapshot deletion
1116+
final ActionListener<Void> afterCleanupsListener = new GroupedActionListener<>(
1117+
ActionListener.wrap(() -> listener.onResponse(newRepoData)),
1118+
2
1119+
);
1120+
cleanupUnlinkedRootAndIndicesBlobs(
11291121
snapshotIds,
1130-
deleteResults,
1122+
foundIndices,
1123+
rootBlobs,
1124+
newRepoData,
11311125
remoteStoreLockManagerFactory,
11321126
afterCleanupsListener
1133-
),
1134-
afterCleanupsListener::onFailure
1135-
);
1136-
}, listener::onFailure));
1127+
);
1128+
final StepListener<Collection<ShardSnapshotMetaDeleteResult>> writeMetaAndComputeDeletesStep = new StepListener<>();
1129+
writeUpdatedShardMetaDataAndComputeDeletes(
1130+
snapshotIds,
1131+
repositoryData,
1132+
false,
1133+
remoteStoreLockManagerFactory,
1134+
writeMetaAndComputeDeletesStep
1135+
);
1136+
writeMetaAndComputeDeletesStep.whenComplete(
1137+
deleteResults -> asyncCleanupUnlinkedShardLevelBlobs(
1138+
repositoryData,
1139+
snapshotIds,
1140+
deleteResults,
1141+
remoteStoreLockManagerFactory,
1142+
afterCleanupsListener
1143+
),
1144+
afterCleanupsListener::onFailure
1145+
);
1146+
}, listener::onFailure)
1147+
);
11371148
}
11381149
}
11391150

@@ -1583,6 +1594,7 @@ public void cleanup(
15831594
repositoryStateId,
15841595
repositoryMetaVersion,
15851596
Function.identity(),
1597+
Priority.NORMAL,
15861598
ActionListener.wrap(
15871599
v -> cleanupStaleBlobs(
15881600
Collections.emptyList(),
@@ -1787,6 +1799,29 @@ public void finalizeSnapshot(
17871799
Version repositoryMetaVersion,
17881800
Function<ClusterState, ClusterState> stateTransformer,
17891801
final ActionListener<RepositoryData> listener
1802+
) {
1803+
finalizeSnapshot(
1804+
shardGenerations,
1805+
repositoryStateId,
1806+
clusterMetadata,
1807+
snapshotInfo,
1808+
repositoryMetaVersion,
1809+
stateTransformer,
1810+
Priority.NORMAL,
1811+
listener
1812+
);
1813+
}
1814+
1815+
@Override
1816+
public void finalizeSnapshot(
1817+
final ShardGenerations shardGenerations,
1818+
final long repositoryStateId,
1819+
final Metadata clusterMetadata,
1820+
SnapshotInfo snapshotInfo,
1821+
Version repositoryMetaVersion,
1822+
Function<ClusterState, ClusterState> stateTransformer,
1823+
Priority repositoryUpdatePriority,
1824+
final ActionListener<RepositoryData> listener
17901825
) {
17911826
assert repositoryStateId > RepositoryData.UNKNOWN_REPO_GEN : "Must finalize based on a valid repository generation but received ["
17921827
+ repositoryStateId
@@ -1834,6 +1869,7 @@ public void finalizeSnapshot(
18341869
repositoryStateId,
18351870
repositoryMetaVersion,
18361871
stateTransformer,
1872+
repositoryUpdatePriority,
18371873
ActionListener.wrap(newRepoData -> {
18381874
if (writeShardGens) {
18391875
cleanupOldShardGens(existingRepositoryData, updatedRepositoryData);
@@ -2367,17 +2403,19 @@ public boolean isSystemRepository() {
23672403
* Lastly, the {@link RepositoryMetadata} entry for this repository is updated to the new generation {@code P + 1} and thus
23682404
* pending and safe generation are set to the same value marking the end of the update of the repository data.
23692405
*
2370-
* @param repositoryData RepositoryData to write
2371-
* @param expectedGen expected repository generation at the start of the operation
2372-
* @param version version of the repository metadata to write
2373-
* @param stateFilter filter for the last cluster state update executed by this method
2406+
* @param repositoryData RepositoryData to write
2407+
* @param expectedGen expected repository generation at the start of the operation
2408+
* @param version version of the repository metadata to write
2409+
* @param stateFilter filter for the last cluster state update executed by this method
2410+
* @param repositoryUpdatePriority priority for the cluster state update task
23742411
* @param listener completion listener
23752412
*/
23762413
protected void writeIndexGen(
23772414
RepositoryData repositoryData,
23782415
long expectedGen,
23792416
Version version,
23802417
Function<ClusterState, ClusterState> stateFilter,
2418+
Priority repositoryUpdatePriority,
23812419
ActionListener<RepositoryData> listener
23822420
) {
23832421
assert isReadOnly() == false; // can not write to a read only repository
@@ -2402,7 +2440,7 @@ protected void writeIndexGen(
24022440
final StepListener<Long> setPendingStep = new StepListener<>();
24032441
clusterService.submitStateUpdateTask(
24042442
"set pending repository generation [" + metadata.name() + "][" + expectedGen + "]",
2405-
new ClusterStateUpdateTask() {
2443+
new ClusterStateUpdateTask(repositoryUpdatePriority) {
24062444

24072445
private long newGen;
24082446

@@ -2540,7 +2578,7 @@ public void onFailure(Exception e) {
25402578
// Step 3: Update CS to reflect new repository generation.
25412579
clusterService.submitStateUpdateTask(
25422580
"set safe repository generation [" + metadata.name() + "][" + newGen + "]",
2543-
new ClusterStateUpdateTask() {
2581+
new ClusterStateUpdateTask(repositoryUpdatePriority) {
25442582
@Override
25452583
public ClusterState execute(ClusterState currentState) {
25462584
final RepositoryMetadata meta = getRepoMetadata(currentState);

0 commit comments

Comments
 (0)