From bb49d26032c57714c62a4b31ff1fdd969751683f Mon Sep 17 00:00:00 2001 From: Vasil Dimov Date: Wed, 2 Apr 2025 06:07:41 +0200 Subject: [PATCH] net: implement opening PRIVATE_BROADCAST connections Implement opening `ConnectionType::PRIVATE_BROADCAST` connections with the following properties: * Only to Tor or I2P (or IPv4/IPv6 through the Tor proxy, if provided) * Open such connections only when requested and don't maintain N opened connections of this type. * Since this is substantially different than what `OpenNetworkConnection()` does, open the private broadcast connections from a different thread instead of modifying `OpenNetworkConnection()` to also open those types of connections. Co-authored-by: Andrew Toth --- src/init.cpp | 7 ++- src/net.cpp | 160 ++++++++++++++++++++++++++++++++++++++++++++++++++- src/net.h | 68 ++++++++++++++++++++++ 3 files changed, 232 insertions(+), 3 deletions(-) diff --git a/src/init.cpp b/src/init.cpp index 9e223704b03..6bee35b107d 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -542,7 +542,7 @@ void SetupServerArgs(ArgsManager& argsman, bool can_listen_ipc) argsman.AddArg("-forcednsseed", strprintf("Always query for peer addresses via DNS lookup (default: %u)", DEFAULT_FORCEDNSSEED), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION); argsman.AddArg("-listen", strprintf("Accept connections from outside (default: %u if no -proxy, -connect or -maxconnections=0)", DEFAULT_LISTEN), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION); argsman.AddArg("-listenonion", strprintf("Automatically create Tor onion service (default: %d)", DEFAULT_LISTEN_ONION), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION); - argsman.AddArg("-maxconnections=", strprintf("Maintain at most automatic connections to peers (default: %u). This limit does not apply to connections manually added via -addnode or the addnode RPC, which have a separate limit of %u.", DEFAULT_MAX_PEER_CONNECTIONS, MAX_ADDNODE_CONNECTIONS), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION); + argsman.AddArg("-maxconnections=", strprintf("Maintain at most automatic connections to peers (default: %u). This limit does not apply to connections manually added via -addnode or the addnode RPC, which have a separate limit of %u. It does not apply to short-lived private broadcast connections either, which have a separate limit of %u.", DEFAULT_MAX_PEER_CONNECTIONS, MAX_ADDNODE_CONNECTIONS, MAX_PRIVATE_BROADCAST_CONNECTIONS), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION); argsman.AddArg("-maxreceivebuffer=", strprintf("Maximum per-connection receive buffer, *1000 bytes (default: %u)", DEFAULT_MAXRECEIVEBUFFER), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION); argsman.AddArg("-maxsendbuffer=", strprintf("Maximum per-connection memory usage for the send buffer, *1000 bytes (default: %u)", DEFAULT_MAXSENDBUFFER), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION); argsman.AddArg("-maxuploadtarget=", strprintf("Tries to keep outbound traffic under the given target per 24h. Limit does not apply to peers with 'download' permission or blocks created within past week. 0 = no limit (default: %s). Optional suffix units [k|K|m|M|g|G|t|T] (default: M). Lowercase is 1000 base while uppercase is 1024 base", DEFAULT_MAX_UPLOAD_TARGET), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION); @@ -1018,11 +1018,14 @@ bool AppInitParameterInteraction(const ArgsManager& args) if (user_max_connection < 0) { return InitError(Untranslated("-maxconnections must be greater or equal than zero")); } + const size_t max_private{args.GetBoolArg("-privatebroadcast", DEFAULT_PRIVATE_BROADCAST) + ? MAX_PRIVATE_BROADCAST_CONNECTIONS + : 0}; // Reserve enough FDs to account for the bare minimum, plus any manual connections, plus the bound interfaces int min_required_fds = MIN_CORE_FDS + MAX_ADDNODE_CONNECTIONS + nBind; // Try raising the FD limit to what we need (available_fds may be smaller than the requested amount if this fails) - available_fds = RaiseFileDescriptorLimit(user_max_connection + min_required_fds); + available_fds = RaiseFileDescriptorLimit(user_max_connection + max_private + min_required_fds); // If we are using select instead of poll, our actual limit may be even smaller #ifndef USE_POLL available_fds = std::min(FD_SETSIZE, available_fds); diff --git a/src/net.cpp b/src/net.cpp index 8ed61b80ea6..c0c054f2e83 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -458,7 +458,10 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, i2p::Connection conn; bool connected{false}; - if (m_i2p_sam_session) { + // If an I2P SAM session already exists, normally we would re-use it. But in the case of + // private broadcast we force a new transient session. A Connect() using m_i2p_sam_session + // would use our permanent I2P address as a source address. + if (m_i2p_sam_session && conn_type != ConnectionType::PRIVATE_BROADCAST) { connected = m_i2p_sam_session->Connect(target_addr, conn, proxyConnectionFailed); } else { { @@ -3052,6 +3055,74 @@ bool CConnman::OpenNetworkConnection(const CAddress& addrConnect, return true; } +std::optional CConnman::PrivateBroadcast::PickNetwork(std::optional& proxy) const +{ + prevector<4, Network> nets; + std::optional clearnet_proxy; + proxy.reset(); + if (g_reachable_nets.Contains(NET_ONION)) { + nets.push_back(NET_ONION); + + clearnet_proxy = ProxyForIPv4or6(); + if (clearnet_proxy.has_value()) { + if (g_reachable_nets.Contains(NET_IPV4)) { + nets.push_back(NET_IPV4); + } + if (g_reachable_nets.Contains(NET_IPV6)) { + nets.push_back(NET_IPV6); + } + } + } + if (g_reachable_nets.Contains(NET_I2P)) { + nets.push_back(NET_I2P); + } + + if (nets.empty()) { + return std::nullopt; + } + + const Network net{nets[FastRandomContext{}.randrange(nets.size())]}; + if (net == NET_IPV4 || net == NET_IPV6) { + proxy = clearnet_proxy; + } + return net; +} + +size_t CConnman::PrivateBroadcast::NumToOpen() const +{ + return m_num_to_open; +} + +void CConnman::PrivateBroadcast::NumToOpenAdd(size_t n) +{ + m_num_to_open += n; + m_num_to_open.notify_all(); +} + +size_t CConnman::PrivateBroadcast::NumToOpenSub(size_t n) +{ + size_t current_value{m_num_to_open.load()}; + size_t new_value; + do { + new_value = current_value > n ? current_value - n : 0; + } while (!m_num_to_open.compare_exchange_strong(current_value, new_value)); + return new_value; +} + +void CConnman::PrivateBroadcast::NumToOpenWait() const +{ + m_num_to_open.wait(0); +} + +std::optional CConnman::PrivateBroadcast::ProxyForIPv4or6() const +{ + Proxy tor_proxy; + if (m_outbound_tor_ok_at_least_once.load() && GetProxy(NET_ONION, tor_proxy)) { + return tor_proxy; + } + return std::nullopt; +} + Mutex NetEventsInterface::g_msgproc_mutex; void CConnman::ThreadMessageHandler() @@ -3136,6 +3207,74 @@ void CConnman::ThreadI2PAcceptIncoming() } } +void CConnman::ThreadPrivateBroadcast() +{ + AssertLockNotHeld(m_unused_i2p_sessions_mutex); + + size_t addrman_num_bad_addresses{0}; + while (!m_interrupt_net->interrupted()) { + + if (!fNetworkActive) { + m_interrupt_net->sleep_for(5s); + continue; + } + + CountingSemaphoreGrant<> conn_max_grant{m_private_broadcast.m_sem_conn_max}; // Would block if too many are opened. + + m_private_broadcast.NumToOpenWait(); + + if (m_interrupt_net->interrupted()) { + break; + } + + std::optional proxy; + const std::optional net{m_private_broadcast.PickNetwork(proxy)}; + if (!net.has_value()) { + LogWarning("[privatebroadcast] Connections needed but none of the Tor or I2P networks is reachable"); + m_interrupt_net->sleep_for(5s); + continue; + } + + const auto [addr, _] = addrman.Select(/*new_only=*/false, {net.value()}); + + if (!addr.IsValid() || IsLocal(addr)) { + ++addrman_num_bad_addresses; + if (addrman_num_bad_addresses > 100) { + LogDebug(BCLog::PRIVBROADCAST, "Connections needed but addrman keeps returning bad addresses, will retry"); + m_interrupt_net->sleep_for(500ms); + } + continue; + } + addrman_num_bad_addresses = 0; + + auto target_str{addr.ToStringAddrPort()}; + if (proxy.has_value()) { + target_str += " through the proxy at " + proxy->ToString(); + } + + const bool use_v2transport(addr.nServices & GetLocalServices() & NODE_P2P_V2); + + if (OpenNetworkConnection(addr, + /*fCountFailure=*/true, + std::move(conn_max_grant), + /*pszDest=*/nullptr, + ConnectionType::PRIVATE_BROADCAST, + use_v2transport, + proxy)) { + const size_t remaining{m_private_broadcast.NumToOpenSub(1)}; + LogDebug(BCLog::PRIVBROADCAST, "Socket connected to %s; remaining connections to open: %d", target_str, remaining); + } else { + const size_t remaining{m_private_broadcast.NumToOpen()}; + if (remaining == 0) { + LogDebug(BCLog::PRIVBROADCAST, "Failed to connect to %s, will not retry, no more connections needed", target_str); + } else { + LogDebug(BCLog::PRIVBROADCAST, "Failed to connect to %s, will retry to a different address; remaining connections to open: %d", target_str, remaining); + m_interrupt_net->sleep_for(100ms); // Prevent busy loop if OpenNetworkConnection() fails fast repeatedly. + } + } + } +} + bool CConnman::BindListenPort(const CService& addrBind, bilingual_str& strError, NetPermissionFlags permissions) { int nOne = 1; @@ -3416,6 +3555,11 @@ bool CConnman::Start(CScheduler& scheduler, const Options& connOptions) std::thread(&util::TraceThread, "i2paccept", [this] { ThreadI2PAcceptIncoming(); }); } + if (gArgs.GetBoolArg("-privatebroadcast", DEFAULT_PRIVATE_BROADCAST)) { + threadPrivateBroadcast = + std::thread(&util::TraceThread, "privbcast", [this] { ThreadPrivateBroadcast(); }); + } + // Dump network addresses scheduler.scheduleEvery([this] { DumpAddresses(); }, DUMP_PEERS_INTERVAL); @@ -3465,10 +3609,16 @@ void CConnman::Interrupt() semAddnode->release(); } } + + m_private_broadcast.m_sem_conn_max.release(); + m_private_broadcast.NumToOpenAdd(1); // Just unblock NumToOpenWait() to be able to continue with shutdown. } void CConnman::StopThreads() { + if (threadPrivateBroadcast.joinable()) { + threadPrivateBroadcast.join(); + } if (threadI2PAcceptIncoming.joinable()) { threadI2PAcceptIncoming.join(); } @@ -3901,6 +4051,14 @@ bool CConnman::NodeFullyConnected(const CNode* pnode) void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg) { AssertLockNotHeld(m_total_bytes_sent_mutex); + + if (!m_private_broadcast.m_outbound_tor_ok_at_least_once.load() && !pnode->IsInboundConn() && + pnode->addr.IsTor() && msg.m_type == NetMsgType::VERACK) { + // If we are sending the peer VERACK that means we successfully sent + // and received another message to/from that peer (VERSION). + m_private_broadcast.m_outbound_tor_ok_at_least_once.store(true); + } + size_t nMessageSize = msg.data.size(); LogDebug(BCLog::NET, "sending %s (%d bytes) peer=%d\n", msg.m_type, nMessageSize, pnode->GetId()); if (m_capture_messages) { diff --git a/src/net.h b/src/net.h index dcbab754518..e7047c5bff2 100644 --- a/src/net.h +++ b/src/net.h @@ -73,6 +73,8 @@ static const int MAX_ADDNODE_CONNECTIONS = 8; static const int MAX_BLOCK_RELAY_ONLY_CONNECTIONS = 2; /** Maximum number of feeler connections */ static const int MAX_FEELER_CONNECTIONS = 1; +/** Maximum number of private broadcast connections */ +static constexpr size_t MAX_PRIVATE_BROADCAST_CONNECTIONS{64}; /** -listen default */ static const bool DEFAULT_LISTEN = true; /** The maximum number of peer connections to maintain. */ @@ -1187,6 +1189,70 @@ public: const std::optional& proxy_override = std::nullopt) EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex); + /// Group of private broadcast related members. + class PrivateBroadcast + { + public: + /** + * Remember if we ever established at least one outbound connection to a + * Tor peer, including sending and receiving P2P messages. If this is + * true then the Tor proxy indeed works and is a proxy to the Tor network, + * not a misconfigured ordinary SOCKS5 proxy as -proxy or -onion. If that + * is the case, then we assume that connecting to an IPv4 or IPv6 address + * via that proxy will be done through the Tor network and a Tor exit node. + */ + std::atomic_bool m_outbound_tor_ok_at_least_once{false}; + + /** + * Semaphore used to guard against opening too many connections. + * Opening private broadcast connections will be paused if this is equal to 0. + */ + std::counting_semaphore<> m_sem_conn_max{MAX_PRIVATE_BROADCAST_CONNECTIONS}; + + /** + * Choose a network to open a connection to. + * @param[out] proxy Optional proxy to override the normal proxy selection. + * Will be set if !std::nullopt is returned. Could be set to `std::nullopt` + * if there is no need to override the proxy that would be used for connecting + * to the returned network. + * @retval std::nullopt No network could be selected. + * @retval !std::nullopt The network was selected and `proxy` is set (maybe to `std::nullopt`). + */ + std::optional PickNetwork(std::optional& proxy) const; + + /// Get the pending number of connections to open. + size_t NumToOpen() const; + + /** + * Increment the number of new connections of type `ConnectionType::PRIVATE_BROADCAST` + * to be opened by `CConnman::ThreadPrivateBroadcast()`. + * @param[in] n Increment by this number. + */ + void NumToOpenAdd(size_t n); + + /** + * Decrement the number of new connections of type `ConnectionType::PRIVATE_BROADCAST` + * to be opened by `CConnman::ThreadPrivateBroadcast()`. + * @param[in] n Decrement by this number. + * @return The number of connections that remain to be opened after the operation. + */ + size_t NumToOpenSub(size_t n); + + /// Wait for the number of needed connections to become greater than 0. + void NumToOpenWait() const; + + private: + /** + * Check if private broadcast can be done to IPv4 or IPv6 peers and if so via which proxy. + * If private broadcast connections should not be opened to IPv4 or IPv6, then this will + * return an empty optional. + */ + std::optional ProxyForIPv4or6() const; + + /// Number of `ConnectionType::PRIVATE_BROADCAST` connections to open. + std::atomic_size_t m_num_to_open{0}; + } m_private_broadcast; + bool CheckIncomingNonce(uint64_t nonce); void ASMapHealthCheck(); @@ -1361,6 +1427,7 @@ private: void ThreadOpenConnections(std::vector connect, std::span seed_nodes) EXCLUSIVE_LOCKS_REQUIRED(!m_addr_fetches_mutex, !m_added_nodes_mutex, !m_nodes_mutex, !m_unused_i2p_sessions_mutex, !m_reconnections_mutex); void ThreadMessageHandler() EXCLUSIVE_LOCKS_REQUIRED(!mutexMsgProc); void ThreadI2PAcceptIncoming(); + void ThreadPrivateBroadcast() EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex); void AcceptConnection(const ListenSocket& hListenSocket); /** @@ -1651,6 +1718,7 @@ private: std::thread threadOpenConnections; std::thread threadMessageHandler; std::thread threadI2PAcceptIncoming; + std::thread threadPrivateBroadcast; /** flag for deciding to connect to an extra outbound peer, * in excess of m_max_outbound_full_relay