Merge bitcoin/bitcoin#26551: p2p: Track orphans by who provided them

c58c249a5b694c88122589fedbef4e2f13f08bb4 net_processing: indicate more work to do when orphans are ready to reconsider (Anthony Towns)
ecb0a3e4259b81d6bb74d59a58eb65552c17d8d8 net_processing: Don't process tx after processing orphans (Anthony Towns)
c5837757068bf8ea3e5b6fdad82f69d1deb81545 net_processing: only process orphans before messages (Anthony Towns)
be2304676bedcd15debcdc694549fdd2b255ba62 txorphange: Drop redundant originator arg from GetTxToReconsider (Anthony Towns)
a4fe09973aa82210b98dcb4e4e9f11ef59780f9b txorphanage: index workset by originating peer (Anthony Towns)

Pull request description:

  We currently process orphans by assigning them to the peer that provided a missing parent; instead assign them to the peer that provided the orphan in the first place. This prevents a peer from being able to marginally delay another peer's transactions and also simplifies the internal API slightly. Because we're now associating orphan processing with the peer that provided the orphan originally, we no longer process orphans immediately after receiving the parent, but defer until a future call to `ProcessMessage`.

  Based on #26295

ACKs for top commit:
  naumenkogs:
    utACK c58c249a5b694c88122589fedbef4e2f13f08bb4
  glozow:
    ACK c58c249a5b694c88122589fedbef4e2f13f08bb4
  mzumsande:
    Code Review ACK c58c249a5b694c88122589fedbef4e2f13f08bb4

Tree-SHA512: 3186c346f21e60440266a2a80a9d23d7b96071414e14b2b3bfe50457c04c18b1eab109c3d8c2a7726a6b10a2eda1f0512510a52c102da112820a26f5d96f12de
This commit is contained in:
glozow 2023-01-26 10:36:13 +00:00
commit 77a36033b5
No known key found for this signature in database
GPG Key ID: BA03F4DBE0C63FB4
4 changed files with 57 additions and 49 deletions

View File

