From 59d24bd5dd2a4549888cf7c557461e6b4959f82f Mon Sep 17 00:00:00 2001
From: furszy <matiasfurszyfer@protonmail.com>
Date: Thu, 12 Feb 2026 14:08:22 -0500
Subject: [PATCH 1/2] threadpool: make Submit return Expected instead of
 throwing

Unlike exceptions, which can be ignored as they require extra
try-catch blocks, returning expected errors forces callers to always
handle submission failures.

Not throwing an exception also fixes an unclean shutdown bug #34573
since we no longer throw when attempting to Submit() from the libevent
callback http_request_cb().
---
 src/test/fuzz/threadpool.cpp  |  4 +--
 src/test/threadpool_tests.cpp | 48 ++++++++++++++++++++---------------
 src/util/threadpool.h         | 47 +++++++++++++++++++++++++---------
 3 files changed, 64 insertions(+), 35 deletions(-)

diff --git a/src/test/fuzz/threadpool.cpp b/src/test/fuzz/threadpool.cpp
index 293aa63e0fe..a5b01db1383 100644
--- a/src/test/fuzz/threadpool.cpp
+++ b/src/test/fuzz/threadpool.cpp
@@ -87,10 +87,10 @@ FUZZ_TARGET(threadpool, .init = setup_threadpool_test) EXCLUSIVE_LOCKS_REQUIRED(
         std::future<void> fut;
         if (will_throw) {
             expected_fail_tasks++;
-            fut = g_pool.Submit(ThrowTask{});
+            fut = *Assert(g_pool.Submit(ThrowTask{}));
         } else {
             expected_task_counter++;
-            fut = g_pool.Submit(CounterTask{task_counter});
+            fut = *Assert(g_pool.Submit(CounterTask{task_counter}));
         }
 
         // If caller wants to wait immediately, consume the future here (safe).
diff --git a/src/test/threadpool_tests.cpp b/src/test/threadpool_tests.cpp
index 46acf1d67d6..850cd16650d 100644
--- a/src/test/threadpool_tests.cpp
+++ b/src/test/threadpool_tests.cpp
@@ -44,6 +44,13 @@ BOOST_FIXTURE_TEST_SUITE(threadpool_tests, ThreadPoolFixture)
     } \
 } while (0)
 
+// Helper to unwrap a valid pool submission
+template <typename F>
+[[nodiscard]] auto Submit(ThreadPool& pool, F&& fn)
+{
+    return std::move(*Assert(pool.Submit(std::forward<F>(fn))));
+}
+
 // Block a number of worker threads by submitting tasks that wait on `blocker_future`.
 // Returns the futures of the blocking tasks, ensuring all have started and are waiting.
 std::vector<std::future<void>> BlockWorkers(ThreadPool& threadPool, const std::shared_future<void>& blocker_future, int num_of_threads_to_block)
