@@ -827,7 +827,7 @@ void V1TransportSerializer::prepareForTransport(CSerializedNetMsg& msg, std::vec
827
827
CVectorWriter{SER_NETWORK, INIT_PROTO_VERSION, header, 0 , hdr};
828
828
}
829
829
830
- size_t CConnman::SocketSendData (CNode& node) const
830
+ std::pair< size_t , bool > CConnman::SocketSendData (CNode& node) const
831
831
{
832
832
auto it = node.vSendMsg .begin ();
833
833
size_t nSentSize = 0 ;
@@ -882,7 +882,7 @@ size_t CConnman::SocketSendData(CNode& node) const
882
882
assert (node.nSendSize == 0 );
883
883
}
884
884
node.vSendMsg .erase (node.vSendMsg .begin (), it);
885
- return nSentSize;
885
+ return { nSentSize, !node. vSendMsg . empty ()} ;
886
886
}
887
887
888
888
/* * Try to find a connection to evict when the node is full.
@@ -1217,37 +1217,15 @@ Sock::EventsPerSock CConnman::GenerateWaitSockets(Span<CNode* const> nodes)
1217
1217
}
1218
1218
1219
1219
for (CNode* pnode : nodes) {
1220
- // Implement the following logic:
1221
- // * If there is data to send, select() for sending data. As this only
1222
- // happens when optimistic write failed, we choose to first drain the
1223
- // write buffer in this case before receiving more. This avoids
1224
- // needlessly queueing received data, if the remote peer is not themselves
1225
- // receiving data. This means properly utilizing TCP flow control signalling.
1226
- // * Otherwise, if there is space left in the receive buffer, select() for
1227
- // receiving data.
1228
- // * Hand off all complete messages to the processor, to be handled without
1229
- // blocking here.
1230
-
1231
1220
bool select_recv = !pnode->fPauseRecv ;
1232
- bool select_send;
1233
- {
1234
- LOCK (pnode->cs_vSend );
1235
- select_send = !pnode->vSendMsg .empty ();
1236
- }
1221
+ bool select_send = WITH_LOCK (pnode->cs_vSend , return !pnode->vSendMsg .empty ());
1222
+ if (!select_recv && !select_send) continue ;
1237
1223
1238
1224
LOCK (pnode->m_sock_mutex );
1239
- if (!pnode->m_sock ) {
1240
- continue ;
1225
+ if (pnode->m_sock ) {
1226
+ Sock::Event event = (select_send ? Sock::SEND : 0 ) | (select_recv ? Sock::RECV : 0 );
1227
+ events_per_sock.emplace (pnode->m_sock , Sock::Events{event});
1241
1228
}
1242
-
1243
- Sock::Event requested{0 };
1244
- if (select_send) {
1245
- requested = Sock::SEND;
1246
- } else if (select_recv) {
1247
- requested = Sock::RECV;
1248
- }
1249
-
1250
- events_per_sock.emplace (pnode->m_sock , Sock::Events{requested});
1251
1229
}
1252
1230
1253
1231
return events_per_sock;
@@ -1308,6 +1286,24 @@ void CConnman::SocketHandlerConnected(const std::vector<CNode*>& nodes,
1308
1286
errorSet = it->second .occurred & Sock::ERR;
1309
1287
}
1310
1288
}
1289
+
1290
+ if (sendSet) {
1291
+ // Send data
1292
+ auto [bytes_sent, data_left] = WITH_LOCK (pnode->cs_vSend , return SocketSendData (*pnode));
1293
+ if (bytes_sent) {
1294
+ RecordBytesSent (bytes_sent);
1295
+
1296
+ // If both receiving and (non-optimistic) sending were possible, we first attempt
1297
+ // sending. If that succeeds, but does not fully drain the send queue, do not
1298
+ // attempt to receive. This avoids needlessly queueing data if the remote peer
1299
+ // is slow at receiving data, by means of TCP flow control. We only do this when
1300
+ // sending actually succeeded to make sure progress is always made; otherwise a
1301
+ // deadlock would be possible when both sides have data to send, but neither is
1302
+ // receiving.
1303
+ if (data_left) recvSet = false ;
1304
+ }
1305
+ }
1306
+
1311
1307
if (recvSet || errorSet)
1312
1308
{
1313
1309
// typical socket buffer is 8K-64K
@@ -1354,12 +1350,6 @@ void CConnman::SocketHandlerConnected(const std::vector<CNode*>& nodes,
1354
1350
}
1355
1351
}
1356
1352
1357
- if (sendSet) {
1358
- // Send data
1359
- size_t bytes_sent = WITH_LOCK (pnode->cs_vSend , return SocketSendData (*pnode));
1360
- if (bytes_sent) RecordBytesSent (bytes_sent);
1361
- }
1362
-
1363
1353
if (InactivityCheck (*pnode)) pnode->fDisconnect = true ;
1364
1354
}
1365
1355
}
@@ -2887,7 +2877,8 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
2887
2877
if (nMessageSize) pnode->vSendMsg .push_back (std::move (msg.data ));
2888
2878
2889
2879
// If write queue empty, attempt "optimistic write"
2890
- if (optimisticSend) nBytesSent = SocketSendData (*pnode);
2880
+ bool data_left;
2881
+ if (optimisticSend) std::tie (nBytesSent, data_left) = SocketSendData (*pnode);
2891
2882
}
2892
2883
if (nBytesSent) RecordBytesSent (nBytesSent);
2893
2884
}
0 commit comments