Skip to content

Commit 4820f7f

Browse files
committed
Push cut point out through receiving messages
Push cut point out a tiny bit todo Identify where to push back next Atomic ledger state Refactor out tx metrics from queue adding In preparation for moving the cut point Consistency Getting close to having this all pushed out Just need to wire up the final bit Remove old post to bg thread Indirection around tx queue storage Feed tx queues to peer Wire up background flow Debug output Fix tracy build More debug info around broadcasting Maybe fix deadlock Fix deadlock with TCPPeer Reduce main thread posts on broadcast Add thread Though it's not being used anywhere Actually run on new thread Another memory leak fix Fix `maybeExecuteInBackground` to check thread type Save main thread type
1 parent 18e03a9 commit 4820f7f

22 files changed

+675
-270
lines changed

src/herder/Herder.h

+8
Original file line numberDiff line numberDiff line change
@@ -225,5 +225,13 @@ class Herder
225225
virtual TransactionFrameBaseConstPtr getTx(Hash const& hash) const = 0;
226226

227227
virtual void beginApply() = 0;
228+
229+
// TODO: Docs
230+
virtual TransactionQueuesPtr getTransactionQueues() const = 0;
231+
232+
// TODO: Docs
233+
static TransactionQueue::AddResult
234+
recvTransaction(TransactionQueuesPtr txQueues, TransactionFrameBasePtr tx,
235+
bool submittedFromSelf);
228236
};
229237
}

src/herder/HerderImpl.cpp

+64-79
Original file line numberDiff line numberDiff line change
@@ -272,14 +272,7 @@ HerderImpl::shutdown()
272272
"Shutdown interrupting quorum transitive closure analysis.");
273273
mLastQuorumMapIntersectionState.mInterruptFlag = true;
274274
}
275-
if (mTransactionQueue)
276-
{
277-
mTransactionQueue->shutdown();
278-
}
279-
if (mSorobanTransactionQueue)
280-
{
281-
mSorobanTransactionQueue->shutdown();
282-
}
275+
mTransactionQueues->shutdown();
283276

284277
mTxSetGarbageCollectTimer.cancel();
285278
}
@@ -589,22 +582,35 @@ HerderImpl::emitEnvelope(SCPEnvelope const& envelope)
589582
broadcast(envelope);
590583
}
591584

