Skip to content

Commit 7416db3

Browse files
committed
Some cleanup around separate tx queue thread
1 parent 9bd18d1 commit 7416db3

File tree

7 files changed

+85
-125
lines changed

7 files changed

+85
-125
lines changed

src/herder/HerderImpl.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -600,7 +600,7 @@ Herder::recvTransaction(TransactionQueuesPtr txQueues,
600600
// would be whatever is handling ledger close. However, that will only
601601
// decrease the sourceAccountPending value, which means this erroneously
602602
// rejects (which is safe). I guess it's possible for a user-submitted
603-
// transaction to come in and conflict with the overlay thread, but that
603+
// transaction to come in and conflict with the tx queue thread, but that
604604
// would require them to be simultaneously running two clients and
605605
// submitting from both of them. Still, it might be safest to use some kind
606606
// of atomic function that handles both this check AND the add.

src/herder/HerderImpl.h

-3
Original file line numberDiff line numberDiff line change
@@ -250,9 +250,6 @@ class HerderImpl : public Herder
250250
void purgeOldPersistedTxSets();
251251
void writeDebugTxSet(LedgerCloseData const& lcd);
252252

253-
// TODO: Need some way to get these queues
254-
// TODO: Maybe something else should create this and pass it in somehow,
255-
// either via Application or explicitly in the constructor for HerderImpl.
256253
TransactionQueuesPtr const mTransactionQueues =
257254
std::make_shared<TransactionQueues>();
258255

src/herder/TransactionQueue.h

+5
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@ class Application;
6767
// incoming SCP messages need to wait for tx queue additions to occur, which
6868
// is bad.
6969
// * My note: Try both approaches and benchmark
70+
// * Follow up note: The priority locking scheme also helps to address
71+
// this.
7072

7173
enum class TxQueueAddResultCode
7274
{
@@ -172,6 +174,9 @@ class TransactionQueue
172174
protected:
173175
// TODO: Docs?
174176
// TODO: Move?
177+
// TODO: It might be worth benchmarking this against the solution that does
178+
// not use priority locking (just uses std::mutex). The added complexity of
179+
// this may not be worth it.
175180
class TxQueueLock : NonMovableOrCopyable
176181
{
177182
public:

src/main/ApplicationImpl.cpp

+4-1
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,10 @@ ApplicationImpl::ApplicationImpl(VirtualClock& clock, Config const& cfg)
199199
if (mConfig.BACKGROUND_TX_QUEUE)
200200
{
201201
// TODO: Keep priority unchanged as tx queue processes time-sensitive
202-
// tasks? Or should tx queue priority be downgraded?
202+
// tasks? Or should tx queue priority be downgraded? The priority
203+
// locking mechanism in TransactionQueue is designed to prevent tx queue
204+
// from starving other work, so it may be fine to keep priority
205+
// unchanged.
203206
mTxQueueThread = std::thread{[this]() { mTxQueueIOContext->run(); }};
204207
mThreadTypes[mTxQueueThread->get_id()] = ThreadType::TX_QUEUE;
205208
}

src/overlay/OverlayManagerImpl.cpp

-11
Original file line numberDiff line numberDiff line change
@@ -1231,17 +1231,6 @@ OverlayManagerImpl::recvTransaction(StellarMessage const& msg,
12311231

12321232
// add it to our current set
12331233
// and make sure it is valid
1234-
// TODO: I think I could pull this one call into Peer::recvTransaction
1235-
// and move basically all of the rest of this function to a new function
1236-
// called something like "recordTransactionStats" or something. Then,
1237-
// Peer:recvTransaction would invoke HerderImpl::recvTransaction in the
1238-
// background, and then pass the result to the new function on the main
1239-
// thread. That way I don't have to make OverlayManagerImpl and its
1240-
// dependencies (Floodgate, Peer, TxDemandsManager, maybe more), or much
1241-
// of Peer thread safe. Note that the recordTransactionStats function
1242-
// would probably need to take a shared ptr to the message so that the
1243-
// message doesn't get deleted before the function is called. The lambda
1244-
// capture will need to copy this pointer in.
12451234
auto addResult = mApp.getHerder().recvTransaction(transaction, false);
12461235
recordAddTransactionStats(addResult, transaction->getFullHash(), peer,
12471236
index);

src/overlay/Peer.cpp

+72-102
Original file line numberDiff line numberDiff line change
@@ -462,8 +462,6 @@ void
462462
Peer::maybeExecuteInBackground(std::string const& jobName,
463463
std::function<void(std::shared_ptr<Peer>)> f)
464464
{
465-
// TODO: What if this is running on the txqueue thread? This could be
466-
// invoked via the destructor on the message tracker.
467465
if (useBackgroundThread() &&
468466
!mAppConnector.threadIsType(Application::ThreadType::OVERLAY))
469467
{
@@ -870,119 +868,94 @@ Peer::recvAuthenticatedMessage(AuthenticatedMessage&& msg)
870868
{
871869
ZoneScoped;
872870
releaseAssert(!threadIsMain() || !useBackgroundThread());
871+
RECURSIVE_LOCK_GUARD(mStateMutex, guard);
873872

874-
// TODO: Remove if I get rid of the special lock scoping vv
875-
std::shared_ptr<CapacityTrackedMessage> msgTracker = nullptr;
876-
877-
// TODO: Move back if I git rid of lock scoping vv
878-
Scheduler::ActionType type = Scheduler::ActionType::NORMAL_ACTION;
879-
std::string queueName;
873+
if (shouldAbort(guard))
880874
{
881-
RECURSIVE_LOCK_GUARD(mStateMutex, guard);
882-
883-
if (shouldAbort(guard))
884-
{
885-
return false;
886-
}
875+
return false;
876+
}
887877

888-
std::string errorMsg;
889-
if (getState(guard) >= GOT_HELLO &&
890-
msg.v0().message.type() != ERROR_MSG)
878+
std::string errorMsg;
879+
if (getState(guard) >= GOT_HELLO && msg.v0().message.type() != ERROR_MSG)
880+
{
881+
if (!mHmac.checkAuthenticatedMessage(msg, errorMsg))
891882
{
892-
if (!mHmac.checkAuthenticatedMessage(msg, errorMsg))
883+
if (!threadIsMain())
893884
{
894-
if (!threadIsMain())
895-
{
896-
mAppConnector.postOnMainThread(
897-
[self = shared_from_this(), errorMsg]() {
898-
self->sendErrorAndDrop(ERR_AUTH, errorMsg);
899-
},
900-
"Peer::sendErrorAndDrop");
901-
}
902-
else
903-
{
904-
sendErrorAndDrop(ERR_AUTH, errorMsg);
905-
}
906-
return false;
885+
mAppConnector.postOnMainThread(
886+
[self = shared_from_this(), errorMsg]() {
887+
self->sendErrorAndDrop(ERR_AUTH, errorMsg);
888+
},
889+
"Peer::sendErrorAndDrop");
890+
}
891+
else
892+
{
893+
sendErrorAndDrop(ERR_AUTH, errorMsg);
907894
}
895+
return false;
908896
}
897+
}
909898

910-
// NOTE: Additionally, we may use state snapshots to verify TRANSACTION
911-
// type messages in the background.
912-
913-
// Start tracking capacity here, so read throttling is applied
914-
// appropriately. Flow control might not be started at that time
915-
msgTracker = std::make_shared<CapacityTrackedMessage>(
916-
shared_from_this(), msg.v0().message);
917-
918-
std::string cat;
919-
920-
switch (msgTracker->getMessage().type())
921-
{
922-
case HELLO:
923-
case AUTH:
924-
cat = AUTH_ACTION_QUEUE;
925-
break;
926-
// control messages
927-
case PEERS:
928-
case ERROR_MSG:
929-
case SEND_MORE:
930-
case SEND_MORE_EXTENDED:
931-
cat = "CTRL";
932-
break;
933-
// high volume flooding
934-
case TRANSACTION:
935-
case FLOOD_ADVERT:
936-
case FLOOD_DEMAND:
937-
{
938-
cat = "TX";
939-
type = Scheduler::ActionType::DROPPABLE_ACTION;
940-
break;
941-
}
899+
// NOTE: Additionally, we may use state snapshots to verify TRANSACTION type
900+
// messages in the background.
942901

943-
// consensus, inbound
944-
case GET_TX_SET:
945-
case GET_SCP_QUORUMSET:
946-
case GET_SCP_STATE:
947-
cat = "SCPQ";
948-
type = Scheduler::ActionType::DROPPABLE_ACTION;
949-
break;
902+
// Start tracking capacity here, so read throttling is applied
903+
// appropriately. Flow control might not be started at that time
904+
auto msgTracker = std::make_shared<CapacityTrackedMessage>(
905+
shared_from_this(), msg.v0().message);
950906

951-
// consensus, self
952-
case DONT_HAVE:
953-
case TX_SET:
954-
case GENERALIZED_TX_SET:
955-
case SCP_QUORUMSET:
956-
case SCP_MESSAGE:
957-
cat = "SCP";
958-
break;
907+
std::string cat;
908+
Scheduler::ActionType type = Scheduler::ActionType::NORMAL_ACTION;
959909

960-
default:
961-
cat = "MISC";
962-
}
910+
switch (msgTracker->getMessage().type())
911+
{
912+
case HELLO:
913+
case AUTH:
914+
cat = AUTH_ACTION_QUEUE;
915+
break;
916+
// control messages
917+
case PEERS:
918+
case ERROR_MSG:
919+
case SEND_MORE:
920+
case SEND_MORE_EXTENDED:
921+
cat = "CTRL";
922+
break;
923+
// high volume flooding
924+
case TRANSACTION:
925+
case FLOOD_ADVERT:
926+
case FLOOD_DEMAND:
927+
{
928+
cat = "TX";
929+
type = Scheduler::ActionType::DROPPABLE_ACTION;
930+
break;
931+
}
963932

964-
// processing of incoming messages during authenticated must be
965-
// in-order, so while not authenticated, place all messages onto
966-
// AUTH_ACTION_QUEUE scheduler queue
967-
queueName = isAuthenticated(guard) ? cat : AUTH_ACTION_QUEUE;
968-
type = isAuthenticated(guard) ? type
969-
: Scheduler::ActionType::NORMAL_ACTION;
933+
// consensus, inbound
934+
case GET_TX_SET:
935+
case GET_SCP_QUORUMSET:
936+
case GET_SCP_STATE:
937+
cat = "SCPQ";
938+
type = Scheduler::ActionType::DROPPABLE_ACTION;
939+
break;
970940

971-
// TODO: This scope (ending here) exists to ensure this doesn't hold the
972-
// state lock upon entry to the transaction queue. This can cause
973-
// deadlocks! I think it's safe to release the lock here as there's no
974-
// longer any state querying. In practice though, if I end up posting
975-
// the tryAdd action onto some tx-queue specific thread, then I can
976-
// remove the scoping I added here and the lock will be released upon
977-
// return from this function (like it always has).
941+
// consensus, self
942+
case DONT_HAVE:
943+
case TX_SET:
944+
case GENERALIZED_TX_SET:
945+
case SCP_QUORUMSET:
946+
case SCP_MESSAGE:
947+
cat = "SCP";
948+
break;
978949

979-
// TODO: Really investigate whether this peer+transaction queue locking
980-
// each other issue can come up anywhere else.
950+
default:
951+
cat = "MISC";
981952
}
982953

983-
// TODO: vv Remove asserts if I get rid of the scoping above
984-
releaseAssert(msgTracker);
985-
releaseAssert(!queueName.empty());
954+
// processing of incoming messages during authenticated must be in-order, so
955+
// while not authenticated, place all messages onto AUTH_ACTION_QUEUE
956+
// scheduler queue
957+
auto queueName = isAuthenticated(guard) ? cat : AUTH_ACTION_QUEUE;
958+
type = isAuthenticated(guard) ? type : Scheduler::ActionType::NORMAL_ACTION;
986959

987960
// If a message is already scheduled, drop
988961
if (mAppConnector.checkScheduledAndCache(msgTracker))
@@ -1019,9 +992,6 @@ Peer::recvAuthenticatedMessage(AuthenticatedMessage&& msg)
1019992
self->recvMessage(t);
1020993
},
1021994
"Peer::recvMessage"); // TODO: Change message to something better
1022-
// TODO: If I end up running this on a different thread then I need to
1023-
// be sure to std::move `msgTracker` into the lambda as-per the note
1024-
// below.
1025995
}
1026996
else
1027997
{

src/overlay/TCPPeer.cpp

+3-7
Original file line numberDiff line numberDiff line change
@@ -528,14 +528,10 @@ TCPPeer::startRead()
528528
ZoneScoped;
529529
releaseAssert(!threadIsMain() || !useBackgroundThread());
530530
releaseAssert(canRead());
531+
RECURSIVE_LOCK_GUARD(mStateMutex, guard);
532+
if (shouldAbort(guard))
531533
{
532-
RECURSIVE_LOCK_GUARD(mStateMutex, guard);
533-
if (shouldAbort(guard))
534-
{
535-
return;
536-
}
537-
// TODO: Remove this outer scoping if I add separate thread for bg tx
538-
// queue
534+
return;
539535
}
540536

541537
mThreadVars.getIncomingHeader().clear();

0 commit comments

Comments
 (0)