From 9ff1e82e7dbdf31ddf1c534853da4581a1f41bd5 Mon Sep 17 00:00:00 2001 From: l0rinc Date: Tue, 17 Feb 2026 16:21:30 -0500 Subject: [PATCH] test: cleanup, block threads via semaphore instead of shared_future No-behavior change. --- src/test/threadpool_tests.cpp | 60 ++++++++++++++--------------------- 1 file changed, 24 insertions(+), 36 deletions(-) diff --git a/src/test/threadpool_tests.cpp b/src/test/threadpool_tests.cpp index 850cd16650d..aee9a722ae8 100644 --- a/src/test/threadpool_tests.cpp +++ b/src/test/threadpool_tests.cpp @@ -10,6 +10,8 @@ #include #include +#include +#include // General test values int NUM_WORKERS_DEFAULT = 0; @@ -51,28 +53,18 @@ template return std::move(*Assert(pool.Submit(std::forward(fn)))); } -// Block a number of worker threads by submitting tasks that wait on `blocker_future`. +// Block a number of worker threads by submitting tasks that wait on `release_sem`. // Returns the futures of the blocking tasks, ensuring all have started and are waiting. -std::vector> BlockWorkers(ThreadPool& threadPool, const std::shared_future& blocker_future, int num_of_threads_to_block) +std::vector> BlockWorkers(ThreadPool& threadPool, std::counting_semaphore<>& release_sem, size_t num_of_threads_to_block) { - // Per-thread ready promises to ensure all workers are actually blocked - std::vector> ready_promises(num_of_threads_to_block); - std::vector> ready_futures; - ready_futures.reserve(num_of_threads_to_block); - for (auto& p : ready_promises) ready_futures.emplace_back(p.get_future()); - - // Fill all workers with blocking tasks - std::vector> blocking_tasks; - for (int i = 0; i < num_of_threads_to_block; i++) { - std::promise& ready = ready_promises[i]; - blocking_tasks.emplace_back(Submit(threadPool, [blocker_future, &ready]() { - ready.set_value(); - blocker_future.wait(); - })); - } - - // Wait until all threads are actually blocked - WAIT_FOR(ready_futures); + assert(threadPool.WorkersCount() >= num_of_threads_to_block); + std::latch ready{static_cast(num_of_threads_to_block)}; + std::vector> blocking_tasks(num_of_threads_to_block); + for (auto& f : blocking_tasks) f = Submit(threadPool, [&] { + ready.count_down(); + release_sem.acquire(); + }); + ready.wait(); return blocking_tasks; } @@ -115,10 +107,8 @@ BOOST_AUTO_TEST_CASE(single_available_worker_executes_all_tasks) { ThreadPool threadPool(POOL_NAME); threadPool.Start(NUM_WORKERS_DEFAULT); - // Single blocking future for all threads - std::promise blocker; - std::shared_future blocker_future(blocker.get_future()); - const auto blocking_tasks = BlockWorkers(threadPool, blocker_future, NUM_WORKERS_DEFAULT - 1); + std::counting_semaphore<> blocker(0); + const auto blocking_tasks = BlockWorkers(threadPool, blocker, NUM_WORKERS_DEFAULT - 1); // Now execute tasks on the single available worker // and check that all the tasks are executed. @@ -132,7 +122,7 @@ BOOST_AUTO_TEST_CASE(single_available_worker_executes_all_tasks) WAIT_FOR(futures); BOOST_CHECK_EQUAL(counter, num_tasks); - blocker.set_value(); + blocker.release(NUM_WORKERS_DEFAULT - 1); WAIT_FOR(blocking_tasks); threadPool.Stop(); BOOST_CHECK_EQUAL(threadPool.WorkersCount(), 0); @@ -195,9 +185,8 @@ BOOST_AUTO_TEST_CASE(process_tasks_manually_when_workers_busy) ThreadPool threadPool(POOL_NAME); threadPool.Start(NUM_WORKERS_DEFAULT); - std::promise blocker; - std::shared_future blocker_future(blocker.get_future()); - const auto& blocking_tasks = BlockWorkers(threadPool, blocker_future, NUM_WORKERS_DEFAULT); + std::counting_semaphore<> blocker(0); + const auto& blocking_tasks = BlockWorkers(threadPool, blocker, NUM_WORKERS_DEFAULT); // Now submit tasks and check that none of them are executed. int num_tasks = 20; @@ -216,7 +205,7 @@ BOOST_AUTO_TEST_CASE(process_tasks_manually_when_workers_busy) } BOOST_CHECK_EQUAL(counter.load(), num_tasks); BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 0); - blocker.set_value(); + blocker.release(NUM_WORKERS_DEFAULT); threadPool.Stop(); WAIT_FOR(blocking_tasks); } @@ -244,9 +233,8 @@ BOOST_AUTO_TEST_CASE(task_submitted_while_busy_completes) ThreadPool threadPool(POOL_NAME); threadPool.Start(NUM_WORKERS_DEFAULT); - std::promise blocker; - std::shared_future blocker_future(blocker.get_future()); - const auto& blocking_tasks = BlockWorkers(threadPool, blocker_future, NUM_WORKERS_DEFAULT); + std::counting_semaphore<> blocker(0); + const auto& blocking_tasks = BlockWorkers(threadPool, blocker, NUM_WORKERS_DEFAULT); // Submit an extra task that should execute once a worker is free std::future future = Submit(threadPool, []() { return true; }); @@ -257,7 +245,7 @@ BOOST_AUTO_TEST_CASE(task_submitted_while_busy_completes) // Wait a short moment before unblocking the threads to mimic a concurrent shutdown std::thread thread_unblocker([&blocker]() { UninterruptibleSleep(300ms); - blocker.set_value(); + blocker.release(NUM_WORKERS_DEFAULT); }); // Stop the pool while the workers are still blocked @@ -314,13 +302,13 @@ BOOST_AUTO_TEST_CASE(interrupt_blocks_new_submissions) // One worker is blocked, another calls Interrupt(), and the remaining one waits for tasks. threadPool.Start(/*num_workers=*/3); std::atomic counter{0}; - std::promise blocker; - const auto blocking_tasks = BlockWorkers(threadPool, blocker.get_future().share(), 1); + std::counting_semaphore<> blocker(0); + const auto blocking_tasks = BlockWorkers(threadPool, blocker, 1); Submit(threadPool, [&threadPool, &counter]{ threadPool.Interrupt(); counter.fetch_add(1, std::memory_order_relaxed); }).get(); - blocker.set_value(); // unblock worker + blocker.release(1); // unblock worker BOOST_CHECK_EQUAL(counter.load(), 1); threadPool.Stop();