
Commit 3fc0139

Optimise snapshot deletion to speed up snapshot deletion and creation (opensearch-project#15568)
Signed-off-by: Ashish Singh <[email protected]>
1 parent deeb2de commit 3fc0139
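In brief, the commit does three things: it introduces a dedicated snapshot_deletion scaling thread pool and moves all snapshot-deletion work off the much smaller snapshot pool; it shuffles the resolved list of shard-level blobs before batched deletion; and it turns cleanupOldShardGens from a single synchronous bulk delete into batched, parallel deletes that complete before the finalize-snapshot listener is notified.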

4 files changed, +86 -15 lines

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -46,6 +46,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 - [Remote Publication] Add remote download stats ([#15291](https://github.com/opensearch-project/OpenSearch/pull/15291))
 - Add support for comma-separated list of index names to be used with Snapshot Status API ([#15409](https://github.com/opensearch-project/OpenSearch/pull/15409))
 - Add prefix support to hashed prefix & infix path types on remote store ([#15557](https://github.com/opensearch-project/OpenSearch/pull/15557))
+- Optimise snapshot deletion to speed up snapshot deletion and creation ([#15568](https://github.com/opensearch-project/OpenSearch/pull/15568))
 - [Remote Publication] Added checksum validation for cluster state behind a cluster setting ([#15218](https://github.com/opensearch-project/OpenSearch/pull/15218))

 ### Dependencies

server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java

Lines changed: 77 additions & 15 deletions
@@ -66,6 +66,7 @@
 import org.opensearch.common.Nullable;
 import org.opensearch.common.Numbers;
 import org.opensearch.common.Priority;
+import org.opensearch.common.Randomness;
 import org.opensearch.common.SetOnce;
 import org.opensearch.common.UUIDs;
 import org.opensearch.common.blobstore.BlobContainer;

@@ -831,7 +832,7 @@ boolean getPrefixModeVerification() {
      * maintains single lazy instance of {@link BlobContainer}
      */
     protected BlobContainer blobContainer() {
-        // assertSnapshotOrGenericThread();
+        assertSnapshotOrGenericThread();

         BlobContainer blobContainer = this.blobContainer.get();
         if (blobContainer == null) {
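Note that the thread-name assertion in blobContainer(), previously commented out, is re-enabled here; the final hunk of this file relaxes assertSnapshotOrGenericThread itself so the new snapshot_deletion pool also passes it.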
@@ -1204,6 +1205,10 @@ private void asyncCleanupUnlinkedShardLevelBlobs(
         ActionListener<Void> listener
     ) {
         final List<Tuple<BlobPath, String>> filesToDelete = resolveFilesToDelete(oldRepositoryData, snapshotIds, deleteResults);
+        long startTimeNs = System.nanoTime();
+        Randomness.shuffle(filesToDelete);
+        logger.debug("[{}] shuffled the filesToDelete with timeElapsedNs={}", metadata.name(), (System.nanoTime() - startTimeNs));
+
         if (filesToDelete.isEmpty()) {
             listener.onResponse(null);
             return;
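A note on the shuffle: the resolved delete list is grouped by shard path, so deleting it in order would focus the parallel batches below on one blob-key prefix at a time; randomising the order presumably spreads the load across prefixes on the remote object store. The debug log records only how long the shuffle itself took.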
@@ -1221,8 +1226,8 @@ private void asyncCleanupUnlinkedShardLevelBlobs(
             staleFilesToDeleteInBatch.size()
         );

-        // Start as many workers as fit into the snapshot pool at once at the most
-        final int workers = Math.min(threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(), staleFilesToDeleteInBatch.size());
+        // Start as many workers as fit into the snapshot_deletion pool at once at the most
+        final int workers = Math.min(threadPool.info(ThreadPool.Names.SNAPSHOT_DELETION).getMax(), staleFilesToDeleteInBatch.size());
         for (int i = 0; i < workers; ++i) {
             executeStaleShardDelete(staleFilesToDeleteInBatch, remoteStoreLockManagerFactory, groupedListener);
         }

@@ -1326,7 +1331,7 @@ private void executeStaleShardDelete(
         if (filesToDelete == null) {
             return;
         }
-        threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener, l -> {
+        threadPool.executor(ThreadPool.Names.SNAPSHOT_DELETION).execute(ActionRunnable.wrap(listener, l -> {
             try {
                 // filtering files for which remote store lock release and cleanup succeeded,
                 // remaining files for which it failed will be retried in next snapshot delete run.

@@ -1390,7 +1395,7 @@ private void writeUpdatedShardMetaDataAndComputeDeletes(
         ActionListener<Collection<ShardSnapshotMetaDeleteResult>> onAllShardsCompleted
     ) {

-        final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
+        final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT_DELETION);
         final List<IndexId> indices = oldRepositoryData.indicesToUpdateAfterRemovingSnapshot(snapshotIds);

         if (indices.isEmpty()) {

@@ -1578,7 +1583,7 @@ private void cleanupStaleBlobs(
             listener.onResponse(deleteResult);
         }, listener::onFailure), 2);

-        final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
+        final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT_DELETION);
         final List<String> staleRootBlobs = staleRootBlobs(newRepoData, rootBlobs.keySet());
         if (staleRootBlobs.isEmpty()) {
             groupedListener.onResponse(DeleteResult.ZERO);

@@ -1781,7 +1786,7 @@ void cleanupStaleIndices(

         // Start as many workers as fit into the snapshot pool at once at the most
         final int workers = Math.min(
-            threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(),
+            threadPool.info(ThreadPool.Names.SNAPSHOT_DELETION).getMax(),
             foundIndices.size() - survivingIndexIds.size()
         );
         for (int i = 0; i < workers; ++i) {

@@ -1833,7 +1838,7 @@ private void executeOneStaleIndexDelete(
             return;
         }
         final String indexSnId = indexEntry.getKey();
-        threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.supply(listener, () -> {
+        threadPool.executor(ThreadPool.Names.SNAPSHOT_DELETION).execute(ActionRunnable.supply(listener, () -> {
             try {
                 logger.debug("[{}] Found stale index [{}]. Cleaning it up", metadata.name(), indexSnId);
                 List<String> matchingShardPaths = findMatchingShardPaths(indexSnId, snapshotShardPaths);

@@ -2097,8 +2102,7 @@ public void finalizeSnapshot(
                 stateTransformer,
                 repositoryUpdatePriority,
                 ActionListener.wrap(newRepoData -> {
-                    cleanupOldShardGens(existingRepositoryData, updatedRepositoryData);
-                    listener.onResponse(newRepoData);
+                    cleanupOldShardGens(existingRepositoryData, updatedRepositoryData, newRepoData, listener);
                 }, onUpdateFailure)
             );
         }, onUpdateFailure), 2 + indices.size());
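This is the one behavioural change beyond thread-pool selection: finalizeSnapshot used to fire its listener immediately and run cleanupOldShardGens fire-and-forget; now the listener is handed to cleanupOldShardGens, which responds with the new RepositoryData only once cleanup has completed (or been abandoned on error), as the next hunks show.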
@@ -2254,7 +2258,12 @@ private void logShardPathsOperationWarning(IndexId indexId, SnapshotId snapshotId)
     }

     // Delete all old shard gen blobs that aren't referenced any longer as a result from moving to updated repository data
-    private void cleanupOldShardGens(RepositoryData existingRepositoryData, RepositoryData updatedRepositoryData) {
+    private void cleanupOldShardGens(
+        RepositoryData existingRepositoryData,
+        RepositoryData updatedRepositoryData,
+        RepositoryData newRepositoryData,
+        ActionListener<RepositoryData> listener
+    ) {
         final List<String> toDelete = new ArrayList<>();
         updatedRepositoryData.shardGenerations()
             .obsoleteShardGenerations(existingRepositoryData.shardGenerations())

@@ -2263,10 +2272,62 @@ private void cleanupOldShardGens(RepositoryData existingRepositoryData, RepositoryData updatedRepositoryData) {
                 (shardId, oldGen) -> toDelete.add(shardPath(indexId, shardId).buildAsString() + INDEX_FILE_PREFIX + oldGen)
             )
         );
+        if (toDelete.isEmpty()) {
+            listener.onResponse(newRepositoryData);
+            return;
+        }
         try {
-            deleteFromContainer(rootBlobContainer(), toDelete);
+            AtomicInteger counter = new AtomicInteger();
+            Collection<List<String>> subList = toDelete.stream()
+                .collect(Collectors.groupingBy(it -> counter.getAndIncrement() / maxShardBlobDeleteBatch))
+                .values();
+            final BlockingQueue<List<String>> staleFilesToDeleteInBatch = new LinkedBlockingQueue<>(subList);
+            logger.info(
+                "[{}] cleanupOldShardGens toDeleteSize={} groupSize={}",
+                metadata.name(),
+                toDelete.size(),
+                staleFilesToDeleteInBatch.size()
+            );
+            final GroupedActionListener<Void> groupedListener = new GroupedActionListener<>(ActionListener.wrap(r -> {
+                logger.info("[{}] completed cleanupOldShardGens", metadata.name());
+                listener.onResponse(newRepositoryData);
+            }, ex -> {
+                logger.error(new ParameterizedMessage("[{}] exception in cleanupOldShardGens", metadata.name()), ex);
+                listener.onResponse(newRepositoryData);
+            }), staleFilesToDeleteInBatch.size());
+
+            // Start as many workers as fit into the snapshot_deletion pool at once at the most
+            final int workers = Math.min(threadPool.info(ThreadPool.Names.SNAPSHOT_DELETION).getMax(), staleFilesToDeleteInBatch.size());
+            for (int i = 0; i < workers; ++i) {
+                executeOldShardGensCleanup(staleFilesToDeleteInBatch, groupedListener);
+            }
         } catch (Exception e) {
-            logger.warn("Failed to clean up old shard generation blobs", e);
+            logger.warn(new ParameterizedMessage("[{}] Failed to clean up old shard generation blobs", metadata.name()), e);
+            listener.onResponse(newRepositoryData);
+        }
+    }
+
+    private void executeOldShardGensCleanup(BlockingQueue<List<String>> staleFilesToDeleteInBatch, GroupedActionListener<Void> listener)
+        throws InterruptedException {
+        List<String> filesToDelete = staleFilesToDeleteInBatch.poll(0L, TimeUnit.MILLISECONDS);
+        if (filesToDelete != null) {
+            threadPool.executor(ThreadPool.Names.SNAPSHOT_DELETION).execute(ActionRunnable.wrap(listener, l -> {
+                try {
+                    deleteFromContainer(rootBlobContainer(), filesToDelete);
+                    l.onResponse(null);
+                } catch (Exception e) {
+                    logger.warn(
+                        () -> new ParameterizedMessage(
+                            "[{}] Failed to delete following blobs during cleanupOldFiles : {}",
+                            metadata.name(),
+                            filesToDelete
+                        ),
+                        e
+                    );
+                    l.onFailure(e);
+                }
+                executeOldShardGensCleanup(staleFilesToDeleteInBatch, listener);
+            }));
         }
     }

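The pattern above — partition the delete list into fixed-size batches, queue them, and let at most pool-max workers drain the queue, each worker re-scheduling itself after finishing a batch — is the heart of the optimisation. Here is a minimal, self-contained sketch of the same idiom using a plain ExecutorService instead of the OpenSearch thread pool; BATCH_SIZE and POOL_MAX are hypothetical stand-ins for maxShardBlobDeleteBatch and the snapshot_deletion pool maximum, and the println stands in for deleteFromContainer:

    import java.util.Collection;
    import java.util.List;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.stream.Collectors;
    import java.util.stream.IntStream;

    public class ParallelBatchDeleteSketch {
        static final int BATCH_SIZE = 3; // stand-in for maxShardBlobDeleteBatch
        static final int POOL_MAX = 4;   // stand-in for the snapshot_deletion pool max
        static final ExecutorService POOL = Executors.newFixedThreadPool(POOL_MAX);

        public static void main(String[] args) throws InterruptedException {
            List<String> toDelete = IntStream.range(0, 10)
                .mapToObj(i -> "indices/abc/0/index-" + i)
                .collect(Collectors.toList());

            // Same idiom as the patch: a counter on a *sequential* stream keys
            // each path into batch 0,0,0,1,1,1,... of at most BATCH_SIZE entries.
            AtomicInteger counter = new AtomicInteger();
            Collection<List<String>> batches = toDelete.stream()
                .collect(Collectors.groupingBy(it -> counter.getAndIncrement() / BATCH_SIZE))
                .values();

            BlockingQueue<List<String>> queue = new LinkedBlockingQueue<>(batches);
            CountDownLatch done = new CountDownLatch(queue.size()); // plays the GroupedActionListener

            // Start as many workers as fit into the pool, at most one per batch.
            int workers = Math.min(POOL_MAX, queue.size());
            for (int i = 0; i < workers; ++i) {
                drainOne(queue, done);
            }
            done.await();
            POOL.shutdown();
        }

        // Delete one batch, then re-schedule — mirrors executeOldShardGensCleanup.
        static void drainOne(BlockingQueue<List<String>> queue, CountDownLatch done) {
            List<String> batch = queue.poll();
            if (batch == null) {
                return; // queue drained; this worker simply exits
            }
            POOL.execute(() -> {
                System.out.println(Thread.currentThread().getName() + " deleting " + batch);
                done.countDown();
                drainOne(queue, done);
            });
        }
    }

Because the patch polls with poll(0L, TimeUnit.MILLISECONDS), a worker that finds the queue empty exits rather than blocking, and the GroupedActionListener fires once every batch has reported success or failure.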
@@ -2383,10 +2444,11 @@ public long getRemoteDownloadThrottleTimeInNanos() {
     }

     protected void assertSnapshotOrGenericThread() {
-        assert Thread.currentThread().getName().contains('[' + ThreadPool.Names.SNAPSHOT + ']')
+        assert Thread.currentThread().getName().contains('[' + ThreadPool.Names.SNAPSHOT_DELETION + ']')
+            || Thread.currentThread().getName().contains('[' + ThreadPool.Names.SNAPSHOT + ']')
             || Thread.currentThread().getName().contains('[' + ThreadPool.Names.GENERIC + ']') : "Expected current thread ["
             + Thread.currentThread()
-            + "] to be the snapshot or generic thread.";
+            + "] to be the snapshot_deletion or snapshot or generic thread.";
     }

     @Override

server/src/main/java/org/opensearch/threadpool/ThreadPool.java

Lines changed: 7 additions & 0 deletions
@@ -105,6 +105,7 @@ public static class Names {
         public static final String REFRESH = "refresh";
         public static final String WARMER = "warmer";
         public static final String SNAPSHOT = "snapshot";
+        public static final String SNAPSHOT_DELETION = "snapshot_deletion";
         public static final String FORCE_MERGE = "force_merge";
         public static final String FETCH_SHARD_STARTED = "fetch_shard_started";
         public static final String FETCH_SHARD_STORE = "fetch_shard_store";

@@ -176,6 +177,7 @@ public static ThreadPoolType fromType(String type) {
         map.put(Names.REFRESH, ThreadPoolType.SCALING);
         map.put(Names.WARMER, ThreadPoolType.SCALING);
         map.put(Names.SNAPSHOT, ThreadPoolType.SCALING);
+        map.put(Names.SNAPSHOT_DELETION, ThreadPoolType.SCALING);
         map.put(Names.FORCE_MERGE, ThreadPoolType.FIXED);
         map.put(Names.FETCH_SHARD_STARTED, ThreadPoolType.SCALING);
         map.put(Names.FETCH_SHARD_STORE, ThreadPoolType.SCALING);

@@ -234,6 +236,7 @@ public ThreadPool(
         final int halfProcMaxAt5 = halfAllocatedProcessorsMaxFive(allocatedProcessors);
         final int halfProcMaxAt10 = halfAllocatedProcessorsMaxTen(allocatedProcessors);
         final int genericThreadPoolMax = boundedBy(4 * allocatedProcessors, 128, 512);
+        final int snapshotDeletionPoolMax = boundedBy(4 * allocatedProcessors, 64, 256);
         builders.put(Names.GENERIC, new ScalingExecutorBuilder(Names.GENERIC, 4, genericThreadPoolMax, TimeValue.timeValueSeconds(30)));
         builders.put(Names.WRITE, new FixedExecutorBuilder(settings, Names.WRITE, allocatedProcessors, 10000));
         builders.put(Names.GET, new FixedExecutorBuilder(settings, Names.GET, allocatedProcessors, 1000));

@@ -251,6 +254,10 @@ public ThreadPool(
         builders.put(Names.REFRESH, new ScalingExecutorBuilder(Names.REFRESH, 1, halfProcMaxAt10, TimeValue.timeValueMinutes(5)));
         builders.put(Names.WARMER, new ScalingExecutorBuilder(Names.WARMER, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
         builders.put(Names.SNAPSHOT, new ScalingExecutorBuilder(Names.SNAPSHOT, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
+        builders.put(
+            Names.SNAPSHOT_DELETION,
+            new ScalingExecutorBuilder(Names.SNAPSHOT_DELETION, 1, snapshotDeletionPoolMax, TimeValue.timeValueMinutes(5))
+        );
         builders.put(
             Names.FETCH_SHARD_STARTED,
             new ScalingExecutorBuilder(Names.FETCH_SHARD_STARTED, 1, 2 * allocatedProcessors, TimeValue.timeValueMinutes(5))
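Assuming ThreadPool.boundedBy(value, min, max) clamps value into [min, max] (consistent with its use for genericThreadPoolMax above), the new pool's maximum works out as in this small sketch:

    final class PoolSizeSketch {
        // Assumption: boundedBy clamps value into [min, max].
        static int boundedBy(int value, int min, int max) {
            return Math.min(max, Math.max(min, value));
        }

        public static void main(String[] args) {
            // 8 cores -> 64 (floor), 32 cores -> 128 (4x cores), 96 cores -> 256 (ceiling)
            for (int procs : new int[] { 8, 32, 96 }) {
                System.out.println(procs + " -> " + boundedBy(4 * procs, 64, 256));
            }
        }
    }

By contrast, the existing snapshot pool scales with halfAllocatedProcessorsMaxFive, i.e. it is capped at 5 threads regardless of node size — which is why moving deletion work to the new pool buys so much parallelism.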

server/src/test/java/org/opensearch/threadpool/ScalingThreadPoolTests.java

Lines changed: 1 addition & 0 deletions
@@ -148,6 +148,7 @@ private int expectedSize(final String threadPoolName, final int numberOfProcessors) {
         sizes.put(ThreadPool.Names.REFRESH, ThreadPool::halfAllocatedProcessorsMaxTen);
         sizes.put(ThreadPool.Names.WARMER, ThreadPool::halfAllocatedProcessorsMaxFive);
         sizes.put(ThreadPool.Names.SNAPSHOT, ThreadPool::halfAllocatedProcessorsMaxFive);
+        sizes.put(ThreadPool.Names.SNAPSHOT_DELETION, n -> ThreadPool.boundedBy(4 * n, 64, 256));
         sizes.put(ThreadPool.Names.FETCH_SHARD_STARTED, ThreadPool::twiceAllocatedProcessors);
         sizes.put(ThreadPool.Names.FETCH_SHARD_STORE, ThreadPool::twiceAllocatedProcessors);
         sizes.put(ThreadPool.Names.TRANSLOG_TRANSFER, ThreadPool::halfAllocatedProcessors);
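The lambda registered here mirrors the production sizing (boundedBy(4 * allocatedProcessors, 64, 256)) exactly, so this scaling test will fail if the two ever drift apart.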
