mirror of
https://github.com/bitcoin/bitcoin.git
synced 2026-06-03 17:54:19 +02:00
Squashed 'src/ipc/libmultiprocess/' changes from 27c7e8e5a581..b4120d34bad2
b4120d34bad2 Merge bitcoin-core/libmultiprocess#192: doc: fix typos 6ecbdcd35a93 doc: fix typos a11e6905c238 Merge bitcoin-core/libmultiprocess#186: Fix mptest failures in bitcoin CI 6f340a583f2b doc: fix DrahtBot LLM Linter error c6f7fdf17350 type-context: revert client disconnect workaround e09143d2ea2f proxy-types: fix UndefinedBehaviorSanitizer: null-pointer-use 84b292fcc4db mptest: fix MemorySanitizer: use-of-uninitialized-value fe4a188803c6 proxy-io: fix race conditions in disconnect callback code d8011c83608e proxy-io: fix race conditions in ProxyClientBase cleanup handler 97e82ce19c47 doc: Add note about Waiter::m_mutex and interaction with the EventLoop::m_mutex 81d58f5580e8 refactor: Rename ProxyClient cleanup_it variable 07230f259f55 refactor: rename ProxyClient<Thread>::m_cleanup_it c0efaa5e8cb1 Merge chaincodelabs/libmultiprocess#187: ci: have bash scripts explicitly opt out of locale dependence. 0d986ff144cd mptest: fix race condition in TestSetup constructor d2f6aa2e84ef ci: add thread sanitizer job 3a6db38e561f ci: rename configs to .bash 401e0ce1d9c3 ci: add copyright to bash scripts e956467ae464 ci: export LC_ALL 8954cc0377d8 Merge chaincodelabs/libmultiprocess#184: Add CI jobs and fix clang-tidy and iwyu errors 757e13a75546 ci: add gnu32 cross-compiled 32-bit build 15bf349000eb doc: fix typo found by DrahtBot 1a598d5905f7 clang-tidy: drop 'bitcoin-*' check cbb1e43fdc6e ci: test libc++ instead of libstdc++ in one job 76313450c2c4 type-context: disable clang-tidy UndefinedBinaryOperatorResult error 4896e7fe51ba proxy-types: fix clang-tidy EnumCastOutOfRange error 060a73926956 proxy-types: fix clang-tidy StackAddressEscape error 977d721020f6 ci: add github actions jobs testing gcc, clang-20, clang-tidy, and iwyu 0d5f1faae5da iwyu: fix add/remove include errors 753d2b10cc27 util: fix clang-tidy modernize-use-equals-default error ae4f1dc2bb1a type-number: fix clang-tidy modernize-use-nullptr error 07a741bf6946 proxy-types: fix clang-tidy bugprone-use-after-move error 3673114bc9d9 proxy-types: fix clang-tidy bugprone-use-after-move error 422923f38485 proxy-types: fix clang-tidy bugprone-use-after-move error c6784c6adefa mpgen: disable clang-tidy misc-no-recursion error c5498aa11ba6 tidy: copy clang-tidy file from bitcoin core 258a617c1eec Merge chaincodelabs/libmultiprocess#160: refactor: EventLoop locking cleanups + client disconnect exception 84cf56a0b5f4 test: Test disconnects during IPC calls 949573da8411 Prevent IPC server crash if disconnected during IPC call 019839758085 Merge chaincodelabs/libmultiprocess#179: scripted-diff: Remove copyright year (ranges) ea38392960e1 Prevent EventLoop async cleanup thread early exit during shutdown 616d9a75d20a doc: Document ProxyClientBase destroy_connection option 56fff76f940b Improve IPC client disconnected exceptions 9b8ed3dc5f87 refactor: Add clang thread safety annotations to EventLoop 52256e730f51 refactor: Remove DestructorCatcher and AsyncCallable f24894794adf refactor: Drop addClient/removeClient methods 2b830e558e61 refactor: Use EventLoopRef instead of addClient/removeClient 315ff537fb65 refactor: Add ProxyContext EventLoop* member 9aaeec3678d3 proxy-io.h: Add EventLoopRef RAII class handle addClient/removeClient refcounting f58c8d8ba2f0 proxy-io.h: Add more detailed EventLoop comment 5108445e5d16 test: Add test coverage for client & server disconnections 59030c68cb5f Merge chaincodelabs/libmultiprocess#181: type-function.h: Fix CustomBuildField overload 688140b1dffc test: Add coverage for type-function.h 8b96229da58e type-function.h: Fix CustomBuildField overload fa2ff9a66842 scripted-diff: Remove copyright year (ranges) git-subtree-dir: src/ipc/libmultiprocess git-subtree-split: b4120d34bad2de28141c5770f6e8df8e54898987
This commit is contained in:
@@ -1,4 +1,4 @@
|
||||
// Copyright (c) 2019 The Bitcoin Core developers
|
||||
// Copyright (c) The Bitcoin Core developers
|
||||
// Distributed under the MIT software license, see the accompanying
|
||||
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
|
||||
|
||||
@@ -6,13 +6,15 @@
|
||||
#include <mp/util.h>
|
||||
|
||||
#include <algorithm>
|
||||
#include <capnp/schema.h>
|
||||
#include <capnp/schema-parser.h>
|
||||
#include <cerrno>
|
||||
#include <cstdint>
|
||||
#include <cstdio>
|
||||
#include <cstdlib>
|
||||
#include <errno.h>
|
||||
#include <fstream>
|
||||
#include <functional>
|
||||
#include <initializer_list>
|
||||
#include <iostream>
|
||||
#include <kj/array.h>
|
||||
#include <kj/common.h>
|
||||
@@ -26,6 +28,7 @@
|
||||
#include <string>
|
||||
#include <system_error>
|
||||
#include <unistd.h>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
#define PROXY_BIN "mpgen"
|
||||
@@ -76,7 +79,7 @@ static bool GetAnnotationInt32(const Reader& reader, uint64_t id, int32_t* resul
|
||||
return false;
|
||||
}
|
||||
|
||||
static void ForEachMethod(const capnp::InterfaceSchema& interface, const std::function<void(const capnp::InterfaceSchema& interface, const capnp::InterfaceSchema::Method)>& callback)
|
||||
static void ForEachMethod(const capnp::InterfaceSchema& interface, const std::function<void(const capnp::InterfaceSchema& interface, const capnp::InterfaceSchema::Method)>& callback) // NOLINT(misc-no-recursion)
|
||||
{
|
||||
for (const auto super : interface.getSuperclasses()) {
|
||||
ForEachMethod(super, callback);
|
||||
@@ -198,19 +201,45 @@ static void Generate(kj::StringPtr src_prefix,
|
||||
|
||||
std::ofstream cpp_server(output_path + ".proxy-server.c++");
|
||||
cpp_server << "// Generated by " PROXY_BIN " from " << src_file << "\n\n";
|
||||
cpp_server << "// IWYU pragma: no_include <kj/memory.h>\n";
|
||||
cpp_server << "// IWYU pragma: no_include <memory>\n";
|
||||
cpp_server << "// IWYU pragma: begin_keep\n";
|
||||
cpp_server << "#include <" << include_path << ".proxy.h>\n";
|
||||
cpp_server << "#include <" << include_path << ".proxy-types.h>\n";
|
||||
cpp_server << "#include <" << PROXY_TYPES << ">\n\n";
|
||||
cpp_server << "#include <capnp/generated-header-support.h>\n";
|
||||
cpp_server << "#include <cstring>\n";
|
||||
cpp_server << "#include <kj/async.h>\n";
|
||||
cpp_server << "#include <kj/common.h>\n";
|
||||
cpp_server << "#include <kj/exception.h>\n";
|
||||
cpp_server << "#include <mp/proxy.h>\n";
|
||||
cpp_server << "#include <mp/util.h>\n";
|
||||
cpp_server << "#include <" << PROXY_TYPES << ">\n";
|
||||
cpp_server << "// IWYU pragma: end_keep\n\n";
|
||||
cpp_server << "namespace mp {\n";
|
||||
|
||||
std::ofstream cpp_client(output_path + ".proxy-client.c++");
|
||||
cpp_client << "// Generated by " PROXY_BIN " from " << src_file << "\n\n";
|
||||
cpp_client << "// IWYU pragma: no_include <kj/memory.h>\n";
|
||||
cpp_client << "// IWYU pragma: no_include <memory>\n";
|
||||
cpp_client << "// IWYU pragma: begin_keep\n";
|
||||
cpp_client << "#include <" << include_path << ".h>\n";
|
||||
cpp_client << "#include <" << include_path << ".proxy.h>\n";
|
||||
cpp_client << "#include <" << include_path << ".proxy-types.h>\n";
|
||||
cpp_client << "#include <" << PROXY_TYPES << ">\n\n";
|
||||
cpp_client << "#include <capnp/generated-header-support.h>\n";
|
||||
cpp_client << "#include <cstring>\n";
|
||||
cpp_client << "#include <kj/common.h>\n";
|
||||
cpp_client << "#include <mp/proxy.h>\n";
|
||||
cpp_client << "#include <mp/util.h>\n";
|
||||
cpp_client << "#include <" << PROXY_TYPES << ">\n";
|
||||
cpp_client << "// IWYU pragma: end_keep\n\n";
|
||||
cpp_client << "namespace mp {\n";
|
||||
|
||||
std::ofstream cpp_types(output_path + ".proxy-types.c++");
|
||||
cpp_types << "// Generated by " PROXY_BIN " from " << src_file << "\n\n";
|
||||
cpp_types << "#include <" << include_path << ".proxy-types.h>\n";
|
||||
cpp_types << "// IWYU pragma: no_include \"mp/proxy.h\"\n";
|
||||
cpp_types << "// IWYU pragma: no_include \"mp/proxy-io.h\"\n";
|
||||
cpp_types << "#include <" << include_path << ".proxy.h>\n";
|
||||
cpp_types << "#include <" << include_path << ".proxy-types.h> // IWYU pragma: keep\n";
|
||||
cpp_types << "#include <" << PROXY_TYPES << ">\n\n";
|
||||
cpp_types << "namespace mp {\n";
|
||||
|
||||
@@ -226,10 +255,12 @@ static void Generate(kj::StringPtr src_prefix,
|
||||
inl << "// Generated by " PROXY_BIN " from " << src_file << "\n\n";
|
||||
inl << "#ifndef " << guard << "_PROXY_TYPES_H\n";
|
||||
inl << "#define " << guard << "_PROXY_TYPES_H\n\n";
|
||||
inl << "#include <" << include_path << ".proxy.h>\n";
|
||||
inl << "// IWYU pragma: no_include \"mp/proxy.h\"\n";
|
||||
inl << "#include <mp/proxy.h> // IWYU pragma: keep\n";
|
||||
inl << "#include <" << include_path << ".proxy.h> // IWYU pragma: keep\n";
|
||||
for (const auto annotation : file_schema.getProto().getAnnotations()) {
|
||||
if (annotation.getId() == INCLUDE_TYPES_ANNOTATION_ID) {
|
||||
inl << "#include <" << annotation.getValue().getText() << ">\n";
|
||||
inl << "#include \"" << annotation.getValue().getText() << "\" // IWYU pragma: export\n";
|
||||
}
|
||||
}
|
||||
inl << "namespace mp {\n";
|
||||
@@ -238,10 +269,10 @@ static void Generate(kj::StringPtr src_prefix,
|
||||
h << "// Generated by " PROXY_BIN " from " << src_file << "\n\n";
|
||||
h << "#ifndef " << guard << "_PROXY_H\n";
|
||||
h << "#define " << guard << "_PROXY_H\n\n";
|
||||
h << "#include <" << include_path << ".h>\n";
|
||||
h << "#include <" << include_path << ".h> // IWYU pragma: keep\n";
|
||||
for (const auto annotation : file_schema.getProto().getAnnotations()) {
|
||||
if (annotation.getId() == INCLUDE_ANNOTATION_ID) {
|
||||
h << "#include <" << annotation.getValue().getText() << ">\n";
|
||||
h << "#include \"" << annotation.getValue().getText() << "\" // IWYU pragma: export\n";
|
||||
}
|
||||
}
|
||||
h << "#include <" << PROXY_DECL << ">\n\n";
|
||||
|
||||
184
src/mp/proxy.cpp
184
src/mp/proxy.cpp
@@ -1,4 +1,4 @@
|
||||
// Copyright (c) 2019 The Bitcoin Core developers
|
||||
// Copyright (c) The Bitcoin Core developers
|
||||
// Distributed under the MIT software license, see the accompanying
|
||||
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
|
||||
|
||||
@@ -10,23 +10,23 @@
|
||||
#include <mp/type-threadmap.h>
|
||||
#include <mp/util.h>
|
||||
|
||||
#include <assert.h>
|
||||
#include <atomic>
|
||||
#include <capnp/blob.h>
|
||||
#include <capnp/capability.h>
|
||||
#include <capnp/rpc.h>
|
||||
#include <condition_variable>
|
||||
#include <functional>
|
||||
#include <future>
|
||||
#include <kj/async-io.h>
|
||||
#include <kj/async.h>
|
||||
#include <kj/async-io.h>
|
||||
#include <kj/async-prelude.h>
|
||||
#include <kj/common.h>
|
||||
#include <kj/debug.h>
|
||||
#include <kj/exception.h>
|
||||
#include <kj/function.h>
|
||||
#include <kj/memory.h>
|
||||
#include <map>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <stddef.h>
|
||||
#include <optional>
|
||||
#include <stdexcept>
|
||||
#include <string>
|
||||
#include <sys/socket.h>
|
||||
@@ -37,9 +37,6 @@
|
||||
|
||||
namespace mp {
|
||||
|
||||
template <typename Interface>
|
||||
struct ProxyServer;
|
||||
|
||||
thread_local ThreadContext g_thread_context;
|
||||
|
||||
void LoggingErrorHandler::taskFailed(kj::Exception&& exception)
|
||||
@@ -48,12 +45,49 @@ void LoggingErrorHandler::taskFailed(kj::Exception&& exception)
|
||||
m_loop.log() << "Uncaught exception in daemonized task.";
|
||||
}
|
||||
|
||||
EventLoopRef::EventLoopRef(EventLoop& loop, Lock* lock) : m_loop(&loop), m_lock(lock)
|
||||
{
|
||||
auto loop_lock{PtrOrValue{m_lock, m_loop->m_mutex}};
|
||||
loop_lock->assert_locked(m_loop->m_mutex);
|
||||
m_loop->m_num_clients += 1;
|
||||
}
|
||||
|
||||
// Due to the conditionals in this function, MP_NO_TSA is required to avoid
|
||||
// error "error: mutex 'loop_lock' is not held on every path through here
|
||||
// [-Wthread-safety-analysis]"
|
||||
void EventLoopRef::reset(bool relock) MP_NO_TSA
|
||||
{
|
||||
if (auto* loop{m_loop}) {
|
||||
m_loop = nullptr;
|
||||
auto loop_lock{PtrOrValue{m_lock, loop->m_mutex}};
|
||||
loop_lock->assert_locked(loop->m_mutex);
|
||||
assert(loop->m_num_clients > 0);
|
||||
loop->m_num_clients -= 1;
|
||||
if (loop->done()) {
|
||||
loop->m_cv.notify_all();
|
||||
int post_fd{loop->m_post_fd};
|
||||
loop_lock->unlock();
|
||||
char buffer = 0;
|
||||
KJ_SYSCALL(write(post_fd, &buffer, 1)); // NOLINT(bugprone-suspicious-semicolon)
|
||||
// By default, do not try to relock `loop_lock` after writing,
|
||||
// because the event loop could wake up and destroy itself and the
|
||||
// mutex might no longer exist.
|
||||
if (relock) loop_lock->lock();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ProxyContext::ProxyContext(Connection* connection) : connection(connection), loop{*connection->m_loop} {}
|
||||
|
||||
Connection::~Connection()
|
||||
{
|
||||
// Shut down RPC system first, since this will garbage collect Server
|
||||
// objects that were not freed before the connection was closed, some of
|
||||
// which may call addAsyncCleanup and add more cleanup callbacks which can
|
||||
// run below.
|
||||
// Shut down RPC system first, since this will garbage collect any
|
||||
// ProxyServer objects that were not freed before the connection was closed.
|
||||
// Typically all ProxyServer objects associated with this connection will be
|
||||
// freed before this call returns. However that will not be the case if
|
||||
// there are asynchronous IPC calls over this connection still currently
|
||||
// executing. In that case, Cap'n Proto will destroy the ProxyServer objects
|
||||
// after the calls finish.
|
||||
m_rpc_system.reset();
|
||||
|
||||
// ProxyClient cleanup handlers are in sync list, and ProxyServer cleanup
|
||||
@@ -98,23 +132,17 @@ Connection::~Connection()
|
||||
// on clean and unclean shutdowns. In unclean shutdown case when the
|
||||
// connection is broken, sync and async cleanup lists will filled with
|
||||
// callbacks. In the clean shutdown case both lists will be empty.
|
||||
Lock lock{m_loop->m_mutex};
|
||||
while (!m_sync_cleanup_fns.empty()) {
|
||||
m_sync_cleanup_fns.front()();
|
||||
m_sync_cleanup_fns.pop_front();
|
||||
CleanupList fn;
|
||||
fn.splice(fn.begin(), m_sync_cleanup_fns, m_sync_cleanup_fns.begin());
|
||||
Unlock(lock, fn.front());
|
||||
}
|
||||
while (!m_async_cleanup_fns.empty()) {
|
||||
const std::unique_lock<std::mutex> lock(m_loop.m_mutex);
|
||||
m_loop.m_async_fns.emplace_back(std::move(m_async_cleanup_fns.front()));
|
||||
m_async_cleanup_fns.pop_front();
|
||||
}
|
||||
std::unique_lock<std::mutex> lock(m_loop.m_mutex);
|
||||
m_loop.startAsyncThread(lock);
|
||||
m_loop.removeClient(lock);
|
||||
}
|
||||
|
||||
CleanupIt Connection::addSyncCleanup(std::function<void()> fn)
|
||||
{
|
||||
const std::unique_lock<std::mutex> lock(m_loop.m_mutex);
|
||||
const Lock lock(m_loop->m_mutex);
|
||||
// Add cleanup callbacks to the front of list, so sync cleanup functions run
|
||||
// in LIFO order. This is a good approach because sync cleanup functions are
|
||||
// added as client objects are created, and it is natural to clean up
|
||||
@@ -128,13 +156,13 @@ CleanupIt Connection::addSyncCleanup(std::function<void()> fn)
|
||||
|
||||
void Connection::removeSyncCleanup(CleanupIt it)
|
||||
{
|
||||
const std::unique_lock<std::mutex> lock(m_loop.m_mutex);
|
||||
const Lock lock(m_loop->m_mutex);
|
||||
m_sync_cleanup_fns.erase(it);
|
||||
}
|
||||
|
||||
void Connection::addAsyncCleanup(std::function<void()> fn)
|
||||
void EventLoop::addAsyncCleanup(std::function<void()> fn)
|
||||
{
|
||||
const std::unique_lock<std::mutex> lock(m_loop.m_mutex);
|
||||
const Lock lock(m_mutex);
|
||||
// Add async cleanup callbacks to the back of the list. Unlike the sync
|
||||
// cleanup list, this list order is more significant because it determines
|
||||
// the order server objects are destroyed when there is a sudden disconnect,
|
||||
@@ -151,7 +179,8 @@ void Connection::addAsyncCleanup(std::function<void()> fn)
|
||||
// process, otherwise shared pointer counts of the CWallet objects (which
|
||||
// inherit from Chain::Notification) will not be 1 when WalletLoader
|
||||
// destructor runs and it will wait forever for them to be released.
|
||||
m_async_cleanup_fns.emplace(m_async_cleanup_fns.end(), std::move(fn));
|
||||
m_async_fns->emplace_back(std::move(fn));
|
||||
startAsyncThread();
|
||||
}
|
||||
|
||||
EventLoop::EventLoop(const char* exe_name, LogFn log_fn, void* context)
|
||||
@@ -170,9 +199,9 @@ EventLoop::EventLoop(const char* exe_name, LogFn log_fn, void* context)
|
||||
EventLoop::~EventLoop()
|
||||
{
|
||||
if (m_async_thread.joinable()) m_async_thread.join();
|
||||
const std::lock_guard<std::mutex> lock(m_mutex);
|
||||
const Lock lock(m_mutex);
|
||||
KJ_ASSERT(m_post_fn == nullptr);
|
||||
KJ_ASSERT(m_async_fns.empty());
|
||||
KJ_ASSERT(!m_async_fns);
|
||||
KJ_ASSERT(m_wait_fd == -1);
|
||||
KJ_ASSERT(m_post_fd == -1);
|
||||
KJ_ASSERT(m_num_clients == 0);
|
||||
@@ -188,6 +217,12 @@ void EventLoop::loop()
|
||||
g_thread_context.loop_thread = true;
|
||||
KJ_DEFER(g_thread_context.loop_thread = false);
|
||||
|
||||
{
|
||||
const Lock lock(m_mutex);
|
||||
assert(!m_async_fns);
|
||||
m_async_fns.emplace();
|
||||
}
|
||||
|
||||
kj::Own<kj::AsyncIoStream> wait_stream{
|
||||
m_io_context.lowLevelProvider->wrapSocketFd(m_wait_fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP)};
|
||||
int post_fd{m_post_fd};
|
||||
@@ -195,14 +230,14 @@ void EventLoop::loop()
|
||||
for (;;) {
|
||||
const size_t read_bytes = wait_stream->read(&buffer, 0, 1).wait(m_io_context.waitScope);
|
||||
if (read_bytes != 1) throw std::logic_error("EventLoop wait_stream closed unexpectedly");
|
||||
std::unique_lock<std::mutex> lock(m_mutex);
|
||||
Lock lock(m_mutex);
|
||||
if (m_post_fn) {
|
||||
Unlock(lock, *m_post_fn);
|
||||
m_post_fn = nullptr;
|
||||
m_cv.notify_all();
|
||||
} else if (done(lock)) {
|
||||
} else if (done()) {
|
||||
// Intentionally do not break if m_post_fn was set, even if done()
|
||||
// would return true, to ensure that the removeClient write(post_fd)
|
||||
// would return true, to ensure that the EventLoopRef write(post_fd)
|
||||
// call always succeeds and the loop does not exit between the time
|
||||
// that the done condition is set and the write call is made.
|
||||
break;
|
||||
@@ -213,76 +248,61 @@ void EventLoop::loop()
|
||||
log() << "EventLoop::loop bye.";
|
||||
wait_stream = nullptr;
|
||||
KJ_SYSCALL(::close(post_fd));
|
||||
const std::unique_lock<std::mutex> lock(m_mutex);
|
||||
const Lock lock(m_mutex);
|
||||
m_wait_fd = -1;
|
||||
m_post_fd = -1;
|
||||
m_async_fns.reset();
|
||||
m_cv.notify_all();
|
||||
}
|
||||
|
||||
void EventLoop::post(const std::function<void()>& fn)
|
||||
void EventLoop::post(kj::Function<void()> fn)
|
||||
{
|
||||
if (std::this_thread::get_id() == m_thread_id) {
|
||||
fn();
|
||||
return;
|
||||
}
|
||||
std::unique_lock<std::mutex> lock(m_mutex);
|
||||
addClient(lock);
|
||||
m_cv.wait(lock, [this] { return m_post_fn == nullptr; });
|
||||
Lock lock(m_mutex);
|
||||
EventLoopRef ref(*this, &lock);
|
||||
m_cv.wait(lock.m_lock, [this]() MP_REQUIRES(m_mutex) { return m_post_fn == nullptr; });
|
||||
m_post_fn = &fn;
|
||||
int post_fd{m_post_fd};
|
||||
Unlock(lock, [&] {
|
||||
char buffer = 0;
|
||||
KJ_SYSCALL(write(post_fd, &buffer, 1));
|
||||
});
|
||||
m_cv.wait(lock, [this, &fn] { return m_post_fn != &fn; });
|
||||
removeClient(lock);
|
||||
m_cv.wait(lock.m_lock, [this, &fn]() MP_REQUIRES(m_mutex) { return m_post_fn != &fn; });
|
||||
}
|
||||
|
||||
void EventLoop::addClient(std::unique_lock<std::mutex>& lock) { m_num_clients += 1; }
|
||||
|
||||
bool EventLoop::removeClient(std::unique_lock<std::mutex>& lock)
|
||||
{
|
||||
m_num_clients -= 1;
|
||||
if (done(lock)) {
|
||||
m_cv.notify_all();
|
||||
int post_fd{m_post_fd};
|
||||
lock.unlock();
|
||||
char buffer = 0;
|
||||
KJ_SYSCALL(write(post_fd, &buffer, 1)); // NOLINT(bugprone-suspicious-semicolon)
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
void EventLoop::startAsyncThread(std::unique_lock<std::mutex>& lock)
|
||||
void EventLoop::startAsyncThread()
|
||||
{
|
||||
assert (std::this_thread::get_id() == m_thread_id);
|
||||
if (m_async_thread.joinable()) {
|
||||
// Notify to wake up the async thread if it is already running.
|
||||
m_cv.notify_all();
|
||||
} else if (!m_async_fns.empty()) {
|
||||
} else if (!m_async_fns->empty()) {
|
||||
m_async_thread = std::thread([this] {
|
||||
std::unique_lock<std::mutex> lock(m_mutex);
|
||||
while (true) {
|
||||
if (!m_async_fns.empty()) {
|
||||
addClient(lock);
|
||||
const std::function<void()> fn = std::move(m_async_fns.front());
|
||||
m_async_fns.pop_front();
|
||||
Lock lock(m_mutex);
|
||||
while (m_async_fns) {
|
||||
if (!m_async_fns->empty()) {
|
||||
EventLoopRef ref{*this, &lock};
|
||||
const std::function<void()> fn = std::move(m_async_fns->front());
|
||||
m_async_fns->pop_front();
|
||||
Unlock(lock, fn);
|
||||
if (removeClient(lock)) break;
|
||||
// Important to relock because of the wait() call below.
|
||||
ref.reset(/*relock=*/true);
|
||||
// Continue without waiting in case there are more async_fns
|
||||
continue;
|
||||
} else if (m_num_clients == 0) {
|
||||
break;
|
||||
}
|
||||
m_cv.wait(lock);
|
||||
m_cv.wait(lock.m_lock);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
bool EventLoop::done(std::unique_lock<std::mutex>& lock) const
|
||||
bool EventLoop::done() const
|
||||
{
|
||||
assert(m_num_clients >= 0);
|
||||
assert(lock.owns_lock());
|
||||
assert(lock.mutex() == &m_mutex);
|
||||
return m_num_clients == 0 && m_async_fns.empty();
|
||||
return m_num_clients == 0 && m_async_fns->empty();
|
||||
}
|
||||
|
||||
std::tuple<ConnThread, bool> SetThread(ConnThreads& threads, std::mutex& mutex, Connection* connection, const std::function<Thread::Client()>& make_thread)
|
||||
@@ -293,18 +313,18 @@ std::tuple<ConnThread, bool> SetThread(ConnThreads& threads, std::mutex& mutex,
|
||||
thread = threads.emplace(
|
||||
std::piecewise_construct, std::forward_as_tuple(connection),
|
||||
std::forward_as_tuple(make_thread(), connection, /* destroy_connection= */ false)).first;
|
||||
thread->second.setCleanup([&threads, &mutex, thread] {
|
||||
thread->second.setDisconnectCallback([&threads, &mutex, thread] {
|
||||
// Note: it is safe to use the `thread` iterator in this cleanup
|
||||
// function, because the iterator would only be invalid if the map entry
|
||||
// was removed, and if the map entry is removed the ProxyClient<Thread>
|
||||
// destructor unregisters the cleanup.
|
||||
|
||||
// Connection is being destroyed before thread client is, so reset
|
||||
// thread client m_cleanup_it member so thread client destructor does not
|
||||
// try unregister this callback after connection is destroyed.
|
||||
thread->second.m_cleanup_it.reset();
|
||||
// thread client m_disconnect_cb member so thread client destructor does not
|
||||
// try to unregister this callback after connection is destroyed.
|
||||
// Remove connection pointer about to be destroyed from the map
|
||||
const std::unique_lock<std::mutex> lock(mutex);
|
||||
thread->second.m_disconnect_cb.reset();
|
||||
threads.erase(thread);
|
||||
});
|
||||
return {thread, true};
|
||||
@@ -315,16 +335,16 @@ ProxyClient<Thread>::~ProxyClient()
|
||||
// If thread is being destroyed before connection is destroyed, remove the
|
||||
// cleanup callback that was registered to handle the connection being
|
||||
// destroyed before the thread being destroyed.
|
||||
if (m_cleanup_it) {
|
||||
m_context.connection->removeSyncCleanup(*m_cleanup_it);
|
||||
if (m_disconnect_cb) {
|
||||
m_context.connection->removeSyncCleanup(*m_disconnect_cb);
|
||||
}
|
||||
}
|
||||
|
||||
void ProxyClient<Thread>::setCleanup(const std::function<void()>& fn)
|
||||
void ProxyClient<Thread>::setDisconnectCallback(const std::function<void()>& fn)
|
||||
{
|
||||
assert(fn);
|
||||
assert(!m_cleanup_it);
|
||||
m_cleanup_it = m_context.connection->addSyncCleanup(fn);
|
||||
assert(!m_disconnect_cb);
|
||||
m_disconnect_cb = m_context.connection->addSyncCleanup(fn);
|
||||
}
|
||||
|
||||
ProxyServer<Thread>::ProxyServer(ThreadContext& thread_context, std::thread&& thread)
|
||||
@@ -375,7 +395,7 @@ kj::Promise<void> ProxyServer<ThreadMap>::makeThread(MakeThreadContext context)
|
||||
const std::string from = context.getParams().getName();
|
||||
std::promise<ThreadContext*> thread_context;
|
||||
std::thread thread([&thread_context, from, this]() {
|
||||
g_thread_context.thread_name = ThreadName(m_connection.m_loop.m_exe_name) + " (from " + from + ")";
|
||||
g_thread_context.thread_name = ThreadName(m_connection.m_loop->m_exe_name) + " (from " + from + ")";
|
||||
g_thread_context.waiter = std::make_unique<Waiter>();
|
||||
thread_context.set_value(&g_thread_context);
|
||||
std::unique_lock<std::mutex> lock(g_thread_context.waiter->m_mutex);
|
||||
|
||||
@@ -1,16 +1,16 @@
|
||||
// Copyright (c) 2018-2019 The Bitcoin Core developers
|
||||
// Copyright (c) The Bitcoin Core developers
|
||||
// Distributed under the MIT software license, see the accompanying
|
||||
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
|
||||
|
||||
#include <mp/config.h>
|
||||
#include <mp/util.h>
|
||||
|
||||
#include <errno.h>
|
||||
#include <cerrno>
|
||||
#include <cstdio>
|
||||
#include <kj/common.h>
|
||||
#include <kj/string-tree.h>
|
||||
#include <pthread.h>
|
||||
#include <sstream>
|
||||
#include <stdio.h>
|
||||
#include <string>
|
||||
#include <sys/resource.h>
|
||||
#include <sys/socket.h>
|
||||
|
||||
Reference in New Issue
Block a user