585+
// TODO: Move to Herder.cpp?
592586
TransactionQueue::AddResult
593-
HerderImpl::recvTransaction(TransactionFrameBasePtr tx, bool submittedFromSelf)
587+
Herder::recvTransaction(TransactionQueuesPtr txQueues,
588+
TransactionFrameBasePtr tx, bool submittedFromSelf)
594589
{
595590
ZoneScoped;
591+
ClassicTransactionQueue& classicTxQueue =
592+
txQueues->getClassicTransactionQueue();
596593
TransactionQueue::AddResult result(
597594
TransactionQueue::AddResultCode::ADD_STATUS_COUNT);
598595

599596
// Allow txs of the same kind to reach the tx queue in case it can be
600597
// replaced by fee
598+
// TODO: Is there a potential TOCTOU issue here as sourceAccountPending
599+
// could change before adding? I think no because the other competing thread
600+
// would be whatever is handling ledger close. However, that will only
601+
// decrease the sourceAccountPending value, which means this erroneously
602+
// rejects (which is safe). I guess it's possible for a user-submitted
603+
// transaction to come in and conflict with the overlay thread, but that
604+
// would require them to be simultaneously running two clients and
605+
// submitting from both of them. Still, it might be safest to use some kind
606+
// of atomic function that handles both this check AND the add.
601607
bool hasSoroban =
602-
mSorobanTransactionQueue &&
603-
mSorobanTransactionQueue->sourceAccountPending(tx->getSourceID()) &&
608+
txQueues->hasSorobanTransactionQueue() &&
609+
txQueues->getSorobanTransactionQueue().sourceAccountPending(
610+
tx->getSourceID()) &&
604611
!tx->isSoroban();
605-
bool hasClassic =
606-
mTransactionQueue->sourceAccountPending(tx->getSourceID()) &&
607-
tx->isSoroban();
612+
bool hasClassic = classicTxQueue.sourceAccountPending(tx->getSourceID()) &&
613+
tx->isSoroban();
608614
if (hasSoroban || hasClassic)
609615
{
610616
CLOG_DEBUG(Herder,
@@ -617,31 +623,12 @@ HerderImpl::recvTransaction(TransactionFrameBasePtr tx, bool submittedFromSelf)
617623
}
618624
else if (!tx->isSoroban())
619625
{
620-
if (mApp.getConfig().BACKGROUND_TX_QUEUE && !submittedFromSelf)
621-
{
622-
mApp.postOnOverlayThread(
623-
[this, tx]() { mTransactionQueue->tryAdd(tx, false); },
624-
"try add tx");
625-
result.code = TransactionQueue::AddResultCode::ADD_STATUS_UNKNOWN;
626-
}
627-
else
628-
{
629-
result = mTransactionQueue->tryAdd(tx, submittedFromSelf);
630-
}
626+
result = classicTxQueue.tryAdd(tx, submittedFromSelf);
631627
}
632-
else if (mSorobanTransactionQueue)
628+
else if (txQueues->hasSorobanTransactionQueue())
633629
{
634-
if (mApp.getConfig().BACKGROUND_TX_QUEUE && !submittedFromSelf)
635-
{
636-
mApp.postOnOverlayThread(
637-
[this, tx]() { mSorobanTransactionQueue->tryAdd(tx, false); },
638-
"try add tx");
639-
result.code = TransactionQueue::AddResultCode::ADD_STATUS_UNKNOWN;
640-
}
641-
else
642-
{
643-
result = mSorobanTransactionQueue->tryAdd(tx, submittedFromSelf);
644-
}
630+
result = txQueues->getSorobanTransactionQueue().tryAdd(
631+
tx, submittedFromSelf);
645632
}
646633
else
647634
{
@@ -661,6 +648,13 @@ HerderImpl::recvTransaction(TransactionFrameBasePtr tx, bool submittedFromSelf)
661648
return result;
662649
}
663650

651+
TransactionQueue::AddResult
652+
HerderImpl::recvTransaction(TransactionFrameBasePtr tx, bool submittedFromSelf)
653+
{
654+
ZoneScoped;
655+
return Herder::recvTransaction(mTransactionQueues, tx, submittedFromSelf);
656+
}
657+
664658
bool
665659
HerderImpl::checkCloseTime(SCPEnvelope const& envelope, bool enforceRecent)
666660
{
@@ -943,13 +937,7 @@ HerderImpl::externalizeValue(TxSetXDRFrameConstPtr txSet, uint32_t ledgerSeq,
943937
bool
944938
HerderImpl::sourceAccountPending(AccountID const& accountID) const
945939
{
946-
bool accPending = mTransactionQueue->sourceAccountPending(accountID);
947-
if (mSorobanTransactionQueue)
948-
{
949-
accPending = accPending ||
950-
mSorobanTransactionQueue->sourceAccountPending(accountID);
951-
}
952-
return accPending;
940+
return mTransactionQueues->sourceAccountPending(accountID);
953941
}
954942

955943
#endif
@@ -1112,13 +1100,12 @@ HerderImpl::getPendingEnvelopes()
11121100
ClassicTransactionQueue&
11131101
HerderImpl::getTransactionQueue()
11141102
{
1115-
return *mTransactionQueue;
1103+
return mTransactionQueues->getClassicTransactionQueue();
11161104
}
11171105
SorobanTransactionQueue&
11181106
HerderImpl::getSorobanTransactionQueue()
11191107
{
1120-
releaseAssert(mSorobanTransactionQueue);
1121-
return *mSorobanTransactionQueue;
1108+
return mTransactionQueues->getSorobanTransactionQueue();
11221109
}
11231110
#endif
11241111

@@ -1411,14 +1398,16 @@ HerderImpl::triggerNextLedger(uint32_t ledgerSeqToTrigger,
14111398
// it's guaranteed to be up-to-date
14121399
auto const& lcl = mLedgerManager.getLastClosedLedgerHeader();
14131400
PerPhaseTransactionList txPhases;
1414-
txPhases.emplace_back(mTransactionQueue->getTransactions(lcl.header));
1401+
txPhases.emplace_back(
1402+
mTransactionQueues->getClassicTransactionQueue().getTransactions(
1403+
lcl.header));
14151404

14161405
if (protocolVersionStartsFrom(lcl.header.ledgerVersion,
14171406
SOROBAN_PROTOCOL_VERSION))
14181407
{
1419-
releaseAssert(mSorobanTransactionQueue);
14201408
txPhases.emplace_back(
1421-
mSorobanTransactionQueue->getTransactions(lcl.header));
1409+
mTransactionQueues->getSorobanTransactionQueue().getTransactions(
1410+
lcl.header));
14221411
}
14231412

14241413
// We pick as next close time the current time unless it's before the last
@@ -1485,12 +1474,11 @@ HerderImpl::triggerNextLedger(uint32_t ledgerSeqToTrigger,
14851474
if (protocolVersionStartsFrom(lcl.header.ledgerVersion,
14861475
SOROBAN_PROTOCOL_VERSION))
14871476
{
1488-
releaseAssert(mSorobanTransactionQueue);
1489-
mSorobanTransactionQueue->ban(
1477+
mTransactionQueues->getSorobanTransactionQueue().ban(
14901478
invalidTxPhases[static_cast<size_t>(TxSetPhase::SOROBAN)]);
14911479
}
14921480

1493-
mTransactionQueue->ban(
1481+
mTransactionQueues->getClassicTransactionQueue().ban(
14941482
invalidTxPhases[static_cast<size_t>(TxSetPhase::CLASSIC)]);
14951483

14961484
auto txSetHash = proposedSet->getContentsHash();
@@ -2190,18 +2178,18 @@ HerderImpl::maybeSetupSorobanQueue(uint32_t protocolVersion)
21902178
{
21912179
if (protocolVersionStartsFrom(protocolVersion, SOROBAN_PROTOCOL_VERSION))
21922180
{
2193-
if (!mSorobanTransactionQueue)
2181+
if (!mTransactionQueues->hasSorobanTransactionQueue())
21942182
{
21952183
releaseAssert(mTxQueueBucketSnapshot);
2196-
mSorobanTransactionQueue =
2184+
mTransactionQueues->setSorobanTransactionQueue(
21972185
std::make_unique<SorobanTransactionQueue>(
21982186
mApp, mTxQueueBucketSnapshot,
21992187
TRANSACTION_QUEUE_TIMEOUT_LEDGERS,
22002188
TRANSACTION_QUEUE_BAN_LEDGERS,
2201-
SOROBAN_TRANSACTION_QUEUE_SIZE_MULTIPLIER);
2189+
SOROBAN_TRANSACTION_QUEUE_SIZE_MULTIPLIER));
22022190
}
22032191
}
2204-
else if (mSorobanTransactionQueue)
2192+
else if (mTransactionQueues->hasSorobanTransactionQueue())
22052193
{
22062194
throw std::runtime_error(
22072195
"Invalid state: Soroban queue initialized before v20");
@@ -2215,10 +2203,10 @@ HerderImpl::start()
22152203
mTxQueueBucketSnapshot = mApp.getBucketManager()
22162204
.getBucketSnapshotManager()
22172205
.copySearchableLiveBucketListSnapshot();
2218-
releaseAssert(!mTransactionQueue);
2219-
mTransactionQueue = std::make_unique<ClassicTransactionQueue>(
2220-
mApp, mTxQueueBucketSnapshot, TRANSACTION_QUEUE_TIMEOUT_LEDGERS,
2221-
TRANSACTION_QUEUE_BAN_LEDGERS, TRANSACTION_QUEUE_SIZE_MULTIPLIER);
2206+
mTransactionQueues->setClassicTransactionQueue(
2207+
std::make_unique<ClassicTransactionQueue>(
2208+
mApp, mTxQueueBucketSnapshot, TRANSACTION_QUEUE_TIMEOUT_LEDGERS,
2209+
TRANSACTION_QUEUE_BAN_LEDGERS, TRANSACTION_QUEUE_SIZE_MULTIPLIER));
22222210

22232211
mMaxTxSize = mApp.getHerder().getMaxClassicTxSize();
22242212
{
@@ -2378,18 +2366,18 @@ HerderImpl::updateTransactionQueue(TxSetXDRFrameConstPtr externalizedTxSet)
23782366
.maybeCopySearchableBucketListSnapshot(mTxQueueBucketSnapshot);
23792367
if (txsPerPhase.size() > static_cast<size_t>(TxSetPhase::CLASSIC))
23802368
{
2381-
mTransactionQueue->update(
2369+
mTransactionQueues->getClassicTransactionQueue().update(
23822370
txsPerPhase[static_cast<size_t>(TxSetPhase::CLASSIC)], lhhe.header,
23832371
mTxQueueBucketSnapshot, filterInvalidTxs);
23842372
}
23852373

23862374
// Even if we're in protocol 20, still check for number of phases, in case
23872375
// we're dealing with the upgrade ledger that contains old-style transaction
23882376
// set
2389-
if (mSorobanTransactionQueue != nullptr &&
2377+
if (mTransactionQueues->hasSorobanTransactionQueue() &&
23902378
txsPerPhase.size() > static_cast<size_t>(TxSetPhase::SOROBAN))
23912379
{
2392-
mSorobanTransactionQueue->update(
2380+
mTransactionQueues->getSorobanTransactionQueue().update(
23932381
txsPerPhase[static_cast<size_t>(TxSetPhase::SOROBAN)], lhhe.header,
23942382
mTxQueueBucketSnapshot, filterInvalidTxs);
23952383
}
@@ -2508,37 +2496,34 @@ HerderImpl::isNewerNominationOrBallotSt(SCPStatement const& oldSt,
25082496
size_t
25092497
HerderImpl::getMaxQueueSizeOps() const
25102498
{
2511-
return mTransactionQueue->getMaxQueueSizeOps();
2499+
return mTransactionQueues->getClassicTransactionQueue()
2500+
.getMaxQueueSizeOps();
25122501
}
25132502

25142503
size_t
25152504
HerderImpl::getMaxQueueSizeSorobanOps() const
25162505
{
2517-
return mSorobanTransactionQueue
2518-
? mSorobanTransactionQueue->getMaxQueueSizeOps()
2506+
return mTransactionQueues->hasSorobanTransactionQueue()
2507+
? mTransactionQueues->getSorobanTransactionQueue()
2508+
.getMaxQueueSizeOps()
25192509
: 0;
25202510
}
25212511

25222512
bool
25232513
HerderImpl::isBannedTx(Hash const& hash) const
25242514
{
2525-
auto banned = mTransactionQueue->isBanned(hash);
2526-
if (mSorobanTransactionQueue)
2527-
{
2528-
banned = banned || mSorobanTransactionQueue->isBanned(hash);
2529-
}
2530-
return banned;
2515+
return mTransactionQueues->isBanned(hash);
25312516
}
25322517

25332518
TransactionFrameBaseConstPtr
25342519
HerderImpl::getTx(Hash const& hash) const
25352520
{
2536-
auto classic = mTransactionQueue->getTx(hash);
2537-
if (!classic && mSorobanTransactionQueue)
2538-
{
2539-
return mSorobanTransactionQueue->getTx(hash);
2540-
}
2541-
return classic;
2521+
return mTransactionQueues->getTx(hash);
2522+
}
2523+
2524+
TransactionQueuesPtr HerderImpl::getTransactionQueues() const {
2525+
releaseAssert(mTransactionQueues);
2526+
return mTransactionQueues;
25422527
}
25432528

25442529
}

src/herder/HerderImpl.h

+7-2
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,8 @@ class HerderImpl : public Herder
198198

199199
virtual void beginApply() override;
200200

201+
TransactionQueuesPtr getTransactionQueues() const override;
202+
201203
void startTxSetGCTimer();
202204

203205
#ifdef BUILD_TESTS
@@ -248,8 +250,11 @@ class HerderImpl : public Herder
248250
void purgeOldPersistedTxSets();
249251
void writeDebugTxSet(LedgerCloseData const& lcd);
250252

251-
std::unique_ptr<ClassicTransactionQueue> mTransactionQueue;
252-
std::unique_ptr<SorobanTransactionQueue> mSorobanTransactionQueue;
253+
// TODO: Need some way to get these queues
254+
// TODO: Maybe something else should create this and pass it in somehow,
255+
// either via Application or explicitly in the constructor for HerderImpl.
256+
TransactionQueuesPtr const mTransactionQueues =
257+
std::make_shared<TransactionQueues>();
253258

254259
void updateTransactionQueue(TxSetXDRFrameConstPtr txSet);
255260
void maybeSetupSorobanQueue(uint32_t protocolVersion);

0 commit comments

Comments
 (0)