Skip to content

Fixed counter bug in in-memory bucket indexes #4668

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ bucketlistDB-<X>.bulk.loads | meter | number of entries Buck
bucketlistDB-live.bulk.inflationWinners | timer | time to load inflation winners
bucketlistDB-live.bulk.poolshareTrustlines | timer | time to load poolshare trustlines by accountID and assetID
bucketlistDB-live.bulk.prefetch | timer | time to prefetch
bucketlistDB-live.bulk.eviction | timer | time to load for eviction scan
bucketlistDB-live.bulk.query | timer | time to load for query server
bucketlistDB-<X>.point.<Y> | timer | time to load single entry of type <Y> on BucketList <X> (live/hotArchive)
bucketlistDB-cache.hit | meter | number of cache hits on Live BucketList Disk random eviction cache
bucketlistDB-cache.miss | meter | number of cache misses on Live BucketList Disk random eviction cache
Expand Down
3 changes: 2 additions & 1 deletion src/bucket/BucketSnapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,8 @@ LiveBucketSnapshot::scanForEviction(

auto processQueue = [&]() {
auto loadResult = populateLoadedEntries(
keysToSearch, bl.loadKeysWithLimits(keysToSearch, nullptr));
keysToSearch,
bl.loadKeysWithLimits(keysToSearch, "eviction", nullptr));
for (auto& e : maybeEvictQueue)
{
// If TTL entry has not yet been deleted
Expand Down
2 changes: 2 additions & 0 deletions src/bucket/InMemoryIndex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ InMemoryIndex::InMemoryIndex(BucketManager const& bm,
continue;
}

mCounters.template count<LiveBucket>(be);

// Populate assetPoolIDMap
LedgerKey lk = getBucketLedgerKey(be);
if (be.type() == INITENTRY)
Expand Down
4 changes: 2 additions & 2 deletions src/bucket/SearchableBucketList.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -204,9 +204,9 @@ SearchableLiveBucketListSnapshot::loadInflationWinners(size_t maxWinners,
std::vector<LedgerEntry>
SearchableLiveBucketListSnapshot::loadKeysWithLimits(
std::set<LedgerKey, LedgerEntryIdCmp> const& inKeys,
LedgerKeyMeter* lkMeter) const
std::string const& label, LedgerKeyMeter* lkMeter) const
{
auto timer = getBulkLoadTimer("prefetch", inKeys.size()).TimeScope();
auto timer = getBulkLoadTimer(label, inKeys.size()).TimeScope();
auto op = loadKeysInternal(inKeys, lkMeter, std::nullopt);
releaseAssertOrThrow(op);
return std::move(*op);
Expand Down
2 changes: 1 addition & 1 deletion src/bucket/SearchableBucketList.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class SearchableLiveBucketListSnapshot

std::vector<LedgerEntry>
loadKeysWithLimits(std::set<LedgerKey, LedgerEntryIdCmp> const& inKeys,
LedgerKeyMeter* lkMeter) const;
std::string const& label, LedgerKeyMeter* lkMeter) const;

EvictionResultCandidates scanForEviction(
uint32_t ledgerSeq, EvictionCounters& counters, EvictionIterator iter,
Expand Down
146 changes: 129 additions & 17 deletions src/bucket/test/BucketIndexTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@
// This file contains tests for the BucketIndex and higher-level operations
// concerning key-value lookup based on the BucketList.

#include "bucket/BucketInputIterator.h"
#include "bucket/BucketManager.h"
#include "bucket/BucketSnapshotManager.h"
#include "bucket/BucketUtils.h"
#include "bucket/LiveBucket.h"
#include "bucket/LiveBucketList.h"
#include "bucket/test/BucketTestUtils.h"
#include "ledger/LedgerTypeUtils.h"
#include "ledger/test/LedgerTestUtils.h"
#include "lib/catch.hpp"
#include "main/Application.h"
Expand Down Expand Up @@ -308,7 +310,7 @@ class BucketIndexTest

// Test bulk load lookup
auto loadResult =
searchableBL->loadKeysWithLimits(mKeysToSearch, nullptr);
searchableBL->loadKeysWithLimits(mKeysToSearch, "test", nullptr);
validateResults(mTestEntries, loadResult);

if (expectedHitRate)
Expand Down Expand Up @@ -356,8 +358,8 @@ class BucketIndexTest
mKeysToSearch.size());

// Run bulk lookup again
auto loadResult2 =
searchableBL->loadKeysWithLimits(mKeysToSearch, nullptr);
auto loadResult2 = searchableBL->loadKeysWithLimits(
mKeysToSearch, "test", nullptr);
validateResults(mTestEntries, loadResult2);

checkHitRate(expectedHitRate, startingHitCount, startingMissCount,
Expand Down Expand Up @@ -401,7 +403,7 @@ class BucketIndexTest
}

auto blLoad =
searchableBL->loadKeysWithLimits(searchSubset, nullptr);
searchableBL->loadKeysWithLimits(searchSubset, "test", nullptr);
validateResults(testEntriesSubset, blLoad);
}
}
Expand All @@ -420,8 +422,8 @@ class BucketIndexTest
LedgerKeySet invalidKeys(keysNotInBL.begin(), keysNotInBL.end());

