Skip to content

Level 0 merge #4697

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: release/v22.3.0
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
217 changes: 133 additions & 84 deletions src/bucket/BucketBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "bucket/BucketBase.h"
#include "bucket/BucketInputIterator.h"
#include "bucket/BucketManager.h"
#include "bucket/BucketMergeAdapter.h"
#include "bucket/BucketOutputIterator.h"
#include "bucket/BucketUtils.h"
#include "bucket/HotArchiveBucket.h"
Expand Down Expand Up @@ -146,6 +147,49 @@ BucketBase<BucketT, IndexT>::randomBucketIndexName(std::string const& tmpDir)
return randomFileName(tmpDir, ".index");
}

// Bumps the protocol-dependent merge counters in `mc` for a merge running at
// `protocolVersion`, and reports whether shadowed lifecycle entries must be
// kept by that merge.
//
// - For HotArchiveBucket no counter is touched and `true` is returned.
// - Before LiveBucket::FIRST_PROTOCOL_SUPPORTING_INITENTRY_AND_METAENTRY the
//   pre-INITENTRY counter is incremented and the result is `false`;
//   otherwise the post-INITENTRY counter is incremented and the result is
//   `true`.
// - Before LiveBucket::FIRST_PROTOCOL_SHADOWS_REMOVED the pre-shadow-removal
//   counter is incremented; otherwise the post-shadow-removal counter is
//   incremented, and a std::runtime_error is thrown first if
//   `shadowIterators` is non-empty.
template <class BucketT, class IndexT>
bool
BucketBase<BucketT, IndexT>::updateMergeCountersForProtocolVersion(
MergeCounters& mc, uint32_t protocolVersion,
std::vector<BucketInputIterator<BucketT>> const& shadowIterators)
{
// Don't count shadow metrics for Hot Archive BucketList
if constexpr (std::is_same_v<BucketT, HotArchiveBucket>)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We shouldn't be calling this function on HotArchiveBuckets at all, right? (shadows were dropped in protocol 12). Could we enforce that? I realize this was here before, but since we're already making changes, this is a good opportunity to harden the code.

{
return true;
}

bool keepShadowedLifecycleEntries = true;

if (protocolVersionIsBefore(
protocolVersion,
LiveBucket::FIRST_PROTOCOL_SUPPORTING_INITENTRY_AND_METAENTRY))
{
++mc.mPreInitEntryProtocolMerges;
keepShadowedLifecycleEntries = false;
}
else
{
++mc.mPostInitEntryProtocolMerges;
}

if (protocolVersionIsBefore(protocolVersion,
LiveBucket::FIRST_PROTOCOL_SHADOWS_REMOVED))
{
++mc.mPreShadowRemovalProtocolMerges;
}
else
{
if (!shadowIterators.empty())
{
throw std::runtime_error("Shadows are not supported");
}
++mc.mPostShadowRemovalProtocolMerges;
}

return keepShadowedLifecycleEntries;
}

// The protocol used in a merge is the maximum of any of the protocols used in
// its input buckets, _including_ any of its shadows. We need to be strict about
// this for the same reason we change shadow algorithms along with merge
Expand Down Expand Up @@ -221,91 +265,101 @@ calculateMergeProtocolVersion(
// we switch shadowing-behaviour to a more conservative mode, in order to
// support annihilation of INITENTRY and DEADENTRY pairs. See commentary
// above in `maybePut`.
keepShadowedLifecycleEntries = true;

// Don't count shadow metrics for Hot Archive BucketList
if constexpr (std::is_same_v<BucketT, HotArchiveBucket>)
{
return;
}

if (protocolVersionIsBefore(
protocolVersion,
LiveBucket::FIRST_PROTOCOL_SUPPORTING_INITENTRY_AND_METAENTRY))
{
++mc.mPreInitEntryProtocolMerges;
keepShadowedLifecycleEntries = false;
}
else
{
++mc.mPostInitEntryProtocolMerges;
}

if (protocolVersionIsBefore(protocolVersion,
LiveBucket::FIRST_PROTOCOL_SHADOWS_REMOVED))
{
++mc.mPreShadowRemovalProtocolMerges;
}
else
{
if (!shadowIterators.empty())
{
throw std::runtime_error("Shadows are not supported");
}
++mc.mPostShadowRemovalProtocolMerges;
}
keepShadowedLifecycleEntries =
BucketBase<BucketT, IndexT>::updateMergeCountersForProtocolVersion(
mc, protocolVersion, shadowIterators);
}

