Merge bitcoin/bitcoin#26295: Replace global g_cs_orphans lock with local

7082ce3e88 scripted-diff: rename and de-globalise g_cs_orphans (Anthony Towns)
733d85f79c Move all g_cs_orphans locking to txorphanage (Anthony Towns)
a936f41a5d txorphanage: make m_peer_work_set private (Anthony Towns)
3614819864 txorphange: move orphan workset to txorphanage (Anthony Towns)
6f8e442ba6 net_processing: Localise orphan_work_set handling to ProcessOrphanTx (Anthony Towns)
0027174b39 net_processing: move ProcessOrphanTx docs to declaration (Anthony Towns)
9910ed755c net_processing: Pass a Peer& to ProcessOrphanTx (Anthony Towns)
89e2e0da0b net_processing: move extra transactions to msgproc mutex (Anthony Towns)
ff8d44d196 Remove unnecessary includes of txorphange.h (Anthony Towns)

Pull request description:

  Moves extra transactions to be under the `m_msgproc_mutex` lock rather than `g_cs_orphans` and refactors orphan handling so that the lock can be internal to the `TxOrphange` class.

ACKs for top commit:
  dergoegge:
    Code review ACK 7082ce3e88
  glozow:
    ACK 7082ce3e88 via code review and some [basic testing](https://github.com/glozow/bitcoin/blob/review-26295/src/test/orphanage_tests.cpp#L150). I think putting txorphanage in charge of handling peer work sets is the right direction.

Tree-SHA512: 1ec454c3a69ebd45ff652770d6a55c6b183db71aba4d12639ed70f525f0035e069a81d06e9b65b66e87929c607080a1c5e5dcd2ca91eaa2cf202dc6c02aa6818
This commit is contained in:
glozow
2022-11-28 10:51:15 +00:00
8 changed files with 124 additions and 97 deletions

View File

@@ -365,9 +365,6 @@ struct Peer {
/** Total number of addresses that were processed (excludes rate-limited ones). */
std::atomic<uint64_t> m_addr_processed{0};
/** Set of txids to reconsider once their parent transactions have been accepted **/
std::set<uint256> m_orphan_work_set GUARDED_BY(g_cs_orphans);
/** Whether we've sent this peer a getheaders in response to an inv prior to initial-headers-sync completing */
bool m_inv_triggered_getheaders_before_sync GUARDED_BY(NetEventsInterface::g_msgproc_mutex){false};
@@ -584,8 +581,17 @@ private:
*/
bool MaybeDiscourageAndDisconnect(CNode& pnode, Peer& peer);
void ProcessOrphanTx(std::set<uint256>& orphan_work_set) EXCLUSIVE_LOCKS_REQUIRED(cs_main, g_cs_orphans)
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
/**
* Reconsider orphan transactions after a parent has been accepted to the mempool.
*
* @peer[in] peer The peer whose orphan transactions we will reconsider. Generally only one
* orphan will be reconsidered on each call of this function. This set
* may be added to if accepting an orphan causes its children to be
* reconsidered.
* @return True if there are still orphans in this peer's work set.
*/
bool ProcessOrphanTx(Peer& peer)
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, g_msgproc_mutex, cs_main);
/** Process a single headers message from a peer.
*
* @param[in] pfrom CNode of the peer
@@ -919,14 +925,14 @@ private:
/** Storage for orphan information */
TxOrphanage m_orphanage;
void AddToCompactExtraTransactions(const CTransactionRef& tx) EXCLUSIVE_LOCKS_REQUIRED(g_cs_orphans);
void AddToCompactExtraTransactions(const CTransactionRef& tx) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex);
/** Orphan/conflicted/etc transactions that are kept for compact block reconstruction.
* The last -blockreconstructionextratxn/DEFAULT_BLOCK_RECONSTRUCTION_EXTRA_TXN of
* these are kept in a ring buffer */
std::vector<std::pair<uint256, CTransactionRef>> vExtraTxnForCompact GUARDED_BY(g_cs_orphans);
std::vector<std::pair<uint256, CTransactionRef>> vExtraTxnForCompact GUARDED_BY(g_msgproc_mutex);
/** Offset into vExtraTxnForCompact to insert the next tx */
size_t vExtraTxnForCompactIt GUARDED_BY(g_cs_orphans) = 0;
size_t vExtraTxnForCompactIt GUARDED_BY(g_msgproc_mutex) = 0;
/** Check whether the last unknown block a peer advertised is not yet known. */
void ProcessBlockAvailability(NodeId nodeid) EXCLUSIVE_LOCKS_REQUIRED(cs_main);
@@ -1490,7 +1496,7 @@ void PeerManagerImpl::FinalizeNode(const CNode& node)
for (const QueuedBlock& entry : state->vBlocksInFlight) {
mapBlocksInFlight.erase(entry.pindex->GetBlockHash());
}
WITH_LOCK(g_cs_orphans, m_orphanage.EraseForPeer(nodeid));
m_orphanage.EraseForPeer(nodeid);
m_txrequest.DisconnectedPeer(nodeid);
if (m_txreconciliation) m_txreconciliation->ForgetPeer(nodeid);
m_num_preferred_download_peers -= state->fPreferredDownload;
@@ -2880,33 +2886,24 @@ void PeerManagerImpl::ProcessHeadersMessage(CNode& pfrom, Peer& peer,
return;
}
/**
* Reconsider orphan transactions after a parent has been accepted to the mempool.
*
* @param[in,out] orphan_work_set The set of orphan transactions to reconsider. Generally only one
* orphan will be reconsidered on each call of this function. This set
* may be added to if accepting an orphan causes its children to be
* reconsidered.
*/
void PeerManagerImpl::ProcessOrphanTx(std::set<uint256>& orphan_work_set)
bool PeerManagerImpl::ProcessOrphanTx(Peer& peer)
{
AssertLockHeld(g_msgproc_mutex);
AssertLockHeld(cs_main);
AssertLockHeld(g_cs_orphans);
while (!orphan_work_set.empty()) {
const uint256 orphanHash = *orphan_work_set.begin();
orphan_work_set.erase(orphan_work_set.begin());
const auto [porphanTx, from_peer] = m_orphanage.GetTx(orphanHash);
if (porphanTx == nullptr) continue;
CTransactionRef porphanTx = nullptr;
NodeId from_peer = -1;
bool more = false;
while (CTransactionRef porphanTx = m_orphanage.GetTxToReconsider(peer.m_id, from_peer, more)) {
const MempoolAcceptResult result = m_chainman.ProcessTransaction(porphanTx);
const TxValidationState& state = result.m_state;
const uint256& orphanHash = porphanTx->GetHash();
if (result.m_result_type == MempoolAcceptResult::ResultType::VALID) {
LogPrint(BCLog::MEMPOOL, " accepted orphan tx %s\n", orphanHash.ToString());
RelayTransaction(orphanHash, porphanTx->GetWitnessHash());
m_orphanage.AddChildrenToWorkSet(*porphanTx, orphan_work_set);
m_orphanage.AddChildrenToWorkSet(*porphanTx, peer.m_id);
m_orphanage.EraseTx(orphanHash);
for (const CTransactionRef& removedTx : result.m_replaced_transactions.value()) {
AddToCompactExtraTransactions(removedTx);
@@ -2957,6 +2954,8 @@ void PeerManagerImpl::ProcessOrphanTx(std::set<uint256>& orphan_work_set)
break;
}
}
return more;
}
bool PeerManagerImpl::PrepareBlockFilterRequest(CNode& node, Peer& peer,
@@ -3990,7 +3989,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
AddKnownTx(*peer, txid);
}
LOCK2(cs_main, g_cs_orphans);
LOCK(cs_main);
m_txrequest.ReceivedResponse(pfrom.GetId(), txid);
if (tx.HasWitness()) m_txrequest.ReceivedResponse(pfrom.GetId(), wtxid);
@@ -4031,7 +4030,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
m_txrequest.ForgetTxHash(tx.GetHash());
m_txrequest.ForgetTxHash(tx.GetWitnessHash());
RelayTransaction(tx.GetHash(), tx.GetWitnessHash());
m_orphanage.AddChildrenToWorkSet(tx, peer->m_orphan_work_set);
m_orphanage.AddChildrenToWorkSet(tx, peer->m_id);
pfrom.m_last_tx_time = GetTime<std::chrono::seconds>();
@@ -4045,7 +4044,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
}
// Recursively process any orphan transactions that depended on this one
ProcessOrphanTx(peer->m_orphan_work_set);
ProcessOrphanTx(*peer);
}
else if (state.GetResult() == TxValidationResult::TX_MISSING_INPUTS)
{
@@ -4226,7 +4225,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
bool fBlockReconstructed = false;
{
LOCK2(cs_main, g_cs_orphans);
LOCK(cs_main);
// If AcceptBlockHeader returned true, it set pindex
assert(pindex);
UpdateBlockAvailability(pfrom.GetId(), pindex->GetBlockHash());
@@ -4854,16 +4853,17 @@ bool PeerManagerImpl::ProcessMessages(CNode* pfrom, std::atomic<bool>& interrupt
}
}
bool has_more_orphans;
{
LOCK2(cs_main, g_cs_orphans);
if (!peer->m_orphan_work_set.empty()) {
ProcessOrphanTx(peer->m_orphan_work_set);
}
LOCK(cs_main);
has_more_orphans = ProcessOrphanTx(*peer);
}
if (pfrom->fDisconnect)
return false;
if (has_more_orphans) return true;
// this maintains the order of responses
// and prevents m_getdata_requests to grow unbounded
{
@@ -4871,11 +4871,6 @@ bool PeerManagerImpl::ProcessMessages(CNode* pfrom, std::atomic<bool>& interrupt
if (!peer->m_getdata_requests.empty()) return true;
}
{
LOCK(g_cs_orphans);
if (!peer->m_orphan_work_set.empty()) return true;
}
// Don't bother if send buffer is too full to respond anyway
if (pfrom->fPauseSend) return false;