-
Notifications
You must be signed in to change notification settings - Fork 999
Level 0 merge #4697
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: release/v22.3.0
Are you sure you want to change the base?
Level 0 merge #4697
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,6 +26,7 @@ BucketLevel<BucketT>::BucketLevel(uint32_t i) | |
: mLevel(i) | ||
, mCurr(std::make_shared<BucketT>()) | ||
, mSnap(std::make_shared<BucketT>()) | ||
, mNextCurrInMemory(nullptr) | ||
{ | ||
} | ||
|
||
|
@@ -79,6 +80,7 @@ void | |
BucketLevel<BucketT>::setCurr(std::shared_ptr<BucketT> b) | ||
{ | ||
mNextCurr.clear(); | ||
mNextCurrInMemory.reset(); | ||
mCurr = b; | ||
} | ||
|
||
|
@@ -127,13 +129,82 @@ template <typename BucketT> | |
void | ||
BucketLevel<BucketT>::commit() | ||
{ | ||
if (mNextCurr.isLive()) | ||
if (mNextCurrInMemory) | ||
{ | ||
// If we have an in-memory merged result, use that | ||
setCurr(mNextCurrInMemory); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. could we add a level 0 assert here as well? |
||
} | ||
else if (mNextCurr.isLive()) | ||
{ | ||
// Otherwise use the async merge result | ||
setCurr(mNextCurr.resolve()); | ||
} | ||
releaseAssert(!mNextCurr.isMerging()); | ||
} | ||
|
||
template <> | ||
template <typename... VectorT> | ||
void | ||
BucketLevel<LiveBucket>::prepareFirstLevel(Application& app, | ||
uint32_t currLedger, | ||
uint32_t currLedgerProtocol, | ||
bool countMergeEvents, bool doFsync, | ||
VectorT const&... inputVectors) | ||
{ | ||
ZoneScoped; | ||
|
||
// This method should only be called on level 0 | ||
releaseAssert(mLevel == 0); | ||
|
||
// We shouldn't have an in-progress merge | ||
releaseAssert(!mNextCurr.isMerging()); | ||
releaseAssert(!mNextCurrInMemory); | ||
|
||
auto curr = | ||
BucketListBase<LiveBucket>::shouldMergeWithEmptyCurr(currLedger, mLevel) | ||
? std::make_shared<LiveBucket>() | ||
: mCurr; | ||
|
||
// On startup, the current bucket may not be initialized in memory, so | ||
// fallback to normal prepare | ||
if (!curr->hasInMemoryEntries()) | ||
{ | ||
auto snap = LiveBucket::fresh( | ||
app.getBucketManager(), currLedgerProtocol, inputVectors..., | ||
countMergeEvents, app.getClock().getIOContext(), doFsync); | ||
prepare(app, currLedger, currLedgerProtocol, snap, /*shadows=*/{}, | ||
countMergeEvents); | ||
return; | ||
} | ||
|
||
auto snap = LiveBucket::fresh( | ||
app.getBucketManager(), currLedgerProtocol, inputVectors..., | ||
countMergeEvents, app.getClock().getIOContext(), doFsync, | ||
/*storeInMemory=*/true, /*shouldIndex=*/false); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
|
||
auto& bucketManager = app.getBucketManager(); | ||
auto& ctx = app.getClock().getIOContext(); | ||
mNextCurrInMemory = | ||
LiveBucket::mergeInMemory(bucketManager, currLedgerProtocol, curr, snap, | ||
countMergeEvents, ctx, doFsync); | ||
} | ||
|
||
template <> | ||
template <typename... VectorT> | ||
void | ||
BucketLevel<HotArchiveBucket>::prepareFirstLevel( | ||
Application& app, uint32_t currLedger, uint32_t currLedgerProtocol, | ||
bool countMergeEvents, bool doFsync, VectorT const&... inputVectors) | ||
{ | ||
// Hot Archive does not support in-memory merge, so we just use the | ||
// normal prepare method | ||
auto snap = HotArchiveBucket::fresh( | ||
app.getBucketManager(), currLedgerProtocol, inputVectors..., | ||
countMergeEvents, app.getClock().getIOContext(), doFsync); | ||
prepare(app, currLedger, currLedgerProtocol, snap, /*shadows=*/{}, | ||
countMergeEvents); | ||
} | ||
|
||
// prepare builds a FutureBucket for the _next state_ of the current level, | ||
// kicking off a merge that will finish sometime later. | ||
// | ||
|
@@ -179,6 +250,8 @@ BucketLevel<BucketT>::prepare( | |
// If more than one absorb is pending at the same time, we have a logic | ||
// error in our caller (and all hell will break loose). | ||
releaseAssert(!mNextCurr.isMerging()); | ||
releaseAssert(!mNextCurrInMemory); | ||
|
||
auto curr = | ||
BucketListBase<BucketT>::shouldMergeWithEmptyCurr(currLedger, mLevel) | ||
? std::make_shared<BucketT>() | ||
|
@@ -657,12 +730,8 @@ BucketListBase<BucketT>::addBatchInternal(Application& app, uint32_t currLedger, | |
!app.getConfig().ARTIFICIALLY_REDUCE_MERGE_COUNTS_FOR_TESTING; | ||
bool doFsync = !app.getConfig().DISABLE_XDR_FSYNC; | ||
releaseAssert(shadows.size() == 0); | ||
mLevels[0].prepare(app, currLedger, currLedgerProtocol, | ||
BucketT::fresh(app.getBucketManager(), | ||
currLedgerProtocol, inputVectors..., | ||
countMergeEvents, | ||
app.getClock().getIOContext(), doFsync), | ||
shadows, countMergeEvents); | ||
mLevels[0].prepareFirstLevel(app, currLedger, currLedgerProtocol, | ||
countMergeEvents, doFsync, inputVectors...); | ||
mLevels[0].commit(); | ||
|
||
// We almost always want to try to resolve completed merges to single | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -363,6 +363,7 @@ template <class BucketT> class BucketLevel | |
FutureBucket<BucketT> mNextCurr; | ||
std::shared_ptr<BucketT> mCurr; | ||
std::shared_ptr<BucketT> mSnap; | ||
std::shared_ptr<BucketT> mNextCurrInMemory; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could we use std::variant to represent "next curr" in a single variable? (it'd be safer and less error-prone) |
||
|
||
public: | ||
BucketLevel(uint32_t i); | ||
|
@@ -379,6 +380,16 @@ template <class BucketT> class BucketLevel | |
uint32_t currLedgerProtocol, std::shared_ptr<BucketT> snap, | ||
std::vector<std::shared_ptr<BucketT>> const& shadows, | ||
bool countMergeEvents); | ||
|
||
// Special version of prepare for level 0 that does an in-memory merge if | ||
// possible. Falls back to regular prepare if in-memory merge is not | ||
// possible., such as for the HotArchiveBucketList, or in some edge cases | ||
// on startup. | ||
template <typename... VectorT> | ||
void prepareFirstLevel(Application& app, uint32_t currLedger, | ||
uint32_t currLedgerProtocol, bool countMergeEvents, | ||
bool doFsync, VectorT const&... inputVectors); | ||
|
||
std::shared_ptr<BucketT> snap(); | ||
}; | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We shouldn't be calling this function on HotArchiveBuckets at all, right? (shadows were dropped in protocol 12). Could we enforce that? I realize this was here before, but since we're already making changes, this is a good opportunity to harden the code.