Skip to content

Commit cf792e5

Browse files
committed
WIP: Background tryAdd functionality in TransactionQueue
This is a *draft* change that will resolve stellar#4316 when it is complete. The change makes `TransactionQueue` thread safe and runs the `tryAdd` function in the background when the feature is enabled. The implementation closely follows the [design document](https://docs.google.com/document/d/1pU__XfEp-rR-17TNsuj-VhY6JfyendaFSYLTiq6tIj4/edit?usp=sharing) I wrote. The implementation still requires the main thread to re-broadcast the transactions (for now). I've opened this PR for visibility / early feedback on the implementation. This change is very much a work in progress, with the following tasks remaining: * [ ] Fix catchup. I seem to have broken catchup in rebasing these changes on master. I need to figure out what is going on there and fix it. * [ ] Fix failing tests. These are failing because they don't update `TransactionQueue`s new snapshots correctly. * [ ] Rigorous testing, both for correctness and performance. * [ ] I'd like to take a look at pushing the cut-point out a bit to enable flooding in the background as well. If this is a relatively simple change, I'd like to roll it into this PR. If it looks hairy, then I'll leave it for a separate change later.
1 parent 134fcfd commit cf792e5

37 files changed

+744
-308
lines changed

src/herder/HerderImpl.cpp

+63-31
Original file line numberDiff line numberDiff line change
@@ -83,10 +83,7 @@ HerderImpl::SCPMetrics::SCPMetrics(Application& app)
8383
}
8484

