torcontrol: Remove libevent usage

Replace libevent-based approach with using the Sock class and CThreadInterrupt.
This commit is contained in:
Fabian Jahr
2025-12-27 01:29:04 +01:00
parent 8444efbd4a
commit eae193e750
3 changed files with 263 additions and 217 deletions

View File

@@ -14,12 +14,14 @@
class DummyTorControlConnection : public TorControlConnection
{
CThreadInterrupt m_dummy_interrupt;
public:
DummyTorControlConnection() : TorControlConnection{nullptr}
DummyTorControlConnection() : TorControlConnection{m_dummy_interrupt}
{
}
bool Connect(const std::string&, const ConnectionCB&, const ConnectionCB&)
bool Connect(const std::string&)
{
return true;
}

View File

@@ -26,6 +26,7 @@
#include <algorithm>
#include <cassert>
#include <chrono>
#include <cstdint>
#include <cstdlib>
#include <deque>
@@ -37,12 +38,6 @@
#include <utility>
#include <vector>
#include <event2/buffer.h>
#include <event2/bufferevent.h>
#include <event2/event.h>
#include <event2/thread.h>
#include <event2/util.h>
using util::ReplaceAll;
using util::SplitString;
using util::ToString;
@@ -58,146 +53,172 @@ static const std::string TOR_SAFE_SERVERKEY = "Tor safe cookie authentication se
/** For computing clientHash in SAFECOOKIE */
static const std::string TOR_SAFE_CLIENTKEY = "Tor safe cookie authentication controller-to-server hash";
/** Exponential backoff configuration - initial timeout in seconds */
constexpr float RECONNECT_TIMEOUT_START = 1.0;
constexpr std::chrono::duration<double> RECONNECT_TIMEOUT_START{1.0};
/** Exponential backoff configuration - growth factor */
constexpr float RECONNECT_TIMEOUT_EXP = 1.5;
constexpr double RECONNECT_TIMEOUT_EXP = 1.5;
/** Maximum reconnect timeout in seconds to prevent excessive delays */
constexpr float RECONNECT_TIMEOUT_MAX = 600.0;
constexpr std::chrono::duration<double> RECONNECT_TIMEOUT_MAX{600.0};
/** Maximum length for lines received on TorControlConnection.
* tor-control-spec.txt mentions that there is explicitly no limit defined to line length,
* this is belt-and-suspenders sanity limit to prevent memory exhaustion.
*/
constexpr int MAX_LINE_LENGTH = 100000;
/** Timeout for socket operations */
constexpr auto SOCKET_SEND_TIMEOUT = 10s;
/****** Low-level TorControlConnection ********/
TorControlConnection::TorControlConnection(struct event_base* _base)
: base(_base)
TorControlConnection::TorControlConnection(CThreadInterrupt& interrupt)
: m_interrupt(interrupt)
{
}
TorControlConnection::~TorControlConnection()
{
if (b_conn)
bufferevent_free(b_conn);
Disconnect();
}
void TorControlConnection::readcb(struct bufferevent *bev, void *ctx)
bool TorControlConnection::Connect(const std::string& tor_control_center)
{
TorControlConnection *self = static_cast<TorControlConnection*>(ctx);
struct evbuffer *input = bufferevent_get_input(bev);
size_t n_read_out = 0;
char *line;
assert(input);
// If there is not a whole line to read, evbuffer_readln returns nullptr
while((line = evbuffer_readln(input, &n_read_out, EVBUFFER_EOL_CRLF)) != nullptr)
{
std::string s(line, n_read_out);
free(line);
if (s.size() < 4) // Short line
continue;
// <status>(-|+| )<data><CRLF>
self->m_message.code = ToIntegral<int>(s.substr(0, 3)).value_or(0);
self->m_message.lines.push_back(s.substr(4));
char ch = s[3]; // '-','+' or ' '
if (ch == ' ') {
// Final line, dispatch reply and clean up
if (self->m_message.code >= 600) {
// (currently unused)
// Dispatch async notifications to async handler
// Synchronous and asynchronous messages are never interleaved
} else {
if (!self->reply_handlers.empty()) {
// Invoke reply handler with message
self->reply_handlers.front()(*self, self->m_message);
self->reply_handlers.pop_front();
} else {
LogDebug(BCLog::TOR, "Received unexpected sync reply %i", self->m_message.code);
}
}
self->m_message.Clear();
}
}
// Check for size of buffer - protect against memory exhaustion with very long lines
// Do this after evbuffer_readln to make sure all full lines have been
// removed from the buffer. Everything left is an incomplete line.
if (evbuffer_get_length(input) > MAX_LINE_LENGTH) {
LogWarning("tor: Disconnecting because MAX_LINE_LENGTH exceeded");
self->Disconnect();
}
}
void TorControlConnection::eventcb(struct bufferevent *bev, short what, void *ctx)
{
TorControlConnection *self = static_cast<TorControlConnection*>(ctx);
if (what & BEV_EVENT_CONNECTED) {
LogDebug(BCLog::TOR, "Successfully connected!");
self->connected(*self);
} else if (what & (BEV_EVENT_EOF|BEV_EVENT_ERROR)) {
if (what & BEV_EVENT_ERROR) {
LogDebug(BCLog::TOR, "Error connecting to Tor control socket");
} else {
LogDebug(BCLog::TOR, "End of stream");
}
self->Disconnect();
self->disconnected(*self);
}
}
bool TorControlConnection::Connect(const std::string& tor_control_center, const ConnectionCB& _connected, const ConnectionCB& _disconnected)
{
if (b_conn) {
if (m_sock) {
Disconnect();
}
const std::optional<CService> control_service{Lookup(tor_control_center, DEFAULT_TOR_CONTROL_PORT, fNameLookup)};
std::optional<CService> control_service = Lookup(tor_control_center, DEFAULT_TOR_CONTROL_PORT, fNameLookup);
if (!control_service.has_value()) {
LogWarning("tor: Failed to look up control center %s", tor_control_center);
return false;
}
struct sockaddr_storage control_address;
socklen_t control_address_len = sizeof(control_address);
if (!control_service.value().GetSockAddr(reinterpret_cast<struct sockaddr*>(&control_address), &control_address_len)) {
LogWarning("tor: Error parsing socket address %s", tor_control_center);
return false;
}
// Create a new socket, set up callbacks and enable notification bits
b_conn = bufferevent_socket_new(base, -1, BEV_OPT_CLOSE_ON_FREE);
if (!b_conn) {
return false;
}
bufferevent_setcb(b_conn, TorControlConnection::readcb, nullptr, TorControlConnection::eventcb, this);
bufferevent_enable(b_conn, EV_READ|EV_WRITE);
this->connected = _connected;
this->disconnected = _disconnected;
// Finally, connect to tor_control_center
if (bufferevent_socket_connect(b_conn, reinterpret_cast<struct sockaddr*>(&control_address), control_address_len) < 0) {
m_sock = ConnectDirectly(control_service.value(), /*manual_connection=*/true);
if (!m_sock) {
LogWarning("tor: Error connecting to address %s", tor_control_center);
return false;
}
m_recv_buffer.clear();
m_message.Clear();
m_reply_handlers.clear();
LogDebug(BCLog::TOR, "Successfully connected to Tor control port");
return true;
}
void TorControlConnection::Disconnect()
{
if (b_conn)
bufferevent_free(b_conn);
b_conn = nullptr;
m_sock.reset();
m_recv_buffer.clear();
m_message.Clear();
m_reply_handlers.clear();
}
bool TorControlConnection::IsConnected() const
{
if (!m_sock) return false;
std::string errmsg;
const bool connected{m_sock->IsConnected(errmsg)};
if (!connected && !errmsg.empty()) {
LogDebug(BCLog::TOR, "Connection check failed: %s", errmsg);
}
return connected;
}
bool TorControlConnection::WaitForData(std::chrono::milliseconds timeout)
{
if (!m_sock) return false;
Sock::Event event{0};
if (!m_sock->Wait(timeout, Sock::RECV, &event)) {
return false;
}
if (event & Sock::ERR) {
LogDebug(BCLog::TOR, "Socket error detected");
Disconnect();
return false;
}
return (event & Sock::RECV);
}
bool TorControlConnection::ReceiveAndProcess()
{
if (!m_sock) return false;
std::byte buf[4096];
ssize_t nread = m_sock->Recv(buf, sizeof(buf), MSG_DONTWAIT);
if (nread < 0) {
int err = WSAGetLastError();
if (err == WSAEWOULDBLOCK || err == WSAEINTR || err == WSAEINPROGRESS) {
// No data available currently
return true;
}
LogWarning("tor: Error reading from socket: %s", NetworkErrorString(err));
return false;
}
if (nread == 0) {
LogDebug(BCLog::TOR, "End of stream");
return false;
}
m_recv_buffer.insert(m_recv_buffer.end(), buf, buf + nread);
try {
return ProcessBuffer();
} catch (const std::runtime_error& e) {
LogWarning("tor: Error processing receive buffer: %s", e.what());
return false;
}
}
bool TorControlConnection::ProcessBuffer()
{
util::LineReader reader(m_recv_buffer, MAX_LINE_LENGTH);
auto start = reader.it;
while (auto line = reader.ReadLine()) {
// Skip short lines
if (line->size() < 4) continue;
// Parse: <code><separator><data>
// <status>(-|+| )<data>
m_message.code = ToIntegral<int>(line->substr(0, 3)).value_or(0);
m_message.lines.push_back(line->substr(4));
char separator = (*line)[3]; // '-', '+', or ' '
if (separator == ' ') {
if (m_message.code >= 600) {
// Async notifications are currently unused
// Synchronous and asynchronous messages are never interleaved
LogDebug(BCLog::TOR, "Received async notification %i", m_message.code);
} else if (!m_reply_handlers.empty()) {
// Invoke reply handler with message
m_reply_handlers.front()(*this, m_message);
m_reply_handlers.pop_front();
} else {
LogDebug(BCLog::TOR, "Received unexpected sync reply %i", m_message.code);
}
m_message.Clear();
}
}
m_recv_buffer.erase(m_recv_buffer.begin(), m_recv_buffer.begin() + std::distance(start, reader.it));
return true;
}
bool TorControlConnection::Command(const std::string &cmd, const ReplyHandlerCB& reply_handler)
{
if (!b_conn)
if (!m_sock) return false;
std::string command = cmd + "\r\n";
try {
m_sock->SendComplete(std::span<const char>{command}, SOCKET_SEND_TIMEOUT, m_interrupt);
} catch (const std::runtime_error& e) {
LogWarning("tor: Error sending command: %s", e.what());
return false;
struct evbuffer *buf = bufferevent_get_output(b_conn);
if (!buf)
return false;
evbuffer_add(buf, cmd.data(), cmd.size());
evbuffer_add(buf, "\r\n", 2);
reply_handlers.push_back(reply_handler);
}
m_reply_handlers.push_back(reply_handler);
return true;
}
@@ -320,38 +341,89 @@ std::map<std::string,std::string> ParseTorReplyMapping(const std::string &s)
return mapping;
}
TorController::TorController(struct event_base* _base, const std::string& tor_control_center, const CService& target):
base(_base),
m_tor_control_center(tor_control_center), m_conn(base), m_reconnect(true), m_reconnect_timeout(RECONNECT_TIMEOUT_START),
m_target(target)
TorController::TorController(const std::string& tor_control_center, const CService& target)
: m_tor_control_center(tor_control_center),
m_conn(m_interrupt),
m_reconnect(true),
m_reconnect_timeout(RECONNECT_TIMEOUT_START),
m_target(target)
{
reconnect_ev = event_new(base, -1, 0, reconnect_cb, this);
if (!reconnect_ev)
LogWarning("tor: Failed to create event for reconnection: out of memory?");
// Start connection attempts immediately
if (!m_conn.Connect(m_tor_control_center, std::bind_front(&TorController::connected_cb, this),
std::bind_front(&TorController::disconnected_cb, this) )) {
LogWarning("tor: Initiating connection to Tor control port %s failed", m_tor_control_center);
}
// Read service private key if cached
std::pair<bool,std::string> pkf = ReadBinaryFile(GetPrivateKeyFile());
if (pkf.first) {
LogDebug(BCLog::TOR, "Reading cached private key from %s", fs::PathToString(GetPrivateKeyFile()));
m_private_key = pkf.second;
}
m_thread = std::thread(&util::TraceThread, "torcontrol", [this] { ThreadControl(); });
}
TorController::~TorController()
{
if (reconnect_ev) {
event_free(reconnect_ev);
reconnect_ev = nullptr;
}
Interrupt();
Join();
if (m_service.IsValid()) {
RemoveLocal(m_service);
}
}
void TorController::Interrupt()
{
m_reconnect = false;
m_interrupt();
}
void TorController::Join()
{
if (m_thread.joinable()) {
m_thread.join();
}
}
void TorController::ThreadControl()
{
LogDebug(BCLog::TOR, "Entering Tor control thread");
while (!m_interrupt) {
// Try to connect if not connected already
if (!m_conn.IsConnected()) {
LogDebug(BCLog::TOR, "Attempting to connect to Tor control port %s", m_tor_control_center);
if (!m_conn.Connect(m_tor_control_center)) {
LogWarning("tor: Initiating connection to Tor control port %s failed", m_tor_control_center);
if (!m_reconnect) {
break;
}
// Wait before retrying with exponential backoff
LogDebug(BCLog::TOR, "Retrying in %.1f seconds", m_reconnect_timeout.count());
if (!m_interrupt.sleep_for(std::chrono::duration_cast<std::chrono::milliseconds>(m_reconnect_timeout))) {
break;
}
m_reconnect_timeout = std::min(m_reconnect_timeout * RECONNECT_TIMEOUT_EXP, RECONNECT_TIMEOUT_MAX);
continue;
}
// Successfully connected, reset timeout and trigger connected callback
m_reconnect_timeout = RECONNECT_TIMEOUT_START;
connected_cb(m_conn);
}
// Wait for data with a timeout
if (!m_conn.WaitForData(std::chrono::seconds(1))) {
// Check if still connected
if (!m_conn.IsConnected()) {
LogDebug(BCLog::TOR, "Lost connection to Tor control port");
disconnected_cb(m_conn);
continue;
}
// Just a timeout, continue waiting
continue;
}
// Process incoming data
if (!m_conn.ReceiveAndProcess()) {
disconnected_cb(m_conn);
}
}
LogDebug(BCLog::TOR, "Exited Tor control thread");
}
void TorController::get_socks_cb(TorControlConnection& _conn, const TorControlReply& reply)
{
// NOTE: We can only get here if -onion is unset
@@ -515,7 +587,7 @@ void TorController::auth_cb(TorControlConnection& _conn, const TorControlReply&
* CookieString | ClientNonce | ServerNonce)
*
*/
static std::vector<uint8_t> ComputeResponse(const std::string &key, const std::vector<uint8_t> &cookie, const std::vector<uint8_t> &client_nonce, const std::vector<uint8_t> &server_nonce)
static std::vector<uint8_t> ComputeResponse(std::string_view key, std::span<const uint8_t> cookie, std::span<const uint8_t> client_nonce, std::span<const uint8_t> server_nonce)
{
CHMAC_SHA256 computeHash((const uint8_t*)key.data(), key.size());
std::vector<uint8_t> computedHash(CHMAC_SHA256::OUTPUT_SIZE, 0);
@@ -654,26 +726,8 @@ void TorController::disconnected_cb(TorControlConnection& _conn)
if (!m_reconnect)
return;
LogDebug(BCLog::TOR, "Not connected to Tor control port %s, retrying in %.2f s",
m_tor_control_center, m_reconnect_timeout);
// Single-shot timer for reconnect. Use exponential backoff with a maximum.
struct timeval time = MillisToTimeval(int64_t(m_reconnect_timeout * 1000.0));
if (reconnect_ev)
event_add(reconnect_ev, &time);
m_reconnect_timeout = std::min(m_reconnect_timeout * RECONNECT_TIMEOUT_EXP, RECONNECT_TIMEOUT_MAX);
}
void TorController::Reconnect()
{
/* Try to reconnect and reestablish if we get booted - for example, Tor
* may be restarting.
*/
if (!m_conn.Connect(m_tor_control_center, std::bind_front(&TorController::connected_cb, this),
std::bind_front(&TorController::disconnected_cb, this) )) {
LogWarning("tor: Re-initiating connection to Tor control port %s failed", m_tor_control_center);
}
LogDebug(BCLog::TOR, "Not connected to Tor control port %s, will retry", m_tor_control_center);
_conn.Disconnect();
}
fs::path TorController::GetPrivateKeyFile()
@@ -681,59 +735,33 @@ fs::path TorController::GetPrivateKeyFile()
return gArgs.GetDataDirNet() / "onion_v3_private_key";
}
void TorController::reconnect_cb(evutil_socket_t fd, short what, void *arg)
{
TorController *self = static_cast<TorController*>(arg);
self->Reconnect();
}
/****** Thread ********/
static struct event_base *gBase;
static std::thread torControlThread;
static void TorControlThread(CService onion_service_target)
{
TorController ctrl(gBase, gArgs.GetArg("-torcontrol", DEFAULT_TOR_CONTROL), onion_service_target);
event_base_dispatch(gBase);
}
/**
* TODO: TBD if introducing a global is the preferred approach here since we
* usually try to avoid them. We could let init manage the lifecycle or make
* this a part of NodeContext maybe instead.
*/
static std::unique_ptr<TorController> g_tor_controller;
void StartTorControl(CService onion_service_target)
{
assert(!gBase);
#ifdef WIN32
evthread_use_windows_threads();
#else
evthread_use_pthreads();
#endif
gBase = event_base_new();
if (!gBase) {
LogWarning("tor: Unable to create event_base");
return;
}
torControlThread = std::thread(&util::TraceThread, "torcontrol", [onion_service_target] {
TorControlThread(onion_service_target);
});
assert(!g_tor_controller);
g_tor_controller = std::make_unique<TorController>(gArgs.GetArg("-torcontrol", DEFAULT_TOR_CONTROL), onion_service_target);
}
void InterruptTorControl()
{
if (gBase) {
LogInfo("tor: Thread interrupt");
event_base_once(gBase, -1, EV_TIMEOUT, [](evutil_socket_t, short, void*) {
event_base_loopbreak(gBase);
}, nullptr, nullptr);
}
if (!g_tor_controller) return;
LogInfo("tor: Thread interrupt");
g_tor_controller->Interrupt();
}
void StopTorControl()
{
if (gBase) {
torControlThread.join();
event_base_free(gBase);
gBase = nullptr;
}
if (!g_tor_controller) return;
g_tor_controller->Join();
g_tor_controller.reset();
}
CService DefaultOnionServiceTarget(uint16_t port)

View File

@@ -10,13 +10,15 @@
#include <netaddress.h>
#include <util/fs.h>
#include <event2/util.h>
#include <util/sock.h>
#include <util/threadinterrupt.h>
#include <cstdint>
#include <deque>
#include <functional>
#include <memory>
#include <string>
#include <thread>
#include <vector>
constexpr uint16_t DEFAULT_TOR_SOCKS_PORT{9050};
@@ -57,22 +59,19 @@ public:
class TorControlConnection
{
public:
typedef std::function<void(TorControlConnection&)> ConnectionCB;
typedef std::function<void(TorControlConnection &,const TorControlReply &)> ReplyHandlerCB;
/** Create a new TorControlConnection.
*/
explicit TorControlConnection(struct event_base *base);
explicit TorControlConnection(CThreadInterrupt& interrupt);
~TorControlConnection();
/**
* Connect to a Tor control port.
* tor_control_center is address of the form host:port.
* connected is the handler that is called when connection is successfully established.
* disconnected is a handler that is called when the connection is broken.
* Return true on success.
*/
bool Connect(const std::string& tor_control_center, const ConnectionCB& connected, const ConnectionCB& disconnected);
bool Connect(const std::string& tor_control_center);
/**
* Disconnect from Tor control port.
@@ -85,23 +84,38 @@ public:
*/
bool Command(const std::string &cmd, const ReplyHandlerCB& reply_handler);
/**
* Check if the connection is established.
*/
bool IsConnected() const;
/**
* Wait for data to be available on the socket.
* @param[in] timeout Maximum time to wait
* @return true if data is available, false on timeout or error
*/
bool WaitForData(std::chrono::milliseconds timeout);
/**
* Read available data from socket and process complete replies.
* Dispatches to registered reply handlers.
* @return true if connection is still open, false if connection was closed
*/
bool ReceiveAndProcess();
private:
/** Callback when ready for use */
std::function<void(TorControlConnection&)> connected;
/** Callback when connection lost */
std::function<void(TorControlConnection&)> disconnected;
/** Libevent event base */
struct event_base *base;
/** Connection to control socket */
struct bufferevent* b_conn{nullptr};
/** Reference to interrupt object for clean shutdown */
CThreadInterrupt& m_interrupt;
/** Socket for the connection */
std::unique_ptr<Sock> m_sock;
/** Message being received */
TorControlReply m_message;
/** Response handlers */
std::deque<ReplyHandlerCB> reply_handlers;
/** Libevent handlers: internal */
static void readcb(struct bufferevent *bev, void *ctx);
static void eventcb(struct bufferevent *bev, short what, void *ctx);
std::deque<ReplyHandlerCB> m_reply_handlers;
/** Buffer for incoming data */
std::vector<std::byte> m_recv_buffer;
/** Process complete lines from the receive buffer */
bool ProcessBuffer();
};
/****** Bitcoin specific TorController implementation ********/
@@ -112,8 +126,8 @@ private:
class TorController
{
public:
TorController(struct event_base* base, const std::string& tor_control_center, const CService& target);
TorController() : m_conn{nullptr} {
TorController(const std::string& tor_control_center, const CService& target);
TorController() : m_conn(m_interrupt) {
// Used for testing only.
}
~TorController();
@@ -121,23 +135,28 @@ public:
/** Get name of file to store private key in */
fs::path GetPrivateKeyFile();
/** Reconnect, after getting disconnected */
void Reconnect();
/** Interrupt the controller thread */
void Interrupt();
/** Wait for the controller thread to exit */
void Join();
private:
struct event_base* base;
CThreadInterrupt m_interrupt;
std::thread m_thread;
const std::string m_tor_control_center;
TorControlConnection m_conn;
std::string m_private_key;
std::string m_service_id;
bool m_reconnect;
struct event *reconnect_ev = nullptr;
float m_reconnect_timeout;
std::atomic<bool> m_reconnect;
std::chrono::duration<double> m_reconnect_timeout;
CService m_service;
const CService m_target;
/** Cookie for SAFECOOKIE auth */
std::vector<uint8_t> m_cookie;
/** ClientNonce for SAFECOOKIE auth */
std::vector<uint8_t> m_client_nonce;
/** Main control thread */
void ThreadControl();
public:
/** Callback for GETINFO net/listeners/socks result */
@@ -154,9 +173,6 @@ public:
void connected_cb(TorControlConnection& conn);
/** Callback after connection lost or failed connection attempt */
void disconnected_cb(TorControlConnection& conn);
/** Callback for reconnect timer */
static void reconnect_cb(evutil_socket_t fd, short what, void *arg);
};
#endif // BITCOIN_TORCONTROL_H