47
47
import org .opensearch .common .cache .LoadAwareCacheLoader ;
48
48
import org .opensearch .common .cache .RemovalListener ;
49
49
import org .opensearch .common .cache .RemovalNotification ;
50
+ import org .opensearch .common .cache .RemovalReason ;
50
51
import org .opensearch .common .cache .policy .CachedQueryResult ;
51
52
import org .opensearch .common .cache .serializer .BytesReferenceSerializer ;
52
53
import org .opensearch .common .cache .service .CacheService ;
@@ -216,19 +217,16 @@ void clear(CacheEntity entity) {
216
217
public void onRemoval (RemovalNotification <ICacheKey <Key >, BytesReference > notification ) {
217
218
// In case this event happens for an old shard, we can safely ignore this as we don't keep track for old
218
219
// shards as part of request cache.
219
-
220
220
// Pass a new removal notification containing Key rather than ICacheKey<Key> to the CacheEntity for backwards compatibility.
221
221
Key key = notification .getKey ().key ;
222
222
RemovalNotification <Key , BytesReference > newNotification = new RemovalNotification <>(
223
223
key ,
224
224
notification .getValue (),
225
225
notification .getRemovalReason ()
226
226
);
227
-
228
227
cacheEntityLookup .apply (key .shardId ).ifPresent (entity -> entity .onRemoval (newNotification ));
229
- cacheCleanupManager .updateCleanupKeyToCountMapOnCacheEviction (
230
- new CleanupKey (cacheEntityLookup .apply (key .shardId ).orElse (null ), key .readerCacheKeyId )
231
- );
228
+ CleanupKey cleanupKey = new CleanupKey (cacheEntityLookup .apply (key .shardId ).orElse (null ), key .readerCacheKeyId );
229
+ cacheCleanupManager .updateStaleCountOnEntryRemoval (cleanupKey , newNotification );
232
230
}
233
231
234
232
private ICacheKey <Key > getICacheKey (Key key ) {
@@ -272,10 +270,11 @@ BytesReference getOrCompute(
272
270
OpenSearchDirectoryReader .addReaderCloseListener (reader , cleanupKey );
273
271
}
274
272
}
275
- cacheCleanupManager .updateCleanupKeyToCountMapOnCacheInsertion (cleanupKey );
273
+ cacheCleanupManager .updateStaleCountOnCacheInsert (cleanupKey );
276
274
} else {
277
275
cacheEntity .onHit ();
278
276
}
277
+
279
278
return value ;
280
279
}
281
280
@@ -508,7 +507,7 @@ void enqueueCleanupKey(CleanupKey cleanupKey) {
508
507
*
509
508
* @param cleanupKey the CleanupKey to be updated in the map
510
509
*/
511
- private void updateCleanupKeyToCountMapOnCacheInsertion (CleanupKey cleanupKey ) {
510
+ private void updateStaleCountOnCacheInsert (CleanupKey cleanupKey ) {
512
511
if (stalenessThreshold == 0.0 || cleanupKey .entity == null ) {
513
512
return ;
514
513
}
@@ -524,8 +523,29 @@ private void updateCleanupKeyToCountMapOnCacheInsertion(CleanupKey cleanupKey) {
524
523
cleanupKeyToCountMap .computeIfAbsent (shardId , k -> new HashMap <>()).merge (cleanupKey .readerCacheKeyId , 1 , Integer ::sum );
525
524
}
526
525
527
- private void updateCleanupKeyToCountMapOnCacheEviction (CleanupKey cleanupKey ) {
528
- if (stalenessThreshold == 0.0 || cleanupKey .entity == null ) {
526
+ /**
527
+ * Handles the eviction of a cache entry.
528
+ *
529
+ * <p>This method is called when an entry is evicted from the cache.
530
+ * We consider all removal notifications except with the reason Replaced
531
+ * {@link #incrementStaleKeysCount} would have removed the entries from the map and increment the {@link #staleKeysCount}
532
+ * Hence we decrement {@link #staleKeysCount} if we do not find the shardId or readerCacheKeyId in the map.
533
+ * Skip decrementing staleKeysCount if we find the shardId or readerCacheKeyId in the map since it would have not been accounted for in the staleKeysCount in
534
+ *
535
+ * @param cleanupKey the CleanupKey that has been evicted from the cache
536
+ * @param notification RemovalNotification of the cache entry evicted
537
+ */
538
+ private void updateStaleCountOnEntryRemoval (CleanupKey cleanupKey , RemovalNotification <Key , BytesReference > notification ) {
539
+ if (notification .getRemovalReason () == RemovalReason .REPLACED ) {
540
+ // The reason of the notification is REPLACED when a cache entry's value is updated, since replacing an entry
541
+ // does not affect the staleness count, we skip such notifications.
542
+ return ;
543
+ }
544
+ if (cleanupKey .entity == null ) {
545
+ // entity will only be null when the shard is closed/deleted
546
+ // we would have accounted this in staleKeysCount when the closing/deletion of shard would have closed the associated
547
+ // readers
548
+ staleKeysCount .decrementAndGet ();
529
549
return ;
530
550
}
531
551
IndexShard indexShard = (IndexShard ) cleanupKey .entity .getCacheIdentity ();
@@ -535,23 +555,41 @@ private void updateCleanupKeyToCountMapOnCacheEviction(CleanupKey cleanupKey) {
535
555
}
536
556
ShardId shardId = indexShard .shardId ();
537
557
538
- cleanupKeyToCountMap .computeIfPresent (shardId , (shard , keyCountMap ) -> {
539
- keyCountMap .computeIfPresent (cleanupKey .readerCacheKeyId , (key , currentValue ) -> {
540
- // decrement the stale key count
558
+ cleanupKeyToCountMap .compute (shardId , (key , readerCacheKeyMap ) -> {
559
+ if (readerCacheKeyMap == null || !readerCacheKeyMap .containsKey (cleanupKey .readerCacheKeyId )) {
560
+ // If ShardId is not present or readerCacheKeyId is not present
561
+ // it should have already been accounted for and hence been removed from this map
562
+ // so decrement staleKeysCount
541
563
staleKeysCount .decrementAndGet ();
542
- int newValue = currentValue - 1 ;
543
- // Remove the key if the new value is zero by returning null; otherwise, update with the new value.
544
- return newValue == 0 ? null : newValue ;
545
- });
546
- return keyCountMap ;
564
+ // Return the current map
565
+ return readerCacheKeyMap ;
566
+ } else {
567
+ // If it is in the map, it is not stale yet.
568
+ // Proceed to adjust the count for the readerCacheKeyId in the map
569
+ // but do not decrement the staleKeysCount
570
+ Integer count = readerCacheKeyMap .get (cleanupKey .readerCacheKeyId );
571
+ // this should never be null
572
+ assert (count != null && count >= 0 );
573
+ // Reduce the count by 1
574
+ int newCount = count - 1 ;
575
+ if (newCount > 0 ) {
576
+ // Update the map with the new count
577
+ readerCacheKeyMap .put (cleanupKey .readerCacheKeyId , newCount );
578
+ } else {
579
+ // Remove the readerCacheKeyId entry if new count is zero
580
+ readerCacheKeyMap .remove (cleanupKey .readerCacheKeyId );
581
+ }
582
+ // If after modification, the readerCacheKeyMap is empty, we return null to remove the ShardId entry
583
+ return readerCacheKeyMap .isEmpty () ? null : readerCacheKeyMap ;
584
+ }
547
585
});
548
586
}
549
587
550
588
/**
551
589
* Updates the count of stale keys in the cache.
552
590
* This method is called when a CleanupKey is added to the keysToClean set.
553
591
*
554
- * It increments the staleKeysCount by the count of the CleanupKey in the cleanupKeyToCountMap.
592
+ * <p> It increments the staleKeysCount by the count of the CleanupKey in the cleanupKeyToCountMap.
555
593
* If the CleanupKey's readerCacheKeyId is null or the CleanupKey's entity is not open, it increments the staleKeysCount
556
594
* by the total count of keys associated with the CleanupKey's ShardId in the cleanupKeyToCountMap and removes the ShardId from the map.
557
595
*
@@ -569,7 +607,7 @@ private void incrementStaleKeysCount(CleanupKey cleanupKey) {
569
607
ShardId shardId = indexShard .shardId ();
570
608
571
609
// Using computeIfPresent to atomically operate on the countMap for a given shardId
572
- cleanupKeyToCountMap .computeIfPresent (shardId , (key , countMap ) -> {
610
+ cleanupKeyToCountMap .computeIfPresent (shardId , (currentShardId , countMap ) -> {
573
611
if (cleanupKey .readerCacheKeyId == null ) {
574
612
// Aggregate and add to staleKeysCount atomically if readerCacheKeyId is null
575
613
int totalSum = countMap .values ().stream ().mapToInt (Integer ::intValue ).sum ();
@@ -578,18 +616,19 @@ private void incrementStaleKeysCount(CleanupKey cleanupKey) {
578
616
return null ;
579
617
} else {
580
618
// Update staleKeysCount based on specific readerCacheKeyId, then remove it from the countMap
581
- countMap .computeIfPresent (cleanupKey .readerCacheKeyId , (k , v ) -> {
582
- staleKeysCount .addAndGet (v );
619
+ countMap .computeIfPresent (cleanupKey .readerCacheKeyId , (readerCacheKey , count ) -> {
620
+ staleKeysCount .addAndGet (count );
583
621
// Return null to remove the key after updating staleKeysCount
584
622
return null ;
585
623
});
586
-
587
624
// Check if countMap is empty after removal to decide if we need to remove the shardId entry
588
625
if (countMap .isEmpty ()) {
589
- return null ; // Returning null removes the entry for shardId
626
+ // Returning null removes the entry for shardId
627
+ return null ;
590
628
}
591
629
}
592
- return countMap ; // Return the modified countMap to keep the mapping
630
+ // Return the modified countMap to retain updates
631
+ return countMap ;
593
632
});
594
633
}
595
634
@@ -715,6 +754,11 @@ public void close() {
715
754
this .cacheCleaner .close ();
716
755
}
717
756
757
+ // for testing
758
+ ConcurrentMap <ShardId , HashMap <String , Integer >> getCleanupKeyToCountMap () {
759
+ return cleanupKeyToCountMap ;
760
+ }
761
+
718
762
private final class IndicesRequestCacheCleaner implements Runnable , Releasable {
719
763
720
764
private final IndicesRequestCacheCleanupManager cacheCleanupManager ;
0 commit comments