From 86cff8bf18f2c6344a25ad8b81cf366201a73c36 Mon Sep 17 00:00:00 2001 From: Greg Sanders Date: Tue, 9 May 2023 10:30:23 -0400 Subject: [PATCH 1/6] alias BlockDownloadMap for mapBlocksInFlight --- src/net_processing.cpp | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 0e3f7435c87..c2bdb3e29cb 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -899,7 +899,8 @@ private: */ void FindNextBlocksToDownload(const Peer& peer, unsigned int count, std::vector& vBlocks, NodeId& nodeStaller) EXCLUSIVE_LOCKS_REQUIRED(cs_main); - std::map::iterator> > mapBlocksInFlight GUARDED_BY(cs_main); + typedef std::map::iterator>> BlockDownloadMap; + BlockDownloadMap mapBlocksInFlight GUARDED_BY(cs_main); /** When our tip was last updated. */ std::atomic m_last_tip_update{0s}; @@ -1117,7 +1118,7 @@ std::chrono::microseconds PeerManagerImpl::NextInvToInbounds(std::chrono::micros bool PeerManagerImpl::IsBlockRequested(const uint256& hash) { - return mapBlocksInFlight.find(hash) != mapBlocksInFlight.end(); + return mapBlocksInFlight.count(hash); } void PeerManagerImpl::RemoveBlockRequest(const uint256& hash, std::optional from_peer) @@ -1161,7 +1162,7 @@ bool PeerManagerImpl::BlockRequested(NodeId nodeid, const CBlockIndex& block, st assert(state != nullptr); // Short-circuit most stuff in case it is from the same node - std::map::iterator> >::iterator itInFlight = mapBlocksInFlight.find(hash); + BlockDownloadMap::iterator itInFlight = mapBlocksInFlight.find(hash); if (itInFlight != mapBlocksInFlight.end() && itInFlight->second.first == nodeid) { if (pit) { *pit = &itInFlight->second.second; @@ -4274,7 +4275,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, nodestate->m_last_block_announcement = GetTime(); } - std::map::iterator> >::iterator blockInFlightIt = mapBlocksInFlight.find(pindex->GetBlockHash()); + BlockDownloadMap::iterator blockInFlightIt = mapBlocksInFlight.find(pindex->GetBlockHash()); bool fAlreadyInFlight = blockInFlightIt != mapBlocksInFlight.end(); if (pindex->nStatus & BLOCK_HAVE_DATA) // Nothing to do here @@ -4433,7 +4434,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, { LOCK(cs_main); - std::map::iterator> >::iterator it = mapBlocksInFlight.find(resp.blockhash); + BlockDownloadMap::iterator it = mapBlocksInFlight.find(resp.blockhash); if (it == mapBlocksInFlight.end() || !it->second.second->partialBlock || it->second.first != pfrom.GetId()) { LogPrint(BCLog::NET, "Peer %d sent us block transactions for block we weren't expecting\n", pfrom.GetId()); From a90595478dcf4e443cd15bbb822d485dc42bdb18 Mon Sep 17 00:00:00 2001 From: Greg Sanders Date: Tue, 9 May 2023 10:46:51 -0400 Subject: [PATCH 2/6] Remove nBlocksInFlight --- src/net_processing.cpp | 27 ++++++++++++--------------- 1 file changed, 12 insertions(+), 15 deletions(-) diff --git a/src/net_processing.cpp b/src/net_processing.cpp index c2bdb3e29cb..8138e8e9852 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -431,7 +431,6 @@ struct CNodeState { std::list vBlocksInFlight; //! When the first entry in vBlocksInFlight started downloading. Don't care when vBlocksInFlight is empty. std::chrono::microseconds m_downloading_since{0us}; - int nBlocksInFlight{0}; //! Whether we consider this a preferred download peer. bool fPreferredDownload{false}; /** Whether this peer wants invs or cmpctblocks (when possible) for block announcements. */ @@ -1145,8 +1144,7 @@ void PeerManagerImpl::RemoveBlockRequest(const uint256& hash, std::optionalvBlocksInFlight.erase(list_it); - state->nBlocksInFlight--; - if (state->nBlocksInFlight == 0) { + if (state->vBlocksInFlight.empty()) { // Last validated block on the queue was received. m_peers_downloading_from--; } @@ -1175,8 +1173,7 @@ bool PeerManagerImpl::BlockRequested(NodeId nodeid, const CBlockIndex& block, st std::list::iterator it = state->vBlocksInFlight.insert(state->vBlocksInFlight.end(), {&block, std::unique_ptr(pit ? new PartiallyDownloadedBlock(&m_mempool) : nullptr)}); - state->nBlocksInFlight++; - if (state->nBlocksInFlight == 1) { + if (state->vBlocksInFlight.size() == 1) { // We're starting a block download (batch) from this peer. state->m_downloading_since = GetTime(); m_peers_downloading_from++; @@ -1520,7 +1517,7 @@ void PeerManagerImpl::FinalizeNode(const CNode& node) m_txrequest.DisconnectedPeer(nodeid); if (m_txreconciliation) m_txreconciliation->ForgetPeer(nodeid); m_num_preferred_download_peers -= state->fPreferredDownload; - m_peers_downloading_from -= (state->nBlocksInFlight != 0); + m_peers_downloading_from -= (!state->vBlocksInFlight.empty()); assert(m_peers_downloading_from >= 0); m_outbound_peers_with_protect_from_disconnect -= state->m_chain_sync.m_protect; assert(m_outbound_peers_with_protect_from_disconnect >= 0); @@ -2681,7 +2678,7 @@ void PeerManagerImpl::HeadersDirectFetchBlocks(CNode& pfrom, const Peer& peer, c std::vector vGetData; // Download as much as possible, from earliest to latest. for (const CBlockIndex *pindex : reverse_iterate(vToFetch)) { - if (nodestate->nBlocksInFlight >= MAX_BLOCKS_IN_TRANSIT_PER_PEER) { + if (nodestate->vBlocksInFlight.size() >= MAX_BLOCKS_IN_TRANSIT_PER_PEER) { // Can't download any more from this peer break; } @@ -4301,7 +4298,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, // We want to be a bit conservative just to be extra careful about DoS // possibilities in compact block processing... if (pindex->nHeight <= m_chainman.ActiveChain().Height() + 2) { - if ((!fAlreadyInFlight && nodestate->nBlocksInFlight < MAX_BLOCKS_IN_TRANSIT_PER_PEER) || + if ((!fAlreadyInFlight && nodestate->vBlocksInFlight.size() < MAX_BLOCKS_IN_TRANSIT_PER_PEER) || (fAlreadyInFlight && blockInFlightIt->second.first == pfrom.GetId())) { std::list::iterator* queuedBlockIt = nullptr; if (!BlockRequested(pfrom.GetId(), *pindex, &queuedBlockIt)) { @@ -5044,14 +5041,14 @@ void PeerManagerImpl::EvictExtraOutboundPeers(std::chrono::seconds now) // valid headers chain with at least as much work as our tip. CNodeState *node_state = State(pnode->GetId()); if (node_state == nullptr || - (now - pnode->m_connected >= MINIMUM_CONNECT_TIME && node_state->nBlocksInFlight == 0)) { + (now - pnode->m_connected >= MINIMUM_CONNECT_TIME && node_state->vBlocksInFlight.empty())) { pnode->fDisconnect = true; LogPrint(BCLog::NET, "disconnecting extra block-relay-only peer=%d (last block received at time %d)\n", pnode->GetId(), count_seconds(pnode->m_last_block_time)); return true; } else { LogPrint(BCLog::NET, "keeping block-relay-only peer=%d chosen for eviction (connect time: %d, blocks_in_flight: %d)\n", - pnode->GetId(), count_seconds(pnode->m_connected), node_state->nBlocksInFlight); + pnode->GetId(), count_seconds(pnode->m_connected), node_state->vBlocksInFlight.size()); } return false; }); @@ -5091,13 +5088,13 @@ void PeerManagerImpl::EvictExtraOutboundPeers(std::chrono::seconds now) // Also don't disconnect any peer we're trying to download a // block from. CNodeState &state = *State(pnode->GetId()); - if (now - pnode->m_connected > MINIMUM_CONNECT_TIME && state.nBlocksInFlight == 0) { + if (now - pnode->m_connected > MINIMUM_CONNECT_TIME && state.vBlocksInFlight.empty()) { LogPrint(BCLog::NET, "disconnecting extra outbound peer=%d (last block announcement received at time %d)\n", pnode->GetId(), oldest_block_announcement); pnode->fDisconnect = true; return true; } else { LogPrint(BCLog::NET, "keeping outbound peer=%d chosen for eviction (connect time: %d, blocks_in_flight: %d)\n", - pnode->GetId(), count_seconds(pnode->m_connected), state.nBlocksInFlight); + pnode->GetId(), count_seconds(pnode->m_connected), state.vBlocksInFlight.size()); return false; } }); @@ -5817,10 +5814,10 @@ bool PeerManagerImpl::SendMessages(CNode* pto) // Message: getdata (blocks) // std::vector vGetData; - if (CanServeBlocks(*peer) && ((sync_blocks_and_headers_from_peer && !IsLimitedPeer(*peer)) || !m_chainman.ActiveChainstate().IsInitialBlockDownload()) && state.nBlocksInFlight < MAX_BLOCKS_IN_TRANSIT_PER_PEER) { + if (CanServeBlocks(*peer) && ((sync_blocks_and_headers_from_peer && !IsLimitedPeer(*peer)) || !m_chainman.ActiveChainstate().IsInitialBlockDownload()) && state.vBlocksInFlight.size() < MAX_BLOCKS_IN_TRANSIT_PER_PEER) { std::vector vToDownload; NodeId staller = -1; - FindNextBlocksToDownload(*peer, MAX_BLOCKS_IN_TRANSIT_PER_PEER - state.nBlocksInFlight, vToDownload, staller); + FindNextBlocksToDownload(*peer, MAX_BLOCKS_IN_TRANSIT_PER_PEER - state.vBlocksInFlight.size(), vToDownload, staller); for (const CBlockIndex *pindex : vToDownload) { uint32_t nFetchFlags = GetFetchFlags(*peer); vGetData.push_back(CInv(MSG_BLOCK | nFetchFlags, pindex->GetBlockHash())); @@ -5828,7 +5825,7 @@ bool PeerManagerImpl::SendMessages(CNode* pto) LogPrint(BCLog::NET, "Requesting block %s (%d) peer=%d\n", pindex->GetBlockHash().ToString(), pindex->nHeight, pto->GetId()); } - if (state.nBlocksInFlight == 0 && staller != -1) { + if (state.vBlocksInFlight.empty() && staller != -1) { if (State(staller)->m_stalling_since == 0us) { State(staller)->m_stalling_since = current_time; LogPrint(BCLog::NET, "Stall started peer=%d\n", staller); From cce96182ba2457335868c65dc16b081c3dee32ee Mon Sep 17 00:00:00 2001 From: Greg Sanders Date: Tue, 9 May 2023 12:56:49 -0400 Subject: [PATCH 3/6] Convert mapBlocksInFlight to a multimap --- src/net_processing.cpp | 114 ++++++++++++++++++++++++++--------------- 1 file changed, 74 insertions(+), 40 deletions(-) diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 8138e8e9852..81480148151 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -898,7 +898,8 @@ private: */ void FindNextBlocksToDownload(const Peer& peer, unsigned int count, std::vector& vBlocks, NodeId& nodeStaller) EXCLUSIVE_LOCKS_REQUIRED(cs_main); - typedef std::map::iterator>> BlockDownloadMap; + /* Multimap used to preserve insertion order */ + typedef std::multimap::iterator>> BlockDownloadMap; BlockDownloadMap mapBlocksInFlight GUARDED_BY(cs_main); /** When our tip was last updated. */ @@ -1122,34 +1123,40 @@ bool PeerManagerImpl::IsBlockRequested(const uint256& hash) void PeerManagerImpl::RemoveBlockRequest(const uint256& hash, std::optional from_peer) { - auto it = mapBlocksInFlight.find(hash); - if (it == mapBlocksInFlight.end()) { - // Block was not requested + auto range = mapBlocksInFlight.equal_range(hash); + if (range.first == range.second) { + // Block was not requested from any peer return; } - auto [node_id, list_it] = it->second; + // Currently we don't request more than one peer for same block + Assume(mapBlocksInFlight.count(hash) == 1); - if (from_peer && node_id != *from_peer) { - // Block was requested by another peer - return; + while (range.first != range.second) { + auto [node_id, list_it] = range.first->second; + + if (from_peer && *from_peer != node_id) { + range.first++; + continue; + } + + CNodeState *state = State(node_id); + assert(state != nullptr); + + if (state->vBlocksInFlight.begin() == list_it) { + // First block on the queue was received, update the start download time for the next one + state->m_downloading_since = std::max(state->m_downloading_since, GetTime()); + } + state->vBlocksInFlight.erase(list_it); + + if (state->vBlocksInFlight.empty()) { + // Last validated block on the queue for this peer was received. + m_peers_downloading_from--; + } + state->m_stalling_since = 0us; + + range.first = mapBlocksInFlight.erase(range.first); } - - CNodeState *state = State(node_id); - assert(state != nullptr); - - if (state->vBlocksInFlight.begin() == list_it) { - // First block on the queue was received, update the start download time for the next one - state->m_downloading_since = std::max(state->m_downloading_since, GetTime()); - } - state->vBlocksInFlight.erase(list_it); - - if (state->vBlocksInFlight.empty()) { - // Last validated block on the queue was received. - m_peers_downloading_from--; - } - state->m_stalling_since = 0us; - mapBlocksInFlight.erase(it); } bool PeerManagerImpl::BlockRequested(NodeId nodeid, const CBlockIndex& block, std::list::iterator** pit) @@ -1160,12 +1167,13 @@ bool PeerManagerImpl::BlockRequested(NodeId nodeid, const CBlockIndex& block, st assert(state != nullptr); // Short-circuit most stuff in case it is from the same node - BlockDownloadMap::iterator itInFlight = mapBlocksInFlight.find(hash); - if (itInFlight != mapBlocksInFlight.end() && itInFlight->second.first == nodeid) { - if (pit) { - *pit = &itInFlight->second.second; + for (auto range = mapBlocksInFlight.equal_range(hash); range.first != range.second; range.first++) { + if (range.first->second.first == nodeid) { + if (pit) { + *pit = &range.first->second.second; + } + return false; } - return false; } // Make sure it's not listed somewhere already. @@ -1178,7 +1186,7 @@ bool PeerManagerImpl::BlockRequested(NodeId nodeid, const CBlockIndex& block, st state->m_downloading_since = GetTime(); m_peers_downloading_from++; } - itInFlight = mapBlocksInFlight.insert(std::make_pair(hash, std::make_pair(nodeid, it))).first; + auto itInFlight = mapBlocksInFlight.insert(std::make_pair(hash, std::make_pair(nodeid, it))); if (pit) { *pit = &itInFlight->second.second; } @@ -1381,7 +1389,7 @@ void PeerManagerImpl::FindNextBlocksToDownload(const Peer& peer, unsigned int co } } else if (waitingfor == -1) { // This is the first already-in-flight block. - waitingfor = mapBlocksInFlight[pindex->GetBlockHash()].first; + waitingfor = mapBlocksInFlight.lower_bound(pindex->GetBlockHash())->second.first; } } } @@ -1511,7 +1519,15 @@ void PeerManagerImpl::FinalizeNode(const CNode& node) nSyncStarted--; for (const QueuedBlock& entry : state->vBlocksInFlight) { - mapBlocksInFlight.erase(entry.pindex->GetBlockHash()); + auto range = mapBlocksInFlight.equal_range(entry.pindex->GetBlockHash()); + while (range.first != range.second) { + auto [node_id, list_it] = range.first->second; + if (node_id != nodeid) { + range.first++; + } else { + range.first = mapBlocksInFlight.erase(range.first); + } + } } m_orphanage.EraseForPeer(nodeid); m_txrequest.DisconnectedPeer(nodeid); @@ -4272,12 +4288,21 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, nodestate->m_last_block_announcement = GetTime(); } - BlockDownloadMap::iterator blockInFlightIt = mapBlocksInFlight.find(pindex->GetBlockHash()); - bool fAlreadyInFlight = blockInFlightIt != mapBlocksInFlight.end(); - if (pindex->nStatus & BLOCK_HAVE_DATA) // Nothing to do here return; + auto range_flight = mapBlocksInFlight.equal_range(pindex->GetBlockHash()); + bool fAlreadyInFlight = range_flight.first != range_flight.second; + bool in_flight_same_peer{false}; + + while (range_flight.first != range_flight.second) { + if (range_flight.first->second.first == pfrom.GetId()) { + in_flight_same_peer = true; + break; + } + range_flight.first++; + } + if (pindex->nChainWork <= m_chainman.ActiveChain().Tip()->nChainWork || // We know something better pindex->nTx != 0) { // We had this block at some point, but pruned it if (fAlreadyInFlight) { @@ -4299,7 +4324,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, // possibilities in compact block processing... if (pindex->nHeight <= m_chainman.ActiveChain().Height() + 2) { if ((!fAlreadyInFlight && nodestate->vBlocksInFlight.size() < MAX_BLOCKS_IN_TRANSIT_PER_PEER) || - (fAlreadyInFlight && blockInFlightIt->second.first == pfrom.GetId())) { + in_flight_same_peer) { std::list::iterator* queuedBlockIt = nullptr; if (!BlockRequested(pfrom.GetId(), *pindex, &queuedBlockIt)) { if (!(*queuedBlockIt)->partialBlock) @@ -4431,14 +4456,23 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, { LOCK(cs_main); - BlockDownloadMap::iterator it = mapBlocksInFlight.find(resp.blockhash); - if (it == mapBlocksInFlight.end() || !it->second.second->partialBlock || - it->second.first != pfrom.GetId()) { + bool expected_blocktxn = false; + auto range_flight = mapBlocksInFlight.equal_range(resp.blockhash); + while (range_flight.first != range_flight.second) { + auto [node_id, block_it] = range_flight.first->second; + if (node_id == pfrom.GetId() && block_it->partialBlock) { + expected_blocktxn = true; + break; + } + range_flight.first++; + } + + if (!expected_blocktxn) { LogPrint(BCLog::NET, "Peer %d sent us block transactions for block we weren't expecting\n", pfrom.GetId()); return; } - PartiallyDownloadedBlock& partialBlock = *it->second.second->partialBlock; + PartiallyDownloadedBlock& partialBlock = *range_flight.first->second.second->partialBlock; ReadStatus status = partialBlock.FillBlock(*pblock, resp.txn); if (status == READ_STATUS_INVALID) { RemoveBlockRequest(resp.blockhash, pfrom.GetId()); // Reset in-flight state in case Misbehaving does not result in a disconnect From 13f9b20b4cb2f3f26e81184a77e9cf1f626d4f57 Mon Sep 17 00:00:00 2001 From: Greg Sanders Date: Tue, 9 May 2023 13:35:02 -0400 Subject: [PATCH 4/6] Only request full blocks from the peer we thought had the block in-flight This is a change in behavior so that if for some reason we request a block from a peer, we don't allow an unsolicited CMPCT_BLOCK announcement for that same block to cause a request for a full block from the uninvited peer (as some type of request is already outstanding from the original peer) --- src/net_processing.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 81480148151..78ff9c8cb9a 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -4305,7 +4305,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, if (pindex->nChainWork <= m_chainman.ActiveChain().Tip()->nChainWork || // We know something better pindex->nTx != 0) { // We had this block at some point, but pruned it - if (fAlreadyInFlight) { + if (in_flight_same_peer) { // We requested this block for some reason, but our mempool will probably be useless // so we just grab the block via normal getdata std::vector vInv(1); @@ -4384,7 +4384,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, } } } else { - if (fAlreadyInFlight) { + if (in_flight_same_peer) { // We requested this block, but its far into the future, so our // mempool will probably be useless - request the block normally std::vector vInv(1); From 03423f8bd12b95a06a4a9d8377e781625dd38aae Mon Sep 17 00:00:00 2001 From: Greg Sanders Date: Tue, 16 May 2023 15:36:38 -0400 Subject: [PATCH 5/6] Support up to 3 parallel compact block txn fetchings A single outbound slot is required, so if the first two slots are taken by inbound in-flights, the node will reject additional unless they are coming from outbound. This means in the case where a fast sybil peer is attempting to stall out a node, a single high bandwidth outbound peer can mitigate the attack. --- src/net.h | 2 + src/net_processing.cpp | 122 ++++++++++++++++++++++++++++------------- src/net_processing.h | 2 + src/rpc/blockchain.cpp | 2 +- 4 files changed, 90 insertions(+), 38 deletions(-) diff --git a/src/net.h b/src/net.h index 908b16f35e9..83fe0427d43 100644 --- a/src/net.h +++ b/src/net.h @@ -200,7 +200,9 @@ public: int nVersion; std::string cleanSubVer; bool fInbound; + // We requested high bandwidth connection to peer bool m_bip152_highbandwidth_to; + // Peer requested high bandwidth connection bool m_bip152_highbandwidth_from; int m_starting_height; uint64_t nSendBytes; diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 78ff9c8cb9a..f08e771f63a 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -876,6 +876,9 @@ private: /** Have we requested this block from a peer */ bool IsBlockRequested(const uint256& hash) EXCLUSIVE_LOCKS_REQUIRED(cs_main); + /** Have we requested this block from an outbound peer */ + bool IsBlockRequestedFromOutbound(const uint256& hash) EXCLUSIVE_LOCKS_REQUIRED(cs_main); + /** Remove this block from our tracked requested blocks. Called if: * - the block has been received from a peer * - the request for the block has timed out @@ -1121,6 +1124,17 @@ bool PeerManagerImpl::IsBlockRequested(const uint256& hash) return mapBlocksInFlight.count(hash); } +bool PeerManagerImpl::IsBlockRequestedFromOutbound(const uint256& hash) +{ + for (auto range = mapBlocksInFlight.equal_range(hash); range.first != range.second; range.first++) { + auto [nodeid, block_it] = range.first->second; + CNodeState& nodestate = *Assert(State(nodeid)); + if (!nodestate.m_is_inbound) return true; + } + + return false; +} + void PeerManagerImpl::RemoveBlockRequest(const uint256& hash, std::optional from_peer) { auto range = mapBlocksInFlight.equal_range(hash); @@ -1129,8 +1143,8 @@ void PeerManagerImpl::RemoveBlockRequest(const uint256& hash, std::optionalsecond; @@ -1140,20 +1154,19 @@ void PeerManagerImpl::RemoveBlockRequest(const uint256& hash, std::optionalvBlocksInFlight.begin() == list_it) { + if (state.vBlocksInFlight.begin() == list_it) { // First block on the queue was received, update the start download time for the next one - state->m_downloading_since = std::max(state->m_downloading_since, GetTime()); + state.m_downloading_since = std::max(state.m_downloading_since, GetTime()); } - state->vBlocksInFlight.erase(list_it); + state.vBlocksInFlight.erase(list_it); - if (state->vBlocksInFlight.empty()) { + if (state.vBlocksInFlight.empty()) { // Last validated block on the queue for this peer was received. m_peers_downloading_from--; } - state->m_stalling_since = 0us; + state.m_stalling_since = 0us; range.first = mapBlocksInFlight.erase(range.first); } @@ -1166,6 +1179,8 @@ bool PeerManagerImpl::BlockRequested(NodeId nodeid, const CBlockIndex& block, st CNodeState *state = State(nodeid); assert(state != nullptr); + Assume(mapBlocksInFlight.count(hash) <= MAX_CMPCTBLOCKS_INFLIGHT_PER_BLOCK); + // Short-circuit most stuff in case it is from the same node for (auto range = mapBlocksInFlight.equal_range(hash); range.first != range.second; range.first++) { if (range.first->second.first == nodeid) { @@ -1176,8 +1191,8 @@ bool PeerManagerImpl::BlockRequested(NodeId nodeid, const CBlockIndex& block, st } } - // Make sure it's not listed somewhere already. - RemoveBlockRequest(hash, std::nullopt); + // Make sure it's not being fetched already from same peer. + RemoveBlockRequest(hash, nodeid); std::list::iterator it = state->vBlocksInFlight.insert(state->vBlocksInFlight.end(), {&block, std::unique_ptr(pit ? new PartiallyDownloadedBlock(&m_mempool) : nullptr)}); @@ -1774,11 +1789,10 @@ std::optional PeerManagerImpl::FetchBlock(NodeId peer_id, const CBl LOCK(cs_main); - // Mark block as in-flight unless it already is (for this peer). - // If the peer does not send us a block, vBlocksInFlight remains non-empty, - // causing us to timeout and disconnect. - // If a block was already in-flight for a different peer, its BLOCKTXN - // response will be dropped. + // Forget about all prior requests + RemoveBlockRequest(block_index.GetBlockHash(), std::nullopt); + + // Mark block as in-flight if (!BlockRequested(peer_id, block_index)) return "Already requested from this peer"; // Construct message to request the block @@ -4292,12 +4306,15 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, return; auto range_flight = mapBlocksInFlight.equal_range(pindex->GetBlockHash()); - bool fAlreadyInFlight = range_flight.first != range_flight.second; - bool in_flight_same_peer{false}; + size_t already_in_flight = std::distance(range_flight.first, range_flight.second); + bool requested_block_from_this_peer{false}; + + // Multimap ensures ordering of outstanding requests. It's either empty or first in line. + bool first_in_flight = already_in_flight == 0 || (range_flight.first->second.first == pfrom.GetId()); while (range_flight.first != range_flight.second) { if (range_flight.first->second.first == pfrom.GetId()) { - in_flight_same_peer = true; + requested_block_from_this_peer = true; break; } range_flight.first++; @@ -4305,7 +4322,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, if (pindex->nChainWork <= m_chainman.ActiveChain().Tip()->nChainWork || // We know something better pindex->nTx != 0) { // We had this block at some point, but pruned it - if (in_flight_same_peer) { + if (requested_block_from_this_peer) { // We requested this block for some reason, but our mempool will probably be useless // so we just grab the block via normal getdata std::vector vInv(1); @@ -4316,15 +4333,15 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, } // If we're not close to tip yet, give up and let parallel block fetch work its magic - if (!fAlreadyInFlight && !CanDirectFetch()) { + if (!already_in_flight && !CanDirectFetch()) { return; } // We want to be a bit conservative just to be extra careful about DoS // possibilities in compact block processing... if (pindex->nHeight <= m_chainman.ActiveChain().Height() + 2) { - if ((!fAlreadyInFlight && nodestate->vBlocksInFlight.size() < MAX_BLOCKS_IN_TRANSIT_PER_PEER) || - in_flight_same_peer) { + if ((already_in_flight < MAX_CMPCTBLOCKS_INFLIGHT_PER_BLOCK && nodestate->vBlocksInFlight.size() < MAX_BLOCKS_IN_TRANSIT_PER_PEER) || + requested_block_from_this_peer) { std::list::iterator* queuedBlockIt = nullptr; if (!BlockRequested(pfrom.GetId(), *pindex, &queuedBlockIt)) { if (!(*queuedBlockIt)->partialBlock) @@ -4343,11 +4360,16 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, Misbehaving(*peer, 100, "invalid compact block"); return; } else if (status == READ_STATUS_FAILED) { - // Duplicate txindexes, the block is now in-flight, so just request it - std::vector vInv(1); - vInv[0] = CInv(MSG_BLOCK | GetFetchFlags(*peer), blockhash); - m_connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::GETDATA, vInv)); - return; + if (first_in_flight) { + // Duplicate txindexes, the block is now in-flight, so just request it + std::vector vInv(1); + vInv[0] = CInv(MSG_BLOCK | GetFetchFlags(*peer), blockhash); + m_connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::GETDATA, vInv)); + return; + } else { + // Give up for this peer and wait for other peer(s) + RemoveBlockRequest(pindex->GetBlockHash(), pfrom.GetId()); + } } BlockTransactionsRequest req; @@ -4361,9 +4383,24 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, txn.blockhash = blockhash; blockTxnMsg << txn; fProcessBLOCKTXN = true; - } else { + } else if (first_in_flight) { + // We will try to round-trip any compact blocks we get on failure, + // as long as it's first... req.blockhash = pindex->GetBlockHash(); m_connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::GETBLOCKTXN, req)); + } else if (pfrom.m_bip152_highbandwidth_to && + (!pfrom.IsInboundConn() || + IsBlockRequestedFromOutbound(blockhash) || + already_in_flight < MAX_CMPCTBLOCKS_INFLIGHT_PER_BLOCK - 1)) { + // ... or it's a hb relay peer and: + // - peer is outbound, or + // - we already have an outbound attempt in flight(so we'll take what we can get), or + // - it's not the final parallel download slot (which we may reserve for first outbound) + req.blockhash = pindex->GetBlockHash(); + m_connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::GETBLOCKTXN, req)); + } else { + // Give up for this peer and wait for other peer(s) + RemoveBlockRequest(pindex->GetBlockHash(), pfrom.GetId()); } } else { // This block is either already in flight from a different @@ -4384,7 +4421,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, } } } else { - if (in_flight_same_peer) { + if (requested_block_from_this_peer) { // We requested this block, but its far into the future, so our // mempool will probably be useless - request the block normally std::vector vInv(1); @@ -4456,18 +4493,23 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, { LOCK(cs_main); - bool expected_blocktxn = false; auto range_flight = mapBlocksInFlight.equal_range(resp.blockhash); + size_t already_in_flight = std::distance(range_flight.first, range_flight.second); + bool requested_block_from_this_peer{false}; + + // Multimap ensures ordering of outstanding requests. It's either empty or first in line. + bool first_in_flight = already_in_flight == 0 || (range_flight.first->second.first == pfrom.GetId()); + while (range_flight.first != range_flight.second) { auto [node_id, block_it] = range_flight.first->second; if (node_id == pfrom.GetId() && block_it->partialBlock) { - expected_blocktxn = true; + requested_block_from_this_peer = true; break; } range_flight.first++; } - if (!expected_blocktxn) { + if (!requested_block_from_this_peer) { LogPrint(BCLog::NET, "Peer %d sent us block transactions for block we weren't expecting\n", pfrom.GetId()); return; } @@ -4479,10 +4521,16 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, Misbehaving(*peer, 100, "invalid compact block/non-matching block transactions"); return; } else if (status == READ_STATUS_FAILED) { - // Might have collided, fall back to getdata now :( - std::vector invs; - invs.push_back(CInv(MSG_BLOCK | GetFetchFlags(*peer), resp.blockhash)); - m_connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::GETDATA, invs)); + if (first_in_flight) { + // Might have collided, fall back to getdata now :( + std::vector invs; + invs.push_back(CInv(MSG_BLOCK | GetFetchFlags(*peer), resp.blockhash)); + m_connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::GETDATA, invs)); + } else { + RemoveBlockRequest(resp.blockhash, pfrom.GetId()); + LogPrint(BCLog::NET, "Peer %d sent us a compact block but it failed to reconstruct, waiting on first download to complete\n", pfrom.GetId()); + return; + } } else { // Block is either okay, or possibly we received // READ_STATUS_CHECKBLOCK_FAILED. diff --git a/src/net_processing.h b/src/net_processing.h index af9a02139b1..deebb24c94e 100644 --- a/src/net_processing.h +++ b/src/net_processing.h @@ -22,6 +22,8 @@ static const bool DEFAULT_PEERBLOOMFILTERS = false; static const bool DEFAULT_PEERBLOCKFILTERS = false; /** Threshold for marking a node to be discouraged, e.g. disconnected and added to the discouragement filter. */ static const int DISCOURAGEMENT_THRESHOLD{100}; +/** Maximum number of outstanding CMPCTBLOCK requests for the same block. */ +static const unsigned int MAX_CMPCTBLOCKS_INFLIGHT_PER_BLOCK = 3; struct CNodeStateStats { int nSyncHeight = -1; diff --git a/src/rpc/blockchain.cpp b/src/rpc/blockchain.cpp index 72866532d2a..4c25dbc3451 100644 --- a/src/rpc/blockchain.cpp +++ b/src/rpc/blockchain.cpp @@ -428,7 +428,7 @@ static RPCHelpMan getblockfrompeer() "getblockfrompeer", "Attempt to fetch block from a given peer.\n\n" "We must have the header for this block, e.g. using submitheader.\n" - "Subsequent calls for the same block and a new peer will cause the response from the previous peer to be ignored.\n" + "Subsequent calls for the same block may cause the response from the previous peer to be ignored.\n" "Peers generally ignore requests for a stale block that they never fully verified, or one that is more than a month old.\n" "When a peer does not respond with a block, we will disconnect.\n" "Note: The block could be re-pruned as soon as it is received.\n\n" From d7f359b35e8b1e9acc4d397de262cd9ba9bbcb83 Mon Sep 17 00:00:00 2001 From: Greg Sanders Date: Tue, 16 May 2023 15:37:44 -0400 Subject: [PATCH 6/6] Add tests for parallel compact block downloads --- test/functional/p2p_compactblocks.py | 88 +++++++++++++++++++++++++++- 1 file changed, 85 insertions(+), 3 deletions(-) diff --git a/test/functional/p2p_compactblocks.py b/test/functional/p2p_compactblocks.py index 23eeea50bc6..d6c06fdeedd 100755 --- a/test/functional/p2p_compactblocks.py +++ b/test/functional/p2p_compactblocks.py @@ -105,6 +105,10 @@ class TestP2PConn(P2PInterface): self.last_message.pop("headers", None) self.last_message.pop("cmpctblock", None) + def clear_getblocktxn(self): + with p2p_lock: + self.last_message.pop("getblocktxn", None) + def get_headers(self, locator, hashstop): msg = msg_getheaders() msg.locator.vHave = locator @@ -745,7 +749,7 @@ class CompactBlocksTest(BitcoinTestFramework): peer.get_headers(locator=[int(tip, 16)], hashstop=0) peer.send_and_ping(msg_sendcmpct(announce=True, version=2)) - def test_compactblock_reconstruction_multiple_peers(self, stalling_peer, delivery_peer): + def test_compactblock_reconstruction_stalling_peer(self, stalling_peer, delivery_peer): node = self.nodes[0] assert len(self.utxos) @@ -823,12 +827,85 @@ class CompactBlocksTest(BitcoinTestFramework): hb_test_node.send_and_ping(msg_sendcmpct(announce=False, version=2)) assert_highbandwidth_states(self.nodes[0], hb_to=True, hb_from=False) + def test_compactblock_reconstruction_parallel_reconstruction(self, stalling_peer, delivery_peer, inbound_peer, outbound_peer): + """ All p2p connections are inbound except outbound_peer. We test that ultimate parallel slot + can only be taken by an outbound node unless prior attempts were done by an outbound + """ + node = self.nodes[0] + assert len(self.utxos) + + def announce_cmpct_block(node, peer, txn_count): + utxo = self.utxos.pop(0) + block = self.build_block_with_transactions(node, utxo, txn_count) + + cmpct_block = HeaderAndShortIDs() + cmpct_block.initialize_from_block(block) + msg = msg_cmpctblock(cmpct_block.to_p2p()) + peer.send_and_ping(msg) + with p2p_lock: + assert "getblocktxn" in peer.last_message + return block, cmpct_block + + for name, peer in [("delivery", delivery_peer), ("inbound", inbound_peer), ("outbound", outbound_peer)]: + self.log.info(f"Setting {name} as high bandwidth peer") + block, cmpct_block = announce_cmpct_block(node, peer, 1) + msg = msg_blocktxn() + msg.block_transactions.blockhash = block.sha256 + msg.block_transactions.transactions = block.vtx[1:] + peer.send_and_ping(msg) + assert_equal(int(node.getbestblockhash(), 16), block.sha256) + peer.clear_getblocktxn() + + # Test the simple parallel download case... + for num_missing in [1, 5, 20]: + + # Remaining low-bandwidth peer is stalling_peer, who announces first + assert_equal([peer['bip152_hb_to'] for peer in node.getpeerinfo()], [False, True, True, True]) + + block, cmpct_block = announce_cmpct_block(node, stalling_peer, num_missing) + + delivery_peer.send_and_ping(msg_cmpctblock(cmpct_block.to_p2p())) + with p2p_lock: + # The second peer to announce should still get a getblocktxn + assert "getblocktxn" in delivery_peer.last_message + assert int(node.getbestblockhash(), 16) != block.sha256 + + inbound_peer.send_and_ping(msg_cmpctblock(cmpct_block.to_p2p())) + with p2p_lock: + # The third inbound peer to announce should *not* get a getblocktxn + assert "getblocktxn" not in inbound_peer.last_message + assert int(node.getbestblockhash(), 16) != block.sha256 + + outbound_peer.send_and_ping(msg_cmpctblock(cmpct_block.to_p2p())) + with p2p_lock: + # The third peer to announce should get a getblocktxn if outbound + assert "getblocktxn" in outbound_peer.last_message + assert int(node.getbestblockhash(), 16) != block.sha256 + + # Second peer completes the compact block first + msg = msg_blocktxn() + msg.block_transactions.blockhash = block.sha256 + msg.block_transactions.transactions = block.vtx[1:] + delivery_peer.send_and_ping(msg) + assert_equal(int(node.getbestblockhash(), 16), block.sha256) + + # Nothing bad should happen if we get a late fill from the first peer... + stalling_peer.send_and_ping(msg) + self.utxos.append([block.vtx[-1].sha256, 0, block.vtx[-1].vout[0].nValue]) + + delivery_peer.clear_getblocktxn() + inbound_peer.clear_getblocktxn() + outbound_peer.clear_getblocktxn() + + def run_test(self): self.wallet = MiniWallet(self.nodes[0]) # Setup the p2p connections self.segwit_node = self.nodes[0].add_p2p_connection(TestP2PConn()) self.additional_segwit_node = self.nodes[0].add_p2p_connection(TestP2PConn()) + self.onemore_inbound_node = self.nodes[0].add_p2p_connection(TestP2PConn()) + self.outbound_node = self.nodes[0].add_outbound_p2p_connection(TestP2PConn(), p2p_idx=3, connection_type="outbound-full-relay") # We will need UTXOs to construct transactions in later tests. self.make_utxos() @@ -838,6 +915,8 @@ class CompactBlocksTest(BitcoinTestFramework): self.log.info("Testing SENDCMPCT p2p message... ") self.test_sendcmpct(self.segwit_node) self.test_sendcmpct(self.additional_segwit_node) + self.test_sendcmpct(self.onemore_inbound_node) + self.test_sendcmpct(self.outbound_node) self.log.info("Testing compactblock construction...") self.test_compactblock_construction(self.segwit_node) @@ -860,8 +939,11 @@ class CompactBlocksTest(BitcoinTestFramework): self.log.info("Testing handling of incorrect blocktxn responses...") self.test_incorrect_blocktxn_response(self.segwit_node) - self.log.info("Testing reconstructing compact blocks from all peers...") - self.test_compactblock_reconstruction_multiple_peers(self.segwit_node, self.additional_segwit_node) + self.log.info("Testing reconstructing compact blocks with a stalling peer...") + self.test_compactblock_reconstruction_stalling_peer(self.segwit_node, self.additional_segwit_node) + + self.log.info("Testing reconstructing compact blocks from multiple peers...") + self.test_compactblock_reconstruction_parallel_reconstruction(stalling_peer=self.segwit_node, inbound_peer=self.onemore_inbound_node, delivery_peer=self.additional_segwit_node, outbound_peer=self.outbound_node) # Test that if we submitblock to node1, we'll get a compact block # announcement to all peers.