From 1e78f566d575a047a6f0b762bc79601e0208d103 Mon Sep 17 00:00:00 2001 From: Anthony Towns Date: Wed, 7 Sep 2022 13:57:18 +1000 Subject: [PATCH 1/5] net: add NetEventsInterface::g_msgproc_mutex There are many cases where we assume message processing is single-threaded in order for how we access node-related memory to be safe. Add an explicit mutex that we can use to document this, which allows the compiler to catch any cases where we try to access that memory from other threads and break that assumption. --- src/net.cpp | 4 ++++ src/net.h | 7 +++++-- src/net_processing.cpp | 12 +++++++++--- src/net_processing.h | 2 +- src/test/denialofservice_tests.cpp | 6 ++++++ src/test/fuzz/process_message.cpp | 2 ++ src/test/fuzz/process_messages.cpp | 2 ++ src/test/fuzz/util.h | 2 +- src/test/net_tests.cpp | 2 ++ src/test/util/net.h | 5 +++-- 10 files changed, 35 insertions(+), 9 deletions(-) diff --git a/src/net.cpp b/src/net.cpp index 6659b642466..02b4ecb593e 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -1978,8 +1978,12 @@ void CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai } } +Mutex NetEventsInterface::g_msgproc_mutex; + void CConnman::ThreadMessageHandler() { + LOCK(NetEventsInterface::g_msgproc_mutex); + SetSyscallSandboxPolicy(SyscallSandboxPolicy::MESSAGE_HANDLER); while (!flagInterruptMsgProc) { diff --git a/src/net.h b/src/net.h index 66a228b3ec8..03d8354dd03 100644 --- a/src/net.h +++ b/src/net.h @@ -629,6 +629,9 @@ private: class NetEventsInterface { public: + /** Mutex for anything that is only accessed via the msg processing thread */ + static Mutex g_msgproc_mutex; + /** Initialize a peer (setup state, queue any initial messages) */ virtual void InitializeNode(CNode& node, ServiceFlags our_services) = 0; @@ -642,7 +645,7 @@ public: * @param[in] interrupt Interrupt condition for processing threads * @return True if there is more work to be done */ - virtual bool ProcessMessages(CNode* pnode, std::atomic& interrupt) = 0; + virtual bool ProcessMessages(CNode* pnode, std::atomic& interrupt) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex) = 0; /** * Send queued protocol messages to a given node. @@ -650,7 +653,7 @@ public: * @param[in] pnode The node which we are sending messages to. * @return True if there is more work to be done */ - virtual bool SendMessages(CNode* pnode) EXCLUSIVE_LOCKS_REQUIRED(pnode->cs_sendProcessing) = 0; + virtual bool SendMessages(CNode* pnode) EXCLUSIVE_LOCKS_REQUIRED(pnode->cs_sendProcessing) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex) = 0; protected: diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 74700580ad7..334c6596e92 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -515,9 +515,9 @@ public: void InitializeNode(CNode& node, ServiceFlags our_services) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); void FinalizeNode(const CNode& node) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_headers_presync_mutex); bool ProcessMessages(CNode* pfrom, std::atomic& interrupt) override - EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_recent_confirmed_transactions_mutex, !m_most_recent_block_mutex, !m_headers_presync_mutex); + EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_recent_confirmed_transactions_mutex, !m_most_recent_block_mutex, !m_headers_presync_mutex, g_msgproc_mutex); bool SendMessages(CNode* pto) override EXCLUSIVE_LOCKS_REQUIRED(pto->cs_sendProcessing) - EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_recent_confirmed_transactions_mutex, !m_most_recent_block_mutex); + EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_recent_confirmed_transactions_mutex, !m_most_recent_block_mutex, g_msgproc_mutex); /** Implement PeerManager */ void StartScheduledTasks(CScheduler& scheduler) override; @@ -532,7 +532,7 @@ public: void UnitTestMisbehaving(NodeId peer_id, int howmuch) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex) { Misbehaving(*Assert(GetPeerRef(peer_id)), howmuch, ""); }; void ProcessMessage(CNode& pfrom, const std::string& msg_type, CDataStream& vRecv, const std::chrono::microseconds time_received, const std::atomic& interruptMsgProc) override - EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_recent_confirmed_transactions_mutex, !m_most_recent_block_mutex, !m_headers_presync_mutex); + EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_recent_confirmed_transactions_mutex, !m_most_recent_block_mutex, !m_headers_presync_mutex, g_msgproc_mutex); void UpdateLastBlockAnnounceTime(NodeId node, int64_t time_in_seconds) override; private: @@ -3135,6 +3135,8 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, const std::chrono::microseconds time_received, const std::atomic& interruptMsgProc) { + AssertLockHeld(g_msgproc_mutex); + LogPrint(BCLog::NET, "received: %s (%u bytes) peer=%d\n", SanitizeString(msg_type), vRecv.size(), pfrom.GetId()); PeerRef peer = GetPeerRef(pfrom.GetId()); @@ -4748,6 +4750,8 @@ bool PeerManagerImpl::MaybeDiscourageAndDisconnect(CNode& pnode, Peer& peer) bool PeerManagerImpl::ProcessMessages(CNode* pfrom, std::atomic& interruptMsgProc) { + AssertLockHeld(g_msgproc_mutex); + bool fMoreWork = false; PeerRef peer = GetPeerRef(pfrom->GetId()); @@ -5240,6 +5244,8 @@ bool PeerManagerImpl::SetupAddressRelay(const CNode& node, Peer& peer) bool PeerManagerImpl::SendMessages(CNode* pto) { + AssertLockHeld(g_msgproc_mutex); + PeerRef peer = GetPeerRef(pto->GetId()); if (!peer) return false; const Consensus::Params& consensusParams = m_chainparams.GetConsensus(); diff --git a/src/net_processing.h b/src/net_processing.h index 0a882b1e53b..0d0842fa8ef 100644 --- a/src/net_processing.h +++ b/src/net_processing.h @@ -84,7 +84,7 @@ public: /** Process a single message from a peer. Public for fuzz testing */ virtual void ProcessMessage(CNode& pfrom, const std::string& msg_type, CDataStream& vRecv, - const std::chrono::microseconds time_received, const std::atomic& interruptMsgProc) = 0; + const std::chrono::microseconds time_received, const std::atomic& interruptMsgProc) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex) = 0; /** This function is used for testing the stale tip eviction logic, see denialofservice_tests.cpp */ virtual void UpdateLastBlockAnnounceTime(NodeId node, int64_t time_in_seconds) = 0; diff --git a/src/test/denialofservice_tests.cpp b/src/test/denialofservice_tests.cpp index 7889156d515..4563e72cb0a 100644 --- a/src/test/denialofservice_tests.cpp +++ b/src/test/denialofservice_tests.cpp @@ -45,6 +45,8 @@ BOOST_FIXTURE_TEST_SUITE(denialofservice_tests, TestingSetup) // work. BOOST_AUTO_TEST_CASE(outbound_slow_chain_eviction) { + LOCK(NetEventsInterface::g_msgproc_mutex); + ConnmanTestMsg& connman = static_cast(*m_node.connman); // Disable inactivity checks for this test to avoid interference connman.SetPeerConnectTimeout(99999s); @@ -274,6 +276,8 @@ BOOST_AUTO_TEST_CASE(block_relay_only_eviction) BOOST_AUTO_TEST_CASE(peer_discouragement) { + LOCK(NetEventsInterface::g_msgproc_mutex); + auto banman = std::make_unique(m_args.GetDataDirBase() / "banlist", nullptr, DEFAULT_MISBEHAVING_BANTIME); auto connman = std::make_unique(0x1337, 0x1337, *m_node.addrman, *m_node.netgroupman); auto peerLogic = PeerManager::make(*connman, *m_node.addrman, banman.get(), @@ -386,6 +390,8 @@ BOOST_AUTO_TEST_CASE(peer_discouragement) BOOST_AUTO_TEST_CASE(DoS_bantime) { + LOCK(NetEventsInterface::g_msgproc_mutex); + auto banman = std::make_unique(m_args.GetDataDirBase() / "banlist", nullptr, DEFAULT_MISBEHAVING_BANTIME); auto connman = std::make_unique(0x1337, 0x1337, *m_node.addrman, *m_node.netgroupman); auto peerLogic = PeerManager::make(*connman, *m_node.addrman, banman.get(), diff --git a/src/test/fuzz/process_message.cpp b/src/test/fuzz/process_message.cpp index 272c9e6cdca..aca4e9bebea 100644 --- a/src/test/fuzz/process_message.cpp +++ b/src/test/fuzz/process_message.cpp @@ -73,6 +73,8 @@ void fuzz_target(FuzzBufferType buffer, const std::string& LIMIT_TO_MESSAGE_TYPE SetMockTime(1610000000); // any time to successfully reset ibd chainstate.ResetIbd(); + LOCK(NetEventsInterface::g_msgproc_mutex); + const std::string random_message_type{fuzzed_data_provider.ConsumeBytesAsString(CMessageHeader::COMMAND_SIZE).c_str()}; if (!LIMIT_TO_MESSAGE_TYPE.empty() && random_message_type != LIMIT_TO_MESSAGE_TYPE) { return; diff --git a/src/test/fuzz/process_messages.cpp b/src/test/fuzz/process_messages.cpp index 12e682416c5..11678920c60 100644 --- a/src/test/fuzz/process_messages.cpp +++ b/src/test/fuzz/process_messages.cpp @@ -40,6 +40,8 @@ FUZZ_TARGET_INIT(process_messages, initialize_process_messages) SetMockTime(1610000000); // any time to successfully reset ibd chainstate.ResetIbd(); + LOCK(NetEventsInterface::g_msgproc_mutex); + std::vector peers; const auto num_peers_to_add = fuzzed_data_provider.ConsumeIntegralInRange(1, 3); for (int i = 0; i < num_peers_to_add; ++i) { diff --git a/src/test/fuzz/util.h b/src/test/fuzz/util.h index 6d652c922b3..36d55079cbb 100644 --- a/src/test/fuzz/util.h +++ b/src/test/fuzz/util.h @@ -328,7 +328,7 @@ auto ConsumeNode(FuzzedDataProvider& fuzzed_data_provider, const std::optional ConsumeNodeAsUniquePtr(FuzzedDataProvider& fdp, const std::optional& node_id_in = std::nullopt) { return ConsumeNode(fdp, node_id_in); } -void FillNode(FuzzedDataProvider& fuzzed_data_provider, ConnmanTestMsg& connman, CNode& node) noexcept; +void FillNode(FuzzedDataProvider& fuzzed_data_provider, ConnmanTestMsg& connman, CNode& node) noexcept EXCLUSIVE_LOCKS_REQUIRED(NetEventsInterface::g_msgproc_mutex); class FuzzedFileProvider { diff --git a/src/test/net_tests.cpp b/src/test/net_tests.cpp index f6642d32184..281dd0f21c0 100644 --- a/src/test/net_tests.cpp +++ b/src/test/net_tests.cpp @@ -805,6 +805,8 @@ BOOST_AUTO_TEST_CASE(LocalAddress_BasicLifecycle) BOOST_AUTO_TEST_CASE(initial_advertise_from_version_message) { + LOCK(NetEventsInterface::g_msgproc_mutex); + // Tests the following scenario: // * -bind=3.4.5.6:20001 is specified // * we make an outbound connection to a peer diff --git a/src/test/util/net.h b/src/test/util/net.h index b339bee32a6..73543de4ca7 100644 --- a/src/test/util/net.h +++ b/src/test/util/net.h @@ -44,9 +44,10 @@ struct ConnmanTestMsg : public CConnman { ServiceFlags remote_services, ServiceFlags local_services, int32_t version, - bool relay_txs); + bool relay_txs) + EXCLUSIVE_LOCKS_REQUIRED(NetEventsInterface::g_msgproc_mutex); - void ProcessMessagesOnce(CNode& node) { m_msgproc->ProcessMessages(&node, flagInterruptMsgProc); } + void ProcessMessagesOnce(CNode& node) EXCLUSIVE_LOCKS_REQUIRED(NetEventsInterface::g_msgproc_mutex) { m_msgproc->ProcessMessages(&node, flagInterruptMsgProc); } void NodeReceiveMsgBytes(CNode& node, Span msg_bytes, bool& complete) const; From bf12abe4542f2cf658516ea7e7fbbff6864c2208 Mon Sep 17 00:00:00 2001 From: Anthony Towns Date: Tue, 13 Sep 2022 12:22:18 +1000 Subject: [PATCH 2/5] net: drop cs_sendProcessing SendMessages() is now protected g_msgproc_mutex; so this additional per-node mutex is redundant. --- src/net.cpp | 5 +--- src/net.h | 4 +-- src/net_processing.cpp | 2 +- src/test/denialofservice_tests.cpp | 42 +++++++----------------------- src/test/fuzz/process_message.cpp | 5 +--- src/test/fuzz/process_messages.cpp | 5 +--- src/test/net_tests.cpp | 5 +--- src/test/util/net.cpp | 10 ++----- 8 files changed, 18 insertions(+), 60 deletions(-) diff --git a/src/net.cpp b/src/net.cpp index 02b4ecb593e..fca5aeac213 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -2005,10 +2005,7 @@ void CConnman::ThreadMessageHandler() if (flagInterruptMsgProc) return; // Send messages - { - LOCK(pnode->cs_sendProcessing); - m_msgproc->SendMessages(pnode); - } + m_msgproc->SendMessages(pnode); if (flagInterruptMsgProc) return; diff --git a/src/net.h b/src/net.h index 03d8354dd03..1bb9754a484 100644 --- a/src/net.h +++ b/src/net.h @@ -377,8 +377,6 @@ public: std::list vProcessMsg GUARDED_BY(cs_vProcessMsg); size_t nProcessQueueSize GUARDED_BY(cs_vProcessMsg){0}; - RecursiveMutex cs_sendProcessing; - uint64_t nRecvBytes GUARDED_BY(cs_vRecv){0}; std::atomic m_last_send{0s}; @@ -653,7 +651,7 @@ public: * @param[in] pnode The node which we are sending messages to. * @return True if there is more work to be done */ - virtual bool SendMessages(CNode* pnode) EXCLUSIVE_LOCKS_REQUIRED(pnode->cs_sendProcessing) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex) = 0; + virtual bool SendMessages(CNode* pnode) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex) = 0; protected: diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 334c6596e92..387e7732c15 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -516,7 +516,7 @@ public: void FinalizeNode(const CNode& node) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_headers_presync_mutex); bool ProcessMessages(CNode* pfrom, std::atomic& interrupt) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_recent_confirmed_transactions_mutex, !m_most_recent_block_mutex, !m_headers_presync_mutex, g_msgproc_mutex); - bool SendMessages(CNode* pto) override EXCLUSIVE_LOCKS_REQUIRED(pto->cs_sendProcessing) + bool SendMessages(CNode* pto) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_recent_confirmed_transactions_mutex, !m_most_recent_block_mutex, g_msgproc_mutex); /** Implement PeerManager */ diff --git a/src/test/denialofservice_tests.cpp b/src/test/denialofservice_tests.cpp index 4563e72cb0a..7150698e644 100644 --- a/src/test/denialofservice_tests.cpp +++ b/src/test/denialofservice_tests.cpp @@ -82,10 +82,8 @@ BOOST_AUTO_TEST_CASE(outbound_slow_chain_eviction) } // Test starts here - { - LOCK(dummyNode1.cs_sendProcessing); - BOOST_CHECK(peerman.SendMessages(&dummyNode1)); // should result in getheaders - } + BOOST_CHECK(peerman.SendMessages(&dummyNode1)); // should result in getheaders + { LOCK(dummyNode1.cs_vSend); BOOST_CHECK(dummyNode1.vSendMsg.size() > 0); @@ -95,20 +93,14 @@ BOOST_AUTO_TEST_CASE(outbound_slow_chain_eviction) int64_t nStartTime = GetTime(); // Wait 21 minutes SetMockTime(nStartTime+21*60); - { - LOCK(dummyNode1.cs_sendProcessing); - BOOST_CHECK(peerman.SendMessages(&dummyNode1)); // should result in getheaders - } + BOOST_CHECK(peerman.SendMessages(&dummyNode1)); // should result in getheaders { LOCK(dummyNode1.cs_vSend); BOOST_CHECK(dummyNode1.vSendMsg.size() > 0); } // Wait 3 more minutes SetMockTime(nStartTime+24*60); - { - LOCK(dummyNode1.cs_sendProcessing); - BOOST_CHECK(peerman.SendMessages(&dummyNode1)); // should result in disconnect - } + BOOST_CHECK(peerman.SendMessages(&dummyNode1)); // should result in disconnect BOOST_CHECK(dummyNode1.fDisconnect == true); peerman.FinalizeNode(dummyNode1); @@ -312,10 +304,8 @@ BOOST_AUTO_TEST_CASE(peer_discouragement) nodes[0]->fSuccessfullyConnected = true; connman->AddTestNode(*nodes[0]); peerLogic->UnitTestMisbehaving(nodes[0]->GetId(), DISCOURAGEMENT_THRESHOLD); // Should be discouraged - { - LOCK(nodes[0]->cs_sendProcessing); - BOOST_CHECK(peerLogic->SendMessages(nodes[0])); - } + BOOST_CHECK(peerLogic->SendMessages(nodes[0])); + BOOST_CHECK(banman->IsDiscouraged(addr[0])); BOOST_CHECK(nodes[0]->fDisconnect); BOOST_CHECK(!banman->IsDiscouraged(other_addr)); // Different address, not discouraged @@ -334,10 +324,7 @@ BOOST_AUTO_TEST_CASE(peer_discouragement) nodes[1]->fSuccessfullyConnected = true; connman->AddTestNode(*nodes[1]); peerLogic->UnitTestMisbehaving(nodes[1]->GetId(), DISCOURAGEMENT_THRESHOLD - 1); - { - LOCK(nodes[1]->cs_sendProcessing); - BOOST_CHECK(peerLogic->SendMessages(nodes[1])); - } + BOOST_CHECK(peerLogic->SendMessages(nodes[1])); // [0] is still discouraged/disconnected. BOOST_CHECK(banman->IsDiscouraged(addr[0])); BOOST_CHECK(nodes[0]->fDisconnect); @@ -345,10 +332,7 @@ BOOST_AUTO_TEST_CASE(peer_discouragement) BOOST_CHECK(!banman->IsDiscouraged(addr[1])); BOOST_CHECK(!nodes[1]->fDisconnect); peerLogic->UnitTestMisbehaving(nodes[1]->GetId(), 1); // [1] reaches discouragement threshold - { - LOCK(nodes[1]->cs_sendProcessing); - BOOST_CHECK(peerLogic->SendMessages(nodes[1])); - } + BOOST_CHECK(peerLogic->SendMessages(nodes[1])); // Expect both [0] and [1] to be discouraged/disconnected now. BOOST_CHECK(banman->IsDiscouraged(addr[0])); BOOST_CHECK(nodes[0]->fDisconnect); @@ -371,10 +355,7 @@ BOOST_AUTO_TEST_CASE(peer_discouragement) nodes[2]->fSuccessfullyConnected = true; connman->AddTestNode(*nodes[2]); peerLogic->UnitTestMisbehaving(nodes[2]->GetId(), DISCOURAGEMENT_THRESHOLD); - { - LOCK(nodes[2]->cs_sendProcessing); - BOOST_CHECK(peerLogic->SendMessages(nodes[2])); - } + BOOST_CHECK(peerLogic->SendMessages(nodes[2])); BOOST_CHECK(banman->IsDiscouraged(addr[0])); BOOST_CHECK(banman->IsDiscouraged(addr[1])); BOOST_CHECK(banman->IsDiscouraged(addr[2])); @@ -417,10 +398,7 @@ BOOST_AUTO_TEST_CASE(DoS_bantime) dummyNode.fSuccessfullyConnected = true; peerLogic->UnitTestMisbehaving(dummyNode.GetId(), DISCOURAGEMENT_THRESHOLD); - { - LOCK(dummyNode.cs_sendProcessing); - BOOST_CHECK(peerLogic->SendMessages(&dummyNode)); - } + BOOST_CHECK(peerLogic->SendMessages(&dummyNode)); BOOST_CHECK(banman->IsDiscouraged(addr)); peerLogic->FinalizeNode(dummyNode); diff --git a/src/test/fuzz/process_message.cpp b/src/test/fuzz/process_message.cpp index aca4e9bebea..f6a35da4fc0 100644 --- a/src/test/fuzz/process_message.cpp +++ b/src/test/fuzz/process_message.cpp @@ -94,10 +94,7 @@ void fuzz_target(FuzzBufferType buffer, const std::string& LIMIT_TO_MESSAGE_TYPE GetTime(), std::atomic{false}); } catch (const std::ios_base::failure&) { } - { - LOCK(p2p_node.cs_sendProcessing); - g_setup->m_node.peerman->SendMessages(&p2p_node); - } + g_setup->m_node.peerman->SendMessages(&p2p_node); SyncWithValidationInterfaceQueue(); g_setup->m_node.connman->StopNodes(); } diff --git a/src/test/fuzz/process_messages.cpp b/src/test/fuzz/process_messages.cpp index 11678920c60..1df1717ec36 100644 --- a/src/test/fuzz/process_messages.cpp +++ b/src/test/fuzz/process_messages.cpp @@ -72,10 +72,7 @@ FUZZ_TARGET_INIT(process_messages, initialize_process_messages) connman.ProcessMessagesOnce(random_node); } catch (const std::ios_base::failure&) { } - { - LOCK(random_node.cs_sendProcessing); - g_setup->m_node.peerman->SendMessages(&random_node); - } + g_setup->m_node.peerman->SendMessages(&random_node); } SyncWithValidationInterfaceQueue(); g_setup->m_node.connman->StopNodes(); diff --git a/src/test/net_tests.cpp b/src/test/net_tests.cpp index 281dd0f21c0..5de986c5f37 100644 --- a/src/test/net_tests.cpp +++ b/src/test/net_tests.cpp @@ -891,10 +891,7 @@ BOOST_AUTO_TEST_CASE(initial_advertise_from_version_message) } }; - { - LOCK(peer.cs_sendProcessing); - m_node.peerman->SendMessages(&peer); - } + m_node.peerman->SendMessages(&peer); BOOST_CHECK(sent); diff --git a/src/test/util/net.cpp b/src/test/util/net.cpp index 21273ac5c1c..2e3e16e6818 100644 --- a/src/test/util/net.cpp +++ b/src/test/util/net.cpp @@ -44,10 +44,7 @@ void ConnmanTestMsg::Handshake(CNode& node, (void)connman.ReceiveMsgFrom(node, msg_version); node.fPauseSend = false; connman.ProcessMessagesOnce(node); - { - LOCK(node.cs_sendProcessing); - peerman.SendMessages(&node); - } + peerman.SendMessages(&node); if (node.fDisconnect) return; assert(node.nVersion == version); assert(node.GetCommonVersion() == std::min(version, PROTOCOL_VERSION)); @@ -60,10 +57,7 @@ void ConnmanTestMsg::Handshake(CNode& node, (void)connman.ReceiveMsgFrom(node, msg_verack); node.fPauseSend = false; connman.ProcessMessagesOnce(node); - { - LOCK(node.cs_sendProcessing); - peerman.SendMessages(&node); - } + peerman.SendMessages(&node); assert(node.fSuccessfullyConnected == true); } } From a66a7ccb822f0f1f554d27d04735b7fb585d3b71 Mon Sep 17 00:00:00 2001 From: Anthony Towns Date: Tue, 13 Sep 2022 12:24:45 +1000 Subject: [PATCH 3/5] net_processing: add thread safety annotations for Peer members accessed only via the msgproc thread --- src/net_processing.cpp | 53 ++++++++++++++++++++++-------------------- 1 file changed, 28 insertions(+), 25 deletions(-) diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 387e7732c15..ce32a7d236b 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -264,10 +264,10 @@ struct Peer { /** The feerate in the most recent BIP133 `feefilter` message sent to the peer. * It is *not* a p2p protocol violation for the peer to send us * transactions with a lower fee rate than this. See BIP133. */ - CAmount m_fee_filter_sent{0}; + CAmount m_fee_filter_sent GUARDED_BY(NetEventsInterface::g_msgproc_mutex){0}; /** Timestamp after which we will send the next BIP133 `feefilter` message * to the peer. */ - std::chrono::microseconds m_next_send_feefilter{0}; + std::chrono::microseconds m_next_send_feefilter GUARDED_BY(NetEventsInterface::g_msgproc_mutex){0}; struct TxRelay { mutable RecursiveMutex m_bloom_filter_mutex; @@ -298,7 +298,7 @@ struct Peer { std::atomic m_last_mempool_req{0s}; /** The next time after which we will send an `inv` message containing * transaction announcements to this peer. */ - std::chrono::microseconds m_next_inv_send_time{0}; + std::chrono::microseconds m_next_inv_send_time GUARDED_BY(NetEventsInterface::g_msgproc_mutex){0}; /** Minimum fee rate with which to filter transaction announcements to this node. See BIP133. */ std::atomic m_fee_filter_received{0}; @@ -319,7 +319,7 @@ struct Peer { }; /** A vector of addresses to send to the peer, limited to MAX_ADDR_TO_SEND. */ - std::vector m_addrs_to_send; + std::vector m_addrs_to_send GUARDED_BY(NetEventsInterface::g_msgproc_mutex); /** Probabilistic filter to track recent addr messages relayed with this * peer. Used to avoid relaying redundant addresses to this peer. * @@ -329,7 +329,7 @@ struct Peer { * * Presence of this filter must correlate with m_addr_relay_enabled. **/ - std::unique_ptr m_addr_known; + std::unique_ptr m_addr_known GUARDED_BY(NetEventsInterface::g_msgproc_mutex); /** Whether we are participating in address relay with this connection. * * We set this bool to true for outbound peers (other than @@ -346,7 +346,7 @@ struct Peer { * initialized.*/ std::atomic_bool m_addr_relay_enabled{false}; /** Whether a getaddr request to this peer is outstanding. */ - bool m_getaddr_sent{false}; + bool m_getaddr_sent GUARDED_BY(NetEventsInterface::g_msgproc_mutex){false}; /** Guards address sending timers. */ mutable Mutex m_addr_send_times_mutex; /** Time point to send the next ADDR message to this peer. */ @@ -357,12 +357,12 @@ struct Peer { * messages, indicating a preference to receive ADDRv2 instead of ADDR ones. */ std::atomic_bool m_wants_addrv2{false}; /** Whether this peer has already sent us a getaddr message. */ - bool m_getaddr_recvd{false}; + bool m_getaddr_recvd GUARDED_BY(NetEventsInterface::g_msgproc_mutex){false}; /** Number of addresses that can be processed from this peer. Start at 1 to * permit self-announcement. */ - double m_addr_token_bucket{1.0}; + double m_addr_token_bucket GUARDED_BY(NetEventsInterface::g_msgproc_mutex){1.0}; /** When m_addr_token_bucket was last updated */ - std::chrono::microseconds m_addr_token_timestamp{GetTime()}; + std::chrono::microseconds m_addr_token_timestamp GUARDED_BY(NetEventsInterface::g_msgproc_mutex){GetTime()}; /** Total number of addresses that were dropped due to rate limiting. */ std::atomic m_addr_rate_limited{0}; /** Total number of addresses that were processed (excludes rate-limited ones). */ @@ -372,7 +372,7 @@ struct Peer { std::set m_orphan_work_set GUARDED_BY(g_cs_orphans); /** Whether we've sent this peer a getheaders in response to an inv prior to initial-headers-sync completing */ - bool m_inv_triggered_getheaders_before_sync{false}; + bool m_inv_triggered_getheaders_before_sync GUARDED_BY(NetEventsInterface::g_msgproc_mutex){false}; /** Protects m_getdata_requests **/ Mutex m_getdata_requests_mutex; @@ -380,7 +380,7 @@ struct Peer { std::deque m_getdata_requests GUARDED_BY(m_getdata_requests_mutex); /** Time of the last getheaders message to this peer */ - NodeClock::time_point m_last_getheaders_timestamp{}; + NodeClock::time_point m_last_getheaders_timestamp GUARDED_BY(NetEventsInterface::g_msgproc_mutex){}; /** Protects m_headers_sync **/ Mutex m_headers_sync_mutex; @@ -537,7 +537,7 @@ public: private: /** Consider evicting an outbound peer based on the amount of time they've been behind our tip */ - void ConsiderEviction(CNode& pto, Peer& peer, std::chrono::seconds time_in_seconds) EXCLUSIVE_LOCKS_REQUIRED(cs_main); + void ConsiderEviction(CNode& pto, Peer& peer, std::chrono::seconds time_in_seconds) EXCLUSIVE_LOCKS_REQUIRED(cs_main, g_msgproc_mutex); /** If we have extra outbound peers, try to disconnect the one with the oldest block announcement */ void EvictExtraOutboundPeers(std::chrono::seconds now) EXCLUSIVE_LOCKS_REQUIRED(cs_main); @@ -601,7 +601,7 @@ private: void ProcessHeadersMessage(CNode& pfrom, Peer& peer, std::vector&& headers, bool via_compact_block) - EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_headers_presync_mutex); + EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_headers_presync_mutex, g_msgproc_mutex); /** Various helpers for headers processing, invoked by ProcessHeadersMessage() */ /** Return true if headers are continuous and have valid proof-of-work (DoS points assigned on failure) */ bool CheckHeadersPoW(const std::vector& headers, const Consensus::Params& consensusParams, Peer& peer); @@ -610,7 +610,7 @@ private: /** Deal with state tracking and headers sync for peers that send the * occasional non-connecting header (this can happen due to BIP 130 headers * announcements for blocks interacting with the 2hr (MAX_FUTURE_BLOCK_TIME) rule). */ - void HandleFewUnconnectingHeaders(CNode& pfrom, Peer& peer, const std::vector& headers); + void HandleFewUnconnectingHeaders(CNode& pfrom, Peer& peer, const std::vector& headers) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex); /** Return true if the headers connect to each other, false otherwise */ bool CheckHeadersAreContinuous(const std::vector& headers) const; /** Try to continue a low-work headers sync that has already begun. @@ -633,7 +633,7 @@ private: */ bool IsContinuationOfLowWorkHeadersSync(Peer& peer, CNode& pfrom, std::vector& headers) - EXCLUSIVE_LOCKS_REQUIRED(peer.m_headers_sync_mutex, !m_headers_presync_mutex); + EXCLUSIVE_LOCKS_REQUIRED(peer.m_headers_sync_mutex, !m_headers_presync_mutex, g_msgproc_mutex); /** Check work on a headers chain to be processed, and if insufficient, * initiate our anti-DoS headers sync mechanism. * @@ -649,7 +649,7 @@ private: bool TryLowWorkHeadersSync(Peer& peer, CNode& pfrom, const CBlockIndex* chain_start_header, std::vector& headers) - EXCLUSIVE_LOCKS_REQUIRED(!peer.m_headers_sync_mutex, !m_peer_mutex, !m_headers_presync_mutex); + EXCLUSIVE_LOCKS_REQUIRED(!peer.m_headers_sync_mutex, !m_peer_mutex, !m_headers_presync_mutex, g_msgproc_mutex); /** Return true if the given header is an ancestor of * m_chainman.m_best_header or our current tip */ @@ -659,7 +659,7 @@ private: * We don't issue a getheaders message if we have a recent one outstanding. * This returns true if a getheaders is actually sent, and false otherwise. */ - bool MaybeSendGetHeaders(CNode& pfrom, const CBlockLocator& locator, Peer& peer); + bool MaybeSendGetHeaders(CNode& pfrom, const CBlockLocator& locator, Peer& peer) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex); /** Potentially fetch blocks from this peer upon receipt of a new headers tip */ void HeadersDirectFetchBlocks(CNode& pfrom, const Peer& peer, const CBlockIndex* pindexLast); /** Update peer state based on received headers message */ @@ -683,10 +683,10 @@ private: void MaybeSendPing(CNode& node_to, Peer& peer, std::chrono::microseconds now); /** Send `addr` messages on a regular schedule. */ - void MaybeSendAddr(CNode& node, Peer& peer, std::chrono::microseconds current_time); + void MaybeSendAddr(CNode& node, Peer& peer, std::chrono::microseconds current_time) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex); /** Send a single `sendheaders` message, after we have completed headers sync with a peer. */ - void MaybeSendSendHeaders(CNode& node, Peer& peer); + void MaybeSendSendHeaders(CNode& node, Peer& peer) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex); /** Relay (gossip) an address to a few randomly chosen nodes. * @@ -695,10 +695,10 @@ private: * @param[in] fReachable Whether the address' network is reachable. We relay unreachable * addresses less. */ - void RelayAddress(NodeId originator, const CAddress& addr, bool fReachable) EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); + void RelayAddress(NodeId originator, const CAddress& addr, bool fReachable) EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, g_msgproc_mutex); /** Send `feefilter` message. */ - void MaybeSendFeefilter(CNode& node, Peer& peer, std::chrono::microseconds current_time); + void MaybeSendFeefilter(CNode& node, Peer& peer, std::chrono::microseconds current_time) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex); const CChainParams& m_chainparams; CConnman& m_connman; @@ -1010,7 +1010,10 @@ private: * @return True if address relay is enabled with peer * False if address relay is disallowed */ - bool SetupAddressRelay(const CNode& node, Peer& peer); + bool SetupAddressRelay(const CNode& node, Peer& peer) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex); + + void AddAddressKnown(Peer& peer, const CAddress& addr) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex); + void PushAddress(Peer& peer, const CAddress& addr, FastRandomContext& insecure_rand) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex); }; const CNodeState* PeerManagerImpl::State(NodeId pnode) const EXCLUSIVE_LOCKS_REQUIRED(cs_main) @@ -1036,13 +1039,13 @@ static bool IsAddrCompatible(const Peer& peer, const CAddress& addr) return peer.m_wants_addrv2 || addr.IsAddrV1Compatible(); } -static void AddAddressKnown(Peer& peer, const CAddress& addr) +void PeerManagerImpl::AddAddressKnown(Peer& peer, const CAddress& addr) { assert(peer.m_addr_known); peer.m_addr_known->insert(addr.GetKey()); } -static void PushAddress(Peer& peer, const CAddress& addr, FastRandomContext& insecure_rand) +void PeerManagerImpl::PushAddress(Peer& peer, const CAddress& addr, FastRandomContext& insecure_rand) { // Known checking here is only to save space from duplicates. // Before sending, we'll filter it again for known addresses that were @@ -5103,7 +5106,7 @@ void PeerManagerImpl::MaybeSendAddr(CNode& node, Peer& peer, std::chrono::micros // Remove addr records that the peer already knows about, and add new // addrs to the m_addr_known filter on the same pass. - auto addr_already_known = [&peer](const CAddress& addr) { + auto addr_already_known = [&peer](const CAddress& addr) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex) { bool ret = peer.m_addr_known->contains(addr.GetKey()); if (!ret) peer.m_addr_known->insert(addr.GetKey()); return ret; From 0ae7987f68211729d87c9255889f4d9d1aa6a37f Mon Sep 17 00:00:00 2001 From: Anthony Towns Date: Tue, 13 Sep 2022 12:27:20 +1000 Subject: [PATCH 4/5] net_processing: add thread safety annotations for PeerManagerImpl members accessed only via the msgproc thread --- src/net_processing.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/net_processing.cpp b/src/net_processing.cpp index ce32a7d236b..32a9571e117 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -751,7 +751,7 @@ private: int nSyncStarted GUARDED_BY(cs_main) = 0; /** Hash of the last block we received via INV */ - uint256 m_last_block_inv_triggering_headers_sync{}; + uint256 m_last_block_inv_triggering_headers_sync GUARDED_BY(g_msgproc_mutex){}; /** * Sources of received blocks, saved to be able punish them when processing From d575a675cc884b1bebdb6197f2ef45c51788d4a3 Mon Sep 17 00:00:00 2001 From: Anthony Towns Date: Wed, 7 Sep 2022 14:38:18 +1000 Subject: [PATCH 5/5] net_processing: add thread safety annotation for m_highest_fast_announce --- src/net_processing.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 32a9571e117..10952d81117 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -863,7 +863,7 @@ private: std::atomic_bool m_headers_presync_should_signal{false}; /** Height of the highest block announced using BIP 152 high-bandwidth mode. */ - int m_highest_fast_announce{0}; + int m_highest_fast_announce GUARDED_BY(::cs_main){0}; /** Have we requested this block from a peer */ bool IsBlockRequested(const uint256& hash) EXCLUSIVE_LOCKS_REQUIRED(cs_main);