Merge #18234: refactor: Replace boost::mutex,condition_var,chrono with std equivalents in scheduler

70a6b529f3 lint-cppcheck: Remove -DHAVE_WORKING_BOOST_SLEEP_FOR (Anthony Towns)
294937b39d scheduler_tests: re-enable mockforward test (Anthony Towns)
cea19f6859 Drop unused reverselock.h (Anthony Towns)
d0ebd93270 scheduler: switch from boost to std (Anthony Towns)
b9c4260127 sync.h: add REVERSE_LOCK (Anthony Towns)
306f71b4eb scheduler: don't rely on boost interrupt on shutdown (Anthony Towns)

Pull request description:

  Replacing boost functionality with C++11 stuff.

  Motivated by #18227, but should stand alone. Changing from `boost::condition_var` to `std::condition_var` means `threadGroup.interrupt_all` isn't enough to interrupt `serviceQueue` anymore, so that means calling `stop()` before `join_all()` is needed. And the existing reverselock.h code doesn't work with sync.h's DebugLock code (because the reversed lock won't be removed from `g_lockstack` which then leads to incorrect potential deadlock warnings), so I've replaced that with a dedicated class and macro that's aware of our debug lock behaviour.

  Fixes #16027, Fixes #14200, Fixes #18227

ACKs for top commit:
  laanwj:
    ACK 70a6b529f3

Tree-SHA512: d1da13adeabcf9186d114e2dad9a4fdbe2e440f7afbccde0c13dfbaf464efcd850b69d3371c5bf8b179d7ceb9d81f4af3cc22960b90834e41eaaf6d52ef7d331
This commit is contained in:
Wladimir J. van der Laan
2020-03-06 20:47:49 +01:00
14 changed files with 151 additions and 117 deletions

View File

@ -178,7 +178,6 @@ BITCOIN_CORE_H = \
random.h \ random.h \
randomenv.h \ randomenv.h \
reverse_iterator.h \ reverse_iterator.h \
reverselock.h \
rpc/blockchain.h \ rpc/blockchain.h \
rpc/client.h \ rpc/client.h \
rpc/protocol.h \ rpc/protocol.h \

View File

@ -206,6 +206,7 @@ void Shutdown(NodeContext& node)
// After everything has been shut down, but before things get flushed, stop the // After everything has been shut down, but before things get flushed, stop the
// CScheduler/checkqueue threadGroup // CScheduler/checkqueue threadGroup
if (node.scheduler) node.scheduler->stop();
threadGroup.interrupt_all(); threadGroup.interrupt_all();
threadGroup.join_all(); threadGroup.join_all();

View File

@ -1,34 +0,0 @@
// Copyright (c) 2015-2016 The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#ifndef BITCOIN_REVERSELOCK_H
#define BITCOIN_REVERSELOCK_H
/**
* An RAII-style reverse lock. Unlocks on construction and locks on destruction.
*/
template<typename Lock>
class reverse_lock
{
public:
explicit reverse_lock(Lock& _lock) : lock(_lock) {
_lock.unlock();
_lock.swap(templock);
}
~reverse_lock() {
templock.lock();
templock.swap(lock);
}
private:
reverse_lock(reverse_lock const&);
reverse_lock& operator=(reverse_lock const&);
Lock& lock;
Lock templock;
};
#endif // BITCOIN_REVERSELOCK_H

View File

@ -393,7 +393,7 @@ static UniValue mockscheduler(const JSONRPCRequest& request)
// protect against null pointer dereference // protect against null pointer dereference
CHECK_NONFATAL(g_rpc_node); CHECK_NONFATAL(g_rpc_node);
CHECK_NONFATAL(g_rpc_node->scheduler); CHECK_NONFATAL(g_rpc_node->scheduler);
g_rpc_node->scheduler->MockForward(boost::chrono::seconds(delta_seconds)); g_rpc_node->scheduler->MockForward(std::chrono::seconds(delta_seconds));
return NullUniValue; return NullUniValue;
} }

View File

