Skip to content

Commit 7d5d5ad

Browse files
Sjorsvasild
andcommitted
Add Sv2Connman
Co-Authored-By: Vasil Dimov <[email protected]>
1 parent 32b8b4f commit 7d5d5ad

File tree

5 files changed

+772
-0
lines changed

5 files changed

+772
-0
lines changed

src/sv2/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,14 @@
55
add_library(bitcoin_sv2 STATIC EXCLUDE_FROM_ALL
66
noise.cpp
77
transport.cpp
8+
connman.cpp
89
)
910

1011
target_link_libraries(bitcoin_sv2
1112
PRIVATE
1213
core_interface
1314
bitcoin_clientversion
1415
bitcoin_crypto
16+
bitcoin_common # for SockMan
1517
$<$<PLATFORM_ID:Windows>:ws2_32>
1618
)

src/sv2/connman.cpp

Lines changed: 349 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,349 @@
1+
// Copyright (c) 2023-present The Bitcoin Core developers
2+
// Distributed under the MIT software license, see the accompanying
3+
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
4+
5+
#include <sv2/connman.h>
6+
#include <sv2/messages.h>
7+
#include <logging.h>
8+
#include <sync.h>
9+
#include <util/thread.h>
10+
11+
using node::Sv2MsgType;
12+
13+
Sv2Connman::~Sv2Connman()
14+
{
15+
AssertLockNotHeld(m_clients_mutex);
16+
17+
{
18+
LOCK(m_clients_mutex);
19+
for (const auto& client : m_sv2_clients) {
20+
LogTrace(BCLog::SV2, "Disconnecting client id=%zu\n",
21+
client.first);
22+
client.second->m_disconnect_flag = true;
23+
}
24+
DisconnectFlagged();
25+
}
26+
27+
Interrupt();
28+
StopThreads();
29+
}
30+
31+
bool Sv2Connman::Start(Sv2EventsInterface* msgproc, std::string host, uint16_t port)
32+
{
33+
m_msgproc = msgproc;
34+
35+
if (!Bind(host, port)) return false;
36+
37+
SockMan::Options sockman_options;
38+
StartSocketsThreads(sockman_options);
39+
40+
return true;
41+
}
42+
43+
bool Sv2Connman::Bind(std::string host, uint16_t port)
44+
{
45+
const CService addr_bind = LookupNumeric(host, port);
46+
47+
bilingual_str error;
48+
if (!BindAndStartListening(addr_bind, error)) {
49+
LogPrintLevel(BCLog::SV2, BCLog::Level::Error, "Template Provider failed to bind to port %d: %s\n", port, error.original);
50+
return false;
51+
}
52+
53+
LogPrintLevel(BCLog::SV2, BCLog::Level::Info, "%s listening on %s:%d\n", SV2_PROTOCOL_NAMES.at(m_subprotocol), host, port);
54+
55+
return true;
56+
}
57+
58+
59+
void Sv2Connman::DisconnectFlagged()
60+
{
61+
AssertLockHeld(m_clients_mutex);
62+
63+
// Remove clients that are flagged for disconnection.
64+
auto it = m_sv2_clients.begin();
65+
while(it != m_sv2_clients.end()) {
66+
if (it->second->m_send_messages.empty() && it->second->m_disconnect_flag) {
67+
CloseConnection(it->second->m_id);
68+
it = m_sv2_clients.erase(it);
69+
} else {
70+
it++;
71+
}
72+
}
73+
}
74+
75+
void Sv2Connman::EventIOLoopCompletedForAll()
76+
{
77+
LOCK(m_clients_mutex);
78+
DisconnectFlagged();
79+
}
80+
81+
void Sv2Connman::Interrupt()
82+
{
83+
interruptNet();
84+
}
85+
86+
void Sv2Connman::StopThreads()
87+
{
88+
JoinSocketsThreads();
89+
}
90+
91+
std::shared_ptr<Sv2Client> Sv2Connman::GetClientById(NodeId node_id) const
92+
{
93+
auto it{m_sv2_clients.find(node_id)};
94+
if (it != m_sv2_clients.end()) {
95+
return it->second;
96+
}
97+
return nullptr;
98+
}
99+
100+
bool Sv2Connman::EventNewConnectionAccepted(NodeId node_id,
101+
const CService& addr_bind_,
102+
const CService& addr_)
103+
{
104+
Assume(m_certificate);
105+
LOCK(m_clients_mutex);
106+
std::unique_ptr transport = std::make_unique<Sv2Transport>(m_static_key, m_certificate.value());
107+
auto client = std::make_shared<Sv2Client>(node_id, std::move(transport));
108+
LogPrintLevel(BCLog::SV2, BCLog::Level::Trace, "New client id=%zu connected\n", node_id);
109+
m_sv2_clients.emplace(node_id, std::move(client));
110+
return true;
111+
}
112+
113+
void Sv2Connman::EventReadyToSend(NodeId node_id, bool& cancel_recv)
114+
{
115+
AssertLockNotHeld(m_clients_mutex);
116+
117+
auto client{WITH_LOCK(m_clients_mutex, return GetClientById(node_id);)};
118+
if (client == nullptr) {
119+
cancel_recv = true;
120+
return;
121+
}
122+
123+
auto it = client->m_send_messages.begin();
124+
std::optional<bool> expected_more;
125+
126+
size_t total_sent = 0;
127+
128+
while(true) {
129+
if (it != client->m_send_messages.end()) {
130+
// If possible, move one message from the send queue to the transport.
131+
// This fails when there is an existing message still being sent,
132+
// or when the handshake has not yet completed.
133+
//
134+
// Wrap Sv2NetMsg inside CSerializedNetMsg for transport
135+
CSerializedNetMsg net_msg{*it};
136+
if (client->m_transport->SetMessageToSend(net_msg)) {
137+
++it;
138+
}
139+
}
140+
141+
const auto& [data, more, _m_message_type] = client->m_transport->GetBytesToSend(/*have_next_message=*/it != client->m_send_messages.end());
142+
143+
144+
// We rely on the 'more' value returned by GetBytesToSend to correctly predict whether more
145+
// bytes are still to be sent, to correctly set the MSG_MORE flag. As a sanity check,
146+
// verify that the previously returned 'more' was correct.
147+
if (expected_more.has_value()) Assume(!data.empty() == *expected_more);
148+
expected_more = more;
149+
150+
ssize_t sent = 0;
151+
std::string errmsg;
152+
153+
if (!data.empty()) {
154+
LogPrintLevel(BCLog::SV2, BCLog::Level::Trace, "Send %d bytes to client id=%zu\n",
155+
data.size() - total_sent, node_id);
156+
157+
sent = SendBytes(node_id, data, more, errmsg);
158+
}
159+
160+
if (sent > 0) {
161+
client->m_transport->MarkBytesSent(sent);
162+
if (static_cast<size_t>(sent) != data.size()) {
163+
// could not send full message; stop sending more
164+
break;
165+
}
166+
} else {
167+
if (sent < 0) {
168+
LogDebug(BCLog::NET, "socket send error for peer=%d: %s\n", node_id, errmsg);
169+
CloseConnection(node_id);
170+
}
171+
break;
172+
}
173+
}
174+
175+
// Clear messages that have been handed to transport from the queue
176+
client->m_send_messages.erase(client->m_send_messages.begin(), it);
177+
178+
// If both receiving and (non-optimistic) sending were possible, we first attempt
179+
// sending. If that succeeds, but does not fully drain the send queue, do not
180+
// attempt to receive. This avoids needlessly queueing data if the remote peer
181+
// is slow at receiving data, by means of TCP flow control. We only do this when
182+
// sending actually succeeded to make sure progress is always made; otherwise a
183+
// deadlock would be possible when both sides have data to send, but neither is
184+
// receiving.
185+
//
186+
// TODO: decide if this is useful for Sv2
187+
cancel_recv = total_sent > 0; // && more;
188+
}
189+
190+
void Sv2Connman::EventGotData(Id id, std::span<const uint8_t> data)
191+
{
192+
AssertLockNotHeld(m_clients_mutex);
193+
194+
auto client{WITH_LOCK(m_clients_mutex, return GetClientById(id);)};
195+
if (client == nullptr) {
196+
return;
197+
}
198+
199+
try {
200+
while (data.size() > 0) {
201+
// absorb network data
202+
if (!client->m_transport->ReceivedBytes(data)) {
203+
// Serious transport problem
204+
LogPrintLevel(BCLog::SV2, BCLog::Level::Trace, "Transport problem, disconnecting client id=%zu\n",
205+
client->m_id);
206+
// TODO: should we even bother with this?
207+
client->m_disconnect_flag = true;
208+
break;
209+
}
210+
211+
if (client->m_transport->ReceivedMessageComplete()) {
212+
bool dummy_reject_message = false;
213+
Sv2NetMsg msg = client->m_transport->GetReceivedMessage(std::chrono::milliseconds(0), dummy_reject_message);
214+
ProcessSv2Message(msg, *client.get());
215+
}
216+
}
217+
} catch (const std::exception& e) {
218+
LogPrintLevel(BCLog::SV2, BCLog::Level::Error, "Received error when processing client id=%zu message: %s\n", client->m_id, e.what());
219+
client->m_disconnect_flag = true;
220+
}
221+
222+
}
223+
224+
void Sv2Connman::EventGotEOF(NodeId node_id)
225+
{
226+
auto client{WITH_LOCK(m_clients_mutex, return GetClientById(node_id);)};
227+
if (client == nullptr) return;
228+
client->m_disconnect_flag = true;
229+
}
230+
231+
void Sv2Connman::EventGotPermanentReadError(NodeId node_id, const std::string& errmsg)
232+
{
233+
auto client{WITH_LOCK(m_clients_mutex, return GetClientById(node_id);)};
234+
if (client == nullptr) return;
235+
client->m_disconnect_flag = true;
236+
}
237+
238+
void Sv2Connman::ProcessSv2Message(const Sv2NetMsg& sv2_net_msg, Sv2Client& client)
239+
{
240+
uint8_t msg_type[1] = {uint8_t(sv2_net_msg.m_msg_type)};
241+
LogPrintLevel(BCLog::SV2, BCLog::Level::Debug, "Received 0x%s %s from client id=%zu\n",
242+
// After clang-17:
243+
// std::format("{:x}", uint8_t(sv2_net_msg.m_msg_type)),
244+
HexStr(msg_type),
245+
node::SV2_MSG_NAMES.at(sv2_net_msg.m_msg_type), client.m_id);
246+
247+
DataStream ss (sv2_net_msg.m_msg);
248+
249+
if (client.m_disconnect_flag) {
250+
// Don't bother processing new messages if we are about to disconnect when the
251+
// send queue empties. This also prevents us from appending to the send queue
252+
// when m_disconnect_flag is set.
253+
return;
254+
}
255+
256+
switch (sv2_net_msg.m_msg_type)
257+
{
258+
case Sv2MsgType::SETUP_CONNECTION:
259+
{
260+
if (client.m_setup_connection_confirmed) {
261+
LogPrintLevel(BCLog::SV2, BCLog::Level::Error, "Client client id=%zu connection has already been confirmed\n",
262+
client.m_id);
263+
return;
264+
}
265+
266+
node::Sv2SetupConnectionMsg setup_conn;
267+
try {
268+
ss >> setup_conn;
269+
} catch (const std::exception& e) {
270+
LogPrintLevel(BCLog::SV2, BCLog::Level::Error, "Received invalid SetupConnection message from client id=%zu: %s\n",
271+
client.m_id, e.what());
272+
client.m_disconnect_flag = true;
273+
return;
274+
}
275+
276+
// Disconnect a client that connects on the wrong subprotocol.
277+
if (setup_conn.m_protocol != m_subprotocol) {
278+
node::Sv2SetupConnectionErrorMsg setup_conn_err{setup_conn.m_flags, std::string{"unsupported-protocol"}};
279+
280+
LogPrintLevel(BCLog::SV2, BCLog::Level::Debug, "Send 0x02 SetupConnectionError to client id=%zu\n",
281+
client.m_id);
282+
client.m_send_messages.emplace_back(setup_conn_err);
283+
284+
client.m_disconnect_flag = true;
285+
return;
286+
}
287+
288+
// Disconnect a client if they are not running a compatible protocol version.
289+
if ((m_protocol_version < setup_conn.m_min_version) || (m_protocol_version > setup_conn.m_max_version)) {
290+
node::Sv2SetupConnectionErrorMsg setup_conn_err{setup_conn.m_flags, std::string{"protocol-version-mismatch"}};
291+
LogPrintLevel(BCLog::SV2, BCLog::Level::Debug, "Send 0x02 SetupConnection.Error to client id=%zu\n",
292+
client.m_id);
293+
client.m_send_messages.emplace_back(setup_conn_err);
294+
295+
LogPrintLevel(BCLog::SV2, BCLog::Level::Error, "Received a connection from client id=%zu with incompatible protocol_versions: min_version: %d, max_version: %d\n",
296+
client.m_id, setup_conn.m_min_version, setup_conn.m_max_version);
297+
client.m_disconnect_flag = true;
298+
return;
299+
}
300+
301+
LogPrintLevel(BCLog::SV2, BCLog::Level::Debug, "Send 0x01 SetupConnection.Success to client id=%zu\n",
302+
client.m_id);
303+
node::Sv2SetupConnectionSuccessMsg setup_success{m_protocol_version, m_optional_features};
304+
client.m_send_messages.emplace_back(setup_success);
305+
306+
client.m_setup_connection_confirmed = true;
307+
308+
break;
309+
}
310+
case Sv2MsgType::COINBASE_OUTPUT_CONSTRAINTS:
311+
{
312+
if (!client.m_setup_connection_confirmed) {
313+
client.m_disconnect_flag = true;
314+
return;
315+
}
316+
317+
node::Sv2CoinbaseOutputConstraintsMsg coinbase_output_constraints;
318+
try {
319+
ss >> coinbase_output_constraints;
320+
client.m_coinbase_output_constraints_recv = true;
321+
} catch (const std::exception& e) {
322+
LogPrintLevel(BCLog::SV2, BCLog::Level::Error, "Received invalid CoinbaseOutputConstraints message from client id=%zu: %s\n",
323+
client.m_id, e.what());
324+
client.m_disconnect_flag = true;
325+
return;
326+
}
327+
328+
uint32_t max_additional_size = coinbase_output_constraints.m_coinbase_output_max_additional_size;
329+
LogPrintLevel(BCLog::SV2, BCLog::Level::Debug, "coinbase_output_max_additional_size=%d bytes\n", max_additional_size);
330+
331+
if (max_additional_size > MAX_BLOCK_WEIGHT) {
332+
LogPrintLevel(BCLog::SV2, BCLog::Level::Error, "Received impossible CoinbaseOutputConstraints from client id=%zu: %d\n",
333+
client.m_id, max_additional_size);
334+
client.m_disconnect_flag = true;
335+
return;
336+
}
337+
338+
client.m_coinbase_tx_outputs_size = coinbase_output_constraints.m_coinbase_output_max_additional_size;
339+
340+
break;
341+
}
342+
default: {
343+
uint8_t msg_type[1]{uint8_t(sv2_net_msg.m_msg_type)};
344+
LogPrintLevel(BCLog::SV2, BCLog::Level::Warning, "Received unknown message type 0x%s from client id=%zu\n",
345+
HexStr(msg_type), client.m_id);
346+
break;
347+
}
348+
}
349+
}

0 commit comments

Comments
 (0)