Add local thread pool to CCheckQueue

This commit is contained in:
Hennadii Stepanov
2020-08-21 09:24:05 +03:00
parent 0ef938685b
commit 01511776ac
5 changed files with 64 additions and 15 deletions

View File

@@ -6,6 +6,8 @@
#define BITCOIN_CHECKQUEUE_H
#include <sync.h>
#include <tinyformat.h>
#include <util/threadnames.h>
#include <algorithm>
#include <vector>
@@ -62,8 +64,11 @@ private:
//! The maximum number of elements to be processed in one batch
const unsigned int nBatchSize;
std::vector<std::thread> m_worker_threads;
bool m_request_stop{false};
/** Internal function that does bulk of the verification work. */
bool Loop(bool fMaster = false)
bool Loop(bool fMaster)
{
boost::condition_variable& cond = fMaster ? condMaster : condWorker;
std::vector<T> vChecks;
@@ -85,7 +90,7 @@ private:
nTotal++;
}
// logically, the do loop starts here
while (queue.empty()) {
while (queue.empty() && !m_request_stop) {
if (fMaster && nTodo == 0) {
nTotal--;
bool fRet = fAllOk;
@@ -98,6 +103,10 @@ private:
cond.wait(lock); // wait
nIdle--;
}
if (m_request_stop) {
return false;
}
// Decide how many work units to process now.
// * Do not try to do everything at once, but aim for increasingly smaller batches so
// all workers finish approximately simultaneously.
@@ -132,16 +141,34 @@ public:
{
}
//! Create a pool of new worker threads.
void StartWorkerThreads(const int threads_num)
{
{
boost::unique_lock<boost::mutex> lock(mutex);
nIdle = 0;
nTotal = 0;
fAllOk = true;
}
assert(m_worker_threads.empty());
for (int n = 0; n < threads_num; ++n) {
m_worker_threads.emplace_back([this, n]() {
util::ThreadRename(strprintf("scriptch.%i", n));
Loop(false /* worker thread */);
});
}
}
//! Worker thread
void Thread()
{
Loop();
Loop(false /* worker thread */);
}
//! Wait until execution finishes, and return whether all evaluations were successful.
bool Wait()
{
return Loop(true);
return Loop(true /* master thread */);
}
//! Add a batch of checks to the queue
@@ -159,8 +186,25 @@ public:
condWorker.notify_all();
}
//! Stop all of the worker threads.
void StopWorkerThreads()
{
{
boost::unique_lock<boost::mutex> lock(mutex);
m_request_stop = true;
}
condWorker.notify_all();
for (std::thread& t : m_worker_threads) {
t.join();
}
m_worker_threads.clear();
boost::unique_lock<boost::mutex> lock(mutex);
m_request_stop = false;
}
~CCheckQueue()
{
assert(m_worker_threads.empty());
}
};