8585
HerderImpl::HerderImpl(Application& app)
86-
: mTransactionQueue(app, TRANSACTION_QUEUE_TIMEOUT_LEDGERS,
87-
TRANSACTION_QUEUE_BAN_LEDGERS,
88-
TRANSACTION_QUEUE_SIZE_MULTIPLIER)
89-
, mPendingEnvelopes(app, *this)
86+
: mPendingEnvelopes(app, *this)
9087
, mHerderSCPDriver(app, *this, mUpgrades, mPendingEnvelopes)
9188
, mLastSlotSaved(0)
9289
, mTrackingTimer(app)
@@ -275,7 +272,10 @@ HerderImpl::shutdown()
275272
"Shutdown interrupting quorum transitive closure analysis.");
276273
mLastQuorumMapIntersectionState.mInterruptFlag = true;
277274
}
278-
mTransactionQueue.shutdown();
275+
if (mTransactionQueue)
276+
{
277+
mTransactionQueue->shutdown();
278+
}
279279
if (mSorobanTransactionQueue)
280280
{
281281
mSorobanTransactionQueue->shutdown();
@@ -603,7 +603,7 @@ HerderImpl::recvTransaction(TransactionFrameBasePtr tx, bool submittedFromSelf)
603603
mSorobanTransactionQueue->sourceAccountPending(tx->getSourceID()) &&
604604
!tx->isSoroban();
605605
bool hasClassic =
606-
mTransactionQueue.sourceAccountPending(tx->getSourceID()) &&
606+
mTransactionQueue->sourceAccountPending(tx->getSourceID()) &&
607607
tx->isSoroban();
608608
if (hasSoroban || hasClassic)
609609
{
@@ -617,11 +617,31 @@ HerderImpl::recvTransaction(TransactionFrameBasePtr tx, bool submittedFromSelf)
617617
}
618618
else if (!tx->isSoroban())
619619
{
620-
result = mTransactionQueue.tryAdd(tx, submittedFromSelf);
620+
if (mApp.getConfig().BACKGROUND_TX_QUEUE && !submittedFromSelf)
621+
{
622+
mApp.postOnOverlayThread(
623+
[this, tx]() { mTransactionQueue->tryAdd(tx, false); },
624+
"try add tx");
625+
result.code = TransactionQueue::AddResultCode::ADD_STATUS_UNKNOWN;
626+
}
627+
else
628+
{
629+
result = mTransactionQueue->tryAdd(tx, submittedFromSelf);
630+
}
621631
}
622632
else if (mSorobanTransactionQueue)
623633
{
624-
result = mSorobanTransactionQueue->tryAdd(tx, submittedFromSelf);
634+
if (mApp.getConfig().BACKGROUND_TX_QUEUE && !submittedFromSelf)
635+
{
636+
mApp.postOnOverlayThread(
637+
[this, tx]() { mSorobanTransactionQueue->tryAdd(tx, false); },
638+
"try add tx");
639+
result.code = TransactionQueue::AddResultCode::ADD_STATUS_UNKNOWN;
640+
}
641+
else
642+
{
643+
result = mSorobanTransactionQueue->tryAdd(tx, submittedFromSelf);
644+
}
625645
}
626646
else
627647
{
@@ -923,7 +943,7 @@ HerderImpl::externalizeValue(TxSetXDRFrameConstPtr txSet, uint32_t ledgerSeq,
923943
bool
924944
HerderImpl::sourceAccountPending(AccountID const& accountID) const
925945
{
926-
bool accPending = mTransactionQueue.sourceAccountPending(accountID);
946+
bool accPending = mTransactionQueue->sourceAccountPending(accountID);
927947
if (mSorobanTransactionQueue)
928948
{
929949
accPending = accPending ||
@@ -1092,7 +1112,7 @@ HerderImpl::getPendingEnvelopes()
10921112
ClassicTransactionQueue&
10931113
HerderImpl::getTransactionQueue()
10941114
{
1095-
return mTransactionQueue;
1115+
return *mTransactionQueue;
10961116
}
10971117
SorobanTransactionQueue&
10981118
HerderImpl::getSorobanTransactionQueue()
@@ -1391,7 +1411,7 @@ HerderImpl::triggerNextLedger(uint32_t ledgerSeqToTrigger,
13911411
// it's guaranteed to be up-to-date
13921412
auto const& lcl = mLedgerManager.getLastClosedLedgerHeader();
13931413
PerPhaseTransactionList txPhases;
1394-
txPhases.emplace_back(mTransactionQueue.getTransactions(lcl.header));
1414+
txPhases.emplace_back(mTransactionQueue->getTransactions(lcl.header));
13951415

13961416
if (protocolVersionStartsFrom(lcl.header.ledgerVersion,
13971417
SOROBAN_PROTOCOL_VERSION))
@@ -1470,7 +1490,7 @@ HerderImpl::triggerNextLedger(uint32_t ledgerSeqToTrigger,
14701490
invalidTxPhases[static_cast<size_t>(TxSetPhase::SOROBAN)]);
14711491
}
14721492

1473-
mTransactionQueue.ban(
1493+
mTransactionQueue->ban(
14741494
invalidTxPhases[static_cast<size_t>(TxSetPhase::CLASSIC)]);
14751495

14761496
auto txSetHash = proposedSet->getContentsHash();
@@ -2172,9 +2192,11 @@ HerderImpl::maybeSetupSorobanQueue(uint32_t protocolVersion)
21722192
{
21732193
if (!mSorobanTransactionQueue)
21742194
{
2195+
releaseAssert(mTxQueueBucketSnapshot);
21752196
mSorobanTransactionQueue =
21762197
std::make_unique<SorobanTransactionQueue>(
2177-
mApp, TRANSACTION_QUEUE_TIMEOUT_LEDGERS,
2198+
mApp, mTxQueueBucketSnapshot,
2199+
TRANSACTION_QUEUE_TIMEOUT_LEDGERS,
21782200
TRANSACTION_QUEUE_BAN_LEDGERS,
21792201
SOROBAN_TRANSACTION_QUEUE_SIZE_MULTIPLIER);
21802202
}
@@ -2189,6 +2211,15 @@ HerderImpl::maybeSetupSorobanQueue(uint32_t protocolVersion)
21892211
void
21902212
HerderImpl::start()
21912213
{
2214+
releaseAssert(!mTxQueueBucketSnapshot);
2215+
mTxQueueBucketSnapshot = mApp.getBucketManager()
2216+
.getBucketSnapshotManager()
2217+
.copySearchableLiveBucketListSnapshot();
2218+
releaseAssert(!mTransactionQueue);
2219+
mTransactionQueue = std::make_unique<ClassicTransactionQueue>(
2220+
mApp, mTxQueueBucketSnapshot, TRANSACTION_QUEUE_TIMEOUT_LEDGERS,
2221+
TRANSACTION_QUEUE_BAN_LEDGERS, TRANSACTION_QUEUE_SIZE_MULTIPLIER);
2222+
21922223
mMaxTxSize = mApp.getHerder().getMaxClassicTxSize();
21932224
{
21942225
uint32_t version = mApp.getLedgerManager()
@@ -2333,23 +2364,23 @@ HerderImpl::updateTransactionQueue(TxSetXDRFrameConstPtr externalizedTxSet)
23332364

23342365
auto lhhe = mLedgerManager.getLastClosedLedgerHeader();
23352366

2336-
auto updateQueue = [&](auto& queue, auto const& applied) {
2337-
queue.removeApplied(applied);
2338-
queue.shift();
2339-
2340-
auto txs = queue.getTransactions(lhhe.header);
2341-
2342-
auto invalidTxs = TxSetUtils::getInvalidTxList(
2367+
auto filterInvalidTxs = [&](TxFrameList const& txs) {
2368+
return TxSetUtils::getInvalidTxList(
23432369
txs, mApp, 0,
2344-
getUpperBoundCloseTimeOffset(mApp, lhhe.header.scpValue.closeTime));
2345-
queue.ban(invalidTxs);
2346-
2347-
queue.rebroadcast();
2370+
getUpperBoundCloseTimeOffset(mApp.getAppConnector(),
2371+
lhhe.header.scpValue.closeTime));
23482372
};
2373+
// Update bucket list snapshot, if needed. Note that this modifies the
2374+
// pointer itself on update, so we need to pass the potentially new pointer
2375+
// to the tx queues.
2376+
mApp.getBucketManager()
2377+
.getBucketSnapshotManager()
2378+
.maybeCopySearchableBucketListSnapshot(mTxQueueBucketSnapshot);
23492379
if (txsPerPhase.size() > static_cast<size_t>(TxSetPhase::CLASSIC))
23502380
{
2351-
updateQueue(mTransactionQueue,
2352-
txsPerPhase[static_cast<size_t>(TxSetPhase::CLASSIC)]);
2381+
mTransactionQueue->update(
2382+
txsPerPhase[static_cast<size_t>(TxSetPhase::CLASSIC)], lhhe.header,
2383+
mTxQueueBucketSnapshot, filterInvalidTxs);
23532384
}
23542385

23552386
// Even if we're in protocol 20, still check for number of phases, in case
@@ -2358,8 +2389,9 @@ HerderImpl::updateTransactionQueue(TxSetXDRFrameConstPtr externalizedTxSet)
23582389
if (mSorobanTransactionQueue != nullptr &&
23592390
txsPerPhase.size() > static_cast<size_t>(TxSetPhase::SOROBAN))
23602391
{
2361-
updateQueue(*mSorobanTransactionQueue,
2362-
txsPerPhase[static_cast<size_t>(TxSetPhase::SOROBAN)]);
2392+
mSorobanTransactionQueue->update(
2393+
txsPerPhase[static_cast<size_t>(TxSetPhase::SOROBAN)], lhhe.header,
2394+
mTxQueueBucketSnapshot, filterInvalidTxs);
23632395
}
23642396
}
23652397

@@ -2476,7 +2508,7 @@ HerderImpl::isNewerNominationOrBallotSt(SCPStatement const& oldSt,
24762508
size_t
24772509
HerderImpl::getMaxQueueSizeOps() const
24782510
{
2479-
return mTransactionQueue.getMaxQueueSizeOps();
2511+
return mTransactionQueue->getMaxQueueSizeOps();
24802512
}
24812513

24822514
size_t
@@ -2490,7 +2522,7 @@ HerderImpl::getMaxQueueSizeSorobanOps() const
24902522
bool
24912523
HerderImpl::isBannedTx(Hash const& hash) const
24922524
{
2493-
auto banned = mTransactionQueue.isBanned(hash);
2525+
auto banned = mTransactionQueue->isBanned(hash);
24942526
if (mSorobanTransactionQueue)
24952527
{
24962528
banned = banned || mSorobanTransactionQueue->isBanned(hash);
@@ -2501,7 +2533,7 @@ HerderImpl::isBannedTx(Hash const& hash) const
25012533
TransactionFrameBaseConstPtr
25022534
HerderImpl::getTx(Hash const& hash) const
25032535
{
2504-
auto classic = mTransactionQueue.getTx(hash);
2536+
auto classic = mTransactionQueue->getTx(hash);
25052537
if (!classic && mSorobanTransactionQueue)
25062538
{
25072539
return mSorobanTransactionQueue->getTx(hash);

src/herder/HerderImpl.h

+4-1
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,7 @@ class HerderImpl : public Herder
248248
void purgeOldPersistedTxSets();
249249
void writeDebugTxSet(LedgerCloseData const& lcd);
250250

251-
ClassicTransactionQueue mTransactionQueue;
251+
std::unique_ptr<ClassicTransactionQueue> mTransactionQueue;
252252
std::unique_ptr<SorobanTransactionQueue> mSorobanTransactionQueue;
253253

254254
void updateTransactionQueue(TxSetXDRFrameConstPtr txSet);
@@ -301,6 +301,9 @@ class HerderImpl : public Herder
301301
Application& mApp;
302302
LedgerManager& mLedgerManager;
303303

304+
// Bucket list snapshot to use for transaction queues
305+
SearchableSnapshotConstPtr mTxQueueBucketSnapshot;
306+
304307
struct SCPMetrics
305308
{
306309
medida::Meter& mLostSync;

0 commit comments

Comments
 (0)