Skip to content

Commit 49e2701

Browse files
sgup432Peter Alfonsi
authored andcommitted
Fix negative requestStats memory_size issue (opensearch-project#13553)
This solves the bug where RequestStats memory_size metric was going negative in certain scenarios as reported in the issue. It turns out that the issue occurs when an indexShard is deleted and then reallocated on the same node. So whenever stale entries from older shard are deleted, those are accounted for the new shard which has the same shardId. --------- Signed-off-by: Sagar Upadhyaya <[email protected]>
1 parent a5c5675 commit 49e2701

File tree

5 files changed

+290
-34
lines changed

5 files changed

+290
-34
lines changed

server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434

3535
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
3636

37+
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
3738
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
3839
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
3940
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
@@ -43,11 +44,17 @@
4344
import org.opensearch.action.search.SearchResponse;
4445
import org.opensearch.action.search.SearchType;
4546
import org.opensearch.client.Client;
47+
import org.opensearch.cluster.ClusterState;
4648
import org.opensearch.cluster.metadata.IndexMetadata;
49+
import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand;
50+
import org.opensearch.cluster.routing.allocation.decider.EnableAllocationDecider;
4751
import org.opensearch.common.settings.Settings;
4852
import org.opensearch.common.time.DateFormatter;
4953
import org.opensearch.common.unit.TimeValue;
5054
import org.opensearch.common.util.FeatureFlags;
55+
import org.opensearch.core.index.Index;
56+
import org.opensearch.core.index.shard.ShardId;
57+
import org.opensearch.env.NodeEnvironment;
5158
import org.opensearch.index.IndexNotFoundException;
5259
import org.opensearch.index.cache.request.RequestCacheStats;
5360
import org.opensearch.index.query.QueryBuilders;
@@ -58,6 +65,8 @@
5865
import org.opensearch.test.ParameterizedOpenSearchIntegTestCase;
5966
import org.opensearch.test.hamcrest.OpenSearchAssertions;
6067

68+
import java.nio.file.Files;
69+
import java.nio.file.Path;
6170
import java.time.ZoneId;
6271
import java.time.ZoneOffset;
6372
import java.time.ZonedDateTime;
@@ -69,6 +78,7 @@
6978

7079
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
7180
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
81+
import static org.opensearch.cluster.routing.allocation.decider.EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING;
7282
import static org.opensearch.indices.IndicesRequestCache.INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING;
7383
import static org.opensearch.indices.IndicesService.INDICES_CACHE_CLEANUP_INTERVAL_SETTING_KEY;
7484
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
@@ -1240,6 +1250,101 @@ public void testStaleKeysCleanupWithMultipleIndices() throws Exception {
12401250
}, cacheCleanIntervalInMillis * 2, TimeUnit.MILLISECONDS);
12411251
}
12421252

