Files
bitcoin/src/ipc/capnp/protocol.cpp
Ryan Ofsky 2221c8814d depends: Update libmultiprocess library before converting to subtree
This should be the final update to the libmultiprocess package via the depends
system. It brings in the libmultiprocess cmake changes from
https://github.com/chaincodelabs/libmultiprocess/pull/136 needed to support
building as subtree. After this, a followup PR will add libmultiprocess as a
git subtree and depends will just use the git subtree instead of hardcoding its
own version hash.

Since there have been libmultiprocess API changes since the last update, this
commit also updates bitcoin code to be compatible with them.

This update brings in the following changes:

https://github.com/chaincodelabs/libmultiprocess/pull/121 ProxyClientBase: avoid static_cast to partially constructed object
https://github.com/chaincodelabs/libmultiprocess/pull/120 proxy-types.h: add static_assert to detect int/enum size mismatch
https://github.com/chaincodelabs/libmultiprocess/pull/127 ProxyClientBase: avoid static_cast to partially destructed object
https://github.com/chaincodelabs/libmultiprocess/pull/129 Fix "disconnected: write(m_post_fd, &buffer, 1): Broken pipe" EventLoop shutdown races.
https://github.com/chaincodelabs/libmultiprocess/pull/130 refactor: Add CleanupRun function to dedup clean list code
https://github.com/chaincodelabs/libmultiprocess/pull/131 doc: fix startAsyncThread comment
https://github.com/chaincodelabs/libmultiprocess/pull/133 Fix debian "libatomic not found" error in downstream builds
https://github.com/chaincodelabs/libmultiprocess/pull/94 c++ 20 cleanups
https://github.com/chaincodelabs/libmultiprocess/pull/135 refactor: proxy-types.h API cleanup
https://github.com/chaincodelabs/libmultiprocess/pull/136 cmake: Support being included with add_subdirectory
https://github.com/chaincodelabs/libmultiprocess/pull/137 doc: Fix broken markdown links
2025-01-26 14:13:47 -05:00

105 lines
3.2 KiB
C++

// Copyright (c) 2021 The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include <interfaces/init.h>
#include <ipc/capnp/context.h>
#include <ipc/capnp/init.capnp.h>
#include <ipc/capnp/init.capnp.proxy.h>
#include <ipc/capnp/protocol.h>
#include <ipc/exception.h>
#include <ipc/protocol.h>
#include <kj/async.h>
#include <logging.h>
#include <mp/proxy-io.h>
#include <mp/proxy-types.h>
#include <mp/util.h>
#include <util/threadnames.h>
#include <assert.h>
#include <errno.h>
#include <future>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <sys/socket.h>
#include <system_error>
#include <thread>
namespace ipc {
namespace capnp {
namespace {
void IpcLogFn(bool raise, std::string message)
{
LogDebug(BCLog::IPC, "%s\n", message);
if (raise) throw Exception(message);
}
class CapnpProtocol : public Protocol
{
public:
~CapnpProtocol() noexcept(true)
{
if (m_loop) {
std::unique_lock<std::mutex> lock(m_loop->m_mutex);
m_loop->removeClient(lock);
}
if (m_loop_thread.joinable()) m_loop_thread.join();
assert(!m_loop);
};
std::unique_ptr<interfaces::Init> connect(int fd, const char* exe_name) override
{
startLoop(exe_name);
return mp::ConnectStream<messages::Init>(*m_loop, fd);
}
void listen(int listen_fd, const char* exe_name, interfaces::Init& init) override
{
startLoop(exe_name);
if (::listen(listen_fd, /*backlog=*/5) != 0) {
throw std::system_error(errno, std::system_category());
}
mp::ListenConnections<messages::Init>(*m_loop, listen_fd, init);
}
void serve(int fd, const char* exe_name, interfaces::Init& init, const std::function<void()>& ready_fn = {}) override
{
assert(!m_loop);
mp::g_thread_context.thread_name = mp::ThreadName(exe_name);
m_loop.emplace(exe_name, &IpcLogFn, &m_context);
if (ready_fn) ready_fn();
mp::ServeStream<messages::Init>(*m_loop, fd, init);
m_loop->loop();
m_loop.reset();
}
void addCleanup(std::type_index type, void* iface, std::function<void()> cleanup) override
{
mp::ProxyTypeRegister::types().at(type)(iface).cleanup_fns.emplace_back(std::move(cleanup));
}
Context& context() override { return m_context; }
void startLoop(const char* exe_name)
{
if (m_loop) return;
std::promise<void> promise;
m_loop_thread = std::thread([&] {
util::ThreadRename("capnp-loop");
m_loop.emplace(exe_name, &IpcLogFn, &m_context);
{
std::unique_lock<std::mutex> lock(m_loop->m_mutex);
m_loop->addClient(lock);
}
promise.set_value();
m_loop->loop();
m_loop.reset();
});
promise.get_future().wait();
}
Context m_context;
std::thread m_loop_thread;
std::optional<mp::EventLoop> m_loop;
};
} // namespace
std::unique_ptr<Protocol> MakeCapnpProtocol() { return std::make_unique<CapnpProtocol>(); }
} // namespace capnp
} // namespace ipc