@@ -16,8 +16,7 @@ namespace stellar
16
16
{
17
17
18
18
size_t
19
- FlowControl::getOutboundQueueByteLimit (
20
- std::lock_guard<std::mutex>& lockGuard) const
19
+ FlowControl::getOutboundQueueByteLimit (MutexLocker& lockGuard) const
21
20
{
22
21
#ifdef BUILD_TESTS
23
22
if (mOutboundQueueLimit )
@@ -44,7 +43,7 @@ FlowControl::FlowControl(AppConnector& connector, bool useBackgroundThread)
44
43
45
44
bool
46
45
FlowControl::hasOutboundCapacity (StellarMessage const & msg,
47
- std::lock_guard<std::mutex> & lockGuard) const
46
+ MutexLocker & lockGuard) const
48
47
{
49
48
releaseAssert (!threadIsMain () || !mUseBackgroundThread );
50
49
return mFlowControlCapacity .hasOutboundCapacity (msg) &&
55
54
FlowControl::noOutboundCapacityTimeout (VirtualClock::time_point now,
56
55
std::chrono::seconds timeout) const
57
56
{
58
- std::lock_guard<std::mutex> guard (mFlowControlMutex );
57
+ MutexLocker guard (mFlowControlMutex );
59
58
return mNoOutboundCapacity && now - *mNoOutboundCapacity >= timeout;
60
59
}
61
60
62
61
void
63
62
FlowControl::setPeerID (NodeID const & peerID)
64
63
{
65
64
releaseAssert (threadIsMain ());
66
- std::lock_guard<std::mutex> guard (mFlowControlMutex );
65
+ MutexLocker guard (mFlowControlMutex );
67
66
mNodeID = peerID;
68
67
}
69
68
@@ -72,7 +71,7 @@ FlowControl::maybeReleaseCapacity(StellarMessage const& msg)
72
71
{
73
72
ZoneScoped;
74
73
releaseAssert (threadIsMain ());
75
- std::lock_guard<std::mutex> guard (mFlowControlMutex );
74
+ MutexLocker guard (mFlowControlMutex );
76
75
77
76
if (msg.type () == SEND_MORE_EXTENDED)
78
77
{
@@ -103,7 +102,7 @@ FlowControl::processSentMessages(
103
102
ZoneScoped;
104
103
releaseAssert (!threadIsMain () || !mUseBackgroundThread );
105
104
106
- std::lock_guard<std::mutex> guard (mFlowControlMutex );
105
+ MutexLocker guard (mFlowControlMutex );
107
106
for (int i = 0 ; i < sentMessages.size (); i++)
108
107
{
109
108
auto const & sentMsgs = sentMessages[i];
@@ -162,7 +161,7 @@ FlowControl::getNextBatchToSend()
162
161
ZoneScoped;
163
162
releaseAssert (!threadIsMain () || !mUseBackgroundThread );
164
163
165
- std::lock_guard<std::mutex> guard (mFlowControlMutex );
164
+ MutexLocker guard (mFlowControlMutex );
166
165
std::vector<QueuedOutboundMessage> batchToSend;
167
166
168
167
int sent = 0 ;
@@ -217,7 +216,7 @@ FlowControl::updateMsgMetrics(std::shared_ptr<StellarMessage const> msg,
217
216
{
218
217
// The lock isn't strictly needed here, but is added for consistency and
219
218
// future-proofing this function
220
- std::lock_guard<std::mutex> guard (mFlowControlMutex );
219
+ MutexLocker guard (mFlowControlMutex );
221
220
auto diff = mAppConnector .now () - timePlaced;
222
221
223
222
auto updateQueueDelay = [&](auto & queue, auto & metrics) {
@@ -258,7 +257,7 @@ FlowControl::handleTxSizeIncrease(uint32_t increase)
258
257
{
259
258
ZoneScoped;
260
259
releaseAssert (threadIsMain ());
261
- std::lock_guard<std::mutex> guard (mFlowControlMutex );
260
+ MutexLocker guard (mFlowControlMutex );
262
261
releaseAssert (increase > 0 );
263
262
// Bump flood capacity to accommodate the upgrade
264
263
mFlowControlBytesCapacity .handleTxSizeIncrease (increase);
@@ -269,7 +268,7 @@ FlowControl::beginMessageProcessing(StellarMessage const& msg)
269
268
{
270
269
ZoneScoped;
271
270
releaseAssert (!threadIsMain () || !mUseBackgroundThread );
272
- std::lock_guard<std::mutex> guard (mFlowControlMutex );
271
+ MutexLocker guard (mFlowControlMutex );
273
272
274
273
return mFlowControlCapacity .lockLocalCapacity (msg) &&
275
274
mFlowControlBytesCapacity .lockLocalCapacity (msg);
@@ -279,7 +278,7 @@ SendMoreCapacity
279
278
FlowControl::endMessageProcessing (StellarMessage const & msg)
280
279
{
281
280
ZoneScoped;
282
- std::lock_guard<std::mutex> guard (mFlowControlMutex );
281
+ MutexLocker guard (mFlowControlMutex );
283
282
284
283
mFloodDataProcessed += mFlowControlCapacity .releaseLocalCapacity (msg);
285
284
mFloodDataProcessedBytes +=
@@ -318,7 +317,7 @@ FlowControl::endMessageProcessing(StellarMessage const& msg)
318
317
}
319
318
320
319
bool
321
- FlowControl::canRead (std::lock_guard<std::mutex> const & guard) const
320
+ FlowControl::canRead (MutexLocker const & guard) const
322
321
{
323
322
return mFlowControlBytesCapacity .canRead () &&
324
323
mFlowControlCapacity .canRead ();
@@ -327,7 +326,7 @@ FlowControl::canRead(std::lock_guard<std::mutex> const& guard) const
327
326
bool
328
327
FlowControl::canRead () const
329
328
{
330
- std::lock_guard<std::mutex> guard (mFlowControlMutex );
329
+ MutexLocker guard (mFlowControlMutex );
331
330
return canRead (guard);
332
331
}
333
332
@@ -365,7 +364,7 @@ FlowControl::isSendMoreValid(StellarMessage const& msg,
365
364
std::string& errorMsg) const
366
365
{
367
366
releaseAssert (threadIsMain ());
368
- std::lock_guard<std::mutex> guard (mFlowControlMutex );
367
+ MutexLocker guard (mFlowControlMutex );
369
368
370
369
if (msg.type () != SEND_MORE_EXTENDED)
371
370
{
@@ -404,7 +403,7 @@ FlowControl::addMsgAndMaybeTrimQueue(std::shared_ptr<StellarMessage const> msg)
404
403
{
405
404
ZoneScoped;
406
405
releaseAssert (threadIsMain ());
407
- std::lock_guard<std::mutex> guard (mFlowControlMutex );
406
+ MutexLocker guard (mFlowControlMutex );
408
407
releaseAssert (msg);
409
408
auto type = msg->type ();
410
409
size_t msgQInd = 0 ;
@@ -461,16 +460,12 @@ FlowControl::addMsgAndMaybeTrimQueue(std::shared_ptr<StellarMessage const> msg)
461
460
auto & om = mOverlayMetrics ;
462
461
if (type == TRANSACTION)
463
462
{
464
- auto isOverLimit = [&](auto const & queue) {
465
- bool overLimit =
466
- queue.size () > limit ||
467
- mTxQueueByteCount > getOutboundQueueByteLimit (guard);
468
- return overLimit;
469
- };
463
+ bool isOverLimit = queue.size () > limit ||
464
+ mTxQueueByteCount > getOutboundQueueByteLimit (guard);
470
465
471
466
// If we are at limit, we're probably really behind, so drop the entire
472
467
// queue
473
- if (isOverLimit (queue) )
468
+ if (isOverLimit)
474
469
{
475
470
dropped = queue.size ();
476
471
mTxQueueByteCount = 0 ;
@@ -559,7 +554,7 @@ Json::Value
559
554
FlowControl::getFlowControlJsonInfo (bool compact) const
560
555
{
561
556
releaseAssert (threadIsMain ());
562
- std::lock_guard<std::mutex> guard (mFlowControlMutex );
557
+ MutexLocker guard (mFlowControlMutex );
563
558
564
559
Json::Value res;
565
560
if (mFlowControlCapacity .getCapacity ().mTotalCapacity )
@@ -601,7 +596,7 @@ FlowControl::getFlowControlJsonInfo(bool compact) const
601
596
bool
602
597
FlowControl::maybeThrottleRead ()
603
598
{
604
- std::lock_guard<std::mutex> guard (mFlowControlMutex );
599
+ MutexLocker guard (mFlowControlMutex );
605
600
if (!canRead (guard))
606
601
{
607
602
CLOG_DEBUG (Overlay, " Throttle reading from peer {}" ,
@@ -615,7 +610,7 @@ FlowControl::maybeThrottleRead()
615
610
void
616
611
FlowControl::stopThrottling ()
617
612
{
618
- std::lock_guard<std::mutex> guard (mFlowControlMutex );
613
+ MutexLocker guard (mFlowControlMutex );
619
614
releaseAssert (mLastThrottle );
620
615
CLOG_DEBUG (Overlay, " Stop throttling reading from peer {}" ,
621
616
mAppConnector .getConfig ().toShortString (mNodeID ));
@@ -627,7 +622,7 @@ FlowControl::stopThrottling()
627
622
bool
628
623
FlowControl::isThrottled () const
629
624
{
630
- std::lock_guard<std::mutex> guard (mFlowControlMutex );
625
+ MutexLocker guard (mFlowControlMutex );
631
626
return static_cast <bool >(mLastThrottle );
632
627
}
633
628
0 commit comments