diff --git a/src/test/CMakeLists.txt b/src/test/CMakeLists.txt index 2466a483921..1cae4fc43fd 100644 --- a/src/test/CMakeLists.txt +++ b/src/test/CMakeLists.txt @@ -107,6 +107,7 @@ add_executable(test_bitcoin system_ram_tests.cpp system_tests.cpp testnet4_miner_tests.cpp + threadpool_tests.cpp timeoffsets_tests.cpp torcontrol_tests.cpp transaction_tests.cpp diff --git a/src/test/threadpool_tests.cpp b/src/test/threadpool_tests.cpp new file mode 100644 index 00000000000..46acf1d67d6 --- /dev/null +++ b/src/test/threadpool_tests.cpp @@ -0,0 +1,325 @@ +// Copyright (c) The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#include +#include +#include +#include +#include +#include + +#include + +// General test values +int NUM_WORKERS_DEFAULT = 0; +constexpr char POOL_NAME[] = "test"; +constexpr auto WAIT_TIMEOUT = 120s; + +struct ThreadPoolFixture { + ThreadPoolFixture() { + NUM_WORKERS_DEFAULT = FastRandomContext().randrange(GetNumCores()) + 1; + LogInfo("thread pool workers count: %d", NUM_WORKERS_DEFAULT); + } +}; + +// Test Cases Overview +// 0) Submit task to a non-started pool. +// 1) Submit tasks and verify completion. +// 2) Maintain all threads busy except one. +// 3) Wait for work to finish. +// 4) Wait for result object. +// 5) The task throws an exception, catch must be done in the consumer side. +// 6) Busy workers, help them by processing tasks externally. +// 7) Recursive submission of tasks. +// 8) Submit task when all threads are busy, stop pool and verify task gets executed. +// 9) Congestion test; create more workers than available cores. +// 10) Ensure Interrupt() prevents further submissions. +BOOST_FIXTURE_TEST_SUITE(threadpool_tests, ThreadPoolFixture) + +#define WAIT_FOR(futures) \ + do { \ + for (const auto& f : futures) { \ + BOOST_REQUIRE(f.wait_for(WAIT_TIMEOUT) == std::future_status::ready); \ + } \ + } while (0) + +// Block a number of worker threads by submitting tasks that wait on `blocker_future`. +// Returns the futures of the blocking tasks, ensuring all have started and are waiting. +std::vector> BlockWorkers(ThreadPool& threadPool, const std::shared_future& blocker_future, int num_of_threads_to_block) +{ + // Per-thread ready promises to ensure all workers are actually blocked + std::vector> ready_promises(num_of_threads_to_block); + std::vector> ready_futures; + ready_futures.reserve(num_of_threads_to_block); + for (auto& p : ready_promises) ready_futures.emplace_back(p.get_future()); + + // Fill all workers with blocking tasks + std::vector> blocking_tasks; + for (int i = 0; i < num_of_threads_to_block; i++) { + std::promise& ready = ready_promises[i]; + blocking_tasks.emplace_back(threadPool.Submit([blocker_future, &ready]() { + ready.set_value(); + blocker_future.wait(); + })); + } + + // Wait until all threads are actually blocked + WAIT_FOR(ready_futures); + return blocking_tasks; +} + +// Test 0, submit task to a non-started pool +BOOST_AUTO_TEST_CASE(submit_task_before_start_fails) +{ + ThreadPool threadPool(POOL_NAME); + BOOST_CHECK_EXCEPTION((void)threadPool.Submit([]{ return false; }), std::runtime_error, [&](const std::runtime_error& e) { + BOOST_CHECK_EQUAL(e.what(), "No active workers; cannot accept new tasks"); + return true; + }); +} + +// Test 1, submit tasks and verify completion +BOOST_AUTO_TEST_CASE(submit_tasks_complete_successfully) +{ + int num_tasks = 50; + + ThreadPool threadPool(POOL_NAME); + threadPool.Start(NUM_WORKERS_DEFAULT); + std::atomic counter = 0; + + // Store futures to ensure completion before checking counter. + std::vector> futures; + futures.reserve(num_tasks); + for (int i = 1; i <= num_tasks; i++) { + futures.emplace_back(threadPool.Submit([&counter, i]() { + counter.fetch_add(i, std::memory_order_relaxed); + })); + } + + // Wait for all tasks to finish + WAIT_FOR(futures); + int expected_value = (num_tasks * (num_tasks + 1)) / 2; // Gauss sum. + BOOST_CHECK_EQUAL(counter.load(), expected_value); + BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 0); +} + +// Test 2, maintain all threads busy except one +BOOST_AUTO_TEST_CASE(single_available_worker_executes_all_tasks) +{ + ThreadPool threadPool(POOL_NAME); + threadPool.Start(NUM_WORKERS_DEFAULT); + // Single blocking future for all threads + std::promise blocker; + std::shared_future blocker_future(blocker.get_future()); + const auto blocking_tasks = BlockWorkers(threadPool, blocker_future, NUM_WORKERS_DEFAULT - 1); + + // Now execute tasks on the single available worker + // and check that all the tasks are executed. + int num_tasks = 15; + int counter = 0; + + // Store futures to wait on + std::vector> futures(num_tasks); + for (auto& f : futures) f = threadPool.Submit([&counter]{ counter++; }); + + WAIT_FOR(futures); + BOOST_CHECK_EQUAL(counter, num_tasks); + + blocker.set_value(); + WAIT_FOR(blocking_tasks); + threadPool.Stop(); + BOOST_CHECK_EQUAL(threadPool.WorkersCount(), 0); +} + +// Test 3, wait for work to finish +BOOST_AUTO_TEST_CASE(wait_for_task_to_finish) +{ + ThreadPool threadPool(POOL_NAME); + threadPool.Start(NUM_WORKERS_DEFAULT); + std::atomic flag = false; + std::future future = threadPool.Submit([&flag]() { + UninterruptibleSleep(200ms); + flag.store(true, std::memory_order_release); + }); + BOOST_CHECK(future.wait_for(WAIT_TIMEOUT) == std::future_status::ready); + BOOST_CHECK(flag.load(std::memory_order_acquire)); +} + +// Test 4, obtain result object +BOOST_AUTO_TEST_CASE(get_result_from_completed_task) +{ + ThreadPool threadPool(POOL_NAME); + threadPool.Start(NUM_WORKERS_DEFAULT); + std::future future_bool = threadPool.Submit([]() { return true; }); + BOOST_CHECK(future_bool.get()); + + std::future future_str = threadPool.Submit([]() { return std::string("true"); }); + std::string result = future_str.get(); + BOOST_CHECK_EQUAL(result, "true"); +} + +// Test 5, throw exception and catch it on the consumer side +BOOST_AUTO_TEST_CASE(task_exception_propagates_to_future) +{ + ThreadPool threadPool(POOL_NAME); + threadPool.Start(NUM_WORKERS_DEFAULT); + + int num_tasks = 5; + std::string err_msg{"something wrong happened"}; + std::vector> futures; + futures.reserve(num_tasks); + for (int i = 0; i < num_tasks; i++) { + futures.emplace_back(threadPool.Submit([err_msg, i]() { + throw std::runtime_error(err_msg + util::ToString(i)); + })); + } + + for (int i = 0; i < num_tasks; i++) { + BOOST_CHECK_EXCEPTION(futures.at(i).get(), std::runtime_error, [&](const std::runtime_error& e) { + BOOST_CHECK_EQUAL(e.what(), err_msg + util::ToString(i)); + return true; + }); + } +} + +// Test 6, all workers are busy, help them by processing tasks from outside +BOOST_AUTO_TEST_CASE(process_tasks_manually_when_workers_busy) +{ + ThreadPool threadPool(POOL_NAME); + threadPool.Start(NUM_WORKERS_DEFAULT); + + std::promise blocker; + std::shared_future blocker_future(blocker.get_future()); + const auto& blocking_tasks = BlockWorkers(threadPool, blocker_future, NUM_WORKERS_DEFAULT); + + // Now submit tasks and check that none of them are executed. + int num_tasks = 20; + std::atomic counter = 0; + for (int i = 0; i < num_tasks; i++) { + (void)threadPool.Submit([&counter]() { + counter.fetch_add(1, std::memory_order_relaxed); + }); + } + UninterruptibleSleep(100ms); + BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), num_tasks); + + // Now process manually + for (int i = 0; i < num_tasks; i++) { + threadPool.ProcessTask(); + } + BOOST_CHECK_EQUAL(counter.load(), num_tasks); + BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 0); + blocker.set_value(); + threadPool.Stop(); + WAIT_FOR(blocking_tasks); +} + +// Test 7, submit tasks from other tasks +BOOST_AUTO_TEST_CASE(recursive_task_submission) +{ + ThreadPool threadPool(POOL_NAME); + threadPool.Start(NUM_WORKERS_DEFAULT); + + std::promise signal; + (void)threadPool.Submit([&]() { + (void)threadPool.Submit([&]() { + signal.set_value(); + }); + }); + + signal.get_future().wait(); + threadPool.Stop(); +} + +// Test 8, submit task when all threads are busy and then stop the pool +BOOST_AUTO_TEST_CASE(task_submitted_while_busy_completes) +{ + ThreadPool threadPool(POOL_NAME); + threadPool.Start(NUM_WORKERS_DEFAULT); + + std::promise blocker; + std::shared_future blocker_future(blocker.get_future()); + const auto& blocking_tasks = BlockWorkers(threadPool, blocker_future, NUM_WORKERS_DEFAULT); + + // Submit an extra task that should execute once a worker is free + std::future future = threadPool.Submit([]() { return true; }); + + // At this point, all workers are blocked, and the extra task is queued + BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 1); + + // Wait a short moment before unblocking the threads to mimic a concurrent shutdown + std::thread thread_unblocker([&blocker]() { + UninterruptibleSleep(300ms); + blocker.set_value(); + }); + + // Stop the pool while the workers are still blocked + threadPool.Stop(); + + // Expect the submitted task to complete + BOOST_CHECK(future.get()); + thread_unblocker.join(); + + // Obviously all the previously blocking tasks should be completed at this point too + WAIT_FOR(blocking_tasks); + + // Pool should be stopped and no workers remaining + BOOST_CHECK_EQUAL(threadPool.WorkersCount(), 0); +} + +// Test 9, more workers than available cores (congestion test) +BOOST_AUTO_TEST_CASE(congestion_more_workers_than_cores) +{ + ThreadPool threadPool(POOL_NAME); + threadPool.Start(std::max(1, GetNumCores() * 2)); // Oversubscribe by 2× + + int num_tasks = 200; + std::atomic counter{0}; + + std::vector> futures; + futures.reserve(num_tasks); + for (int i = 0; i < num_tasks; i++) { + futures.emplace_back(threadPool.Submit([&counter] { + counter.fetch_add(1, std::memory_order_relaxed); + })); + } + + WAIT_FOR(futures); + BOOST_CHECK_EQUAL(counter.load(), num_tasks); +} + +// Test 10, Interrupt() prevents further submissions +BOOST_AUTO_TEST_CASE(interrupt_blocks_new_submissions) +{ + // 1) Interrupt from main thread + ThreadPool threadPool(POOL_NAME); + threadPool.Start(NUM_WORKERS_DEFAULT); + threadPool.Interrupt(); + BOOST_CHECK_EXCEPTION((void)threadPool.Submit([]{}), std::runtime_error, [&](const std::runtime_error& e) { + BOOST_CHECK_EQUAL(e.what(), "No active workers; cannot accept new tasks"); + return true; + }); + + // Reset pool + threadPool.Stop(); + + // 2) Interrupt() from a worker thread + // One worker is blocked, another calls Interrupt(), and the remaining one waits for tasks. + threadPool.Start(/*num_workers=*/3); + std::atomic counter{0}; + std::promise blocker; + const auto blocking_tasks = BlockWorkers(threadPool, blocker.get_future().share(), 1); + threadPool.Submit([&threadPool, &counter]{ + threadPool.Interrupt(); + counter.fetch_add(1, std::memory_order_relaxed); + }).get(); + blocker.set_value(); // unblock worker + + BOOST_CHECK_EQUAL(counter.load(), 1); + threadPool.Stop(); + WAIT_FOR(blocking_tasks); + BOOST_CHECK_EQUAL(threadPool.WorkersCount(), 0); +} + +BOOST_AUTO_TEST_SUITE_END() diff --git a/src/util/threadpool.h b/src/util/threadpool.h new file mode 100644 index 00000000000..b75a94157eb --- /dev/null +++ b/src/util/threadpool.h @@ -0,0 +1,211 @@ +// Copyright (c) The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or https://www.opensource.org/licenses/mit-license.php. + +#ifndef BITCOIN_UTIL_THREADPOOL_H +#define BITCOIN_UTIL_THREADPOOL_H + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +/** + * @brief Fixed-size thread pool for running arbitrary tasks concurrently. + * + * The thread pool maintains a set of worker threads that consume and execute + * tasks submitted through Submit(). Once started, tasks can be queued and + * processed asynchronously until Stop() is called. + * + * ### Thread-safety and lifecycle + * - `Start()` and `Stop()` must be called from a controller (non-worker) thread. + * Calling `Stop()` from a worker thread will deadlock, as it waits for all + * workers to join, including the current one. + * + * - `Submit()` can be called from any thread, including workers. It safely + * enqueues new work for execution as long as the pool has active workers. + * + * - `Interrupt()` stops new task submission and lets queued ones drain + * in the background. Callers can continue other shutdown steps and call + * Stop() at the end to ensure no remaining tasks are left to execute. + * + * - `Stop()` prevents further task submission and blocks until all the + * queued ones are completed. + */ +class ThreadPool +{ +private: + std::string m_name; + Mutex m_mutex; + std::queue> m_work_queue GUARDED_BY(m_mutex); + std::condition_variable m_cv; + // Note: m_interrupt must be guarded by m_mutex, and cannot be replaced by an unguarded atomic bool. + // This ensures threads blocked on m_cv reliably observe the change and proceed correctly without missing signals. + // Ref: https://en.cppreference.com/w/cpp/thread/condition_variable + bool m_interrupt GUARDED_BY(m_mutex){false}; + std::vector m_workers GUARDED_BY(m_mutex); + + void WorkerThread() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) + { + WAIT_LOCK(m_mutex, wait_lock); + for (;;) { + std::packaged_task task; + { + // Wait only if needed; avoid sleeping when a new task was submitted while we were processing another one. + if (!m_interrupt && m_work_queue.empty()) { + // Block until the pool is interrupted or a task is available. + m_cv.wait(wait_lock, [&]() EXCLUSIVE_LOCKS_REQUIRED(m_mutex) { return m_interrupt || !m_work_queue.empty(); }); + } + + // If stopped and no work left, exit worker + if (m_interrupt && m_work_queue.empty()) { + return; + } + + task = std::move(m_work_queue.front()); + m_work_queue.pop(); + } + + { + // Execute the task without the lock + REVERSE_LOCK(wait_lock, m_mutex); + task(); + } + } + } + +public: + explicit ThreadPool(const std::string& name) : m_name(name) {} + + ~ThreadPool() + { + Stop(); // In case it hasn't been stopped. + } + + /** + * @brief Start worker threads. + * + * Creates and launches `num_workers` threads that begin executing tasks + * from the queue. If the pool is already started, throws. + * + * Must be called from a controller (non-worker) thread. + */ + void Start(int num_workers) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) + { + assert(num_workers > 0); + LOCK(m_mutex); + if (!m_workers.empty()) throw std::runtime_error("Thread pool already started"); + m_interrupt = false; // Reset + + // Create workers + m_workers.reserve(num_workers); + for (int i = 0; i < num_workers; i++) { + m_workers.emplace_back(&util::TraceThread, strprintf("%s_pool_%d", m_name, i), [this] { WorkerThread(); }); + } + } + + /** + * @brief Stop all worker threads and wait for them to exit. + * + * Sets the interrupt flag, wakes all waiting workers, and joins them. + * Any remaining tasks in the queue will be processed before returning. + * + * Must be called from a controller (non-worker) thread. + */ + void Stop() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) + { + // Notify workers and join them + std::vector threads_to_join; + { + LOCK(m_mutex); + // Ensure Stop() is not called from a worker thread while workers are still registered, + // otherwise a self-join deadlock would occur. + auto id = std::this_thread::get_id(); + for (const auto& worker : m_workers) assert(worker.get_id() != id); + // Early shutdown to return right away on any concurrent Submit() call + m_interrupt = true; + threads_to_join.swap(m_workers); + } + m_cv.notify_all(); + for (auto& worker : threads_to_join) worker.join(); + // Since we currently wait for tasks completion, sanity-check empty queue + WITH_LOCK(m_mutex, Assume(m_work_queue.empty())); + // Note: m_interrupt is left true until next Start() + } + + /** + * @brief Enqueues a new task for asynchronous execution. + * + * Returns a `std::future` that provides the task's result or propagates + * any exception it throws. + * Note: Ignoring the returned future requires guarding the task against + * uncaught exceptions, as they would otherwise be silently discarded. + */ + template [[nodiscard]] EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) + auto Submit(F&& fn) + { + std::packaged_task task{std::forward(fn)}; + auto future{task.get_future()}; + { + LOCK(m_mutex); + if (m_interrupt || m_workers.empty()) { + throw std::runtime_error("No active workers; cannot accept new tasks"); + } + m_work_queue.emplace(std::move(task)); + } + m_cv.notify_one(); + return future; + } + + /** + * @brief Execute a single queued task synchronously. + * Removes one task from the queue and executes it on the calling thread. + */ + void ProcessTask() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) + { + std::packaged_task task; + { + LOCK(m_mutex); + if (m_work_queue.empty()) return; + + // Pop the task + task = std::move(m_work_queue.front()); + m_work_queue.pop(); + } + task(); + } + + /** + * @brief Stop accepting new tasks and begin asynchronous shutdown. + * + * Wakes all worker threads so they can drain the queue and exit. + * Unlike Stop(), this function does not wait for threads to finish. + */ + void Interrupt() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) + { + WITH_LOCK(m_mutex, m_interrupt = true); + m_cv.notify_all(); + } + + size_t WorkQueueSize() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) + { + return WITH_LOCK(m_mutex, return m_work_queue.size()); + } + + size_t WorkersCount() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) + { + return WITH_LOCK(m_mutex, return m_workers.size()); + } +}; + +#endif // BITCOIN_UTIL_THREADPOOL_H