diff --git a/src/util/threadpool.h b/src/util/threadpool.h index c039b59c4de..6fc294989d1 100644 --- a/src/util/threadpool.h +++ b/src/util/threadpool.h @@ -105,8 +105,8 @@ public: { assert(num_workers > 0); LOCK(m_mutex); + if (m_interrupt) throw std::runtime_error("Thread pool has been interrupted or is stopping"); if (!m_workers.empty()) throw std::runtime_error("Thread pool already started"); - m_interrupt = false; // Reset // Create workers m_workers.reserve(num_workers); @@ -122,6 +122,7 @@ public: * Any remaining tasks in the queue will be processed before returning. * * Must be called from a controller (non-worker) thread. + * Concurrent calls to Start() will be rejected while Stop() is in progress. */ void Stop() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) { @@ -139,9 +140,12 @@ public: } m_cv.notify_all(); for (auto& worker : threads_to_join) worker.join(); + // Since we currently wait for tasks completion, sanity-check empty queue - WITH_LOCK(m_mutex, Assume(m_work_queue.empty())); - // Note: m_interrupt is left true until next Start() + LOCK(m_mutex); + Assume(m_work_queue.empty()); + // Re-allow Start() now that all workers have exited + m_interrupt = false; } enum class SubmitError { @@ -202,6 +206,10 @@ public: * * Wakes all worker threads so they can drain the queue and exit. * Unlike Stop(), this function does not wait for threads to finish. + * + * Note: The next step in the pool lifecycle is calling Stop(), which + * releases any dangling resources and resets the pool state + * for shutdown or restart. */ void Interrupt() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) {