threadpool: add ranged Submit overload

Co-authored-by: l0rinc <pap.lorinc@gmail.com>
This commit is contained in:
Andrew Toth
2026-02-12 12:16:50 -05:00
parent 9cad97f6cd
commit 79571b9181
2 changed files with 107 additions and 3 deletions

View File

@@ -11,7 +11,11 @@
#include <util/time.h>
#include <boost/test/unit_test.hpp>
#include <array>
#include <functional>
#include <latch>
#include <ranges>
#include <semaphore>
// General test values
@@ -41,6 +45,7 @@ struct ThreadPoolFixture {
// 11) Start() must not cause a deadlock when called during Stop().
// 12) Ensure queued tasks complete after Interrupt().
// 13) Ensure the Stop() calling thread helps drain the queue.
// 14) Submit range of tasks in one lock acquisition.
BOOST_FIXTURE_TEST_SUITE(threadpool_tests, ThreadPoolFixture)
#define WAIT_FOR(futures) \
@@ -107,6 +112,11 @@ BOOST_AUTO_TEST_CASE(submit_fails_with_correct_error)
res = threadPool.Submit(fn_empty);
BOOST_CHECK(!res);
BOOST_CHECK_EQUAL(SubmitErrorString(res.error()), "No active workers");
std::vector<std::function<void()>> tasks;
const auto range_res{threadPool.Submit(std::move(tasks))};
BOOST_CHECK(!range_res);
BOOST_CHECK_EQUAL(SubmitErrorString(range_res.error()), "No active workers");
}
// Test 1, submit tasks and verify completion
@@ -323,6 +333,11 @@ BOOST_AUTO_TEST_CASE(interrupt_blocks_new_submissions)
BOOST_CHECK(!res);
BOOST_CHECK_EQUAL(SubmitErrorString(res.error()), "Interrupted");
std::vector<std::function<void()>> tasks;
const auto range_res{threadPool.Submit(std::move(tasks))};
BOOST_CHECK(!range_res);
BOOST_CHECK_EQUAL(SubmitErrorString(range_res.error()), "Interrupted");
// Reset pool
threadPool.Stop();
@@ -442,4 +457,43 @@ BOOST_AUTO_TEST_CASE(stop_active_wait_drains_queue)
WAIT_FOR(blocking_tasks);
}
// Test 14, submit range of tasks in one lock acquisition
BOOST_AUTO_TEST_CASE(submit_range_of_tasks_complete_successfully)
{
constexpr int32_t num_tasks{50};
ThreadPool threadPool{POOL_NAME};
threadPool.Start(NUM_WORKERS_DEFAULT);
std::atomic_int32_t sum{0};
const auto square{[&sum](int32_t i) {
sum.fetch_add(i, std::memory_order_relaxed);
return i * i;
}};
std::array<std::function<int32_t()>, static_cast<size_t>(num_tasks)> array_tasks;
std::vector<std::function<int32_t()>> vector_tasks;
vector_tasks.reserve(static_cast<size_t>(num_tasks));
for (const auto i : std::views::iota(int32_t{1}, num_tasks + 1)) {
array_tasks.at(static_cast<size_t>(i - 1)) = [i, square] { return square(i); };
vector_tasks.emplace_back([i, square] { return square(i); });
}
auto futures{std::move(*Assert(threadPool.Submit(std::move(array_tasks))))};
BOOST_CHECK_EQUAL(futures.size(), static_cast<size_t>(num_tasks));
std::ranges::move(*Assert(threadPool.Submit(std::move(vector_tasks))), std::back_inserter(futures));
BOOST_CHECK_EQUAL(futures.size(), static_cast<size_t>(num_tasks * 2));
auto squares_sum{0};
for (auto& future : futures) {
squares_sum += future.get();
}
// 2x Gauss sum.
const auto expected_sum{2 * ((num_tasks * (num_tasks + 1)) / 2)};
const auto expected_squares_sum{2 * ((num_tasks * (num_tasks + 1) * ((num_tasks * 2) + 1)) / 6)};
BOOST_CHECK_EQUAL(sum, expected_sum);
BOOST_CHECK_EQUAL(squares_sum, expected_squares_sum);
BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 0);
}
BOOST_AUTO_TEST_SUITE_END()