1253+
public void testDeleteAndCreateSameIndexShardOnSameNode() throws Exception {
1254+
String node_1 = internalCluster().startNode(Settings.builder().build());
1255+
Client client = client(node_1);
1256+
1257+
logger.info("Starting a node in the cluster");
1258+
1259+
assertThat(cluster().size(), equalTo(1));
1260+
ClusterHealthResponse healthResponse = client().admin().cluster().prepareHealth().setWaitForNodes("1").execute().actionGet();
1261+
assertThat(healthResponse.isTimedOut(), equalTo(false));
1262+
1263+
String indexName = "test";
1264+
1265+
logger.info("Creating an index: {} with 2 shards", indexName);
1266+
createIndex(
1267+
indexName,
1268+
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 2).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build()
1269+
);
1270+
1271+
ensureGreen(indexName);
1272+
1273+
logger.info("Writing few docs and searching those which will cache items in RequestCache");
1274+
indexRandom(true, client.prepareIndex(indexName).setSource("k", "hello"));
1275+
indexRandom(true, client.prepareIndex(indexName).setSource("y", "hello again"));
1276+
SearchResponse resp = client.prepareSearch(indexName).setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get();
1277+
assertSearchResponse(resp);
1278+
resp = client.prepareSearch(indexName).setRequestCache(true).setQuery(QueryBuilders.termQuery("y", "hello")).get();
1279+
1280+
RequestCacheStats stats = getNodeCacheStats(client);
1281+
assertTrue(stats.getMemorySizeInBytes() > 0);
1282+
1283+
logger.info("Disabling allocation");
1284+
Settings newSettings = Settings.builder()
1285+
.put(CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), EnableAllocationDecider.Allocation.NONE.name())
1286+
.build();
1287+
client().admin().cluster().prepareUpdateSettings().setTransientSettings(newSettings).execute().actionGet();
1288+
1289+
logger.info("Starting a second node");
1290+
String node_2 = internalCluster().startDataOnlyNode(Settings.builder().build());
1291+
assertThat(cluster().size(), equalTo(2));
1292+
healthResponse = client().admin().cluster().prepareHealth().setWaitForNodes("2").execute().actionGet();
1293+
assertThat(healthResponse.isTimedOut(), equalTo(false));
1294+
1295+
logger.info("Moving the shard:{} from node:{} to node:{}", indexName + "#0", node_1, node_2);
1296+
MoveAllocationCommand cmd = new MoveAllocationCommand(indexName, 0, node_1, node_2);
1297+
internalCluster().client().admin().cluster().prepareReroute().add(cmd).get();
1298+
ClusterHealthResponse clusterHealth = client().admin()
1299+
.cluster()
1300+
.prepareHealth()
1301+
.setWaitForNoRelocatingShards(true)
1302+
.setWaitForNoInitializingShards(true)
1303+
.get();
1304+
assertThat(clusterHealth.isTimedOut(), equalTo(false));
1305+
1306+
ClusterState state = client().admin().cluster().prepareState().get().getState();
1307+
final Index index = state.metadata().index(indexName).getIndex();
1308+
1309+
assertBusy(() -> {
1310+
assertThat(Files.exists(shardDirectory(node_1, index, 0)), equalTo(false));
1311+
assertThat(Files.exists(shardDirectory(node_2, index, 0)), equalTo(true));
1312+
});
1313+
1314+
logger.info("Moving the shard: {} again from node:{} to node:{}", indexName + "#0", node_2, node_1);
1315+
cmd = new MoveAllocationCommand(indexName, 0, node_2, node_1);
1316+
internalCluster().client().admin().cluster().prepareReroute().add(cmd).get();
1317+
clusterHealth = client().admin()
1318+
.cluster()
1319+
.prepareHealth()
1320+
.setWaitForNoRelocatingShards(true)
1321+
.setWaitForNoInitializingShards(true)
1322+
.get();
1323+
assertThat(clusterHealth.isTimedOut(), equalTo(false));
1324+
assertThat(Files.exists(shardDirectory(node_1, index, 0)), equalTo(true));
1325+
1326+
assertBusy(() -> {
1327+
assertThat(Files.exists(shardDirectory(node_1, index, 0)), equalTo(true));
1328+
assertThat(Files.exists(shardDirectory(node_2, index, 0)), equalTo(false));
1329+
});
1330+
1331+
logger.info("Clearing the cache for index:{}. And verify the request stats doesn't go negative", indexName);
1332+
ClearIndicesCacheRequest clearIndicesCacheRequest = new ClearIndicesCacheRequest(indexName);
1333+
client.admin().indices().clearCache(clearIndicesCacheRequest).actionGet();
1334+
1335+
stats = getNodeCacheStats(client(node_1));
1336+
assertTrue(stats.getMemorySizeInBytes() == 0);
1337+
stats = getNodeCacheStats(client(node_2));
1338+
assertTrue(stats.getMemorySizeInBytes() == 0);
1339+
}
1340+
1341+
private Path shardDirectory(String server, Index index, int shard) {
1342+
NodeEnvironment env = internalCluster().getInstance(NodeEnvironment.class, server);
1343+
final Path[] paths = env.availableShardPaths(new ShardId(index, shard));
1344+
assert paths.length == 1;
1345+
return paths[0];
1346+
}
1347+
12431348
private void setupIndex(Client client, String index) throws Exception {
12441349
assertAcked(
12451350
client.admin()

server/src/main/java/org/opensearch/index/cache/request/ShardRequestCache.java

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232

3333
package org.opensearch.index.cache.request;
3434

35+
import org.apache.logging.log4j.LogManager;
36+
import org.apache.logging.log4j.Logger;
3537
import org.apache.lucene.util.Accountable;
3638
import org.opensearch.common.metrics.CounterMetric;
3739
import org.opensearch.core.common.bytes.BytesReference;
@@ -43,13 +45,14 @@
4345
*/
4446
public final class ShardRequestCache {
4547

48+
private static final Logger logger = LogManager.getLogger(ShardRequestCache.class);
4649
final CounterMetric evictionsMetric = new CounterMetric();
4750
final CounterMetric totalMetric = new CounterMetric();
4851
final CounterMetric hitCount = new CounterMetric();
4952
final CounterMetric missCount = new CounterMetric();
5053

5154
public RequestCacheStats stats() {
52-
return new RequestCacheStats(totalMetric.count(), evictionsMetric.count(), hitCount.count(), missCount.count());
55+
return new RequestCacheStats(Math.max(0, totalMetric.count()), evictionsMetric.count(), hitCount.count(), missCount.count());
5356
}
5457

5558
public void onHit() {
@@ -74,6 +77,15 @@ public void onRemoval(long keyRamBytesUsed, BytesReference value, boolean evicte
7477
dec += value.ramBytesUsed();
7578
}
7679
totalMetric.dec(dec);
80+
if (totalMetric.count() < 0) {
81+
totalMetric.inc(dec);
82+
logger.warn(
83+
"Ignoring the operation to deduct memory: {} from RequestStats memory_size metric as it will "
84+
+ "go negative. Current memory: {}. This is a bug.",
85+
dec,
86+
totalMetric.count()
87+
);
88+
}
7789
}
7890

7991
// Old functions which increment size by passing in an Accountable. Functional but no longer used.
@@ -82,15 +94,6 @@ public void onCached(Accountable key, BytesReference value) {
8294
}
8395

8496
public void onRemoval(Accountable key, BytesReference value, boolean evicted) {
85-
if (evicted) {
86-
evictionsMetric.inc();
87-
}
88-
long dec = 0;
89-
if (key != null) {
90-
dec += key.ramBytesUsed();
91-
}
92-
if (value != null) {
93-
dec += value.ramBytesUsed();
94-
}
97+
onRemoval(key.ramBytesUsed(), value, evicted);
9598
}
9699
}

server/src/main/java/org/opensearch/indices/IndicesRequestCache.java

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,11 @@ public final class IndicesRequestCache implements RemovalListener<ICacheKey<Indi
205205
);
206206
}
207207

