Skip to content

Commit b62f820

Browse files
Sjorsvasild
andcommitted
Add Sv2Connman
Co-Authored-By: Vasil Dimov <[email protected]>
1 parent 4a75efb commit b62f820

File tree

5 files changed

+779
-0
lines changed

5 files changed

+779
-0
lines changed

src/sv2/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,14 @@
55
add_library(bitcoin_sv2 STATIC EXCLUDE_FROM_ALL
66
noise.cpp
77
transport.cpp
8+
connman.cpp
89
)
910

1011
target_link_libraries(bitcoin_sv2
1112
PRIVATE
1213
core_interface
1314
bitcoin_clientversion
1415
bitcoin_crypto
16+
bitcoin_common # for SockMan
1517
$<$<PLATFORM_ID:Windows>:ws2_32>
1618
)

src/sv2/connman.cpp

Lines changed: 354 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,354 @@
1+
// Copyright (c) 2023-present The Bitcoin Core developers
2+
// Distributed under the MIT software license, see the accompanying
3+
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
4+
5+
#include <sv2/connman.h>
6+
#include <sv2/messages.h>
7+
#include <logging.h>
8+
#include <sync.h>
9+
#include <util/thread.h>
10+
11+
using node::Sv2MsgType;
12+
13+
Sv2Connman::~Sv2Connman()
14+
{
15+
AssertLockNotHeld(m_clients_mutex);
16+
17+
{
18+
LOCK(m_clients_mutex);
19+
for (const auto& client : m_sv2_clients) {
20+
LogTrace(BCLog::SV2, "Disconnecting client id=%zu\n",
21+
client.first);
22+
client.second->m_disconnect_flag = true;
23+
}
24+
DisconnectFlagged();
25+
}
26+
27+
Interrupt();
28+
StopThreads();
29+
}
30+
31+
bool Sv2Connman::Start(Sv2EventsInterface* msgproc, std::string host, uint16_t port)
32+
{
33+
m_msgproc = msgproc;
34+
35+
if (!Bind(host, port)) return false;
36+
37+
SockMan::Options sockman_options;
38+
StartSocketsThreads(sockman_options);
39+
40+
return true;
41+
}
42+
43+
bool Sv2Connman::Bind(std::string host, uint16_t port)
44+
{
45+
const CService addr_bind = LookupNumeric(host, port);
46+
47+
bilingual_str error;
48+
if (!BindAndStartListening(addr_bind, error)) {
49+
LogPrintLevel(BCLog::SV2, BCLog::Level::Error, "Template Provider failed to bind to port %d: %s\n", port, error.original);
50+
return false;
51+
}
52+
53+
LogPrintLevel(BCLog::SV2, BCLog::Level::Info, "%s listening on %s:%d\n", SV2_PROTOCOL_NAMES.at(m_subprotocol), host, port);
54+
55+
return true;
56+
}
57+
58+
59+
void Sv2Connman::DisconnectFlagged()
60+
{
61+
AssertLockHeld(m_clients_mutex);
62+
63+
// Remove clients that are flagged for disconnection.
64+
auto it = m_sv2_clients.begin();
65+
while(it != m_sv2_clients.end()) {
66+
const auto& client{it->second};
67+
LOCK(client->cs_send);
68+
if (client->m_send_messages.empty() && it->second->m_disconnect_flag) {
69+
CloseConnection(it->second->m_id);
70+
it = m_sv2_clients.erase(it);
71+
} else {
72+
it++;
73+
}
74+
}
75+
}
76+
77+
void Sv2Connman::EventIOLoopCompletedForAll()
78+
{
79+
LOCK(m_clients_mutex);
80+
DisconnectFlagged();
81+
}
82+
83+
void Sv2Connman::Interrupt()
84+
{
85+
interruptNet();
86+
}
87+
88+
void Sv2Connman::StopThreads()
89+
{
90+
JoinSocketsThreads();
91+
}
92+
93+
std::shared_ptr<Sv2Client> Sv2Connman::GetClientById(NodeId node_id) const
94+
{
95+
auto it{m_sv2_clients.find(node_id)};
96+
if (it != m_sv2_clients.end()) {
97+
return it->second;
98+
}
99+
return nullptr;
100+
}
101+
102+
bool Sv2Connman::EventNewConnectionAccepted(NodeId node_id,
103+
const CService& addr_bind_,
104+
const CService& addr_)
105+
{
106+
Assume(m_certificate);
107+
LOCK(m_clients_mutex);
108+
std::unique_ptr transport = std::make_unique<Sv2Transport>(m_static_key, m_certificate.value());
109+
auto client = std::make_shared<Sv2Client>(node_id, std::move(transport));
110+
LogPrintLevel(BCLog::SV2, BCLog::Level::Trace, "New client id=%zu connected\n", node_id);
111+
m_sv2_clients.emplace(node_id, std::move(client));
112+
return true;
113+
}
114+
115+
void Sv2Connman::EventReadyToSend(NodeId node_id, bool& cancel_recv)
116+
{
117+
AssertLockNotHeld(m_clients_mutex);
118+
119+
auto client{WITH_LOCK(m_clients_mutex, return GetClientById(node_id);)};
120+
if (client == nullptr) {
121+
cancel_recv = true;
122+
return;
123+
}
124+
125+
LOCK(client->cs_send);
126+
auto it = client->m_send_messages.begin();
127+
std::optional<bool> expected_more;
128+
129+
size_t total_sent = 0;
130+
131+
while(true) {
132+
if (it != client->m_send_messages.end()) {
133+
// If possible, move one message from the send queue to the transport.
134+
// This fails when there is an existing message still being sent,
135+
// or when the handshake has not yet completed.
136+
//
137+
// Wrap Sv2NetMsg inside CSerializedNetMsg for transport
138+
CSerializedNetMsg net_msg{*it};
139+
if (client->m_transport->SetMessageToSend(net_msg)) {
140+
++it;
141+
}
142+
}
143+
144+
const auto& [data, more, _m_message_type] = client->m_transport->GetBytesToSend(/*have_next_message=*/it != client->m_send_messages.end());
145+
146+
147+
// We rely on the 'more' value returned by GetBytesToSend to correctly predict whether more
148+
// bytes are still to be sent, to correctly set the MSG_MORE flag. As a sanity check,
149+
// verify that the previously returned 'more' was correct.
150+
if (expected_more.has_value()) Assume(!data.empty() == *expected_more);
151+
expected_more = more;
152+
153+
ssize_t sent = 0;
154+
std::string errmsg;
155+
156+
if (!data.empty()) {
157+
LogPrintLevel(BCLog::SV2, BCLog::Level::Trace, "Send %d bytes to client id=%zu\n",
158+
data.size() - total_sent, node_id);
159+
160+
sent = SendBytes(node_id, data, more, errmsg);
161+
}
162+
163+
if (sent > 0) {
164+
client->m_transport->MarkBytesSent(sent);
165+
if (static_cast<size_t>(sent) != data.size()) {
166+
// could not send full message; stop sending more
167+
break;
168+
}
169+
} else {
170+
if (sent < 0) {
171+
LogDebug(BCLog::NET, "socket send error for peer=%d: %s\n", node_id, errmsg);
172+
CloseConnection(node_id);
173+
}
174+
break;
175+
}
176+
}
177+
178+
// Clear messages that have been handed to transport from the queue
179+
client->m_send_messages.erase(client->m_send_messages.begin(), it);
180+
181+
// If both receiving and (non-optimistic) sending were possible, we first attempt
182+
// sending. If that succeeds, but does not fully drain the send queue, do not
183+
// attempt to receive. This avoids needlessly queueing data if the remote peer
184+
// is slow at receiving data, by means of TCP flow control. We only do this when
185+
// sending actually succeeded to make sure progress is always made; otherwise a
186+
// deadlock would be possible when both sides have data to send, but neither is
187+
// receiving.
188+
//
189+
// TODO: decide if this is useful for Sv2
190+
cancel_recv = total_sent > 0; // && more;
191+
}
192+
193+
void Sv2Connman::EventGotData(Id id, std::span<const uint8_t> data)
194+
{
195+
AssertLockNotHeld(m_clients_mutex);
196+
197+
auto client{WITH_LOCK(m_clients_mutex, return GetClientById(id);)};
198+
if (client == nullptr) {
199+
return;
200+
}
201+
202+
try {
203+
while (data.size() > 0) {
204+
// absorb network data
205+
if (!client->m_transport->ReceivedBytes(data)) {
206+
// Serious transport problem
207+
LogPrintLevel(BCLog::SV2, BCLog::Level::Trace, "Transport problem, disconnecting client id=%zu\n",
208+
client->m_id);
209+
// TODO: should we even bother with this?
210+
client->m_disconnect_flag = true;
211+
break;
212+
}
213+
214+
if (client->m_transport->ReceivedMessageComplete()) {
215+
bool dummy_reject_message = false;
216+
Sv2NetMsg msg = client->m_transport->GetReceivedMessage(std::chrono::milliseconds(0), dummy_reject_message);
217+
ProcessSv2Message(msg, *client.get());
218+
}
219+
}
220+
} catch (const std::exception& e) {
221+
LogPrintLevel(BCLog::SV2, BCLog::Level::Error, "Received error when processing client id=%zu message: %s\n", client->m_id, e.what());
222+
client->m_disconnect_flag = true;
223+
}
224+
225+
}
226+
227+
void Sv2Connman::EventGotEOF(NodeId node_id)
228+
{
229+
auto client{WITH_LOCK(m_clients_mutex, return GetClientById(node_id);)};
230+
if (client == nullptr) return;
231+
client->m_disconnect_flag = true;
232+
}
233+
234+
void Sv2Connman::EventGotPermanentReadError(NodeId node_id, const std::string& errmsg)
235+
{
236+
auto client{WITH_LOCK(m_clients_mutex, return GetClientById(node_id);)};
237+
if (client == nullptr) return;
238+
client->m_disconnect_flag = true;
239+
}
240+
241+
void Sv2Connman::ProcessSv2Message(const Sv2NetMsg& sv2_net_msg, Sv2Client& client)
242+
{
243+
uint8_t msg_type[1] = {uint8_t(sv2_net_msg.m_msg_type)};
244+
LogPrintLevel(BCLog::SV2, BCLog::Level::Debug, "Received 0x%s %s from client id=%zu\n",
245+
// After clang-17:
246+
// std::format("{:x}", uint8_t(sv2_net_msg.m_msg_type)),
247+
HexStr(msg_type),
248+
node::SV2_MSG_NAMES.at(sv2_net_msg.m_msg_type), client.m_id);
249+
250+
DataStream ss (sv2_net_msg.m_msg);
251+
252+
if (client.m_disconnect_flag) {
253+
// Don't bother processing new messages if we are about to disconnect when the
254+
// send queue empties. This also prevents us from appending to the send queue
255+
// when m_disconnect_flag is set.
256+
return;
257+
}
258+
259+
switch (sv2_net_msg.m_msg_type)
260+
{
261+
case Sv2MsgType::SETUP_CONNECTION:
262+
{
263+
if (client.m_setup_connection_confirmed) {
264+
LogPrintLevel(BCLog::SV2, BCLog::Level::Error, "Client client id=%zu connection has already been confirmed\n",
265+
client.m_id);
266+
return;
267+
}
268+
269+
node::Sv2SetupConnectionMsg setup_conn;
270+
try {
271+
ss >> setup_conn;
272+
} catch (const std::exception& e) {
273+
LogPrintLevel(BCLog::SV2, BCLog::Level::Error, "Received invalid SetupConnection message from client id=%zu: %s\n",
274+
client.m_id, e.what());
275+
client.m_disconnect_flag = true;
276+
return;
277+
}
278+
279+
LOCK(client.cs_send);
280+
281+
// Disconnect a client that connects on the wrong subprotocol.
282+
if (setup_conn.m_protocol != m_subprotocol) {
283+
node::Sv2SetupConnectionErrorMsg setup_conn_err{setup_conn.m_flags, std::string{"unsupported-protocol"}};
284+
285+
LogPrintLevel(BCLog::SV2, BCLog::Level::Debug, "Send 0x02 SetupConnectionError to client id=%zu\n",
286+
client.m_id);
287+
client.m_send_messages.emplace_back(setup_conn_err);
288+
289+
client.m_disconnect_flag = true;
290+
return;
291+
}
292+
293+
// Disconnect a client if they are not running a compatible protocol version.
294+
if ((m_protocol_version < setup_conn.m_min_version) || (m_protocol_version > setup_conn.m_max_version)) {
295+
node::Sv2SetupConnectionErrorMsg setup_conn_err{setup_conn.m_flags, std::string{"protocol-version-mismatch"}};
296+
LogPrintLevel(BCLog::SV2, BCLog::Level::Debug, "Send 0x02 SetupConnection.Error to client id=%zu\n",
297+
client.m_id);
298+
client.m_send_messages.emplace_back(setup_conn_err);
299+
300+
LogPrintLevel(BCLog::SV2, BCLog::Level::Error, "Received a connection from client id=%zu with incompatible protocol_versions: min_version: %d, max_version: %d\n",
301+
client.m_id, setup_conn.m_min_version, setup_conn.m_max_version);
302+
client.m_disconnect_flag = true;
303+
return;
304+
}
305+
306+
LogPrintLevel(BCLog::SV2, BCLog::Level::Debug, "Send 0x01 SetupConnection.Success to client id=%zu\n",
307+
client.m_id);
308+
node::Sv2SetupConnectionSuccessMsg setup_success{m_protocol_version, m_optional_features};
309+
client.m_send_messages.emplace_back(setup_success);
310+
311+
client.m_setup_connection_confirmed = true;
312+
313+
break;
314+
}
315+
case Sv2MsgType::COINBASE_OUTPUT_CONSTRAINTS:
316+
{
317+
if (!client.m_setup_connection_confirmed) {
318+
client.m_disconnect_flag = true;
319+
return;
320+
}
321+
322+
node::Sv2CoinbaseOutputConstraintsMsg coinbase_output_constraints;
323+
try {
324+
ss >> coinbase_output_constraints;
325+
client.m_coinbase_output_constraints_recv = true;
326+
} catch (const std::exception& e) {
327+
LogPrintLevel(BCLog::SV2, BCLog::Level::Error, "Received invalid CoinbaseOutputConstraints message from client id=%zu: %s\n",
328+
client.m_id, e.what());
329+
client.m_disconnect_flag = true;
330+
return;
331+
}
332+
333+
uint32_t max_additional_size = coinbase_output_constraints.m_coinbase_output_max_additional_size;
334+
LogPrintLevel(BCLog::SV2, BCLog::Level::Debug, "coinbase_output_max_additional_size=%d bytes\n", max_additional_size);
335+
336+
if (max_additional_size > MAX_BLOCK_WEIGHT) {
337+
LogPrintLevel(BCLog::SV2, BCLog::Level::Error, "Received impossible CoinbaseOutputConstraints from client id=%zu: %d\n",
338+
client.m_id, max_additional_size);
339+
client.m_disconnect_flag = true;
340+
return;
341+
}
342+
343+
client.m_coinbase_tx_outputs_size = coinbase_output_constraints.m_coinbase_output_max_additional_size;
344+
345+
break;
346+
}
347+
default: {
348+
uint8_t msg_type[1]{uint8_t(sv2_net_msg.m_msg_type)};
349+
LogPrintLevel(BCLog::SV2, BCLog::Level::Warning, "Received unknown message type 0x%s from client id=%zu\n",
350+
HexStr(msg_type), client.m_id);
351+
break;
352+
}
353+
}
354+
}

0 commit comments

Comments
 (0)