// Test bulk load
REQUIRE(searchableBL->loadKeysWithLimits(invalidKeys, nullptr).size() ==
0);
REQUIRE(searchableBL->loadKeysWithLimits(invalidKeys, "test", nullptr)
.size() == 0);

// Test individual load
for (auto const& key : invalidKeys)
Expand Down Expand Up @@ -745,6 +747,124 @@ TEST_CASE("do not load outdated values", "[bucket][bucketindex]")
testAllIndexTypes(f);
}

TEST_CASE("bucket entry counters", "[bucket][bucketindex]")
{
// Initialize global counter for all of bucketlist
std::map<LedgerEntryTypeAndDurability, size_t> totalEntryTypeCounts;
std::map<LedgerEntryTypeAndDurability, size_t> totalEntryTypeSizes;
for (uint32_t type =
static_cast<uint32_t>(LedgerEntryTypeAndDurability::ACCOUNT);
type < static_cast<uint32_t>(LedgerEntryTypeAndDurability::NUM_TYPES);
++type)
{
totalEntryTypeCounts[static_cast<LedgerEntryTypeAndDurability>(type)] =
0;
totalEntryTypeSizes[static_cast<LedgerEntryTypeAndDurability>(type)] =
0;
}

auto checkBucket = [&](auto bucket) {
if (bucket->isEmpty())
{
return;
}

// Local counter for each bucket
std::map<LedgerEntryTypeAndDurability, size_t> entryTypeCounts;
std::map<LedgerEntryTypeAndDurability, size_t> entryTypeSizes;
for (uint32_t type =
static_cast<uint32_t>(LedgerEntryTypeAndDurability::ACCOUNT);
type <
static_cast<uint32_t>(LedgerEntryTypeAndDurability::NUM_TYPES);
++type)
{
entryTypeCounts[static_cast<LedgerEntryTypeAndDurability>(type)] =
0;
entryTypeSizes[static_cast<LedgerEntryTypeAndDurability>(type)] = 0;
}

for (LiveBucketInputIterator iter(bucket); iter; ++iter)
{
auto be = *iter;
LedgerKey lk = getBucketLedgerKey(be);

auto count = [&](LedgerEntryTypeAndDurability type) {
entryTypeCounts[type]++;
entryTypeSizes[type] += xdr::xdr_size(be);
totalEntryTypeCounts[type]++;
totalEntryTypeSizes[type] += xdr::xdr_size(be);
};

switch (lk.type())
{
case ACCOUNT:
count(LedgerEntryTypeAndDurability::ACCOUNT);
break;
case TRUSTLINE:
count(LedgerEntryTypeAndDurability::TRUSTLINE);
break;
case OFFER:
count(LedgerEntryTypeAndDurability::OFFER);
break;
case DATA:
count(LedgerEntryTypeAndDurability::DATA);
break;
case CLAIMABLE_BALANCE:
count(LedgerEntryTypeAndDurability::CLAIMABLE_BALANCE);
break;
case LIQUIDITY_POOL:
count(LedgerEntryTypeAndDurability::LIQUIDITY_POOL);
break;
case CONTRACT_DATA:
if (isPersistentEntry(lk))
{
count(
LedgerEntryTypeAndDurability::PERSISTENT_CONTRACT_DATA);
}
else
{
count(
LedgerEntryTypeAndDurability::TEMPORARY_CONTRACT_DATA);
}
break;
case CONTRACT_CODE:
count(LedgerEntryTypeAndDurability::CONTRACT_CODE);
break;
case CONFIG_SETTING:
count(LedgerEntryTypeAndDurability::CONFIG_SETTING);
break;
case TTL:
count(LedgerEntryTypeAndDurability::TTL);
break;
}
}

auto const& indexCounters =
bucket->getIndexForTesting().getBucketEntryCounters();
REQUIRE(indexCounters.entryTypeCounts == entryTypeCounts);
REQUIRE(indexCounters.entryTypeSizes == entryTypeSizes);
};

auto f = [&](Config& cfg) {
auto test = BucketIndexTest(cfg);
test.buildMultiVersionTest();

for (auto i = 0; i < LiveBucketList::kNumLevels; ++i)
{
auto level = test.getBM().getLiveBucketList().getLevel(i);
checkBucket(level.getCurr());
checkBucket(level.getSnap());
}

auto summedCounters =
test.getBM().getLiveBucketList().sumBucketEntryCounters();
REQUIRE(summedCounters.entryTypeCounts == totalEntryTypeCounts);
REQUIRE(summedCounters.entryTypeSizes == totalEntryTypeSizes);
};

testAllIndexTypes(f);
}

