From 897e342d6ec320c286753daef01d6eb9839e2c4d Mon Sep 17 00:00:00 2001 From: dergoegge Date: Tue, 14 Mar 2023 17:58:59 +0100 Subject: [PATCH] [net] Encapsulate CNode message polling --- src/net.cpp | 14 ++++++++++++++ src/net.h | 8 ++++++++ src/net_processing.cpp | 19 +++++++------------ 3 files changed, 29 insertions(+), 12 deletions(-) diff --git a/src/net.cpp b/src/net.cpp index 0a9600f9acc..80cccb1c74f 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -2812,6 +2812,20 @@ void CNode::MarkReceivedMsgsForProcessing(unsigned int recv_flood_size) fPauseRecv = nProcessQueueSize > recv_flood_size; } +std::optional> CNode::PollMessage(size_t recv_flood_size) +{ + LOCK(cs_vProcessMsg); + if (vProcessMsg.empty()) return std::nullopt; + + std::list msgs; + // Just take one message + msgs.splice(msgs.begin(), vProcessMsg, vProcessMsg.begin()); + nProcessQueueSize -= msgs.front().m_raw_message_size; + fPauseRecv = nProcessQueueSize > recv_flood_size; + + return std::make_pair(std::move(msgs.front()), !vProcessMsg.empty()); +} + bool CConnman::NodeFullyConnected(const CNode* pnode) { return pnode && pnode->fSuccessfullyConnected && !pnode->fDisconnect; diff --git a/src/net.h b/src/net.h index 8af100a26f0..a67b2af3ebf 100644 --- a/src/net.h +++ b/src/net.h @@ -426,6 +426,14 @@ public: void MarkReceivedMsgsForProcessing(unsigned int recv_flood_size) EXCLUSIVE_LOCKS_REQUIRED(!cs_vProcessMsg); + /** Poll the next message from the processing queue of this connection. + * + * Returns std::nullopt if the processing queue is empty, or a pair + * consisting of the message and a bool that indicates if the processing + * queue has more entries. */ + std::optional> PollMessage(size_t recv_flood_size) + EXCLUSIVE_LOCKS_REQUIRED(!cs_vProcessMsg); + bool IsOutboundOrBlockRelayConn() const { switch (m_conn_type) { case ConnectionType::OUTBOUND_FULL_RELAY: diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 25c65c70903..1bff41514e9 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -4854,8 +4854,6 @@ bool PeerManagerImpl::ProcessMessages(CNode* pfrom, std::atomic& interrupt { AssertLockHeld(g_msgproc_mutex); - bool fMoreWork = false; - PeerRef peer = GetPeerRef(pfrom->GetId()); if (peer == nullptr) return false; @@ -4883,17 +4881,14 @@ bool PeerManagerImpl::ProcessMessages(CNode* pfrom, std::atomic& interrupt // Don't bother if send buffer is too full to respond anyway if (pfrom->fPauseSend) return false; - std::list msgs; - { - LOCK(pfrom->cs_vProcessMsg); - if (pfrom->vProcessMsg.empty()) return false; - // Just take one message - msgs.splice(msgs.begin(), pfrom->vProcessMsg, pfrom->vProcessMsg.begin()); - pfrom->nProcessQueueSize -= msgs.front().m_raw_message_size; - pfrom->fPauseRecv = pfrom->nProcessQueueSize > m_connman.GetReceiveFloodSize(); - fMoreWork = !pfrom->vProcessMsg.empty(); + auto poll_result{pfrom->PollMessage(m_connman.GetReceiveFloodSize())}; + if (!poll_result) { + // No message to process + return false; } - CNetMessage& msg(msgs.front()); + + CNetMessage& msg{poll_result->first}; + bool fMoreWork = poll_result->second; TRACE6(net, inbound_message, pfrom->GetId(),