@@ -58,7 +65,7 @@ std::vector<std::future<void>> BlockWorkers(ThreadPool& threadPool, const std::s
     std::vector<std::future<void>> blocking_tasks;
     for (int i = 0; i < num_of_threads_to_block; i++) {
         std::promise<void>& ready = ready_promises[i];
-        blocking_tasks.emplace_back(threadPool.Submit([blocker_future, &ready]() {
+        blocking_tasks.emplace_back(Submit(threadPool, [blocker_future, &ready]() {
             ready.set_value();
             blocker_future.wait();
         }));
@@ -73,10 +80,9 @@ std::vector<std::future<void>> BlockWorkers(ThreadPool& threadPool, const std::s
 BOOST_AUTO_TEST_CASE(submit_task_before_start_fails)
 {
     ThreadPool threadPool(POOL_NAME);
-    BOOST_CHECK_EXCEPTION((void)threadPool.Submit([]{ return false; }), std::runtime_error, [&](const std::runtime_error& e) {
-        BOOST_CHECK_EQUAL(e.what(), "No active workers; cannot accept new tasks");
-        return true;
-    });
+    auto res = threadPool.Submit([]{ return false; });
+    BOOST_CHECK(!res);
+    BOOST_CHECK_EQUAL(SubmitErrorString(res.error()), "No active workers");
 }
 
 // Test 1, submit tasks and verify completion
@@ -92,7 +98,7 @@ BOOST_AUTO_TEST_CASE(submit_tasks_complete_successfully)
     std::vector<std::future<void>> futures;
     futures.reserve(num_tasks);
     for (int i = 1; i <= num_tasks; i++) {
-        futures.emplace_back(threadPool.Submit([&counter, i]() {
+        futures.emplace_back(Submit(threadPool, [&counter, i]() {
             counter.fetch_add(i, std::memory_order_relaxed);
         }));
     }
@@ -121,7 +127,7 @@ BOOST_AUTO_TEST_CASE(single_available_worker_executes_all_tasks)
 
     // Store futures to wait on
     std::vector<std::future<void>> futures(num_tasks);
-    for (auto& f : futures) f = threadPool.Submit([&counter]{ counter++; });
+    for (auto& f : futures) f = Submit(threadPool, [&counter]{ counter++; });
 
     WAIT_FOR(futures);
     BOOST_CHECK_EQUAL(counter, num_tasks);
@@ -138,7 +144,7 @@ BOOST_AUTO_TEST_CASE(wait_for_task_to_finish)
     ThreadPool threadPool(POOL_NAME);
     threadPool.Start(NUM_WORKERS_DEFAULT);
     std::atomic<bool> flag = false;
-    std::future<void> future = threadPool.Submit([&flag]() {
+    std::future<void> future = Submit(threadPool, [&flag]() {
         UninterruptibleSleep(200ms);
         flag.store(true, std::memory_order_release);
     });
@@ -151,10 +157,10 @@ BOOST_AUTO_TEST_CASE(get_result_from_completed_task)
 {
     ThreadPool threadPool(POOL_NAME);
     threadPool.Start(NUM_WORKERS_DEFAULT);
-    std::future<bool> future_bool = threadPool.Submit([]() { return true; });
+    std::future<bool> future_bool = Submit(threadPool, []() { return true; });
     BOOST_CHECK(future_bool.get());
 
-    std::future<std::string> future_str = threadPool.Submit([]() { return std::string("true"); });
+    std::future<std::string> future_str = Submit(threadPool, []() { return std::string("true"); });
     std::string result = future_str.get();
     BOOST_CHECK_EQUAL(result, "true");
 }
@@ -170,7 +176,7 @@ BOOST_AUTO_TEST_CASE(task_exception_propagates_to_future)
     std::vector<std::future<void>> futures;
     futures.reserve(num_tasks);
     for (int i = 0; i < num_tasks; i++) {
-        futures.emplace_back(threadPool.Submit([err_msg, i]() {
+        futures.emplace_back(Submit(threadPool, [err_msg, i]() {
             throw std::runtime_error(err_msg + util::ToString(i));
         }));
     }
@@ -197,7 +203,7 @@ BOOST_AUTO_TEST_CASE(process_tasks_manually_when_workers_busy)
     int num_tasks = 20;
     std::atomic<int> counter = 0;
     for (int i = 0; i < num_tasks; i++) {
-        (void)threadPool.Submit([&counter]() {
+        (void)Submit(threadPool, [&counter]() {
             counter.fetch_add(1, std::memory_order_relaxed);
         });
     }
@@ -222,8 +228,8 @@ BOOST_AUTO_TEST_CASE(recursive_task_submission)
     threadPool.Start(NUM_WORKERS_DEFAULT);
 
     std::promise<void> signal;
-    (void)threadPool.Submit([&]() {
-        (void)threadPool.Submit([&]() {
+    (void)Submit(threadPool, [&]() {
+        (void)Submit(threadPool, [&]() {
             signal.set_value();
         });
     });
@@ -243,7 +249,7 @@ BOOST_AUTO_TEST_CASE(task_submitted_while_busy_completes)
     const auto& blocking_tasks = BlockWorkers(threadPool, blocker_future, NUM_WORKERS_DEFAULT);
 
     // Submit an extra task that should execute once a worker is free
-    std::future<bool> future = threadPool.Submit([]() { return true; });
+    std::future<bool> future = Submit(threadPool, []() { return true; });
 
     // At this point, all workers are blocked, and the extra task is queued
     BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 1);
@@ -280,7 +286,7 @@ BOOST_AUTO_TEST_CASE(congestion_more_workers_than_cores)
     std::vector<std::future<void>> futures;
     futures.reserve(num_tasks);
     for (int i = 0; i < num_tasks; i++) {
-        futures.emplace_back(threadPool.Submit([&counter] {
+        futures.emplace_back(Submit(threadPool, [&counter] {
             counter.fetch_add(1, std::memory_order_relaxed);
         }));
     }
@@ -296,10 +302,10 @@
     ThreadPool threadPool(POOL_NAME);
     threadPool.Start(NUM_WORKERS_DEFAULT);
     threadPool.Interrupt();
-    BOOST_CHECK_EXCEPTION((void)threadPool.Submit([]{}), std::runtime_error, [&](const std::runtime_error& e) {
-        BOOST_CHECK_EQUAL(e.what(), "No active workers; cannot accept new tasks");
-        return true;
-    });
+
+    auto res = threadPool.Submit([]{});
+    BOOST_CHECK(!res);
+    BOOST_CHECK_EQUAL(SubmitErrorString(res.error()), "Interrupted");
 
     // Reset pool
     threadPool.Stop();
@@ -310,7 +316,7 @@ BOOST_AUTO_TEST_CASE(interrupt_blocks_new_submissions)
     std::atomic<int> counter{0};
     std::promise<void> blocker;
    const auto blocking_tasks = BlockWorkers(threadPool, blocker.get_future().share(), 1);
-    threadPool.Submit([&threadPool, &counter]{
+    Submit(threadPool, [&threadPool, &counter]{
         threadPool.Interrupt();
         counter.fetch_add(1, std::memory_order_relaxed);
     }).get();
diff --git a/src/util/threadpool.h b/src/util/threadpool.h
index b75a94157eb..c039b59c4de 100644
--- a/src/util/threadpool.h
+++ b/src/util/threadpool.h
@@ -7,6 +7,7 @@
 #include <sync.h>
 #include <util/check.h>
+#include <util/expected.h>
 #include <util/string.h>
 #include <util/threadnames.h>
 
@@ -15,8 +16,8 @@
 #include <condition_variable>
 #include <functional>
 #include <future>
-#include <stdexcept>
 #include <string>
+#include <string_view>
 #include <thread>
 #include <utility>
 
@@ -143,28 +144,39 @@ public:
         // Note: m_interrupt is left true until next Start()
     }
 
+    enum class SubmitError {
+        Inactive,
+        Interrupted,
+    };
+
     /**
      * @brief Enqueues a new task for asynchronous execution.
      *
-     * Returns a `std::future` that provides the task's result or propagates
-     * any exception it throws.
-     * Note: Ignoring the returned future requires guarding the task against
-     * uncaught exceptions, as they would otherwise be silently discarded.
+     * @param fn Callable to execute asynchronously.
+     * @return On success, a future containing fn's result.
+     *         On failure, an error indicating why the task was rejected:
+     *         - SubmitError::Inactive: Pool has no workers (never started or already stopped).
+     *         - SubmitError::Interrupted: Pool task acceptance has been interrupted.
+     *
+     * Thread-safe: Can be called from any thread, including within the provided 'fn' callable.
+     *
+     * @warning Ignoring the returned future requires guarding the task against
+     *          uncaught exceptions, as they would otherwise be silently discarded.
      */
-    template <typename F> [[nodiscard]] EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
-    auto Submit(F&& fn)
+    template <typename F>
+    [[nodiscard]] util::Expected<std::future<std::invoke_result_t<F>>, SubmitError> Submit(F&& fn) noexcept EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
     {
-        std::packaged_task task{std::forward<F>(fn)};
+        std::packaged_task<std::invoke_result_t<F>()> task{std::forward<F>(fn)};
         auto future{task.get_future()};
         {
             LOCK(m_mutex);
-            if (m_interrupt || m_workers.empty()) {
-                throw std::runtime_error("No active workers; cannot accept new tasks");
-            }
+            if (m_workers.empty()) return util::Unexpected{SubmitError::Inactive};
+            if (m_interrupt) return util::Unexpected{SubmitError::Interrupted};
+
             m_work_queue.emplace(std::move(task));
         }
         m_cv.notify_one();
-        return future;
+        return {std::move(future)};
     }
 
     /**
@@ -208,4 +220,15 @@ public:
     }
 };
 
+constexpr std::string_view SubmitErrorString(const ThreadPool::SubmitError err) noexcept {
+    switch (err) {
+        case ThreadPool::SubmitError::Inactive:
+            return "No active workers";
+        case ThreadPool::SubmitError::Interrupted:
+            return "Interrupted";
+    }
+    Assume(false); // Unreachable
+    return "Unknown error";
+}
+
 #endif // BITCOIN_UTIL_THREADPOOL_H

From 726b3663cc8e2164d4e9452f12f5866f5e8f6f1a Mon Sep 17 00:00:00 2001
From: furszy <matiasfurszyfer@protonmail.com>
Date: Thu, 12 Feb 2026 14:08:22 -0500
Subject: [PATCH 2/2] http: properly respond to HTTP request during shutdown

Makes sure we respond to the client as the HTTP request attempts to
submit a task to the thread pool during server shutdown.

Roughly what happens:
1) The server receives an HTTP request and starts calling
   http_request_cb().
2) Meanwhile on another thread, shutdown is triggered which calls
   InterruptHTTPServer() and unregisters libevent http_request_cb()
   callback and interrupts the thread pool.
3) The request (step 1) resumes and tries to submit a task to the
   now-interrupted server.

This fix detects failed submissions immediately, and the server
responds with HTTP_SERVICE_UNAVAILABLE.
---
 src/httpserver.cpp | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)

diff --git a/src/httpserver.cpp b/src/httpserver.cpp
index 671e119642f..587f7d21ac8 100644
--- a/src/httpserver.cpp
+++ b/src/httpserver.cpp
@@ -211,7 +211,7 @@ static void http_request_cb(struct evhttp_request* req, void* arg)
             }
         }
     }
-    auto hreq{std::make_unique<HTTPRequest>(req, *static_cast<const util::SignalInterrupt*>(arg))};
+    auto hreq{std::make_shared<HTTPRequest>(req, *static_cast<const util::SignalInterrupt*>(arg))};
 
     // Early address-based allow check
     if (!ClientAllowed(hreq->GetPeer())) {
@@ -258,7 +258,7 @@
             return;
         }
 
-        auto item = [req = std::move(hreq), in_path = std::move(path), fn = i->handler]() {
+        auto item = [req = hreq, in_path = std::move(path), fn = i->handler]() {
             std::string err_msg;
             try {
                 fn(req.get(), in_path);
@@ -276,7 +276,13 @@
             req->WriteReply(HTTP_INTERNAL_SERVER_ERROR, err_msg);
         };
 
-        [[maybe_unused]] auto _{g_threadpool_http.Submit(std::move(item))};
+        if (auto res = g_threadpool_http.Submit(std::move(item)); !res.has_value()) {
+            Assume(hreq.use_count() == 1); // ensure request will be deleted
+            // Both SubmitError::Inactive and SubmitError::Interrupted mean shutdown
+            LogWarning("HTTP request rejected during server shutdown: '%s'", SubmitErrorString(res.error()));
+            hreq->WriteReply(HTTP_SERVICE_UNAVAILABLE, "Request rejected during server shutdown");
+            return;
+        }
     } else {
        hreq->WriteReply(HTTP_NOT_FOUND);
     }