Skip to content

Commit 1a0f0f3

Browse files
Sjorsvasild
andcommitted
Add Sv2Connman
Co-Authored-By: Vasil Dimov <[email protected]>
1 parent cc24340 commit 1a0f0f3

File tree

5 files changed

+774
-0
lines changed

5 files changed

+774
-0
lines changed

src/sv2/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,14 @@
55
add_library(bitcoin_sv2 STATIC EXCLUDE_FROM_ALL
66
noise.cpp
77
transport.cpp
8+
connman.cpp
89
)
910

1011
target_link_libraries(bitcoin_sv2
1112
PRIVATE
1213
core_interface
1314
bitcoin_clientversion
1415
bitcoin_crypto
16+
bitcoin_common # for SockMan
1517
$<$<PLATFORM_ID:Windows>:ws2_32>
1618
)

src/sv2/connman.cpp

Lines changed: 352 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,352 @@
1+
// Copyright (c) 2023-present The Bitcoin Core developers
2+
// Distributed under the MIT software license, see the accompanying
3+
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
4+
5+
#include <sv2/connman.h>
6+
#include <sv2/messages.h>
7+
#include <logging.h>
8+
#include <sync.h>
9+
#include <util/thread.h>
10+
11+
using node::Sv2MsgType;
12+
13+
Sv2Connman::~Sv2Connman()
14+
{
15+
AssertLockNotHeld(m_clients_mutex);
16+
17+
{
18+
LOCK(m_clients_mutex);
19+
for (const auto& client : m_sv2_clients) {
20+
LogTrace(BCLog::SV2, "Disconnecting client id=%zu\n",
21+
client.first);
22+
CloseConnection(client.second->m_id);
23+
client.second->m_disconnect_flag = true;
24+
}
25+
DisconnectFlagged();
26+
}
27+
28+
Interrupt();
29+
StopThreads();
30+
}
31+
32+
bool Sv2Connman::Start(Sv2EventsInterface* msgproc, std::string host, uint16_t port)
33+
{
34+
m_msgproc = msgproc;
35+
36+
if (!Bind(host, port)) return false;
37+
38+
SockMan::Options sockman_options;
39+
StartSocketsThreads(sockman_options);
40+
41+
return true;
42+
}
43+
44+
bool Sv2Connman::Bind(std::string host, uint16_t port)
45+
{
46+
const CService addr_bind = LookupNumeric(host, port);
47+
48+
bilingual_str error;
49+
if (!BindAndStartListening(addr_bind, error)) {
50+
LogPrintLevel(BCLog::SV2, BCLog::Level::Error, "Template Provider failed to bind to port %d: %s\n", port, error.original);
51+
return false;
52+
}
53+
54+
LogPrintLevel(BCLog::SV2, BCLog::Level::Info, "%s listening on %s:%d\n", SV2_PROTOCOL_NAMES.at(m_subprotocol), host, port);
55+
56+
return true;
57+
}
58+
59+
60+
void Sv2Connman::DisconnectFlagged()
61+
{
62+
AssertLockHeld(m_clients_mutex);
63+
64+
// Remove clients that are flagged for disconnection.
65+
auto it = m_sv2_clients.begin();
66+
while(it != m_sv2_clients.end()) {
67+
if (it->second->m_disconnect_flag) {
68+
it = m_sv2_clients.erase(it);
69+
} else {
70+
it++;
71+
}
72+
}
73+
}
74+
75+
void Sv2Connman::EventIOLoopCompletedForAll()
76+
{
77+
LOCK(m_clients_mutex);
78+
DisconnectFlagged();
79+
}
80+
81+
void Sv2Connman::Interrupt()
82+
{
83+
interruptNet();
84+
}
85+
86+
void Sv2Connman::StopThreads()
87+
{
88+
JoinSocketsThreads();
89+
}
90+
91+
std::shared_ptr<Sv2Client> Sv2Connman::GetClientById(NodeId node_id) const
92+
{
93+
auto it{m_sv2_clients.find(node_id)};
94+
if (it != m_sv2_clients.end()) {
95+
return it->second;
96+
}
97+
return nullptr;
98+
}
99+
100+
bool Sv2Connman::EventNewConnectionAccepted(NodeId node_id,
101+
const CService& addr_bind_,
102+
const CService& addr_)
103+
{
104+
Assume(m_certificate);
105+
LOCK(m_clients_mutex);
106+
std::unique_ptr transport = std::make_unique<Sv2Transport>(m_static_key, m_certificate.value());
107+
auto client = std::make_shared<Sv2Client>(node_id, std::move(transport));
108+
LogPrintLevel(BCLog::SV2, BCLog::Level::Trace, "New client id=%zu connected\n", node_id);
109+
m_sv2_clients.emplace(node_id, std::move(client));
110+
return true;
111+
}
112+
113+
void Sv2Connman::EventReadyToSend(NodeId node_id, bool& cancel_recv)
114+
{
115+
AssertLockNotHeld(m_clients_mutex);
116+
117+
auto client{WITH_LOCK(m_clients_mutex, return GetClientById(node_id);)};
118+
if (client == nullptr) {
119+
cancel_recv = true;
120+
return;
121+
}
122+
123+
auto it = client->m_send_messages.begin();
124+
std::optional<bool> expected_more;
125+
126+
size_t total_sent = 0;
127+
128+
while(true) {
129+
if (it != client->m_send_messages.end()) {
130+
// If possible, move one message from the send queue to the transport.
131+
// This fails when there is an existing message still being sent,
132+
// or when the handshake has not yet completed.
133+
//
134+
// Wrap Sv2NetMsg inside CSerializedNetMsg for transport
135+
CSerializedNetMsg net_msg{*it};
136+
if (client->m_transport->SetMessageToSend(net_msg)) {
137+
++it;
138+
}
139+
}
140+
141+
const auto& [data, more, _m_message_type] = client->m_transport->GetBytesToSend(/*have_next_message=*/it != client->m_send_messages.end());
142+
143+
144+
// We rely on the 'more' value returned by GetBytesToSend to correctly predict whether more
145+
// bytes are still to be sent, to correctly set the MSG_MORE flag. As a sanity check,
146+
// verify that the previously returned 'more' was correct.
147+
if (expected_more.has_value()) Assume(!data.empty() == *expected_more);
148+
expected_more = more;
149+
150+
ssize_t sent = 0;
151+
std::string errmsg;
152+
153+
if (!data.empty()) {
154+
LogPrintLevel(BCLog::SV2, BCLog::Level::Trace, "Send %d bytes to client id=%zu\n",
155+
data.size() - total_sent, node_id);
156+
157+
sent = SendBytes(node_id, data, more, errmsg);
158+
}
159+
160+
if (sent > 0) {
161+
client->m_transport->MarkBytesSent(sent);
162+
if (static_cast<size_t>(sent) != data.size()) {
163+
// could not send full message; stop sending more
164+
break;
165+
}
166+
} else {
167+
if (sent < 0) {
168+
LogDebug(BCLog::NET, "socket send error for peer=%d: %s\n", node_id, errmsg);
169+
CloseConnection(node_id);
170+
}
171+
break;
172+
}
173+
}
174+
175+
// Clear messages that have been handed to transport from the queue
176+
client->m_send_messages.erase(client->m_send_messages.begin(), it);
177+
178+
// If both receiving and (non-optimistic) sending were possible, we first attempt
179+
// sending. If that succeeds, but does not fully drain the send queue, do not
180+
// attempt to receive. This avoids needlessly queueing data if the remote peer
181+
// is slow at receiving data, by means of TCP flow control. We only do this when
182+
// sending actually succeeded to make sure progress is always made; otherwise a
183+
// deadlock would be possible when both sides have data to send, but neither is
184+
// receiving.
185+
//
186+
// TODO: decide if this is useful for Sv2
187+
cancel_recv = total_sent > 0; // && more;
188+
}
189+
190+
void Sv2Connman::EventGotData(Id id, std::span<const uint8_t> data)
191+
{
192+
AssertLockNotHeld(m_clients_mutex);
193+
194+
auto client{WITH_LOCK(m_clients_mutex, return GetClientById(id);)};
195+
if (client == nullptr) {
196+
return;
197+
}
198+
199+
try {
200+
while (data.size() > 0) {
201+
// absorb network data
202+
if (!client->m_transport->ReceivedBytes(data)) {
203+
// Serious transport problem
204+
LogPrintLevel(BCLog::SV2, BCLog::Level::Trace, "Transport problem, disconnecting client id=%zu\n",
205+
client->m_id);
206+
CloseConnection(id);
207+
// TODO: should we even bother with this?
208+
client->m_disconnect_flag = true;
209+
break;
210+
}
211+
212+
if (client->m_transport->ReceivedMessageComplete()) {
213+
bool dummy_reject_message = false;
214+
Sv2NetMsg msg = client->m_transport->GetReceivedMessage(std::chrono::milliseconds(0), dummy_reject_message);
215+
ProcessSv2Message(msg, *client.get());
216+
}
217+
}
218+
} catch (const std::exception& e) {
219+
LogPrintLevel(BCLog::SV2, BCLog::Level::Error, "Received error when processing client id=%zu message: %s\n", client->m_id, e.what());
220+
CloseConnection(id);
221+
client->m_disconnect_flag = true;
222+
}
223+
224+
}
225+
226+
void Sv2Connman::EventGotEOF(NodeId node_id)
227+
{
228+
auto client{WITH_LOCK(m_clients_mutex, return GetClientById(node_id);)};
229+
if (client == nullptr) return;
230+
CloseConnection(node_id);
231+
client->m_disconnect_flag = true;
232+
}
233+
234+
void Sv2Connman::EventGotPermanentReadError(NodeId node_id, const std::string& errmsg)
235+
{
236+
auto client{WITH_LOCK(m_clients_mutex, return GetClientById(node_id);)};
237+
if (client == nullptr) return;
238+
CloseConnection(node_id);
239+
client->m_disconnect_flag = true;
240+
}
241+
242+
void Sv2Connman::ProcessSv2Message(const Sv2NetMsg& sv2_net_msg, Sv2Client& client)
243+
{
244+
uint8_t msg_type[1] = {uint8_t(sv2_net_msg.m_msg_type)};
245+
LogPrintLevel(BCLog::SV2, BCLog::Level::Debug, "Received 0x%s %s from client id=%zu\n",
246+
// After clang-17:
247+
// std::format("{:x}", uint8_t(sv2_net_msg.m_msg_type)),
248+
HexStr(msg_type),
249+
node::SV2_MSG_NAMES.at(sv2_net_msg.m_msg_type), client.m_id);
250+
251+
DataStream ss (sv2_net_msg.m_msg);
252+
253+
switch (sv2_net_msg.m_msg_type)
254+
{
255+
case Sv2MsgType::SETUP_CONNECTION:
256+
{
257+
if (client.m_setup_connection_confirmed) {
258+
LogPrintLevel(BCLog::SV2, BCLog::Level::Error, "Client client id=%zu connection has already been confirmed\n",
259+
client.m_id);
260+
return;
261+
}
262+
263+
node::Sv2SetupConnectionMsg setup_conn;
264+
try {
265+
ss >> setup_conn;
266+
} catch (const std::exception& e) {
267+
LogPrintLevel(BCLog::SV2, BCLog::Level::Error, "Received invalid SetupConnection message from client id=%zu: %s\n",
268+
client.m_id, e.what());
269+
CloseConnection(client.m_id);
270+
client.m_disconnect_flag = true;
271+
return;
272+
}
273+
274+
// Disconnect a client that connects on the wrong subprotocol.
275+
if (setup_conn.m_protocol != m_subprotocol) {
276+
node::Sv2SetupConnectionErrorMsg setup_conn_err{setup_conn.m_flags, std::string{"unsupported-protocol"}};
277+
278+
LogPrintLevel(BCLog::SV2, BCLog::Level::Debug, "Send 0x02 SetupConnectionError to client id=%zu\n",
279+
client.m_id);
280+
client.m_send_messages.emplace_back(setup_conn_err);
281+
282+
CloseConnection(client.m_id);
283+
client.m_disconnect_flag = true;
284+
return;
285+
}
286+
287+
// Disconnect a client if they are not running a compatible protocol version.
288+
if ((m_protocol_version < setup_conn.m_min_version) || (m_protocol_version > setup_conn.m_max_version)) {
289+
node::Sv2SetupConnectionErrorMsg setup_conn_err{setup_conn.m_flags, std::string{"protocol-version-mismatch"}};
290+
LogPrintLevel(BCLog::SV2, BCLog::Level::Debug, "Send 0x02 SetupConnection.Error to client id=%zu\n",
291+
client.m_id);
292+
client.m_send_messages.emplace_back(setup_conn_err);
293+
294+
LogPrintLevel(BCLog::SV2, BCLog::Level::Error, "Received a connection from client id=%zu with incompatible protocol_versions: min_version: %d, max_version: %d\n",
295+
client.m_id, setup_conn.m_min_version, setup_conn.m_max_version);
296+
CloseConnection(client.m_id);
297+
client.m_disconnect_flag = true;
298+
return;
299+
}
300+
301+
LogPrintLevel(BCLog::SV2, BCLog::Level::Debug, "Send 0x01 SetupConnection.Success to client id=%zu\n",
302+
client.m_id);
303+
node::Sv2SetupConnectionSuccessMsg setup_success{m_protocol_version, m_optional_features};
304+
client.m_send_messages.emplace_back(setup_success);
305+
306+
client.m_setup_connection_confirmed = true;
307+
308+
break;
309+
}
310+
case Sv2MsgType::COINBASE_OUTPUT_CONSTRAINTS:
311+
{
312+
if (!client.m_setup_connection_confirmed) {
313+
CloseConnection(client.m_id);
314+
client.m_disconnect_flag = true;
315+
return;
316+
}
317+
318+
node::Sv2CoinbaseOutputConstraintsMsg coinbase_output_constraints;
319+
try {
320+
ss >> coinbase_output_constraints;
321+
client.m_coinbase_output_constraints_recv = true;
322+
} catch (const std::exception& e) {
323+
LogPrintLevel(BCLog::SV2, BCLog::Level::Error, "Received invalid CoinbaseOutputConstraints message from client id=%zu: %s\n",
324+
client.m_id, e.what());
325+
CloseConnection(client.m_id);
326+
client.m_disconnect_flag = true;
327+
return;
328+
}
329+
330+
uint32_t max_additional_size = coinbase_output_constraints.m_coinbase_output_max_additional_size;
331+
LogPrintLevel(BCLog::SV2, BCLog::Level::Debug, "coinbase_output_max_additional_size=%d bytes\n", max_additional_size);
332+
333+
if (max_additional_size > MAX_BLOCK_WEIGHT) {
334+
LogPrintLevel(BCLog::SV2, BCLog::Level::Error, "Received impossible CoinbaseOutputConstraints from client id=%zu: %d\n",
335+
client.m_id, max_additional_size);
336+
CloseConnection(client.m_id);
337+
client.m_disconnect_flag = true;
338+
return;
339+
}
340+
341+
client.m_coinbase_tx_outputs_size = coinbase_output_constraints.m_coinbase_output_max_additional_size;
342+
343+
break;
344+
}
345+
default: {
346+
uint8_t msg_type[1]{uint8_t(sv2_net_msg.m_msg_type)};
347+
LogPrintLevel(BCLog::SV2, BCLog::Level::Warning, "Received unknown message type 0x%s from client id=%zu\n",
348+
HexStr(msg_type), client.m_id);
349+
break;
350+
}
351+
}
352+
}

0 commit comments

Comments
 (0)