diff --git a/src/bucket/BucketBase.cpp b/src/bucket/BucketBase.cpp
index d2ef06098e..b3ef0d395f 100644
--- a/src/bucket/BucketBase.cpp
+++ b/src/bucket/BucketBase.cpp
@@ -9,6 +9,7 @@
 #include "bucket/BucketBase.h"
 #include "bucket/BucketInputIterator.h"
 #include "bucket/BucketManager.h"
+#include "bucket/BucketMergeAdapter.h"
 #include "bucket/BucketOutputIterator.h"
 #include "bucket/BucketUtils.h"
 #include "bucket/HotArchiveBucket.h"
@@ -146,6 +147,49 @@ BucketBase<BucketT>::randomBucketIndexName(std::string const& tmpDir)
     return randomFileName(tmpDir, ".index");
 }
 
+template <typename BucketT>
+bool
+BucketBase<BucketT>::updateMergeCountersForProtocolVersion(
+    MergeCounters& mc, uint32_t protocolVersion,
+    std::vector<BucketInputIterator<BucketT>> const& shadowIterators)
+{
+    // Don't count shadow metrics for Hot Archive BucketList
+    if constexpr (std::is_same_v<BucketT, HotArchiveBucket>)
+    {
+        return true;
+    }
+
+    bool keepShadowedLifecycleEntries = true;
+
+    if (protocolVersionIsBefore(
+            protocolVersion,
+            LiveBucket::FIRST_PROTOCOL_SUPPORTING_INITENTRY_AND_METAENTRY))
+    {
+        ++mc.mPreInitEntryProtocolMerges;
+        keepShadowedLifecycleEntries = false;
+    }
+    else
+    {
+        ++mc.mPostInitEntryProtocolMerges;
+    }
+
+    if (protocolVersionIsBefore(protocolVersion,
+                                LiveBucket::FIRST_PROTOCOL_SHADOWS_REMOVED))
+    {
+        ++mc.mPreShadowRemovalProtocolMerges;
+    }
+    else
+    {
+        if (!shadowIterators.empty())
+        {
+            throw std::runtime_error("Shadows are not supported");
+        }
+        ++mc.mPostShadowRemovalProtocolMerges;
+    }
+
+    return keepShadowedLifecycleEntries;
+}
+
 // The protocol used in a merge is the maximum of any of the protocols used in
 // its input buckets, _including_ any of its shadows. We need to be strict about
 // this for the same reason we change shadow algorithms along with merge
@@ -221,91 +265,101 @@ calculateMergeProtocolVersion(
     // we switch shadowing-behaviour to a more conservative mode, in order to
     // support annihilation of INITENTRY and DEADENTRY pairs. See commentary
     // above in `maybePut`.
-    keepShadowedLifecycleEntries = true;
-
-    // Don't count shadow metrics for Hot Archive BucketList
-    if constexpr (std::is_same_v<BucketT, HotArchiveBucket>)
-    {
-        return;
-    }
-
-    if (protocolVersionIsBefore(
-            protocolVersion,
-            LiveBucket::FIRST_PROTOCOL_SUPPORTING_INITENTRY_AND_METAENTRY))
-    {
-        ++mc.mPreInitEntryProtocolMerges;
-        keepShadowedLifecycleEntries = false;
-    }
-    else
-    {
-        ++mc.mPostInitEntryProtocolMerges;
-    }
-
-    if (protocolVersionIsBefore(protocolVersion,
-                                LiveBucket::FIRST_PROTOCOL_SHADOWS_REMOVED))
-    {
-        ++mc.mPreShadowRemovalProtocolMerges;
-    }
-    else
-    {
-        if (!shadowIterators.empty())
-        {
-            throw std::runtime_error("Shadows are not supported");
-        }
-        ++mc.mPostShadowRemovalProtocolMerges;
-    }
+    keepShadowedLifecycleEntries =
+        BucketBase<BucketT>::updateMergeCountersForProtocolVersion(
+            mc, protocolVersion, shadowIterators);
 }
 
 // There are 4 "easy" cases for merging: exhausted iterators on either
 // side, or entries that compare non-equal. In all these cases we just
 // take the lesser (or existing) entry and advance only one iterator,
 // not scrutinizing the entry type further.
-template <typename BucketT>
+template <typename BucketT, typename InputSource>
 static bool
 mergeCasesWithDefaultAcceptance(
     BucketEntryIdCmp<BucketT> const& cmp, MergeCounters& mc,
-    BucketInputIterator<BucketT>& oi, BucketInputIterator<BucketT>& ni,
-    BucketOutputIterator<BucketT>& out,
+    InputSource& inputSource,
+    std::function<void(typename BucketT::EntryT const&)> putFunc,
     std::vector<BucketInputIterator<BucketT>>& shadowIterators,
     uint32_t protocolVersion, bool keepShadowedLifecycleEntries)
 {
     BUCKET_TYPE_ASSERT(BucketT);
 
-    if (!ni || (oi && ni && cmp(*oi, *ni)))
+    // Either of:
+    //
+    // - Out of new entries.
+    // - Old entry has smaller key.
+    //
+    // In both cases: take old entry.
+    if (inputSource.oldFirst())
     {
-        // Either of:
-        //
-        // - Out of new entries.
-        // - Old entry has smaller key.
-        //
-        // In both cases: take old entry.
+        // Take old entry
+        auto entry = inputSource.getOldEntry();
         ++mc.mOldEntriesDefaultAccepted;
-        BucketT::checkProtocolLegality(*oi, protocolVersion);
-        BucketT::countOldEntryType(mc, *oi);
-        BucketT::maybePut(out, *oi, shadowIterators,
+        BucketT::checkProtocolLegality(entry, protocolVersion);
+        BucketT::countOldEntryType(mc, entry);
+        BucketT::maybePut(putFunc, entry, shadowIterators,
                           keepShadowedLifecycleEntries, mc);
-        ++oi;
+        inputSource.advanceOld();
        return true;
     }
-    else if (!oi || (oi && ni && cmp(*ni, *oi)))
+    // Either of:
+    //
+    // - Out of old entries.
+    // - New entry has smaller key.
+    //
+    // In both cases: take new entry.
+    else if (inputSource.newFirst())
     {
-        // Either of:
-        //
-        // - Out of old entries.
-        // - New entry has smaller key.
-        //
-        // In both cases: take new entry.
+        auto entry = inputSource.getNewEntry();
         ++mc.mNewEntriesDefaultAccepted;
-        BucketT::checkProtocolLegality(*ni, protocolVersion);
-        BucketT::countNewEntryType(mc, *ni);
-        BucketT::maybePut(out, *ni, shadowIterators,
+        BucketT::checkProtocolLegality(entry, protocolVersion);
+        BucketT::countNewEntryType(mc, entry);
+        BucketT::maybePut(putFunc, entry, shadowIterators,
                           keepShadowedLifecycleEntries, mc);
-        ++ni;
+        inputSource.advanceNew();
         return true;
     }
     return false;
 }
 
+template <typename BucketT>
+template <typename InputSource, typename PutFuncT>
+void
+BucketBase<BucketT>::mergeInternal(
+    BucketManager& bucketManager, InputSource& inputSource, PutFuncT putFunc,
+    uint32_t protocolVersion,
+    std::vector<BucketInputIterator<BucketT>>& shadowIterators,
+    bool keepShadowedLifecycleEntries, MergeCounters& mc)
+{
+    BucketEntryIdCmp<BucketT> cmp;
+    size_t iter = 0;
+
+    while (!inputSource.isDone())
+    {
+        // Check if the merge should be stopped every few entries
+        if (++iter >= 1000)
+        {
+            iter = 0;
+            if (bucketManager.isShutdown())
+            {
+                // Stop merging, as BucketManager is now shutdown
+                throw std::runtime_error(
+                    "Incomplete bucket merge due to BucketManager shutdown");
+            }
+        }
+
+        if (!mergeCasesWithDefaultAcceptance<BucketT, InputSource>(
+                cmp, mc, inputSource, putFunc, shadowIterators,
+                protocolVersion, keepShadowedLifecycleEntries))
+        {
+            BucketT::template mergeCasesWithEqualKeys<InputSource>(
+                mc, inputSource, putFunc, shadowIterators, protocolVersion,
+                keepShadowedLifecycleEntries);
+        }
+    }
+}
+
 template <typename BucketT>
 std::shared_ptr<BucketT>
 BucketBase<BucketT>::merge(
@@ -363,34 +417,15 @@ BucketBase<BucketT>::merge(
                               keepTombstoneEntries, meta, mc, ctx, doFsync);
 
-    BucketEntryIdCmp<BucketT> cmp;
-    size_t iter = 0;
+    FileMergeInput<BucketT> inputSource(oi, ni);
+    auto putFunc = [&out](typename BucketT::EntryT const& entry) {
+        out.put(entry);
+    };
 
-    while (oi || ni)
-    {
-        // Check if the merge should be stopped every few entries
-        if (++iter >= 1000)
-        {
-            iter = 0;
-            if (bucketManager.isShutdown())
-            {
-                // Stop merging, as BucketManager is now shutdown
-                // This is safe as temp file has not been adopted yet,
-                // so it will be removed with the tmp dir
-                throw std::runtime_error(
-                    "Incomplete bucket merge due to BucketManager shutdown");
-            }
-        }
+    // Perform the merge
+    mergeInternal(bucketManager, inputSource, putFunc, protocolVersion,
+                  shadowIterators, keepShadowedLifecycleEntries, mc);
 
-        if (!mergeCasesWithDefaultAcceptance(
-                cmp, mc, oi, ni, out, shadowIterators, protocolVersion,
-                keepShadowedLifecycleEntries))
-        {
-            BucketT::mergeCasesWithEqualKeys(mc, oi, ni, out, shadowIterators,
-                                             protocolVersion,
-                                             keepShadowedLifecycleEntries);
-        }
-    }
     if (countMergeEvents)
     {
         bucketManager.incrMergeCounters(mc);
 
@@ -408,6 +443,20 @@ BucketBase<BucketT>::merge(
     return out.getBucket(bucketManager, &mk);
 }
 
+template void BucketBase<LiveBucket>::mergeInternal<
+    MemoryMergeInput<LiveBucket>, std::function<void(BucketEntry const&)>>(
+    BucketManager&, MemoryMergeInput<LiveBucket>&,
+    std::function<void(BucketEntry const&)>, uint32_t,
+    std::vector<BucketInputIterator<LiveBucket>>&, bool, MergeCounters&);
+
+template void
+BucketBase<HotArchiveBucket>::mergeInternal<
+    MemoryMergeInput<HotArchiveBucket>,
+    std::function<void(HotArchiveBucketEntry const&)>>(
+    BucketManager&, MemoryMergeInput<HotArchiveBucket>&,
+    std::function<void(HotArchiveBucketEntry const&)>, uint32_t,
+    std::vector<BucketInputIterator<HotArchiveBucket>>&, bool, MergeCounters&);
+
 template class BucketBase<LiveBucket>;
 template class BucketBase<HotArchiveBucket>;
 }
\ No newline at end of file
diff --git a/src/bucket/BucketBase.h b/src/bucket/BucketBase.h
index cbd43236b8..f773a4c2a4 100644
--- a/src/bucket/BucketBase.h
+++ b/src/bucket/BucketBase.h
@@ -4,12 +4,12 @@
 // under the Apache License, Version 2.0. See the COPYING file at the root
 // of this distribution or at http://www.apache.org/licenses/LICENSE-2.0
 
+#include "bucket/BucketInputIterator.h"
 #include "bucket/BucketUtils.h"
 #include "util/NonCopyable.h"
 #include "util/ProtocolVersion.h"
 #include "xdr/Stellar-types.h"
 #include
-#include
 #include
 
 namespace asio
@@ -130,6 +130,23 @@ class BucketBase : public NonMovableOrCopyable
                   bool keepTombstoneEntries, bool countMergeEvents,
                   asio::io_context& ctx, bool doFsync);
 
+    // Returns whether shadowed lifecycle entries should be kept
+    static bool updateMergeCountersForProtocolVersion(
+        MergeCounters& mc, uint32_t protocolVersion,
+        std::vector<BucketInputIterator<BucketT>> const& shadowIterators);
+
+    // Helper function that implements the core merge algorithm logic for both
+    // iterator based and in-memory merges.
+    // PutFunc will be called to write entries that are the result of the merge.
+    // We have to use a template here to break a dependency on the BucketT
+    // type, but PutFuncT == std::function<void(typename BucketT::EntryT const&)>
+    template <typename InputSource, typename PutFuncT>
+    static void
+    mergeInternal(BucketManager& bucketManager, InputSource& inputSource,
+                  PutFuncT putFunc, uint32_t protocolVersion,
+                  std::vector<BucketInputIterator<BucketT>>& shadowIterators,
+                  bool keepShadowedLifecycleEntries, MergeCounters& mc);
+
     static std::string randomBucketName(std::string const& tmpDir);
     static std::string randomBucketIndexName(std::string const& tmpDir);
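For orientation, here is a minimal self-contained sketch of the shape this refactor gives the merge driver. It uses toy int "entries" and a trivial comparison, not stellar-core types; mergeToSink stands in for mergeInternal and the two lambdas for PutFuncT. The point it illustrates: the driver only sees a callable, so the same loop can feed a file-backed output iterator or an in-memory vector.

#include <cstddef>
#include <functional>
#include <iostream>
#include <sstream>
#include <vector>

// Toy driver standing in for BucketBase::mergeInternal: walk two sorted
// inputs and hand each surviving entry to `put`, never caring whether the
// sink is a file or a vector.
void
mergeToSink(std::vector<int> const& oldE, std::vector<int> const& newE,
            std::function<void(int)> put)
{
    std::size_t o = 0, n = 0;
    while (o < oldE.size() || n < newE.size())
    {
        if (n >= newE.size() || (o < oldE.size() && oldE[o] < newE[n]))
        {
            put(oldE[o++]); // old entry has the smaller key (or new is done)
        }
        else if (o >= oldE.size() || newE[n] < oldE[o])
        {
            put(newE[n++]); // new entry has the smaller key (or old is done)
        }
        else
        {
            // Equal keys: the newer entry wins, both sides advance.
            put(newE[n++]);
            ++o;
        }
    }
}

int
main()
{
    std::vector<int> oldE{1, 3, 5}, newE{3, 4};

    // In-memory sink, analogous to what LiveBucket::mergeInMemory does.
    std::vector<int> merged;
    mergeToSink(oldE, newE, [&merged](int e) { merged.push_back(e); });

    // "File" sink, standing in for BucketOutputIterator::put in merge().
    std::ostringstream file;
    mergeToSink(oldE, newE, [&file](int e) { file << e << ' '; });

    std::cout << merged.size() << " entries; file: " << file.str() << '\n';
}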
diff --git a/src/bucket/BucketListBase.cpp b/src/bucket/BucketListBase.cpp
index 647e65d07a..e2787adb4b 100644
--- a/src/bucket/BucketListBase.cpp
+++ b/src/bucket/BucketListBase.cpp
@@ -26,6 +26,7 @@ BucketLevel<BucketT>::BucketLevel(uint32_t i)
     : mLevel(i)
     , mCurr(std::make_shared<BucketT>())
     , mSnap(std::make_shared<BucketT>())
+    , mNextCurrInMemory(nullptr)
 {
 }
 
@@ -79,6 +80,7 @@ void
 BucketLevel<BucketT>::setCurr(std::shared_ptr<BucketT> b)
 {
     mNextCurr.clear();
+    mNextCurrInMemory.reset();
     mCurr = b;
 }
 
@@ -127,13 +129,82 @@ template <typename BucketT>
 void
 BucketLevel<BucketT>::commit()
 {
-    if (mNextCurr.isLive())
+    if (mNextCurrInMemory)
     {
+        // If we have an in-memory merged result, use that
+        setCurr(mNextCurrInMemory);
+    }
+    else if (mNextCurr.isLive())
+    {
+        // Otherwise use the async merge result
         setCurr(mNextCurr.resolve());
     }
     releaseAssert(!mNextCurr.isMerging());
 }
 
+template <>
+template <typename... VectorT>
+void
+BucketLevel<LiveBucket>::prepareFirstLevel(Application& app,
+                                           uint32_t currLedger,
+                                           uint32_t currLedgerProtocol,
+                                           bool countMergeEvents, bool doFsync,
+                                           VectorT const&... inputVectors)
+{
+    ZoneScoped;
+
+    // This method should only be called on level 0
+    releaseAssert(mLevel == 0);
+
+    // We shouldn't have an in-progress merge
+    releaseAssert(!mNextCurr.isMerging());
+    releaseAssert(!mNextCurrInMemory);
+
+    auto curr =
+        BucketListBase<LiveBucket>::shouldMergeWithEmptyCurr(currLedger,
+                                                             mLevel)
+            ? std::make_shared<LiveBucket>()
+            : mCurr;
+
+    // On startup, the current bucket may not be initialized in memory, so
+    // fall back to normal prepare
+    if (!curr->hasInMemoryEntries())
+    {
+        auto snap = LiveBucket::fresh(
+            app.getBucketManager(), currLedgerProtocol, inputVectors...,
+            countMergeEvents, app.getClock().getIOContext(), doFsync);
+        prepare(app, currLedger, currLedgerProtocol, snap, /*shadows=*/{},
+                countMergeEvents);
+        return;
+    }
+
+    auto snap = LiveBucket::fresh(
+        app.getBucketManager(), currLedgerProtocol, inputVectors...,
+        countMergeEvents, app.getClock().getIOContext(), doFsync,
+        /*storeInMemory=*/true, /*shouldIndex=*/false);
+
+    auto& bucketManager = app.getBucketManager();
+    auto& ctx = app.getClock().getIOContext();
+    mNextCurrInMemory =
+        LiveBucket::mergeInMemory(bucketManager, currLedgerProtocol, curr,
+                                  snap, countMergeEvents, ctx, doFsync);
+}
+
+template <>
+template <typename... VectorT>
+void
+BucketLevel<HotArchiveBucket>::prepareFirstLevel(
+    Application& app, uint32_t currLedger, uint32_t currLedgerProtocol,
+    bool countMergeEvents, bool doFsync, VectorT const&... inputVectors)
+{
+    // Hot Archive does not support in-memory merge, so we just use the
+    // normal prepare method
+    auto snap = HotArchiveBucket::fresh(
+        app.getBucketManager(), currLedgerProtocol, inputVectors...,
+        countMergeEvents, app.getClock().getIOContext(), doFsync);
+    prepare(app, currLedger, currLedgerProtocol, snap, /*shadows=*/{},
+            countMergeEvents);
+}
+
 // prepare builds a FutureBucket for the _next state_ of the current level,
 // kicking off a merge that will finish sometime later.
 //
@@ -179,6 +250,8 @@ BucketLevel<BucketT>::prepare(
     // If more than one absorb is pending at the same time, we have a logic
     // error in our caller (and all hell will break loose).
     releaseAssert(!mNextCurr.isMerging());
+    releaseAssert(!mNextCurrInMemory);
+
     auto curr =
         BucketListBase<BucketT>::shouldMergeWithEmptyCurr(currLedger, mLevel)
             ? std::make_shared<BucketT>()
@@ -657,12 +730,8 @@ BucketListBase<BucketT>::addBatchInternal(Application& app, uint32_t currLedger,
         !app.getConfig().ARTIFICIALLY_REDUCE_MERGE_COUNTS_FOR_TESTING;
     bool doFsync = !app.getConfig().DISABLE_XDR_FSYNC;
     releaseAssert(shadows.size() == 0);
-    mLevels[0].prepare(app, currLedger, currLedgerProtocol,
-                       BucketT::fresh(app.getBucketManager(),
-                                      currLedgerProtocol, inputVectors...,
-                                      countMergeEvents,
-                                      app.getClock().getIOContext(), doFsync),
-                       shadows, countMergeEvents);
+    mLevels[0].prepareFirstLevel(app, currLedger, currLedgerProtocol,
+                                 countMergeEvents, doFsync, inputVectors...);
     mLevels[0].commit();
 
     // We almost always want to try to resolve completed merges to single
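A small self-contained model of the precedence commit() applies above — std::future stands in for FutureBucket and a toy Bucket type replaces the real one; an in-memory result, when present, wins over the asynchronous merge:

#include <cassert>
#include <future>
#include <memory>
#include <string>

// Toy stand-ins for the real types; names are illustrative only.
struct Bucket
{
    std::string contents;
};

struct Level
{
    std::shared_ptr<Bucket> curr;
    std::shared_ptr<Bucket> nextInMemory;      // set by the in-memory fast path
    std::future<std::shared_ptr<Bucket>> next; // set by the async merge path

    // Mirrors BucketLevel::commit(): a finished in-memory merge wins,
    // otherwise fall back to resolving the asynchronous merge.
    void
    commit()
    {
        if (nextInMemory)
        {
            curr = nextInMemory;
            nextInMemory.reset();
        }
        else if (next.valid())
        {
            curr = next.get();
        }
    }
};

int
main()
{
    Level lvl;
    lvl.nextInMemory = std::make_shared<Bucket>(Bucket{"merged-in-memory"});
    lvl.commit();
    assert(lvl.curr->contents == "merged-in-memory");
}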
diff --git a/src/bucket/BucketListBase.h b/src/bucket/BucketListBase.h
index ac4bfc0ccb..0fd56413cf 100644
--- a/src/bucket/BucketListBase.h
+++ b/src/bucket/BucketListBase.h
@@ -363,6 +363,7 @@ template <typename BucketT> class BucketLevel
     FutureBucket<BucketT> mNextCurr;
     std::shared_ptr<BucketT> mCurr;
     std::shared_ptr<BucketT> mSnap;
+    std::shared_ptr<BucketT> mNextCurrInMemory;
 
   public:
     BucketLevel(uint32_t i);
@@ -379,6 +380,16 @@ template <typename BucketT> class BucketLevel
                  uint32_t currLedgerProtocol, std::shared_ptr<BucketT> snap,
                  std::vector<std::shared_ptr<BucketT>> const& shadows,
                  bool countMergeEvents);
+
+    // Special version of prepare for level 0 that does an in-memory merge if
+    // possible. Falls back to regular prepare if an in-memory merge is not
+    // possible, such as for the HotArchiveBucketList, or in some edge cases
+    // on startup.
+    template <typename... VectorT>
+    void prepareFirstLevel(Application& app, uint32_t currLedger,
+                           uint32_t currLedgerProtocol, bool countMergeEvents,
+                           bool doFsync, VectorT const&... inputVectors);
+
     std::shared_ptr<BucketT> snap();
 };
diff --git a/src/bucket/BucketMergeAdapter.h b/src/bucket/BucketMergeAdapter.h
new file mode 100644
index 0000000000..0708db9749
--- /dev/null
+++ b/src/bucket/BucketMergeAdapter.h
@@ -0,0 +1,176 @@
+#pragma once
+
+// Copyright 2024 Stellar Development Foundation and contributors. Licensed
+// under the Apache License, Version 2.0. See the COPYING file at the root
+// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0
+
+#include "bucket/BucketInputIterator.h"
+#include "bucket/LedgerCmp.h"
+#include
+
+namespace stellar
+{
+
+// These classes provide wrappers around the inputs to a BucketMerge, namely
+// either BucketInputIterators for file based merges, or a vector of bucket
+// entries for in-memory merges.
+template <typename BucketT> class MergeInput
+{
+  public:
+    // Check if we're done - both iterators are exhausted
+    virtual bool isDone() const = 0;
+
+    // Check if old entry should go first (old < new or new is exhausted)
+    virtual bool oldFirst() const = 0;
+
+    // Check if new entry should go first (new < old or old is exhausted)
+    virtual bool newFirst() const = 0;
+
+    // Check if old and new entries have equal keys and are both not exhausted
+    virtual bool equalKeys() const = 0;
+
+    virtual typename BucketT::EntryT const& getOldEntry() = 0;
+    virtual typename BucketT::EntryT const& getNewEntry() = 0;
+
+    // Advance iterators
+    virtual void advanceOld() = 0;
+    virtual void advanceNew() = 0;
+
+    virtual ~MergeInput() = default;
+};
+
+template <typename BucketT> class FileMergeInput : public MergeInput<BucketT>
+{
+  private:
+    BucketInputIterator<BucketT>& mOldIter;
+    BucketInputIterator<BucketT>& mNewIter;
+    BucketEntryIdCmp<BucketT> mCmp;
+
+  public:
+    FileMergeInput(BucketInputIterator<BucketT>& oldIter,
+                   BucketInputIterator<BucketT>& newIter)
+        : mOldIter(oldIter), mNewIter(newIter)
+    {
+    }
+
+    bool
+    isDone() const override
+    {
+        return !mOldIter && !mNewIter;
+    }
+
+    bool
+    oldFirst() const override
+    {
+        return !mNewIter || (mOldIter && mCmp(*mOldIter, *mNewIter));
+    }
+
+    bool
+    newFirst() const override
+    {
+        return !mOldIter || (mNewIter && mCmp(*mNewIter, *mOldIter));
+    }
+
+    bool
+    equalKeys() const override
+    {
+        return mOldIter && mNewIter && !mCmp(*mOldIter, *mNewIter) &&
+               !mCmp(*mNewIter, *mOldIter);
+    }
+
+    typename BucketT::EntryT const&
+    getOldEntry() override
+    {
+        return *mOldIter;
+    }
+
+    typename BucketT::EntryT const&
+    getNewEntry() override
+    {
+        return *mNewIter;
+    }
+
+    void
+    advanceOld() override
+    {
+        ++mOldIter;
+    }
+
+    void
+    advanceNew() override
+    {
+        ++mNewIter;
+    }
+};
+
+template <typename BucketT> class MemoryMergeInput : public MergeInput<BucketT>
+{
+  private:
+    std::vector<typename BucketT::EntryT> const& mOldEntries;
+    std::vector<typename BucketT::EntryT> const& mNewEntries;
+    BucketEntryIdCmp<BucketT> mCmp;
+    size_t mOldIdx = 0;
+    size_t mNewIdx = 0;
+
+  public:
+    MemoryMergeInput(std::vector<typename BucketT::EntryT> const& oldEntries,
+                     std::vector<typename BucketT::EntryT> const& newEntries)
+        : mOldEntries(oldEntries), mNewEntries(newEntries)
+    {
+    }
+
+    bool
+    isDone() const override
+    {
+        return mOldIdx >= mOldEntries.size() && mNewIdx >= mNewEntries.size();
+    }
+
+    bool
+    oldFirst() const override
+    {
+        return mNewIdx >= mNewEntries.size() ||
+               (mOldIdx < mOldEntries.size() &&
+                mCmp(mOldEntries.at(mOldIdx), mNewEntries.at(mNewIdx)));
+    }
+
+    bool
+    newFirst() const override
+    {
+        return mOldIdx >= mOldEntries.size() ||
+               (mNewIdx < mNewEntries.size() &&
+                mCmp(mNewEntries.at(mNewIdx), mOldEntries.at(mOldIdx)));
+    }
+
+    bool
+    equalKeys() const override
+    {
+        return mOldIdx < mOldEntries.size() && mNewIdx < mNewEntries.size() &&
+               !mCmp(mOldEntries.at(mOldIdx), mNewEntries.at(mNewIdx)) &&
+               !mCmp(mNewEntries.at(mNewIdx), mOldEntries.at(mOldIdx));
+    }
+
+    typename BucketT::EntryT const&
+    getOldEntry() override
+    {
+        return mOldEntries.at(mOldIdx);
+    }
+
+    typename BucketT::EntryT const&
+    getNewEntry() override
+    {
+        return mNewEntries.at(mNewIdx);
+    }
+
+    void
+    advanceOld() override
+    {
+        ++mOldIdx;
+    }
+
+    void
+    advanceNew() override
+    {
+        ++mNewIdx;
+    }
+};
+}
\ No newline at end of file
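A quick standalone check, using plain ints in place of BucketEntry keys, of the property the merge loop relies on: while the input is not exhausted, exactly one of oldFirst(), newFirst(), equalKeys() holds, so every iteration makes progress. ToyMemoryInput is an illustrative stand-in for MemoryMergeInput, not the real class:

#include <cassert>
#include <cstddef>
#include <vector>

struct ToyMemoryInput
{
    std::vector<int> const& mOld;
    std::vector<int> const& mNew;
    std::size_t mOldIdx = 0;
    std::size_t mNewIdx = 0;

    bool
    isDone() const
    {
        return mOldIdx >= mOld.size() && mNewIdx >= mNew.size();
    }
    bool
    oldFirst() const
    {
        return mNewIdx >= mNew.size() ||
               (mOldIdx < mOld.size() && mOld[mOldIdx] < mNew[mNewIdx]);
    }
    bool
    newFirst() const
    {
        return mOldIdx >= mOld.size() ||
               (mNewIdx < mNew.size() && mNew[mNewIdx] < mOld[mOldIdx]);
    }
    bool
    equalKeys() const
    {
        return mOldIdx < mOld.size() && mNewIdx < mNew.size() &&
               !(mOld[mOldIdx] < mNew[mNewIdx]) &&
               !(mNew[mNewIdx] < mOld[mOldIdx]);
    }
};

int
main()
{
    std::vector<int> oldE{1, 2, 7}, newE{2, 5};
    ToyMemoryInput in{oldE, newE};
    while (!in.isDone())
    {
        // Exactly one branch is available at each step.
        assert(in.oldFirst() + in.newFirst() + in.equalKeys() == 1);
        if (in.equalKeys())
        {
            ++in.mOldIdx;
            ++in.mNewIdx;
        }
        else if (in.oldFirst())
        {
            ++in.mOldIdx;
        }
        else
        {
            ++in.mNewIdx;
        }
    }
}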
diff --git a/src/bucket/BucketOutputIterator.cpp b/src/bucket/BucketOutputIterator.cpp
index c9d4271c47..2eb095d179 100644
--- a/src/bucket/BucketOutputIterator.cpp
+++ b/src/bucket/BucketOutputIterator.cpp
@@ -7,6 +7,7 @@
 #include "bucket/BucketManager.h"
 #include "bucket/HotArchiveBucket.h"
 #include "bucket/LiveBucket.h"
+#include "bucket/LiveBucketIndex.h"
 #include "ledger/LedgerTypeUtils.h"
 #include "util/GlobalChecks.h"
 #include "util/ProtocolVersion.h"
@@ -165,8 +166,10 @@ BucketOutputIterator<BucketT>::put(typename BucketT::EntryT const& e)
 
 template <typename BucketT>
 std::shared_ptr<BucketT>
-BucketOutputIterator<BucketT>::getBucket(BucketManager& bucketManager,
-                                         MergeKey* mergeKey)
+BucketOutputIterator<BucketT>::getBucket(
+    BucketManager& bucketManager, MergeKey* mergeKey,
+    std::optional<std::vector<typename BucketT::EntryT>> inMemoryState,
+    bool shouldIndex)
 {
     ZoneScoped;
     if (mBuf)
@@ -196,14 +199,37 @@ BucketOutputIterator<BucketT>::getBucket(BucketManager& bucketManager,
         // either it's a new bucket or we just reconstructed a bucket
         // we already have, in any case ensure we have an index
         if (auto b = bucketManager.getBucketIfExists<BucketT>(hash);
-            !b || !b->isIndexed())
+            ((!b || !b->isIndexed()) && shouldIndex))
         {
-            index =
-                createIndex<BucketT>(bucketManager, mFilename, hash, mCtx,
-                                     nullptr);
+            // Create index using in-memory state instead of file IO if
+            // available
+            if constexpr (std::is_same_v<BucketT, LiveBucket>)
+            {
+                if (inMemoryState)
+                {
+                    index = std::make_unique<LiveBucketIndex>(
+                        bucketManager, *inMemoryState, mMeta);
+                }
+            }
+
+            if (!index)
+            {
+                index = createIndex<BucketT>(bucketManager, mFilename, hash,
+                                             mCtx, nullptr);
+            }
+        }
+
+        auto b = bucketManager.adoptFileAsBucket<BucketT>(
+            mFilename.string(), hash, mergeKey, std::move(index));
+
+        if constexpr (std::is_same_v<BucketT, LiveBucket>)
+        {
+            if (inMemoryState)
+            {
+                b->setInMemoryEntries(std::move(*inMemoryState));
+            }
         }
 
-        return bucketManager.adoptFileAsBucket<BucketT>(mFilename.string(),
-                                                        hash, mergeKey,
-                                                        std::move(index));
+        return b;
 }
 
 template class BucketOutputIterator<LiveBucket>;
diff --git a/src/bucket/BucketOutputIterator.h b/src/bucket/BucketOutputIterator.h
index f3baaf8a7f..c70e3739bc 100644
--- a/src/bucket/BucketOutputIterator.h
+++ b/src/bucket/BucketOutputIterator.h
@@ -52,7 +52,10 @@ template <typename BucketT> class BucketOutputIterator
 
     void put(typename BucketT::EntryT const& e);
 
-    std::shared_ptr<BucketT> getBucket(BucketManager& bucketManager,
-                                       MergeKey* mergeKey = nullptr);
+    std::shared_ptr<BucketT> getBucket(
+        BucketManager& bucketManager, MergeKey* mergeKey = nullptr,
+        std::optional<std::vector<typename BucketT::EntryT>> inMemoryState =
+            std::nullopt,
+        bool shouldIndex = true);
 };
 }
diff --git a/src/bucket/HotArchiveBucket.cpp b/src/bucket/HotArchiveBucket.cpp
index c01d2eeeab..4b76c33b58 100644
--- a/src/bucket/HotArchiveBucket.cpp
+++ b/src/bucket/HotArchiveBucket.cpp
@@ -4,6 +4,7 @@
 
 #include "bucket/HotArchiveBucket.h"
 #include "bucket/BucketInputIterator.h"
+#include "bucket/BucketMergeAdapter.h"
 #include "bucket/BucketOutputIterator.h"
 #include "bucket/BucketUtils.h"
@@ -83,26 +84,29 @@ HotArchiveBucket::convertToBucketEntry(
 
 void
 HotArchiveBucket::maybePut(
-    HotArchiveBucketOutputIterator& out, HotArchiveBucketEntry const& entry,
+    std::function<void(HotArchiveBucketEntry const&)> putFunc,
+    HotArchiveBucketEntry const& entry,
     std::vector<HotArchiveBucketInputIterator>& shadowIterators,
     bool keepShadowedLifecycleEntries, MergeCounters& mc)
 {
     // Archived BucketList is only present after protocol 21, so shadows are
     // never supported
-    out.put(entry);
+    putFunc(entry);
 }
 
+template <typename InputSource>
 void
 HotArchiveBucket::mergeCasesWithEqualKeys(
-    MergeCounters& mc, HotArchiveBucketInputIterator& oi,
-    HotArchiveBucketInputIterator& ni, HotArchiveBucketOutputIterator& out,
+    MergeCounters& mc, InputSource& inputSource,
+    std::function<void(HotArchiveBucketEntry const&)> putFunc,
     std::vector<HotArchiveBucketInputIterator>& shadowIterators,
     uint32_t protocolVersion, bool keepShadowedLifecycleEntries)
 {
+    auto const& oldEntry = inputSource.getOldEntry();
+    auto const& newEntry = inputSource.getNewEntry();
+
     // If two identical keys have the same type, throw an error. Otherwise,
     // take the newer key.
-    HotArchiveBucketEntry const& oldEntry = *oi;
-    HotArchiveBucketEntry const& newEntry = *ni;
     if (oldEntry.type() == newEntry.type())
     {
         throw std::runtime_error(
@@ -110,9 +114,9 @@ HotArchiveBucket::mergeCasesWithEqualKeys(
             "the same type.");
     }
 
-    out.put(newEntry);
-    ++ni;
-    ++oi;
+    putFunc(newEntry);
+    inputSource.advanceNew();
+    inputSource.advanceOld();
 }
 
 uint32_t
@@ -146,4 +150,17 @@ HotArchiveBucket::bucketEntryToLoadResult(
     return isTombstoneEntry(*be) ? nullptr : be;
 }
 
+template void
+HotArchiveBucket::mergeCasesWithEqualKeys<FileMergeInput<HotArchiveBucket>>(
+    MergeCounters& mc, FileMergeInput<HotArchiveBucket>& inputSource,
+    std::function<void(HotArchiveBucketEntry const&)> putFunc,
+    std::vector<HotArchiveBucketInputIterator>& shadowIterators,
+    uint32_t protocolVersion, bool keepShadowedLifecycleEntries);
+
+template void
+HotArchiveBucket::mergeCasesWithEqualKeys<MemoryMergeInput<HotArchiveBucket>>(
+    MergeCounters& mc, MemoryMergeInput<HotArchiveBucket>& inputSource,
+    std::function<void(HotArchiveBucketEntry const&)> putFunc,
+    std::vector<HotArchiveBucketInputIterator>& shadowIterators,
+    uint32_t protocolVersion, bool keepShadowedLifecycleEntries);
 }
\ No newline at end of file
diff --git a/src/bucket/HotArchiveBucket.h b/src/bucket/HotArchiveBucket.h
index dd69393181..10576fd293 100644
--- a/src/bucket/HotArchiveBucket.h
+++ b/src/bucket/HotArchiveBucket.h
@@ -65,9 +65,9 @@ class HotArchiveBucket
 
     // Note: this function is called maybePut for interoperability with
     // LiveBucket. This function always writes the given entry to the output
-    // iterator.
+    // iterator using putFunc.
     static void
-    maybePut(HotArchiveBucketOutputIterator& out,
+    maybePut(std::function<void(HotArchiveBucketEntry const&)> putFunc,
             HotArchiveBucketEntry const& entry,
             std::vector<HotArchiveBucketInputIterator>& shadowIterators,
             bool keepShadowedLifecycleEntries, MergeCounters& mc);
@@ -89,9 +89,10 @@ class HotArchiveBucket
     {
     }
 
+    template <typename InputSource>
     static void mergeCasesWithEqualKeys(
-        MergeCounters& mc, HotArchiveBucketInputIterator& oi,
-        HotArchiveBucketInputIterator& ni, HotArchiveBucketOutputIterator& out,
+        MergeCounters& mc, InputSource& inputSource,
+        std::function<void(HotArchiveBucketEntry const&)> putFunc,
         std::vector<HotArchiveBucketInputIterator>& shadowIterators,
         uint32_t protocolVersion, bool keepShadowedLifecycleEntries);
diff --git a/src/bucket/InMemoryIndex.cpp b/src/bucket/InMemoryIndex.cpp
index 9b22e8c102..4fd4a2af6d 100644
--- a/src/bucket/InMemoryIndex.cpp
+++ b/src/bucket/InMemoryIndex.cpp
@@ -36,10 +36,87 @@ InMemoryBucketState::scan(IterT start, LedgerKey const& searchKey) const
     return {IndexReturnT(), mEntries.begin()};
 }
 
+InMemoryIndex::InMemoryIndex(BucketManager& bm,
+                             std::vector<BucketEntry> const& inMemoryState,
+                             BucketMetadata const& metadata)
+{
+    ZoneScoped;
+
+    // 4 bytes of size info between BucketEntries on disk
+    constexpr std::streamoff xdrOverheadBetweenEntries = 4;
+
+    // 4 bytes of BucketEntry overhead for METAENTRY
+    constexpr std::streamoff xdrOverheadForMetaEntry = 4;
+
+    std::streamoff lastOffset = xdr::xdr_size(metadata) +
+                                xdrOverheadForMetaEntry +
+                                xdrOverheadBetweenEntries;
+    std::optional<std::streamoff> firstOffer;
+    std::optional<std::streamoff> lastOffer;
+
+    for (auto const& be : inMemoryState)
+    {
+        releaseAssertOrThrow(be.type() != METAENTRY);
+        mCounters.template count<LiveBucket>(be);
+
+        // Populate assetPoolIDMap
+        LedgerKey lk = getBucketLedgerKey(be);
+        if (be.type() == INITENTRY)
+        {
+            if (lk.type() == LIQUIDITY_POOL)
+            {
+                auto const& poolParams = be.liveEntry()
+                                             .data.liquidityPool()
+                                             .body.constantProduct()
+                                             .params;
+                mAssetPoolIDMap[poolParams.assetA].emplace_back(
+                    lk.liquidityPool().liquidityPoolID);
+                mAssetPoolIDMap[poolParams.assetB].emplace_back(
+                    lk.liquidityPool().liquidityPoolID);
+            }
+        }
+
+        // Populate inMemoryState
+        mInMemoryState.insert(be);
+
+        // Populate offerRange
+        if (!firstOffer && lk.type() == OFFER)
+        {
+            firstOffer = lastOffset;
+        }
+        if (!lastOffer && lk.type() > OFFER)
+        {
+            lastOffer = lastOffset;
+        }
+
+        lastOffset += xdr::xdr_size(be) + xdrOverheadBetweenEntries;
+    }
+
+    if (firstOffer)
+    {
+        if (lastOffer)
+        {
+            mOfferRange = {*firstOffer, *lastOffer};
+        }
+        // If we didn't see any entries after offers, then the upper bound is
+        // EOF
+        else
+        {
+            mOfferRange = {*firstOffer,
+                           std::numeric_limits<std::streamoff>::max()};
+        }
+    }
+    else
+    {
+        mOfferRange = std::nullopt;
+    }
+}
+
 InMemoryIndex::InMemoryIndex(BucketManager const& bm,
                              std::filesystem::path const& filename,
                              SHA256* hasher)
 {
+    ZoneScoped;
     XDRInputFileStream in;
     in.open(filename.string());
     BucketEntry be;
@@ -120,4 +197,14 @@ InMemoryIndex::InMemoryIndex(BucketManager const& bm,
         mOfferRange = std::nullopt;
     }
 }
+
+#ifdef BUILD_TESTS
+bool
+InMemoryIndex::operator==(InMemoryIndex const& in) const
+{
+    return mInMemoryState == in.mInMemoryState &&
+           mAssetPoolIDMap == in.mAssetPoolIDMap &&
+           mOfferRange == in.mOfferRange && mCounters == in.mCounters;
+}
+#endif
 }
\ No newline at end of file
diff --git a/src/bucket/InMemoryIndex.h b/src/bucket/InMemoryIndex.h
index 2eb6ce0d2a..8f36c9ca05 100644
--- a/src/bucket/InMemoryIndex.h
+++ b/src/bucket/InMemoryIndex.h
@@ -197,6 +197,10 @@ class InMemoryIndex
     InMemoryIndex(BucketManager const& bm,
                   std::filesystem::path const& filename, SHA256* hasher);
 
+    InMemoryIndex(BucketManager& bm,
+                  std::vector<BucketEntry> const& inMemoryState,
+                  BucketMetadata const& metadata);
+
     IterT
     begin() const
     {
@@ -233,13 +237,7 @@ class InMemoryIndex
     }
 
 #ifdef BUILD_TESTS
-    bool
-    operator==(InMemoryIndex const& in) const
-    {
-        return mInMemoryState == in.mInMemoryState &&
-               mAssetPoolIDMap == in.mAssetPoolIDMap &&
-               mCounters == in.mCounters;
-    }
+    bool operator==(InMemoryIndex const& in) const;
 #endif
 };
 }
\ No newline at end of file
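The offset bookkeeping in the new constructor can be sanity-checked with toy numbers — the sizes below are invented; only the 4-byte markers mirror xdrOverheadForMetaEntry / xdrOverheadBetweenEntries:

#include <cassert>
#include <cstdint>
#include <vector>

int
main()
{
    // Hypothetical encoded sizes in bytes: a metadata record and three
    // entries. Each record in the file is preceded by a 4-byte size marker.
    constexpr int64_t kSizeMarker = 4;
    int64_t metaSize = 24;
    std::vector<int64_t> entrySizes{100, 52, 80};

    // The first entry starts past the size-prefixed METAENTRY plus its own
    // marker, mirroring how lastOffset is seeded.
    int64_t lastOffset = kSizeMarker + metaSize + kSizeMarker;

    std::vector<int64_t> offsets;
    for (auto sz : entrySizes)
    {
        offsets.push_back(lastOffset);
        lastOffset += sz + kSizeMarker;
    }

    // 4 + 24 + 4 = 32, then 32 + 100 + 4 = 136, then 136 + 52 + 4 = 192
    assert((offsets == std::vector<int64_t>{32, 136, 192}));
}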
diff --git a/src/bucket/LiveBucket.cpp b/src/bucket/LiveBucket.cpp
index feef5698b1..185f455971 100644
--- a/src/bucket/LiveBucket.cpp
+++ b/src/bucket/LiveBucket.cpp
@@ -4,11 +4,12 @@
 
 #include "bucket/LiveBucket.h"
 #include "bucket/BucketApplicator.h"
+#include "bucket/BucketBase.h"
 #include "bucket/BucketInputIterator.h"
+#include "bucket/BucketMergeAdapter.h"
 #include "bucket/BucketOutputIterator.h"
 #include "bucket/BucketUtils.h"
 #include "bucket/LedgerCmp.h"
-#include "ledger/LedgerTypeUtils.h"
 #include
 
 namespace stellar
@@ -76,7 +77,8 @@ LiveBucket::countOldEntryType(MergeCounters& mc, BucketEntry const& e)
 }
 
 void
-LiveBucket::maybePut(LiveBucketOutputIterator& out, BucketEntry const& entry,
+LiveBucket::maybePut(std::function<void(BucketEntry const&)> putFunc,
+                     BucketEntry const& entry,
                      std::vector<LiveBucketInputIterator>& shadowIterators,
                      bool keepShadowedLifecycleEntries, MergeCounters& mc)
 {
@@ -122,7 +124,7 @@ LiveBucket::maybePut(LiveBucketOutputIterator& out, BucketEntry const& entry,
         (entry.type() == INITENTRY || entry.type() == DEADENTRY))
     {
         // Never shadow-out entries in this case; no point scanning shadows.
-        out.put(entry);
+        putFunc(entry);
         return;
     }
 
@@ -146,13 +148,14 @@ LiveBucket::maybePut(LiveBucketOutputIterator& out, BucketEntry const& entry,
         }
     }
     // Nothing shadowed.
-    out.put(entry);
+    putFunc(entry);
 }
 
+template <typename InputSource>
 void
 LiveBucket::mergeCasesWithEqualKeys(
-    MergeCounters& mc, LiveBucketInputIterator& oi, LiveBucketInputIterator& ni,
-    LiveBucketOutputIterator& out,
+    MergeCounters& mc, InputSource& inputSource,
+    std::function<void(BucketEntry const&)> putFunc,
     std::vector<LiveBucketInputIterator>& shadowIterators,
     uint32_t protocolVersion, bool keepShadowedLifecycleEntries)
 {
@@ -219,8 +222,8 @@ LiveBucket::mergeCasesWithEqualKeys(
     // invariant is maintained for that newer entry too (it is still
     // preceded by a DEAD state).
 
-    BucketEntry const& oldEntry = *oi;
-    BucketEntry const& newEntry = *ni;
+    BucketEntry const& oldEntry = inputSource.getOldEntry();
+    BucketEntry const& newEntry = inputSource.getNewEntry();
     LiveBucket::checkProtocolLegality(oldEntry, protocolVersion);
     LiveBucket::checkProtocolLegality(newEntry, protocolVersion);
     countOldEntryType(mc, oldEntry);
@@ -239,8 +242,8 @@ LiveBucket::mergeCasesWithEqualKeys(
         newLive.type(LIVEENTRY);
         newLive.liveEntry() = newEntry.liveEntry();
         ++mc.mNewInitEntriesMergedWithOldDead;
-        maybePut(out, newLive, shadowIterators, keepShadowedLifecycleEntries,
-                 mc);
+        maybePut(putFunc, newLive, shadowIterators,
+                 keepShadowedLifecycleEntries, mc);
     }
     else if (oldEntry.type() == INITENTRY)
     {
@@ -252,7 +255,7 @@ LiveBucket::mergeCasesWithEqualKeys(
         newInit.type(INITENTRY);
         newInit.liveEntry() = newEntry.liveEntry();
         ++mc.mOldInitEntriesMergedWithNewLive;
-        maybePut(out, newInit, shadowIterators,
+        maybePut(putFunc, newInit, shadowIterators,
                  keepShadowedLifecycleEntries, mc);
     }
     else
@@ -265,11 +268,11 @@ LiveBucket::mergeCasesWithEqualKeys(
     {
         // Neither is in INIT state, take the newer one.
         ++mc.mNewEntriesMergedWithOldNeitherInit;
-        maybePut(out, newEntry, shadowIterators, keepShadowedLifecycleEntries,
-                 mc);
+        maybePut(putFunc, newEntry, shadowIterators,
+                 keepShadowedLifecycleEntries, mc);
     }
-    ++oi;
-    ++ni;
+    inputSource.advanceOld();
+    inputSource.advanceNew();
 }
 
 bool
@@ -371,7 +374,8 @@ LiveBucket::fresh(BucketManager& bucketManager, uint32_t protocolVersion,
                   std::vector<LedgerEntry> const& initEntries,
                   std::vector<LedgerEntry> const& liveEntries,
                   std::vector<LedgerKey> const& deadEntries,
-                  bool countMergeEvents, asio::io_context& ctx, bool doFsync)
+                  bool countMergeEvents, asio::io_context& ctx, bool doFsync,
+                  bool storeInMemory, bool shouldIndex)
 {
     ZoneScoped;
     // When building fresh buckets after protocol version 10 (i.e. version
@@ -407,6 +411,12 @@ LiveBucket::fresh(BucketManager& bucketManager, uint32_t protocolVersion,
         bucketManager.incrMergeCounters(mc);
     }
 
+    if (storeInMemory)
+    {
+        return out.getBucket(bucketManager, nullptr, std::move(entries),
+                             shouldIndex);
+    }
+
     return out.getBucket(bucketManager);
 }
 
@@ -433,6 +443,8 @@ LiveBucket::LiveBucket(std::string const& filename, Hash const& hash,
 LiveBucket::LiveBucket() : BucketBase()
 {
+    // Empty bucket is trivially stored in memory
+    mEntries = std::vector<BucketEntry>();
 }
 
 uint32_t
@@ -450,6 +462,71 @@ LiveBucket::maybeInitializeCache(size_t totalBucketListAccountsSizeBytes,
     mIndex->maybeInitializeCache(totalBucketListAccountsSizeBytes, cfg);
 }
 
+std::shared_ptr<LiveBucket>
+LiveBucket::mergeInMemory(BucketManager& bucketManager,
+                          uint32_t maxProtocolVersion,
+                          std::shared_ptr<LiveBucket> const& oldBucket,
+                          std::shared_ptr<LiveBucket> const& newBucket,
+                          bool countMergeEvents, asio::io_context& ctx,
+                          bool doFsync)
+{
+    ZoneScoped;
+    releaseAssertOrThrow(oldBucket->hasInMemoryEntries());
+    releaseAssertOrThrow(newBucket->hasInMemoryEntries());
+
+    auto const& oldEntries = oldBucket->getInMemoryEntries();
+    auto const& newEntries = newBucket->getInMemoryEntries();
+
+    std::vector<BucketEntry> mergedEntries;
+    mergedEntries.reserve(oldEntries.size() + newEntries.size());
+
+    // Prepare metadata for the merged bucket
+    BucketMetadata meta;
+    meta.ledgerVersion = maxProtocolVersion;
+    if (protocolVersionStartsFrom(
+            maxProtocolVersion,
+            LiveBucket::FIRST_PROTOCOL_SUPPORTING_PERSISTENT_EVICTION))
+    {
+        meta.ext.v(1);
+        meta.ext.bucketListType() = BucketListType::LIVE;
+    }
+
+    // First level never has shadows
+    std::vector<BucketInputIterator<LiveBucket>> shadowIterators{};
+    MergeCounters mc;
+    updateMergeCountersForProtocolVersion(mc, maxProtocolVersion,
+                                          shadowIterators);
+
+    MemoryMergeInput<LiveBucket> inputSource(oldEntries, newEntries);
+    std::function<void(BucketEntry const&)> putFunc =
+        [&mergedEntries](BucketEntry const& entry) {
+            mergedEntries.emplace_back(entry);
+        };
+
+    mergeInternal(bucketManager, inputSource, putFunc, maxProtocolVersion,
+                  shadowIterators, true, mc);
+
+    if (countMergeEvents)
+    {
+        bucketManager.incrMergeCounters(mc);
+    }
+
+    // Write merge output to a bucket and save to disk
+    LiveBucketOutputIterator out(bucketManager.getTmpDir(),
+                                 /*keepTombstoneEntries=*/true, meta, mc, ctx,
+                                 doFsync);
+
+    for (auto const& e : mergedEntries)
+    {
+        out.put(e);
+    }
+
+    // Store the merged entries in memory in the new bucket in case this is
+    // still a level 0 bucket. If the Bucket is level 1, this won't be used but
+    // will be GC'd quickly anyway
+    return out.getBucket(bucketManager, nullptr, std::move(mergedEntries));
+}
+
 BucketEntryCounters const&
 LiveBucket::getBucketEntryCounters() const
 {
@@ -470,4 +547,16 @@ LiveBucket::bucketEntryToLoadResult(std::shared_ptr<EntryT const> const& be)
                ? nullptr
                : std::make_shared<LedgerEntry>(be->liveEntry());
 }
+
+template void LiveBucket::mergeCasesWithEqualKeys<FileMergeInput<LiveBucket>>(
+    MergeCounters& mc, FileMergeInput<LiveBucket>& inputSource,
+    std::function<void(BucketEntry const&)> putFunc,
+    std::vector<LiveBucketInputIterator>& shadowIterators,
+    uint32_t protocolVersion, bool keepShadowedLifecycleEntries);
+
+template void
+LiveBucket::mergeCasesWithEqualKeys<MemoryMergeInput<LiveBucket>>(
+    MergeCounters& mc, MemoryMergeInput<LiveBucket>& inputSource,
+    std::function<void(BucketEntry const&)> putFunc,
+    std::vector<LiveBucketInputIterator>& shadowIterators,
+    uint32_t protocolVersion, bool keepShadowedLifecycleEntries);
 }
\ No newline at end of file
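A toy model (illustrative names, no sorting or disk writes) of the contract around mergeInMemory: both inputs must hold their entries in memory, and the output keeps the merged entries so the next level-0 merge can stay in memory too:

#include <cassert>
#include <memory>
#include <optional>
#include <vector>

struct ToyBucket
{
    std::optional<std::vector<int>> entries; // mirrors LiveBucket::mEntries

    bool
    hasInMemoryEntries() const
    {
        return entries.has_value();
    }
};

std::shared_ptr<ToyBucket>
toyMergeInMemory(std::shared_ptr<ToyBucket> const& oldB,
                 std::shared_ptr<ToyBucket> const& newB)
{
    // Mirrors the releaseAssertOrThrow guards in LiveBucket::mergeInMemory.
    assert(oldB->hasInMemoryEntries() && newB->hasInMemoryEntries());

    // (The real code does a sorted merge and also writes the result to disk;
    // the point here is only that the output retains its entries.)
    std::vector<int> merged = *oldB->entries;
    merged.insert(merged.end(), newB->entries->begin(), newB->entries->end());

    auto out = std::make_shared<ToyBucket>();
    out->entries = std::move(merged);
    return out;
}

int
main()
{
    auto curr = std::make_shared<ToyBucket>();
    curr->entries = std::vector<int>{1, 2};
    auto snap = std::make_shared<ToyBucket>();
    snap->entries = std::vector<int>{3};

    auto next = toyMergeInMemory(curr, snap);
    assert(next->hasInMemoryEntries()); // ready for the next ledger's merge
}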
diff --git a/src/bucket/LiveBucket.h b/src/bucket/LiveBucket.h
index dfca865028..5325b7a33b 100644
--- a/src/bucket/LiveBucket.h
+++ b/src/bucket/LiveBucket.h
@@ -32,6 +32,12 @@ typedef BucketInputIterator<LiveBucket> LiveBucketInputIterator;
 class LiveBucket : public BucketBase<LiveBucket>,
                    public std::enable_shared_from_this<LiveBucket>
 {
+    // Stores all BucketEntries (except METAENTRY) in the same order that they
+    // appear in the bucket file for level 0 buckets. Because level 0 merges
+    // block the main thread when we write to the BucketList, we use the
+    // in-memory entries to produce the new bucket instead of file IO.
+    std::optional<std::vector<BucketEntry>> mEntries{};
+
   public:
     // Entry type that this bucket stores
     using EntryT = BucketEntry;
@@ -71,9 +77,10 @@ class LiveBucket : public BucketBase<LiveBucket>,
                            std::vector<LedgerEntry> const& liveEntries,
                            std::vector<LedgerKey> const& deadEntries);
 
+    template <typename InputSource>
     static void mergeCasesWithEqualKeys(
-        MergeCounters& mc, LiveBucketInputIterator& oi,
-        LiveBucketInputIterator& ni, LiveBucketOutputIterator& out,
+        MergeCounters& mc, InputSource& inputSource,
+        std::function<void(BucketEntry const&)> putFunc,
         std::vector<LiveBucketInputIterator>& shadowIterators,
         uint32_t protocolVersion, bool keepShadowedLifecycleEntries);
 
@@ -94,12 +101,14 @@ class LiveBucket : public BucketBase<LiveBucket>,
     // Create a fresh bucket from given vectors of init (created) and live
     // (updated) LedgerEntries, and dead LedgerEntryKeys. The bucket will
     // be sorted, hashed, and adopted in the provided BucketManager.
+    // If storeInMemory is true, populates mEntries.
     static std::shared_ptr<LiveBucket>
     fresh(BucketManager& bucketManager, uint32_t protocolVersion,
           std::vector<LedgerEntry> const& initEntries,
           std::vector<LedgerEntry> const& liveEntries,
           std::vector<LedgerKey> const& deadEntries, bool countMergeEvents,
-          asio::io_context& ctx, bool doFsync);
+          asio::io_context& ctx, bool doFsync, bool storeInMemory = false,
+          bool shouldIndex = true);
 
     // Returns true if the given BucketEntry should be dropped in the bottom
     // level bucket (i.e. DEADENTRY)
@@ -111,16 +120,45 @@ class LiveBucket : public BucketBase<LiveBucket>,
     // Whenever a given BucketEntry is "eligible" to be written as the merge
     // result in the output bucket, this function writes the entry to the
     // output iterator if the entry is not shadowed.
-    static void maybePut(LiveBucketOutputIterator& out,
+    // putFunc will be called to actually write the result of maybePut.
+    static void maybePut(std::function<void(BucketEntry const&)> putFunc,
                          BucketEntry const& entry,
                          std::vector<LiveBucketInputIterator>& shadowIterators,
                          bool keepShadowedLifecycleEntries, MergeCounters& mc);
 
+    // Merge two buckets in memory without using FutureBucket.
+    // This is used only for level 0 merges. Note that the resulting Bucket is
+    // still written to disk.
+    static std::shared_ptr<LiveBucket>
+    mergeInMemory(BucketManager& bucketManager, uint32_t maxProtocolVersion,
+                  std::shared_ptr<LiveBucket> const& oldBucket,
+                  std::shared_ptr<LiveBucket> const& newBucket,
+                  bool countMergeEvents, asio::io_context& ctx, bool doFsync);
+
     static void countOldEntryType(MergeCounters& mc, BucketEntry const& e);
     static void countNewEntryType(MergeCounters& mc, BucketEntry const& e);
 
     uint32_t getBucketVersion() const;
 
+    bool
+    hasInMemoryEntries() const
+    {
+        return mEntries.has_value();
+    }
+
+    void
+    setInMemoryEntries(std::vector<BucketEntry>&& entries)
+    {
+        mEntries = std::move(entries);
+    }
+
+    std::vector<BucketEntry> const&
+    getInMemoryEntries() const
+    {
+        releaseAssertOrThrow(mEntries.has_value());
+        return *mEntries;
+    }
+
     // Initializes the random eviction cache if it has not already been
     // initialized. totalBucketListAccountsSizeBytes is the total size, in
     // bytes, of all BucketEntries in the BucketList that hold ACCOUNT entries,
diff --git a/src/bucket/LiveBucketIndex.cpp b/src/bucket/LiveBucketIndex.cpp
index fa0756cd65..f96e83a0ff 100644
--- a/src/bucket/LiveBucketIndex.cpp
+++ b/src/bucket/LiveBucketIndex.cpp
@@ -81,6 +81,16 @@ LiveBucketIndex::LiveBucketIndex(BucketManager const& bm, Archive& ar,
     releaseAssertOrThrow(pageSize != 0);
 }
 
+LiveBucketIndex::LiveBucketIndex(BucketManager& bm,
+                                 std::vector<BucketEntry> const& inMemoryState,
+                                 BucketMetadata const& metadata)
+    : mInMemoryIndex(
+          std::make_unique<InMemoryIndex>(bm, inMemoryState, metadata))
+    , mCacheHitMeter(bm.getCacheHitMeter())
+    , mCacheMissMeter(bm.getCacheMissMeter())
+{
+}
+
 void
 LiveBucketIndex::maybeInitializeCache(size_t totalBucketListAccountsSizeBytes,
                                       Config const& cfg) const
diff --git a/src/bucket/LiveBucketIndex.h b/src/bucket/LiveBucketIndex.h
index abef94058f..d8786ba37e 100644
--- a/src/bucket/LiveBucketIndex.h
+++ b/src/bucket/LiveBucketIndex.h
@@ -108,6 +108,11 @@ class LiveBucketIndex : public NonMovableOrCopyable
     LiveBucketIndex(BucketManager const& bm, Archive& ar,
                     std::streamoff pageSize);
 
+    // Constructor for creating new index from in-memory state
+    LiveBucketIndex(BucketManager& bm,
+                    std::vector<BucketEntry> const& inMemoryState,
+                    BucketMetadata const& metadata);
+
     // Initializes the random eviction cache if it has not already been
     // initialized. The random eviction cache itself has an entry limit, but we
     // expose a memory limit in the validator config. To account for this, we
diff --git a/src/bucket/test/BucketIndexTests.cpp b/src/bucket/test/BucketIndexTests.cpp
index 1207585d2a..a6e9dddc38 100644
--- a/src/bucket/test/BucketIndexTests.cpp
+++ b/src/bucket/test/BucketIndexTests.cpp
@@ -5,6 +5,7 @@
 // This file contains tests for the BucketIndex and higher-level operations
 // concerning key-value lookup based on the BucketList.
 
+#include "bucket/BucketIndexUtils.h" #include "bucket/BucketInputIterator.h" #include "bucket/BucketManager.h" #include "bucket/BucketSnapshotManager.h" @@ -271,6 +272,26 @@ class BucketIndexTest auto startingHitCount = hitMeter.count(); auto startingMissCount = missMeter.count(); + auto sumOfInMemoryEntries = 0; + auto& liveBL = getBM().getLiveBucketList(); + for (uint32_t i = 0; i < LiveBucketList::kNumLevels; ++i) + { + auto level = liveBL.getLevel(i); + auto curr = level.getCurr(); + auto snap = level.getSnap(); + if (curr->hasInMemoryEntries() && !curr->isEmpty()) + { + sumOfInMemoryEntries += + curr->getBucketEntryCounters().numEntries(); + } + + if (snap->hasInMemoryEntries() && !snap->isEmpty()) + { + sumOfInMemoryEntries += + snap->getBucketEntryCounters().numEntries(); + } + } + // Checks hit rate then sets startingHitCount and startingMissCount // to current values auto checkHitRate = [&](auto expectedHitRate, auto& startingHitCount, @@ -286,7 +307,10 @@ class BucketIndexTest REQUIRE(missMeter.count() == startingMissCount); // All point loads should be hits - REQUIRE(hitMeter.count() == startingHitCount + numLoads); + // in-memory entries do not hit the cache, so we subtract them + // from the total number of loads + REQUIRE(hitMeter.count() == + startingHitCount + numLoads - sumOfInMemoryEntries); } else { @@ -294,7 +318,7 @@ class BucketIndexTest auto newHits = hitMeter.count() - startingHitCount; REQUIRE(newMisses > 0); REQUIRE(newHits > 0); - REQUIRE(newMisses + newHits == numLoads); + REQUIRE(newMisses + newHits == numLoads - sumOfInMemoryEntries); auto hitRate = static_cast(newHits) / (newMisses + newHits); @@ -316,10 +340,11 @@ class BucketIndexTest if (expectedHitRate) { // We should have no cache hits since we're starting from an empty - // cache + // cache. In-memory entries are not counted in cache metrics. REQUIRE(hitMeter.count() == startingHitCount); - REQUIRE(missMeter.count() == - startingMissCount + mKeysToSearch.size()); + REQUIRE(missMeter.count() == startingMissCount + + mKeysToSearch.size() - + sumOfInMemoryEntries); startingHitCount = hitMeter.count(); startingMissCount = missMeter.count(); @@ -689,7 +714,7 @@ TEST_CASE("bl cache", "[bucket][bucketindex]") }; auto checkCompleteCacheSize = [](auto b) { - if (!b->isEmpty()) + if (!b->isEmpty() && !b->hasInMemoryEntries()) { auto cacheSize = b->getMaxCacheSize(); auto accountsInBucket = @@ -708,7 +733,7 @@ TEST_CASE("bl cache", "[bucket][bucketindex]") auto totalAccountCount = 0; auto checkPartialCacheSize = [&cachedAccountEntries, &totalAccountCount](auto b) { - if (!b->isEmpty()) + if (!b->isEmpty() && !b->hasInMemoryEntries()) { cachedAccountEntries += b->getMaxCacheSize(); totalAccountCount += b->getBucketEntryCounters().entryTypeCounts.at( @@ -865,6 +890,66 @@ TEST_CASE("bucket entry counters", "[bucket][bucketindex]") testAllIndexTypes(f); } +// Test that indexes created via an in-memory merge are identical to those +// created via a disk-based merge. Note that while both indexes are "in-memory" +// indexes, one is constructed from a file vs. 
@@ -865,6 +890,66 @@ TEST_CASE("bucket entry counters", "[bucket][bucketindex]")
     testAllIndexTypes(f);
 }
 
+// Test that indexes created via an in-memory merge are identical to those
+// created via a disk-based merge. Note that while both indexes are "in-memory"
+// indexes, one is constructed from a file vs. a vector of BucketEntries
+TEST_CASE("in-memory index construction", "[bucket][bucketindex]")
+{
+    auto test = [&](auto const& entries) {
+        VirtualClock clock;
+        Config cfg(getTestConfig(0, Config::TESTDB_BUCKET_DB_PERSISTENT));
+
+        // in-memory index types only
+        cfg.BUCKETLIST_DB_INDEX_PAGE_SIZE_EXPONENT = 0;
+        Application::pointer app = createTestApplication(clock, cfg);
+
+        // Create a bucket with in-memory entries and manually index it, once
+        // using file IO and once with in-memory state
+        auto b = LiveBucket::fresh(
+            app->getBucketManager(), getAppLedgerVersion(app), {}, entries, {},
+            /*countMergeEvents=*/true, clock.getIOContext(),
+            /*doFsync=*/true, /*storeInMemory=*/true,
+            /*shouldIndex=*/false);
+
+        auto indexFromFile = createIndex<LiveBucket>(
+            app->getBucketManager(), b->getFilename(), b->getHash(),
+            clock.getIOContext(), nullptr);
+
+        LiveBucketInputIterator iter(b);
+        auto indexFromMemory = std::make_unique<LiveBucketIndex>(
+            app->getBucketManager(), b->getInMemoryEntries(),
+            iter.getMetadata());
+
+        REQUIRE(indexFromFile);
+        REQUIRE(indexFromMemory);
+        REQUIRE((*indexFromFile == *indexFromMemory));
+    };
+
+    SECTION("no offers")
+    {
+        std::vector<LedgerEntry> entries =
+            LedgerTestUtils::generateValidUniqueLedgerEntriesWithExclusions(
+                {CONFIG_SETTING, OFFER}, 100);
+        test(entries);
+    }
+
+    SECTION("with offers at end of file")
+    {
+        std::vector<LedgerEntry> entries =
+            LedgerTestUtils::generateValidUniqueLedgerEntriesWithTypes(
+                {ACCOUNT, OFFER}, 100);
+        test(entries);
+    }
+
+    SECTION("with offers in middle of file")
+    {
+        std::vector<LedgerEntry> entries =
+            LedgerTestUtils::generateValidUniqueLedgerEntriesWithTypes(
+                {ACCOUNT, OFFER, CONTRACT_DATA}, 100);
+        test(entries);
+    }
+}
+
 TEST_CASE("load from historical snapshots", "[bucket][bucketindex]")
 {
     auto f = [&](Config& cfg) {
@@ -935,7 +1020,11 @@ TEST_CASE("serialize bucket indexes", "[bucket][bucketindex]")
         auto level = liveBL.getLevel(i);
         for (auto const& b : {level.getCurr(), level.getSnap()})
         {
-            liveBuckets.emplace(b->getHash());
+            // In memory bucket indexes are not saved to disk
+            if (!b->hasInMemoryEntries())
+            {
+                liveBuckets.emplace(b->getHash());
+            }
         }
     }
diff --git a/src/bucket/test/BucketManagerTests.cpp b/src/bucket/test/BucketManagerTests.cpp
index 5c22b3b997..8bbf60af09 100644
--- a/src/bucket/test/BucketManagerTests.cpp
+++ b/src/bucket/test/BucketManagerTests.cpp
@@ -234,7 +234,15 @@ TEST_CASE_VERSIONS("bucketmanager ownership", "[bucket][bucketmanager]")
         std::string indexFilename =
             app->getBucketManager().bucketIndexFilename(b->getHash());
         CHECK(fs::exists(filename));
-        CHECK(fs::exists(indexFilename));
+
+        if (b->getIndexForTesting().getPageSize() == 0)
+        {
+            CHECK(!fs::exists(indexFilename));
+        }
+        else
+        {
+            CHECK(fs::exists(indexFilename));
+        }
 
         b.reset();
         app->getBucketManager().forgetUnreferencedBuckets(
diff --git a/src/herder/test/UpgradesTests.cpp b/src/herder/test/UpgradesTests.cpp
index b0f2b71ca3..84b7d6b2a6 100644
--- a/src/herder/test/UpgradesTests.cpp
+++ b/src/herder/test/UpgradesTests.cpp
@@ -2103,13 +2103,12 @@ TEST_CASE("upgrade to version 11", "[upgrades]")
             // Check several subtle characteristics of the post-upgrade
             // environment:
             // - Old-protocol merges stop happening (there should have
-            //   been 6 before the upgrade, but we re-use a merge we did
-            //   at ledger 1 for ledger 2 spill, so the counter is at 5)
+            //   been 6 before the upgrade)
             // - New-protocol merges start happening.
             // - At the upgrade (5), we find 1 INITENTRY in lev[0].curr
             // - The next two (6, 7), propagate INITENTRYs to lev[0].snap
             // - From 8 on, the INITENTRYs propagate to lev[1].curr
-            REQUIRE(mc.mPreInitEntryProtocolMerges == 5);
+            REQUIRE(mc.mPreInitEntryProtocolMerges == 6);
             REQUIRE(mc.mPostInitEntryProtocolMerges != 0);
             auto& lev0 = bm.getLiveBucketList().getLevel(0);
             auto& lev1 = bm.getLiveBucketList().getLevel(1);
@@ -2219,25 +2218,25 @@ TEST_CASE("upgrade to version 12", "[upgrades]")
                 // One more old-style merge despite the upgrade
                 // At ledger 8, level 2 spills, and starts an old-style
                 // merge, as level 1 snap is still of old version
-                REQUIRE(mc.mPreShadowRemovalProtocolMerges == 6);
+                REQUIRE(mc.mPreShadowRemovalProtocolMerges == 7);
                 break;
             case 7:
                 REQUIRE(getVers(lev0Snap) == newProto);
                 REQUIRE(getVers(lev1Curr) == oldProto);
                 REQUIRE(mc.mPostShadowRemovalProtocolMerges == 4);
-                REQUIRE(mc.mPreShadowRemovalProtocolMerges == 5);
+                REQUIRE(mc.mPreShadowRemovalProtocolMerges == 6);
                 break;
             case 6:
                 REQUIRE(getVers(lev0Snap) == newProto);
                 REQUIRE(getVers(lev1Curr) == oldProto);
                 REQUIRE(mc.mPostShadowRemovalProtocolMerges == 3);
-                REQUIRE(mc.mPreShadowRemovalProtocolMerges == 5);
+                REQUIRE(mc.mPreShadowRemovalProtocolMerges == 6);
                 break;
             case 5:
                 REQUIRE(getVers(lev0Curr) == newProto);
                 REQUIRE(getVers(lev0Snap) == oldProto);
                 REQUIRE(mc.mPostShadowRemovalProtocolMerges == 1);
-                REQUIRE(mc.mPreShadowRemovalProtocolMerges == 5);
+                REQUIRE(mc.mPreShadowRemovalProtocolMerges == 6);
                 break;
             default:
                 break;