208+
// package private for testing
209+
void invalidateAll() {
210+
cache.invalidateAll();
211+
}
212+
208213
@Override
209214
public void close() throws IOException {
210215
cache.invalidateAll();
@@ -233,8 +238,17 @@ public void onRemoval(RemovalNotification<ICacheKey<Key>, BytesReference> notifi
233238
// shards as part of request cache.
234239
// Pass a new removal notification containing Key rather than ICacheKey<Key> to the CacheEntity for backwards compatibility.
235240
Key key = notification.getKey().key;
236-
cacheEntityLookup.apply(key.shardId).ifPresent(entity -> entity.onRemoval(notification));
237-
CleanupKey cleanupKey = new CleanupKey(cacheEntityLookup.apply(key.shardId).orElse(null), key.readerCacheKeyId);
241+
IndicesService.IndexShardCacheEntity indexShardCacheEntity = (IndicesService.IndexShardCacheEntity) cacheEntityLookup.apply(
242+
key.shardId
243+
).orElse(null);
244+
if (indexShardCacheEntity != null) {
245+
// Here we match the hashcode to avoid scenario where we deduct stats of older IndexShard(with same
246+
// shardId) from current IndexShard.
247+
if (key.indexShardHashCode == System.identityHashCode(indexShardCacheEntity.getCacheIdentity())) {
248+
indexShardCacheEntity.onRemoval(notification);
249+
}
250+
}
251+
CleanupKey cleanupKey = new CleanupKey(indexShardCacheEntity, key.readerCacheKeyId);
238252
cacheCleanupManager.updateStaleCountOnEntryRemoval(cleanupKey, notification);
239253
}
240254

@@ -266,7 +280,8 @@ BytesReference getOrCompute(
266280
.getReaderCacheHelper();
267281
String readerCacheKeyId = delegatingCacheHelper.getDelegatingCacheKey().getId();
268282
assert readerCacheKeyId != null;
269-
final Key key = new Key(((IndexShard) cacheEntity.getCacheIdentity()).shardId(), cacheKey, readerCacheKeyId);
283+
IndexShard indexShard = ((IndexShard) cacheEntity.getCacheIdentity());
284+
final Key key = new Key(indexShard.shardId(), cacheKey, readerCacheKeyId, System.identityHashCode(indexShard));
270285
Loader cacheLoader = new Loader(cacheEntity, loader);
271286
BytesReference value = cache.computeIfAbsent(getICacheKey(key), cacheLoader);
272287
if (cacheLoader.isLoaded()) {
@@ -299,7 +314,8 @@ void invalidate(IndicesService.IndexShardCacheEntity cacheEntity, DirectoryReade
299314
IndexReader.CacheHelper cacheHelper = ((OpenSearchDirectoryReader) reader).getDelegatingCacheHelper();
300315
readerCacheKeyId = ((OpenSearchDirectoryReader.DelegatingCacheHelper) cacheHelper).getDelegatingCacheKey().getId();
301316
}
302-
cache.invalidate(getICacheKey(new Key(((IndexShard) cacheEntity.getCacheIdentity()).shardId(), cacheKey, readerCacheKeyId)));
317+
IndexShard indexShard = (IndexShard) cacheEntity.getCacheIdentity();
318+
cache.invalidate(getICacheKey(new Key(indexShard.shardId(), cacheKey, readerCacheKeyId, System.identityHashCode(indexShard))));
303319
}
304320

305321
/**
@@ -377,19 +393,24 @@ interface CacheEntity extends Accountable {
377393
*/
378394
static class Key implements Accountable, Writeable {
379395
public final ShardId shardId; // use as identity equality
396+
public final int indexShardHashCode; // While ShardId is usually sufficient to uniquely identify an
397+
// indexShard but in case where the same indexShard is deleted and reallocated on same node, we need the
398+
// hashcode(default) to identify the older indexShard but with same shardId.
380399
public final String readerCacheKeyId;
381400
public final BytesReference value;
382401

383-
Key(ShardId shardId, BytesReference value, String readerCacheKeyId) {
402+
Key(ShardId shardId, BytesReference value, String readerCacheKeyId, int indexShardHashCode) {
384403
this.shardId = shardId;
385404
this.value = value;
386405
this.readerCacheKeyId = Objects.requireNonNull(readerCacheKeyId);
406+
this.indexShardHashCode = indexShardHashCode;
387407
}
388408

389409
Key(StreamInput in) throws IOException {
390410
this.shardId = in.readOptionalWriteable(ShardId::new);
391411
this.readerCacheKeyId = in.readOptionalString();
392412
this.value = in.readBytesReference();
413+
this.indexShardHashCode = in.readInt();
393414
}
394415

395416
@Override
@@ -411,6 +432,7 @@ public boolean equals(Object o) {
411432
if (!Objects.equals(readerCacheKeyId, key.readerCacheKeyId)) return false;
412433
if (!shardId.equals(key.shardId)) return false;
413434
if (!value.equals(key.value)) return false;
435+
if (indexShardHashCode != key.indexShardHashCode) return false;
414436
return true;
415437
}
416438

@@ -419,6 +441,7 @@ public int hashCode() {
419441
int result = shardId.hashCode();
420442
result = 31 * result + readerCacheKeyId.hashCode();
421443
result = 31 * result + value.hashCode();
444+
result = 31 * result + indexShardHashCode;
422445
return result;
423446
}
424447

@@ -427,6 +450,7 @@ public void writeTo(StreamOutput out) throws IOException {
427450
out.writeOptionalWriteable(shardId);
428451
out.writeOptionalString(readerCacheKeyId);
429452
out.writeBytesReference(value);
453+
out.writeInt(indexShardHashCode);
430454
}
431455
}
432456

server/src/test/java/org/opensearch/indices/IRCKeyWriteableSerializerTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ private IndicesRequestCache.Key getRandomIRCKey(int valueLength, Random random,
4545
value[i] = (byte) (random.nextInt(126 - 32) + 32);
4646
}
4747
BytesReference keyValue = new BytesArray(value);
48-
return new IndicesRequestCache.Key(shard, keyValue, UUID.randomUUID().toString()); // same UUID source as used in real key
48+
return new IndicesRequestCache.Key(shard, keyValue, UUID.randomUUID().toString(), shard.hashCode()); // same UUID
49+
// source as used in real key
4950
}
5051
}

0 commit comments

Comments
 (0)