Skip to content

Commit ccf5289

Browse files
[Tiered Caching] [Bug Fix] Use concurrentMap instead of HashMap to fix Concurrent Modification Exception (opensearch-project#14221)
* use concurrentmap Signed-off-by: Kiran Prakash <[email protected]> * Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash <[email protected]> * Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash <[email protected]> * Update CHANGELOG.md Signed-off-by: Kiran Prakash <[email protected]> * Update IndicesRequestCache.java Signed-off-by: Kiran Prakash <[email protected]> * Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash <[email protected]> * Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash <[email protected]> * revert feature flags Signed-off-by: Kiran Prakash <[email protected]> * changelog to releaselog Signed-off-by: Kiran Prakash <[email protected]> * use concurrentmap Signed-off-by: Kiran Prakash <[email protected]> * Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash <[email protected]> * Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash <[email protected]> * Update CHANGELOG.md Signed-off-by: Kiran Prakash <[email protected]> * Update IndicesRequestCache.java Signed-off-by: Kiran Prakash <[email protected]> * Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash <[email protected]> * Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash <[email protected]> * revert feature flags Signed-off-by: Kiran Prakash <[email protected]> * changelog to releaselog Signed-off-by: Kiran Prakash <[email protected]> * revert the test removal Signed-off-by: Kiran Prakash <[email protected]> * revert the conflict resolutions Signed-off-by: Kiran Prakash <[email protected]> --------- Signed-off-by: Kiran Prakash <[email protected]>
1 parent 435af89 commit ccf5289

File tree

3 files changed

+69
-11
lines changed

3 files changed

+69
-11
lines changed

release-notes/opensearch.release-notes-2.15.0.md

+2-1
Original file line numberDiff line numberDiff line change
@@ -70,4 +70,5 @@
7070
- Fix double invocation of postCollection when MultiBucketCollector is present ([#14015](https://github.com/opensearch-project/OpenSearch/pull/14015))
7171
- Fix ReplicaShardBatchAllocator to batch shards without duplicates ([#13710](https://github.com/opensearch-project/OpenSearch/pull/13710))
7272
- Java high-level REST client bulk() is not respecting the bulkRequest.requireAlias(true) method call ([#14146](https://github.com/opensearch-project/OpenSearch/pull/14146))
73-
- Fix ShardNotFoundException during request cache clean up ([#14219](https://github.com/opensearch-project/OpenSearch/pull/14219))
73+
- Fix ShardNotFoundException during request cache clean up ([#14219](https://github.com/opensearch-project/OpenSearch/pull/14219))
74+
- Fix Concurrent Modification Exception in Indices Request Cache([#14032](https://github.com/opensearch-project/OpenSearch/pull/14221))

server/src/main/java/org/opensearch/indices/IndicesRequestCache.java

+9-4
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,6 @@
7575
import java.io.IOException;
7676
import java.util.Collection;
7777
import java.util.Collections;
78-
import java.util.HashMap;
7978
import java.util.HashSet;
8079
import java.util.Iterator;
8180
import java.util.List;
@@ -507,7 +506,7 @@ public int hashCode() {
507506
* */
508507
class IndicesRequestCacheCleanupManager implements Closeable {
509508
private final Set<CleanupKey> keysToClean;
510-
private final ConcurrentMap<ShardId, HashMap<String, Integer>> cleanupKeyToCountMap;
509+
private final ConcurrentMap<ShardId, ConcurrentMap<String, Integer>> cleanupKeyToCountMap;
511510
private final AtomicInteger staleKeysCount;
512511
private volatile double stalenessThreshold;
513512
private final IndicesRequestCacheCleaner cacheCleaner;
@@ -568,7 +567,13 @@ private void updateStaleCountOnCacheInsert(CleanupKey cleanupKey) {
568567

569568
// If the key doesn't exist, it's added with a value of 1.
570569
// If the key exists, its value is incremented by 1.
571-
cleanupKeyToCountMap.computeIfAbsent(shardId, k -> new HashMap<>()).merge(cleanupKey.readerCacheKeyId, 1, Integer::sum);
570+
addToCleanupKeyToCountMap(shardId, cleanupKey.readerCacheKeyId);
571+
}
572+
573+
// pkg-private for testing
574+
void addToCleanupKeyToCountMap(ShardId shardId, String readerCacheKeyId) {
575+
cleanupKeyToCountMap.computeIfAbsent(shardId, k -> ConcurrentCollections.newConcurrentMap())
576+
.merge(readerCacheKeyId, 1, Integer::sum);
572577
}
573578

574579
/**
@@ -826,7 +831,7 @@ public void close() {
826831
}
827832

828833
// for testing
829-
ConcurrentMap<ShardId, HashMap<String, Integer>> getCleanupKeyToCountMap() {
834+
ConcurrentMap<ShardId, ConcurrentMap<String, Integer>> getCleanupKeyToCountMap() {
830835
return cleanupKeyToCountMap;
831836
}
832837

server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java

+58-6
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@
9595
import java.util.ArrayList;
9696
import java.util.Arrays;
9797
import java.util.Collections;
98-
import java.util.HashMap;
98+
import java.util.ConcurrentModificationException;
9999
import java.util.List;
100100
import java.util.Map;
101101
import java.util.Optional;
@@ -105,7 +105,9 @@
105105
import java.util.concurrent.CountDownLatch;
106106
import java.util.concurrent.ExecutorService;
107107
import java.util.concurrent.Executors;
108+
import java.util.concurrent.Phaser;
108109
import java.util.concurrent.TimeUnit;
110+
import java.util.concurrent.atomic.AtomicBoolean;
109111
import java.util.concurrent.atomic.AtomicInteger;
110112

111113
import static java.util.Collections.emptyMap;
@@ -489,7 +491,7 @@ public void testStaleCount_OnRemovalNotificationOfStaleKey_DecrementsStaleCount(
489491
indexShard.hashCode()
490492
);
491493
// test the mapping
492-
ConcurrentMap<ShardId, HashMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap();
494+
ConcurrentMap<ShardId, ConcurrentMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap();
493495
// shard id should exist
494496
assertTrue(cleanupKeyToCountMap.containsKey(shardId));
495497
// reader CacheKeyId should NOT exist
@@ -552,7 +554,7 @@ public void testStaleCount_OnRemovalNotificationOfNonStaleKey_DoesNotDecrementsS
552554
);
553555

554556
// test the mapping
555-
ConcurrentMap<ShardId, HashMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap();
557+
ConcurrentMap<ShardId, ConcurrentMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap();
556558
// shard id should exist
557559
assertTrue(cleanupKeyToCountMap.containsKey(shardId));
558560
// reader CacheKeyId should NOT exist
@@ -720,7 +722,7 @@ public void testCleanupKeyToCountMapAreSetAppropriately() throws Exception {
720722
cache.getOrCompute(getEntity(indexShard), getLoader(reader), reader, getTermBytes());
721723
assertEquals(1, cache.count());
722724
// test the mappings
723-
ConcurrentMap<ShardId, HashMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap();
725+
ConcurrentMap<ShardId, ConcurrentMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap();
724726
assertEquals(1, (int) cleanupKeyToCountMap.get(shardId).get(getReaderCacheKeyId(reader)));
725727

726728
cache.getOrCompute(getEntity(indexShard), getLoader(secondReader), secondReader, getTermBytes());
@@ -793,8 +795,54 @@ public void testCleanupKeyToCountMapAreSetAppropriately() throws Exception {
793795
IOUtils.close(secondReader);
794796
}
795797

796-
private DirectoryReader getReader(IndexWriter writer, ShardId shardId) throws IOException {
797-
return OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), shardId);
798+
// test adding to cleanupKeyToCountMap with multiple threads
799+
public void testAddToCleanupKeyToCountMap() throws Exception {
800+
threadPool = getThreadPool();
801+
Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "51%").build();
802+
cache = getIndicesRequestCache(settings);
803+
804+
int numberOfThreads = 10;
805+
int numberOfIterations = 1000;
806+
Phaser phaser = new Phaser(numberOfThreads + 1); // +1 for the main thread
807+
AtomicBoolean exceptionDetected = new AtomicBoolean(false);
808+
809+
ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads);
810+
811+
for (int i = 0; i < numberOfThreads; i++) {
812+
executorService.submit(() -> {
813+
phaser.arriveAndAwaitAdvance(); // Ensure all threads start at the same time
814+
try {
815+
for (int j = 0; j < numberOfIterations; j++) {
816+
cache.cacheCleanupManager.addToCleanupKeyToCountMap(indexShard.shardId(), UUID.randomUUID().toString());
817+
}
818+
} catch (ConcurrentModificationException e) {
819+
logger.error("ConcurrentModificationException detected in thread : " + e.getMessage());
820+
exceptionDetected.set(true); // Set flag if exception is detected
821+
}
822+
});
823+
}
824+
phaser.arriveAndAwaitAdvance(); // Start all threads
825+
826+
// Main thread iterates over the map
827+
executorService.submit(() -> {
828+
try {
829+
for (int j = 0; j < numberOfIterations; j++) {
830+
cache.cacheCleanupManager.getCleanupKeyToCountMap().forEach((k, v) -> {
831+
v.forEach((k1, v1) -> {
832+
// Accessing the map to create contention
833+
v.get(k1);
834+
});
835+
});
836+
}
837+
} catch (ConcurrentModificationException e) {
838+
logger.error("ConcurrentModificationException detected in main thread : " + e.getMessage());
839+
exceptionDetected.set(true); // Set flag if exception is detected
840+
}
841+
});
842+
843+
executorService.shutdown();
844+
executorService.awaitTermination(60, TimeUnit.SECONDS);
845+
assertFalse(exceptionDetected.get());
798846
}
799847

800848
private IndicesRequestCache getIndicesRequestCache(Settings settings) {
@@ -808,6 +856,10 @@ private IndicesRequestCache getIndicesRequestCache(Settings settings) {
808856
);
809857
}
810858

859+
private DirectoryReader getReader(IndexWriter writer, ShardId shardId) throws IOException {
860+
return OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), shardId);
861+
}
862+
811863
private Loader getLoader(DirectoryReader reader) {
812864
return new Loader(reader, 0);
813865
}

0 commit comments

Comments
 (0)