bitcoin/include/mp/type-context.h
Ryan Ofsky 2478a15ef9 Squashed 'src/ipc/libmultiprocess/' changes from 1868a84451f..70f632bda8f
70f632bda8f Merge bitcoin-core/libmultiprocess#265: ci: set LC_ALL in shell scripts
8e8e564259a Merge bitcoin-core/libmultiprocess#249: fixes for race conditions on disconnects
05d34cc2ec3 ci: set LC_ALL in shell scripts
e606fd84a8c Merge bitcoin-core/libmultiprocess#264: ci: reduce nproc multipliers
ff0eed1bf18 refactor: Use loop variable in type-context.h
ff1d8ba172a refactor: Move type-context.h getParams() call closer to use
1dbc59a4aa3 race fix: m_on_cancel called after request finishes
1643d05ba07 test: m_on_cancel called after request finishes
f5509a31fcc race fix: getParams() called after request cancel
4a60c39f24a test: getParams() called after request cancel
f11ec29ed20 race fix: worker thread destroyed before it is initialized
a1d643348f4 test: worker thread destroyed before it is initialized
336023382c4 ci: reduce nproc multipliers
b090beb9651 Merge bitcoin-core/libmultiprocess#256: ci: cache gnu32 nix store
be8622816da ci: cache gnu32 nix store
975270b619c Merge bitcoin-core/libmultiprocess#263: ci: bump timeout factor to 40
09f10e5a598 ci: bump timeout factor to 40
db8f76ad290 Merge bitcoin-core/libmultiprocess#253: ci: run some Bitcoin Core CI jobs
55a9b557b19 ci: set Bitcoin Core CI test repetition
fb0fc84d556 ci: add TSan job with instrumented libc++
0f29c38725b ci: add Bitcoin Core IPC tests (ASan + macOS)
3f64320315d Merge bitcoin-core/libmultiprocess#262: ci: enable clang-tidy in macOS job, use nullptr
cd9f8bdc9f0 Merge bitcoin-core/libmultiprocess#258: log: add socket connected info message and demote destroy logs to debug
b5d6258a42f Merge bitcoin-core/libmultiprocess#255: fix: use unsigned char cast and sizeof in LogEscape escape sequence
d94688e2c32 Merge bitcoin-core/libmultiprocess#251: Improved CustomBuildField for std::optional in IPC/libmultiprocess
a9499fad755 mp: use nullptr with pthread_threadid_np
f499e37850f ci: enable clang-tidy in macOS job
98f1352159d log: add socket connected info message and demote destroy logs to debug
554a481ea73 fix: use unsigned char cast and sizeof in LogEscape escape sequence
1977b9f3f65 Use std::forward in CustomBuildField for std::optional to allow move semantics, resolves FIXME
22bec918c97 Merge bitcoin-core/libmultiprocess#247: type-map: Work around LLVM 22 "out of bounds index" error
8a5e3ae6ed2 Merge bitcoin-core/libmultiprocess#242: proxy-types: add CustomHasField hook to map Cap'n Proto values to null C++ values
e8d35246918 Merge bitcoin-core/libmultiprocess#246: doc: Bump version 8 > 9
97d877053b6 proxy-types: add CustomHasField hook for nullable decode paths
8c2f10252c9 refactor: add missing includes to mp/type-data.h
b1638aceb40 doc: Bump version 8 > 9
f61af487217 type-map: Work around LLVM 22 "out of bounds index" error

git-subtree-dir: src/ipc/libmultiprocess
git-subtree-split: 70f632bda8f80449b6240f98da768206a535a04e

// Copyright (c) The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#ifndef MP_PROXY_TYPE_CONTEXT_H
#define MP_PROXY_TYPE_CONTEXT_H
#include <mp/proxy-io.h>
#include <mp/util.h>
#include <kj/string.h>
namespace mp {
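//! CustomBuildField override for mp.Context arguments in client calls. Fills in
//! the context's callbackThread and thread fields, as described in the comments
//! below, so the server knows which worker thread should execute the call and
//! which client thread callbacks should be sent to.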
template <typename Output>
void CustomBuildField(TypeList<>,
Priority<1>,
ClientInvokeContext& invoke_context,
Output&& output,
typename std::enable_if<std::is_same<decltype(output.get()), Context::Builder>::value>::type* enable = nullptr)
{
auto& connection = invoke_context.connection;
auto& thread_context = invoke_context.thread_context;
// Create local Thread::Server object corresponding to the current thread
// and pass a Thread::Client reference to it in the Context.callbackThread
// field so the function being called can make callbacks to this thread.
// Also store the Thread::Client reference in the callback_threads map so
// future calls over this connection can reuse it.
auto [callback_thread, _]{SetThread(
GuardedRef{thread_context.waiter->m_mutex, thread_context.callback_threads}, &connection,
[&] { return connection.m_threads.add(kj::heap<ProxyServer<Thread>>(connection, thread_context, std::thread{})); })};
// Call remote ThreadMap.makeThread function so server will create a
// dedicated worker thread to run function calls from this thread. Store the
// Thread::Client reference it returns in the request_threads map.
auto make_request_thread{[&]{
// This code will only run if an IPC client call is being made for the
// first time on this thread. After the first call, subsequent calls
// will use the existing request thread. This code will also never run at
// all if the current thread is a request thread created for a different
// IPC client, because in that case PassField code (below) will have set
// request_thread to point to the calling thread.
auto request = connection.m_thread_map.makeThreadRequest();
request.setName(thread_context.thread_name);
return request.send().getResult(); // Nonblocking due to capnp request pipelining.
}};
auto [request_thread, _1]{SetThread(
GuardedRef{thread_context.waiter->m_mutex, thread_context.request_threads},
&connection, make_request_thread)};
auto context = output.init();
context.setThread(request_thread->second->m_client);
context.setCallbackThread(callback_thread->second->m_client);
}
//! PassField override for mp.Context arguments. Returns asynchronously and calls
//! the function on the worker thread specified in the context.
template <typename Accessor, typename ServerContext, typename Fn, typename... Args>
auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn& fn, Args&&... args) ->
typename std::enable_if<
std::is_same<decltype(Accessor::get(server_context.call_context.getParams())), Context::Reader>::value,
kj::Promise<typename ServerContext::CallContext>>::type
{
auto& server = server_context.proxy_server;
EventLoop& loop = *server.m_context.loop;
int req = server_context.req;
// Keep a reference to the ProxyServer instance by assigning it to the self
// variable. ProxyServer instances are reference-counted and if the client
// drops its reference and the IPC call is canceled, this variable keeps the
// instance alive until the method finishes executing. The self variable
// needs to be destroyed on the event loop thread so it is freed in a sync()
// call below.
auto self = server.thisCap();
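// The invoke lambda below holds the work for this request. It is not run
// here; it is posted further down to the worker thread looked up from the
// context argument, via ProxyServer<Thread>::post.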
auto invoke = [self = kj::mv(self), call_context = kj::mv(server_context.call_context), &server, &loop, req, fn, args...](CancelMonitor& cancel_monitor) mutable {
MP_LOG(loop, Log::Debug) << "IPC server executing request #" << req;
if (loop.testing_hook_async_request_start) loop.testing_hook_async_request_start();
KJ_DEFER(if (loop.testing_hook_async_request_done) loop.testing_hook_async_request_done());
ServerContext server_context{server, call_context, req};
// Before invoking the function, store a reference to the
// callbackThread provided by the client in the
// thread_local.request_threads map. This way, if this
// server thread needs to execute any RPCs that call back to
// the client, they will happen on the same client thread
// that is waiting for this function, just like what would
// happen if this were a normal function call made on the
// local stack.
//
// If the request_threads map already has an entry for this
// connection, it is left unchanged. An existing entry means
// the current thread is an RPC client thread that is in the
// middle of an RPC call, and the current call is a nested
// call coming from the remote thread handling that RPC. In
// this case, the callbackThread value should point to the
// same thread already in the map, so there is no need to
// update the map.
auto& thread_context = g_thread_context;
auto& request_threads = thread_context.request_threads;
ConnThread request_thread;
bool inserted{false};
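// cancel_mutex and cancel_lock coordinate this execution thread with the
// m_on_cancel callback installed below, which runs on the event loop
// thread if the client cancels the request while it is executing.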
Mutex cancel_mutex;
Lock cancel_lock{cancel_mutex};
server_context.cancel_lock = &cancel_lock;
loop.sync([&] {
// Detect request being canceled before it executes.
if (cancel_monitor.m_canceled) {
server_context.request_canceled = true;
return;
}
// Detect request being canceled while it executes.
assert(!cancel_monitor.m_on_cancel);
cancel_monitor.m_on_cancel = [&loop, &server_context, &cancel_mutex, req]() {
MP_LOG(loop, Log::Info) << "IPC server request #" << req << " canceled while executing.";
// Lock cancel_mutex here to block the event loop
// thread and prevent it from deleting the request's
// params and response structs while the execution
// thread is accessing them. Because this lock is
// released before the event loop thread does delete
// the structs, the mutex does not provide any
// protection from the event loop deleting the
// structs _before_ the execution thread acquires
// it. So in addition to locking the mutex, the
// execution thread always checks request_canceled
// as well before accessing the structs.
Lock cancel_lock{cancel_mutex};
server_context.request_canceled = true;
};
// Update the request_threads map if not canceled. We know
// the request is not currently canceled because
// cancel_monitor.m_canceled was checked above and this
// code is running on the event loop thread.
std::tie(request_thread, inserted) = SetThread(
GuardedRef{thread_context.waiter->m_mutex, request_threads}, server.m_context.connection,
[&] { return Accessor::get(call_context.getParams()).getCallbackThread(); });
});
// If an entry was inserted into the request_threads map,
// remove it after calling fn.invoke. If an entry was not
// inserted, one already existed, meaning this must be a
// recursive call (IPC call calling back to the caller which
// makes another IPC call), so avoid modifying the map.
const bool erase_thread{inserted};
KJ_DEFER(
// Release the cancel lock before calling loop->sync and
// waiting for the event loop thread, because if a
// cancellation happened, it needs to run the on_cancel
// callback above. It's safe to release cancel_lock at
// this point because the fn.invoke() call below will be
// finished and no longer accessing the params or
// results structs.
cancel_lock.m_lock.unlock();
// Erase the request_threads entry on the event loop
// thread with loop->sync(), so if the connection is
// broken there is not a race between this thread and
// the disconnect handler trying to destroy the thread
// client object.
loop.sync([&] {
// Clear the cancellation callback. At this point the
// method invocation has finished and the result is either
// being returned, or discarded if a cancellation happened,
// so there is no need to be notified of cancellations
// after this point. We also must not be notified, because
// cancel_mutex and server_context could be out of scope by
// the time the callback runs.
cancel_monitor.m_on_cancel = nullptr;
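// Move `self` into a local so the ProxyServer reference is released
// here on the event loop thread, as required by the comment where
// `self` was captured above.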
auto self_dispose{kj::mv(self)};
if (erase_thread) {
// Look up the thread again without using the existing
// iterator, since the entry may no longer be there after
// a disconnect. Destroy the node after releasing
// Waiter::m_mutex, so the ProxyClient<Thread> destructor
// is able to use EventLoop::mutex without violating lock
// order.
ConnThreads::node_type removed;
{
Lock lock(thread_context.waiter->m_mutex);
removed = request_threads.extract(server.m_context.connection);
}
}
});
);
if (server_context.request_canceled) {
MP_LOG(loop, Log::Info) << "IPC server request #" << req << " canceled before it could be executed";
} else KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]{
try {
fn.invoke(server_context, args...);
} catch (const InterruptException& e) {
MP_LOG(loop, Log::Info) << "IPC server request #" << req << " interrupted (" << e.what() << ")";
}
})) {
MP_LOG(loop, Log::Error) << "IPC server request #" << req << " uncaught exception (" << kj::str(*exception).cStr() << ")";
throw kj::mv(*exception);
}
return call_context;
// End of scope: if the KJ_DEFER cleanup above was reached, it runs here
};
// Look up the Thread object specified by the client. The specified thread
// should be a local Thread::Server object, but it needs to be looked up
// asynchronously with getLocalServer().
const auto& params = server_context.call_context.getParams();
Context::Reader context_arg = Accessor::get(params);
auto thread_client = context_arg.getThread();
auto result = server.m_context.connection->m_threads.getLocalServer(thread_client)
.then([&loop, invoke = kj::mv(invoke), req](const kj::Maybe<Thread::Server&>& perhaps) mutable {
// Assuming the thread object is found, pass it the `invoke`
// lambda above, which will run the function on that
// thread.
KJ_IF_MAYBE (thread_server, perhaps) {
auto& thread = static_cast<ProxyServer<Thread>&>(*thread_server);
MP_LOG(loop, Log::Debug)
<< "IPC server post request #" << req << " {" << thread.m_thread_context.thread_name << "}";
return thread.template post<typename ServerContext::CallContext>(std::move(invoke));
} else {
MP_LOG(loop, Log::Error)
<< "IPC server error request #" << req << ", missing thread to execute request";
throw std::runtime_error("invalid thread handle");
}
});
// Use connection m_canceler object to cancel the result promise if the
// connection is destroyed. (By default Cap'n Proto does not cancel requests
// on disconnect, since it's possible clients might want to make requests
// and immediately disconnect without waiting for results, but not want the
// requests to be canceled.)
return server.m_context.connection->m_canceler.wrap(kj::mv(result));
}
} // namespace mp
#endif // MP_PROXY_TYPE_CONTEXT_H