@ -584,14 +584,17 @@ private:
/** /**
* Reconsider orphan transactions after a parent has been accepted to the mempool. * Reconsider orphan transactions after a parent has been accepted to the mempool.
* *
* @peer[in] peer The peer whose orphan transactions we will reconsider. Generally only one * @peer[in] peer The peer whose orphan transactions we will reconsider. Generally only
* orphan will be reconsidered on each call of this function. This set * one orphan will be reconsidered on each call of this function. If an
* may be added to if accepting an orphan causes its children to be * accepted orphan has orphaned children, those will need to be
* reconsidered. * reconsidered, creating more work, possibly for other peers.
* @return True if there are still orphans in this peer's work set. * @return True if meaningful work was done (an orphan was accepted/rejected).
* If no meaningful work was done, then the work set for this peer
* will be empty.
*/ */
bool ProcessOrphanTx(Peer& peer) bool ProcessOrphanTx(Peer& peer)
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, g_msgproc_mutex, cs_main); EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, g_msgproc_mutex);
/** Process a single headers message from a peer. /** Process a single headers message from a peer.
* *
* @param[in] pfrom CNode of the peer * @param[in] pfrom CNode of the peer
@ -2897,13 +2900,11 @@ void PeerManagerImpl::ProcessHeadersMessage(CNode& pfrom, Peer& peer,
bool PeerManagerImpl::ProcessOrphanTx(Peer& peer) bool PeerManagerImpl::ProcessOrphanTx(Peer& peer)
{ {
AssertLockHeld(g_msgproc_mutex); AssertLockHeld(g_msgproc_mutex);
AssertLockHeld(cs_main); LOCK(cs_main);
CTransactionRef porphanTx = nullptr; CTransactionRef porphanTx = nullptr;
NodeId from_peer = -1;
bool more = false;
while (CTransactionRef porphanTx = m_orphanage.GetTxToReconsider(peer.m_id, from_peer, more)) { while (CTransactionRef porphanTx = m_orphanage.GetTxToReconsider(peer.m_id)) {
const MempoolAcceptResult result = m_chainman.ProcessTransaction(porphanTx); const MempoolAcceptResult result = m_chainman.ProcessTransaction(porphanTx);
const TxValidationState& state = result.m_state; const TxValidationState& state = result.m_state;
const uint256& orphanHash = porphanTx->GetHash(); const uint256& orphanHash = porphanTx->GetHash();
@ -2911,20 +2912,20 @@ bool PeerManagerImpl::ProcessOrphanTx(Peer& peer)
if (result.m_result_type == MempoolAcceptResult::ResultType::VALID) { if (result.m_result_type == MempoolAcceptResult::ResultType::VALID) {
LogPrint(BCLog::MEMPOOL, " accepted orphan tx %s\n", orphanHash.ToString()); LogPrint(BCLog::MEMPOOL, " accepted orphan tx %s\n", orphanHash.ToString());
RelayTransaction(orphanHash, porphanTx->GetWitnessHash()); RelayTransaction(orphanHash, porphanTx->GetWitnessHash());
m_orphanage.AddChildrenToWorkSet(*porphanTx, peer.m_id); m_orphanage.AddChildrenToWorkSet(*porphanTx);
m_orphanage.EraseTx(orphanHash); m_orphanage.EraseTx(orphanHash);
for (const CTransactionRef& removedTx : result.m_replaced_transactions.value()) { for (const CTransactionRef& removedTx : result.m_replaced_transactions.value()) {
AddToCompactExtraTransactions(removedTx); AddToCompactExtraTransactions(removedTx);
} }
break; return true;
} else if (state.GetResult() != TxValidationResult::TX_MISSING_INPUTS) { } else if (state.GetResult() != TxValidationResult::TX_MISSING_INPUTS) {
if (state.IsInvalid()) { if (state.IsInvalid()) {
LogPrint(BCLog::MEMPOOL, " invalid orphan tx %s from peer=%d. %s\n", LogPrint(BCLog::MEMPOOL, " invalid orphan tx %s from peer=%d. %s\n",
orphanHash.ToString(), orphanHash.ToString(),
from_peer, peer.m_id,
state.ToString()); state.ToString());
// Maybe punish peer that gave us an invalid orphan tx // Maybe punish peer that gave us an invalid orphan tx
MaybePunishNodeForTx(from_peer, state); MaybePunishNodeForTx(peer.m_id, state);
} }
// Has inputs but not accepted to mempool // Has inputs but not accepted to mempool
// Probably non-standard or insufficient fee // Probably non-standard or insufficient fee
@ -2959,11 +2960,11 @@ bool PeerManagerImpl::ProcessOrphanTx(Peer& peer)
} }
} }
m_orphanage.EraseTx(orphanHash); m_orphanage.EraseTx(orphanHash);
break; return true;
} }
} }
return more; return false;
} }
bool PeerManagerImpl::PrepareBlockFilterRequest(CNode& node, Peer& peer, bool PeerManagerImpl::PrepareBlockFilterRequest(CNode& node, Peer& peer,
@ -4033,7 +4034,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
m_txrequest.ForgetTxHash(tx.GetHash()); m_txrequest.ForgetTxHash(tx.GetHash());
m_txrequest.ForgetTxHash(tx.GetWitnessHash()); m_txrequest.ForgetTxHash(tx.GetWitnessHash());
RelayTransaction(tx.GetHash(), tx.GetWitnessHash()); RelayTransaction(tx.GetHash(), tx.GetWitnessHash());
m_orphanage.AddChildrenToWorkSet(tx, peer->m_id); m_orphanage.AddChildrenToWorkSet(tx);
pfrom.m_last_tx_time = GetTime<std::chrono::seconds>(); pfrom.m_last_tx_time = GetTime<std::chrono::seconds>();
@ -4045,9 +4046,6 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
for (const CTransactionRef& removedTx : result.m_replaced_transactions.value()) { for (const CTransactionRef& removedTx : result.m_replaced_transactions.value()) {
AddToCompactExtraTransactions(removedTx); AddToCompactExtraTransactions(removedTx);
} }
// Recursively process any orphan transactions that depended on this one
ProcessOrphanTx(*peer);
} }
else if (state.GetResult() == TxValidationResult::TX_MISSING_INPUTS) else if (state.GetResult() == TxValidationResult::TX_MISSING_INPUTS)
{ {
@ -4856,16 +4854,12 @@ bool PeerManagerImpl::ProcessMessages(CNode* pfrom, std::atomic<bool>& interrupt
} }
} }
bool has_more_orphans; const bool processed_orphan = ProcessOrphanTx(*peer);
{
LOCK(cs_main);
has_more_orphans = ProcessOrphanTx(*peer);
}
if (pfrom->fDisconnect) if (pfrom->fDisconnect)
return false; return false;
if (has_more_orphans) return true; if (processed_orphan) return true;
// this maintains the order of responses // this maintains the order of responses
// and prevents m_getdata_requests to grow unbounded // and prevents m_getdata_requests to grow unbounded
@ -4911,6 +4905,12 @@ bool PeerManagerImpl::ProcessMessages(CNode* pfrom, std::atomic<bool>& interrupt
LOCK(peer->m_getdata_requests_mutex); LOCK(peer->m_getdata_requests_mutex);
if (!peer->m_getdata_requests.empty()) fMoreWork = true; if (!peer->m_getdata_requests.empty()) fMoreWork = true;
} }
// Does this peer has an orphan ready to reconsider?
// (Note: we may have provided a parent for an orphan provided
// by another peer that was already processed; in that case,
// the extra work may not be noticed, possibly resulting in an
// unnecessary 100ms delay)
if (m_orphanage.HaveTxToReconsider(peer->m_id)) fMoreWork = true;
} catch (const std::exception& e) { } catch (const std::exception& e) {
LogPrint(BCLog::NET, "%s(%s, %u bytes): Exception '%s' (%s) caught\n", __func__, SanitizeString(msg.m_type), msg.m_message_size, e.what(), typeid(e).name()); LogPrint(BCLog::NET, "%s(%s, %u bytes): Exception '%s' (%s) caught\n", __func__, SanitizeString(msg.m_type), msg.m_message_size, e.what(), typeid(e).name());
} catch (...) { } catch (...) {

View File

@ -85,16 +85,12 @@ FUZZ_TARGET_INIT(txorphan, initialize_orphanage)
CallOneOf( CallOneOf(
fuzzed_data_provider, fuzzed_data_provider,
[&] { [&] {
orphanage.AddChildrenToWorkSet(*tx, peer_id); orphanage.AddChildrenToWorkSet(*tx);
}, },
[&] { [&] {
{ {
NodeId originator; CTransactionRef ref = orphanage.GetTxToReconsider(peer_id);
bool more = true; if (ref) {
CTransactionRef ref = orphanage.GetTxToReconsider(peer_id, originator, more);
if (!ref) {
Assert(!more);
} else {
bool have_tx = orphanage.HaveTx(GenTxid::Txid(ref->GetHash())) || orphanage.HaveTx(GenTxid::Wtxid(ref->GetHash())); bool have_tx = orphanage.HaveTx(GenTxid::Txid(ref->GetHash())) || orphanage.HaveTx(GenTxid::Wtxid(ref->GetHash()));
Assert(have_tx); Assert(have_tx);
} }

View File

@ -145,17 +145,19 @@ void TxOrphanage::LimitOrphans(unsigned int max_orphans)
if (nEvicted > 0) LogPrint(BCLog::MEMPOOL, "orphanage overflow, removed %u tx\n", nEvicted); if (nEvicted > 0) LogPrint(BCLog::MEMPOOL, "orphanage overflow, removed %u tx\n", nEvicted);
} }
void TxOrphanage::AddChildrenToWorkSet(const CTransaction& tx, NodeId peer) void TxOrphanage::AddChildrenToWorkSet(const CTransaction& tx)
{ {
LOCK(m_mutex); LOCK(m_mutex);
// Get this peer's work set, emplacing an empty set it didn't exist
std::set<uint256>& orphan_work_set = m_peer_work_set.try_emplace(peer).first->second;
for (unsigned int i = 0; i < tx.vout.size(); i++) { for (unsigned int i = 0; i < tx.vout.size(); i++) {
const auto it_by_prev = m_outpoint_to_orphan_it.find(COutPoint(tx.GetHash(), i)); const auto it_by_prev = m_outpoint_to_orphan_it.find(COutPoint(tx.GetHash(), i));
if (it_by_prev != m_outpoint_to_orphan_it.end()) { if (it_by_prev != m_outpoint_to_orphan_it.end()) {
for (const auto& elem : it_by_prev->second) { for (const auto& elem : it_by_prev->second) {
// Get this source peer's work set, emplacing an empty set if it didn't exist
// (note: if this peer wasn't still connected, we would have removed the orphan tx already)
std::set<uint256>& orphan_work_set = m_peer_work_set.try_emplace(elem->second.fromPeer).first->second;
// Add this tx to the work set
orphan_work_set.insert(elem->first); orphan_work_set.insert(elem->first);
} }
} }
@ -172,7 +174,7 @@ bool TxOrphanage::HaveTx(const GenTxid& gtxid) const
} }
} }
CTransactionRef TxOrphanage::GetTxToReconsider(NodeId peer, NodeId& originator, bool& more) CTransactionRef TxOrphanage::GetTxToReconsider(NodeId peer)
{ {
LOCK(m_mutex); LOCK(m_mutex);
@ -185,16 +187,25 @@ CTransactionRef TxOrphanage::GetTxToReconsider(NodeId peer, NodeId& originator,
const auto orphan_it = m_orphans.find(txid); const auto orphan_it = m_orphans.find(txid);
if (orphan_it != m_orphans.end()) { if (orphan_it != m_orphans.end()) {
more = !work_set.empty();
originator = orphan_it->second.fromPeer;
return orphan_it->second.tx; return orphan_it->second.tx;
} }
} }
} }
more = false;
return nullptr; return nullptr;
} }
bool TxOrphanage::HaveTxToReconsider(NodeId peer)
{
LOCK(m_mutex);
auto work_set_it = m_peer_work_set.find(peer);
if (work_set_it != m_peer_work_set.end()) {
auto& work_set = work_set_it->second;
return !work_set.empty();
}
return false;
}
void TxOrphanage::EraseForBlock(const CBlock& block) void TxOrphanage::EraseForBlock(const CBlock& block)
{ {
LOCK(m_mutex); LOCK(m_mutex);

View File

@ -27,13 +27,11 @@ public:
bool HaveTx(const GenTxid& gtxid) const EXCLUSIVE_LOCKS_REQUIRED(!m_mutex); bool HaveTx(const GenTxid& gtxid) const EXCLUSIVE_LOCKS_REQUIRED(!m_mutex);
/** Extract a transaction from a peer's work set /** Extract a transaction from a peer's work set
* Returns nullptr and sets more to false if there are no transactions * Returns nullptr if there are no transactions to work on.
* to work on. Otherwise returns the transaction reference, removes * Otherwise returns the transaction reference, and removes
* the transaction from the work set, and populates its arguments with * it from the work set.
* the originating peer, and whether there are more orphans for this peer
* to work on after this tx.
*/ */
CTransactionRef GetTxToReconsider(NodeId peer, NodeId& originator, bool& more) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex); CTransactionRef GetTxToReconsider(NodeId peer) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex);
/** Erase an orphan by txid */ /** Erase an orphan by txid */
int EraseTx(const uint256& txid) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex); int EraseTx(const uint256& txid) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex);
@ -47,8 +45,11 @@ public:
/** Limit the orphanage to the given maximum */ /** Limit the orphanage to the given maximum */
void LimitOrphans(unsigned int max_orphans) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex); void LimitOrphans(unsigned int max_orphans) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex);
/** Add any orphans that list a particular tx as a parent into a peer's work set */ /** Add any orphans that list a particular tx as a parent into the from peer's work set */
void AddChildrenToWorkSet(const CTransaction& tx, NodeId peer) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex); void AddChildrenToWorkSet(const CTransaction& tx) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex);;
/** Does this peer have any work to do? */
bool HaveTxToReconsider(NodeId peer) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex);;
/** Return how many entries exist in the orphange */ /** Return how many entries exist in the orphange */
size_t Size() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) size_t Size() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
@ -72,7 +73,7 @@ protected:
* -maxorphantx/DEFAULT_MAX_ORPHAN_TRANSACTIONS */ * -maxorphantx/DEFAULT_MAX_ORPHAN_TRANSACTIONS */
std::map<uint256, OrphanTx> m_orphans GUARDED_BY(m_mutex); std::map<uint256, OrphanTx> m_orphans GUARDED_BY(m_mutex);
/** Which peer provided a parent tx of orphans that need to be reconsidered */ /** Which peer provided the orphans that need to be reconsidered */
std::map<NodeId, std::set<uint256>> m_peer_work_set GUARDED_BY(m_mutex); std::map<NodeId, std::set<uint256>> m_peer_work_set GUARDED_BY(m_mutex);
using OrphanMap = decltype(m_orphans); using OrphanMap = decltype(m_orphans);