From be1d7418c15e275136161bb965d406691c961892 Mon Sep 17 00:00:00 2001 From: Vasil Dimov Date: Tue, 27 Aug 2024 16:23:31 +0200 Subject: [PATCH] net: move I2P-accept-incoming code from CConnman to SockMan --- src/CMakeLists.txt | 2 +- src/common/sockman.cpp | 55 +++++++++++++++++++++++++++++++ src/common/sockman.h | 74 ++++++++++++++++++++++++++++++++++++++++++ src/net.cpp | 69 +++++++++------------------------------ src/net.h | 31 ++++-------------- 5 files changed, 151 insertions(+), 80 deletions(-) diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index ed37c69e555..9fd3e48a06b 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -131,6 +131,7 @@ add_library(bitcoin_common STATIC EXCLUDE_FROM_ALL core_write.cpp deploymentinfo.cpp external_signer.cpp + i2p.cpp init/common.cpp kernel/chainparams.cpp key.cpp @@ -208,7 +209,6 @@ add_library(bitcoin_node STATIC EXCLUDE_FROM_ALL headerssync.cpp httprpc.cpp httpserver.cpp - i2p.cpp index/base.cpp index/blockfilterindex.cpp index/coinstatsindex.cpp diff --git a/src/common/sockman.cpp b/src/common/sockman.cpp index f6fc85f20d2..a534971e6c2 100644 --- a/src/common/sockman.cpp +++ b/src/common/sockman.cpp @@ -8,6 +8,7 @@ #include #include #include +#include bool SockMan::BindAndStartListening(const CService& to, bilingual_str& err_msg) { @@ -91,6 +92,24 @@ bool SockMan::BindAndStartListening(const CService& to, bilingual_str& err_msg) return true; } +void SockMan::StartSocketsThreads(const Options& options) +{ + if (options.i2p.has_value()) { + m_i2p_sam_session = std::make_unique( + options.i2p->private_key_file, options.i2p->sam_proxy, &interruptNet); + + m_thread_i2p_accept = + std::thread(&util::TraceThread, options.i2p->accept_thread_name, [this] { ThreadI2PAccept(); }); + } +} + +void SockMan::JoinSocketsThreads() +{ + if (m_thread_i2p_accept.joinable()) { + m_thread_i2p_accept.join(); + } +} + std::unique_ptr SockMan::AcceptConnection(const Sock& listen_sock, CService& addr) { sockaddr_storage storage; @@ -127,3 +146,39 @@ void SockMan::StopListening() } void SockMan::EventI2PStatus(const CService&, I2PStatus) {} + +void SockMan::ThreadI2PAccept() +{ + static constexpr auto err_wait_begin = 1s; + static constexpr auto err_wait_cap = 5min; + auto err_wait = err_wait_begin; + + i2p::Connection conn; + + auto SleepOnFailure = [&]() { + interruptNet.sleep_for(err_wait); + if (err_wait < err_wait_cap) { + err_wait += 1s; + } + }; + + while (!interruptNet) { + + if (!m_i2p_sam_session->Listen(conn)) { + EventI2PStatus(conn.me, SockMan::I2PStatus::STOP_LISTENING); + SleepOnFailure(); + continue; + } + + EventI2PStatus(conn.me, SockMan::I2PStatus::START_LISTENING); + + if (!m_i2p_sam_session->Accept(conn)) { + SleepOnFailure(); + continue; + } + + EventNewConnectionAccepted(std::move(conn.sock), conn.me, conn.peer); + + err_wait = err_wait_begin; + } +} diff --git a/src/common/sockman.h b/src/common/sockman.h index 329b5fbbb3a..6e09f777ac8 100644 --- a/src/common/sockman.h +++ b/src/common/sockman.h @@ -5,12 +5,16 @@ #ifndef BITCOIN_COMMON_SOCKMAN_H #define BITCOIN_COMMON_SOCKMAN_H +#include #include +#include +#include #include #include #include #include +#include #include /** @@ -18,6 +22,7 @@ * To use this class, inherit from it and implement the pure virtual methods. * Handled operations: * - binding and listening on sockets + * - starting of necessary threads to process socket operations * - accepting incoming connections */ class SockMan @@ -47,6 +52,7 @@ public: /** * Bind to a new address:port, start listening and add the listen socket to `m_listen`. + * Should be called before `StartSocketsThreads()`. * @param[in] to Where to bind. * @param[out] err_msg Error string if an error occurs. * @retval true Success. @@ -54,6 +60,38 @@ public: */ bool BindAndStartListening(const CService& to, bilingual_str& err_msg); + /** + * Options to influence `StartSocketsThreads()`. + */ + struct Options { + struct I2P { + explicit I2P(const fs::path& file, const Proxy& proxy, std::string_view accept_thread_name) + : private_key_file{file}, + sam_proxy{proxy}, + accept_thread_name{accept_thread_name} + {} + + const fs::path private_key_file; + const Proxy sam_proxy; + const std::string_view accept_thread_name; + }; + + /** + * I2P options. If set then a thread will be started that will accept incoming I2P connections. + */ + std::optional i2p; + }; + + /** + * Start the necessary threads for sockets IO. + */ + void StartSocketsThreads(const Options& options); + + /** + * Join (wait for) the threads started by `StartSocketsThreads()` to exit. + */ + void JoinSocketsThreads(); + /** * Accept a connection. * @param[in] listen_sock Socket on which to accept the connection. @@ -72,6 +110,21 @@ public: */ void StopListening(); + /** + * This is signaled when network activity should cease. + * A pointer to it is saved in `m_i2p_sam_session`, so make sure that + * the lifetime of `interruptNet` is not shorter than + * the lifetime of `m_i2p_sam_session`. + */ + CThreadInterrupt interruptNet; + + /** + * I2P SAM session. + * Used to accept incoming and make outgoing I2P connections from a persistent + * address. + */ + std::unique_ptr m_i2p_sam_session; + /** * List of listening sockets. */ @@ -83,6 +136,16 @@ private: // Pure virtual functions must be implemented by children classes. // + /** + * Be notified when a new connection has been accepted. + * @param[in] sock Connected socket to communicate with the peer. + * @param[in] me The address and port at our side of the connection. + * @param[in] them The address and port at the peer's side of the connection. + */ + virtual void EventNewConnectionAccepted(std::unique_ptr&& sock, + const CService& me, + const CService& them) = 0; + // // Non-pure virtual functions can be overridden by children classes or left // alone to use the default implementation from SockMan. @@ -96,10 +159,21 @@ private: */ virtual void EventI2PStatus(const CService& addr, I2PStatus new_status); + /** + * Accept incoming I2P connections in a loop and call + * `EventNewConnectionAccepted()` for each new connection. + */ + void ThreadI2PAccept(); + /** * The id to assign to the next created connection. Used to generate ids of connections. */ std::atomic m_next_id{0}; + + /** + * Thread that accepts incoming I2P connections in a loop, can be stopped via `interruptNet`. + */ + std::thread m_thread_i2p_accept; }; #endif // BITCOIN_COMMON_SOCKMAN_H diff --git a/src/net.cpp b/src/net.cpp index fd398e08b48..9eefcc43dde 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -1732,9 +1732,9 @@ bool CConnman::AttemptToEvictConnection() return false; } -void CConnman::CreateNodeFromAcceptedSocket(std::unique_ptr&& sock, - const CService& addr_bind, - const CService& addr) +void CConnman::EventNewConnectionAccepted(std::unique_ptr&& sock, + const CService& addr_bind, + const CService& addr) { int nInbound = 0; @@ -2201,7 +2201,7 @@ void CConnman::SocketHandlerListening(const Sock::EventsPerSock& events_per_sock addr_accepted = MaybeFlipIPv6toCJDNS(addr_accepted); const CService addr_bind{MaybeFlipIPv6toCJDNS(GetBindAddress(*sock))}; - CreateNodeFromAcceptedSocket(std::move(sock_accepted), addr_bind, addr_accepted); + EventNewConnectionAccepted(std::move(sock_accepted), addr_bind, addr_accepted); } } } @@ -3072,42 +3072,6 @@ void CConnman::EventI2PStatus(const CService& addr, SockMan::I2PStatus new_statu } } -void CConnman::ThreadI2PAcceptIncoming() -{ - static constexpr auto err_wait_begin = 1s; - static constexpr auto err_wait_cap = 5min; - auto err_wait = err_wait_begin; - - i2p::Connection conn; - - auto SleepOnFailure = [&]() { - interruptNet.sleep_for(err_wait); - if (err_wait < err_wait_cap) { - err_wait += 1s; - } - }; - - while (!interruptNet) { - - if (!m_i2p_sam_session->Listen(conn)) { - EventI2PStatus(conn.me, SockMan::I2PStatus::STOP_LISTENING); - SleepOnFailure(); - continue; - } - - EventI2PStatus(conn.me, SockMan::I2PStatus::START_LISTENING); - - if (!m_i2p_sam_session->Accept(conn)) { - SleepOnFailure(); - continue; - } - - CreateNodeFromAcceptedSocket(std::move(conn.sock), conn.me, conn.peer); - - err_wait = err_wait_begin; - } -} - void Discover() { if (!fDiscover) @@ -3232,12 +3196,6 @@ bool CConnman::Start(CScheduler& scheduler, const Options& connOptions) return false; } - Proxy i2p_sam; - if (GetProxy(NET_I2P, i2p_sam) && connOptions.m_i2p_accept_incoming) { - m_i2p_sam_session = std::make_unique(gArgs.GetDataDirNet() / "i2p_private_key", - i2p_sam, &interruptNet); - } - // Randomize the order in which we may query seednode to potentially prevent connecting to the same one every restart (and signal that we have restarted) std::vector seed_nodes = connOptions.vSeedNodes; if (!seed_nodes.empty()) { @@ -3283,6 +3241,15 @@ bool CConnman::Start(CScheduler& scheduler, const Options& connOptions) // Send and receive from sockets, accept connections threadSocketHandler = std::thread(&util::TraceThread, "net", [this] { ThreadSocketHandler(); }); + SockMan::Options sockman_options; + + Proxy i2p_sam; + if (GetProxy(NET_I2P, i2p_sam) && connOptions.m_i2p_accept_incoming) { + sockman_options.i2p.emplace(gArgs.GetDataDirNet() / "i2p_private_key", i2p_sam, "i2paccept"); + } + + StartSocketsThreads(sockman_options); + if (!gArgs.GetBoolArg("-dnsseed", DEFAULT_DNSSEED)) LogPrintf("DNS seeding disabled\n"); else @@ -3308,11 +3275,6 @@ bool CConnman::Start(CScheduler& scheduler, const Options& connOptions) // Process messages threadMessageHandler = std::thread(&util::TraceThread, "msghand", [this] { ThreadMessageHandler(); }); - if (m_i2p_sam_session) { - threadI2PAcceptIncoming = - std::thread(&util::TraceThread, "i2paccept", [this] { ThreadI2PAcceptIncoming(); }); - } - // Dump network addresses scheduler.scheduleEvery([this] { DumpAddresses(); }, DUMP_PEERS_INTERVAL); @@ -3366,9 +3328,8 @@ void CConnman::Interrupt() void CConnman::StopThreads() { - if (threadI2PAcceptIncoming.joinable()) { - threadI2PAcceptIncoming.join(); - } + JoinSocketsThreads(); + if (threadMessageHandler.joinable()) threadMessageHandler.join(); if (threadOpenConnections.joinable()) diff --git a/src/net.h b/src/net.h index 0edf6acfbb7..aa9c3574f0f 100644 --- a/src/net.h +++ b/src/net.h @@ -1292,18 +1292,15 @@ private: virtual void EventI2PStatus(const CService& addr, SockMan::I2PStatus new_status) override; - void ThreadI2PAcceptIncoming(); - /** - * Create a `CNode` object from a socket that has just been accepted and add the node to - * the `m_nodes` member. + * Create a `CNode` object and add it to the `m_nodes` member. * @param[in] sock Connected socket to communicate with the peer. - * @param[in] addr_bind The address and port at our side of the connection. - * @param[in] addr The address and port at the peer's side of the connection. + * @param[in] me The address and port at our side of the connection. + * @param[in] them The address and port at the peer's side of the connection. */ - void CreateNodeFromAcceptedSocket(std::unique_ptr&& sock, - const CService& addr_bind, - const CService& addr); + virtual void EventNewConnectionAccepted(std::unique_ptr&& sock, + const CService& me, + const CService& them) override; void DisconnectNodes() EXCLUSIVE_LOCKS_REQUIRED(!m_reconnections_mutex, !m_nodes_mutex); void NotifyNumConnectionsChanged(); @@ -1531,27 +1528,11 @@ private: Mutex mutexMsgProc; std::atomic flagInterruptMsgProc{false}; - /** - * This is signaled when network activity should cease. - * A pointer to it is saved in `m_i2p_sam_session`, so make sure that - * the lifetime of `interruptNet` is not shorter than - * the lifetime of `m_i2p_sam_session`. - */ - CThreadInterrupt interruptNet; - - /** - * I2P SAM session. - * Used to accept incoming and make outgoing I2P connections from a persistent - * address. - */ - std::unique_ptr m_i2p_sam_session; - std::thread threadDNSAddressSeed; std::thread threadSocketHandler; std::thread threadOpenAddedConnections; std::thread threadOpenConnections; std::thread threadMessageHandler; - std::thread threadI2PAcceptIncoming; /** flag for deciding to connect to an extra outbound peer, * in excess of m_max_outbound_full_relay