ipc: Use EventLoopRef instead of addClient/removeClient

Use EventLoopRef to avoid reference counting bugs and be more exception safe
and deal with removal of addClient/removeClient methods in
https://github.com/bitcoin-core/libmultiprocess/pull/160

A test update is also required due to
https://github.com/bitcoin-core/libmultiprocess/pull/160 to deal with changed
reference count semantics. In IpcPipeTest(), it is now necessary to destroy
the client Proxy object instead of just the client Connection object to
decrease the event loop reference count and allow the loop to exit so the test
does not hang on shutdown.
This commit is contained in:
Ryan Ofsky
2025-02-09 14:21:37 -05:00
parent 5c45bc989b
commit 9a9fb19536
2 changed files with 11 additions and 13 deletions

View File

@@ -41,10 +41,7 @@ class CapnpProtocol : public Protocol
public:
~CapnpProtocol() noexcept(true)
{
if (m_loop) {
std::unique_lock<std::mutex> lock(m_loop->m_mutex);
m_loop->removeClient(lock);
}
m_loop_ref.reset();
if (m_loop_thread.joinable()) m_loop_thread.join();
assert(!m_loop);
};
@@ -83,10 +80,7 @@ public:
m_loop_thread = std::thread([&] {
util::ThreadRename("capnp-loop");
m_loop.emplace(exe_name, &IpcLogFn, &m_context);
{
std::unique_lock<std::mutex> lock(m_loop->m_mutex);
m_loop->addClient(lock);
}
m_loop_ref.emplace(*m_loop);
promise.set_value();
m_loop->loop();
m_loop.reset();
@@ -95,7 +89,12 @@ public:
}
Context m_context;
std::thread m_loop_thread;
//! EventLoop object which manages I/O events for all connections.
std::optional<mp::EventLoop> m_loop;
//! Reference to the same EventLoop. Increments the loops refcount on
//! creation, decrements on destruction. The loop thread exits when the
//! refcount reaches 0. Other IPC objects also hold their own EventLoopRef.
std::optional<mp::EventLoopRef> m_loop_ref;
};
} // namespace

View File

@@ -55,7 +55,6 @@ void IpcPipeTest()
{
// Setup: create FooImplementation object and listen for FooInterface requests
std::promise<std::unique_ptr<mp::ProxyClient<gen::FooInterface>>> foo_promise;
std::function<void()> disconnect_client;
std::thread thread([&]() {
mp::EventLoop loop("IpcPipeTest", [](bool raise, const std::string& log) { LogPrintf("LOG%i: %s\n", raise, log); });
auto pipe = loop.m_io_context.provider->newTwoWayPipe();
@@ -63,9 +62,9 @@ void IpcPipeTest()
auto connection_client = std::make_unique<mp::Connection>(loop, kj::mv(pipe.ends[0]));
auto foo_client = std::make_unique<mp::ProxyClient<gen::FooInterface>>(
connection_client->m_rpc_system->bootstrap(mp::ServerVatId().vat_id).castAs<gen::FooInterface>(),
connection_client.get(), /* destroy_connection= */ false);
connection_client.get(), /* destroy_connection= */ true);
connection_client.release();
foo_promise.set_value(std::move(foo_client));
disconnect_client = [&] { loop.sync([&] { connection_client.reset(); }); };
auto connection_server = std::make_unique<mp::Connection>(loop, kj::mv(pipe.ends[1]), [&](mp::Connection& connection) {
auto foo_server = kj::heap<mp::ProxyServer<gen::FooInterface>>(std::make_shared<FooImplementation>(), connection);
@@ -106,8 +105,8 @@ void IpcPipeTest()
auto script2{foo->passScript(script1)};
BOOST_CHECK_EQUAL(HexStr(script1), HexStr(script2));
// Test cleanup: disconnect pipe and join thread
disconnect_client();
// Test cleanup: disconnect and join thread
foo.reset();
thread.join();
}