From 0ef938685b5c079a6f5a98daf0e3865d718d817b Mon Sep 17 00:00:00 2001 From: Hennadii Stepanov <32963518+hebasto@users.noreply.github.com> Date: Fri, 21 Aug 2020 09:23:42 +0300 Subject: [PATCH 1/5] refactor: Use member initializers in CCheckQueue --- src/checkqueue.h | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/src/checkqueue.h b/src/checkqueue.h index e3faa1dec08..3e22cd8c600 100644 --- a/src/checkqueue.h +++ b/src/checkqueue.h @@ -44,23 +44,23 @@ private: std::vector queue; //! The number of workers (including the master) that are idle. - int nIdle; + int nIdle{0}; //! The total number of workers (including the master). - int nTotal; + int nTotal{0}; //! The temporary evaluation result. - bool fAllOk; + bool fAllOk{true}; /** * Number of verifications that haven't completed yet. * This includes elements that are no longer queued, but still in the * worker's own batches. */ - unsigned int nTodo; + unsigned int nTodo{0}; //! The maximum number of elements to be processed in one batch - unsigned int nBatchSize; + const unsigned int nBatchSize; /** Internal function that does bulk of the verification work. */ bool Loop(bool fMaster = false) @@ -127,7 +127,10 @@ public: boost::mutex ControlMutex; //! Create a new check queue - explicit CCheckQueue(unsigned int nBatchSizeIn) : nIdle(0), nTotal(0), fAllOk(true), nTodo(0), nBatchSize(nBatchSizeIn) {} + explicit CCheckQueue(unsigned int nBatchSizeIn) + : nBatchSize(nBatchSizeIn) + { + } //! Worker thread void Thread() From 01511776acb0c7ec216dc9c8112531067763f1cb Mon Sep 17 00:00:00 2001 From: Hennadii Stepanov <32963518+hebasto@users.noreply.github.com> Date: Fri, 21 Aug 2020 09:24:05 +0300 Subject: [PATCH 2/5] Add local thread pool to CCheckQueue --- src/checkqueue.h | 52 +++++++++++++++++++++++++++++++--- src/init.cpp | 5 ++-- src/test/util/setup_common.cpp | 5 ++-- src/validation.cpp | 11 +++++-- src/validation.h | 6 ++-- 5 files changed, 64 insertions(+), 15 deletions(-) diff --git a/src/checkqueue.h b/src/checkqueue.h index 3e22cd8c600..fcd6e87af3c 100644 --- a/src/checkqueue.h +++ b/src/checkqueue.h @@ -6,6 +6,8 @@ #define BITCOIN_CHECKQUEUE_H #include +#include +#include #include #include @@ -62,8 +64,11 @@ private: //! The maximum number of elements to be processed in one batch const unsigned int nBatchSize; + std::vector m_worker_threads; + bool m_request_stop{false}; + /** Internal function that does bulk of the verification work. */ - bool Loop(bool fMaster = false) + bool Loop(bool fMaster) { boost::condition_variable& cond = fMaster ? condMaster : condWorker; std::vector vChecks; @@ -85,7 +90,7 @@ private: nTotal++; } // logically, the do loop starts here - while (queue.empty()) { + while (queue.empty() && !m_request_stop) { if (fMaster && nTodo == 0) { nTotal--; bool fRet = fAllOk; @@ -98,6 +103,10 @@ private: cond.wait(lock); // wait nIdle--; } + if (m_request_stop) { + return false; + } + // Decide how many work units to process now. // * Do not try to do everything at once, but aim for increasingly smaller batches so // all workers finish approximately simultaneously. @@ -132,16 +141,34 @@ public: { } + //! Create a pool of new worker threads. + void StartWorkerThreads(const int threads_num) + { + { + boost::unique_lock lock(mutex); + nIdle = 0; + nTotal = 0; + fAllOk = true; + } + assert(m_worker_threads.empty()); + for (int n = 0; n < threads_num; ++n) { + m_worker_threads.emplace_back([this, n]() { + util::ThreadRename(strprintf("scriptch.%i", n)); + Loop(false /* worker thread */); + }); + } + } + //! Worker thread void Thread() { - Loop(); + Loop(false /* worker thread */); } //! Wait until execution finishes, and return whether all evaluations were successful. bool Wait() { - return Loop(true); + return Loop(true /* master thread */); } //! Add a batch of checks to the queue @@ -159,8 +186,25 @@ public: condWorker.notify_all(); } + //! Stop all of the worker threads. + void StopWorkerThreads() + { + { + boost::unique_lock lock(mutex); + m_request_stop = true; + } + condWorker.notify_all(); + for (std::thread& t : m_worker_threads) { + t.join(); + } + m_worker_threads.clear(); + boost::unique_lock lock(mutex); + m_request_stop = false; + } + ~CCheckQueue() { + assert(m_worker_threads.empty()); } }; diff --git a/src/init.cpp b/src/init.cpp index 5ad807cbac5..bc99994a4dd 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -224,6 +224,7 @@ void Shutdown(NodeContext& node) if (g_load_block.joinable()) g_load_block.join(); threadGroup.interrupt_all(); threadGroup.join_all(); + StopScriptCheckWorkerThreads(); // After the threads that potentially access these pointers have been stopped, // destruct and reset all to nullptr. @@ -1307,9 +1308,7 @@ bool AppInitMain(const util::Ref& context, NodeContext& node, interfaces::BlockA LogPrintf("Script verification uses %d additional threads\n", script_threads); if (script_threads >= 1) { g_parallel_script_checks = true; - for (int i = 0; i < script_threads; ++i) { - threadGroup.create_thread([i]() { return ThreadScriptCheck(i); }); - } + StartScriptCheckWorkerThreads(script_threads); } assert(!node.scheduler); diff --git a/src/test/util/setup_common.cpp b/src/test/util/setup_common.cpp index 2d3137e1e2a..74498c6c793 100644 --- a/src/test/util/setup_common.cpp +++ b/src/test/util/setup_common.cpp @@ -162,9 +162,7 @@ TestingSetup::TestingSetup(const std::string& chainName, const std::vector(GetDataDir() / "banlist.dat", nullptr, DEFAULT_MISBEHAVING_BANTIME); @@ -182,6 +180,7 @@ TestingSetup::~TestingSetup() if (m_node.scheduler) m_node.scheduler->stop(); threadGroup.interrupt_all(); threadGroup.join_all(); + StopScriptCheckWorkerThreads(); GetMainSignals().FlushBackgroundCallbacks(); GetMainSignals().UnregisterBackgroundSignalScheduler(); m_node.connman.reset(); diff --git a/src/validation.cpp b/src/validation.cpp index d4463bf17bb..046583fa5ea 100644 --- a/src/validation.cpp +++ b/src/validation.cpp @@ -1817,9 +1817,14 @@ static bool WriteUndoDataForBlock(const CBlockUndo& blockundo, BlockValidationSt static CCheckQueue scriptcheckqueue(128); -void ThreadScriptCheck(int worker_num) { - util::ThreadRename(strprintf("scriptch.%i", worker_num)); - scriptcheckqueue.Thread(); +void StartScriptCheckWorkerThreads(int threads_num) +{ + scriptcheckqueue.StartWorkerThreads(threads_num); +} + +void StopScriptCheckWorkerThreads() +{ + scriptcheckqueue.StopWorkerThreads(); } VersionBitsCache versionbitscache GUARDED_BY(cs_main); diff --git a/src/validation.h b/src/validation.h index d88bd077652..df3b16dc15b 100644 --- a/src/validation.h +++ b/src/validation.h @@ -158,8 +158,10 @@ void LoadExternalBlockFile(const CChainParams& chainparams, FILE* fileIn, FlatFi bool LoadGenesisBlock(const CChainParams& chainparams); /** Unload database information */ void UnloadBlockIndex(CTxMemPool* mempool, ChainstateManager& chainman); -/** Run an instance of the script checking thread */ -void ThreadScriptCheck(int worker_num); +/** Run instances of script checking worker threads */ +void StartScriptCheckWorkerThreads(int threads_num); +/** Stop all of the script checking worker threads */ +void StopScriptCheckWorkerThreads(); /** * Return transaction from the block at block_index. * If block_index is not provided, fall back to mempool. From dba30695fc42f45828db008e7e5b81cb2b5d8551 Mon Sep 17 00:00:00 2001 From: Hennadii Stepanov <32963518+hebasto@users.noreply.github.com> Date: Fri, 21 Aug 2020 09:24:32 +0300 Subject: [PATCH 3/5] test: Use CCheckQueue local thread pool --- src/test/checkqueue_tests.cpp | 51 ++++++++-------------------------- src/test/transaction_tests.cpp | 8 ++---- 2 files changed, 14 insertions(+), 45 deletions(-) diff --git a/src/test/checkqueue_tests.cpp b/src/test/checkqueue_tests.cpp index 8348810ac1e..4ce5b30d06d 100644 --- a/src/test/checkqueue_tests.cpp +++ b/src/test/checkqueue_tests.cpp @@ -148,10 +148,7 @@ typedef CCheckQueue FrozenCleanup_Queue; static void Correct_Queue_range(std::vector range) { auto small_queue = MakeUnique(QUEUE_BATCH_SIZE); - boost::thread_group tg; - for (auto x = 0; x < SCRIPT_CHECK_THREADS; ++x) { - tg.create_thread([&]{small_queue->Thread();}); - } + small_queue->StartWorkerThreads(SCRIPT_CHECK_THREADS); // Make vChecks here to save on malloc (this test can be slow...) std::vector vChecks; for (const size_t i : range) { @@ -168,8 +165,7 @@ static void Correct_Queue_range(std::vector range) BOOST_REQUIRE_EQUAL(FakeCheckCheckCompletion::n_calls, i); } } - tg.interrupt_all(); - tg.join_all(); + small_queue->StopWorkerThreads(); } /** Test that 0 checks is correct @@ -212,11 +208,7 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_Correct_Random) BOOST_AUTO_TEST_CASE(test_CheckQueue_Catches_Failure) { auto fail_queue = MakeUnique(QUEUE_BATCH_SIZE); - - boost::thread_group tg; - for (auto x = 0; x < SCRIPT_CHECK_THREADS; ++x) { - tg.create_thread([&]{fail_queue->Thread();}); - } + fail_queue->StartWorkerThreads(SCRIPT_CHECK_THREADS); for (size_t i = 0; i < 1001; ++i) { CCheckQueueControl control(fail_queue.get()); @@ -237,18 +229,14 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_Catches_Failure) BOOST_REQUIRE(success); } } - tg.interrupt_all(); - tg.join_all(); + fail_queue->StopWorkerThreads(); } // Test that a block validation which fails does not interfere with // future blocks, ie, the bad state is cleared. BOOST_AUTO_TEST_CASE(test_CheckQueue_Recovers_From_Failure) { auto fail_queue = MakeUnique(QUEUE_BATCH_SIZE); - boost::thread_group tg; - for (auto x = 0; x < SCRIPT_CHECK_THREADS; ++x) { - tg.create_thread([&]{fail_queue->Thread();}); - } + fail_queue->StartWorkerThreads(SCRIPT_CHECK_THREADS); for (auto times = 0; times < 10; ++times) { for (const bool end_fails : {true, false}) { @@ -263,8 +251,7 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_Recovers_From_Failure) BOOST_REQUIRE(r != end_fails); } } - tg.interrupt_all(); - tg.join_all(); + fail_queue->StopWorkerThreads(); } // Test that unique checks are actually all called individually, rather than @@ -273,11 +260,7 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_Recovers_From_Failure) BOOST_AUTO_TEST_CASE(test_CheckQueue_UniqueCheck) { auto queue = MakeUnique(QUEUE_BATCH_SIZE); - boost::thread_group tg; - for (auto x = 0; x < SCRIPT_CHECK_THREADS; ++x) { - tg.create_thread([&]{queue->Thread();}); - - } + queue->StartWorkerThreads(SCRIPT_CHECK_THREADS); size_t COUNT = 100000; size_t total = COUNT; @@ -300,8 +283,7 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_UniqueCheck) } BOOST_REQUIRE(r); } - tg.interrupt_all(); - tg.join_all(); + queue->StopWorkerThreads(); } @@ -313,10 +295,7 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_UniqueCheck) BOOST_AUTO_TEST_CASE(test_CheckQueue_Memory) { auto queue = MakeUnique(QUEUE_BATCH_SIZE); - boost::thread_group tg; - for (auto x = 0; x < SCRIPT_CHECK_THREADS; ++x) { - tg.create_thread([&]{queue->Thread();}); - } + queue->StartWorkerThreads(SCRIPT_CHECK_THREADS); for (size_t i = 0; i < 1000; ++i) { size_t total = i; { @@ -335,8 +314,7 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_Memory) } BOOST_REQUIRE_EQUAL(MemoryCheck::fake_allocated_memory, 0U); } - tg.interrupt_all(); - tg.join_all(); + queue->StopWorkerThreads(); } // Test that a new verification cannot occur until all checks @@ -344,11 +322,8 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_Memory) BOOST_AUTO_TEST_CASE(test_CheckQueue_FrozenCleanup) { auto queue = MakeUnique(QUEUE_BATCH_SIZE); - boost::thread_group tg; bool fails = false; - for (auto x = 0; x < SCRIPT_CHECK_THREADS; ++x) { - tg.create_thread([&]{queue->Thread();}); - } + queue->StartWorkerThreads(SCRIPT_CHECK_THREADS); std::thread t0([&]() { CCheckQueueControl control(queue.get()); std::vector vChecks(1); @@ -378,9 +353,8 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_FrozenCleanup) FrozenCleanupCheck::cv.notify_one(); // Wait for control to finish t0.join(); - tg.interrupt_all(); - tg.join_all(); BOOST_REQUIRE(!fails); + queue->StopWorkerThreads(); } @@ -445,4 +419,3 @@ BOOST_AUTO_TEST_CASE(test_CheckQueueControl_Locks) } } BOOST_AUTO_TEST_SUITE_END() - diff --git a/src/test/transaction_tests.cpp b/src/test/transaction_tests.cpp index 94b5dba913b..7afc8ccc319 100644 --- a/src/test/transaction_tests.cpp +++ b/src/test/transaction_tests.cpp @@ -427,12 +427,10 @@ BOOST_AUTO_TEST_CASE(test_big_witness_transaction) // check all inputs concurrently, with the cache PrecomputedTransactionData txdata(tx); - boost::thread_group threadGroup; CCheckQueue scriptcheckqueue(128); CCheckQueueControl control(&scriptcheckqueue); - for (int i=0; i<20; i++) - threadGroup.create_thread(std::bind(&CCheckQueue::Thread, std::ref(scriptcheckqueue))); + scriptcheckqueue.StartWorkerThreads(20); std::vector coins; for(uint32_t i = 0; i < mtx.vin.size(); i++) { @@ -454,9 +452,7 @@ BOOST_AUTO_TEST_CASE(test_big_witness_transaction) bool controlCheck = control.Wait(); assert(controlCheck); - - threadGroup.interrupt_all(); - threadGroup.join_all(); + scriptcheckqueue.StopWorkerThreads(); } SignatureData CombineSignatures(const CMutableTransaction& input1, const CMutableTransaction& input2, const CTransactionRef tx) From 6784ac471bb32b6bb8e2de60986f123eb4990706 Mon Sep 17 00:00:00 2001 From: Hennadii Stepanov <32963518+hebasto@users.noreply.github.com> Date: Fri, 21 Aug 2020 09:24:48 +0300 Subject: [PATCH 4/5] bench: Use CCheckQueue local thread pool --- src/bench/checkqueue.cpp | 10 ++-------- src/checkqueue.h | 6 ------ 2 files changed, 2 insertions(+), 14 deletions(-) diff --git a/src/bench/checkqueue.cpp b/src/bench/checkqueue.cpp index ffa772d8c10..d7b8c1badc6 100644 --- a/src/bench/checkqueue.cpp +++ b/src/bench/checkqueue.cpp @@ -10,8 +10,6 @@ #include #include -#include - #include static const size_t BATCHES = 101; @@ -44,12 +42,9 @@ static void CCheckQueueSpeedPrevectorJob(benchmark::Bench& bench) void swap(PrevectorJob& x){p.swap(x.p);}; }; CCheckQueue queue {QUEUE_BATCH_SIZE}; - boost::thread_group tg; // The main thread should be counted to prevent thread oversubscription, and // to decrease the variance of benchmark results. - for (auto x = 0; x < GetNumCores() - 1; ++x) { - tg.create_thread([&]{queue.Thread();}); - } + queue.StartWorkerThreads(GetNumCores() - 1); // create all the data once, then submit copies in the benchmark. FastRandomContext insecure_rand(true); @@ -70,8 +65,7 @@ static void CCheckQueueSpeedPrevectorJob(benchmark::Bench& bench) // it is done explicitly here for clarity control.Wait(); }); - tg.interrupt_all(); - tg.join_all(); + queue.StopWorkerThreads(); ECC_Stop(); } BENCHMARK(CCheckQueueSpeedPrevectorJob); diff --git a/src/checkqueue.h b/src/checkqueue.h index fcd6e87af3c..83b371fd429 100644 --- a/src/checkqueue.h +++ b/src/checkqueue.h @@ -159,12 +159,6 @@ public: } } - //! Worker thread - void Thread() - { - Loop(false /* worker thread */); - } - //! Wait until execution finishes, and return whether all evaluations were successful. bool Wait() { From bb6fcc75d1ec94b733d1477c816351c50be5faf9 Mon Sep 17 00:00:00 2001 From: Hennadii Stepanov <32963518+hebasto@users.noreply.github.com> Date: Fri, 21 Aug 2020 09:25:07 +0300 Subject: [PATCH 5/5] refactor: Drop boost::thread stuff in CCheckQueue --- src/checkqueue.h | 53 +++++++++++++++-------------------- src/test/checkqueue_tests.cpp | 4 +-- test/lint/lint-includes.sh | 1 - 3 files changed, 25 insertions(+), 33 deletions(-) diff --git a/src/checkqueue.h b/src/checkqueue.h index 83b371fd429..4ceeb3600a3 100644 --- a/src/checkqueue.h +++ b/src/checkqueue.h @@ -12,9 +12,6 @@ #include #include -#include -#include - template class CCheckQueueControl; @@ -33,58 +30,58 @@ class CCheckQueue { private: //! Mutex to protect the inner state - boost::mutex mutex; + Mutex m_mutex; //! Worker threads block on this when out of work - boost::condition_variable condWorker; + std::condition_variable m_worker_cv; //! Master thread blocks on this when out of work - boost::condition_variable condMaster; + std::condition_variable m_master_cv; //! The queue of elements to be processed. //! As the order of booleans doesn't matter, it is used as a LIFO (stack) - std::vector queue; + std::vector queue GUARDED_BY(m_mutex); //! The number of workers (including the master) that are idle. - int nIdle{0}; + int nIdle GUARDED_BY(m_mutex){0}; //! The total number of workers (including the master). - int nTotal{0}; + int nTotal GUARDED_BY(m_mutex){0}; //! The temporary evaluation result. - bool fAllOk{true}; + bool fAllOk GUARDED_BY(m_mutex){true}; /** * Number of verifications that haven't completed yet. * This includes elements that are no longer queued, but still in the * worker's own batches. */ - unsigned int nTodo{0}; + unsigned int nTodo GUARDED_BY(m_mutex){0}; //! The maximum number of elements to be processed in one batch const unsigned int nBatchSize; std::vector m_worker_threads; - bool m_request_stop{false}; + bool m_request_stop GUARDED_BY(m_mutex){false}; /** Internal function that does bulk of the verification work. */ bool Loop(bool fMaster) { - boost::condition_variable& cond = fMaster ? condMaster : condWorker; + std::condition_variable& cond = fMaster ? m_master_cv : m_worker_cv; std::vector vChecks; vChecks.reserve(nBatchSize); unsigned int nNow = 0; bool fOk = true; do { { - boost::unique_lock lock(mutex); + WAIT_LOCK(m_mutex, lock); // first do the clean-up of the previous loop run (allowing us to do it in the same critsect) if (nNow) { fAllOk &= fOk; nTodo -= nNow; if (nTodo == 0 && !fMaster) // We processed the last element; inform the master it can exit and return the result - condMaster.notify_one(); + m_master_cv.notify_one(); } else { // first iteration nTotal++; @@ -115,7 +112,7 @@ private: nNow = std::max(1U, std::min(nBatchSize, (unsigned int)queue.size() / (nTotal + nIdle + 1))); vChecks.resize(nNow); for (unsigned int i = 0; i < nNow; i++) { - // We want the lock on the mutex to be as short as possible, so swap jobs from the global + // We want the lock on the m_mutex to be as short as possible, so swap jobs from the global // queue to the local batch vector instead of copying. vChecks[i].swap(queue.back()); queue.pop_back(); @@ -133,7 +130,7 @@ private: public: //! Mutex to ensure only one concurrent CCheckQueueControl - boost::mutex ControlMutex; + Mutex m_control_mutex; //! Create a new check queue explicit CCheckQueue(unsigned int nBatchSizeIn) @@ -145,7 +142,7 @@ public: void StartWorkerThreads(const int threads_num) { { - boost::unique_lock lock(mutex); + LOCK(m_mutex); nIdle = 0; nTotal = 0; fAllOk = true; @@ -168,32 +165,28 @@ public: //! Add a batch of checks to the queue void Add(std::vector& vChecks) { - boost::unique_lock lock(mutex); + LOCK(m_mutex); for (T& check : vChecks) { queue.push_back(T()); check.swap(queue.back()); } nTodo += vChecks.size(); if (vChecks.size() == 1) - condWorker.notify_one(); + m_worker_cv.notify_one(); else if (vChecks.size() > 1) - condWorker.notify_all(); + m_worker_cv.notify_all(); } //! Stop all of the worker threads. void StopWorkerThreads() { - { - boost::unique_lock lock(mutex); - m_request_stop = true; - } - condWorker.notify_all(); + WITH_LOCK(m_mutex, m_request_stop = true); + m_worker_cv.notify_all(); for (std::thread& t : m_worker_threads) { t.join(); } m_worker_threads.clear(); - boost::unique_lock lock(mutex); - m_request_stop = false; + WITH_LOCK(m_mutex, m_request_stop = false); } ~CCheckQueue() @@ -222,7 +215,7 @@ public: { // passed queue is supposed to be unused, or nullptr if (pqueue != nullptr) { - ENTER_CRITICAL_SECTION(pqueue->ControlMutex); + ENTER_CRITICAL_SECTION(pqueue->m_control_mutex); } } @@ -246,7 +239,7 @@ public: if (!fDone) Wait(); if (pqueue != nullptr) { - LEAVE_CRITICAL_SECTION(pqueue->ControlMutex); + LEAVE_CRITICAL_SECTION(pqueue->m_control_mutex); } } }; diff --git a/src/test/checkqueue_tests.cpp b/src/test/checkqueue_tests.cpp index 4ce5b30d06d..996bf4de5d4 100644 --- a/src/test/checkqueue_tests.cpp +++ b/src/test/checkqueue_tests.cpp @@ -342,7 +342,7 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_FrozenCleanup) } // Try to get control of the queue a bunch of times for (auto x = 0; x < 100 && !fails; ++x) { - fails = queue->ControlMutex.try_lock(); + fails = queue->m_control_mutex.try_lock(); } { // Unfreeze (we need lock n case of spurious wakeup) @@ -405,7 +405,7 @@ BOOST_AUTO_TEST_CASE(test_CheckQueueControl_Locks) cv.wait(l, [&](){return has_lock;}); bool fails = false; for (auto x = 0; x < 100 && !fails; ++x) { - fails = queue->ControlMutex.try_lock(); + fails = queue->m_control_mutex.try_lock(); } has_tried = true; cv.notify_one(); diff --git a/test/lint/lint-includes.sh b/test/lint/lint-includes.sh index fde77aea2dc..e6885904459 100755 --- a/test/lint/lint-includes.sh +++ b/test/lint/lint-includes.sh @@ -69,7 +69,6 @@ EXPECTED_BOOST_INCLUDES=( boost/signals2/signal.hpp boost/test/unit_test.hpp boost/thread/condition_variable.hpp - boost/thread/mutex.hpp boost/thread/shared_mutex.hpp boost/thread/thread.hpp boost/variant.hpp