net: move I2P-accept-incoming code from CConnman to SockMan

This commit is contained in:
Vasil Dimov 2024-08-27 16:23:31 +02:00
parent a367d556fd
commit be1d7418c1
No known key found for this signature in database
GPG Key ID: 54DF06F64B55CBBF
5 changed files with 151 additions and 80 deletions

View File

@ -131,6 +131,7 @@ add_library(bitcoin_common STATIC EXCLUDE_FROM_ALL
core_write.cpp
deploymentinfo.cpp
external_signer.cpp
i2p.cpp
init/common.cpp
kernel/chainparams.cpp
key.cpp
@ -208,7 +209,6 @@ add_library(bitcoin_node STATIC EXCLUDE_FROM_ALL
headerssync.cpp
httprpc.cpp
httpserver.cpp
i2p.cpp
index/base.cpp
index/blockfilterindex.cpp
index/coinstatsindex.cpp

View File

@ -8,6 +8,7 @@
#include <logging.h>
#include <netbase.h>
#include <util/sock.h>
#include <util/thread.h>
bool SockMan::BindAndStartListening(const CService& to, bilingual_str& err_msg)
{
@ -91,6 +92,24 @@ bool SockMan::BindAndStartListening(const CService& to, bilingual_str& err_msg)
return true;
}
void SockMan::StartSocketsThreads(const Options& options)
{
if (options.i2p.has_value()) {
m_i2p_sam_session = std::make_unique<i2p::sam::Session>(
options.i2p->private_key_file, options.i2p->sam_proxy, &interruptNet);
m_thread_i2p_accept =
std::thread(&util::TraceThread, options.i2p->accept_thread_name, [this] { ThreadI2PAccept(); });
}
}
void SockMan::JoinSocketsThreads()
{
if (m_thread_i2p_accept.joinable()) {
m_thread_i2p_accept.join();
}
}
std::unique_ptr<Sock> SockMan::AcceptConnection(const Sock& listen_sock, CService& addr)
{
sockaddr_storage storage;
@ -127,3 +146,39 @@ void SockMan::StopListening()
}
void SockMan::EventI2PStatus(const CService&, I2PStatus) {}
void SockMan::ThreadI2PAccept()
{
static constexpr auto err_wait_begin = 1s;
static constexpr auto err_wait_cap = 5min;
auto err_wait = err_wait_begin;
i2p::Connection conn;
auto SleepOnFailure = [&]() {
interruptNet.sleep_for(err_wait);
if (err_wait < err_wait_cap) {
err_wait += 1s;
}
};
while (!interruptNet) {
if (!m_i2p_sam_session->Listen(conn)) {
EventI2PStatus(conn.me, SockMan::I2PStatus::STOP_LISTENING);
SleepOnFailure();
continue;
}
EventI2PStatus(conn.me, SockMan::I2PStatus::START_LISTENING);
if (!m_i2p_sam_session->Accept(conn)) {
SleepOnFailure();
continue;
}
EventNewConnectionAccepted(std::move(conn.sock), conn.me, conn.peer);
err_wait = err_wait_begin;
}
}

View File

@ -5,12 +5,16 @@
#ifndef BITCOIN_COMMON_SOCKMAN_H
#define BITCOIN_COMMON_SOCKMAN_H
#include <i2p.h>
#include <netaddress.h>
#include <netbase.h>
#include <util/fs.h>
#include <util/sock.h>
#include <util/translation.h>
#include <atomic>
#include <memory>
#include <thread>
#include <vector>
/**
@ -18,6 +22,7 @@
* To use this class, inherit from it and implement the pure virtual methods.
* Handled operations:
* - binding and listening on sockets
* - starting of necessary threads to process socket operations
* - accepting incoming connections
*/
class SockMan
@ -47,6 +52,7 @@ public:
/**
* Bind to a new address:port, start listening and add the listen socket to `m_listen`.
* Should be called before `StartSocketsThreads()`.
* @param[in] to Where to bind.
* @param[out] err_msg Error string if an error occurs.
* @retval true Success.
@ -54,6 +60,38 @@ public:
*/
bool BindAndStartListening(const CService& to, bilingual_str& err_msg);
/**
* Options to influence `StartSocketsThreads()`.
*/
struct Options {
struct I2P {
explicit I2P(const fs::path& file, const Proxy& proxy, std::string_view accept_thread_name)
: private_key_file{file},
sam_proxy{proxy},
accept_thread_name{accept_thread_name}
{}
const fs::path private_key_file;
const Proxy sam_proxy;
const std::string_view accept_thread_name;
};
/**
* I2P options. If set then a thread will be started that will accept incoming I2P connections.
*/
std::optional<I2P> i2p;
};
/**
* Start the necessary threads for sockets IO.
*/
void StartSocketsThreads(const Options& options);
/**
* Join (wait for) the threads started by `StartSocketsThreads()` to exit.
*/
void JoinSocketsThreads();
/**
* Accept a connection.
* @param[in] listen_sock Socket on which to accept the connection.
@ -72,6 +110,21 @@ public:
*/
void StopListening();
/**
* This is signaled when network activity should cease.
* A pointer to it is saved in `m_i2p_sam_session`, so make sure that
* the lifetime of `interruptNet` is not shorter than
* the lifetime of `m_i2p_sam_session`.
*/
CThreadInterrupt interruptNet;
/**
* I2P SAM session.
* Used to accept incoming and make outgoing I2P connections from a persistent
* address.
*/
std::unique_ptr<i2p::sam::Session> m_i2p_sam_session;
/**
* List of listening sockets.
*/
@ -83,6 +136,16 @@ private:
// Pure virtual functions must be implemented by children classes.
//
/**
* Be notified when a new connection has been accepted.
* @param[in] sock Connected socket to communicate with the peer.
* @param[in] me The address and port at our side of the connection.
* @param[in] them The address and port at the peer's side of the connection.
*/
virtual void EventNewConnectionAccepted(std::unique_ptr<Sock>&& sock,
const CService& me,
const CService& them) = 0;
//
// Non-pure virtual functions can be overridden by children classes or left
// alone to use the default implementation from SockMan.
@ -96,10 +159,21 @@ private:
*/
virtual void EventI2PStatus(const CService& addr, I2PStatus new_status);
/**
* Accept incoming I2P connections in a loop and call
* `EventNewConnectionAccepted()` for each new connection.
*/
void ThreadI2PAccept();
/**
* The id to assign to the next created connection. Used to generate ids of connections.
*/
std::atomic<Id> m_next_id{0};
/**
* Thread that accepts incoming I2P connections in a loop, can be stopped via `interruptNet`.
*/
std::thread m_thread_i2p_accept;
};
#endif // BITCOIN_COMMON_SOCKMAN_H

View File

@ -1732,9 +1732,9 @@ bool CConnman::AttemptToEvictConnection()
return false;
}
void CConnman::CreateNodeFromAcceptedSocket(std::unique_ptr<Sock>&& sock,
const CService& addr_bind,
const CService& addr)
void CConnman::EventNewConnectionAccepted(std::unique_ptr<Sock>&& sock,
const CService& addr_bind,
const CService& addr)
{
int nInbound = 0;
@ -2201,7 +2201,7 @@ void CConnman::SocketHandlerListening(const Sock::EventsPerSock& events_per_sock
addr_accepted = MaybeFlipIPv6toCJDNS(addr_accepted);
const CService addr_bind{MaybeFlipIPv6toCJDNS(GetBindAddress(*sock))};
CreateNodeFromAcceptedSocket(std::move(sock_accepted), addr_bind, addr_accepted);
EventNewConnectionAccepted(std::move(sock_accepted), addr_bind, addr_accepted);
}
}
}
@ -3072,42 +3072,6 @@ void CConnman::EventI2PStatus(const CService& addr, SockMan::I2PStatus new_statu
}
}
void CConnman::ThreadI2PAcceptIncoming()
{
static constexpr auto err_wait_begin = 1s;
static constexpr auto err_wait_cap = 5min;
auto err_wait = err_wait_begin;
i2p::Connection conn;
auto SleepOnFailure = [&]() {
interruptNet.sleep_for(err_wait);
if (err_wait < err_wait_cap) {
err_wait += 1s;
}
};
while (!interruptNet) {
if (!m_i2p_sam_session->Listen(conn)) {
EventI2PStatus(conn.me, SockMan::I2PStatus::STOP_LISTENING);
SleepOnFailure();
continue;
}
EventI2PStatus(conn.me, SockMan::I2PStatus::START_LISTENING);
if (!m_i2p_sam_session->Accept(conn)) {
SleepOnFailure();
continue;
}
CreateNodeFromAcceptedSocket(std::move(conn.sock), conn.me, conn.peer);
err_wait = err_wait_begin;
}
}
void Discover()
{
if (!fDiscover)
@ -3232,12 +3196,6 @@ bool CConnman::Start(CScheduler& scheduler, const Options& connOptions)
return false;
}
Proxy i2p_sam;
if (GetProxy(NET_I2P, i2p_sam) && connOptions.m_i2p_accept_incoming) {
m_i2p_sam_session = std::make_unique<i2p::sam::Session>(gArgs.GetDataDirNet() / "i2p_private_key",
i2p_sam, &interruptNet);
}
// Randomize the order in which we may query seednode to potentially prevent connecting to the same one every restart (and signal that we have restarted)
std::vector<std::string> seed_nodes = connOptions.vSeedNodes;
if (!seed_nodes.empty()) {
@ -3283,6 +3241,15 @@ bool CConnman::Start(CScheduler& scheduler, const Options& connOptions)
// Send and receive from sockets, accept connections
threadSocketHandler = std::thread(&util::TraceThread, "net", [this] { ThreadSocketHandler(); });
SockMan::Options sockman_options;
Proxy i2p_sam;
if (GetProxy(NET_I2P, i2p_sam) && connOptions.m_i2p_accept_incoming) {
sockman_options.i2p.emplace(gArgs.GetDataDirNet() / "i2p_private_key", i2p_sam, "i2paccept");
}
StartSocketsThreads(sockman_options);
if (!gArgs.GetBoolArg("-dnsseed", DEFAULT_DNSSEED))
LogPrintf("DNS seeding disabled\n");
else
@ -3308,11 +3275,6 @@ bool CConnman::Start(CScheduler& scheduler, const Options& connOptions)
// Process messages
threadMessageHandler = std::thread(&util::TraceThread, "msghand", [this] { ThreadMessageHandler(); });
if (m_i2p_sam_session) {
threadI2PAcceptIncoming =
std::thread(&util::TraceThread, "i2paccept", [this] { ThreadI2PAcceptIncoming(); });
}
// Dump network addresses
scheduler.scheduleEvery([this] { DumpAddresses(); }, DUMP_PEERS_INTERVAL);
@ -3366,9 +3328,8 @@ void CConnman::Interrupt()
void CConnman::StopThreads()
{
if (threadI2PAcceptIncoming.joinable()) {
threadI2PAcceptIncoming.join();
}
JoinSocketsThreads();
if (threadMessageHandler.joinable())
threadMessageHandler.join();
if (threadOpenConnections.joinable())

View File

@ -1292,18 +1292,15 @@ private:
virtual void EventI2PStatus(const CService& addr, SockMan::I2PStatus new_status) override;
void ThreadI2PAcceptIncoming();
/**
* Create a `CNode` object from a socket that has just been accepted and add the node to
* the `m_nodes` member.
* Create a `CNode` object and add it to the `m_nodes` member.
* @param[in] sock Connected socket to communicate with the peer.
* @param[in] addr_bind The address and port at our side of the connection.
* @param[in] addr The address and port at the peer's side of the connection.
* @param[in] me The address and port at our side of the connection.
* @param[in] them The address and port at the peer's side of the connection.
*/
void CreateNodeFromAcceptedSocket(std::unique_ptr<Sock>&& sock,
const CService& addr_bind,
const CService& addr);
virtual void EventNewConnectionAccepted(std::unique_ptr<Sock>&& sock,
const CService& me,
const CService& them) override;
void DisconnectNodes() EXCLUSIVE_LOCKS_REQUIRED(!m_reconnections_mutex, !m_nodes_mutex);
void NotifyNumConnectionsChanged();
@ -1531,27 +1528,11 @@ private:
Mutex mutexMsgProc;
std::atomic<bool> flagInterruptMsgProc{false};
/**
* This is signaled when network activity should cease.
* A pointer to it is saved in `m_i2p_sam_session`, so make sure that
* the lifetime of `interruptNet` is not shorter than
* the lifetime of `m_i2p_sam_session`.
*/
CThreadInterrupt interruptNet;
/**
* I2P SAM session.
* Used to accept incoming and make outgoing I2P connections from a persistent
* address.
*/
std::unique_ptr<i2p::sam::Session> m_i2p_sam_session;
std::thread threadDNSAddressSeed;
std::thread threadSocketHandler;
std::thread threadOpenAddedConnections;
std::thread threadOpenConnections;
std::thread threadMessageHandler;
std::thread threadI2PAcceptIncoming;
/** flag for deciding to connect to an extra outbound peer,
* in excess of m_max_outbound_full_relay