From eae193e750235e4023b3e6313284eede2cd7a022 Mon Sep 17 00:00:00 2001 From: Fabian Jahr Date: Sat, 27 Dec 2025 01:29:04 +0100 Subject: [PATCH] torcontrol: Remove libevent usage Replace libevent-based approach with using the Sock class and CThreadInterrupt. --- src/test/fuzz/torcontrol.cpp | 6 +- src/torcontrol.cpp | 396 +++++++++++++++++++---------------- src/torcontrol.h | 78 ++++--- 3 files changed, 263 insertions(+), 217 deletions(-) diff --git a/src/test/fuzz/torcontrol.cpp b/src/test/fuzz/torcontrol.cpp index 1ea3069000a..47874ddf63d 100644 --- a/src/test/fuzz/torcontrol.cpp +++ b/src/test/fuzz/torcontrol.cpp @@ -14,12 +14,14 @@ class DummyTorControlConnection : public TorControlConnection { + CThreadInterrupt m_dummy_interrupt; + public: - DummyTorControlConnection() : TorControlConnection{nullptr} + DummyTorControlConnection() : TorControlConnection{m_dummy_interrupt} { } - bool Connect(const std::string&, const ConnectionCB&, const ConnectionCB&) + bool Connect(const std::string&) { return true; } diff --git a/src/torcontrol.cpp b/src/torcontrol.cpp index 39ecb6d3200..e8e680d7906 100644 --- a/src/torcontrol.cpp +++ b/src/torcontrol.cpp @@ -26,6 +26,7 @@ #include #include +#include #include #include #include @@ -37,12 +38,6 @@ #include #include -#include -#include -#include -#include -#include - using util::ReplaceAll; using util::SplitString; using util::ToString; @@ -58,146 +53,172 @@ static const std::string TOR_SAFE_SERVERKEY = "Tor safe cookie authentication se /** For computing clientHash in SAFECOOKIE */ static const std::string TOR_SAFE_CLIENTKEY = "Tor safe cookie authentication controller-to-server hash"; /** Exponential backoff configuration - initial timeout in seconds */ -constexpr float RECONNECT_TIMEOUT_START = 1.0; +constexpr std::chrono::duration RECONNECT_TIMEOUT_START{1.0}; /** Exponential backoff configuration - growth factor */ -constexpr float RECONNECT_TIMEOUT_EXP = 1.5; +constexpr double RECONNECT_TIMEOUT_EXP = 1.5; /** Maximum reconnect timeout in seconds to prevent excessive delays */ -constexpr float RECONNECT_TIMEOUT_MAX = 600.0; +constexpr std::chrono::duration RECONNECT_TIMEOUT_MAX{600.0}; /** Maximum length for lines received on TorControlConnection. * tor-control-spec.txt mentions that there is explicitly no limit defined to line length, * this is belt-and-suspenders sanity limit to prevent memory exhaustion. */ constexpr int MAX_LINE_LENGTH = 100000; +/** Timeout for socket operations */ +constexpr auto SOCKET_SEND_TIMEOUT = 10s; /****** Low-level TorControlConnection ********/ -TorControlConnection::TorControlConnection(struct event_base* _base) - : base(_base) +TorControlConnection::TorControlConnection(CThreadInterrupt& interrupt) + : m_interrupt(interrupt) { } TorControlConnection::~TorControlConnection() { - if (b_conn) - bufferevent_free(b_conn); + Disconnect(); } -void TorControlConnection::readcb(struct bufferevent *bev, void *ctx) +bool TorControlConnection::Connect(const std::string& tor_control_center) { - TorControlConnection *self = static_cast(ctx); - struct evbuffer *input = bufferevent_get_input(bev); - size_t n_read_out = 0; - char *line; - assert(input); - // If there is not a whole line to read, evbuffer_readln returns nullptr - while((line = evbuffer_readln(input, &n_read_out, EVBUFFER_EOL_CRLF)) != nullptr) - { - std::string s(line, n_read_out); - free(line); - if (s.size() < 4) // Short line - continue; - // (-|+| ) - self->m_message.code = ToIntegral(s.substr(0, 3)).value_or(0); - self->m_message.lines.push_back(s.substr(4)); - char ch = s[3]; // '-','+' or ' ' - if (ch == ' ') { - // Final line, dispatch reply and clean up - if (self->m_message.code >= 600) { - // (currently unused) - // Dispatch async notifications to async handler - // Synchronous and asynchronous messages are never interleaved - } else { - if (!self->reply_handlers.empty()) { - // Invoke reply handler with message - self->reply_handlers.front()(*self, self->m_message); - self->reply_handlers.pop_front(); - } else { - LogDebug(BCLog::TOR, "Received unexpected sync reply %i", self->m_message.code); - } - } - self->m_message.Clear(); - } - } - // Check for size of buffer - protect against memory exhaustion with very long lines - // Do this after evbuffer_readln to make sure all full lines have been - // removed from the buffer. Everything left is an incomplete line. - if (evbuffer_get_length(input) > MAX_LINE_LENGTH) { - LogWarning("tor: Disconnecting because MAX_LINE_LENGTH exceeded"); - self->Disconnect(); - } -} - -void TorControlConnection::eventcb(struct bufferevent *bev, short what, void *ctx) -{ - TorControlConnection *self = static_cast(ctx); - if (what & BEV_EVENT_CONNECTED) { - LogDebug(BCLog::TOR, "Successfully connected!"); - self->connected(*self); - } else if (what & (BEV_EVENT_EOF|BEV_EVENT_ERROR)) { - if (what & BEV_EVENT_ERROR) { - LogDebug(BCLog::TOR, "Error connecting to Tor control socket"); - } else { - LogDebug(BCLog::TOR, "End of stream"); - } - self->Disconnect(); - self->disconnected(*self); - } -} - -bool TorControlConnection::Connect(const std::string& tor_control_center, const ConnectionCB& _connected, const ConnectionCB& _disconnected) -{ - if (b_conn) { + if (m_sock) { Disconnect(); } - const std::optional control_service{Lookup(tor_control_center, DEFAULT_TOR_CONTROL_PORT, fNameLookup)}; + std::optional control_service = Lookup(tor_control_center, DEFAULT_TOR_CONTROL_PORT, fNameLookup); if (!control_service.has_value()) { LogWarning("tor: Failed to look up control center %s", tor_control_center); return false; } - struct sockaddr_storage control_address; - socklen_t control_address_len = sizeof(control_address); - if (!control_service.value().GetSockAddr(reinterpret_cast(&control_address), &control_address_len)) { - LogWarning("tor: Error parsing socket address %s", tor_control_center); - return false; - } - - // Create a new socket, set up callbacks and enable notification bits - b_conn = bufferevent_socket_new(base, -1, BEV_OPT_CLOSE_ON_FREE); - if (!b_conn) { - return false; - } - bufferevent_setcb(b_conn, TorControlConnection::readcb, nullptr, TorControlConnection::eventcb, this); - bufferevent_enable(b_conn, EV_READ|EV_WRITE); - this->connected = _connected; - this->disconnected = _disconnected; - - // Finally, connect to tor_control_center - if (bufferevent_socket_connect(b_conn, reinterpret_cast(&control_address), control_address_len) < 0) { + m_sock = ConnectDirectly(control_service.value(), /*manual_connection=*/true); + if (!m_sock) { LogWarning("tor: Error connecting to address %s", tor_control_center); return false; } + + m_recv_buffer.clear(); + m_message.Clear(); + m_reply_handlers.clear(); + + LogDebug(BCLog::TOR, "Successfully connected to Tor control port"); return true; } void TorControlConnection::Disconnect() { - if (b_conn) - bufferevent_free(b_conn); - b_conn = nullptr; + m_sock.reset(); + m_recv_buffer.clear(); + m_message.Clear(); + m_reply_handlers.clear(); +} + +bool TorControlConnection::IsConnected() const +{ + if (!m_sock) return false; + std::string errmsg; + const bool connected{m_sock->IsConnected(errmsg)}; + if (!connected && !errmsg.empty()) { + LogDebug(BCLog::TOR, "Connection check failed: %s", errmsg); + } + return connected; +} + +bool TorControlConnection::WaitForData(std::chrono::milliseconds timeout) +{ + if (!m_sock) return false; + + Sock::Event event{0}; + if (!m_sock->Wait(timeout, Sock::RECV, &event)) { + return false; + } + if (event & Sock::ERR) { + LogDebug(BCLog::TOR, "Socket error detected"); + Disconnect(); + return false; + } + + return (event & Sock::RECV); +} + +bool TorControlConnection::ReceiveAndProcess() +{ + if (!m_sock) return false; + + std::byte buf[4096]; + ssize_t nread = m_sock->Recv(buf, sizeof(buf), MSG_DONTWAIT); + + if (nread < 0) { + int err = WSAGetLastError(); + if (err == WSAEWOULDBLOCK || err == WSAEINTR || err == WSAEINPROGRESS) { + // No data available currently + return true; + } + LogWarning("tor: Error reading from socket: %s", NetworkErrorString(err)); + return false; + } + + if (nread == 0) { + LogDebug(BCLog::TOR, "End of stream"); + return false; + } + + m_recv_buffer.insert(m_recv_buffer.end(), buf, buf + nread); + try { + return ProcessBuffer(); + } catch (const std::runtime_error& e) { + LogWarning("tor: Error processing receive buffer: %s", e.what()); + return false; + } +} + +bool TorControlConnection::ProcessBuffer() +{ + util::LineReader reader(m_recv_buffer, MAX_LINE_LENGTH); + auto start = reader.it; + + while (auto line = reader.ReadLine()) { + // Skip short lines + if (line->size() < 4) continue; + + // Parse: + // (-|+| ) + m_message.code = ToIntegral(line->substr(0, 3)).value_or(0); + m_message.lines.push_back(line->substr(4)); + char separator = (*line)[3]; // '-', '+', or ' ' + + if (separator == ' ') { + if (m_message.code >= 600) { + // Async notifications are currently unused + // Synchronous and asynchronous messages are never interleaved + LogDebug(BCLog::TOR, "Received async notification %i", m_message.code); + } else if (!m_reply_handlers.empty()) { + // Invoke reply handler with message + m_reply_handlers.front()(*this, m_message); + m_reply_handlers.pop_front(); + } else { + LogDebug(BCLog::TOR, "Received unexpected sync reply %i", m_message.code); + } + m_message.Clear(); + } + } + + m_recv_buffer.erase(m_recv_buffer.begin(), m_recv_buffer.begin() + std::distance(start, reader.it)); + return true; } bool TorControlConnection::Command(const std::string &cmd, const ReplyHandlerCB& reply_handler) { - if (!b_conn) + if (!m_sock) return false; + + std::string command = cmd + "\r\n"; + try { + m_sock->SendComplete(std::span{command}, SOCKET_SEND_TIMEOUT, m_interrupt); + } catch (const std::runtime_error& e) { + LogWarning("tor: Error sending command: %s", e.what()); return false; - struct evbuffer *buf = bufferevent_get_output(b_conn); - if (!buf) - return false; - evbuffer_add(buf, cmd.data(), cmd.size()); - evbuffer_add(buf, "\r\n", 2); - reply_handlers.push_back(reply_handler); + } + + m_reply_handlers.push_back(reply_handler); return true; } @@ -320,38 +341,89 @@ std::map ParseTorReplyMapping(const std::string &s) return mapping; } -TorController::TorController(struct event_base* _base, const std::string& tor_control_center, const CService& target): - base(_base), - m_tor_control_center(tor_control_center), m_conn(base), m_reconnect(true), m_reconnect_timeout(RECONNECT_TIMEOUT_START), - m_target(target) +TorController::TorController(const std::string& tor_control_center, const CService& target) + : m_tor_control_center(tor_control_center), + m_conn(m_interrupt), + m_reconnect(true), + m_reconnect_timeout(RECONNECT_TIMEOUT_START), + m_target(target) { - reconnect_ev = event_new(base, -1, 0, reconnect_cb, this); - if (!reconnect_ev) - LogWarning("tor: Failed to create event for reconnection: out of memory?"); - // Start connection attempts immediately - if (!m_conn.Connect(m_tor_control_center, std::bind_front(&TorController::connected_cb, this), - std::bind_front(&TorController::disconnected_cb, this) )) { - LogWarning("tor: Initiating connection to Tor control port %s failed", m_tor_control_center); - } // Read service private key if cached std::pair pkf = ReadBinaryFile(GetPrivateKeyFile()); if (pkf.first) { LogDebug(BCLog::TOR, "Reading cached private key from %s", fs::PathToString(GetPrivateKeyFile())); m_private_key = pkf.second; } + m_thread = std::thread(&util::TraceThread, "torcontrol", [this] { ThreadControl(); }); } TorController::~TorController() { - if (reconnect_ev) { - event_free(reconnect_ev); - reconnect_ev = nullptr; - } + Interrupt(); + Join(); if (m_service.IsValid()) { RemoveLocal(m_service); } } +void TorController::Interrupt() +{ + m_reconnect = false; + m_interrupt(); +} + +void TorController::Join() +{ + if (m_thread.joinable()) { + m_thread.join(); + } +} + +void TorController::ThreadControl() +{ + LogDebug(BCLog::TOR, "Entering Tor control thread"); + + while (!m_interrupt) { + // Try to connect if not connected already + if (!m_conn.IsConnected()) { + LogDebug(BCLog::TOR, "Attempting to connect to Tor control port %s", m_tor_control_center); + + if (!m_conn.Connect(m_tor_control_center)) { + LogWarning("tor: Initiating connection to Tor control port %s failed", m_tor_control_center); + if (!m_reconnect) { + break; + } + // Wait before retrying with exponential backoff + LogDebug(BCLog::TOR, "Retrying in %.1f seconds", m_reconnect_timeout.count()); + if (!m_interrupt.sleep_for(std::chrono::duration_cast(m_reconnect_timeout))) { + break; + } + m_reconnect_timeout = std::min(m_reconnect_timeout * RECONNECT_TIMEOUT_EXP, RECONNECT_TIMEOUT_MAX); + continue; + } + // Successfully connected, reset timeout and trigger connected callback + m_reconnect_timeout = RECONNECT_TIMEOUT_START; + connected_cb(m_conn); + } + // Wait for data with a timeout + if (!m_conn.WaitForData(std::chrono::seconds(1))) { + // Check if still connected + if (!m_conn.IsConnected()) { + LogDebug(BCLog::TOR, "Lost connection to Tor control port"); + disconnected_cb(m_conn); + continue; + } + // Just a timeout, continue waiting + continue; + } + // Process incoming data + if (!m_conn.ReceiveAndProcess()) { + disconnected_cb(m_conn); + } + } + LogDebug(BCLog::TOR, "Exited Tor control thread"); +} + void TorController::get_socks_cb(TorControlConnection& _conn, const TorControlReply& reply) { // NOTE: We can only get here if -onion is unset @@ -515,7 +587,7 @@ void TorController::auth_cb(TorControlConnection& _conn, const TorControlReply& * CookieString | ClientNonce | ServerNonce) * */ -static std::vector ComputeResponse(const std::string &key, const std::vector &cookie, const std::vector &client_nonce, const std::vector &server_nonce) +static std::vector ComputeResponse(std::string_view key, std::span cookie, std::span client_nonce, std::span server_nonce) { CHMAC_SHA256 computeHash((const uint8_t*)key.data(), key.size()); std::vector computedHash(CHMAC_SHA256::OUTPUT_SIZE, 0); @@ -654,26 +726,8 @@ void TorController::disconnected_cb(TorControlConnection& _conn) if (!m_reconnect) return; - LogDebug(BCLog::TOR, "Not connected to Tor control port %s, retrying in %.2f s", - m_tor_control_center, m_reconnect_timeout); - - // Single-shot timer for reconnect. Use exponential backoff with a maximum. - struct timeval time = MillisToTimeval(int64_t(m_reconnect_timeout * 1000.0)); - if (reconnect_ev) - event_add(reconnect_ev, &time); - - m_reconnect_timeout = std::min(m_reconnect_timeout * RECONNECT_TIMEOUT_EXP, RECONNECT_TIMEOUT_MAX); -} - -void TorController::Reconnect() -{ - /* Try to reconnect and reestablish if we get booted - for example, Tor - * may be restarting. - */ - if (!m_conn.Connect(m_tor_control_center, std::bind_front(&TorController::connected_cb, this), - std::bind_front(&TorController::disconnected_cb, this) )) { - LogWarning("tor: Re-initiating connection to Tor control port %s failed", m_tor_control_center); - } + LogDebug(BCLog::TOR, "Not connected to Tor control port %s, will retry", m_tor_control_center); + _conn.Disconnect(); } fs::path TorController::GetPrivateKeyFile() @@ -681,59 +735,33 @@ fs::path TorController::GetPrivateKeyFile() return gArgs.GetDataDirNet() / "onion_v3_private_key"; } -void TorController::reconnect_cb(evutil_socket_t fd, short what, void *arg) -{ - TorController *self = static_cast(arg); - self->Reconnect(); -} - /****** Thread ********/ -static struct event_base *gBase; -static std::thread torControlThread; -static void TorControlThread(CService onion_service_target) -{ - TorController ctrl(gBase, gArgs.GetArg("-torcontrol", DEFAULT_TOR_CONTROL), onion_service_target); - - event_base_dispatch(gBase); -} +/** + * TODO: TBD if introducing a global is the preferred approach here since we + * usually try to avoid them. We could let init manage the lifecycle or make + * this a part of NodeContext maybe instead. + */ +static std::unique_ptr g_tor_controller; void StartTorControl(CService onion_service_target) { - assert(!gBase); -#ifdef WIN32 - evthread_use_windows_threads(); -#else - evthread_use_pthreads(); -#endif - gBase = event_base_new(); - if (!gBase) { - LogWarning("tor: Unable to create event_base"); - return; - } - - torControlThread = std::thread(&util::TraceThread, "torcontrol", [onion_service_target] { - TorControlThread(onion_service_target); - }); + assert(!g_tor_controller); + g_tor_controller = std::make_unique(gArgs.GetArg("-torcontrol", DEFAULT_TOR_CONTROL), onion_service_target); } void InterruptTorControl() { - if (gBase) { - LogInfo("tor: Thread interrupt"); - event_base_once(gBase, -1, EV_TIMEOUT, [](evutil_socket_t, short, void*) { - event_base_loopbreak(gBase); - }, nullptr, nullptr); - } + if (!g_tor_controller) return; + LogInfo("tor: Thread interrupt"); + g_tor_controller->Interrupt(); } void StopTorControl() { - if (gBase) { - torControlThread.join(); - event_base_free(gBase); - gBase = nullptr; - } + if (!g_tor_controller) return; + g_tor_controller->Join(); + g_tor_controller.reset(); } CService DefaultOnionServiceTarget(uint16_t port) diff --git a/src/torcontrol.h b/src/torcontrol.h index 9f4970296cc..7dfc6207a9a 100644 --- a/src/torcontrol.h +++ b/src/torcontrol.h @@ -10,13 +10,15 @@ #include #include - -#include +#include +#include #include #include #include +#include #include +#include #include constexpr uint16_t DEFAULT_TOR_SOCKS_PORT{9050}; @@ -57,22 +59,19 @@ public: class TorControlConnection { public: - typedef std::function ConnectionCB; typedef std::function ReplyHandlerCB; /** Create a new TorControlConnection. */ - explicit TorControlConnection(struct event_base *base); + explicit TorControlConnection(CThreadInterrupt& interrupt); ~TorControlConnection(); /** * Connect to a Tor control port. * tor_control_center is address of the form host:port. - * connected is the handler that is called when connection is successfully established. - * disconnected is a handler that is called when the connection is broken. * Return true on success. */ - bool Connect(const std::string& tor_control_center, const ConnectionCB& connected, const ConnectionCB& disconnected); + bool Connect(const std::string& tor_control_center); /** * Disconnect from Tor control port. @@ -85,23 +84,38 @@ public: */ bool Command(const std::string &cmd, const ReplyHandlerCB& reply_handler); + /** + * Check if the connection is established. + */ + bool IsConnected() const; + + /** + * Wait for data to be available on the socket. + * @param[in] timeout Maximum time to wait + * @return true if data is available, false on timeout or error + */ + bool WaitForData(std::chrono::milliseconds timeout); + + /** + * Read available data from socket and process complete replies. + * Dispatches to registered reply handlers. + * @return true if connection is still open, false if connection was closed + */ + bool ReceiveAndProcess(); + private: - /** Callback when ready for use */ - std::function connected; - /** Callback when connection lost */ - std::function disconnected; - /** Libevent event base */ - struct event_base *base; - /** Connection to control socket */ - struct bufferevent* b_conn{nullptr}; + /** Reference to interrupt object for clean shutdown */ + CThreadInterrupt& m_interrupt; + /** Socket for the connection */ + std::unique_ptr m_sock; /** Message being received */ TorControlReply m_message; /** Response handlers */ - std::deque reply_handlers; - - /** Libevent handlers: internal */ - static void readcb(struct bufferevent *bev, void *ctx); - static void eventcb(struct bufferevent *bev, short what, void *ctx); + std::deque m_reply_handlers; + /** Buffer for incoming data */ + std::vector m_recv_buffer; + /** Process complete lines from the receive buffer */ + bool ProcessBuffer(); }; /****** Bitcoin specific TorController implementation ********/ @@ -112,8 +126,8 @@ private: class TorController { public: - TorController(struct event_base* base, const std::string& tor_control_center, const CService& target); - TorController() : m_conn{nullptr} { + TorController(const std::string& tor_control_center, const CService& target); + TorController() : m_conn(m_interrupt) { // Used for testing only. } ~TorController(); @@ -121,23 +135,28 @@ public: /** Get name of file to store private key in */ fs::path GetPrivateKeyFile(); - /** Reconnect, after getting disconnected */ - void Reconnect(); + /** Interrupt the controller thread */ + void Interrupt(); + + /** Wait for the controller thread to exit */ + void Join(); private: - struct event_base* base; + CThreadInterrupt m_interrupt; + std::thread m_thread; const std::string m_tor_control_center; TorControlConnection m_conn; std::string m_private_key; std::string m_service_id; - bool m_reconnect; - struct event *reconnect_ev = nullptr; - float m_reconnect_timeout; + std::atomic m_reconnect; + std::chrono::duration m_reconnect_timeout; CService m_service; const CService m_target; /** Cookie for SAFECOOKIE auth */ std::vector m_cookie; /** ClientNonce for SAFECOOKIE auth */ std::vector m_client_nonce; + /** Main control thread */ + void ThreadControl(); public: /** Callback for GETINFO net/listeners/socks result */ @@ -154,9 +173,6 @@ public: void connected_cb(TorControlConnection& conn); /** Callback after connection lost or failed connection attempt */ void disconnected_cb(TorControlConnection& conn); - - /** Callback for reconnect timer */ - static void reconnect_cb(evutil_socket_t fd, short what, void *arg); }; #endif // BITCOIN_TORCONTROL_H