Skip to content

Commit 1cec509

Browse files
sgup432dk2k
authored andcommitted
[Tiered Caching] Segmented cache changes (opensearch-project#16047)
* Segmented cache changes for TieredCache Signed-off-by: Sagar Upadhyaya <[email protected]> * Adding change log Signed-off-by: Sagar Upadhyaya <[email protected]> * Allow segment number to be power of two Signed-off-by: Sagar Upadhyaya <[email protected]> * Moving common tiered cache IT methods to a common base class Signed-off-by: Sagar Upadhyaya <[email protected]> * Adding disk took time IT test with multiple segment Signed-off-by: Sagar Upadhyaya <[email protected]> * Correcting changelog Signed-off-by: Sagar Upadhyaya <[email protected]> * Addressing comments Signed-off-by: Sagar Upadhyaya <[email protected]> * Fixing invalid segment count variable name Signed-off-by: Sagar Upadhyaya <[email protected]> * Introducing new settings for size for respective cache tier Signed-off-by: Sagar Upadhyaya <[email protected]> * Changing the default segmentCount logic Signed-off-by: Sagar Upadhyaya <[email protected]> * Fixing missing java doc issue Signed-off-by: Sagar Upadhyaya <[email protected]> --------- Signed-off-by: Sagar Upadhyaya <[email protected]> Signed-off-by: Sagar <[email protected]>
1 parent a9a1b1e commit 1cec509

File tree

23 files changed

+2170
-665
lines changed

23 files changed

+2170
-665
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1212
- Fallback to Remote cluster-state on Term-Version check mismatch - ([#15424](https://github.com/opensearch-project/OpenSearch/pull/15424))
1313
- Implement WithFieldName interface in ValuesSourceAggregationBuilder & FieldSortBuilder ([#15916](https://github.com/opensearch-project/OpenSearch/pull/15916))
1414
- Add successfulSearchShardIndices in searchRequestContext ([#15967](https://github.com/opensearch-project/OpenSearch/pull/15967))
15+
- [Tiered Caching] Segmented cache changes ([#16047](https://github.com/opensearch-project/OpenSearch/pull/16047))
1516
- Add support for msearch API to pass search pipeline name - ([#15923](https://github.com/opensearch-project/OpenSearch/pull/15923))
1617
- Add _list/indices API as paginated alternate to _cat/indices ([#14718](https://github.com/opensearch-project/OpenSearch/pull/14718))
1718
- Add success and failure metrics for async shard fetch ([#15976](https://github.com/opensearch-project/OpenSearch/pull/15976))
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.cache.common.tier;
10+
11+
import org.opensearch.common.cache.CacheType;
12+
import org.opensearch.common.cache.settings.CacheSettings;
13+
import org.opensearch.common.cache.store.OpenSearchOnHeapCache;
14+
import org.opensearch.common.settings.Settings;
15+
import org.opensearch.common.util.FeatureFlags;
16+
import org.opensearch.test.OpenSearchIntegTestCase;
17+
18+
public class TieredSpilloverCacheBaseIT extends OpenSearchIntegTestCase {
19+
20+
public Settings defaultSettings(String onHeapCacheSizeInBytesOrPercentage, int numberOfSegments) {
21+
return Settings.builder()
22+
.put(FeatureFlags.PLUGGABLE_CACHE, "true")
23+
.put(
24+
CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_REQUEST_CACHE).getKey(),
25+
TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME
26+
)
27+
.put(
28+
TieredSpilloverCacheSettings.TIERED_SPILLOVER_ONHEAP_STORE_NAME.getConcreteSettingForNamespace(
29+
CacheType.INDICES_REQUEST_CACHE.getSettingPrefix()
30+
).getKey(),
31+
OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory.NAME
32+
)
33+
.put(
34+
TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_STORE_NAME.getConcreteSettingForNamespace(
35+
CacheType.INDICES_REQUEST_CACHE.getSettingPrefix()
36+
).getKey(),
37+
MockDiskCache.MockDiskCacheFactory.NAME
38+
)
39+
.put(
40+
TieredSpilloverCacheSettings.TIERED_SPILLOVER_SEGMENTS.getConcreteSettingForNamespace(
41+
CacheType.INDICES_REQUEST_CACHE.getSettingPrefix()
42+
).getKey(),
43+
numberOfSegments
44+
)
45+
.put(
46+
TieredSpilloverCacheSettings.TIERED_SPILLOVER_ONHEAP_STORE_SIZE.getConcreteSettingForNamespace(
47+
CacheType.INDICES_REQUEST_CACHE.getSettingPrefix()
48+
).getKey(),
49+
onHeapCacheSizeInBytesOrPercentage
50+
)
51+
.build();
52+
}
53+
54+
public int getNumberOfSegments() {
55+
return randomFrom(1, 2, 4, 8, 16, 64, 128, 256);
56+
}
57+
}

modules/cache-common/src/internalClusterTest/java/org/opensearch/cache/common/tier/TieredSpilloverCacheIT.java

Lines changed: 127 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,8 @@
2222
import org.opensearch.cluster.metadata.IndexMetadata;
2323
import org.opensearch.common.cache.CacheType;
2424
import org.opensearch.common.cache.ICache;
25-
import org.opensearch.common.cache.settings.CacheSettings;
26-
import org.opensearch.common.cache.store.OpenSearchOnHeapCache;
27-
import org.opensearch.common.cache.store.settings.OpenSearchOnHeapCacheSettings;
2825
import org.opensearch.common.settings.Settings;
2926
import org.opensearch.common.unit.TimeValue;
30-
import org.opensearch.common.util.FeatureFlags;
3127
import org.opensearch.index.cache.request.RequestCacheStats;
3228
import org.opensearch.index.query.QueryBuilders;
3329
import org.opensearch.indices.IndicesRequestCache;
@@ -43,13 +39,15 @@
4339
import java.util.Arrays;
4440
import java.util.Collection;
4541
import java.util.List;
42+
import java.util.Locale;
4643
import java.util.Map;
44+
import java.util.UUID;
4745
import java.util.concurrent.TimeUnit;
4846
import java.util.function.Function;
4947
import java.util.stream.Collectors;
5048
import java.util.stream.Stream;
5149

52-
import static org.opensearch.common.cache.store.settings.OpenSearchOnHeapCacheSettings.MAXIMUM_SIZE_IN_BYTES_KEY;
50+
import static org.opensearch.common.cache.settings.CacheSettings.INVALID_SEGMENT_COUNT_EXCEPTION_MESSAGE;
5351
import static org.opensearch.indices.IndicesService.INDICES_CACHE_CLEAN_INTERVAL_SETTING;
5452
import static org.opensearch.search.aggregations.AggregationBuilders.dateHistogram;
5553
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
@@ -58,43 +56,15 @@
5856
import static org.hamcrest.Matchers.greaterThan;
5957

6058
@OpenSearchIntegTestCase.ClusterScope(numDataNodes = 0, scope = OpenSearchIntegTestCase.Scope.TEST)
61-
public class TieredSpilloverCacheIT extends OpenSearchIntegTestCase {
59+
public class TieredSpilloverCacheIT extends TieredSpilloverCacheBaseIT {
6260

6361
@Override
6462
protected Collection<Class<? extends Plugin>> nodePlugins() {
6563
return Arrays.asList(TieredSpilloverCachePlugin.class, MockDiskCachePlugin.class);
6664
}
6765

68-
static Settings defaultSettings(String onHeapCacheSizeInBytesOrPercentage) {
69-
return Settings.builder()
70-
.put(FeatureFlags.PLUGGABLE_CACHE, "true")
71-
.put(
72-
CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_REQUEST_CACHE).getKey(),
73-
TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME
74-
)
75-
.put(
76-
TieredSpilloverCacheSettings.TIERED_SPILLOVER_ONHEAP_STORE_NAME.getConcreteSettingForNamespace(
77-
CacheType.INDICES_REQUEST_CACHE.getSettingPrefix()
78-
).getKey(),
79-
OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory.NAME
80-
)
81-
.put(
82-
TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_STORE_NAME.getConcreteSettingForNamespace(
83-
CacheType.INDICES_REQUEST_CACHE.getSettingPrefix()
84-
).getKey(),
85-
MockDiskCache.MockDiskCacheFactory.NAME
86-
)
87-
.put(
88-
OpenSearchOnHeapCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE)
89-
.get(MAXIMUM_SIZE_IN_BYTES_KEY)
90-
.getKey(),
91-
onHeapCacheSizeInBytesOrPercentage
92-
)
93-
.build();
94-
}
95-
9666
public void testPluginsAreInstalled() {
97-
internalCluster().startNode(Settings.builder().put(defaultSettings("1%")).build());
67+
internalCluster().startNode(Settings.builder().put(defaultSettings("1%", getNumberOfSegments())).build());
9868
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest();
9969
nodesInfoRequest.addMetric(NodesInfoRequest.Metric.PLUGINS.metricName());
10070
NodesInfoResponse nodesInfoResponse = OpenSearchIntegTestCase.client().admin().cluster().nodesInfo(nodesInfoRequest).actionGet();
@@ -111,7 +81,8 @@ public void testPluginsAreInstalled() {
11181
}
11282

11383
public void testSanityChecksWithIndicesRequestCache() throws InterruptedException {
114-
internalCluster().startNodes(3, Settings.builder().put(defaultSettings("1%")).build());
84+
int numberOfSegments = getNumberOfSegments();
85+
internalCluster().startNodes(3, Settings.builder().put(defaultSettings("1%", numberOfSegments)).build());
11586
Client client = client();
11687
assertAcked(
11788
client.admin()
@@ -147,9 +118,97 @@ public void testSanityChecksWithIndicesRequestCache() throws InterruptedExceptio
147118
);
148119
}
149120

121+
public void testWithDynamicTookTimePolicyWithMultiSegments() throws Exception {
122+
int numberOfSegments = getNumberOfSegments();
123+
int onHeapCacheSizePerSegmentInBytes = 800; // Per cache entry below is around ~700 bytes, so keeping this
124+
// just a bit higher so that each segment can atleast hold 1 entry.
125+
int onHeapCacheSizeInBytes = onHeapCacheSizePerSegmentInBytes * numberOfSegments;
126+
internalCluster().startNode(Settings.builder().put(defaultSettings(onHeapCacheSizeInBytes + "b", numberOfSegments)).build());
127+
Client client = client();
128+
assertAcked(
129+
client.admin()
130+
.indices()
131+
.prepareCreate("index")
132+
.setMapping("k", "type=keyword")
133+
.setSettings(
134+
Settings.builder()
135+
.put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true)
136+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
137+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
138+
.put("index.refresh_interval", -1)
139+
)
140+
.get()
141+
);
142+
// Set a very high value for took time policy so that no items evicted from onHeap cache are spilled
143+
// to disk. And then hit requests so that few items are cached into cache.
144+
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest().transientSettings(
145+
Settings.builder()
146+
.put(
147+
TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(),
148+
new TimeValue(100, TimeUnit.SECONDS)
149+
)
150+
.build()
151+
);
152+
assertAcked(internalCluster().client().admin().cluster().updateSettings(updateSettingsRequest).get());
153+
int numberOfIndexedItems = numberOfSegments + 1; // Best case if all keys are distributed among different
154+
// segment, atleast one of the segment will have 2 entries and we will see evictions.
155+
for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) {
156+
indexRandom(true, client.prepareIndex("index").setSource("k" + iterator, "hello" + iterator));
157+
}
158+
ensureSearchable("index");
159+
refreshAndWaitForReplication();
160+
// Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache
161+
ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge("index").setFlush(true).get();
162+
OpenSearchAssertions.assertAllSuccessful(forceMergeResponse);
163+
long perQuerySizeInCacheInBytes = -1;
164+
for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) {
165+
SearchResponse resp = client.prepareSearch("index")
166+
.setRequestCache(true)
167+
.setQuery(QueryBuilders.termQuery("k" + iterator, "hello" + iterator))
168+
.get();
169+
if (perQuerySizeInCacheInBytes == -1) {
170+
RequestCacheStats requestCacheStats = getRequestCacheStats(client, "index");
171+
perQuerySizeInCacheInBytes = requestCacheStats.getMemorySizeInBytes();
172+
}
173+
assertSearchResponse(resp);
174+
}
175+
RequestCacheStats requestCacheStats = getRequestCacheStats(client, "index");
176+
// Considering disk cache won't be used due to took time policy having a high value, we expect overall cache
177+
// size to be less than or equal to onHeapCache size.
178+
assertTrue(requestCacheStats.getMemorySizeInBytes() <= onHeapCacheSizeInBytes);
179+
assertEquals(numberOfIndexedItems, requestCacheStats.getMissCount());
180+
// We should atleast one eviction considering disk cache isn't able to hold anything due to policy.
181+
assertTrue(requestCacheStats.getEvictions() > 0);
182+
assertEquals(0, requestCacheStats.getHitCount());
183+
long lastEvictionSeen = requestCacheStats.getEvictions();
184+
185+
// Decrease took time policy to zero so that disk cache also comes into play. Now we should be able
186+
// to cache all entries.
187+
updateSettingsRequest = new ClusterUpdateSettingsRequest().transientSettings(
188+
Settings.builder()
189+
.put(
190+
TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(),
191+
new TimeValue(0, TimeUnit.MILLISECONDS)
192+
)
193+
.build()
194+
);
195+
assertAcked(internalCluster().client().admin().cluster().updateSettings(updateSettingsRequest).get());
196+
for (int iterator = 0; iterator < numberOfIndexedItems * 2; iterator++) {
197+
SearchResponse resp = client.prepareSearch("index")
198+
.setRequestCache(true)
199+
.setQuery(QueryBuilders.termQuery(UUID.randomUUID().toString(), UUID.randomUUID().toString()))
200+
.get();
201+
assertSearchResponse(resp);
202+
}
203+
204+
requestCacheStats = getRequestCacheStats(client, "index");
205+
// We shouldn't see any new evictions now.
206+
assertEquals(lastEvictionSeen, requestCacheStats.getEvictions());
207+
}
208+
150209
public void testWithDynamicTookTimePolicy() throws Exception {
151210
int onHeapCacheSizeInBytes = 2000;
152-
internalCluster().startNode(Settings.builder().put(defaultSettings(onHeapCacheSizeInBytes + "b")).build());
211+
internalCluster().startNode(Settings.builder().put(defaultSettings(onHeapCacheSizeInBytes + "b", 1)).build());
153212
Client client = client();
154213
assertAcked(
155214
client.admin()
@@ -271,9 +330,10 @@ public void testWithDynamicTookTimePolicy() throws Exception {
271330

272331
public void testInvalidationWithIndicesRequestCache() throws Exception {
273332
int onHeapCacheSizeInBytes = 2000;
333+
int numberOfSegments = getNumberOfSegments();
274334
internalCluster().startNode(
275335
Settings.builder()
276-
.put(defaultSettings(onHeapCacheSizeInBytes + "b"))
336+
.put(defaultSettings(onHeapCacheSizeInBytes + "b", numberOfSegments))
277337
.put(INDICES_CACHE_CLEAN_INTERVAL_SETTING.getKey(), new TimeValue(1))
278338
.build()
279339
);
@@ -354,10 +414,11 @@ public void testInvalidationWithIndicesRequestCache() throws Exception {
354414
}
355415

356416
public void testWithExplicitCacheClear() throws Exception {
417+
int numberOfSegments = getNumberOfSegments();
357418
int onHeapCacheSizeInBytes = 2000;
358419
internalCluster().startNode(
359420
Settings.builder()
360-
.put(defaultSettings(onHeapCacheSizeInBytes + "b"))
421+
.put(defaultSettings(onHeapCacheSizeInBytes + "b", numberOfSegments))
361422
.put(INDICES_CACHE_CLEAN_INTERVAL_SETTING.getKey(), new TimeValue(1))
362423
.build()
363424
);
@@ -426,10 +487,13 @@ public void testWithExplicitCacheClear() throws Exception {
426487
}
427488

428489
public void testWithDynamicDiskCacheSetting() throws Exception {
429-
int onHeapCacheSizeInBytes = 10; // Keep it low so that all items are cached onto disk.
490+
int numberOfSegments = getNumberOfSegments();
491+
int onHeapCacheSizeInBytes = randomIntBetween(numberOfSegments + 1, numberOfSegments * 2); // Keep it low so
492+
// that all items are
493+
// cached onto disk.
430494
internalCluster().startNode(
431495
Settings.builder()
432-
.put(defaultSettings(onHeapCacheSizeInBytes + "b"))
496+
.put(defaultSettings(onHeapCacheSizeInBytes + "b", numberOfSegments))
433497
.put(INDICES_CACHE_CLEAN_INTERVAL_SETTING.getKey(), new TimeValue(1))
434498
.build()
435499
);
@@ -540,6 +604,27 @@ public void testWithDynamicDiskCacheSetting() throws Exception {
540604
assertEquals(0, lastKnownHitCount - requestCacheStats.getHitCount());
541605
}
542606

607+
public void testWithInvalidSegmentNumberSetting() throws Exception {
608+
int numberOfSegments = getNumberOfSegments();
609+
int onHeapCacheSizeInBytes = randomIntBetween(numberOfSegments + 1, numberOfSegments * 2); // Keep it low so
610+
// that all items are
611+
// cached onto disk.
612+
assertThrows(
613+
String.format(
614+
Locale.ROOT,
615+
INVALID_SEGMENT_COUNT_EXCEPTION_MESSAGE,
616+
TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME
617+
),
618+
IllegalArgumentException.class,
619+
() -> internalCluster().startNode(
620+
Settings.builder()
621+
.put(defaultSettings(onHeapCacheSizeInBytes + "b", 300))
622+
.put(INDICES_CACHE_CLEAN_INTERVAL_SETTING.getKey(), new TimeValue(1))
623+
.build()
624+
)
625+
);
626+
}
627+
543628
private RequestCacheStats getRequestCacheStats(Client client, String indexName) {
544629
return client.admin().indices().prepareStats(indexName).setRequestCache(true).get().getTotal().getRequestCache();
545630
}
@@ -550,7 +635,7 @@ public MockDiskCachePlugin() {}
550635

551636
@Override
552637
public Map<String, ICache.Factory> getCacheFactoryMap() {
553-
return Map.of(MockDiskCache.MockDiskCacheFactory.NAME, new MockDiskCache.MockDiskCacheFactory(0, 1000, false));
638+
return Map.of(MockDiskCache.MockDiskCacheFactory.NAME, new MockDiskCache.MockDiskCacheFactory(0, 10000, false, 1));
554639
}
555640

556641
@Override

0 commit comments

Comments
 (0)