Skip to content

Commit f804ba4

Browse files
opensearch-trigger-bot[bot]github-actions[bot]
authored and committed
[Tiered Caching] [Bug Fix] Use concurrentMap instead of HashMap to fix Concurrent Modification Exception (opensearch-project#14221) (opensearch-project#14253)
* use concurrentmap * Update IndicesRequestCacheTests.java * Update IndicesRequestCacheTests.java * Update CHANGELOG.md * Update IndicesRequestCache.java * Update IndicesRequestCacheTests.java * Update IndicesRequestCacheTests.java * revert feature flags * changelog to releaselog * use concurrentmap * Update IndicesRequestCacheTests.java * Update IndicesRequestCacheTests.java * Update CHANGELOG.md * Update IndicesRequestCache.java * Update IndicesRequestCacheTests.java * Update IndicesRequestCacheTests.java * revert feature flags * changelog to releaselog * revert the test removal * revert the conflict resolutions --------- (cherry picked from commit ccf5289) Signed-off-by: Kiran Prakash <[email protected]> Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com> Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com> Signed-off-by: kkewwei <[email protected]>
1 parent 427830d commit f804ba4

File tree

3 files changed

+69
-11
lines changed

3 files changed

+69
-11
lines changed

release-notes/opensearch.release-notes-2.15.0.md

+2-1
Original file line numberDiff line numberDiff line change
@@ -70,4 +70,5 @@
7070
- Fix double invocation of postCollection when MultiBucketCollector is present ([#14015](https://github.com/opensearch-project/OpenSearch/pull/14015))
7171
- Fix ReplicaShardBatchAllocator to batch shards without duplicates ([#13710](https://github.com/opensearch-project/OpenSearch/pull/13710))
7272
- Java high-level REST client bulk() is not respecting the bulkRequest.requireAlias(true) method call ([#14146](https://github.com/opensearch-project/OpenSearch/pull/14146))
73-
- Fix ShardNotFoundException during request cache clean up ([#14219](https://github.com/opensearch-project/OpenSearch/pull/14219))
73+
- Fix ShardNotFoundException during request cache clean up ([#14219](https://github.com/opensearch-project/OpenSearch/pull/14219))
74+
- Fix Concurrent Modification Exception in Indices Request Cache ([#14221](https://github.com/opensearch-project/OpenSearch/pull/14221))

server/src/main/java/org/opensearch/indices/IndicesRequestCache.java

+9-4
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,6 @@
7575
import java.io.IOException;
7676
import java.util.Collection;
7777
import java.util.Collections;
78-
import java.util.HashMap;
7978
import java.util.HashSet;
8079
import java.util.Iterator;
8180
import java.util.List;
@@ -508,7 +507,7 @@ public int hashCode() {
508507
* */
509508
class IndicesRequestCacheCleanupManager implements Closeable {
510509
private final Set<CleanupKey> keysToClean;
511-
private final ConcurrentMap<ShardId, HashMap<String, Integer>> cleanupKeyToCountMap;
510+
private final ConcurrentMap<ShardId, ConcurrentMap<String, Integer>> cleanupKeyToCountMap;
512511
private final AtomicInteger staleKeysCount;
513512
private volatile double stalenessThreshold;
514513
private final IndicesRequestCacheCleaner cacheCleaner;
@@ -569,7 +568,13 @@ private void updateStaleCountOnCacheInsert(CleanupKey cleanupKey) {
569568

570569
// If the key doesn't exist, it's added with a value of 1.
571570
// If the key exists, its value is incremented by 1.
572-
cleanupKeyToCountMap.computeIfAbsent(shardId, k -> new HashMap<>()).merge(cleanupKey.readerCacheKeyId, 1, Integer::sum);
571+
addToCleanupKeyToCountMap(shardId, cleanupKey.readerCacheKeyId);
572+
}
573+
574+
// pkg-private for testing
575+
void addToCleanupKeyToCountMap(ShardId shardId, String readerCacheKeyId) {
576+
cleanupKeyToCountMap.computeIfAbsent(shardId, k -> ConcurrentCollections.newConcurrentMap())
577+
.merge(readerCacheKeyId, 1, Integer::sum);
573578
}
574579

575580
/**
@@ -827,7 +832,7 @@ public void close() {
827832
}
828833

829834
// for testing
830-
ConcurrentMap<ShardId, HashMap<String, Integer>> getCleanupKeyToCountMap() {
835+
ConcurrentMap<ShardId, ConcurrentMap<String, Integer>> getCleanupKeyToCountMap() {
831836
return cleanupKeyToCountMap;
832837
}
833838

server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java

+58-6
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@
9595
import java.util.ArrayList;
9696
import java.util.Arrays;
9797
import java.util.Collections;
98-
import java.util.HashMap;
98+
import java.util.ConcurrentModificationException;
9999
import java.util.List;
100100
import java.util.Map;
101101
import java.util.Optional;
@@ -105,7 +105,9 @@
105105
import java.util.concurrent.CountDownLatch;
106106
import java.util.concurrent.ExecutorService;
107107
import java.util.concurrent.Executors;
108+
import java.util.concurrent.Phaser;
108109
import java.util.concurrent.TimeUnit;
110+
import java.util.concurrent.atomic.AtomicBoolean;
109111
import java.util.concurrent.atomic.AtomicInteger;
110112

111113
import static java.util.Collections.emptyMap;
@@ -489,7 +491,7 @@ public void testStaleCount_OnRemovalNotificationOfStaleKey_DecrementsStaleCount(
489491
indexShard.hashCode()
490492
);
491493
// test the mapping
492-
ConcurrentMap<ShardId, HashMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap();
494+
ConcurrentMap<ShardId, ConcurrentMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap();
493495
// shard id should exist
494496
assertTrue(cleanupKeyToCountMap.containsKey(shardId));
495497
// reader CacheKeyId should NOT exist
@@ -552,7 +554,7 @@ public void testStaleCount_OnRemovalNotificationOfNonStaleKey_DoesNotDecrementsS
552554
);
553555

554556
// test the mapping
555-
ConcurrentMap<ShardId, HashMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap();
557+
ConcurrentMap<ShardId, ConcurrentMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap();
556558
// shard id should exist
557559
assertTrue(cleanupKeyToCountMap.containsKey(shardId));
558560
// reader CacheKeyId should NOT exist
@@ -720,7 +722,7 @@ public void testCleanupKeyToCountMapAreSetAppropriately() throws Exception {
720722
cache.getOrCompute(getEntity(indexShard), getLoader(reader), reader, getTermBytes());
721723
assertEquals(1, cache.count());
722724
// test the mappings
723-
ConcurrentMap<ShardId, HashMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap();
725+
ConcurrentMap<ShardId, ConcurrentMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap();
724726
assertEquals(1, (int) cleanupKeyToCountMap.get(shardId).get(getReaderCacheKeyId(reader)));
725727

726728
cache.getOrCompute(getEntity(indexShard), getLoader(secondReader), secondReader, getTermBytes());
@@ -793,8 +795,54 @@ public void testCleanupKeyToCountMapAreSetAppropriately() throws Exception {
793795
IOUtils.close(secondReader);
794796
}
795797

796-
private DirectoryReader getReader(IndexWriter writer, ShardId shardId) throws IOException {
797-
return OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), shardId);
798+
// test adding to cleanupKeyToCountMap with multiple threads
799+
public void testAddToCleanupKeyToCountMap() throws Exception {
800+
threadPool = getThreadPool();
801+
Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "51%").build();
802+
cache = getIndicesRequestCache(settings);
803+
804+
int numberOfThreads = 10;
805+
int numberOfIterations = 1000;
806+
Phaser phaser = new Phaser(numberOfThreads + 1); // +1 for the main thread
807+
AtomicBoolean exceptionDetected = new AtomicBoolean(false);
808+
809+
ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads);
810+
811+
for (int i = 0; i < numberOfThreads; i++) {
812+
executorService.submit(() -> {
813+
phaser.arriveAndAwaitAdvance(); // Ensure all threads start at the same time
814+
try {
815+
for (int j = 0; j < numberOfIterations; j++) {
816+
cache.cacheCleanupManager.addToCleanupKeyToCountMap(indexShard.shardId(), UUID.randomUUID().toString());
817+
}
818+
} catch (ConcurrentModificationException e) {
819+
logger.error("ConcurrentModificationException detected in thread : " + e.getMessage());
820+
exceptionDetected.set(true); // Set flag if exception is detected
821+
}
822+
});
823+
}
824+
phaser.arriveAndAwaitAdvance(); // Start all threads
825+
826+
// Main thread iterates over the map
827+
executorService.submit(() -> {
828+
try {
829+
for (int j = 0; j < numberOfIterations; j++) {
830+
cache.cacheCleanupManager.getCleanupKeyToCountMap().forEach((k, v) -> {
831+
v.forEach((k1, v1) -> {
832+
// Accessing the map to create contention
833+
v.get(k1);
834+
});
835+
});
836+
}
837+
} catch (ConcurrentModificationException e) {
838+
logger.error("ConcurrentModificationException detected in main thread : " + e.getMessage());
839+
exceptionDetected.set(true); // Set flag if exception is detected
840+
}
841+
});
842+
843+
executorService.shutdown();
844+
executorService.awaitTermination(60, TimeUnit.SECONDS);
845+
assertFalse(exceptionDetected.get());
798846
}
799847

800848
private IndicesRequestCache getIndicesRequestCache(Settings settings) {
@@ -808,6 +856,10 @@ private IndicesRequestCache getIndicesRequestCache(Settings settings) {
808856
);
809857
}
810858

859+
private DirectoryReader getReader(IndexWriter writer, ShardId shardId) throws IOException {
860+
return OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), shardId);
861+
}
862+
811863
private Loader getLoader(DirectoryReader reader) {
812864
return new Loader(reader, 0);
813865
}

0 commit comments

Comments
 (0)