net: implement opening PRIVATE_BROADCAST connections

Implement opening `ConnectionType::PRIVATE_BROADCAST` connections with
the following properties:
* Only to Tor or I2P (or IPv4/IPv6 through the Tor proxy, if provided)
* Open such connections only when requested and don't maintain N opened
  connections of this type.
* Since this is substantially different than what
  `OpenNetworkConnection()` does, open the private broadcast connections
  from a different thread instead of modifying `OpenNetworkConnection()`
  to also open those types of connections.

Co-authored-by: Andrew Toth <andrewstoth@gmail.com>
This commit is contained in:
Vasil Dimov
2025-04-02 06:07:41 +02:00
parent 01dad4efe2
commit bb49d26032
3 changed files with 232 additions and 3 deletions

View File

@@ -458,7 +458,10 @@ CNode* CConnman::ConnectNode(CAddress addrConnect,
i2p::Connection conn;
bool connected{false};
if (m_i2p_sam_session) {
// If an I2P SAM session already exists, normally we would re-use it. But in the case of
// private broadcast we force a new transient session. A Connect() using m_i2p_sam_session
// would use our permanent I2P address as a source address.
if (m_i2p_sam_session && conn_type != ConnectionType::PRIVATE_BROADCAST) {
connected = m_i2p_sam_session->Connect(target_addr, conn, proxyConnectionFailed);
} else {
{
@@ -3052,6 +3055,74 @@ bool CConnman::OpenNetworkConnection(const CAddress& addrConnect,
return true;
}
std::optional<Network> CConnman::PrivateBroadcast::PickNetwork(std::optional<Proxy>& proxy) const
{
prevector<4, Network> nets;
std::optional<Proxy> clearnet_proxy;
proxy.reset();
if (g_reachable_nets.Contains(NET_ONION)) {
nets.push_back(NET_ONION);
clearnet_proxy = ProxyForIPv4or6();
if (clearnet_proxy.has_value()) {
if (g_reachable_nets.Contains(NET_IPV4)) {
nets.push_back(NET_IPV4);
}
if (g_reachable_nets.Contains(NET_IPV6)) {
nets.push_back(NET_IPV6);
}
}
}
if (g_reachable_nets.Contains(NET_I2P)) {
nets.push_back(NET_I2P);
}
if (nets.empty()) {
return std::nullopt;
}
const Network net{nets[FastRandomContext{}.randrange(nets.size())]};
if (net == NET_IPV4 || net == NET_IPV6) {
proxy = clearnet_proxy;
}
return net;
}
size_t CConnman::PrivateBroadcast::NumToOpen() const
{
return m_num_to_open;
}
void CConnman::PrivateBroadcast::NumToOpenAdd(size_t n)
{
m_num_to_open += n;
m_num_to_open.notify_all();
}
size_t CConnman::PrivateBroadcast::NumToOpenSub(size_t n)
{
size_t current_value{m_num_to_open.load()};
size_t new_value;
do {
new_value = current_value > n ? current_value - n : 0;
} while (!m_num_to_open.compare_exchange_strong(current_value, new_value));
return new_value;
}
void CConnman::PrivateBroadcast::NumToOpenWait() const
{
m_num_to_open.wait(0);
}
std::optional<Proxy> CConnman::PrivateBroadcast::ProxyForIPv4or6() const
{
Proxy tor_proxy;
if (m_outbound_tor_ok_at_least_once.load() && GetProxy(NET_ONION, tor_proxy)) {
return tor_proxy;
}
return std::nullopt;
}
Mutex NetEventsInterface::g_msgproc_mutex;
void CConnman::ThreadMessageHandler()
@@ -3136,6 +3207,74 @@ void CConnman::ThreadI2PAcceptIncoming()
}
}
void CConnman::ThreadPrivateBroadcast()
{
AssertLockNotHeld(m_unused_i2p_sessions_mutex);
size_t addrman_num_bad_addresses{0};
while (!m_interrupt_net->interrupted()) {
if (!fNetworkActive) {
m_interrupt_net->sleep_for(5s);
continue;
}
CountingSemaphoreGrant<> conn_max_grant{m_private_broadcast.m_sem_conn_max}; // Would block if too many are opened.
m_private_broadcast.NumToOpenWait();
if (m_interrupt_net->interrupted()) {
break;
}
std::optional<Proxy> proxy;
const std::optional<Network> net{m_private_broadcast.PickNetwork(proxy)};
if (!net.has_value()) {
LogWarning("[privatebroadcast] Connections needed but none of the Tor or I2P networks is reachable");
m_interrupt_net->sleep_for(5s);
continue;
}
const auto [addr, _] = addrman.Select(/*new_only=*/false, {net.value()});
if (!addr.IsValid() || IsLocal(addr)) {
++addrman_num_bad_addresses;
if (addrman_num_bad_addresses > 100) {
LogDebug(BCLog::PRIVBROADCAST, "Connections needed but addrman keeps returning bad addresses, will retry");
m_interrupt_net->sleep_for(500ms);
}
continue;
}
addrman_num_bad_addresses = 0;
auto target_str{addr.ToStringAddrPort()};
if (proxy.has_value()) {
target_str += " through the proxy at " + proxy->ToString();
}
const bool use_v2transport(addr.nServices & GetLocalServices() & NODE_P2P_V2);
if (OpenNetworkConnection(addr,
/*fCountFailure=*/true,
std::move(conn_max_grant),
/*pszDest=*/nullptr,
ConnectionType::PRIVATE_BROADCAST,
use_v2transport,
proxy)) {
const size_t remaining{m_private_broadcast.NumToOpenSub(1)};
LogDebug(BCLog::PRIVBROADCAST, "Socket connected to %s; remaining connections to open: %d", target_str, remaining);
} else {
const size_t remaining{m_private_broadcast.NumToOpen()};
if (remaining == 0) {
LogDebug(BCLog::PRIVBROADCAST, "Failed to connect to %s, will not retry, no more connections needed", target_str);
} else {
LogDebug(BCLog::PRIVBROADCAST, "Failed to connect to %s, will retry to a different address; remaining connections to open: %d", target_str, remaining);
m_interrupt_net->sleep_for(100ms); // Prevent busy loop if OpenNetworkConnection() fails fast repeatedly.
}
}
}
}
bool CConnman::BindListenPort(const CService& addrBind, bilingual_str& strError, NetPermissionFlags permissions)
{
int nOne = 1;
@@ -3416,6 +3555,11 @@ bool CConnman::Start(CScheduler& scheduler, const Options& connOptions)
std::thread(&util::TraceThread, "i2paccept", [this] { ThreadI2PAcceptIncoming(); });
}
if (gArgs.GetBoolArg("-privatebroadcast", DEFAULT_PRIVATE_BROADCAST)) {
threadPrivateBroadcast =
std::thread(&util::TraceThread, "privbcast", [this] { ThreadPrivateBroadcast(); });
}
// Dump network addresses
scheduler.scheduleEvery([this] { DumpAddresses(); }, DUMP_PEERS_INTERVAL);
@@ -3465,10 +3609,16 @@ void CConnman::Interrupt()
semAddnode->release();
}
}
m_private_broadcast.m_sem_conn_max.release();
m_private_broadcast.NumToOpenAdd(1); // Just unblock NumToOpenWait() to be able to continue with shutdown.
}
void CConnman::StopThreads()
{
if (threadPrivateBroadcast.joinable()) {
threadPrivateBroadcast.join();
}
if (threadI2PAcceptIncoming.joinable()) {
threadI2PAcceptIncoming.join();
}
@@ -3901,6 +4051,14 @@ bool CConnman::NodeFullyConnected(const CNode* pnode)
void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
{
AssertLockNotHeld(m_total_bytes_sent_mutex);
if (!m_private_broadcast.m_outbound_tor_ok_at_least_once.load() && !pnode->IsInboundConn() &&
pnode->addr.IsTor() && msg.m_type == NetMsgType::VERACK) {
// If we are sending the peer VERACK that means we successfully sent
// and received another message to/from that peer (VERSION).
m_private_broadcast.m_outbound_tor_ok_at_least_once.store(true);
}
size_t nMessageSize = msg.data.size();
LogDebug(BCLog::NET, "sending %s (%d bytes) peer=%d\n", msg.m_type, nMessageSize, pnode->GetId());
if (m_capture_messages) {