// There are 4 "easy" cases for merging: exhausted iterators on either
// side, or entries that compare non-equal. In all these cases we just
// take the lesser (or existing) entry and advance only one iterator,
// not scrutinizing the entry type further.
template <class BucketT, class IndexT>
template <class BucketT, class IndexT, typename InputSource>
static bool
mergeCasesWithDefaultAcceptance(
BucketEntryIdCmp<BucketT> const& cmp, MergeCounters& mc,
BucketInputIterator<BucketT>& oi, BucketInputIterator<BucketT>& ni,
BucketOutputIterator<BucketT>& out,
InputSource& inputSource,
std::function<void(typename BucketT::EntryT const&)> putFunc,
std::vector<BucketInputIterator<BucketT>>& shadowIterators,
uint32_t protocolVersion, bool keepShadowedLifecycleEntries)
{
BUCKET_TYPE_ASSERT(BucketT);

if (!ni || (oi && ni && cmp(*oi, *ni)))
// Either of:
//
// - Out of new entries.
// - Old entry has smaller key.
//
// In both cases: take old entry.
if (inputSource.oldFirst())
{
// Either of:
//
// - Out of new entries.
// - Old entry has smaller key.
//
// In both cases: take old entry.
// Take old entry
auto entry = inputSource.getOldEntry();
++mc.mOldEntriesDefaultAccepted;
BucketT::checkProtocolLegality(*oi, protocolVersion);
BucketT::countOldEntryType(mc, *oi);
BucketT::maybePut(out, *oi, shadowIterators,
BucketT::checkProtocolLegality(entry, protocolVersion);
BucketT::countOldEntryType(mc, entry);
BucketT::maybePut(putFunc, entry, shadowIterators,
keepShadowedLifecycleEntries, mc);
++oi;
inputSource.advanceOld();
return true;
}
else if (!oi || (oi && ni && cmp(*ni, *oi)))
// Either of:
//
// - Out of old entries.
// - New entry has smaller key.
//
// In both cases: take new entry.
else if (inputSource.newFirst())
{
// Either of:
//
// - Out of old entries.
// - New entry has smaller key.
//
// In both cases: take new entry.
auto entry = inputSource.getNewEntry();
++mc.mNewEntriesDefaultAccepted;
BucketT::checkProtocolLegality(*ni, protocolVersion);
BucketT::countNewEntryType(mc, *ni);
BucketT::maybePut(out, *ni, shadowIterators,
BucketT::checkProtocolLegality(entry, protocolVersion);
BucketT::countNewEntryType(mc, entry);
BucketT::maybePut(putFunc, entry, shadowIterators,
keepShadowedLifecycleEntries, mc);
++ni;
inputSource.advanceNew();
return true;
}
return false;
}

// Runs the merge loop over `inputSource` until it reports completion. Each
// pass first tries the default-acceptance cases (one side exhausted or the
// keys differ); when none applies, the equal-key handling of BucketT is used.
// Resulting entries are emitted through `putFunc`.
//
// Throws std::runtime_error if the BucketManager is found to be shut down;
// that condition is polled once every 1000 passes rather than on every pass.
template <class BucketT, class IndexT>
template <typename InputSource, typename PutFuncT>
void
BucketBase<BucketT, IndexT>::mergeInternal(
    BucketManager& bucketManager, InputSource& inputSource, PutFuncT putFunc,
    uint32_t protocolVersion,
    std::vector<BucketInputIterator<BucketT>>& shadowIterators,
    bool keepShadowedLifecycleEntries, MergeCounters& mc)
{
    // Number of loop passes between two consecutive shutdown polls.
    constexpr size_t shutdownPollInterval = 1000;

    BucketEntryIdCmp<BucketT> entryCmp;
    size_t passesSincePoll = 0;

    for (;;)
    {
        if (inputSource.isDone())
        {
            break;
        }

        // Poll for shutdown only every few entries.
        ++passesSincePoll;
        if (passesSincePoll >= shutdownPollInterval)
        {
            passesSincePoll = 0;
            if (bucketManager.isShutdown())
            {
                // BucketManager is shut down: abandon the merge.
                throw std::runtime_error(
                    "Incomplete bucket merge due to BucketManager shutdown");
            }
        }

        bool const acceptedByDefault =
            mergeCasesWithDefaultAcceptance<BucketT, IndexT, InputSource>(
                entryCmp, mc, inputSource, putFunc, shadowIterators,
                protocolVersion, keepShadowedLifecycleEntries);
        if (acceptedByDefault)
        {
            continue;
        }

        // Neither side is strictly first, so defer to the bucket type's
        // equal-key handling.
        BucketT::template mergeCasesWithEqualKeys<InputSource>(
            mc, inputSource, putFunc, shadowIterators, protocolVersion,
            keepShadowedLifecycleEntries);
    }
}

template <class BucketT, class IndexT>
std::shared_ptr<BucketT>
BucketBase<BucketT, IndexT>::merge(
Expand Down Expand Up @@ -363,34 +417,15 @@ BucketBase<BucketT, IndexT>::merge(
keepTombstoneEntries, meta, mc, ctx,
doFsync);

BucketEntryIdCmp<BucketT> cmp;
size_t iter = 0;
FileMergeInput<BucketT> inputSource(oi, ni);
auto putFunc = [&out](typename BucketT::EntryT const& entry) {
out.put(entry);
};

while (oi || ni)
{
// Check if the merge should be stopped every few entries
if (++iter >= 1000)
{
iter = 0;
if (bucketManager.isShutdown())
{
// Stop merging, as BucketManager is now shutdown
// This is safe as temp file has not been adopted yet,
// so it will be removed with the tmp dir
throw std::runtime_error(
"Incomplete bucket merge due to BucketManager shutdown");
}
}
// Perform the merge
mergeInternal(bucketManager, inputSource, putFunc, protocolVersion,
shadowIterators, keepShadowedLifecycleEntries, mc);

if (!mergeCasesWithDefaultAcceptance<BucketT, IndexT>(
cmp, mc, oi, ni, out, shadowIterators, protocolVersion,
keepShadowedLifecycleEntries))
{
BucketT::mergeCasesWithEqualKeys(mc, oi, ni, out, shadowIterators,
protocolVersion,
keepShadowedLifecycleEntries);
}
}
if (countMergeEvents)
{
bucketManager.incrMergeCounters(mc);
Expand All @@ -408,6 +443,20 @@ BucketBase<BucketT, IndexT>::merge(
return out.getBucket(bucketManager, &mk);
}

// Explicit instantiations of mergeInternal for the in-memory input source
// (MemoryMergeInput) with a std::function put callback, one per bucket type.
// NOTE(review): presumably required because mergeInternal is defined in this
// .cpp file while the in-memory merge callers live in other translation
// units -- confirm against those callers.
template void BucketBase<LiveBucket, LiveBucket::IndexT>::mergeInternal<
MemoryMergeInput<LiveBucket>, std::function<void(BucketEntry const&)>>(
BucketManager&, MemoryMergeInput<LiveBucket>&,
std::function<void(BucketEntry const&)>, uint32_t,
std::vector<BucketInputIterator<LiveBucket>>&, bool, MergeCounters&);

template void
BucketBase<HotArchiveBucket, HotArchiveBucket::IndexT>::mergeInternal<
MemoryMergeInput<HotArchiveBucket>,
std::function<void(HotArchiveBucketEntry const&)>>(
BucketManager&, MemoryMergeInput<HotArchiveBucket>&,
std::function<void(HotArchiveBucketEntry const&)>, uint32_t,
std::vector<BucketInputIterator<HotArchiveBucket>>&, bool, MergeCounters&);

template class BucketBase<LiveBucket, LiveBucket::IndexT>;
template class BucketBase<HotArchiveBucket, HotArchiveBucket::IndexT>;
}
19 changes: 18 additions & 1 deletion src/bucket/BucketBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
// under the Apache License, Version 2.0. See the COPYING file at the root
// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0

#include "bucket/BucketInputIterator.h"
#include "bucket/BucketUtils.h"
#include "util/NonCopyable.h"
#include "util/ProtocolVersion.h"
#include "xdr/Stellar-types.h"
#include <filesystem>
#include <optional>
#include <string>

namespace asio
Expand Down Expand Up @@ -130,6 +130,23 @@ class BucketBase : public NonMovableOrCopyable
bool keepTombstoneEntries, bool countMergeEvents,
asio::io_context& ctx, bool doFsync);

// Increments the protocol-dependent merge counters in `mc` for a merge at
// `protocolVersion` and returns whether shadowed lifecycle entries should be
// kept. For HotArchiveBucket no counters are updated and true is returned.
// Throws std::runtime_error if `shadowIterators` is non-empty at a protocol
// version where shadows have been removed.
static bool updateMergeCountersForProtocolVersion(
MergeCounters& mc, uint32_t protocolVersion,
std::vector<BucketInputIterator<BucketT>> const& shadowIterators);

// Helper function that implements the core merge algorithm logic for both
// iterator based and in-memory merges.
//
// `inputSource` supplies the old and new entries and is consumed until it
// reports that it is done. `putFunc` is called to write each entry that is
// a result of the merge. `mc` accumulates merge statistics.
//
// PutFuncT has to be a template parameter to break a dependency on the
// BucketT type, but in practice
// PutFuncT == std::function<void(typename BucketT::EntryT const&)>.
//
// Throws std::runtime_error if `bucketManager` is found to be shut down
// while the merge is still in progress (polled periodically, not on every
// entry).
template <typename InputSource, typename PutFuncT>
static void
mergeInternal(BucketManager& bucketManager, InputSource& inputSource,
PutFuncT putFunc, uint32_t protocolVersion,
std::vector<BucketInputIterator<BucketT>>& shadowIterators,
bool keepShadowedLifecycleEntries, MergeCounters& mc);

static std::string randomBucketName(std::string const& tmpDir);
static std::string randomBucketIndexName(std::string const& tmpDir);

Expand Down
Loading