|
| 1 | +// Copyright (c) 2023-present The Bitcoin Core developers |
| 2 | +// Distributed under the MIT software license, see the accompanying |
| 3 | +// file COPYING or http://www.opensource.org/licenses/mit-license.php. |
| 4 | + |
| 5 | +#include <sv2/connman.h> |
| 6 | +#include <sv2/messages.h> |
| 7 | +#include <logging.h> |
| 8 | +#include <sync.h> |
| 9 | +#include <util/thread.h> |
| 10 | + |
| 11 | +using node::Sv2MsgType; |
| 12 | + |
| 13 | +Sv2Connman::~Sv2Connman() |
| 14 | +{ |
| 15 | + AssertLockNotHeld(m_clients_mutex); |
| 16 | + |
| 17 | + { |
| 18 | + LOCK(m_clients_mutex); |
| 19 | + for (const auto& client : m_sv2_clients) { |
| 20 | + LogTrace(BCLog::SV2, "Disconnecting client id=%zu\n", |
| 21 | + client.first); |
| 22 | + client.second->m_disconnect_flag = true; |
| 23 | + } |
| 24 | + DisconnectFlagged(); |
| 25 | + } |
| 26 | + |
| 27 | + Interrupt(); |
| 28 | + StopThreads(); |
| 29 | +} |
| 30 | + |
| 31 | +bool Sv2Connman::Start(Sv2EventsInterface* msgproc, std::string host, uint16_t port) |
| 32 | +{ |
| 33 | + m_msgproc = msgproc; |
| 34 | + |
| 35 | + if (!Bind(host, port)) return false; |
| 36 | + |
| 37 | + SockMan::Options sockman_options; |
| 38 | + StartSocketsThreads(sockman_options); |
| 39 | + |
| 40 | + return true; |
| 41 | +} |
| 42 | + |
| 43 | +bool Sv2Connman::Bind(std::string host, uint16_t port) |
| 44 | +{ |
| 45 | + const CService addr_bind = LookupNumeric(host, port); |
| 46 | + |
| 47 | + bilingual_str error; |
| 48 | + if (!BindAndStartListening(addr_bind, error)) { |
| 49 | + LogPrintLevel(BCLog::SV2, BCLog::Level::Error, "Template Provider failed to bind to port %d: %s\n", port, error.original); |
| 50 | + return false; |
| 51 | + } |
| 52 | + |
| 53 | + LogPrintLevel(BCLog::SV2, BCLog::Level::Info, "%s listening on %s:%d\n", SV2_PROTOCOL_NAMES.at(m_subprotocol), host, port); |
| 54 | + |
| 55 | + return true; |
| 56 | +} |
| 57 | + |
| 58 | + |
| 59 | +void Sv2Connman::DisconnectFlagged() |
| 60 | +{ |
| 61 | + AssertLockHeld(m_clients_mutex); |
| 62 | + |
| 63 | + // Remove clients that are flagged for disconnection. |
| 64 | + auto it = m_sv2_clients.begin(); |
| 65 | + while(it != m_sv2_clients.end()) { |
| 66 | + std::shared_ptr<Sv2Client> client{it->second}; |
| 67 | + LOCK(client->cs_send); |
| 68 | + if (client->m_send_messages.empty() && it->second->m_disconnect_flag) { |
| 69 | + CloseConnection(it->second->m_id); |
| 70 | + it = m_sv2_clients.erase(it); |
| 71 | + } else { |
| 72 | + it++; |
| 73 | + } |
| 74 | + } |
| 75 | +} |
| 76 | + |
| 77 | +void Sv2Connman::EventIOLoopCompletedForAll() |
| 78 | +{ |
| 79 | + LOCK(m_clients_mutex); |
| 80 | + DisconnectFlagged(); |
| 81 | +} |
| 82 | + |
| 83 | +void Sv2Connman::Interrupt() |
| 84 | +{ |
| 85 | + interruptNet(); |
| 86 | +} |
| 87 | + |
| 88 | +void Sv2Connman::StopThreads() |
| 89 | +{ |
| 90 | + JoinSocketsThreads(); |
| 91 | +} |
| 92 | + |
| 93 | +std::shared_ptr<Sv2Client> Sv2Connman::GetClientById(NodeId node_id) const |
| 94 | +{ |
| 95 | + auto it{m_sv2_clients.find(node_id)}; |
| 96 | + if (it != m_sv2_clients.end()) { |
| 97 | + return it->second; |
| 98 | + } |
| 99 | + return nullptr; |
| 100 | +} |
| 101 | + |
| 102 | +bool Sv2Connman::EventNewConnectionAccepted(NodeId node_id, |
| 103 | + const CService& addr_bind_, |
| 104 | + const CService& addr_) |
| 105 | +{ |
| 106 | + Assume(m_certificate); |
| 107 | + LOCK(m_clients_mutex); |
| 108 | + std::unique_ptr transport = std::make_unique<Sv2Transport>(m_static_key, m_certificate.value()); |
| 109 | + auto client = std::make_shared<Sv2Client>(node_id, std::move(transport)); |
| 110 | + LogPrintLevel(BCLog::SV2, BCLog::Level::Trace, "New client id=%zu connected\n", node_id); |
| 111 | + m_sv2_clients.emplace(node_id, std::move(client)); |
| 112 | + return true; |
| 113 | +} |
| 114 | + |
| 115 | +void Sv2Connman::EventReadyToSend(NodeId node_id, bool& cancel_recv) |
| 116 | +{ |
| 117 | + AssertLockNotHeld(m_clients_mutex); |
| 118 | + |
| 119 | + auto client{WITH_LOCK(m_clients_mutex, return GetClientById(node_id);)}; |
| 120 | + if (client == nullptr) { |
| 121 | + cancel_recv = true; |
| 122 | + return; |
| 123 | + } |
| 124 | + |
| 125 | + LOCK(client->cs_send); |
| 126 | + auto it = client->m_send_messages.begin(); |
| 127 | + std::optional<bool> expected_more; |
| 128 | + |
| 129 | + size_t total_sent = 0; |
| 130 | + |
| 131 | + while(true) { |
| 132 | + if (it != client->m_send_messages.end()) { |
| 133 | + // If possible, move one message from the send queue to the transport. |
| 134 | + // This fails when there is an existing message still being sent, |
| 135 | + // or when the handshake has not yet completed. |
| 136 | + // |
| 137 | + // Wrap Sv2NetMsg inside CSerializedNetMsg for transport |
| 138 | + CSerializedNetMsg net_msg{*it}; |
| 139 | + if (client->m_transport->SetMessageToSend(net_msg)) { |
| 140 | + ++it; |
| 141 | + } |
| 142 | + } |
| 143 | + |
| 144 | + const auto& [data, more, _m_message_type] = client->m_transport->GetBytesToSend(/*have_next_message=*/it != client->m_send_messages.end()); |
| 145 | + |
| 146 | + |
| 147 | + // We rely on the 'more' value returned by GetBytesToSend to correctly predict whether more |
| 148 | + // bytes are still to be sent, to correctly set the MSG_MORE flag. As a sanity check, |
| 149 | + // verify that the previously returned 'more' was correct. |
| 150 | + if (expected_more.has_value()) Assume(!data.empty() == *expected_more); |
| 151 | + expected_more = more; |
| 152 | + |
| 153 | + ssize_t sent = 0; |
| 154 | + std::string errmsg; |
| 155 | + |
| 156 | + if (!data.empty()) { |
| 157 | + LogPrintLevel(BCLog::SV2, BCLog::Level::Trace, "Send %d bytes to client id=%zu\n", |
| 158 | + data.size() - total_sent, node_id); |
| 159 | + |
| 160 | + sent = SendBytes(node_id, data, more, errmsg); |
| 161 | + } |
| 162 | + |
| 163 | + if (sent > 0) { |
| 164 | + client->m_transport->MarkBytesSent(sent); |
| 165 | + if (static_cast<size_t>(sent) != data.size()) { |
| 166 | + // could not send full message; stop sending more |
| 167 | + break; |
| 168 | + } |
| 169 | + } else { |
| 170 | + if (sent < 0) { |
| 171 | + LogDebug(BCLog::NET, "socket send error for peer=%d: %s\n", node_id, errmsg); |
| 172 | + CloseConnection(node_id); |
| 173 | + } |
| 174 | + break; |
| 175 | + } |
| 176 | + } |
| 177 | + |
| 178 | + // Clear messages that have been handed to transport from the queue |
| 179 | + client->m_send_messages.erase(client->m_send_messages.begin(), it); |
| 180 | + |
| 181 | + // If both receiving and (non-optimistic) sending were possible, we first attempt |
| 182 | + // sending. If that succeeds, but does not fully drain the send queue, do not |
| 183 | + // attempt to receive. This avoids needlessly queueing data if the remote peer |
| 184 | + // is slow at receiving data, by means of TCP flow control. We only do this when |
| 185 | + // sending actually succeeded to make sure progress is always made; otherwise a |
| 186 | + // deadlock would be possible when both sides have data to send, but neither is |
| 187 | + // receiving. |
| 188 | + // |
| 189 | + // TODO: decide if this is useful for Sv2 |
| 190 | + cancel_recv = total_sent > 0; // && more; |
| 191 | +} |
| 192 | + |
| 193 | +void Sv2Connman::EventGotData(Id id, std::span<const uint8_t> data) |
| 194 | +{ |
| 195 | + AssertLockNotHeld(m_clients_mutex); |
| 196 | + |
| 197 | + auto client{WITH_LOCK(m_clients_mutex, return GetClientById(id);)}; |
| 198 | + if (client == nullptr) { |
| 199 | + return; |
| 200 | + } |
| 201 | + |
| 202 | + try { |
| 203 | + while (data.size() > 0) { |
| 204 | + // absorb network data |
| 205 | + if (!client->m_transport->ReceivedBytes(data)) { |
| 206 | + // Serious transport problem |
| 207 | + LogPrintLevel(BCLog::SV2, BCLog::Level::Trace, "Transport problem, disconnecting client id=%zu\n", |
| 208 | + client->m_id); |
| 209 | + // TODO: should we even bother with this? |
| 210 | + client->m_disconnect_flag = true; |
| 211 | + break; |
| 212 | + } |
| 213 | + |
| 214 | + if (client->m_transport->ReceivedMessageComplete()) { |
| 215 | + bool dummy_reject_message = false; |
| 216 | + Sv2NetMsg msg = client->m_transport->GetReceivedMessage(std::chrono::milliseconds(0), dummy_reject_message); |
| 217 | + ProcessSv2Message(msg, *client.get()); |
| 218 | + } |
| 219 | + } |
| 220 | + } catch (const std::exception& e) { |
| 221 | + LogPrintLevel(BCLog::SV2, BCLog::Level::Error, "Received error when processing client id=%zu message: %s\n", client->m_id, e.what()); |
| 222 | + client->m_disconnect_flag = true; |
| 223 | + } |
| 224 | + |
| 225 | +} |
| 226 | + |
| 227 | +void Sv2Connman::EventGotEOF(NodeId node_id) |
| 228 | +{ |
| 229 | + auto client{WITH_LOCK(m_clients_mutex, return GetClientById(node_id);)}; |
| 230 | + if (client == nullptr) return; |
| 231 | + client->m_disconnect_flag = true; |
| 232 | +} |
| 233 | + |
| 234 | +void Sv2Connman::EventGotPermanentReadError(NodeId node_id, const std::string& errmsg) |
| 235 | +{ |
| 236 | + auto client{WITH_LOCK(m_clients_mutex, return GetClientById(node_id);)}; |
| 237 | + if (client == nullptr) return; |
| 238 | + client->m_disconnect_flag = true; |
| 239 | +} |
| 240 | + |
| 241 | +void Sv2Connman::ProcessSv2Message(const Sv2NetMsg& sv2_net_msg, Sv2Client& client) |
| 242 | +{ |
| 243 | + uint8_t msg_type[1] = {uint8_t(sv2_net_msg.m_msg_type)}; |
| 244 | + LogPrintLevel(BCLog::SV2, BCLog::Level::Debug, "Received 0x%s %s from client id=%zu\n", |
| 245 | + // After clang-17: |
| 246 | + // std::format("{:x}", uint8_t(sv2_net_msg.m_msg_type)), |
| 247 | + HexStr(msg_type), |
| 248 | + node::SV2_MSG_NAMES.at(sv2_net_msg.m_msg_type), client.m_id); |
| 249 | + |
| 250 | + DataStream ss (sv2_net_msg.m_msg); |
| 251 | + |
| 252 | + if (client.m_disconnect_flag) { |
| 253 | + // Don't bother processing new messages if we are about to disconnect when the |
| 254 | + // send queue empties. This also prevents us from appending to the send queue |
| 255 | + // when m_disconnect_flag is set. |
| 256 | + return; |
| 257 | + } |
| 258 | + |
| 259 | + switch (sv2_net_msg.m_msg_type) |
| 260 | + { |
| 261 | + case Sv2MsgType::SETUP_CONNECTION: |
| 262 | + { |
| 263 | + if (client.m_setup_connection_confirmed) { |
| 264 | + LogPrintLevel(BCLog::SV2, BCLog::Level::Error, "Client client id=%zu connection has already been confirmed\n", |
| 265 | + client.m_id); |
| 266 | + return; |
| 267 | + } |
| 268 | + |
| 269 | + node::Sv2SetupConnectionMsg setup_conn; |
| 270 | + try { |
| 271 | + ss >> setup_conn; |
| 272 | + } catch (const std::exception& e) { |
| 273 | + LogPrintLevel(BCLog::SV2, BCLog::Level::Error, "Received invalid SetupConnection message from client id=%zu: %s\n", |
| 274 | + client.m_id, e.what()); |
| 275 | + client.m_disconnect_flag = true; |
| 276 | + return; |
| 277 | + } |
| 278 | + |
| 279 | + LOCK(client.cs_send); |
| 280 | + |
| 281 | + // Disconnect a client that connects on the wrong subprotocol. |
| 282 | + if (setup_conn.m_protocol != m_subprotocol) { |
| 283 | + node::Sv2SetupConnectionErrorMsg setup_conn_err{setup_conn.m_flags, std::string{"unsupported-protocol"}}; |
| 284 | + |
| 285 | + LogPrintLevel(BCLog::SV2, BCLog::Level::Debug, "Send 0x02 SetupConnectionError to client id=%zu\n", |
| 286 | + client.m_id); |
| 287 | + client.m_send_messages.emplace_back(setup_conn_err); |
| 288 | + |
| 289 | + client.m_disconnect_flag = true; |
| 290 | + return; |
| 291 | + } |
| 292 | + |
| 293 | + // Disconnect a client if they are not running a compatible protocol version. |
| 294 | + if ((m_protocol_version < setup_conn.m_min_version) || (m_protocol_version > setup_conn.m_max_version)) { |
| 295 | + node::Sv2SetupConnectionErrorMsg setup_conn_err{setup_conn.m_flags, std::string{"protocol-version-mismatch"}}; |
| 296 | + LogPrintLevel(BCLog::SV2, BCLog::Level::Debug, "Send 0x02 SetupConnection.Error to client id=%zu\n", |
| 297 | + client.m_id); |
| 298 | + client.m_send_messages.emplace_back(setup_conn_err); |
| 299 | + |
| 300 | + LogPrintLevel(BCLog::SV2, BCLog::Level::Error, "Received a connection from client id=%zu with incompatible protocol_versions: min_version: %d, max_version: %d\n", |
| 301 | + client.m_id, setup_conn.m_min_version, setup_conn.m_max_version); |
| 302 | + client.m_disconnect_flag = true; |
| 303 | + return; |
| 304 | + } |
| 305 | + |
| 306 | + LogPrintLevel(BCLog::SV2, BCLog::Level::Debug, "Send 0x01 SetupConnection.Success to client id=%zu\n", |
| 307 | + client.m_id); |
| 308 | + node::Sv2SetupConnectionSuccessMsg setup_success{m_protocol_version, m_optional_features}; |
| 309 | + client.m_send_messages.emplace_back(setup_success); |
| 310 | + |
| 311 | + client.m_setup_connection_confirmed = true; |
| 312 | + |
| 313 | + break; |
| 314 | + } |
| 315 | + case Sv2MsgType::COINBASE_OUTPUT_CONSTRAINTS: |
| 316 | + { |
| 317 | + if (!client.m_setup_connection_confirmed) { |
| 318 | + client.m_disconnect_flag = true; |
| 319 | + return; |
| 320 | + } |
| 321 | + |
| 322 | + node::Sv2CoinbaseOutputConstraintsMsg coinbase_output_constraints; |
| 323 | + try { |
| 324 | + ss >> coinbase_output_constraints; |
| 325 | + client.m_coinbase_output_constraints_recv = true; |
| 326 | + } catch (const std::exception& e) { |
| 327 | + LogPrintLevel(BCLog::SV2, BCLog::Level::Error, "Received invalid CoinbaseOutputConstraints message from client id=%zu: %s\n", |
| 328 | + client.m_id, e.what()); |
| 329 | + client.m_disconnect_flag = true; |
| 330 | + return; |
| 331 | + } |
| 332 | + |
| 333 | + uint32_t max_additional_size = coinbase_output_constraints.m_coinbase_output_max_additional_size; |
| 334 | + LogPrintLevel(BCLog::SV2, BCLog::Level::Debug, "coinbase_output_max_additional_size=%d bytes\n", max_additional_size); |
| 335 | + |
| 336 | + if (max_additional_size > MAX_BLOCK_WEIGHT) { |
| 337 | + LogPrintLevel(BCLog::SV2, BCLog::Level::Error, "Received impossible CoinbaseOutputConstraints from client id=%zu: %d\n", |
| 338 | + client.m_id, max_additional_size); |
| 339 | + client.m_disconnect_flag = true; |
| 340 | + return; |
| 341 | + } |
| 342 | + |
| 343 | + client.m_coinbase_tx_outputs_size = coinbase_output_constraints.m_coinbase_output_max_additional_size; |
| 344 | + |
| 345 | + break; |
| 346 | + } |
| 347 | + default: { |
| 348 | + uint8_t msg_type[1]{uint8_t(sv2_net_msg.m_msg_type)}; |
| 349 | + LogPrintLevel(BCLog::SV2, BCLog::Level::Warning, "Received unknown message type 0x%s from client id=%zu\n", |
| 350 | + HexStr(msg_type), client.m_id); |
| 351 | + break; |
| 352 | + } |
| 353 | + } |
| 354 | +} |
0 commit comments