Skip to content

Commit 0cacc98

Browse files
reta and inpink authored
feat: add vertical scaling and SoftReference for snapshot repository data cache (#16489) (#16624)
- Applies `SoftReference` to cached repository data for efficient memory management under heap pressure.
- Enables cache size configuration in `opensearch.yml`, adjustable within a range of 500KB to 1% of heap memory.
- Sets the default cache size to `Math.max(ByteSizeUnit.KB.toBytes(500), CACHE_MAX_THRESHOLD / 2)` so it’s generally proportional to heap size. In cases where 1% of the heap is less than 1000KB, indicating a low-memory environment, the default reverts to 500KB as before.
- Since `BytesReference` internally uses `byte[]`, the compressed array size is capped at `Integer.MAX_VALUE - 8` to ensure compatibility with JDK limitations on array sizes. Therefore, the maximum cache size cannot exceed this limit.

(cherry picked from commit 53d41d3)

Signed-off-by: inpink <[email protected]>
Signed-off-by: Andriy Redko <[email protected]>
Co-authored-by: inpink <[email protected]>
1 parent 05123fe commit 0cacc98

File tree

5 files changed

+209
-17
lines changed

5 files changed

+209
-17
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1111
- Increase segrep pressure checkpoint default limit to 30 ([#16577](https://github.com/opensearch-project/OpenSearch/pull/16577/files))
1212
- Add dynamic setting allowing size > 0 requests to be cached in the request cache ([#16483](https://github.com/opensearch-project/OpenSearch/pull/16483))
1313
- Make IndexStoreListener a pluggable interface ([#16583](https://github.com/opensearch-project/OpenSearch/pull/16583))
14+
- Add vertical scaling and SoftReference for snapshot repository data cache ([#16489](https://github.com/opensearch-project/OpenSearch/pull/16489))
1415

1516
### Dependencies
1617
- Bump `com.google.apis:google-api-services-compute` from v1-rev20240407-2.0.0 to v1-rev20241021-2.0.0 ([#16502](https://github.com/opensearch-project/OpenSearch/pull/16502), [#16548](https://github.com/opensearch-project/OpenSearch/pull/16548))

server/src/main/java/org/opensearch/common/settings/ClusterSettings.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -787,6 +787,7 @@ public void apply(Settings value, Settings current, Settings previous) {
787787
// Snapshot related Settings
788788
BlobStoreRepository.SNAPSHOT_SHARD_PATH_PREFIX_SETTING,
789789
BlobStoreRepository.SNAPSHOT_ASYNC_DELETION_ENABLE_SETTING,
790+
BlobStoreRepository.SNAPSHOT_REPOSITORY_DATA_CACHE_THRESHOLD,
790791

791792
// Composite index settings
792793
CompositeIndexSettings.STAR_TREE_INDEX_ENABLED_SETTING,

server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java

Lines changed: 98 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@
142142
import org.opensearch.indices.RemoteStoreSettings;
143143
import org.opensearch.indices.recovery.RecoverySettings;
144144
import org.opensearch.indices.recovery.RecoveryState;
145+
import org.opensearch.monitor.jvm.JvmInfo;
145146
import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService;
146147
import org.opensearch.repositories.IndexId;
147148
import org.opensearch.repositories.IndexMetaDataGenerations;
@@ -168,6 +169,7 @@
168169
import java.io.FilterInputStream;
169170
import java.io.IOException;
170171
import java.io.InputStream;
172+
import java.lang.ref.SoftReference;
171173
import java.nio.file.NoSuchFileException;
172174
import java.util.ArrayList;
173175
import java.util.Arrays;
@@ -197,6 +199,7 @@
197199
import java.util.stream.LongStream;
198200
import java.util.stream.Stream;
199201

202+
import static org.opensearch.common.unit.MemorySizeValue.parseBytesSizeValueOrHeapRatio;
200203
import static org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm.FNV_1A_COMPOSITE_1;
201204
import static org.opensearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo.canonicalName;
202205
import static org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat.SNAPSHOT_ONLY_FORMAT_PARAMS;
@@ -254,6 +257,23 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
254257
*/
255258
public static final String VIRTUAL_DATA_BLOB_PREFIX = "v__";
256259

260+
public static final String SNAPSHOT_REPOSITORY_DATA_CACHET_THRESHOLD_SETTING_NAME = "snapshot.repository_data.cache.threshold";
261+
262+
public static final double SNAPSHOT_REPOSITORY_DATA_CACHE_THRESHOLD_DEFAULT_PERCENTAGE = 0.01;
263+
264+
public static final long CACHE_MIN_THRESHOLD = ByteSizeUnit.KB.toBytes(500);
265+
266+
public static final long CACHE_MAX_THRESHOLD = calculateMaxSnapshotRepositoryDataCacheThreshold();
267+
268+
public static final long CACHE_DEFAULT_THRESHOLD = calculateDefaultSnapshotRepositoryDataCacheThreshold();
269+
270+
/**
271+
* Set to Integer.MAX_VALUE - 8 to prevent OutOfMemoryError due to array header requirements, following the limit used in certain JDK versions.
272+
* This ensures compatibility across various JDK versions. For a practical usage example,
273+
* see this link: https://github.com/openjdk/jdk11u/blob/cee8535a9d3de8558b4b5028d68e397e508bef71/src/jdk.zipfs/share/classes/jdk/nio/zipfs/ByteArrayChannel.java#L226
274+
*/
275+
private static final int MAX_SAFE_ARRAY_SIZE = Integer.MAX_VALUE - 8;
276+
257277
/**
258278
* When set to {@code true}, {@link #bestEffortConsistency} will be set to {@code true} and concurrent modifications of the repository
259279
* contents will not result in the repository being marked as corrupted.
@@ -276,6 +296,58 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
276296
Setting.Property.Deprecated
277297
);
278298

299+
/**
300+
* Sets the cache size for snapshot repository data: the valid range is within 500Kb ... 1% of the node heap memory.
301+
*/
302+
public static final Setting<ByteSizeValue> SNAPSHOT_REPOSITORY_DATA_CACHE_THRESHOLD = new Setting<>(
303+
SNAPSHOT_REPOSITORY_DATA_CACHET_THRESHOLD_SETTING_NAME,
304+
CACHE_DEFAULT_THRESHOLD + "b",
305+
(s) -> {
306+
ByteSizeValue userDefinedLimit = parseBytesSizeValueOrHeapRatio(s, SNAPSHOT_REPOSITORY_DATA_CACHET_THRESHOLD_SETTING_NAME);
307+
long userDefinedLimitBytes = userDefinedLimit.getBytes();
308+
309+
if (userDefinedLimitBytes > CACHE_MAX_THRESHOLD) {
310+
throw new IllegalArgumentException(
311+
"["
312+
+ SNAPSHOT_REPOSITORY_DATA_CACHET_THRESHOLD_SETTING_NAME
313+
+ "] cannot be larger than ["
314+
+ CACHE_MAX_THRESHOLD
315+
+ "] bytes."
316+
);
317+
}
318+
319+
if (userDefinedLimitBytes < CACHE_MIN_THRESHOLD) {
320+
throw new IllegalArgumentException(
321+
"["
322+
+ SNAPSHOT_REPOSITORY_DATA_CACHET_THRESHOLD_SETTING_NAME
323+
+ "] cannot be smaller than ["
324+
+ CACHE_MIN_THRESHOLD
325+
+ "] bytes."
326+
);
327+
}
328+
329+
return userDefinedLimit;
330+
},
331+
Setting.Property.NodeScope
332+
);
333+
334+
public static long calculateDefaultSnapshotRepositoryDataCacheThreshold() {
335+
return Math.max(ByteSizeUnit.KB.toBytes(500), CACHE_MAX_THRESHOLD / 2);
336+
}
337+
338+
public static long calculateMaxSnapshotRepositoryDataCacheThreshold() {
339+
long jvmHeapSize = JvmInfo.jvmInfo().getMem().getHeapMax().getBytes();
340+
long defaultThresholdOfHeap = (long) (jvmHeapSize * SNAPSHOT_REPOSITORY_DATA_CACHE_THRESHOLD_DEFAULT_PERCENTAGE);
341+
long defaultAbsoluteThreshold = ByteSizeUnit.KB.toBytes(500);
342+
long maxThreshold = calculateMaxWithinIntLimit(defaultThresholdOfHeap, defaultAbsoluteThreshold);
343+
344+
return maxThreshold;
345+
}
346+
347+
protected static long calculateMaxWithinIntLimit(long defaultThresholdOfHeap, long defaultAbsoluteThreshold) {
348+
return Math.min(Math.max(defaultThresholdOfHeap, defaultAbsoluteThreshold), MAX_SAFE_ARRAY_SIZE);
349+
}
350+
279351
/**
280352
* Size hint for the IO buffer size to use when reading from and writing to the repository.
281353
*/
@@ -462,6 +534,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
462534

463535
private volatile boolean enableAsyncDeletion;
464536

537+
protected final long repositoryDataCacheThreshold;
538+
465539
/**
466540
* Flag that is set to {@code true} if this instance is started with {@link #metadata} that has a higher value for
467541
* {@link RepositoryMetadata#pendingGeneration()} than for {@link RepositoryMetadata#generation()} indicating a full cluster restart
@@ -519,6 +593,7 @@ protected BlobStoreRepository(
519593
this.snapshotShardPathPrefix = SNAPSHOT_SHARD_PATH_PREFIX_SETTING.get(clusterService.getSettings());
520594
this.enableAsyncDeletion = SNAPSHOT_ASYNC_DELETION_ENABLE_SETTING.get(clusterService.getSettings());
521595
clusterService.getClusterSettings().addSettingsUpdateConsumer(SNAPSHOT_ASYNC_DELETION_ENABLE_SETTING, this::setEnableAsyncDeletion);
596+
this.repositoryDataCacheThreshold = SNAPSHOT_REPOSITORY_DATA_CACHE_THRESHOLD.get(clusterService.getSettings()).getBytes();
522597
}
523598

524599
@Override
@@ -1157,7 +1232,8 @@ private RepositoryData safeRepositoryData(long repositoryStateId, Map<String, Bl
11571232
cached = null;
11581233
} else {
11591234
genToLoad = latestKnownRepoGen.get();
1160-
cached = latestKnownRepositoryData.get();
1235+
SoftReference<Tuple<Long, BytesReference>> softRef = latestKnownRepositoryData.get();
1236+
cached = (softRef != null) ? softRef.get() : null;
11611237
}
11621238
if (genToLoad > generation) {
11631239
// It's always a possibility to not see the latest index-N in the listing here on an eventually consistent blob store, just
@@ -3025,15 +3101,19 @@ public void endVerification(String seed) {
30253101
private final AtomicLong latestKnownRepoGen = new AtomicLong(RepositoryData.UNKNOWN_REPO_GEN);
30263102

30273103
// Best effort cache of the latest known repository data and its generation, cached serialized as compressed json
3028-
private final AtomicReference<Tuple<Long, BytesReference>> latestKnownRepositoryData = new AtomicReference<>();
3104+
private final AtomicReference<SoftReference<Tuple<Long, BytesReference>>> latestKnownRepositoryData = new AtomicReference<>(
3105+
new SoftReference<>(null)
3106+
);
30293107

30303108
@Override
30313109
public void getRepositoryData(ActionListener<RepositoryData> listener) {
30323110
if (latestKnownRepoGen.get() == RepositoryData.CORRUPTED_REPO_GEN) {
30333111
listener.onFailure(corruptedStateException(null));
30343112
return;
30353113
}
3036-
final Tuple<Long, BytesReference> cached = latestKnownRepositoryData.get();
3114+
final SoftReference<Tuple<Long, BytesReference>> softRef = latestKnownRepositoryData.get();
3115+
final Tuple<Long, BytesReference> cached = (softRef != null) ? softRef.get() : null;
3116+
30373117
// Fast path loading repository data directly from cache if we're in fully consistent mode and the cache matches up with
30383118
// the latest known repository generation
30393119
if (bestEffortConsistency == false && cached != null && cached.v1() == latestKnownRepoGen.get()) {
@@ -3082,7 +3162,8 @@ private void doGetRepositoryData(ActionListener<RepositoryData> listener) {
30823162
genToLoad = latestKnownRepoGen.get();
30833163
}
30843164
try {
3085-
final Tuple<Long, BytesReference> cached = latestKnownRepositoryData.get();
3165+
final SoftReference<Tuple<Long, BytesReference>> softRef = latestKnownRepositoryData.get();
3166+
final Tuple<Long, BytesReference> cached = (softRef != null) ? softRef.get() : null;
30863167
final RepositoryData loaded;
30873168
// Caching is not used with #bestEffortConsistency see docs on #cacheRepositoryData for details
30883169
if (bestEffortConsistency == false && cached != null && cached.v1() == genToLoad) {
@@ -3149,19 +3230,22 @@ private void cacheRepositoryData(BytesReference updated, long generation) {
31493230
try {
31503231
serialized = CompressorRegistry.defaultCompressor().compress(updated);
31513232
final int len = serialized.length();
3152-
if (len > ByteSizeUnit.KB.toBytes(500)) {
3233+
long cacheWarningThreshold = Math.min(repositoryDataCacheThreshold * 10, MAX_SAFE_ARRAY_SIZE);
3234+
if (len > repositoryDataCacheThreshold) {
31533235
logger.debug(
3154-
"Not caching repository data of size [{}] for repository [{}] because it is larger than 500KB in"
3236+
"Not caching repository data of size [{}] for repository [{}] because it is larger than [{}] bytes in"
31553237
+ " serialized size",
31563238
len,
3157-
metadata.name()
3239+
metadata.name(),
3240+
repositoryDataCacheThreshold
31583241
);
3159-
if (len > ByteSizeUnit.MB.toBytes(5)) {
3242+
if (len > cacheWarningThreshold) {
31603243
logger.warn(
3161-
"Your repository metadata blob for repository [{}] is larger than 5MB. Consider moving to a fresh"
3244+
"Your repository metadata blob for repository [{}] is larger than [{}] bytes. Consider moving to a fresh"
31623245
+ " repository for new snapshots or deleting unneeded snapshots from your repository to ensure stable"
31633246
+ " repository behavior going forward.",
3164-
metadata.name()
3247+
metadata.name(),
3248+
cacheWarningThreshold
31653249
);
31663250
}
31673251
// Set empty repository data to not waste heap for an outdated cached value
@@ -3173,11 +3257,12 @@ private void cacheRepositoryData(BytesReference updated, long generation) {
31733257
logger.warn("Failed to serialize repository data", e);
31743258
return;
31753259
}
3176-
latestKnownRepositoryData.updateAndGet(known -> {
3260+
latestKnownRepositoryData.updateAndGet(knownRef -> {
3261+
Tuple<Long, BytesReference> known = (knownRef != null) ? knownRef.get() : null;
31773262
if (known != null && known.v1() > generation) {
3178-
return known;
3263+
return knownRef;
31793264
}
3180-
return new Tuple<>(generation, serialized);
3265+
return new SoftReference<>(new Tuple<>(generation, serialized));
31813266
});
31823267
}
31833268
}

server/src/test/java/org/opensearch/common/settings/MemorySizeSettingsTests.java

Lines changed: 59 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,15 @@
3434

3535
import org.opensearch.common.settings.Setting.Property;
3636
import org.opensearch.common.util.PageCacheRecycler;
37+
import org.opensearch.core.common.unit.ByteSizeUnit;
3738
import org.opensearch.core.common.unit.ByteSizeValue;
3839
import org.opensearch.indices.IndexingMemoryController;
3940
import org.opensearch.indices.IndicesQueryCache;
4041
import org.opensearch.indices.IndicesRequestCache;
4142
import org.opensearch.indices.breaker.HierarchyCircuitBreakerService;
4243
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
4344
import org.opensearch.monitor.jvm.JvmInfo;
45+
import org.opensearch.repositories.blobstore.BlobStoreRepository;
4446
import org.opensearch.test.OpenSearchTestCase;
4547

4648
import static org.hamcrest.Matchers.equalTo;
@@ -127,22 +129,75 @@ public void testIndicesFieldDataCacheSetting() {
127129
);
128130
}
129131

132+
public void testSnapshotRepositoryDataCacheSizeSetting() {
133+
assertMemorySizeSettingInRange(
134+
BlobStoreRepository.SNAPSHOT_REPOSITORY_DATA_CACHE_THRESHOLD,
135+
"snapshot.repository_data.cache.threshold",
136+
new ByteSizeValue(BlobStoreRepository.calculateDefaultSnapshotRepositoryDataCacheThreshold()),
137+
ByteSizeUnit.KB.toBytes(500),
138+
1.0
139+
);
140+
}
141+
130142
private void assertMemorySizeSetting(Setting<ByteSizeValue> setting, String settingKey, ByteSizeValue defaultValue) {
131143
assertMemorySizeSetting(setting, settingKey, defaultValue, Settings.EMPTY);
132144
}
133145

134146
private void assertMemorySizeSetting(Setting<ByteSizeValue> setting, String settingKey, ByteSizeValue defaultValue, Settings settings) {
147+
assertMemorySizeSetting(setting, settingKey, defaultValue, 25.0, 1024, settings);
148+
}
149+
150+
private void assertMemorySizeSetting(
151+
Setting<ByteSizeValue> setting,
152+
String settingKey,
153+
ByteSizeValue defaultValue,
154+
double availablePercentage,
155+
long availableBytes,
156+
Settings settings
157+
) {
135158
assertThat(setting, notNullValue());
136159
assertThat(setting.getKey(), equalTo(settingKey));
137160
assertThat(setting.getProperties(), hasItem(Property.NodeScope));
138161
assertThat(setting.getDefault(settings), equalTo(defaultValue));
139-
Settings settingWithPercentage = Settings.builder().put(settingKey, "25%").build();
162+
Settings settingWithPercentage = Settings.builder().put(settingKey, percentageAsString(availablePercentage)).build();
140163
assertThat(
141164
setting.get(settingWithPercentage),
142-
equalTo(new ByteSizeValue((long) (JvmInfo.jvmInfo().getMem().getHeapMax().getBytes() * 0.25)))
165+
equalTo(
166+
new ByteSizeValue((long) (JvmInfo.jvmInfo().getMem().getHeapMax().getBytes() * percentageAsFraction(availablePercentage)))
167+
)
143168
);
144-
Settings settingWithBytesValue = Settings.builder().put(settingKey, "1024b").build();
145-
assertThat(setting.get(settingWithBytesValue), equalTo(new ByteSizeValue(1024)));
169+
Settings settingWithBytesValue = Settings.builder().put(settingKey, availableBytes + "b").build();
170+
assertThat(setting.get(settingWithBytesValue), equalTo(new ByteSizeValue(availableBytes)));
146171
}
147172

173+
private void assertMemorySizeSettingInRange(
174+
Setting<ByteSizeValue> setting,
175+
String settingKey,
176+
ByteSizeValue defaultValue,
177+
long minBytes,
178+
double maxPercentage
179+
) {
180+
assertMemorySizeSetting(setting, settingKey, defaultValue, maxPercentage, minBytes, Settings.EMPTY);
181+
182+
assertThrows(IllegalArgumentException.class, () -> {
183+
Settings settingWithTooSmallValue = Settings.builder().put(settingKey, minBytes - 1).build();
184+
setting.get(settingWithTooSmallValue);
185+
});
186+
187+
assertThrows(IllegalArgumentException.class, () -> {
188+
double unavailablePercentage = maxPercentage + 0.1;
189+
Settings settingWithPercentageExceedingLimit = Settings.builder()
190+
.put(settingKey, percentageAsString(unavailablePercentage))
191+
.build();
192+
setting.get(settingWithPercentageExceedingLimit);
193+
});
194+
}
195+
196+
private double percentageAsFraction(double availablePercentage) {
197+
return availablePercentage / 100.0;
198+
}
199+
200+
private String percentageAsString(double availablePercentage) {
201+
return availablePercentage + "%";
202+
}
148203
}

server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryTests.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@
9292
import java.util.stream.Collectors;
9393

9494
import static org.opensearch.repositories.RepositoryDataTests.generateRandomRepoData;
95+
import static org.opensearch.repositories.blobstore.BlobStoreRepository.calculateMaxWithinIntLimit;
9596
import static org.hamcrest.Matchers.equalTo;
9697
import static org.hamcrest.Matchers.nullValue;
9798
import static org.mockito.ArgumentMatchers.any;
@@ -653,4 +654,53 @@ public void testGetRestrictedSystemRepositorySettings() {
653654
assertTrue(settings.contains(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY));
654655
repository.close();
655656
}
657+
658+
public void testSnapshotRepositoryDataCacheDefaultSetting() {
659+
// given
660+
BlobStoreRepository repository = setupRepo();
661+
long maxThreshold = BlobStoreRepository.calculateMaxSnapshotRepositoryDataCacheThreshold();
662+
663+
// when
664+
long expectedThreshold = Math.max(ByteSizeUnit.KB.toBytes(500), maxThreshold / 2);
665+
666+
// then
667+
assertEquals(repository.repositoryDataCacheThreshold, expectedThreshold);
668+
}
669+
670+
public void testHeapThresholdUsed() {
671+
// given
672+
long defaultThresholdOfHeap = ByteSizeUnit.GB.toBytes(1);
673+
long defaultAbsoluteThreshold = ByteSizeUnit.KB.toBytes(500);
674+
675+
// when
676+
long expectedThreshold = calculateMaxWithinIntLimit(defaultThresholdOfHeap, defaultAbsoluteThreshold);
677+
678+
// then
679+
assertEquals(defaultThresholdOfHeap, expectedThreshold);
680+
}
681+
682+
public void testAbsoluteThresholdUsed() {
683+
// given
684+
long defaultThresholdOfHeap = ByteSizeUnit.KB.toBytes(499);
685+
long defaultAbsoluteThreshold = ByteSizeUnit.KB.toBytes(500);
686+
687+
// when
688+
long result = calculateMaxWithinIntLimit(defaultThresholdOfHeap, defaultAbsoluteThreshold);
689+
690+
// then
691+
assertEquals(defaultAbsoluteThreshold, result);
692+
}
693+
694+
public void testThresholdCappedAtIntMax() {
695+
// given
696+
int maxSafeArraySize = Integer.MAX_VALUE - 8;
697+
long defaultThresholdOfHeap = (long) maxSafeArraySize + 1;
698+
long defaultAbsoluteThreshold = ByteSizeUnit.KB.toBytes(500);
699+
700+
// when
701+
long expectedThreshold = calculateMaxWithinIntLimit(defaultThresholdOfHeap, defaultAbsoluteThreshold);
702+
703+
// then
704+
assertEquals(maxSafeArraySize, expectedThreshold);
705+
}
656706
}

0 commit comments

Comments
 (0)