diff --git a/src/test/threadpool_tests.cpp b/src/test/threadpool_tests.cpp index 4855b3343e5..c7878c3523b 100644 --- a/src/test/threadpool_tests.cpp +++ b/src/test/threadpool_tests.cpp @@ -39,6 +39,7 @@ struct ThreadPoolFixture { // 10) Ensure Interrupt() prevents further submissions. // 11) Start() must not cause a deadlock when called during Stop(). // 12) Ensure queued tasks complete after Interrupt(). +// 13) Ensure the Stop() calling thread helps drain the queue. BOOST_FIXTURE_TEST_SUITE(threadpool_tests, ThreadPoolFixture) #define WAIT_FOR(futures) \ @@ -380,4 +381,40 @@ BOOST_AUTO_TEST_CASE(queued_tasks_complete_after_interrupt) WAIT_FOR(blocking_tasks); } +// Test 13, ensure the Stop() calling thread helps drain the queue +BOOST_AUTO_TEST_CASE(stop_active_wait_drains_queue) +{ + ThreadPool threadPool(POOL_NAME); + threadPool.Start(NUM_WORKERS_DEFAULT); + + std::counting_semaphore<> blocker(0); + const auto blocking_tasks = BlockWorkers(threadPool, blocker, NUM_WORKERS_DEFAULT); + + auto main_thread_id = std::this_thread::get_id(); + std::atomic main_thread_tasks{0}; + const size_t num_tasks = 20; + for (size_t i = 0; i < num_tasks; i++) { + (void)Submit(threadPool, [&main_thread_tasks, main_thread_id]() { + if (std::this_thread::get_id() == main_thread_id) + main_thread_tasks.fetch_add(1, std::memory_order_relaxed); + }); + } + BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), num_tasks); + + // Delay release so Stop() drain all tasks from the calling thread + std::thread unblocker([&blocker, &threadPool]() { + while (threadPool.WorkQueueSize() > 0) { + std::this_thread::yield(); + } + blocker.release(NUM_WORKERS_DEFAULT); + }); + + threadPool.Stop(); + unblocker.join(); + + // Check the main thread processed all tasks + BOOST_CHECK_EQUAL(main_thread_tasks.load(), num_tasks); + WAIT_FOR(blocking_tasks); +} + BOOST_AUTO_TEST_SUITE_END()