Level 0 merge #4697

Open
wants to merge 2 commits into base: release/v22.3.0
Changes from 1 commit
79 changes: 46 additions & 33 deletions src/bucket/BucketBase.cpp
@@ -146,6 +146,49 @@ BucketBase<BucketT, IndexT>::randomBucketIndexName(std::string const& tmpDir)
return randomFileName(tmpDir, ".index");
}

template <class BucketT, class IndexT>
bool
BucketBase<BucketT, IndexT>::updateMergeCountersForProtocolVersion(
MergeCounters& mc, uint32_t protocolVersion,
std::vector<BucketInputIterator<BucketT>> const& shadowIterators)
{
// Don't count shadow metrics for Hot Archive BucketList
if constexpr (std::is_same_v<BucketT, HotArchiveBucket>)
Review comment (Contributor):

We shouldn't be calling this function on HotArchiveBuckets at all, right? (shadows were dropped in protocol 12). Could we enforce that? I realize this was here before, but since we're already making changes, this is a good opportunity to harden the code.
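
One way this could be enforced, as a minimal sketch: move the Hot Archive check to the caller with `if constexpr` and put a `static_assert` in the helper, so instantiating the helper for `HotArchiveBucket` fails at compile time. The names `updateMergeCounters` and `mergeStep` and the stub types below are hypothetical stand-ins, not the PR's code. One caveat: if the enclosing class template is explicitly instantiated for `HotArchiveBucket`, every member gets instantiated and the `static_assert` would fire, in which case a runtime `releaseAssert` would be the fallback.

```cpp
#include <cstdint>
#include <type_traits>

// Hypothetical stand-ins for the real bucket types.
struct LiveBucket
{
};
struct HotArchiveBucket
{
};

// Hypothetical, reduced version of the merge counters.
struct MergeCounters
{
    uint64_t mPostShadowRemovalProtocolMerges{0};
};

template <class BucketT>
bool
updateMergeCounters(MergeCounters& mc)
{
    // Compile-time guard: instantiating this for HotArchiveBucket is an error.
    static_assert(!std::is_same_v<BucketT, HotArchiveBucket>,
                  "shadow metrics only apply to the live BucketList");
    ++mc.mPostShadowRemovalProtocolMerges;
    return true;
}

template <class BucketT>
bool
mergeStep(MergeCounters& mc)
{
    // The caller, not the helper, skips the Hot Archive case. The discarded
    // branch is never instantiated, so the static_assert does not fire.
    if constexpr (std::is_same_v<BucketT, HotArchiveBucket>)
    {
        return true;
    }
    else
    {
        return updateMergeCounters<BucketT>(mc);
    }
}
```

With this shape, `mergeStep<HotArchiveBucket>` compiles and leaves the counters untouched, while a direct call to `updateMergeCounters<HotArchiveBucket>` is rejected by the compiler.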

{
return true;
}

bool keepShadowedLifecycleEntries = true;

if (protocolVersionIsBefore(
protocolVersion,
LiveBucket::FIRST_PROTOCOL_SUPPORTING_INITENTRY_AND_METAENTRY))
{
++mc.mPreInitEntryProtocolMerges;
keepShadowedLifecycleEntries = false;
}
else
{
++mc.mPostInitEntryProtocolMerges;
}

if (protocolVersionIsBefore(protocolVersion,
LiveBucket::FIRST_PROTOCOL_SHADOWS_REMOVED))
{
++mc.mPreShadowRemovalProtocolMerges;
}
else
{
if (!shadowIterators.empty())
{
throw std::runtime_error("Shadows are not supported");
}
++mc.mPostShadowRemovalProtocolMerges;
}

return keepShadowedLifecycleEntries;
}

// The protocol used in a merge is the maximum of any of the protocols used in
// its input buckets, _including_ any of its shadows. We need to be strict about
// this for the same reason we change shadow algorithms along with merge
@@ -221,39 +264,9 @@ calculateMergeProtocolVersion(
// we switch shadowing-behaviour to a more conservative mode, in order to
// support annihilation of INITENTRY and DEADENTRY pairs. See commentary
// above in `maybePut`.
keepShadowedLifecycleEntries = true;

// Don't count shadow metrics for Hot Archive BucketList
if constexpr (std::is_same_v<BucketT, HotArchiveBucket>)
{
return;
}

if (protocolVersionIsBefore(
protocolVersion,
LiveBucket::FIRST_PROTOCOL_SUPPORTING_INITENTRY_AND_METAENTRY))
{
++mc.mPreInitEntryProtocolMerges;
keepShadowedLifecycleEntries = false;
}
else
{
++mc.mPostInitEntryProtocolMerges;
}

if (protocolVersionIsBefore(protocolVersion,
LiveBucket::FIRST_PROTOCOL_SHADOWS_REMOVED))
{
++mc.mPreShadowRemovalProtocolMerges;
}
else
{
if (!shadowIterators.empty())
{
throw std::runtime_error("Shadows are not supported");
}
++mc.mPostShadowRemovalProtocolMerges;
}
keepShadowedLifecycleEntries =
BucketBase<BucketT, IndexT>::updateMergeCountersForProtocolVersion(
mc, protocolVersion, shadowIterators);
}

// There are 4 "easy" cases for merging: exhausted iterators on either
7 changes: 6 additions & 1 deletion src/bucket/BucketBase.h
@@ -4,12 +4,12 @@
// under the Apache License, Version 2.0. See the COPYING file at the root
// of this distribution or at http://www.apache.org/licenses/license-2.0

#include "bucket/BucketInputIterator.h"
#include "bucket/BucketUtils.h"
#include "util/NonCopyable.h"
#include "util/ProtocolVersion.h"
#include "xdr/Stellar-types.h"
#include <filesystem>
#include <optional>
#include <string>

namespace asio
@@ -130,6 +130,11 @@ class BucketBase : public NonMovableOrCopyable
bool keepTombstoneEntries, bool countMergeEvents,
asio::io_context& ctx, bool doFsync);

// Returns whether shadowed lifecycle entries should be kept
static bool updateMergeCountersForProtocolVersion(
MergeCounters& mc, uint32_t protocolVersion,
std::vector<BucketInputIterator<BucketT>> const& shadowIterators);

static std::string randomBucketName(std::string const& tmpDir);
static std::string randomBucketIndexName(std::string const& tmpDir);

83 changes: 76 additions & 7 deletions src/bucket/BucketListBase.cpp
@@ -26,6 +26,7 @@ BucketLevel<BucketT>::BucketLevel(uint32_t i)
: mLevel(i)
, mCurr(std::make_shared<BucketT>())
, mSnap(std::make_shared<BucketT>())
, mNextCurrInMemory(nullptr)
{
}

@@ -79,6 +80,7 @@ void
BucketLevel<BucketT>::setCurr(std::shared_ptr<BucketT> b)
{
mNextCurr.clear();
mNextCurrInMemory.reset();
mCurr = b;
}

@@ -127,13 +129,82 @@ template <typename BucketT>
void
BucketLevel<BucketT>::commit()
{
if (mNextCurr.isLive())
if (mNextCurrInMemory)
{
// If we have an in-memory merged result, use that
setCurr(mNextCurrInMemory);
Review comment (Contributor):

Could we add a level 0 assert here as well?
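
A minimal sketch of what that assert could look like, assuming the in-memory result is only ever produced for level 0. The `Level` and `Bucket` types are hypothetical reductions of `BucketLevel` and `LiveBucket`, and plain `assert` stands in for `releaseAssert`.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>
#include <utility>

// Hypothetical stand-in for a bucket.
struct Bucket
{
    int id;
};

// Hypothetical reduction of BucketLevel with only the in-memory path.
class Level
{
    uint32_t mLevel;
    std::shared_ptr<Bucket> mCurr;
    std::shared_ptr<Bucket> mNextCurrInMemory;

  public:
    explicit Level(uint32_t i)
        : mLevel(i), mCurr(std::make_shared<Bucket>(Bucket{0}))
    {
    }

    void
    setNextInMemory(std::shared_ptr<Bucket> b)
    {
        mNextCurrInMemory = std::move(b);
    }

    void
    commit()
    {
        if (mNextCurrInMemory)
        {
            // In-memory merge results are only expected on level 0.
            assert(mLevel == 0);
            mCurr = std::move(mNextCurrInMemory);
            mNextCurrInMemory.reset();
        }
    }

    int
    currId() const
    {
        return mCurr->id;
    }

    bool
    hasPending() const
    {
        return static_cast<bool>(mNextCurrInMemory);
    }
};
```

Placing the check inside the `if (mNextCurrInMemory)` branch keeps `commit()` usable on higher levels, which only take the async path.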

}
else if (mNextCurr.isLive())
{
// Otherwise use the async merge result
setCurr(mNextCurr.resolve());
}
releaseAssert(!mNextCurr.isMerging());
}

template <>
template <typename... VectorT>
void
BucketLevel<LiveBucket>::prepareFirstLevel(Application& app,
uint32_t currLedger,
uint32_t currLedgerProtocol,
bool countMergeEvents, bool doFsync,
VectorT const&... inputVectors)
{
ZoneScoped;

// This method should only be called on level 0
releaseAssert(mLevel == 0);

// We shouldn't have an in-progress merge
releaseAssert(!mNextCurr.isMerging());
releaseAssert(!mNextCurrInMemory);

auto curr =
BucketListBase<LiveBucket>::shouldMergeWithEmptyCurr(currLedger, mLevel)
? std::make_shared<LiveBucket>()
: mCurr;

// On startup, the current bucket may not be initialized in memory, so
// fallback to normal prepare
if (!curr->hasInMemoryEntries())
{
auto snap = LiveBucket::fresh(
app.getBucketManager(), currLedgerProtocol, inputVectors...,
countMergeEvents, app.getClock().getIOContext(), doFsync);
prepare(app, currLedger, currLedgerProtocol, snap, /*shadows=*/{},
countMergeEvents);
return;
}

auto snap = LiveBucket::fresh(
app.getBucketManager(), currLedgerProtocol, inputVectors...,
countMergeEvents, app.getClock().getIOContext(), doFsync,
/*storeInMemory=*/true, /*shouldIndex=*/false);
Review comment (Contributor):

shouldIndex is false here because indexing is redundant: we merge this newly created bucket right away and create a new (persistent) index anyway. Is that correct? I think a comment with some explanation would be nice here.


auto& bucketManager = app.getBucketManager();
auto& ctx = app.getClock().getIOContext();
mNextCurrInMemory =
LiveBucket::mergeInMemory(bucketManager, currLedgerProtocol, curr, snap,
countMergeEvents, ctx, doFsync);
}

template <>
template <typename... VectorT>
void
BucketLevel<HotArchiveBucket>::prepareFirstLevel(
Application& app, uint32_t currLedger, uint32_t currLedgerProtocol,
bool countMergeEvents, bool doFsync, VectorT const&... inputVectors)
{
// Hot Archive does not support in-memory merge, so we just use the
// normal prepare method
auto snap = HotArchiveBucket::fresh(
app.getBucketManager(), currLedgerProtocol, inputVectors...,
countMergeEvents, app.getClock().getIOContext(), doFsync);
prepare(app, currLedger, currLedgerProtocol, snap, /*shadows=*/{},
countMergeEvents);
}

// prepare builds a FutureBucket for the _next state_ of the current level,
// kicking off a merge that will finish sometime later.
//
@@ -179,6 +250,8 @@ BucketLevel<BucketT>::prepare(
// If more than one absorb is pending at the same time, we have a logic
// error in our caller (and all hell will break loose).
releaseAssert(!mNextCurr.isMerging());
releaseAssert(!mNextCurrInMemory);

auto curr =
BucketListBase<BucketT>::shouldMergeWithEmptyCurr(currLedger, mLevel)
? std::make_shared<BucketT>()
@@ -657,12 +730,8 @@ BucketListBase<BucketT>::addBatchInternal(Application& app, uint32_t currLedger,
!app.getConfig().ARTIFICIALLY_REDUCE_MERGE_COUNTS_FOR_TESTING;
bool doFsync = !app.getConfig().DISABLE_XDR_FSYNC;
releaseAssert(shadows.size() == 0);
mLevels[0].prepare(app, currLedger, currLedgerProtocol,
BucketT::fresh(app.getBucketManager(),
currLedgerProtocol, inputVectors...,
countMergeEvents,
app.getClock().getIOContext(), doFsync),
shadows, countMergeEvents);
mLevels[0].prepareFirstLevel(app, currLedger, currLedgerProtocol,
countMergeEvents, doFsync, inputVectors...);
mLevels[0].commit();

// We almost always want to try to resolve completed merges to single
11 changes: 11 additions & 0 deletions src/bucket/BucketListBase.h
@@ -363,6 +363,7 @@ template <class BucketT> class BucketLevel
FutureBucket<BucketT> mNextCurr;
std::shared_ptr<BucketT> mCurr;
std::shared_ptr<BucketT> mSnap;
std::shared_ptr<BucketT> mNextCurrInMemory;
Review comment (Contributor):

Could we use std::variant to represent "next curr" in a single variable? (it'd be safer and less error-prone)
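
A rough sketch of the `std::variant` idea, under the assumption that "next curr" is always exactly one of: nothing pending, an async merge, or an in-memory result. `FutureMerge`, `Level`, and `Bucket` are hypothetical stand-ins for `FutureBucket`, `BucketLevel`, and the bucket type; the real `FutureBucket` has more states than this models.

```cpp
#include <cassert>
#include <memory>
#include <utility>
#include <variant>

// Hypothetical stand-in for a bucket.
struct Bucket
{
    int id;
};

// Hypothetical stand-in for FutureBucket: a handle to an async merge result.
struct FutureMerge
{
    std::shared_ptr<Bucket> result;

    std::shared_ptr<Bucket>
    resolve() const
    {
        return result;
    }
};

class Level
{
    // Exactly one alternative is active at a time, so "both set" cannot occur.
    std::variant<std::monostate, FutureMerge, std::shared_ptr<Bucket>> mNext;
    std::shared_ptr<Bucket> mCurr = std::make_shared<Bucket>(Bucket{0});

  public:
    void
    setAsync(FutureMerge f)
    {
        mNext.emplace<FutureMerge>(std::move(f));
    }

    void
    setInMemory(std::shared_ptr<Bucket> b)
    {
        mNext.emplace<std::shared_ptr<Bucket>>(std::move(b));
    }

    bool
    hasPending() const
    {
        return !std::holds_alternative<std::monostate>(mNext);
    }

    void
    commit()
    {
        if (auto* inMem = std::get_if<std::shared_ptr<Bucket>>(&mNext))
        {
            mCurr = *inMem; // in-memory merged result
        }
        else if (auto* fut = std::get_if<FutureMerge>(&mNext))
        {
            mCurr = fut->resolve(); // async merge result
        }
        mNext = std::monostate{};
    }

    int
    currId() const
    {
        return mCurr->id;
    }
};
```

With a single variable, setting one kind of pending result replaces the other, so the paired `releaseAssert(!mNextCurr.isMerging())` and `releaseAssert(!mNextCurrInMemory)` checks would reduce to one check on the variant's state.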


public:
BucketLevel(uint32_t i);
@@ -379,6 +380,16 @@ template <class BucketT> class BucketLevel
uint32_t currLedgerProtocol, std::shared_ptr<BucketT> snap,
std::vector<std::shared_ptr<BucketT>> const& shadows,
bool countMergeEvents);

// Special version of prepare for level 0 that does an in-memory merge if
// possible. Falls back to regular prepare if an in-memory merge is not
// possible, such as for the HotArchiveBucketList, or in some edge cases
// on startup.
template <typename... VectorT>
void prepareFirstLevel(Application& app, uint32_t currLedger,
uint32_t currLedgerProtocol, bool countMergeEvents,
bool doFsync, VectorT const&... inputVectors);

std::shared_ptr<BucketT> snap();
};

40 changes: 33 additions & 7 deletions src/bucket/BucketOutputIterator.cpp
@@ -7,6 +7,7 @@
#include "bucket/BucketManager.h"
#include "bucket/HotArchiveBucket.h"
#include "bucket/LiveBucket.h"
#include "bucket/LiveBucketIndex.h"
#include "ledger/LedgerTypeUtils.h"
#include "util/GlobalChecks.h"
#include "util/ProtocolVersion.h"
@@ -165,8 +166,10 @@ BucketOutputIterator<BucketT>::put(typename BucketT::EntryT const& e)

template <typename BucketT>
std::shared_ptr<BucketT>
BucketOutputIterator<BucketT>::getBucket(BucketManager& bucketManager,
MergeKey* mergeKey)
BucketOutputIterator<BucketT>::getBucket(
BucketManager& bucketManager, MergeKey* mergeKey,
std::optional<std::vector<typename BucketT::EntryT>> inMemoryState,
bool shouldIndex)
{
ZoneScoped;
if (mBuf)
@@ -196,14 +199,37 @@ BucketOutputIterator<BucketT>::getBucket(BucketManager& bucketManager,
// either it's a new bucket or we just reconstructed a bucket
// we already have, in any case ensure we have an index
if (auto b = bucketManager.getBucketIfExists<BucketT>(hash);
!b || !b->isIndexed())
((!b || !b->isIndexed()) && shouldIndex))
{
index =
createIndex<BucketT>(bucketManager, mFilename, hash, mCtx, nullptr);
// Create index using in-memory state instead of file IO if available
if constexpr (std::is_same_v<BucketT, LiveBucket>)
{
if (inMemoryState)
{
index = std::make_unique<LiveBucketIndex>(
bucketManager, *inMemoryState, mMeta);
}
}

if (!index)
{
index = createIndex<BucketT>(bucketManager, mFilename, hash, mCtx,
nullptr);
}
}

auto b = bucketManager.adoptFileAsBucket<BucketT>(
mFilename.string(), hash, mergeKey, std::move(index));

if constexpr (std::is_same_v<BucketT, LiveBucket>)
{
if (inMemoryState)
{
b->setInMemoryEntries(std::move(*inMemoryState));
}
}

return bucketManager.adoptFileAsBucket<BucketT>(mFilename.string(), hash,
mergeKey, std::move(index));
return b;
}

template class BucketOutputIterator<LiveBucket>;
7 changes: 5 additions & 2 deletions src/bucket/BucketOutputIterator.h
@@ -52,7 +52,10 @@ template <typename BucketT> class BucketOutputIterator

void put(typename BucketT::EntryT const& e);

std::shared_ptr<BucketT> getBucket(BucketManager& bucketManager,
MergeKey* mergeKey = nullptr);
std::shared_ptr<BucketT> getBucket(
BucketManager& bucketManager, MergeKey* mergeKey = nullptr,
std::optional<std::vector<typename BucketT::EntryT>> inMemoryState =
std::nullopt,
bool shouldIndex = true);
};
}