Squashed 'src/ipc/libmultiprocess/' changes from 13424cf2ecc1..a4f929696490

a4f929696490 Merge bitcoin-core/libmultiprocess#224: doc: fix typos
f4344ae87da0 Merge bitcoin-core/libmultiprocess#222: test, ci: Fix threadsanitizer errors in mptest
1434642b3804 doc: fix typos
73d22ba2e930 test: Fix tsan race in thread busy test
b74e1bba014d ci: Use tsan-instrumented cap'n proto in sanitizers job
c332774409ad test: Fix failing exception check in new thread busy test
ca3c05d56709 test: Use KJ_LOG instead of std::cout for logging
7eb1da120ab6 ci: Use tsan-instrumented libcxx in sanitizers job
ec86e4336e98 Merge bitcoin-core/libmultiprocess#220: Add log levels and advertise them to users via logging callback
515ce93ad349 Logging: Pass LogData struct to logging callback
213574ccc43d Logging: reclassify remaining log messages
e4de0412b430 Logging: Break out expensive log messages and classify them as Trace
408874a78fdc Logging: Use new logging macros
67b092d835cd Logging: Disable logging if message level is less than the requested level
d0a1ba7ebf21 Logging: add log levels to mirror Core's
463a8296d188 Logging: Disable moving or copying Logger
83a2e10c0b03 Logging: Add an EventLoop constructor to allow for user-specified log options
58cf47a7fc8c Merge bitcoin-core/libmultiprocess#221: test default PassField impl handles output parameters
db03a663f514 Merge bitcoin-core/libmultiprocess#214: Fix crash on simultaneous IPC calls using the same thread
afcc40b0f1e8 Merge bitcoin-core/libmultiprocess#213: util+doc: Clearer errors when attempting to run examples + polished docs
6db669628387 test In|Out parameter
29cf2ada75ea test default PassField impl handles output parameters
1238170f68e8 test: simultaneous IPC calls using same thread
eb069ab75d83 Fix crash on simultaneous IPC calls using the same thread
ec03a9639ab5 doc: Precision and typos
2b4348193551 doc: Where possible, remove links to ryanofsky/bitcoin/
286fe469c9c9 util: Add helpful error message when failing to execute file
47d79db8a552 Merge bitcoin-core/libmultiprocess#201: bug: fix mptest hang, ProxyClient<Thread> deadlock in disconnect handler
f15ae9c9b9fb Merge bitcoin-core/libmultiprocess#211: Add .gitignore
4a269b21b8c8 bug: fix ProxyClient<Thread> deadlock if disconnected as IPC call is returning
85df96482c49 Use try_emplace in SetThread instead of threads.find
ca9b380ea91a Use std::optional in ConnThreads to allow shortening locks
9b0799113557 doc: describe ThreadContext struct and synchronization requirements
d60db601ed9b proxy-io.h: add Waiter::m_mutex thread safety annotations
4e365b019a9f ci: Use -Wthread-safety not -Wthread-safety-analysis
15d7bafbb001 Add .gitignore
fe1cd8c76131 Merge bitcoin-core/libmultiprocess#208: ci: Test minimum cmake version in olddeps job
b713a0b7bfbc Merge bitcoin-core/libmultiprocess#207: ci: output CMake version in CI script
0f580397c913 ci: Test minimum cmake version in olddeps job
d603dcc0eef0 ci: output CMake version in CI script

git-subtree-dir: src/ipc/libmultiprocess
git-subtree-split: a4f92969649018ca70f949a09148bccfeaecd99a
Author: Ryan Ofsky
Date: 2025-10-07 10:12:08 -04:00
parent: a334bbe9b7
commit: abcd4c4ff9
20 changed files with 458 additions and 165 deletions

.gitignore (new file)
@@ -0,0 +1,5 @@
# CMake artifacts
/*build*
# Git artifacts
*.patch


@@ -2,7 +2,7 @@ CI_DESC="CI job using LLVM-based libraries and tools (clang, libc++, clang-tidy,
CI_DIR=build-llvm
NIX_ARGS=(--arg enableLibcxx true)
export CXX=clang++
export CXXFLAGS="-Werror -Wall -Wextra -Wpedantic -Wthread-safety-analysis -Wno-unused-parameter"
export CXXFLAGS="-Werror -Wall -Wextra -Wpedantic -Wthread-safety -Wno-unused-parameter"
CMAKE_ARGS=(
-G Ninja
-DMP_ENABLE_CLANG_TIDY=ON


@@ -1,5 +1,5 @@
CI_DESC="CI job using old Cap'n Proto version"
CI_DESC="CI job using old Cap'n Proto and cmake versions"
CI_DIR=build-olddeps
export CXXFLAGS="-Werror -Wall -Wextra -Wpedantic -Wno-unused-parameter -Wno-error=array-bounds"
NIX_ARGS=(--argstr capnprotoVersion "0.7.1")
NIX_ARGS=(--argstr capnprotoVersion "0.7.1" --argstr cmakeVersion "3.12.4")
BUILD_ARGS=(-k)


@@ -1,7 +1,8 @@
CI_DESC="CI job running ThreadSanitizer"
CI_DIR=build-sanitize
NIX_ARGS=(--arg enableLibcxx true --argstr libcxxSanitizers "Thread" --argstr capnprotoSanitizers "thread")
export CXX=clang++
export CXXFLAGS="-ggdb -Werror -Wall -Wextra -Wpedantic -Wthread-safety-analysis -Wno-unused-parameter -fsanitize=thread"
export CXXFLAGS="-ggdb -Werror -Wall -Wextra -Wpedantic -Wthread-safety -Wno-unused-parameter -fsanitize=thread"
CMAKE_ARGS=()
BUILD_ARGS=(-k -j4)
BUILD_TARGETS=(mptest)


@@ -17,6 +17,21 @@ fi
[ -n "${CI_CLEAN-}" ] && rm -rf "${CI_DIR}"
cmake -B "$CI_DIR" "${CMAKE_ARGS[@]+"${CMAKE_ARGS[@]}"}"
cmake --build "$CI_DIR" -t "${BUILD_TARGETS[@]}" -- "${BUILD_ARGS[@]+"${BUILD_ARGS[@]}"}"
ctest --test-dir "$CI_DIR" --output-on-failure
cmake --version
cmake_ver=$(cmake --version | awk '/version/{print $3; exit}')
ver_ge() { [ "$(printf '%s\n' "$2" "$1" | sort -V | head -n1)" = "$2" ]; }
src_dir=$PWD
mkdir -p "$CI_DIR"
cd "$CI_DIR"
cmake "$src_dir" "${CMAKE_ARGS[@]+"${CMAKE_ARGS[@]}"}"
if ver_ge "$cmake_ver" "3.15"; then
cmake --build . -t "${BUILD_TARGETS[@]}" -- "${BUILD_ARGS[@]+"${BUILD_ARGS[@]}"}"
else
# Older versions of cmake can only build one target at a time with --target,
# and do not support the -t shortcut
for t in "${BUILD_TARGETS[@]}"; do
cmake --build . --target "$t" -- "${BUILD_ARGS[@]+"${BUILD_ARGS[@]}"}"
done
fi
ctest --output-on-failure


@@ -2,8 +2,8 @@
Given an interface description of an object with one or more methods, libmultiprocess generates:
* A C++ `ProxyClient` class with an implementation of each interface method that sends a request over a socket, waits for a response, and returns the result.
* A C++ `ProxyServer` class that listens for requests over a socket and calls a wrapped C++ object implementing the same interface to actually execute the requests.
* A C++ `ProxyClient` class template specialization with an implementation of each interface method that sends a request over a socket, waits for a response, and returns the result.
* A C++ `ProxyServer` class template specialization that listens for requests over a socket and calls a wrapped C++ object implementing the same interface to actually execute the requests.
The function call ⇆ request translation supports input and output arguments, standard types like `unique_ptr`, `vector`, `map`, and `optional`, and bidirectional calls between processes through interface pointer and `std::function` arguments.
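For illustration, a call through a generated `ProxyClient` looks like a plain C++ call. The sketch below is based on the test suite in this change and assumes `setup.client` points at a `ProxyClient<messages::FooInterface>`:

    ProxyClient<messages::FooInterface>* foo = setup.client.get();
    int result = foo->add(1, 2); // sends an add request, blocks until the response arrives
    int sum = 0;
    foo->addInOut(3, sum);       // output parameters are written back when the call returns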
@@ -15,7 +15,7 @@ Libmultiprocess acts as a pure wrapper or layer over the underlying protocol. Cl
### Internals
The `ProxyClient` and `ProxyServer` generated classes are not directly exposed to the user, as described in [usage.md](usage.md). Instead, they wrap c++ interfaces and appear to the user as pointers to an interface. They are first instantiated when calling `ConnectStream` and `ServeStream` respectively for creating the `InitInterface`. These methods establish connections through sockets, internally creating `Connection` objects wrapping a `capnp::RpcSystem` configured for client and server mode respectively.
The `ProxyClient` and `ProxyServer` generated classes are not directly exposed to the user, as described in [usage.md](usage.md). Instead, they wrap C++ interfaces and appear to the user as pointers to an interface. They are first instantiated when calling `ConnectStream` and `ServeStream` respectively for creating the `InitInterface`. These methods establish connections through sockets, internally creating `Connection` objects wrapping a `capnp::RpcSystem` configured for client and server mode respectively.
The `InitInterface` interface will typically have methods which return other interfaces, giving the connecting process the ability to call other functions in the serving process. Interfaces can also have methods accepting other interfaces as parameters, giving serving processes the ability to call back and invoke functions in connecting processes. Creating new interfaces does not create new connections, and typically many interface objects will share the same connection.
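A minimal client-side setup might look like the following sketch (illustrative, not from this diff; `fd` is assumed to be an already-connected socket descriptor and `InitInterface` an interface like the one in the examples):

    mp::EventLoop loop("example-client", [](mp::LogMessage log) {
        if (log.level == mp::Log::Raise) throw std::runtime_error(log.message);
    });
    // ConnectStream creates the Connection and returns a ProxyClient for the
    // top-level InitInterface, usable like a pointer to the interface.
    // (The loop must also be run, e.g. loop.loop() on its owning thread,
    // omitted here.)
    auto init = mp::ConnectStream<InitInterface>(loop, fd);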
@@ -23,13 +23,13 @@ Both `ConnectStream` and `ServeStream` also require an instantiation of the `Eve
When a generated method on the `ProxyClient` is called, it calls `clientInvoke` with the capnp-translated types. `clientInvoke` creates a self-executing promise (`kj::TaskSet`) that drives the execution of the request and gives ownership of it to the `EventLoop`. `clientInvoke` blocks until a response is received, or until there is a call from the server that needs to run on the same client thread, using a `Waiter` object.
On the server side, the `capnp::RpcSystem` receives the capnp request and invokes the corresponding c++ method through the corresponding `ProxyServer` and the heavily templated `serverInvoke` triggering a `ServerCall`. Its return values from the actual c++ methods are copied into capnp responses by `ServerRet` and exceptions are caught and copied by `ServerExcept`. The two are connected through `ServerField`. The main method driving execution of a request is `PassField`, which is invoked through `ServerField`. Instantiated interfaces, or capabilities in capnp speak, are tracked and owned by the server's `capnp::RpcSystem`.
On the server side, the `capnp::RpcSystem` receives the capnp request and invokes the corresponding C++ method through the corresponding `ProxyServer` and the heavily templated `serverInvoke` triggering a `ServerCall`. The return values from the actual C++ methods are copied into capnp responses by `ServerRet` and exceptions are caught and copied by `ServerExcept`. The two are connected through `ServerField`. The main method driving execution of a request is `PassField`, which is invoked through `ServerField`. Instantiated interfaces, or capabilities in capnp speak, are tracked and owned by the server's `capnp::RpcSystem`.
## Interface descriptions
As explained in the [usage](usage.md) document, interface descriptions need to be consumed both by the _libmultiprocess_ code generator, and by C++ code that calls and implements the interfaces. The C++ code only needs to know about C++ arguments and return types, while the code generator only needs to know about capnp arguments and return types, but both need to know class and method names, so the corresponding `.h` and `.capnp` source files contain some of the same information, and have to be kept in sync manually when methods or parameters change. Despite the redundancy, reconciling the interface definitions is designed to be _straightforward_ and _safe_. _Straightforward_ because there is no need to write manual serialization code or use awkward intermediate types like [`UniValue`](https://github.com/bitcoin/bitcoin/blob/master/src/univalue/include/univalue.h) instead of native types. _Safe_ because if there are any inconsistencies between API and data definitions (even minor ones like using a narrow int data type for a wider int API input), there are errors at build time instead of errors or bugs at runtime.
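For example, the `add` method of the test interface in this change is declared once in each file, and the two declarations must stay in sync:

    // foo.capnp: add @0 (a :Int32, b :Int32) -> (result :Int32);
    // foo.h:
    int add(int a, int b) { return a + b; }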
In the future, it would be possible to combine API and data definitions together using [C++ attributes](https://en.cppreference.com/w/cpp/language/attributes). To do this we would add attributes to the API definition files, and then generate the data definitions from the API definitions and attributes. I didn't take this approach mostly because it would be extra work, but also because until c++ standardizes reflection, this would require either hooking into compiler APIs like https://github.com/RosettaCommons/binder, or parsing c++ code manually like http://www.swig.org/.
In the future, it would be possible to combine API and data definitions together using [C++ attributes](https://en.cppreference.com/w/cpp/language/attributes). To do this we would add attributes to the API definition files, and then generate the data definitions from the API definitions and attributes. I didn't take this approach mostly because it would be extra work, but also because until C++ standardizes reflection, this would require either hooking into compiler APIs like https://github.com/RosettaCommons/binder, or parsing C++ code manually like http://www.swig.org/.
## What is `kj`?
@@ -39,6 +39,6 @@ basis in this library to construct the event-loop necessary to service IPC reque
## Future directions
_libmultiprocess_ uses the [Cap'n Proto](https://capnproto.org) interface description language and protocol, but it could be extended or changed to use a different IDL/protocol like [gRPC](https://grpc.io). The nice thing about _Cap'n Proto_ compared to _gRPC_ and most other lower level protocols is that it allows interface pointers (_Services_ in gRPC parlance) to be passed as method arguments and return values, so object references and bidirectional requests work out of the box. Supporting a lower-level protocol would require writing adding maps and tracking code to proxy objects.
_libmultiprocess_ uses the [Cap'n Proto](https://capnproto.org) interface description language and protocol, but it could be extended or changed to use a different IDL/protocol like [gRPC](https://grpc.io). The nice thing about _Cap'n Proto_ compared to _gRPC_ and most other lower level protocols is that it allows interface pointers (_Services_ in gRPC parlance) to be passed as method arguments and return values, so object references and bidirectional requests work out of the box. Supporting a lower-level protocol would require adding maps and tracking code to proxy objects.
_libmultiprocess_ is currently compatible with sandboxing but could add platform-specific sandboxing support or integration with a sandboxing library like [SAPI](https://github.com/google/sandboxed-api).


@@ -4,9 +4,9 @@
_libmultiprocess_ is a library and code generator that allows calling C++ class interfaces across different processes. For an interface to be available from other processes, it needs two definitions:
- An **API definition** declaring how the interface is called. Included examples: [calculator.h](https://github.com/bitcoin-core/libmultiprocess/blob/master/example/calculator.h), [printer.h](https://github.com/bitcoin-core/libmultiprocess/blob/master/example/printer.h), [init.h](https://github.com/bitcoin-core/libmultiprocess/blob/master/example/init.h). Bitcoin examples: [node.h](https://github.com/ryanofsky/bitcoin/blob/ipc-export/src/interfaces/node.h), [wallet.h](https://github.com/ryanofsky/bitcoin/blob/ipc-export/src/interfaces/wallet.h), [echo.h](https://github.com/ryanofsky/bitcoin/blob/ipc-export/src/interfaces/echo.h), [init.h](https://github.com/ryanofsky/bitcoin/blob/ipc-export/src/interfaces/init.h).
- An **API definition** declaring how the interface is called. Included examples: [calculator.h](../example/calculator.h), [printer.h](../example/printer.h), [init.h](../example/init.h). Bitcoin examples: [node.h](https://github.com/bitcoin/bitcoin/blob/master/src/interfaces/node.h), [wallet.h](https://github.com/bitcoin/bitcoin/blob/master/src/interfaces/wallet.h), [echo.h](https://github.com/bitcoin/bitcoin/blob/master/src/interfaces/echo.h), [init.h](https://github.com/bitcoin/bitcoin/blob/master/src/interfaces/init.h).
- A **data definition** declaring how interface calls get sent across the wire. Included examples: [calculator.capnp](https://github.com/bitcoin-core/libmultiprocess/blob/master/example/calculator.capnp), [printer.capnp](https://github.com/bitcoin-core/libmultiprocess/blob/master/example/printer.capnp), [init.capnp](https://github.com/bitcoin-core/libmultiprocess/blob/master/example/init.capnp). Bitcoin examples: [node.capnp](https://github.com/ryanofsky/bitcoin/blob/ipc-export/src/ipc/capnp/node.capnp), [wallet.capnp](https://github.com/ryanofsky/bitcoin/blob/ipc-export/src/ipc/capnp/wallet.capnp), [echo.capnp](https://github.com/ryanofsky/bitcoin/blob/ipc-export/src/ipc/capnp/echo.capnp), [init.capnp](https://github.com/ryanofsky/bitcoin/blob/ipc-export/src/ipc/capnp/init.capnp).
- A **data definition** declaring how interface calls get sent across the wire. Included examples: [calculator.capnp](../example/calculator.capnp), [printer.capnp](../example/printer.capnp), [init.capnp](../example/init.capnp). Bitcoin examples: [node.capnp](https://github.com/ryanofsky/bitcoin/blob/ipc-export/src/ipc/capnp/node.capnp), [wallet.capnp](https://github.com/ryanofsky/bitcoin/blob/ipc-export/src/ipc/capnp/wallet.capnp), [echo.capnp](https://github.com/bitcoin/bitcoin/blob/master/src/ipc/capnp/echo.capnp), [init.capnp](https://github.com/bitcoin/bitcoin/blob/master/src/ipc/capnp/init.capnp).
The `*.capnp` data definition files are consumed by the _libmultiprocess_ code generator and each `X.capnp` file generates `X.capnp.c++`, `X.capnp.h`, `X.capnp.proxy-client.c++`, `X.capnp.proxy-server.c++`, `X.capnp.proxy-types.c++`, `X.capnp.proxy-types.h`, and `X.capnp.proxy.h` output files. The generated files include `mp::ProxyClient<Interface>` and `mp::ProxyServer<Interface>` class specializations for all the interfaces in the `.capnp` files. These allow methods on C++ objects in one process to be called from other processes over IPC sockets.
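On the client side the generated specialization is used through the interface type, as in this sketch based on the included examples (given an `EventLoop` named `loop` and a connected socket `fd`; `makePrinter` is the `InitInterface` method from the example code):

    // ConnectStream returns a ProxyClient<InitInterface> that is called like
    // the interface itself; each call is proxied over the IPC socket.
    auto init = mp::ConnectStream<InitInterface>(loop, fd);
    std::unique_ptr<Printer> printer = init->makePrinter();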


@@ -9,6 +9,7 @@
#include <charconv>
#include <cstring>
#include <fstream>
#include <functional>
#include <iostream>
#include <kj/async.h>
#include <kj/common.h>
@@ -37,6 +38,7 @@ public:
}
};
// Exercises deprecated log callback signature
static void LogPrint(bool raise, const std::string& message)
{
if (raise) throw std::runtime_error(message);


@@ -35,10 +35,10 @@ static auto Spawn(mp::EventLoop& loop, const std::string& process_argv0, const s
return std::make_tuple(mp::ConnectStream<InitInterface>(loop, fd), pid);
}
static void LogPrint(bool raise, const std::string& message)
static void LogPrint(mp::LogMessage log_data)
{
if (raise) throw std::runtime_error(message);
std::ofstream("debug.log", std::ios_base::app) << message << std::endl;
if (log_data.level == mp::Log::Raise) throw std::runtime_error(log_data.message);
std::ofstream("debug.log", std::ios_base::app) << log_data.message << std::endl;
}
int main(int argc, char** argv)


@@ -32,10 +32,10 @@ public:
std::unique_ptr<Printer> makePrinter() override { return std::make_unique<PrinterImpl>(); }
};
static void LogPrint(bool raise, const std::string& message)
static void LogPrint(mp::LogMessage log_data)
{
if (raise) throw std::runtime_error(message);
std::ofstream("debug.log", std::ios_base::app) << message << std::endl;
if (log_data.level == mp::Log::Raise) throw std::runtime_error(log_data.message);
std::ofstream("debug.log", std::ios_base::app) << log_data.message << std::endl;
}
int main(int argc, char** argv)


@@ -66,8 +66,6 @@ struct ProxyClient<Thread> : public ProxyClientBase<Thread, ::capnp::Void>
ProxyClient(const ProxyClient&) = delete;
~ProxyClient();
void setDisconnectCallback(const std::function<void()>& fn);
//! Reference to callback function that is run if there is a sudden
//! disconnect and the Connection object is destroyed before this
//! ProxyClient<Thread> object. The callback will destroy this object and
@@ -100,36 +98,29 @@ public:
EventLoop& m_loop;
};
using LogFn = std::function<void(bool raise, std::string message)>;
class Logger
{
public:
Logger(bool raise, LogFn& fn) : m_raise(raise), m_fn(fn) {}
Logger(Logger&& logger) : m_raise(logger.m_raise), m_fn(logger.m_fn), m_buffer(std::move(logger.m_buffer)) {}
~Logger() noexcept(false)
{
if (m_fn) m_fn(m_raise, m_buffer.str());
}
template <typename T>
friend Logger& operator<<(Logger& logger, T&& value)
{
if (logger.m_fn) logger.m_buffer << std::forward<T>(value);
return logger;
}
template <typename T>
friend Logger& operator<<(Logger&& logger, T&& value)
{
return logger << std::forward<T>(value);
}
bool m_raise;
LogFn& m_fn;
std::ostringstream m_buffer;
//! Log flags. Update stringify function if changed!
enum class Log {
Trace = 0,
Debug,
Info,
Warning,
Error,
Raise,
};
kj::StringPtr KJ_STRINGIFY(Log flags);
struct LogMessage {
//! Message to be logged
std::string message;
//! The severity level of this message
Log level;
};
using LogFn = std::function<void(LogMessage)>;
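// Illustrative sketch (not part of this diff): a callback matching the new
// LogFn signature, mirroring the updated example programs; assumes <fstream>
// and <stdexcept> are available.
inline const LogFn example_log_fn = [](LogMessage log) {
    if (log.level == Log::Raise) throw std::runtime_error(log.message);
    std::ofstream("debug.log", std::ios_base::app) << log.message << std::endl;
};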
struct LogOptions {
//! External logging callback.
@@ -138,8 +129,60 @@ struct LogOptions {
//! Maximum number of characters to use when representing
//! request and response structs as strings.
size_t max_chars{200};
//! Messages with a severity level less than log_level will not be
//! reported.
Log log_level{Log::Trace};
};
class Logger
{
public:
Logger(const LogOptions& options, Log log_level) : m_options(options), m_log_level(log_level) {}
Logger(Logger&&) = delete;
Logger& operator=(Logger&&) = delete;
Logger(const Logger&) = delete;
Logger& operator=(const Logger&) = delete;
~Logger() noexcept(false)
{
if (enabled()) m_options.log_fn({std::move(m_buffer).str(), m_log_level});
}
template <typename T>
friend Logger& operator<<(Logger& logger, T&& value)
{
if (logger.enabled()) logger.m_buffer << std::forward<T>(value);
return logger;
}
template <typename T>
friend Logger& operator<<(Logger&& logger, T&& value)
{
return logger << std::forward<T>(value);
}
explicit operator bool() const
{
return enabled();
}
private:
bool enabled() const
{
return m_options.log_fn && m_log_level >= m_options.log_level;
}
const LogOptions& m_options;
Log m_log_level;
std::ostringstream m_buffer;
};
#define MP_LOGPLAIN(loop, ...) if (mp::Logger logger{(loop).m_log_opts, __VA_ARGS__}; logger) logger
#define MP_LOG(loop, ...) MP_LOGPLAIN(loop, __VA_ARGS__) << "{" << LongThreadName((loop).m_exe_name) << "} "
std::string LongThreadName(const char* exe_name);
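// Example (illustrative, not part of this diff): given an EventLoop `loop`,
// the macros check the level before evaluating the streamed arguments, so
// suppressed messages cost almost nothing:
//
//     MP_LOG(loop, Log::Debug) << "IPC server recv request #" << req;
//     MP_LOGPLAIN(loop, Log::Trace)
//         << "request data: " << LogEscape(params.toString(), loop.m_log_opts.max_chars);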
//! Event loop implementation.
@@ -170,8 +213,19 @@ std::string LongThreadName(const char* exe_name);
class EventLoop
{
public:
//! Construct event loop object.
EventLoop(const char* exe_name, LogFn log_fn, void* context = nullptr);
//! Construct event loop object with default logging options.
EventLoop(const char* exe_name, LogFn log_fn, void* context = nullptr)
: EventLoop(exe_name, LogOptions{std::move(log_fn)}, context){}
//! Construct event loop object with specified logging options.
EventLoop(const char* exe_name, LogOptions log_opts, void* context = nullptr);
//! Backwards-compatible constructor for previous (deprecated) logging callback signature
EventLoop(const char* exe_name, std::function<void(bool, std::string)> old_callback, void* context = nullptr)
: EventLoop(exe_name,
LogFn{[old_callback = std::move(old_callback)](LogMessage log_data) {old_callback(log_data.level == Log::Raise, std::move(log_data.message));}},
context){}
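//! Example (illustrative, not part of this diff):
//!     LogOptions opts{example_log_fn};
//!     opts.log_level = Log::Info; // suppress Trace and Debug messages
//!     EventLoop loop("exe", std::move(opts));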
~EventLoop();
//! Run event loop. Does not return until shutdown. This should only be
@@ -212,15 +266,6 @@ public:
//! Check if loop should exit.
bool done() const MP_REQUIRES(m_mutex);
Logger log()
{
Logger logger(false, m_log_opts.log_fn);
logger << "{" << LongThreadName(m_exe_name) << "} ";
return logger;
}
Logger logPlain() { return {false, m_log_opts.log_fn}; }
Logger raise() { return {true, m_log_opts.log_fn}; }
//! Process name included in thread names so combined debug output from
//! multiple processes is easier to understand.
const char* m_exe_name;
@@ -283,18 +328,19 @@ struct Waiter
Waiter() = default;
template <typename Fn>
void post(Fn&& fn)
bool post(Fn&& fn)
{
const std::unique_lock<std::mutex> lock(m_mutex);
assert(!m_fn);
const Lock lock(m_mutex);
if (m_fn) return false;
m_fn = std::forward<Fn>(fn);
m_cv.notify_all();
return true;
}
template <class Predicate>
void wait(std::unique_lock<std::mutex>& lock, Predicate pred)
void wait(Lock& lock, Predicate pred)
{
m_cv.wait(lock, [&] {
m_cv.wait(lock.m_lock, [&]() MP_REQUIRES(m_mutex) {
// Important for this to be "while (m_fn)", not "if (m_fn)" to avoid
// a lost-wakeup bug. A new m_fn and m_cv notification might be sent
// after the fn() call and before the lock.lock() call in this loop
@@ -317,9 +363,9 @@ struct Waiter
//! mutexes than necessary. This mutex can be held at the same time as
//! EventLoop::m_mutex as long as Waiter::mutex is locked first and
//! EventLoop::m_mutex is locked second.
std::mutex m_mutex;
Mutex m_mutex;
std::condition_variable m_cv;
std::optional<kj::Function<void()>> m_fn;
std::optional<kj::Function<void()>> m_fn MP_GUARDED_BY(m_mutex);
};
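// Illustrative sketch (not part of this diff): how callers are expected to
// use the new post() return value, as serverInvoke does for the
// "thread busy" error below.
inline void ExamplePost(Waiter& waiter, kj::Function<void()> fn)
{
    if (!waiter.post(std::move(fn))) {
        // Waiter already holds pending work for its thread.
        throw std::runtime_error("thread busy");
    }
}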
//! Object holding network & rpc state associated with either an incoming server
@@ -544,29 +590,73 @@ void ProxyServerBase<Interface, Impl>::invokeDestroy()
CleanupRun(m_context.cleanup_fns);
}
using ConnThreads = std::map<Connection*, ProxyClient<Thread>>;
//! Map from Connection to local or remote thread handle which will be used over
//! that connection. This map will typically only contain one entry, but can
//! contain multiple if a single thread makes IPC calls over multiple
//! connections. A std::optional value type is used to avoid the map needing to
//! be locked while ProxyClient<Thread> objects are constructed, see
//! ThreadContext "Synchronization note" below.
using ConnThreads = std::map<Connection*, std::optional<ProxyClient<Thread>>>;
using ConnThread = ConnThreads::iterator;
// Retrieve ProxyClient<Thread> object associated with this connection from a
// map, or create a new one and insert it into the map. Return map iterator and
// inserted bool.
std::tuple<ConnThread, bool> SetThread(ConnThreads& threads, std::mutex& mutex, Connection* connection, const std::function<Thread::Client()>& make_thread);
std::tuple<ConnThread, bool> SetThread(GuardedRef<ConnThreads> threads, Connection* connection, const std::function<Thread::Client()>& make_thread);
//! The thread_local ThreadContext g_thread_context struct provides information
//! about individual threads and a way of communicating between them. Because
//! it's a thread local struct, each ThreadContext instance is initialized by
//! the thread that owns it.
//!
//! ThreadContext is used for any client threads created externally which make
//! IPC calls, and for server threads created by
//! ProxyServer<ThreadMap>::makeThread() which execute IPC calls for clients.
//!
//! In both cases, the struct holds information like the thread name, and a
//! Waiter object where the EventLoop can post incoming IPC requests to execute
//! on the thread. The struct also holds ConnThread maps associating the thread
//! with local and remote ProxyClient<Thread> objects.
struct ThreadContext
{
//! Identifying string for debug.
std::string thread_name;
//! Waiter object used to allow client threads blocked waiting for a server
//! response to execute callbacks made from the client's corresponding
//! server thread.
//! Waiter object used to allow remote clients to execute code on this
//! thread. For server threads created by
//! ProxyServer<ThreadMap>::makeThread(), this is initialized in that
//! function. Otherwise, for client threads created externally, this is
//! initialized the first time the thread tries to make an IPC call. Having
//! a waiter is necessary for threads making IPC calls in case a server they
//! are calling expects them to execute a callback during the call, before
//! it sends a response.
//!
//! For IPC client threads, the Waiter pointer is never cleared and the Waiter
//! just gets destroyed when the thread does. For server threads created by
//! makeThread(), this pointer is set to null in the ~ProxyServer<Thread> as
//! a signal for the thread to exit and destroy itself. In both cases, the
//! same Waiter object is used across different calls and only created and
//! destroyed once for the lifetime of the thread.
std::unique_ptr<Waiter> waiter = nullptr;
//! When client is making a request to a server, this is the
//! `callbackThread` argument it passes in the request, used by the server
//! in case it needs to make callbacks into the client that need to execute
//! while the client is waiting. This will be set to a local thread object.
ConnThreads callback_threads;
//!
//! Synchronization note: The callback_thread and request_thread maps are
//! only ever accessed internally by this thread's destructor and externally
//! by Cap'n Proto event loop threads. Since it's possible for IPC client
//! threads to make calls over different connections that could have
//! different event loops, these maps are guarded by Waiter::m_mutex in case
//! different event loop threads add or remove map entries simultaneously.
//! However, individual ProxyClient<Thread> objects in the maps will only be
//! associated with one event loop and guarded by EventLoop::m_mutex. So
//! Waiter::m_mutex does not need to be held while accessing individual
//! ProxyClient<Thread> instances, and may even need to be released to
//! respect lock order and avoid locking Waiter::m_mutex before
//! EventLoop::m_mutex.
ConnThreads callback_threads MP_GUARDED_BY(waiter->m_mutex);
//! When client is making a request to a server, this is the `thread`
//! argument it passes in the request, used to control which thread on
@@ -575,7 +665,9 @@ struct ThreadContext
//! by makeThread. If a client call is being made from a thread currently
//! handling a server request, this will be set to the `callbackThread`
//! request thread argument passed in that request.
ConnThreads request_threads;
//!
//! Synchronization note: \ref callback_threads note applies here as well.
ConnThreads request_threads MP_GUARDED_BY(waiter->m_mutex);
//! Whether this thread is a capnp event loop thread. Not really used except
//! to assert false if there's an attempt to execute a blocking operation
@@ -598,7 +690,7 @@ std::unique_ptr<ProxyClient<InitInterface>> ConnectStream(EventLoop& loop, int f
init_client = connection->m_rpc_system->bootstrap(ServerVatId().vat_id).castAs<InitInterface>();
Connection* connection_ptr = connection.get();
connection->onDisconnect([&loop, connection_ptr] {
loop.log() << "IPC client: unexpected network disconnect.";
MP_LOG(loop, Log::Warning) << "IPC client: unexpected network disconnect.";
delete connection_ptr;
});
});
@@ -621,7 +713,7 @@ void _Serve(EventLoop& loop, kj::Own<kj::AsyncIoStream>&& stream, InitImpl& init
});
auto it = loop.m_incoming_connections.begin();
it->onDisconnect([&loop, it] {
loop.log() << "IPC server: socket disconnected.";
MP_LOG(loop, Log::Info) << "IPC server: socket disconnected.";
loop.m_incoming_connections.erase(it);
});
}


@@ -568,7 +568,7 @@ template <typename Client>
void clientDestroy(Client& client)
{
if (client.m_context.connection) {
client.m_context.loop->log() << "IPC client destroy " << typeid(client).name();
MP_LOG(*client.m_context.loop, Log::Info) << "IPC client destroy " << typeid(client).name();
} else {
KJ_LOG(INFO, "IPC interrupted client destroy", typeid(client).name());
}
@@ -577,7 +577,7 @@ void clientDestroy(Client& client)
template <typename Server>
void serverDestroy(Server& server)
{
server.m_context.loop->log() << "IPC server destroy " << typeid(server).name();
MP_LOG(*server.m_context.loop, Log::Info) << "IPC server destroy " << typeid(server).name();
}
//! Entry point called by generated client code that looks like:
@@ -605,7 +605,7 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel
// declaration so the server method runs in a dedicated thread.
assert(!g_thread_context.loop_thread);
g_thread_context.waiter = std::make_unique<Waiter>();
proxy_client.m_context.loop->logPlain()
MP_LOGPLAIN(*proxy_client.m_context.loop, Log::Info)
<< "{" << g_thread_context.thread_name
<< "} IPC client first request from current thread, constructing waiter";
}
@@ -617,7 +617,7 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel
const char* disconnected = nullptr;
proxy_client.m_context.loop->sync([&]() {
if (!proxy_client.m_context.connection) {
const std::unique_lock<std::mutex> lock(thread_context.waiter->m_mutex);
const Lock lock(thread_context.waiter->m_mutex);
done = true;
disconnected = "IPC client method called after disconnect.";
thread_context.waiter->m_cv.notify_all();
@@ -629,22 +629,26 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel
using FieldList = typename ProxyClientMethodTraits<typename Request::Params>::Fields;
invoke_context.emplace(*proxy_client.m_context.connection, thread_context);
IterateFields().handleChain(*invoke_context, request, FieldList(), typename FieldObjs::BuildParams{&fields}...);
proxy_client.m_context.loop->logPlain()
MP_LOGPLAIN(*proxy_client.m_context.loop, Log::Debug)
<< "{" << thread_context.thread_name << "} IPC client send "
<< TypeName<typename Request::Params>() << " " << LogEscape(request.toString(), proxy_client.m_context.loop->m_log_opts.max_chars);
<< TypeName<typename Request::Params>();
MP_LOGPLAIN(*proxy_client.m_context.loop, Log::Trace)
<< "send data: " << LogEscape(request.toString(), proxy_client.m_context.loop->m_log_opts.max_chars);
proxy_client.m_context.loop->m_task_set->add(request.send().then(
[&](::capnp::Response<typename Request::Results>&& response) {
proxy_client.m_context.loop->logPlain()
MP_LOGPLAIN(*proxy_client.m_context.loop, Log::Debug)
<< "{" << thread_context.thread_name << "} IPC client recv "
<< TypeName<typename Request::Results>() << " " << LogEscape(response.toString(), proxy_client.m_context.loop->m_log_opts.max_chars);
<< TypeName<typename Request::Results>();
MP_LOGPLAIN(*proxy_client.m_context.loop, Log::Trace)
<< "recv data: " << LogEscape(response.toString(), proxy_client.m_context.loop->m_log_opts.max_chars);
try {
IterateFields().handleChain(
*invoke_context, response, FieldList(), typename FieldObjs::ReadResults{&fields}...);
} catch (...) {
exception = std::current_exception();
}
const std::unique_lock<std::mutex> lock(thread_context.waiter->m_mutex);
const Lock lock(thread_context.waiter->m_mutex);
done = true;
thread_context.waiter->m_cv.notify_all();
},
@@ -653,20 +657,20 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel
disconnected = "IPC client method call interrupted by disconnect.";
} else {
kj_exception = kj::str("kj::Exception: ", e).cStr();
proxy_client.m_context.loop->logPlain()
MP_LOGPLAIN(*proxy_client.m_context.loop, Log::Info)
<< "{" << thread_context.thread_name << "} IPC client exception " << kj_exception;
}
const std::unique_lock<std::mutex> lock(thread_context.waiter->m_mutex);
const Lock lock(thread_context.waiter->m_mutex);
done = true;
thread_context.waiter->m_cv.notify_all();
}));
});
std::unique_lock<std::mutex> lock(thread_context.waiter->m_mutex);
Lock lock(thread_context.waiter->m_mutex);
thread_context.waiter->wait(lock, [&done]() { return done; });
if (exception) std::rethrow_exception(exception);
if (!kj_exception.empty()) proxy_client.m_context.loop->raise() << kj_exception;
if (disconnected) proxy_client.m_context.loop->raise() << disconnected;
if (!kj_exception.empty()) MP_LOGPLAIN(*proxy_client.m_context.loop, Log::Raise) << kj_exception;
if (disconnected) MP_LOGPLAIN(*proxy_client.m_context.loop, Log::Raise) << disconnected;
}
//! Invoke callable `fn()` that may return void. If it does return void, replace
@@ -700,8 +704,10 @@ kj::Promise<void> serverInvoke(Server& server, CallContext& call_context, Fn fn)
using Results = typename decltype(call_context.getResults())::Builds;
int req = ++server_reqs;
server.m_context.loop->log() << "IPC server recv request #" << req << " "
<< TypeName<typename Params::Reads>() << " " << LogEscape(params.toString(), server.m_context.loop->m_log_opts.max_chars);
MP_LOG(*server.m_context.loop, Log::Debug) << "IPC server recv request #" << req << " "
<< TypeName<typename Params::Reads>();
MP_LOG(*server.m_context.loop, Log::Trace) << "request data: "
<< LogEscape(params.toString(), server.m_context.loop->m_log_opts.max_chars);
try {
using ServerContext = ServerInvokeContext<Server, CallContext>;
@@ -717,14 +723,15 @@ kj::Promise<void> serverInvoke(Server& server, CallContext& call_context, Fn fn)
return ReplaceVoid([&]() { return fn.invoke(server_context, ArgList()); },
[&]() { return kj::Promise<CallContext>(kj::mv(call_context)); })
.then([&server, req](CallContext call_context) {
server.m_context.loop->log() << "IPC server send response #" << req << " " << TypeName<Results>()
<< " " << LogEscape(call_context.getResults().toString(), server.m_context.loop->m_log_opts.max_chars);
MP_LOG(*server.m_context.loop, Log::Debug) << "IPC server send response #" << req << " " << TypeName<Results>();
MP_LOG(*server.m_context.loop, Log::Trace) << "response data: "
<< LogEscape(call_context.getResults().toString(), server.m_context.loop->m_log_opts.max_chars);
});
} catch (const std::exception& e) {
server.m_context.loop->log() << "IPC server unhandled exception: " << e.what();
MP_LOG(*server.m_context.loop, Log::Error) << "IPC server unhandled exception: " << e.what();
throw;
} catch (...) {
server.m_context.loop->log() << "IPC server unhandled exception";
MP_LOG(*server.m_context.loop, Log::Error) << "IPC server unhandled exception";
throw;
}
}


@@ -25,7 +25,7 @@ void CustomBuildField(TypeList<>,
// Also store the Thread::Client reference in the callback_threads map so
// future calls over this connection can reuse it.
auto [callback_thread, _]{SetThread(
thread_context.callback_threads, thread_context.waiter->m_mutex, &connection,
GuardedRef{thread_context.waiter->m_mutex, thread_context.callback_threads}, &connection,
[&] { return connection.m_threads.add(kj::heap<ProxyServer<Thread>>(thread_context, std::thread{})); })};
// Call remote ThreadMap.makeThread function so server will create a
@@ -43,12 +43,12 @@ void CustomBuildField(TypeList<>,
return request.send().getResult(); // Nonblocking due to capnp request pipelining.
}};
auto [request_thread, _1]{SetThread(
thread_context.request_threads, thread_context.waiter->m_mutex,
GuardedRef{thread_context.waiter->m_mutex, thread_context.request_threads},
&connection, make_request_thread)};
auto context = output.init();
context.setThread(request_thread->second.m_client);
context.setCallbackThread(callback_thread->second.m_client);
context.setThread(request_thread->second->m_client);
context.setCallbackThread(callback_thread->second->m_client);
}
//! PassField override for mp.Context arguments. Return asynchronously and call
@@ -89,29 +89,39 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
// need to update the map.
auto& thread_context = g_thread_context;
auto& request_threads = thread_context.request_threads;
auto [request_thread, inserted]{SetThread(
request_threads, thread_context.waiter->m_mutex,
server.m_context.connection,
[&] { return context_arg.getCallbackThread(); })};
ConnThread request_thread;
bool inserted;
server.m_context.loop->sync([&] {
std::tie(request_thread, inserted) = SetThread(
GuardedRef{thread_context.waiter->m_mutex, request_threads}, server.m_context.connection,
[&] { return context_arg.getCallbackThread(); });
});
// If an entry was inserted into the requests_threads map,
// If an entry was inserted into the request_threads map,
// remove it after calling fn.invoke. If an entry was not
// inserted, one already existed, meaning this must be a
// recursive call (IPC call calling back to the caller which
// makes another IPC call), so avoid modifying the map.
const bool erase_thread{inserted};
KJ_DEFER(if (erase_thread) {
std::unique_lock<std::mutex> lock(thread_context.waiter->m_mutex);
// Call erase here with a Connection* argument instead
// of an iterator argument, because the `request_thread`
// iterator may be invalid if the connection is closed
// during this function call. More specifically, the
// iterator may be invalid because SetThread adds a
// cleanup callback to the Connection destructor that
// erases the thread from the map, and also because the
// ProxyServer<Thread> destructor calls
// request_threads.clear().
request_threads.erase(server.m_context.connection);
// Erase the request_threads entry on the event loop
// thread with loop->sync(), so if the connection is
// broken there is not a race between this thread and
// the disconnect handler trying to destroy the thread
// client object.
server.m_context.loop->sync([&] {
// Look up the thread again without using existing
// iterator since entry may no longer be there after
// a disconnect. Destroy node after releasing
// Waiter::m_mutex, so the ProxyClient<Thread>
// destructor is able to use EventLoop::mutex
// without violating lock order.
ConnThreads::node_type removed;
{
Lock lock(thread_context.waiter->m_mutex);
removed = request_threads.extract(server.m_context.connection);
}
});
});
fn.invoke(server_context, args...);
}
@@ -140,11 +150,16 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
// thread.
KJ_IF_MAYBE (thread_server, perhaps) {
const auto& thread = static_cast<ProxyServer<Thread>&>(*thread_server);
server.m_context.loop->log()
MP_LOG(*server.m_context.loop, Log::Debug)
<< "IPC server post request #" << req << " {" << thread.m_thread_context.thread_name << "}";
thread.m_thread_context.waiter->post(std::move(invoke));
if (!thread.m_thread_context.waiter->post(std::move(invoke))) {
MP_LOG(*server.m_context.loop, Log::Error)
<< "IPC server error request #" << req
<< " {" << thread.m_thread_context.thread_name << "}" << ", thread busy";
throw std::runtime_error("thread busy");
}
} else {
server.m_context.loop->log()
MP_LOG(*server.m_context.loop, Log::Error)
<< "IPC server error request #" << req << ", missing thread to execute request";
throw std::runtime_error("invalid thread handle");
}


@@ -182,6 +182,17 @@ public:
std::unique_lock<std::mutex> m_lock;
};
template<typename T>
struct GuardedRef
{
Mutex& mutex;
T& ref MP_GUARDED_BY(mutex);
};
// CTAD for Clang 16: GuardedRef{mutex, x} -> GuardedRef<decltype(x)>
template <class U>
GuardedRef(Mutex&, U&) -> GuardedRef<U>;
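// Illustrative sketch (not part of this diff): GuardedRef bundles data with
// the mutex guarding it so a callee can lock before access, as the new
// SetThread() signature does; assumes <map> is included.
struct ExampleState {
    Mutex mutex;
    std::map<int, int> entries MP_GUARDED_BY(mutex);
};
inline void ExampleTouch(GuardedRef<std::map<int, int>> entries)
{
    const Lock lock(entries.mutex);
    entries.ref[0] = 1; // safe: lock on entries.mutex is held
}
// Usage: ExampleTouch(GuardedRef{state.mutex, state.entries});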
//! Analog to std::lock_guard that unlocks instead of locks.
template <typename Lock>
struct UnlockGuard


@@ -3,11 +3,19 @@
, enableLibcxx ? false # Whether to use libc++ toolchain and libraries instead of libstdc++
, minimal ? false # Whether to create minimal shell without extra tools (faster when cross compiling)
, capnprotoVersion ? null
, capnprotoSanitizers ? null # Optional sanitizers to build cap'n proto with
, cmakeVersion ? null
, libcxxSanitizers ? null # Optional LLVM_USE_SANITIZER value to use for libc++, see https://llvm.org/docs/CMake.html
}:
let
lib = pkgs.lib;
llvm = crossPkgs.llvmPackages_20;
llvmBase = crossPkgs.llvmPackages_21;
llvm = llvmBase // lib.optionalAttrs (libcxxSanitizers != null) {
libcxx = llvmBase.libcxx.override {
devExtraCmakeFlags = [ "-DLLVM_USE_SANITIZER=${libcxxSanitizers}" ];
};
};
capnprotoHashes = {
"0.7.0" = "sha256-Y/7dUOQPDHjniuKNRw3j8dG1NI9f/aRWpf8V0WzV9k8=";
"0.7.1" = "sha256-3cBpVmpvCXyqPUXDp12vCFCk32ZXWpkdOliNH37UwWE=";
@@ -34,15 +42,36 @@ let
} // (lib.optionalAttrs (lib.versionOlder capnprotoVersion "0.10") {
env = { }; # Drop -std=c++20 flag forced by nixpkgs
}));
capnproto = capnprotoBase.override (lib.optionalAttrs enableLibcxx { clangStdenv = llvm.libcxxStdenv; });
capnproto = (capnprotoBase.overrideAttrs (old: lib.optionalAttrs (capnprotoSanitizers != null) {
env = (old.env or { }) // {
CXXFLAGS =
lib.concatStringsSep " " [
(old.env.CXXFLAGS or "")
"-fsanitize=${capnprotoSanitizers}"
"-fno-omit-frame-pointer"
"-g"
];
};
})).override (lib.optionalAttrs enableLibcxx { clangStdenv = llvm.libcxxStdenv; });
clang = if enableLibcxx then llvm.libcxxClang else llvm.clang;
clang-tools = llvm.clang-tools.override { inherit enableLibcxx; };
cmakeHashes = {
"3.12.4" = "sha256-UlVYS/0EPrcXViz/iULUcvHA5GecSUHYS6raqbKOMZQ=";
};
cmakeBuild = if cmakeVersion == null then pkgs.cmake else (pkgs.cmake.overrideAttrs (old: {
version = cmakeVersion;
src = pkgs.fetchurl {
url = "https://cmake.org/files/v${lib.versions.majorMinor cmakeVersion}/cmake-${cmakeVersion}.tar.gz";
hash = lib.attrByPath [cmakeVersion] "" cmakeHashes;
};
patches = [];
})).override { isMinimalBuild = true; };
in crossPkgs.mkShell {
buildInputs = [
capnproto
];
nativeBuildInputs = with pkgs; [
cmake
cmakeBuild
include-what-you-use
ninja
] ++ lib.optionals (!minimal) [


@@ -12,6 +12,7 @@
#include <atomic>
#include <capnp/capability.h>
#include <capnp/common.h> // IWYU pragma: keep
#include <capnp/rpc.h>
#include <condition_variable>
#include <functional>
@@ -23,9 +24,9 @@
#include <kj/debug.h>
#include <kj/function.h>
#include <kj/memory.h>
#include <kj/string.h>
#include <map>
#include <memory>
#include <mutex>
#include <optional>
#include <stdexcept>
#include <string>
@@ -42,7 +43,7 @@ thread_local ThreadContext g_thread_context;
void LoggingErrorHandler::taskFailed(kj::Exception&& exception)
{
KJ_LOG(ERROR, "Uncaught exception in daemonized task.", exception);
m_loop.log() << "Uncaught exception in daemonized task.";
MP_LOG(m_loop, Log::Error) << "Uncaught exception in daemonized task.";
}
EventLoopRef::EventLoopRef(EventLoop& loop, Lock* lock) : m_loop(&loop), m_lock(lock)
@@ -81,6 +82,11 @@ ProxyContext::ProxyContext(Connection* connection) : connection(connection), loo
Connection::~Connection()
{
// Connection destructor is always called on the event loop thread. If this
// is a local disconnect, it will trigger I/O, so this needs to run on the
// event loop thread, and if there was a remote disconnect, this is called
// by an onDisconnect callback directly from the event loop thread.
assert(std::this_thread::get_id() == m_loop->m_thread_id);
// Shut down RPC system first, since this will garbage collect any
// ProxyServer objects that were not freed before the connection was closed.
// Typically all ProxyServer objects associated with this connection will be
@@ -156,6 +162,9 @@ CleanupIt Connection::addSyncCleanup(std::function<void()> fn)
void Connection::removeSyncCleanup(CleanupIt it)
{
// Require cleanup functions to be removed on the event loop thread to avoid
// needing to deal with them being removed in the middle of a disconnect.
assert(std::this_thread::get_id() == m_loop->m_thread_id);
const Lock lock(m_loop->m_mutex);
m_sync_cleanup_fns.erase(it);
}
@@ -183,13 +192,13 @@ void EventLoop::addAsyncCleanup(std::function<void()> fn)
startAsyncThread();
}
EventLoop::EventLoop(const char* exe_name, LogFn log_fn, void* context)
EventLoop::EventLoop(const char* exe_name, LogOptions log_opts, void* context)
: m_exe_name(exe_name),
m_io_context(kj::setupAsyncIo()),
m_task_set(new kj::TaskSet(m_error_handler)),
m_log_opts(std::move(log_opts)),
m_context(context)
{
m_log_opts.log_fn = log_fn;
int fds[2];
KJ_SYSCALL(socketpair(AF_UNIX, SOCK_STREAM, 0, fds));
m_wait_fd = fds[0];
@@ -243,9 +252,9 @@ void EventLoop::loop()
break;
}
}
log() << "EventLoop::loop done, cancelling event listeners.";
MP_LOG(*this, Log::Info) << "EventLoop::loop done, cancelling event listeners.";
m_task_set.reset();
log() << "EventLoop::loop bye.";
MP_LOG(*this, Log::Info) << "EventLoop::loop bye.";
wait_stream = nullptr;
KJ_SYSCALL(::close(post_fd));
const Lock lock(m_mutex);
@@ -305,29 +314,34 @@ bool EventLoop::done() const
return m_num_clients == 0 && m_async_fns->empty();
}
std::tuple<ConnThread, bool> SetThread(ConnThreads& threads, std::mutex& mutex, Connection* connection, const std::function<Thread::Client()>& make_thread)
std::tuple<ConnThread, bool> SetThread(GuardedRef<ConnThreads> threads, Connection* connection, const std::function<Thread::Client()>& make_thread)
{
const std::unique_lock<std::mutex> lock(mutex);
auto thread = threads.find(connection);
if (thread != threads.end()) return {thread, false};
thread = threads.emplace(
std::piecewise_construct, std::forward_as_tuple(connection),
std::forward_as_tuple(make_thread(), connection, /* destroy_connection= */ false)).first;
thread->second.setDisconnectCallback([&threads, &mutex, thread] {
// Note: it is safe to use the `thread` iterator in this cleanup
// function, because the iterator would only be invalid if the map entry
// was removed, and if the map entry is removed the ProxyClient<Thread>
// destructor unregisters the cleanup.
assert(std::this_thread::get_id() == connection->m_loop->m_thread_id);
ConnThread thread;
bool inserted;
{
const Lock lock(threads.mutex);
std::tie(thread, inserted) = threads.ref.try_emplace(connection);
}
if (inserted) {
thread->second.emplace(make_thread(), connection, /* destroy_connection= */ false);
thread->second->m_disconnect_cb = connection->addSyncCleanup([threads, thread] {
// Note: it is safe to use the `thread` iterator in this cleanup
// function, because the iterator would only be invalid if the map entry
// was removed, and if the map entry is removed the ProxyClient<Thread>
// destructor unregisters the cleanup.
// Connection is being destroyed before thread client is, so reset
// thread client m_disconnect_cb member so thread client destructor does not
// try to unregister this callback after connection is destroyed.
// Remove connection pointer about to be destroyed from the map
const std::unique_lock<std::mutex> lock(mutex);
thread->second.m_disconnect_cb.reset();
threads.erase(thread);
});
return {thread, true};
// Connection is being destroyed before thread client is, so reset
// thread client m_disconnect_cb member so thread client destructor does not
// try to unregister this callback after connection is destroyed.
thread->second->m_disconnect_cb.reset();
// Remove connection pointer about to be destroyed from the map
const Lock lock(threads.mutex);
threads.ref.erase(thread);
});
}
return {thread, inserted};
}
ProxyClient<Thread>::~ProxyClient()
@@ -336,17 +350,18 @@ ProxyClient<Thread>::~ProxyClient()
// cleanup callback that was registered to handle the connection being
// destroyed before the thread being destroyed.
if (m_disconnect_cb) {
m_context.connection->removeSyncCleanup(*m_disconnect_cb);
// Remove disconnect callback on the event loop thread with
// loop->sync(), so if the connection is broken there is not a race
// between this thread trying to remove the callback and the disconnect
// handler attempting to call it.
m_context.loop->sync([&]() {
if (m_disconnect_cb) {
m_context.connection->removeSyncCleanup(*m_disconnect_cb);
}
});
}
}
void ProxyClient<Thread>::setDisconnectCallback(const std::function<void()>& fn)
{
assert(fn);
assert(!m_disconnect_cb);
m_disconnect_cb = m_context.connection->addSyncCleanup(fn);
}
ProxyServer<Thread>::ProxyServer(ThreadContext& thread_context, std::thread&& thread)
: m_thread_context(thread_context), m_thread(std::move(thread))
{
@@ -364,7 +379,7 @@ ProxyServer<Thread>::~ProxyServer()
assert(m_thread_context.waiter.get());
std::unique_ptr<Waiter> waiter;
{
const std::unique_lock<std::mutex> lock(m_thread_context.waiter->m_mutex);
const Lock lock(m_thread_context.waiter->m_mutex);
//! Reset thread context waiter pointer, as shutdown signal for done
//! lambda passed as waiter->wait() argument in makeThread code below.
waiter = std::move(m_thread_context.waiter);
@@ -398,7 +413,7 @@ kj::Promise<void> ProxyServer<ThreadMap>::makeThread(MakeThreadContext context)
g_thread_context.thread_name = ThreadName(m_connection.m_loop->m_exe_name) + " (from " + from + ")";
g_thread_context.waiter = std::make_unique<Waiter>();
thread_context.set_value(&g_thread_context);
std::unique_lock<std::mutex> lock(g_thread_context.waiter->m_mutex);
Lock lock(g_thread_context.waiter->m_mutex);
// Wait for shutdown signal from ProxyServer<Thread> destructor (signal
// is just waiter getting set to null.)
g_thread_context.waiter->wait(lock, [] { return !g_thread_context.waiter; });
@@ -416,4 +431,16 @@ std::string LongThreadName(const char* exe_name)
return g_thread_context.thread_name.empty() ? ThreadName(exe_name) : g_thread_context.thread_name;
}
kj::StringPtr KJ_STRINGIFY(Log v)
{
switch (v) {
case Log::Trace: return "Trace";
case Log::Debug: return "Debug";
case Log::Info: return "Info";
case Log::Warning: return "Warning";
case Log::Error: return "Error";
case Log::Raise: return "Raise";
}
return "<Log?>";
}
} // namespace mp


@@ -7,6 +7,8 @@
#include <cerrno>
#include <cstdio>
#include <filesystem>
#include <iostream>
#include <kj/common.h>
#include <kj/string-tree.h>
#include <pthread.h>
@@ -29,6 +31,8 @@
#include <pthread_np.h>
#endif // HAVE_PTHREAD_GETTHREADID_NP
namespace fs = std::filesystem;
namespace mp {
namespace {
@@ -138,6 +142,9 @@ void ExecProcess(const std::vector<std::string>& args)
argv.push_back(nullptr);
if (execvp(argv[0], argv.data()) != 0) {
perror("execvp failed");
if (errno == ENOENT && !args.empty()) {
std::cerr << "Missing executable: " << fs::weakly_canonical(args.front()) << '\n';
}
_exit(1);
}
}


@@ -13,6 +13,8 @@ $Proxy.includeTypes("mp/test/foo-types.h");
interface FooInterface $Proxy.wrap("mp::test::FooImplementation") {
add @0 (a :Int32, b :Int32) -> (result :Int32);
addOut @19 (a :Int32, b :Int32) -> (ret :Int32);
addInOut @20 (x :Int32, sum :Int32) -> (sum :Int32);
mapSize @1 (map :List(Pair(Text, Text))) -> (result :Int32);
pass @2 (arg :FooStruct) -> (result :FooStruct);
raise @3 (arg :FooStruct) -> (error :FooStruct $Proxy.exception("mp::test::FooStruct"));


@@ -62,6 +62,8 @@ class FooImplementation
{
public:
int add(int a, int b) { return a + b; }
void addOut(int a, int b, int& out) { out = a + b; }
void addInOut(int x, int& sum) { sum += x; }
int mapSize(const std::map<std::string, std::string>& map) { return map.size(); }
FooStruct pass(FooStruct foo) { return foo; }
void raise(FooStruct foo) { throw foo; }
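The new methods exercise output parameters: a capnp result field like `ret` maps to a C++ reference parameter, and a field named in both the parameter and result lists, like `sum` in `addInOut`, becomes an in-out reference. The client-side view, from the test below:

    int ret;
    foo->addOut(3, 4, ret); // pure output parameter: ret = 7
    foo->addInOut(3, ret);  // in-out: read, updated by the server, written back (ret = 10)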


@@ -5,26 +5,32 @@
#include <mp/test/foo.capnp.h>
#include <mp/test/foo.capnp.proxy.h>
#include <atomic>
#include <capnp/capability.h>
#include <capnp/rpc.h>
#include <condition_variable>
#include <cstring>
#include <functional>
#include <future>
#include <iostream>
#include <kj/async.h>
#include <kj/async-io.h>
#include <kj/common.h>
#include <kj/debug.h>
#include <kj/exception.h>
#include <kj/memory.h>
#include <kj/string.h>
#include <kj/test.h>
#include <memory>
#include <mp/proxy.h>
#include <mp/proxy.capnp.h>
#include <mp/proxy-io.h>
#include <mp/util.h>
#include <optional>
#include <set>
#include <stdexcept>
#include <string>
#include <string_view>
#include <system_error>
#include <thread>
#include <utility>
#include <vector>
@@ -60,9 +66,10 @@ public:
TestSetup(bool client_owns_connection = true)
: thread{[&] {
EventLoop loop("mptest", [](bool raise, const std::string& log) {
std::cout << "LOG" << raise << ": " << log << "\n";
if (raise) throw std::runtime_error(log);
EventLoop loop("mptest", [](mp::LogMessage log) {
// Info logs are not printed by default, but will be shown with `mptest --verbose`
KJ_LOG(INFO, log.level, log.message);
if (log.level == mp::Log::Raise) throw std::runtime_error(log.message);
});
auto pipe = loop.m_io_context.provider->newTwoWayPipe();
@@ -113,6 +120,11 @@ KJ_TEST("Call FooInterface methods")
ProxyClient<messages::FooInterface>* foo = setup.client.get();
KJ_EXPECT(foo->add(1, 2) == 3);
int ret;
foo->addOut(3, 4, ret);
KJ_EXPECT(ret == 7);
foo->addInOut(3, ret);
KJ_EXPECT(ret == 10);
FooStruct in;
in.name = "name";
@@ -297,5 +309,71 @@ KJ_TEST("Calling IPC method, disconnecting and blocking during the call")
signal.set_value();
}
KJ_TEST("Make simultaneous IPC calls to trigger 'thread busy' error")
{
TestSetup setup;
ProxyClient<messages::FooInterface>* foo = setup.client.get();
std::promise<void> signal;
foo->initThreadMap();
// Use callFnAsync() to get the client to set up the request_thread
// that will be used for the test.
setup.server->m_impl->m_fn = [&] {};
foo->callFnAsync();
ThreadContext& tc{g_thread_context};
Thread::Client *callback_thread, *request_thread;
foo->m_context.loop->sync([&] {
Lock lock(tc.waiter->m_mutex);
callback_thread = &tc.callback_threads.at(foo->m_context.connection)->m_client;
request_thread = &tc.request_threads.at(foo->m_context.connection)->m_client;
});
setup.server->m_impl->m_fn = [&] {
try
{
signal.get_future().get();
}
catch (const std::future_error& e)
{
KJ_EXPECT(e.code() == std::make_error_code(std::future_errc::future_already_retrieved));
}
};
auto client{foo->m_client};
bool caught_thread_busy = false;
// NOTE: '3' was chosen because it was the lowest number
// of simultaneous calls required to reliably catch a "thread busy" error
std::atomic<size_t> running{3};
foo->m_context.loop->sync([&]
{
for (size_t i = 0; i < running; i++)
{
auto request{client.callFnAsyncRequest()};
auto context{request.initContext()};
context.setCallbackThread(*callback_thread);
context.setThread(*request_thread);
foo->m_context.loop->m_task_set->add(request.send().then(
[&](auto&& results) {
running -= 1;
tc.waiter->m_cv.notify_all();
},
[&](kj::Exception&& e) {
KJ_EXPECT(std::string_view{e.getDescription().cStr()} ==
"remote exception: std::exception: thread busy");
caught_thread_busy = true;
running -= 1;
signal.set_value();
tc.waiter->m_cv.notify_all();
}
));
}
});
{
Lock lock(tc.waiter->m_mutex);
tc.waiter->wait(lock, [&running] { return running == 0; });
}
KJ_EXPECT(caught_thread_busy);
}
} // namespace test
} // namespace mp