Skip to content

Commit 9bd18d1

Browse files
committed
Prioritize main thread in locking
1 parent 4820f7f commit 9bd18d1

File tree

2 files changed

+65
-24
lines changed

2 files changed

+65
-24
lines changed

src/herder/TransactionQueue.cpp

+34-19
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,7 @@ isDuplicateTx(TransactionFrameBasePtr oldTx, TransactionFrameBasePtr newTx)
270270
bool
271271
TransactionQueue::sourceAccountPending(AccountID const& accountID) const
272272
{
273-
std::lock_guard<std::mutex> guard(mTxQueueMutex);
273+
TxQueueLock guard = lock();
274274
return mAccountStates.find(accountID) != mAccountStates.end();
275275
}
276276

@@ -328,6 +328,22 @@ validateSorobanMemo(TransactionFrameBasePtr tx)
328328
return true;
329329
}
330330

331+
TransactionQueue::TxQueueLock
332+
TransactionQueue::lock() const
333+
{
334+
if (threadIsMain())
335+
{
336+
mMainThreadWaiting.store(true);
337+
std::unique_lock<std::mutex> lock(mTxQueueMutex);
338+
mMainThreadWaiting.store(false);
339+
return TxQueueLock(std::move(lock), mTxQueueCv);
340+
}
341+
std::unique_lock<std::mutex> lock(mTxQueueMutex);
342+
mTxQueueCv->wait(lock, [this] { return !mMainThreadWaiting.load(); });
343+
return TxQueueLock(std::move(lock), mTxQueueCv);
344+
}
345+
346+
331347
TransactionQueue::AddResult
332348
TransactionQueue::canAdd(
333349
TransactionFrameBasePtr tx, AccountStates::iterator& stateIter,
@@ -641,7 +657,7 @@ TransactionQueue::AddResult
641657
TransactionQueue::tryAdd(TransactionFrameBasePtr tx, bool submittedFromSelf)
642658
{
643659
ZoneScoped;
644-
std::lock_guard<std::mutex> guard(mTxQueueMutex);
660+
TxQueueLock guard = lock();
645661
CLOG_DEBUG(Tx, "Try add tx {} in {}", hexAbbrev(tx->getFullHash()),
646662
threadIsMain() ? "foreground" : "background");
647663

@@ -818,7 +834,7 @@ void
818834
TransactionQueue::ban(Transactions const& banTxs)
819835
{
820836
ZoneScoped;
821-
std::lock_guard<std::mutex> guard(mTxQueueMutex);
837+
TxQueueLock guard = lock();
822838
banInternal(banTxs);
823839
}
824840

@@ -871,7 +887,7 @@ TransactionQueue::AccountState
871887
TransactionQueue::getAccountTransactionQueueInfo(
872888
AccountID const& accountID) const
873889
{
874-
std::lock_guard<std::mutex> guard(mTxQueueMutex);
890+
TxQueueLock guard = lock();
875891
auto i = mAccountStates.find(accountID);
876892
if (i == std::end(mAccountStates))
877893
{
@@ -883,7 +899,7 @@ TransactionQueue::getAccountTransactionQueueInfo(
883899
size_t
884900
TransactionQueue::countBanned(int index) const
885901
{
886-
std::lock_guard<std::mutex> guard(mTxQueueMutex);
902+
TxQueueLock guard = lock();
887903
return mBannedTransactions[index].size();
888904
}
889905
#endif
@@ -958,7 +974,7 @@ TransactionQueue::shift()
958974
bool
959975
TransactionQueue::isBanned(Hash const& hash) const
960976
{
961-
std::lock_guard<std::mutex> guard(mTxQueueMutex);
977+
TxQueueLock guard = lock();
962978
return isBannedInternal(hash);
963979
}
964980

@@ -976,7 +992,7 @@ TxFrameList
976992
TransactionQueue::getTransactions(LedgerHeader const& lcl) const
977993
{
978994
ZoneScoped;
979-
std::lock_guard<std::mutex> guard(mTxQueueMutex);
995+
TxQueueLock guard = lock();
980996
return getTransactionsInternal(lcl);
981997
}
982998

@@ -1004,7 +1020,7 @@ TransactionFrameBaseConstPtr
10041020
TransactionQueue::getTx(Hash const& hash) const
10051021
{
10061022
ZoneScoped;
1007-
std::lock_guard<std::mutex> guard(mTxQueueMutex);
1023+
TxQueueLock guard = lock();
10081024
auto it = mKnownTxHashes.find(hash);
10091025
if (it != mKnownTxHashes.end())
10101026
{
@@ -1223,7 +1239,7 @@ size_t
12231239
SorobanTransactionQueue::getMaxQueueSizeOps() const
12241240
{
12251241
ZoneScoped;
1226-
std::lock_guard<std::mutex> guard(mTxQueueMutex);
1242+
TxQueueLock guard = lock();
12271243
if (protocolVersionStartsFrom(
12281244
mBucketSnapshot->getLedgerHeader().ledgerVersion,
12291245
SOROBAN_PROTOCOL_VERSION))
@@ -1317,8 +1333,7 @@ ClassicTransactionQueue::broadcastSome()
13171333
}
13181334

13191335
void
1320-
TransactionQueue::broadcast(bool fromCallback,
1321-
std::lock_guard<std::mutex> const& guard)
1336+
TransactionQueue::broadcast(bool fromCallback, TxQueueLock const& guard)
13221337
{
13231338
// Must be called from the main thread due to the use of `mBroadcastTimer`
13241339
releaseAssert(threadIsMain());
@@ -1357,12 +1372,12 @@ TransactionQueue::broadcast(bool fromCallback,
13571372
void
13581373
TransactionQueue::broadcast(bool fromCallback)
13591374
{
1360-
std::lock_guard<std::mutex> guard(mTxQueueMutex);
1375+
TxQueueLock guard = lock();
13611376
broadcast(fromCallback, guard);
13621377
}
13631378

13641379
void
1365-
TransactionQueue::rebroadcast(std::lock_guard<std::mutex> const& guard)
1380+
TransactionQueue::rebroadcast(TxQueueLock const& guard)
13661381
{
13671382
// For `broadcast` call
13681383
releaseAssert(threadIsMain());
@@ -1383,7 +1398,7 @@ void
13831398
TransactionQueue::shutdown()
13841399
{
13851400
releaseAssert(threadIsMain());
1386-
std::lock_guard<std::mutex> guard(mTxQueueMutex);
1401+
TxQueueLock guard = lock();
13871402
mShutdown = true;
13881403
mBroadcastTimer.cancel();
13891404
}
@@ -1396,7 +1411,7 @@ TransactionQueue::update(
13961411
{
13971412
ZoneScoped;
13981413
releaseAssert(threadIsMain());
1399-
std::lock_guard<std::mutex> guard(mTxQueueMutex);
1414+
TxQueueLock guard = lock();
14001415

14011416
mValidationSnapshot =
14021417
std::make_shared<ImmutableValidationSnapshot>(mAppConn);
@@ -1455,7 +1470,7 @@ void
14551470
TransactionQueue::updateSnapshots(
14561471
SearchableSnapshotConstPtr const& newBucketSnapshot)
14571472
{
1458-
std::lock_guard<std::mutex> guard(mTxQueueMutex);
1473+
TxQueueLock guard = lock();
14591474
mValidationSnapshot =
14601475
std::make_shared<ImmutableValidationSnapshot>(mAppConn);
14611476
mBucketSnapshot = newBucketSnapshot;
@@ -1465,14 +1480,14 @@ TransactionQueue::updateSnapshots(
14651480
size_t
14661481
TransactionQueue::getQueueSizeOps() const
14671482
{
1468-
std::lock_guard<std::mutex> guard(mTxQueueMutex);
1483+
TxQueueLock guard = lock();
14691484
return mTxQueueLimiter.size();
14701485
}
14711486

14721487
std::optional<int64_t>
14731488
TransactionQueue::getInQueueSeqNum(AccountID const& account) const
14741489
{
1475-
std::lock_guard<std::mutex> guard(mTxQueueMutex);
1490+
TxQueueLock guard = lock();
14761491
auto stateIter = mAccountStates.find(account);
14771492
if (stateIter == mAccountStates.end())
14781493
{
@@ -1490,7 +1505,7 @@ size_t
14901505
ClassicTransactionQueue::getMaxQueueSizeOps() const
14911506
{
14921507
ZoneScoped;
1493-
std::lock_guard<std::mutex> guard(mTxQueueMutex);
1508+
TxQueueLock guard = lock();
14941509
auto res = mTxQueueLimiter.maxScaledLedgerResources(false);
14951510
releaseAssert(res.size() == NUM_CLASSIC_TX_RESOURCES);
14961511
return res.getVal(Resource::Type::OPERATIONS);

src/herder/TransactionQueue.h

+31-5
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,27 @@ class TransactionQueue
170170
#endif
171171

172172
protected:
173+
// TODO: Docs?
174+
// TODO: Move?
175+
class TxQueueLock : NonMovableOrCopyable
176+
{
177+
public:
178+
TxQueueLock(std::unique_lock<std::mutex>&& lock,
179+
std::shared_ptr<std::condition_variable> cv)
180+
: mLock(std::move(lock)), mCv(cv)
181+
{
182+
}
183+
~TxQueueLock()
184+
{
185+
// Wake threads on destruction
186+
mCv->notify_all();
187+
}
188+
189+
private:
190+
std::unique_lock<std::mutex> mLock;
191+
std::shared_ptr<std::condition_variable> mCv;
192+
};
193+
173194
/**
174195
* The AccountState for every account. As noted above, an AccountID is in
175196
* AccountStates iff at least one of the following is true for the
@@ -232,7 +253,7 @@ class TransactionQueue
232253
// internal call should call the second overload (which enforces that the
233254
// lock is already held).
234255
void broadcast(bool fromCallback);
235-
void broadcast(bool fromCallback, std::lock_guard<std::mutex> const& guard);
256+
void broadcast(bool fromCallback, TxQueueLock const& guard);
236257
// broadcasts a single transaction
237258
enum class BroadcastStatus
238259
{
@@ -259,6 +280,8 @@ class TransactionQueue
259280
// held.
260281
void banInternal(Transactions const& banTxs);
261282

283+
TxQueueLock lock() const;
284+
262285
// Snapshots to use for transaction validation
263286
ImmutableValidationSnapshotPtr mValidationSnapshot;
264287
SearchableSnapshotConstPtr mBucketSnapshot;
@@ -270,11 +293,14 @@ class TransactionQueue
270293

271294
size_t mBroadcastSeed;
272295

273-
mutable std::mutex mTxQueueMutex;
274-
275296
private:
276297
AppConnector& mAppConn;
277298

299+
mutable std::mutex mTxQueueMutex;
300+
mutable std::shared_ptr<std::condition_variable> mTxQueueCv =
301+
std::make_shared<std::condition_variable>();
302+
mutable std::atomic<bool> mMainThreadWaiting{false};
303+
278304
void removeApplied(Transactions const& txs);
279305

280306
/**
@@ -286,7 +312,7 @@ class TransactionQueue
286312

287313
// TODO: Explain that this takes a lock guard due to the `broadcast` call
288314
// that it makes.
289-
void rebroadcast(std::lock_guard<std::mutex> const& guard);
315+
void rebroadcast(TxQueueLock const& guard);
290316

291317
// TODO: Docs
292318
// Private versions of public functions that contain the actual
@@ -327,7 +353,7 @@ class SorobanTransactionQueue : public TransactionQueue
327353
void
328354
clearBroadcastCarryover()
329355
{
330-
std::lock_guard<std::mutex> guard(mTxQueueMutex);
356+
TxQueueLock lock = TransactionQueue::lock();
331357
mBroadcastOpCarryover.clear();
332358
mBroadcastOpCarryover.resize(1, Resource::makeEmptySoroban());
333359
}

0 commit comments

Comments
 (0)