TEST_CASE("load from historical snapshots", "[bucket][bucketindex]")
{
auto f = [&](Config& cfg) {
Expand Down Expand Up @@ -839,6 +959,9 @@ TEST_CASE("serialize bucket indexes", "[bucket][bucketindex]")

auto& inMemoryIndex = b->getIndexForTesting();
REQUIRE((inMemoryIndex == *onDiskIndex));
auto inMemoryCounters = inMemoryIndex.getBucketEntryCounters();
auto onDiskCounters = onDiskIndex->getBucketEntryCounters();
REQUIRE(inMemoryCounters == onDiskCounters);
}

// Restart app with different config to test that indexes created with
Expand All @@ -862,17 +985,6 @@ TEST_CASE("serialize bucket indexes", "[bucket][bucketindex]")

auto& inMemoryIndex = b->getIndexForTesting();
REQUIRE(inMemoryIndex.getPageSize() == (1UL << 10));
auto inMemoryCoutners = inMemoryIndex.getBucketEntryCounters();
// Ensure the inMemoryIndex has some non-zero counters.
REQUIRE(!inMemoryCoutners.entryTypeCounts.empty());
REQUIRE(!inMemoryCoutners.entryTypeSizes.empty());
bool allZero = true;
for (auto const& [k, v] : inMemoryCoutners.entryTypeCounts)
{
allZero = allZero && (v == 0);
allZero = allZero && (inMemoryCoutners.entryTypeSizes.at(k) == 0);
}
REQUIRE(!allZero);

// Check if on-disk index rewritten with correct config params
auto indexFilename = test.getBM().bucketIndexFilename(bucketHash);
Expand Down
2 changes: 1 addition & 1 deletion src/ledger/LedgerTxn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2950,7 +2950,7 @@ LedgerTxnRoot::Impl::prefetchInternal(UnorderedSet<LedgerKey> const& keys,
insertIfNotLoaded(keysToSearch, key);
}
auto blLoad = getSearchableLiveBucketListSnapshot().loadKeysWithLimits(
keysToSearch, lkMeter);
keysToSearch, "prefetch", lkMeter);
cacheResult(populateLoadedEntries(keysToSearch, blLoad, lkMeter));

return total;
Expand Down
4 changes: 2 additions & 2 deletions src/main/QueryServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,8 @@ QueryServer::getLedgerEntryRaw(std::string const& params,
// Otherwise default to current ledger
else
{
loadedKeys =
bl.loadKeysWithLimits(orderedKeys, /*lkMeter=*/nullptr);
loadedKeys = bl.loadKeysWithLimits(orderedKeys, "query",
/*lkMeter=*/nullptr);
root["ledgerSeq"] = bl.getLedgerSeq();
}

Expand Down