From 73da2a8a52f75b20cf3adfe36ad3804c41047d81 Mon Sep 17 00:00:00 2001 From: Matthew Zipkin Date: Fri, 26 Jun 2026 16:33:14 -0400 Subject: [PATCH 1/4] http: prevent race condition between worker thread and I/O thread This prevents a losing race condition that could prevent the server from reading requests from an HTTP client. A connected socket can either be written to or read from based on the result of GenerateWaitSockets(). That method checks the HTTPRemoteClient flag m_send_ready. If it's `true` the implication is that there is data in the client's send buffer ready to go. Once that data is sent and the buffer is empty, MaybeSendBytesFromBuffer() sets it `false` again. The sad case was when a worker thread calling WriteReply() adds data to the send buffer, but before it sets m_send_ready to `true`, the I/O thread sends that data and empties the buffer. With the buffer unexpectedly empty, WriteReply() sets m_send_ready to `true`. The effect of this is that the socket will stay in "write" mode with nothing to write. With nothing to write, MaybeSendBytesFromBuffer() never sets it back to `false` and the socket is stuck forever. --- src/httpserver.cpp | 21 ++++++++++++++++----- src/httpserver.h | 13 ++++++++----- 2 files changed, 24 insertions(+), 10 deletions(-) diff --git a/src/httpserver.cpp b/src/httpserver.cpp index 75ccf6528f5..a05d19db1f2 100644 --- a/src/httpserver.cpp +++ b/src/httpserver.cpp @@ -595,6 +595,16 @@ void HTTPRequest::WriteReply(HTTPStatusCode status, std::span r // data. The original data will go out of scope when WriteReply() returns. // This is analogous to the memcpy() in libevent's evbuffer_add() m_client->m_send_buffer.insert(m_client->m_send_buffer.end(), reply_body.begin(), reply_body.end()); + + // If the buffer already held data, the I/O thread is (or soon will be) + // draining it, so flag that there is more data to send. 
This must happen + // while holding m_send_mutex and while the buffer is known non-empty: + // setting m_send_ready after releasing the lock would race with the I/O + // thread draining the buffer to empty and clearing m_send_ready in + // between, leaving m_send_ready set on an empty buffer. The I/O loop would + // then only ever poll the socket for writeability, never read the client's + // next request, and wedge the connection. + if (!send_buffer_was_empty) m_client->m_send_ready = true; } LogDebug( @@ -611,10 +621,6 @@ void HTTPRequest::WriteReply(HTTPStatusCode status, std::span r // of waiting for the next iteration of the I/O loop. if (send_buffer_was_empty) { m_client->MaybeSendBytesFromBuffer(); - } else { - // Inform HTTPServer I/O that data is ready to be sent to this client - // in the next loop iteration. - m_client->m_send_ready = true; } // Signal to the I/O loop that we are ready to handle the next request. @@ -935,7 +941,12 @@ HTTPServer::IOReadiness HTTPServer::GenerateWaitSockets() const // Check if client is ready to send data. Don't try to receive again // until the send buffer is cleared (all data sent to client). - Sock::Event event = (http_client->m_send_ready ? Sock::SendEvent : Sock::RecvEvent); + // Keep this as a separate critical section from the m_sock_mutex one above: + // never hold m_sock_mutex and m_send_mutex at the same time here. + // MaybeSendBytesFromBuffer() locks m_send_mutex then m_sock_mutex, so nesting + // them in the opposite order here would risk a lock-order inversion deadlock. + const bool send_ready{WITH_LOCK(http_client->m_send_mutex, return http_client->m_send_ready;)}; + Sock::Event event = (send_ready ? 
Sock::SendEvent : Sock::RecvEvent); io_readiness.events_per_sock.emplace(sock, Sock::Events{event}); io_readiness.httpclients_per_sock.emplace(sock, http_client); } diff --git a/src/httpserver.h b/src/httpserver.h index 792b3690b92..9031eb61e0a 100644 --- a/src/httpserver.h +++ b/src/httpserver.h @@ -483,11 +483,14 @@ public: /// @} /** - * Set true by worker threads after writing a response to m_send_buffer. - * Set false by the HTTPServer I/O thread after flushing m_send_buffer. - * Checked in the HTTPServer I/O loop to avoid locking m_send_mutex if there's nothing to send. - */ - std::atomic_bool m_send_ready{false}; + * Set true by worker threads after writing a response to m_send_buffer. + * Set false by the HTTPServer I/O thread after flushing m_send_buffer. + * Checked in the HTTPServer I/O loop to decide whether to poll the socket for + * writeability or readability. + * Guarded by m_send_mutex so it stays consistent with m_send_buffer's emptiness: + * the two must always be updated together under the same lock. + */ + bool m_send_ready GUARDED_BY(m_send_mutex){false}; /** * Mutex that serializes the Send() and Recv() calls on `m_sock`. Reading From 922b08d375351e313dd92fbefdb166ee27838ac0 Mon Sep 17 00:00:00 2001 From: Matthew Zipkin Date: Fri, 26 Jun 2026 15:25:44 -0400 Subject: [PATCH 2/4] test: socket error handling in HTTPServer using ErrorSock mock socket Implements a child class of DynSock which is used as the mock socket for HTTPServer unit tests. The ErrorSock::Send() method raises a non-permanent error on the first HTTPRequest::WriteReply() and then succeeds after the second. In httpserver_tests.cpp use this mechanism to ensure that the server retries a send operation if such an error is encountered, and cover both optimistic (worker thread WriteReply()) and non-optimistic (I/O thread SocketHandlerConnected()) send paths. 
--- src/test/httpserver_tests.cpp | 119 +++++++++++++++++++++++++++++++++ src/test/util/setup_common.cpp | 18 ----- src/test/util/setup_common.h | 12 +++- 3 files changed, 129 insertions(+), 20 deletions(-) diff --git a/src/test/httpserver_tests.cpp b/src/test/httpserver_tests.cpp index 33c8b7769b2..8e66d3d4c4c 100644 --- a/src/test/httpserver_tests.cpp +++ b/src/test/httpserver_tests.cpp @@ -5,6 +5,7 @@ #include #include #include +#include #include #include @@ -628,4 +629,122 @@ BOOST_AUTO_TEST_CASE(http_server_socket_tests) server.StopListening(); } +BOOST_AUTO_TEST_CASE(http_socket_error_tests) +{ + // Hard-code the server's request handler to respond to each request with + // an incremented block count. + int height{0}; + HTTPServer server{[&](std::shared_ptr req) { + req->WriteReply(HTTP_OK, strprintf("height: %d\n", height++)); + }}; + + // All replies will be the same size + static constexpr std::size_t reply_length = std::string_view{ + "HTTP/1.1 200 OK\r\n" + "Date: Thu, 01 Jan 2026 00:00:00 GMT\r\n" // All RFC1123 dates are 29 characters + "Content-Length: 10\r\n" + "Content-Type: text/html; charset=ISO-8859-1\r\n" + "\r\n" + "height: 0\n" + }.size(); + + /** + * A mocked Sock derived from DynSock whose Send() only succeeds when there is more than + * one reply being sent (send buffer length > reply_length). Otherwise it returns + * a recoverable error (WSAEAGAIN). + * + * After it sends successfully once, it continues to always succeed. + * + * Useful for testing "try again" logic around non-blocking socket Send() failures. 
+ */ + class ErrorSock : public DynSock + { + public: + explicit ErrorSock(std::shared_ptr pipes) : DynSock{std::move(pipes)} {} + DynSock& operator=(Sock&&) override { assert(false); return *this; } + + ssize_t Send(const void* buf, size_t len, int flags) const override + { + if (len <= reply_length && !m_have_sent) { + #ifdef WIN32 + WSASetLastError(WSAEWOULDBLOCK); + #else + errno = WSAEAGAIN; + #endif + return -1; + } else { + m_have_sent = true; + return DynSock::Send(buf, len, flags); + } + } + + mutable bool m_have_sent{false}; + }; + + // Simpler server startup than the last test + CService addr_bind{Lookup("0.0.0.0", /*portDefault=*/0, /*fAllowLookup=*/false).value()}; + BOOST_REQUIRE(server.BindAndStartListening(addr_bind)); + server.StartSocketsThreads(); + + // Prepare initial requests + int num_requests = 3; + // Use keep-alive so the server holds the connection open for all requests. + std::string keepalive_request{full_request}; + keepalive_request.replace(keepalive_request.find("Connection: close"), 17, "Connection: keep-alive"); + // Combine all requests so they are read from the socket on a single iteration of the I/O loop + std::string all_requests; + for (int i = 0; i < num_requests; i++) { + all_requests += keepalive_request; + } + + // Watch the log messages to ensure that the first two replies were sent + // together. This indicates the non-optimistic send path was used + // because a reply was already sitting in the send buffer when a second reply + // was added. + DebugLogHelper find_two_replies{strprintf("Sent %d bytes to client", reply_length * 2), + [&](const std::string* s) { + return true; + }}; + // Last reply should be sent on its own by optimistic send path, because + // the send buffer was empty when the reply was written. 
+ DebugLogHelper find_one_reply{strprintf("Sent %d bytes to client", reply_length), + [&](const std::string* s) { + return true; + }}; + + // Connect the ErrorSock as mock client with the preloaded data and get a handle on the I/O pipes + std::shared_ptr mock_client_socket_pipes{ + ConnectClient(std::as_bytes(std::span(all_requests))) + }; + + // Wait up to one minute for the last reply from the server + std::string actual; + char buf[0x10000] = {}; + int attempts = 1000; + while (attempts > 0) + { + ssize_t bytes_read = mock_client_socket_pipes->send.GetBytes(buf, sizeof(buf), 0); + if (bytes_read > 0) { + actual.append(buf, bytes_read); + if (actual.find(strprintf("height: %d", num_requests - 1)) != std::string::npos) { + break; + } + } + std::this_thread::sleep_for(10ms); + --attempts; + } + + // All replies were received + for (int i = 0; i < num_requests; i++) { + BOOST_REQUIRE(actual.find(strprintf("height: %d", i)) != std::string::npos); + } + + // Close the keep-alive connection + server.DisconnectAllClients(); + + server.InterruptNet(); + server.JoinSocketsThreads(); + server.StopListening(); +} + BOOST_AUTO_TEST_SUITE_END() diff --git a/src/test/util/setup_common.cpp b/src/test/util/setup_common.cpp index 830fca0ea9e..bcf637784d7 100644 --- a/src/test/util/setup_common.cpp +++ b/src/test/util/setup_common.cpp @@ -657,24 +657,6 @@ SocketTestingSetup::~SocketTestingSetup() CreateSock = m_create_sock_orig; } -std::shared_ptr SocketTestingSetup::ConnectClient(std::span data) -{ - // I/O pipes for a mock Connected Socket we can read and write to. - auto connected_socket_pipes(std::make_shared()); - - // Insert the payload - connected_socket_pipes->recv.PushBytes(data.data(), data.size()); - - // Create the Mock Connected Socket that represents a client. 
- // It needs I/O pipes but its queue can remain empty - std::unique_ptr connected_socket{std::make_unique(connected_socket_pipes)}; - - // Push into the queue of Accepted Sockets returned by the local CreateSock() - m_accepted_sockets.Push(std::move(connected_socket)); - - return connected_socket_pipes; -} - /** * @returns a real block (0000000000013b8ab2cd513b0261a14096412195a72a0c4827d229dcc7e0f7af) * with 9 txs. diff --git a/src/test/util/setup_common.h b/src/test/util/setup_common.h index e44c7e728bf..7818224e554 100644 --- a/src/test/util/setup_common.h +++ b/src/test/util/setup_common.h @@ -257,10 +257,18 @@ public: ~SocketTestingSetup(); /** - * Connect to the socket with a mock client (a DynSock) and send pre-loaded data. + * Connect to the socket with a mock client and send pre-loaded data. * Returns the I/O pipes from the mock client so we can read response data sent to it. + * Template parameter selects the socket type: DynSock by default. */ - std::shared_ptr ConnectClient(std::span data); + template + std::shared_ptr ConnectClient(std::span data) + { + auto connected_socket_pipes(std::make_shared()); + connected_socket_pipes->recv.PushBytes(data.data(), data.size()); + m_accepted_sockets.Push(std::make_unique(connected_socket_pipes)); + return connected_socket_pipes; + } private: //! Save the original value of CreateSock here and restore it when the test ends. 
From b98b10c07236cd37d96f14e4a220af11dc8a0fc6 Mon Sep 17 00:00:00 2001 From: Matthew Zipkin Date: Fri, 26 Jun 2026 15:32:41 -0400 Subject: [PATCH 3/4] test: introduce a worker thread in http socket error test --- src/test/httpserver_tests.cpp | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/src/test/httpserver_tests.cpp b/src/test/httpserver_tests.cpp index 8e66d3d4c4c..ff1eb22d703 100644 --- a/src/test/httpserver_tests.cpp +++ b/src/test/httpserver_tests.cpp @@ -8,6 +8,7 @@ #include #include #include +#include #include @@ -631,11 +632,20 @@ BOOST_AUTO_TEST_CASE(http_server_socket_tests) BOOST_AUTO_TEST_CASE(http_socket_error_tests) { + // Create a tiny threadpool for the HTTPRequest handler + ThreadPool workers("http"); + workers.Start(1); + // Hard-code the server's request handler to respond to each request with - // an incremented block count. - int height{0}; + // an incremented block count. Handle the replies in the worker thread. + std::atomic height{0}; HTTPServer server{[&](std::shared_ptr req) { - req->WriteReply(HTTP_OK, strprintf("height: %d\n", height++)); + auto item = [req, &height]() { + const int h = height.fetch_add(1); + req->WriteReply(HTTP_OK, strprintf("height: %d\n", h)); + }; + // Can't call BOOST_REQUIRE from worker thread + Assert(workers.Submit(std::move(item))); }}; // All replies will be the same size @@ -742,6 +752,8 @@ BOOST_AUTO_TEST_CASE(http_socket_error_tests) // Close the keep-alive connection server.DisconnectAllClients(); + workers.Stop(); + server.InterruptNet(); server.JoinSocketsThreads(); server.StopListening(); From f595daf1dd01e9730e0eafbc46b2e22eeb9f33fe Mon Sep 17 00:00:00 2001 From: Matthew Zipkin Date: Fri, 26 Jun 2026 16:09:14 -0400 Subject: [PATCH 4/4] test: ensure HTTPServer race condition is fixed The result of WriteReply() losing the race condition would prevent any new requests being read from the socket. 
The socket error test sent 3 requests all at once after connecting, so in this commit we separate the third request to make the losing race condition more likely, and make its effect more obvious. --- src/test/httpserver_tests.cpp | 31 +++++++++++++++++++++++++++++-- 1 file changed, 29 insertions(+), 2 deletions(-) diff --git a/src/test/httpserver_tests.cpp b/src/test/httpserver_tests.cpp index ff1eb22d703..aae899202d7 100644 --- a/src/test/httpserver_tests.cpp +++ b/src/test/httpserver_tests.cpp @@ -697,7 +697,7 @@ BOOST_AUTO_TEST_CASE(http_socket_error_tests) server.StartSocketsThreads(); // Prepare initial requests - int num_requests = 3; + int num_requests = 2; // Use keep-alive so the server holds the connection open for all requests. std::string keepalive_request{full_request}; keepalive_request.replace(keepalive_request.find("Connection: close"), 17, "Connection: keep-alive"); @@ -730,7 +730,34 @@ BOOST_AUTO_TEST_CASE(http_socket_error_tests) // Wait up to one minute for the last reply from the server std::string actual; char buf[0x10000] = {}; - int attempts = 1000; + int attempts = 6000; + while (attempts > 0) + { + ssize_t bytes_read = mock_client_socket_pipes->send.GetBytes(buf, sizeof(buf), 0); + if (bytes_read > 0) { + actual.append(buf, bytes_read); + if (actual.find(strprintf("height: %d", num_requests - 1)) != std::string::npos) { + break; + } + } + std::this_thread::sleep_for(10ms); + --attempts; + } + + // Send the third request. + // If there was a race between WriteReply() in the worker thread setting m_send_ready=true + // and SocketHandlerConnected() in the I/O thread flushing the send buffer, + // then the socket would be stuck in write mode with nothing to write, + // the server would never read from the socket, and this request would time out. + // Wait a second to ensure both the worker thread and I/O thread are idle. + // If we send the next request too soon it might get accepted by the server before + // it gets wedged shut. 
+ std::this_thread::sleep_for(1000ms); + mock_client_socket_pipes->recv.PushBytes(keepalive_request.data(), keepalive_request.size()); + num_requests++; + + // Wait up to one minute for reply + attempts = 6000; while (attempts > 0) { ssize_t bytes_read = mock_client_socket_pipes->send.GetBytes(buf, sizeof(buf), 0);