Merge bitcoin/bitcoin#26140: refactor: Move CNodeState members guarded by g_msgproc_mutex to Peer

3a060ae7b6 scripted-diff: Rename nUnconnectingHeaders and fPreferHeaders (dergoegge)
279c53d7e4 [net processing] Move m_recently_announced_invs from CNodeState to Peer (dergoegge)
938a8e2566 [net processing] Annotate m_recently_announced_invs as guarded by g_msgproc_mutex (dergoegge)
8a2cb1f749 [net processing] Move fPreferHeaders from CNodeState to Peer (dergoegge)
3605011e79 [net processing] Annotate fPreferHeaders as guarded by g_msgproc_mutex (dergoegge)
4b84e502f5 [net processing] Move m_headers_sync_timeout from CNodeState to Peer (dergoegge)
689b747fc3 [net processing] Annotate m_headers_sync_timeout as guarded by g_msgproc_mutex (dergoegge)
d8c0d1c345 [net processing] Move nUnconnectingHeaders from CNodeState to Peer (dergoegge)
5f80d8d1ee [net processing] Annotate nUnconnectingHeaders as guarded by g_msgproc_mutex (dergoegge)
1d87137227 [validation] Annotate ChainstateManager::m_best_header as guarded by cs_main (dergoegge)

Pull request description:

  `nUnconnectingHeaders`, `m_headers_sync_timeout`, `fPreferHeaders` and  `m_recently_announced_headers` are currently all `CNodeState` members even though they are only ever accessed from the message processing thread (therefore sufficiently guarded exclusively by `g_msgproc_mutex`). `CNodeState` exists purely to hold validation-specific state guarded by `cs_main` that is accessed by multiple threads.

  This PR adds thread-safety annotations for the above mentioned `CNodeState` members and moves them to `Peer`.

ACKs for top commit:
  glozow:
    code review ACK 3a060ae7b6, as in I am convinced these members shouldn't be guarded by cs_main and belong in Peer/TxRelay. clang checked the annotations for me.
  hebasto:
    ACK 3a060ae7b6

Tree-SHA512: 2db27c03f2c6ed36ad7dfbb4f862eeed3c3e57f845cf8abb9e7cada36f976257311892020bbcff513fbe662a881c93270e3a126946ceb0c3f94213b546bcaa81
This commit is contained in:
glozow
2023-03-30 17:10:33 +01:00
3 changed files with 62 additions and 54 deletions

View File