@ -5,7 +5,6 @@
#include <scheduler.h> #include <scheduler.h>
#include <random.h> #include <random.h>
#include <reverselock.h>
#include <assert.h> #include <assert.h>
#include <utility> #include <utility>
@ -20,18 +19,9 @@ CScheduler::~CScheduler()
} }
#if BOOST_VERSION < 105000
static boost::system_time toPosixTime(const boost::chrono::system_clock::time_point& t)
{
// Creating the posix_time using from_time_t loses sub-second precision. So rather than exporting the time_point to time_t,
// start with a posix_time at the epoch (0) and add the milliseconds that have passed since then.
return boost::posix_time::from_time_t(0) + boost::posix_time::milliseconds(boost::chrono::duration_cast<boost::chrono::milliseconds>(t.time_since_epoch()).count());
}
#endif
void CScheduler::serviceQueue() void CScheduler::serviceQueue()
{ {
boost::unique_lock<boost::mutex> lock(newTaskMutex); WAIT_LOCK(newTaskMutex, lock);
++nThreadsServicingQueue; ++nThreadsServicingQueue;
// newTaskMutex is locked throughout this loop EXCEPT // newTaskMutex is locked throughout this loop EXCEPT
@ -40,7 +30,7 @@ void CScheduler::serviceQueue()
while (!shouldStop()) { while (!shouldStop()) {
try { try {
if (!shouldStop() && taskQueue.empty()) { if (!shouldStop() && taskQueue.empty()) {
reverse_lock<boost::unique_lock<boost::mutex> > rlock(lock); REVERSE_LOCK(lock);
} }
while (!shouldStop() && taskQueue.empty()) { while (!shouldStop() && taskQueue.empty()) {
// Wait until there is something to do. // Wait until there is something to do.
@ -50,21 +40,13 @@ void CScheduler::serviceQueue()
// Wait until either there is a new task, or until // Wait until either there is a new task, or until
// the time of the first item on the queue: // the time of the first item on the queue:
// wait_until needs boost 1.50 or later; older versions have timed_wait:
#if BOOST_VERSION < 105000
while (!shouldStop() && !taskQueue.empty() &&
newTaskScheduled.timed_wait(lock, toPosixTime(taskQueue.begin()->first))) {
// Keep waiting until timeout
}
#else
// Some boost versions have a conflicting overload of wait_until that returns void.
// Explicitly use a template here to avoid hitting that overload.
while (!shouldStop() && !taskQueue.empty()) { while (!shouldStop() && !taskQueue.empty()) {
boost::chrono::system_clock::time_point timeToWaitFor = taskQueue.begin()->first; std::chrono::system_clock::time_point timeToWaitFor = taskQueue.begin()->first;
if (newTaskScheduled.wait_until<>(lock, timeToWaitFor) == boost::cv_status::timeout) if (newTaskScheduled.wait_until(lock, timeToWaitFor) == std::cv_status::timeout) {
break; // Exit loop after timeout, it means we reached the time of the event break; // Exit loop after timeout, it means we reached the time of the event
} }
#endif }
// If there are multiple threads, the queue can empty while we're waiting (another // If there are multiple threads, the queue can empty while we're waiting (another
// thread may service the task we were waiting on). // thread may service the task we were waiting on).
if (shouldStop() || taskQueue.empty()) if (shouldStop() || taskQueue.empty())
@ -76,7 +58,7 @@ void CScheduler::serviceQueue()
{ {
// Unlock before calling f, so it can reschedule itself or another task // Unlock before calling f, so it can reschedule itself or another task
// without deadlocking: // without deadlocking:
reverse_lock<boost::unique_lock<boost::mutex> > rlock(lock); REVERSE_LOCK(lock);
f(); f();
} }
} catch (...) { } catch (...) {
@ -91,7 +73,7 @@ void CScheduler::serviceQueue()
void CScheduler::stop(bool drain) void CScheduler::stop(bool drain)
{ {
{ {
boost::unique_lock<boost::mutex> lock(newTaskMutex); LOCK(newTaskMutex);
if (drain) if (drain)
stopWhenEmpty = true; stopWhenEmpty = true;
else else
@ -100,10 +82,10 @@ void CScheduler::stop(bool drain)
newTaskScheduled.notify_all(); newTaskScheduled.notify_all();
} }
void CScheduler::schedule(CScheduler::Function f, boost::chrono::system_clock::time_point t) void CScheduler::schedule(CScheduler::Function f, std::chrono::system_clock::time_point t)
{ {
{ {
boost::unique_lock<boost::mutex> lock(newTaskMutex); LOCK(newTaskMutex);
taskQueue.insert(std::make_pair(t, f)); taskQueue.insert(std::make_pair(t, f));
} }
newTaskScheduled.notify_one(); newTaskScheduled.notify_one();
@ -111,18 +93,18 @@ void CScheduler::schedule(CScheduler::Function f, boost::chrono::system_clock::t
void CScheduler::scheduleFromNow(CScheduler::Function f, int64_t deltaMilliSeconds) void CScheduler::scheduleFromNow(CScheduler::Function f, int64_t deltaMilliSeconds)
{ {
schedule(f, boost::chrono::system_clock::now() + boost::chrono::milliseconds(deltaMilliSeconds)); schedule(f, std::chrono::system_clock::now() + std::chrono::milliseconds(deltaMilliSeconds));
} }
void CScheduler::MockForward(boost::chrono::seconds delta_seconds) void CScheduler::MockForward(std::chrono::seconds delta_seconds)
{ {
assert(delta_seconds.count() > 0 && delta_seconds < boost::chrono::hours{1}); assert(delta_seconds.count() > 0 && delta_seconds < std::chrono::hours{1});
{ {
boost::unique_lock<boost::mutex> lock(newTaskMutex); LOCK(newTaskMutex);
// use temp_queue to maintain updated schedule // use temp_queue to maintain updated schedule
std::multimap<boost::chrono::system_clock::time_point, Function> temp_queue; std::multimap<std::chrono::system_clock::time_point, Function> temp_queue;
for (const auto& element : taskQueue) { for (const auto& element : taskQueue) {
temp_queue.emplace_hint(temp_queue.cend(), element.first - delta_seconds, element.second); temp_queue.emplace_hint(temp_queue.cend(), element.first - delta_seconds, element.second);
@ -147,10 +129,10 @@ void CScheduler::scheduleEvery(CScheduler::Function f, int64_t deltaMilliSeconds
scheduleFromNow(std::bind(&Repeat, this, f, deltaMilliSeconds), deltaMilliSeconds); scheduleFromNow(std::bind(&Repeat, this, f, deltaMilliSeconds), deltaMilliSeconds);
} }
size_t CScheduler::getQueueInfo(boost::chrono::system_clock::time_point &first, size_t CScheduler::getQueueInfo(std::chrono::system_clock::time_point &first,
boost::chrono::system_clock::time_point &last) const std::chrono::system_clock::time_point &last) const
{ {
boost::unique_lock<boost::mutex> lock(newTaskMutex); LOCK(newTaskMutex);
size_t result = taskQueue.size(); size_t result = taskQueue.size();
if (!taskQueue.empty()) { if (!taskQueue.empty()) {
first = taskQueue.begin()->first; first = taskQueue.begin()->first;
@ -160,7 +142,7 @@ size_t CScheduler::getQueueInfo(boost::chrono::system_clock::time_point &first,
} }
bool CScheduler::AreThreadsServicingQueue() const { bool CScheduler::AreThreadsServicingQueue() const {
boost::unique_lock<boost::mutex> lock(newTaskMutex); LOCK(newTaskMutex);
return nThreadsServicingQueue; return nThreadsServicingQueue;
} }
@ -174,7 +156,7 @@ void SingleThreadedSchedulerClient::MaybeScheduleProcessQueue() {
if (m_are_callbacks_running) return; if (m_are_callbacks_running) return;
if (m_callbacks_pending.empty()) return; if (m_callbacks_pending.empty()) return;
} }
m_pscheduler->schedule(std::bind(&SingleThreadedSchedulerClient::ProcessQueue, this)); m_pscheduler->schedule(std::bind(&SingleThreadedSchedulerClient::ProcessQueue, this), std::chrono::system_clock::now());
} }
void SingleThreadedSchedulerClient::ProcessQueue() { void SingleThreadedSchedulerClient::ProcessQueue() {

View File

@ -7,11 +7,12 @@
// //
// NOTE: // NOTE:
// boost::thread / boost::chrono should be ported to std::thread / std::chrono // boost::thread should be ported to std::thread
// when we support C++11. // when we support C++11.
// //
#include <boost/chrono/chrono.hpp> #include <condition_variable>
#include <boost/thread.hpp> #include <functional>
#include <list>
#include <map> #include <map>
#include <sync.h> #include <sync.h>
@ -27,8 +28,8 @@
// s->scheduleFromNow(std::bind(Class::func, this, argument), 3); // s->scheduleFromNow(std::bind(Class::func, this, argument), 3);
// boost::thread* t = new boost::thread(std::bind(CScheduler::serviceQueue, s)); // boost::thread* t = new boost::thread(std::bind(CScheduler::serviceQueue, s));
// //
// ... then at program shutdown, clean up the thread running serviceQueue: // ... then at program shutdown, make sure to call stop() to clean up the thread(s) running serviceQueue:
// t->interrupt(); // s->stop();
// t->join(); // t->join();
// delete t; // delete t;
// delete s; // Must be done after thread is interrupted/joined. // delete s; // Must be done after thread is interrupted/joined.
@ -43,7 +44,7 @@ public:
typedef std::function<void()> Function; typedef std::function<void()> Function;
// Call func at/after time t // Call func at/after time t
void schedule(Function f, boost::chrono::system_clock::time_point t=boost::chrono::system_clock::now()); void schedule(Function f, std::chrono::system_clock::time_point t);
// Convenience method: call f once deltaMilliSeconds from now // Convenience method: call f once deltaMilliSeconds from now
void scheduleFromNow(Function f, int64_t deltaMilliSeconds); void scheduleFromNow(Function f, int64_t deltaMilliSeconds);
@ -60,7 +61,7 @@ public:
* Iterates through items on taskQueue and reschedules them * Iterates through items on taskQueue and reschedules them
* to be delta_seconds sooner. * to be delta_seconds sooner.
*/ */
void MockForward(boost::chrono::seconds delta_seconds); void MockForward(std::chrono::seconds delta_seconds);
// To keep things as simple as possible, there is no unschedule. // To keep things as simple as possible, there is no unschedule.
@ -75,20 +76,20 @@ public:
// Returns number of tasks waiting to be serviced, // Returns number of tasks waiting to be serviced,
// and first and last task times // and first and last task times
size_t getQueueInfo(boost::chrono::system_clock::time_point &first, size_t getQueueInfo(std::chrono::system_clock::time_point &first,
boost::chrono::system_clock::time_point &last) const; std::chrono::system_clock::time_point &last) const;
// Returns true if there are threads actively running in serviceQueue() // Returns true if there are threads actively running in serviceQueue()
bool AreThreadsServicingQueue() const; bool AreThreadsServicingQueue() const;
private: private:
std::multimap<boost::chrono::system_clock::time_point, Function> taskQueue; mutable Mutex newTaskMutex;
boost::condition_variable newTaskScheduled; std::condition_variable newTaskScheduled;
mutable boost::mutex newTaskMutex; std::multimap<std::chrono::system_clock::time_point, Function> taskQueue GUARDED_BY(newTaskMutex);
int nThreadsServicingQueue; int nThreadsServicingQueue GUARDED_BY(newTaskMutex);
bool stopRequested; bool stopRequested GUARDED_BY(newTaskMutex);
bool stopWhenEmpty; bool stopWhenEmpty GUARDED_BY(newTaskMutex);
bool shouldStop() const { return stopRequested || (stopWhenEmpty && taskQueue.empty()); } bool shouldStop() const EXCLUSIVE_LOCKS_REQUIRED(newTaskMutex) { return stopRequested || (stopWhenEmpty && taskQueue.empty()); }
}; };
/** /**

View File

@ -13,7 +13,7 @@
#include <util/strencodings.h> #include <util/strencodings.h>
#include <util/threadnames.h> #include <util/threadnames.h>
#include <system_error>
#include <map> #include <map>
#include <set> #include <set>
@ -60,6 +60,11 @@ struct CLockLocation {
mutexName, sourceFile, itostr(sourceLine), (fTry ? " (TRY)" : ""), m_thread_name); mutexName, sourceFile, itostr(sourceLine), (fTry ? " (TRY)" : ""), m_thread_name);
} }
std::string Name() const
{
return mutexName;
}
private: private:
bool fTry; bool fTry;
std::string mutexName; std::string mutexName;
@ -155,6 +160,18 @@ void EnterCritical(const char* pszName, const char* pszFile, int nLine, void* cs
push_lock(cs, CLockLocation(pszName, pszFile, nLine, fTry, util::ThreadGetInternalName())); push_lock(cs, CLockLocation(pszName, pszFile, nLine, fTry, util::ThreadGetInternalName()));
} }
void CheckLastCritical(void* cs, std::string& lockname, const char* guardname, const char* file, int line)
{
if (!g_lockstack.empty()) {
const auto& lastlock = g_lockstack.back();
if (lastlock.first == cs) {
lockname = lastlock.second.Name();
return;
}
}
throw std::system_error(EPERM, std::generic_category(), strprintf("%s:%s %s was not most recent critical section locked", file, line, guardname));
}
void LeaveCritical() void LeaveCritical()
{ {
pop_lock(); pop_lock();

View File

@ -50,6 +50,7 @@ LEAVE_CRITICAL_SECTION(mutex); // no RAII
#ifdef DEBUG_LOCKORDER #ifdef DEBUG_LOCKORDER
void EnterCritical(const char* pszName, const char* pszFile, int nLine, void* cs, bool fTry = false); void EnterCritical(const char* pszName, const char* pszFile, int nLine, void* cs, bool fTry = false);
void LeaveCritical(); void LeaveCritical();
void CheckLastCritical(void* cs, std::string& lockname, const char* guardname, const char* file, int line);
std::string LocksHeld(); std::string LocksHeld();
void AssertLockHeldInternal(const char* pszName, const char* pszFile, int nLine, void* cs) ASSERT_EXCLUSIVE_LOCK(cs); void AssertLockHeldInternal(const char* pszName, const char* pszFile, int nLine, void* cs) ASSERT_EXCLUSIVE_LOCK(cs);
void AssertLockNotHeldInternal(const char* pszName, const char* pszFile, int nLine, void* cs); void AssertLockNotHeldInternal(const char* pszName, const char* pszFile, int nLine, void* cs);
@ -64,6 +65,7 @@ extern bool g_debug_lockorder_abort;
#else #else
void static inline EnterCritical(const char* pszName, const char* pszFile, int nLine, void* cs, bool fTry = false) {} void static inline EnterCritical(const char* pszName, const char* pszFile, int nLine, void* cs, bool fTry = false) {}
void static inline LeaveCritical() {} void static inline LeaveCritical() {}
void static inline CheckLastCritical(void* cs, std::string& lockname, const char* guardname, const char* file, int line) {}
void static inline AssertLockHeldInternal(const char* pszName, const char* pszFile, int nLine, void* cs) ASSERT_EXCLUSIVE_LOCK(cs) {} void static inline AssertLockHeldInternal(const char* pszName, const char* pszFile, int nLine, void* cs) ASSERT_EXCLUSIVE_LOCK(cs) {}
void static inline AssertLockNotHeldInternal(const char* pszName, const char* pszFile, int nLine, void* cs) {} void static inline AssertLockNotHeldInternal(const char* pszName, const char* pszFile, int nLine, void* cs) {}
void static inline DeleteLock(void* cs) {} void static inline DeleteLock(void* cs) {}
@ -171,8 +173,45 @@ public:
{ {
return Base::owns_lock(); return Base::owns_lock();
} }
protected:
// needed for reverse_lock
UniqueLock() { }
public:
/**
* An RAII-style reverse lock. Unlocks on construction and locks on destruction.
*/
class reverse_lock {
public:
explicit reverse_lock(UniqueLock& _lock, const char* _guardname, const char* _file, int _line) : lock(_lock), file(_file), line(_line) {
CheckLastCritical((void*)lock.mutex(), lockname, _guardname, _file, _line);
lock.unlock();
LeaveCritical();
lock.swap(templock);
}
~reverse_lock() {
templock.swap(lock);
EnterCritical(lockname.c_str(), file.c_str(), line, (void*)lock.mutex());
lock.lock();
}
private:
reverse_lock(reverse_lock const&);
reverse_lock& operator=(reverse_lock const&);
UniqueLock& lock;
UniqueLock templock;
std::string lockname;
const std::string file;
const int line;
};
friend class reverse_lock;
}; };
#define REVERSE_LOCK(g) decltype(g)::reverse_lock PASTE2(revlock, __COUNTER__)(g, #g, __FILE__, __LINE__)
template<typename MutexArg> template<typename MutexArg>
using DebugLock = UniqueLock<typename std::remove_reference<typename std::remove_pointer<MutexArg>::type>::type>; using DebugLock = UniqueLock<typename std::remove_reference<typename std::remove_pointer<MutexArg>::type>::type>;

View File

@ -2,7 +2,7 @@
// Distributed under the MIT software license, see the accompanying // Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php. // file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include <reverselock.h> #include <sync.h>
#include <test/util/setup_common.h> #include <test/util/setup_common.h>
#include <boost/test/unit_test.hpp> #include <boost/test/unit_test.hpp>
@ -11,21 +11,50 @@ BOOST_FIXTURE_TEST_SUITE(reverselock_tests, BasicTestingSetup)
BOOST_AUTO_TEST_CASE(reverselock_basics) BOOST_AUTO_TEST_CASE(reverselock_basics)
{ {
boost::mutex mutex; Mutex mutex;
boost::unique_lock<boost::mutex> lock(mutex); WAIT_LOCK(mutex, lock);
BOOST_CHECK(lock.owns_lock()); BOOST_CHECK(lock.owns_lock());
{ {
reverse_lock<boost::unique_lock<boost::mutex> > rlock(lock); REVERSE_LOCK(lock);
BOOST_CHECK(!lock.owns_lock()); BOOST_CHECK(!lock.owns_lock());
} }
BOOST_CHECK(lock.owns_lock()); BOOST_CHECK(lock.owns_lock());
} }
BOOST_AUTO_TEST_CASE(reverselock_multiple)
{
Mutex mutex2;
Mutex mutex;
WAIT_LOCK(mutex2, lock2);
WAIT_LOCK(mutex, lock);
// Make sure undoing two locks succeeds
{
REVERSE_LOCK(lock);
BOOST_CHECK(!lock.owns_lock());
REVERSE_LOCK(lock2);
BOOST_CHECK(!lock2.owns_lock());
}
BOOST_CHECK(lock.owns_lock());
BOOST_CHECK(lock2.owns_lock());
}
BOOST_AUTO_TEST_CASE(reverselock_errors) BOOST_AUTO_TEST_CASE(reverselock_errors)
{ {
boost::mutex mutex; Mutex mutex2;
boost::unique_lock<boost::mutex> lock(mutex); Mutex mutex;
WAIT_LOCK(mutex2, lock2);
WAIT_LOCK(mutex, lock);
#ifdef DEBUG_LOCKORDER
// Make sure trying to reverse lock a previous lock fails
try {
REVERSE_LOCK(lock2);
BOOST_CHECK(false); // REVERSE_LOCK(lock2) succeeded
} catch(...) { }
BOOST_CHECK(lock2.owns_lock());
#endif
// Make sure trying to reverse lock an unlocked lock fails // Make sure trying to reverse lock an unlocked lock fails
lock.unlock(); lock.unlock();
@ -34,7 +63,7 @@ BOOST_AUTO_TEST_CASE(reverselock_errors)
bool failed = false; bool failed = false;
try { try {
reverse_lock<boost::unique_lock<boost::mutex> > rlock(lock); REVERSE_LOCK(lock);
} catch(...) { } catch(...) {
failed = true; failed = true;
} }
@ -49,7 +78,7 @@ BOOST_AUTO_TEST_CASE(reverselock_errors)
lock.lock(); lock.lock();
BOOST_CHECK(lock.owns_lock()); BOOST_CHECK(lock.owns_lock());
{ {
reverse_lock<boost::unique_lock<boost::mutex> > rlock(lock); REVERSE_LOCK(lock);
BOOST_CHECK(!lock.owns_lock()); BOOST_CHECK(!lock.owns_lock());
} }

View File

@ -11,13 +11,13 @@
BOOST_AUTO_TEST_SUITE(scheduler_tests) BOOST_AUTO_TEST_SUITE(scheduler_tests)
static void microTask(CScheduler& s, boost::mutex& mutex, int& counter, int delta, boost::chrono::system_clock::time_point rescheduleTime) static void microTask(CScheduler& s, boost::mutex& mutex, int& counter, int delta, std::chrono::system_clock::time_point rescheduleTime)
{ {
{ {
boost::unique_lock<boost::mutex> lock(mutex); boost::unique_lock<boost::mutex> lock(mutex);
counter += delta; counter += delta;
} }
boost::chrono::system_clock::time_point noTime = boost::chrono::system_clock::time_point::min(); std::chrono::system_clock::time_point noTime = std::chrono::system_clock::time_point::min();
if (rescheduleTime != noTime) { if (rescheduleTime != noTime) {
CScheduler::Function f = std::bind(&microTask, std::ref(s), std::ref(mutex), std::ref(counter), -delta + 1, noTime); CScheduler::Function f = std::bind(&microTask, std::ref(s), std::ref(mutex), std::ref(counter), -delta + 1, noTime);
s.schedule(f, rescheduleTime); s.schedule(f, rescheduleTime);
@ -45,15 +45,15 @@ BOOST_AUTO_TEST_CASE(manythreads)
auto randomMsec = [](FastRandomContext& rc) -> int { return -11 + (int)rc.randrange(1012); }; // [-11, 1000] auto randomMsec = [](FastRandomContext& rc) -> int { return -11 + (int)rc.randrange(1012); }; // [-11, 1000]
auto randomDelta = [](FastRandomContext& rc) -> int { return -1000 + (int)rc.randrange(2001); }; // [-1000, 1000] auto randomDelta = [](FastRandomContext& rc) -> int { return -1000 + (int)rc.randrange(2001); }; // [-1000, 1000]
boost::chrono::system_clock::time_point start = boost::chrono::system_clock::now(); std::chrono::system_clock::time_point start = std::chrono::system_clock::now();
boost::chrono::system_clock::time_point now = start; std::chrono::system_clock::time_point now = start;
boost::chrono::system_clock::time_point first, last; std::chrono::system_clock::time_point first, last;
size_t nTasks = microTasks.getQueueInfo(first, last); size_t nTasks = microTasks.getQueueInfo(first, last);
BOOST_CHECK(nTasks == 0); BOOST_CHECK(nTasks == 0);
for (int i = 0; i < 100; ++i) { for (int i = 0; i < 100; ++i) {
boost::chrono::system_clock::time_point t = now + boost::chrono::microseconds(randomMsec(rng)); std::chrono::system_clock::time_point t = now + std::chrono::microseconds(randomMsec(rng));
boost::chrono::system_clock::time_point tReschedule = now + boost::chrono::microseconds(500 + randomMsec(rng)); std::chrono::system_clock::time_point tReschedule = now + std::chrono::microseconds(500 + randomMsec(rng));
int whichCounter = zeroToNine(rng); int whichCounter = zeroToNine(rng);
CScheduler::Function f = std::bind(&microTask, std::ref(microTasks), CScheduler::Function f = std::bind(&microTask, std::ref(microTasks),
std::ref(counterMutex[whichCounter]), std::ref(counter[whichCounter]), std::ref(counterMutex[whichCounter]), std::ref(counter[whichCounter]),
@ -71,14 +71,14 @@ BOOST_AUTO_TEST_CASE(manythreads)
microThreads.create_thread(std::bind(&CScheduler::serviceQueue, &microTasks)); microThreads.create_thread(std::bind(&CScheduler::serviceQueue, &microTasks));
UninterruptibleSleep(std::chrono::microseconds{600}); UninterruptibleSleep(std::chrono::microseconds{600});
now = boost::chrono::system_clock::now(); now = std::chrono::system_clock::now();
// More threads and more tasks: // More threads and more tasks:
for (int i = 0; i < 5; i++) for (int i = 0; i < 5; i++)
microThreads.create_thread(std::bind(&CScheduler::serviceQueue, &microTasks)); microThreads.create_thread(std::bind(&CScheduler::serviceQueue, &microTasks));
for (int i = 0; i < 100; i++) { for (int i = 0; i < 100; i++) {
boost::chrono::system_clock::time_point t = now + boost::chrono::microseconds(randomMsec(rng)); std::chrono::system_clock::time_point t = now + std::chrono::microseconds(randomMsec(rng));
boost::chrono::system_clock::time_point tReschedule = now + boost::chrono::microseconds(500 + randomMsec(rng)); std::chrono::system_clock::time_point tReschedule = now + std::chrono::microseconds(500 + randomMsec(rng));
int whichCounter = zeroToNine(rng); int whichCounter = zeroToNine(rng);
CScheduler::Function f = std::bind(&microTask, std::ref(microTasks), CScheduler::Function f = std::bind(&microTask, std::ref(microTasks),
std::ref(counterMutex[whichCounter]), std::ref(counter[whichCounter]), std::ref(counterMutex[whichCounter]), std::ref(counter[whichCounter]),
@ -142,7 +142,6 @@ BOOST_AUTO_TEST_CASE(singlethreadedscheduler_ordered)
BOOST_CHECK_EQUAL(counter2, 100); BOOST_CHECK_EQUAL(counter2, 100);
} }
/* disabled for now. See discussion in https://github.com/bitcoin/bitcoin/pull/18174
BOOST_AUTO_TEST_CASE(mockforward) BOOST_AUTO_TEST_CASE(mockforward)
{ {
CScheduler scheduler; CScheduler scheduler;
@ -157,14 +156,14 @@ BOOST_AUTO_TEST_CASE(mockforward)
scheduler.scheduleFromNow(dummy, 8*min_in_milli); scheduler.scheduleFromNow(dummy, 8*min_in_milli);
// check taskQueue // check taskQueue
boost::chrono::system_clock::time_point first, last; std::chrono::system_clock::time_point first, last;
size_t num_tasks = scheduler.getQueueInfo(first, last); size_t num_tasks = scheduler.getQueueInfo(first, last);
BOOST_CHECK_EQUAL(num_tasks, 3ul); BOOST_CHECK_EQUAL(num_tasks, 3ul);
std::thread scheduler_thread([&]() { scheduler.serviceQueue(); }); std::thread scheduler_thread([&]() { scheduler.serviceQueue(); });
// bump the scheduler forward 5 minutes // bump the scheduler forward 5 minutes
scheduler.MockForward(boost::chrono::seconds(5*60)); scheduler.MockForward(std::chrono::seconds(5*60));
// ensure scheduler has chance to process all tasks queued for before 1 ms from now. // ensure scheduler has chance to process all tasks queued for before 1 ms from now.
scheduler.scheduleFromNow([&scheduler]{ scheduler.stop(false); }, 1); scheduler.scheduleFromNow([&scheduler]{ scheduler.stop(false); }, 1);
@ -178,11 +177,10 @@ BOOST_AUTO_TEST_CASE(mockforward)
BOOST_CHECK_EQUAL(counter, 2); BOOST_CHECK_EQUAL(counter, 2);
// check that the time of the remaining job has been updated // check that the time of the remaining job has been updated
boost::chrono::system_clock::time_point now = boost::chrono::system_clock::now(); std::chrono::system_clock::time_point now = std::chrono::system_clock::now();
int delta = boost::chrono::duration_cast<boost::chrono::seconds>(first - now).count(); int delta = std::chrono::duration_cast<std::chrono::seconds>(first - now).count();
// should be between 2 & 3 minutes from now // should be between 2 & 3 minutes from now
BOOST_CHECK(delta > 2*60 && delta < 3*60); BOOST_CHECK(delta > 2*60 && delta < 3*60);
} }
*/
BOOST_AUTO_TEST_SUITE_END() BOOST_AUTO_TEST_SUITE_END()

View File

@ -70,6 +70,8 @@ BOOST_FIXTURE_TEST_CASE(txindex_initial_sync, TestChain100Setup)
// shutdown sequence (c.f. Shutdown() in init.cpp) // shutdown sequence (c.f. Shutdown() in init.cpp)
txindex.Stop(); txindex.Stop();
// txindex job may be scheduled, so stop scheduler before destructing
m_node.scheduler->stop();
threadGroup.interrupt_all(); threadGroup.interrupt_all();
threadGroup.join_all(); threadGroup.join_all();

View File

@ -140,6 +140,7 @@ TestingSetup::TestingSetup(const std::string& chainName) : BasicTestingSetup(cha
TestingSetup::~TestingSetup() TestingSetup::~TestingSetup()
{ {
if (m_node.scheduler) m_node.scheduler->stop();
threadGroup.interrupt_all(); threadGroup.interrupt_all();
threadGroup.join_all(); threadGroup.join_all();
GetMainSignals().FlushBackgroundCallbacks(); GetMainSignals().FlushBackgroundCallbacks();

View File

@ -66,7 +66,7 @@ function join_array {
ENABLED_CHECKS_REGEXP=$(join_array "|" "${ENABLED_CHECKS[@]}") ENABLED_CHECKS_REGEXP=$(join_array "|" "${ENABLED_CHECKS[@]}")
IGNORED_WARNINGS_REGEXP=$(join_array "|" "${IGNORED_WARNINGS[@]}") IGNORED_WARNINGS_REGEXP=$(join_array "|" "${IGNORED_WARNINGS[@]}")
WARNINGS=$(git ls-files -- "*.cpp" "*.h" ":(exclude)src/leveldb/" ":(exclude)src/crc32c/" ":(exclude)src/secp256k1/" ":(exclude)src/univalue/" | \ WARNINGS=$(git ls-files -- "*.cpp" "*.h" ":(exclude)src/leveldb/" ":(exclude)src/crc32c/" ":(exclude)src/secp256k1/" ":(exclude)src/univalue/" | \
xargs cppcheck --enable=all -j "$(getconf _NPROCESSORS_ONLN)" --language=c++ --std=c++11 --template=gcc -D__cplusplus -DCLIENT_VERSION_BUILD -DCLIENT_VERSION_IS_RELEASE -DCLIENT_VERSION_MAJOR -DCLIENT_VERSION_MINOR -DCLIENT_VERSION_REVISION -DCOPYRIGHT_YEAR -DDEBUG -DHAVE_WORKING_BOOST_SLEEP_FOR -I src/ -q 2>&1 | sort -u | \ xargs cppcheck --enable=all -j "$(getconf _NPROCESSORS_ONLN)" --language=c++ --std=c++11 --template=gcc -D__cplusplus -DCLIENT_VERSION_BUILD -DCLIENT_VERSION_IS_RELEASE -DCLIENT_VERSION_MAJOR -DCLIENT_VERSION_MINOR -DCLIENT_VERSION_REVISION -DCOPYRIGHT_YEAR -DDEBUG -I src/ -q 2>&1 | sort -u | \
grep -E "${ENABLED_CHECKS_REGEXP}" | \ grep -E "${ENABLED_CHECKS_REGEXP}" | \
grep -vE "${IGNORED_WARNINGS_REGEXP}") grep -vE "${IGNORED_WARNINGS_REGEXP}")
if [[ ${WARNINGS} != "" ]]; then if [[ ${WARNINGS} != "" ]]; then

View File

@ -53,7 +53,6 @@ EXPECTED_BOOST_INCLUDES=(
boost/algorithm/string/classification.hpp boost/algorithm/string/classification.hpp
boost/algorithm/string/replace.hpp boost/algorithm/string/replace.hpp
boost/algorithm/string/split.hpp boost/algorithm/string/split.hpp
boost/chrono/chrono.hpp
boost/date_time/posix_time/posix_time.hpp boost/date_time/posix_time/posix_time.hpp
boost/filesystem.hpp boost/filesystem.hpp
boost/filesystem/fstream.hpp boost/filesystem/fstream.hpp