@@ -135,7 +135,7 @@ static constexpr double BLOCK_DOWNLOAD_TIMEOUT_PER_PEER = 0.5;
/** Maximum number of headers to announce when relaying blocks with headers message.*/
static const unsigned int MAX_BLOCKS_TO_ANNOUNCE = 8;
/** Maximum number of unconnecting headers announcements before DoS score */
static const int MAX_UNCONNECTING_HEADERS = 10;
static const int MAX_NUM_UNCONNECTING_HEADERS_MSGS = 10;
/** Minimum blocks required to signal NODE_NETWORK_LIMITED */
static const unsigned int NODE_NETWORK_LIMITED_MIN_BLOCKS = 288;
/** Average delay between local address broadcasts */
@@ -278,6 +278,9 @@ struct Peer {
/** A bloom filter for which transactions to announce to the peer. See BIP37. */
std::unique_ptr<CBloomFilter> m_bloom_filter PT_GUARDED_BY(m_bloom_filter_mutex) GUARDED_BY(m_bloom_filter_mutex){nullptr};
/** A rolling bloom filter of all announced tx CInvs to this peer */
CRollingBloomFilter m_recently_announced_invs GUARDED_BY(NetEventsInterface::g_msgproc_mutex){INVENTORY_MAX_RECENT_RELAY, 0.000001};
mutable RecursiveMutex m_tx_inventory_mutex;
/** A filter of all the txids and wtxids that the peer has announced to
* us or we have announced to the peer. We use this to avoid announcing
@@ -314,6 +317,10 @@ struct Peer {
{
return WITH_LOCK(m_tx_relay_mutex, return m_tx_relay.get());
};
const TxRelay* GetTxRelay() const EXCLUSIVE_LOCKS_REQUIRED(!m_tx_relay_mutex)
{
return WITH_LOCK(m_tx_relay_mutex, return m_tx_relay.get());
};
/** A vector of addresses to send to the peer, limited to MAX_ADDR_TO_SEND. */
std::vector<CAddress> m_addrs_to_send GUARDED_BY(NetEventsInterface::g_msgproc_mutex);
@@ -385,13 +392,22 @@ struct Peer {
/** Whether we've sent our peer a sendheaders message. **/
std::atomic<bool> m_sent_sendheaders{false};
/** Length of current-streak of unconnecting headers announcements */
int m_num_unconnecting_headers_msgs GUARDED_BY(NetEventsInterface::g_msgproc_mutex){0};
/** When to potentially disconnect peer for stalling headers download */
std::chrono::microseconds m_headers_sync_timeout GUARDED_BY(NetEventsInterface::g_msgproc_mutex){0us};
/** Whether this peer wants invs or headers (when possible) for block announcements */
bool m_prefers_headers GUARDED_BY(NetEventsInterface::g_msgproc_mutex){false};
explicit Peer(NodeId id, ServiceFlags our_services)
: m_id{id}
, m_our_services{our_services}
{}
private:
Mutex m_tx_relay_mutex;
mutable Mutex m_tx_relay_mutex;
/** Transaction relay data. May be a nullptr. */
std::unique_ptr<TxRelay> m_tx_relay GUARDED_BY(m_tx_relay_mutex);
@@ -414,12 +430,8 @@ struct CNodeState {
const CBlockIndex* pindexLastCommonBlock{nullptr};
//! The best header we have sent our peer.
const CBlockIndex* pindexBestHeaderSent{nullptr};
//! Length of current-streak of unconnecting headers announcements
int nUnconnectingHeaders{0};
//! Whether we've started headers synchronization with this peer.
bool fSyncStarted{false};
//! When to potentially disconnect peer for stalling headers download
std::chrono::microseconds m_headers_sync_timeout{0us};
//! Since when we're stalling block download progress (in microseconds), or 0.
std::chrono::microseconds m_stalling_since{0us};
std::list<QueuedBlock> vBlocksInFlight;
@@ -428,8 +440,6 @@ struct CNodeState {
int nBlocksInFlight{0};
//! Whether we consider this a preferred download peer.
bool fPreferredDownload{false};
//! Whether this peer wants invs or headers (when possible) for block announcements.
bool fPreferHeaders{false};
/** Whether this peer wants invs or cmpctblocks (when possible) for block announcements. */
bool m_requested_hb_cmpctblocks{false};
/** Whether this peer will send us cmpctblocks if we request them. */
@@ -478,9 +488,6 @@ struct CNodeState {
//! Whether this peer is an inbound connection
const bool m_is_inbound;
//! A rolling bloom filter of all announced tx CInvs to this peer.
CRollingBloomFilter m_recently_announced_invs = CRollingBloomFilter{INVENTORY_MAX_RECENT_RELAY, 0.000001};
CNodeState(bool is_inbound) : m_is_inbound(is_inbound) {}
};
@@ -666,7 +673,8 @@ private:
/** Potentially fetch blocks from this peer upon receipt of a new headers tip */
void HeadersDirectFetchBlocks(CNode& pfrom, const Peer& peer, const CBlockIndex& last_header);
/** Update peer state based on received headers message */
void UpdatePeerStateForReceivedHeaders(CNode& pfrom, const CBlockIndex& last_header, bool received_new_header, bool may_have_more_headers);
void UpdatePeerStateForReceivedHeaders(CNode& pfrom, Peer& peer, const CBlockIndex& last_header, bool received_new_header, bool may_have_more_headers)
EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex);
void SendBlockTransactions(CNode& pfrom, Peer& peer, const CBlock& block, const BlockTransactionsRequest& req);
@@ -900,10 +908,12 @@ private:
std::atomic<std::chrono::seconds> m_last_tip_update{0s};
/** Determine whether or not a peer can request a transaction, and return it (or nullptr if not found or not allowed). */
CTransactionRef FindTxForGetData(const CNode& peer, const GenTxid& gtxid, const std::chrono::seconds mempool_req, const std::chrono::seconds now) LOCKS_EXCLUDED(cs_main);
CTransactionRef FindTxForGetData(const Peer& peer, const GenTxid& gtxid, const std::chrono::seconds mempool_req, const std::chrono::seconds now)
LOCKS_EXCLUDED(cs_main) EXCLUSIVE_LOCKS_REQUIRED(NetEventsInterface::g_msgproc_mutex);
void ProcessGetData(CNode& pfrom, Peer& peer, const std::atomic<bool>& interruptMsgProc)
EXCLUSIVE_LOCKS_REQUIRED(!m_most_recent_block_mutex, peer.m_getdata_requests_mutex) LOCKS_EXCLUDED(::cs_main);
EXCLUSIVE_LOCKS_REQUIRED(!m_most_recent_block_mutex, peer.m_getdata_requests_mutex, NetEventsInterface::g_msgproc_mutex)
LOCKS_EXCLUDED(::cs_main);
/** Process a new block. Perform any post-processing housekeeping */
void ProcessBlock(CNode& node, const std::shared_ptr<const CBlock>& block, bool force_processing, bool min_pow_checked);
@@ -2248,7 +2258,7 @@ void PeerManagerImpl::ProcessGetBlockData(CNode& pfrom, Peer& peer, const CInv&
}
}
CTransactionRef PeerManagerImpl::FindTxForGetData(const CNode& peer, const GenTxid& gtxid, const std::chrono::seconds mempool_req, const std::chrono::seconds now)
CTransactionRef PeerManagerImpl::FindTxForGetData(const Peer& peer, const GenTxid& gtxid, const std::chrono::seconds mempool_req, const std::chrono::seconds now)
{
auto txinfo = m_mempool.info(gtxid);
if (txinfo.tx) {
@@ -2263,7 +2273,7 @@ CTransactionRef PeerManagerImpl::FindTxForGetData(const CNode& peer, const GenTx
{
LOCK(cs_main);
// Otherwise, the transaction must have been announced recently.
if (State(peer.GetId())->m_recently_announced_invs.contains(gtxid.GetHash())) {
if (Assume(peer.GetTxRelay())->m_recently_announced_invs.contains(gtxid.GetHash())) {
// If it was, it can be relayed from either the mempool...
if (txinfo.tx) return std::move(txinfo.tx);
// ... or the relay pool.
@@ -2306,7 +2316,7 @@ void PeerManagerImpl::ProcessGetData(CNode& pfrom, Peer& peer, const std::atomic
continue;
}
CTransactionRef tx = FindTxForGetData(pfrom, ToGenTxid(inv), mempool_req, now);
CTransactionRef tx = FindTxForGetData(peer, ToGenTxid(inv), mempool_req, now);
if (tx) {
// WTX and WITNESS_TX imply we serialize with witness
int nSendFlags = (inv.IsMsgTx() ? SERIALIZE_TRANSACTION_NO_WITNESS : 0);
@@ -2330,8 +2340,7 @@ void PeerManagerImpl::ProcessGetData(CNode& pfrom, Peer& peer, const std::atomic
for (const uint256& parent_txid : parent_ids_to_add) {
// Relaying a transaction with a recent but unconfirmed parent.
if (WITH_LOCK(tx_relay->m_tx_inventory_mutex, return !tx_relay->m_tx_inventory_known_filter.contains(parent_txid))) {
LOCK(cs_main);
State(pfrom.GetId())->m_recently_announced_invs.insert(parent_txid);
tx_relay->m_recently_announced_invs.insert(parent_txid);
}
}
} else {
@@ -2430,36 +2439,35 @@ arith_uint256 PeerManagerImpl::GetAntiDoSWorkThreshold()
*
* We'll send a getheaders message in response to try to connect the chain.
*
* The peer can send up to MAX_UNCONNECTING_HEADERS in a row that
* The peer can send up to MAX_NUM_UNCONNECTING_HEADERS_MSGS in a row that
* don't connect before given DoS points.
*
* Once a headers message is received that is valid and does connect,
* nUnconnectingHeaders gets reset back to 0.
* m_num_unconnecting_headers_msgs gets reset back to 0.
*/
void PeerManagerImpl::HandleFewUnconnectingHeaders(CNode& pfrom, Peer& peer,
const std::vector<CBlockHeader>& headers)
{
LOCK(cs_main);
CNodeState *nodestate = State(pfrom.GetId());
nodestate->nUnconnectingHeaders++;
peer.m_num_unconnecting_headers_msgs++;
// Try to fill in the missing headers.
if (MaybeSendGetHeaders(pfrom, GetLocator(m_chainman.m_best_header), peer)) {
LogPrint(BCLog::NET, "received header %s: missing prev block %s, sending getheaders (%d) to end (peer=%d, nUnconnectingHeaders=%d)\n",
const CBlockIndex* best_header{WITH_LOCK(cs_main, return m_chainman.m_best_header)};
if (MaybeSendGetHeaders(pfrom, GetLocator(best_header), peer)) {
LogPrint(BCLog::NET, "received header %s: missing prev block %s, sending getheaders (%d) to end (peer=%d, m_num_unconnecting_headers_msgs=%d)\n",
headers[0].GetHash().ToString(),
headers[0].hashPrevBlock.ToString(),
m_chainman.m_best_header->nHeight,
pfrom.GetId(), nodestate->nUnconnectingHeaders);
best_header->nHeight,
pfrom.GetId(), peer.m_num_unconnecting_headers_msgs);
}
// Set hashLastUnknownBlock for this peer, so that if we
// eventually get the headers - even from a different peer -
// we can use this peer to download.
UpdateBlockAvailability(pfrom.GetId(), headers.back().GetHash());
WITH_LOCK(cs_main, UpdateBlockAvailability(pfrom.GetId(), headers.back().GetHash()));
// The peer may just be broken, so periodically assign DoS points if this
// condition persists.
if (nodestate->nUnconnectingHeaders % MAX_UNCONNECTING_HEADERS == 0) {
Misbehaving(peer, 20, strprintf("%d non-connecting headers", nodestate->nUnconnectingHeaders));
if (peer.m_num_unconnecting_headers_msgs % MAX_NUM_UNCONNECTING_HEADERS_MSGS == 0) {
Misbehaving(peer, 20, strprintf("%d non-connecting headers", peer.m_num_unconnecting_headers_msgs));
}
}
@@ -2707,15 +2715,16 @@ void PeerManagerImpl::HeadersDirectFetchBlocks(CNode& pfrom, const Peer& peer, c
* whether that header was new and whether the headers message was full,
* update the state we keep for the peer.
*/
void PeerManagerImpl::UpdatePeerStateForReceivedHeaders(CNode& pfrom,
void PeerManagerImpl::UpdatePeerStateForReceivedHeaders(CNode& pfrom, Peer& peer,
const CBlockIndex& last_header, bool received_new_header, bool may_have_more_headers)
{
if (peer.m_num_unconnecting_headers_msgs > 0) {
LogPrint(BCLog::NET, "peer=%d: resetting m_num_unconnecting_headers_msgs (%d -> 0)\n", pfrom.GetId(), peer.m_num_unconnecting_headers_msgs);
}
peer.m_num_unconnecting_headers_msgs = 0;
LOCK(cs_main);
CNodeState *nodestate = State(pfrom.GetId());
if (nodestate->nUnconnectingHeaders > 0) {
LogPrint(BCLog::NET, "peer=%d: resetting nUnconnectingHeaders (%d -> 0)\n", pfrom.GetId(), nodestate->nUnconnectingHeaders);
}
nodestate->nUnconnectingHeaders = 0;
UpdateBlockAvailability(pfrom.GetId(), last_header.GetBlockHash());
@@ -2900,7 +2909,7 @@ void PeerManagerImpl::ProcessHeadersMessage(CNode& pfrom, Peer& peer,
}
}
UpdatePeerStateForReceivedHeaders(pfrom, *pindexLast, received_new_header, nCount == MAX_HEADERS_RESULTS);
UpdatePeerStateForReceivedHeaders(pfrom, peer, *pindexLast, received_new_header, nCount == MAX_HEADERS_RESULTS);
// Consider immediately downloading blocks.
HeadersDirectFetchBlocks(pfrom, peer, *pindexLast);
@@ -3444,8 +3453,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
}
if (msg_type == NetMsgType::SENDHEADERS) {
LOCK(cs_main);
State(pfrom.GetId())->fPreferHeaders = true;
peer->m_prefers_headers = true;
return;
}
@@ -5426,7 +5434,7 @@ bool PeerManagerImpl::SendMessages(CNode* pto)
LogPrint(BCLog::NET, "initial getheaders (%d) to peer=%d (startheight:%d)\n", pindexStart->nHeight, pto->GetId(), peer->m_starting_height);
state.fSyncStarted = true;
state.m_headers_sync_timeout = current_time + HEADERS_DOWNLOAD_TIMEOUT_BASE +
peer->m_headers_sync_timeout = current_time + HEADERS_DOWNLOAD_TIMEOUT_BASE +
(
// Convert HEADERS_DOWNLOAD_TIMEOUT_PER_HEADER to microseconds before scaling
// to maintain precision
@@ -5451,7 +5459,7 @@ bool PeerManagerImpl::SendMessages(CNode* pto)
// add all to the inv queue.
LOCK(peer->m_block_inv_mutex);
std::vector<CBlock> vHeaders;
bool fRevertToInv = ((!state.fPreferHeaders &&
bool fRevertToInv = ((!peer->m_prefers_headers &&
(!state.m_requested_hb_cmpctblocks || peer->m_blocks_for_headers_relay.size() > 1)) ||
peer->m_blocks_for_headers_relay.size() > MAX_BLOCKS_TO_ANNOUNCE);
const CBlockIndex *pBestIndex = nullptr; // last header queued for delivery
@@ -5528,7 +5536,7 @@ bool PeerManagerImpl::SendMessages(CNode* pto)
m_connman.PushMessage(pto, msgMaker.Make(NetMsgType::CMPCTBLOCK, cmpctblock));
}
state.pindexBestHeaderSent = pBestIndex;
} else if (state.fPreferHeaders) {
} else if (peer->m_prefers_headers) {
if (vHeaders.size() > 1) {
LogPrint(BCLog::NET, "%s: %u headers, range (%s, %s), to peer=%d\n", __func__,
vHeaders.size(),
@@ -5682,7 +5690,7 @@ bool PeerManagerImpl::SendMessages(CNode* pto)
}
if (tx_relay->m_bloom_filter && !tx_relay->m_bloom_filter->IsRelevantAndUpdate(*txinfo.tx)) continue;
// Send
State(pto->GetId())->m_recently_announced_invs.insert(hash);
tx_relay->m_recently_announced_invs.insert(hash);
vInv.push_back(inv);
nRelayedTransactions++;
{
@@ -5753,10 +5761,10 @@ bool PeerManagerImpl::SendMessages(CNode* pto)
}
}
// Check for headers sync timeouts
if (state.fSyncStarted && state.m_headers_sync_timeout < std::chrono::microseconds::max()) {
if (state.fSyncStarted && peer->m_headers_sync_timeout < std::chrono::microseconds::max()) {
// Detect whether this is a stalling initial-headers-sync peer
if (m_chainman.m_best_header->Time() <= GetAdjustedTime() - 24h) {
if (current_time > state.m_headers_sync_timeout && nSyncStarted == 1 && (m_num_preferred_download_peers - state.fPreferredDownload >= 1)) {
if (current_time > peer->m_headers_sync_timeout && nSyncStarted == 1 && (m_num_preferred_download_peers - state.fPreferredDownload >= 1)) {
// Disconnect a peer (without NetPermissionFlags::NoBan permission) if it is our only sync peer,
// and we have others we could be using instead.
// Note: If all our peers are inbound, then we won't
@@ -5775,13 +5783,13 @@ bool PeerManagerImpl::SendMessages(CNode* pto)
// this peer (eventually).
state.fSyncStarted = false;
nSyncStarted--;
state.m_headers_sync_timeout = 0us;
peer->m_headers_sync_timeout = 0us;
}
}
} else {
// After we've caught up once, reset the timeout so we can't trigger
// disconnect later.
state.m_headers_sync_timeout = std::chrono::microseconds::max();
peer->m_headers_sync_timeout = std::chrono::microseconds::max();
}
}

View File

@@ -997,7 +997,7 @@ public:
std::set<CBlockIndex*> m_failed_blocks;
/** Best header we've seen so far (used for getheaders queries' starting points). */
CBlockIndex* m_best_header = nullptr;
CBlockIndex* m_best_header GUARDED_BY(::cs_main){nullptr};
//! The total number of bytes available for us to use across all in-memory
//! coins caches. This will be split somehow across chainstates.

View File

@@ -546,15 +546,15 @@ class SendHeadersTest(BitcoinTestFramework):
blocks = []
# Now we test that if we repeatedly don't send connecting headers, we
# don't go into an infinite loop trying to get them to connect.
MAX_UNCONNECTING_HEADERS = 10
for _ in range(MAX_UNCONNECTING_HEADERS + 1):
MAX_NUM_UNCONNECTING_HEADERS_MSGS = 10
for _ in range(MAX_NUM_UNCONNECTING_HEADERS_MSGS + 1):
blocks.append(create_block(tip, create_coinbase(height), block_time))
blocks[-1].solve()
tip = blocks[-1].sha256
block_time += 1
height += 1
for i in range(1, MAX_UNCONNECTING_HEADERS):
for i in range(1, MAX_NUM_UNCONNECTING_HEADERS_MSGS):
# Send a header that doesn't connect, check that we get a getheaders.
with p2p_lock:
test_node.last_message.pop("getheaders", None)
@@ -568,8 +568,8 @@ class SendHeadersTest(BitcoinTestFramework):
blocks = blocks[2:]
# Now try to see how many unconnecting headers we can send
# before we get disconnected. Should be 5*MAX_UNCONNECTING_HEADERS
for i in range(5 * MAX_UNCONNECTING_HEADERS - 1):
# before we get disconnected. Should be 5*MAX_NUM_UNCONNECTING_HEADERS_MSGS
for i in range(5 * MAX_NUM_UNCONNECTING_HEADERS_MSGS - 1):
# Send a header that doesn't connect, check that we get a getheaders.
with p2p_lock:
test_node.last_message.pop("getheaders", None)