move-only: Extract ProcessAddrs() helper

This can be reviewed via the git options:
--color-moved=dimmed-zebra  --color-moved-ws=ignore-all-space
This commit is contained in:
MarcoFalke
2025-12-12 14:53:42 +01:00
parent ff7cdf633e
commit fa55723b8f

View File

@@ -1083,6 +1083,9 @@ private:
*/
bool SetupAddressRelay(const CNode& node, Peer& peer) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex);
void ProcessAddrs(std::string_view msg_type, CNode& pfrom, Peer& peer, std::vector<CAddress>&& vAddr, const std::atomic<bool>& interruptMsgProc)
EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex, !m_peer_mutex);
void AddAddressKnown(Peer& peer, const CAddress& addr) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex);
void PushAddress(Peer& peer, const CAddress& addr) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex);
@@ -4026,90 +4029,8 @@ void PeerManagerImpl::ProcessMessage(Peer& peer, CNode& pfrom, const std::string
};
std::vector<CAddress> vAddr;
vRecv >> ser_params(vAddr);
if (!SetupAddressRelay(pfrom, peer)) {
LogDebug(BCLog::NET, "ignoring %s message from %s peer=%d\n", msg_type, pfrom.ConnectionTypeAsString(), pfrom.GetId());
return;
}
if (vAddr.size() > MAX_ADDR_TO_SEND)
{
Misbehaving(peer, strprintf("%s message size = %u", msg_type, vAddr.size()));
return;
}
// Store the new addresses
std::vector<CAddress> vAddrOk;
const auto current_a_time{Now<NodeSeconds>()};
// Update/increment addr rate limiting bucket.
const auto current_time{GetTime<std::chrono::microseconds>()};
if (peer.m_addr_token_bucket < MAX_ADDR_PROCESSING_TOKEN_BUCKET) {
// Don't increment bucket if it's already full
const auto time_diff = std::max(current_time - peer.m_addr_token_timestamp, 0us);
const double increment = Ticks<SecondsDouble>(time_diff) * MAX_ADDR_RATE_PER_SECOND;
peer.m_addr_token_bucket = std::min<double>(peer.m_addr_token_bucket + increment, MAX_ADDR_PROCESSING_TOKEN_BUCKET);
}
peer.m_addr_token_timestamp = current_time;
const bool rate_limited = !pfrom.HasPermission(NetPermissionFlags::Addr);
uint64_t num_proc = 0;
uint64_t num_rate_limit = 0;
std::shuffle(vAddr.begin(), vAddr.end(), m_rng);
for (CAddress& addr : vAddr)
{
if (interruptMsgProc)
return;
// Apply rate limiting.
if (peer.m_addr_token_bucket < 1.0) {
if (rate_limited) {
++num_rate_limit;
continue;
}
} else {
peer.m_addr_token_bucket -= 1.0;
}
// We only bother storing full nodes, though this may include
// things which we would not make an outbound connection to, in
// part because we may make feeler connections to them.
if (!MayHaveUsefulAddressDB(addr.nServices) && !HasAllDesirableServiceFlags(addr.nServices))
continue;
if (addr.nTime <= NodeSeconds{100000000s} || addr.nTime > current_a_time + 10min) {
addr.nTime = current_a_time - 5 * 24h;
}
AddAddressKnown(peer, addr);
if (m_banman && (m_banman->IsDiscouraged(addr) || m_banman->IsBanned(addr))) {
// Do not process banned/discouraged addresses beyond remembering we received them
continue;
}
++num_proc;
const bool reachable{g_reachable_nets.Contains(addr)};
if (addr.nTime > current_a_time - 10min && !peer.m_getaddr_sent && vAddr.size() <= 10 && addr.IsRoutable()) {
// Relay to a limited number of other nodes
RelayAddress(pfrom.GetId(), addr, reachable);
}
// Do not store addresses outside our network
if (reachable) {
vAddrOk.push_back(addr);
}
}
peer.m_addr_processed += num_proc;
peer.m_addr_rate_limited += num_rate_limit;
LogDebug(BCLog::NET, "Received addr: %u addresses (%u processed, %u rate-limited) from peer=%d\n",
vAddr.size(), num_proc, num_rate_limit, pfrom.GetId());
m_addrman.Add(vAddrOk, pfrom.addr, /*time_penalty=*/2h);
if (vAddr.size() < 1000) peer.m_getaddr_sent = false;
// AddrFetch: Require multiple addresses to avoid disconnecting on self-announcements
if (pfrom.IsAddrFetchConn() && vAddr.size() > 1) {
LogDebug(BCLog::NET, "addrfetch connection completed, %s", pfrom.DisconnectMsg());
pfrom.fDisconnect = true;
}
ProcessAddrs(msg_type, pfrom, peer, std::move(vAddr), interruptMsgProc);
return;
}
@@ -5701,6 +5622,94 @@ bool PeerManagerImpl::SetupAddressRelay(const CNode& node, Peer& peer)
return true;
}
void PeerManagerImpl::ProcessAddrs(std::string_view msg_type, CNode& pfrom, Peer& peer, std::vector<CAddress>&& vAddr, const std::atomic<bool>& interruptMsgProc)
{
AssertLockNotHeld(m_peer_mutex);
AssertLockHeld(g_msgproc_mutex);
if (!SetupAddressRelay(pfrom, peer)) {
LogDebug(BCLog::NET, "ignoring %s message from %s peer=%d\n", msg_type, pfrom.ConnectionTypeAsString(), pfrom.GetId());
return;
}
if (vAddr.size() > MAX_ADDR_TO_SEND)
{
Misbehaving(peer, strprintf("%s message size = %u", msg_type, vAddr.size()));
return;
}
// Store the new addresses
std::vector<CAddress> vAddrOk;
const auto current_a_time{Now<NodeSeconds>()};
// Update/increment addr rate limiting bucket.
const auto current_time{GetTime<std::chrono::microseconds>()};
if (peer.m_addr_token_bucket < MAX_ADDR_PROCESSING_TOKEN_BUCKET) {
// Don't increment bucket if it's already full
const auto time_diff = std::max(current_time - peer.m_addr_token_timestamp, 0us);
const double increment = Ticks<SecondsDouble>(time_diff) * MAX_ADDR_RATE_PER_SECOND;
peer.m_addr_token_bucket = std::min<double>(peer.m_addr_token_bucket + increment, MAX_ADDR_PROCESSING_TOKEN_BUCKET);
}
peer.m_addr_token_timestamp = current_time;
const bool rate_limited = !pfrom.HasPermission(NetPermissionFlags::Addr);
uint64_t num_proc = 0;
uint64_t num_rate_limit = 0;
std::shuffle(vAddr.begin(), vAddr.end(), m_rng);
for (CAddress& addr : vAddr)
{
if (interruptMsgProc)
return;
// Apply rate limiting.
if (peer.m_addr_token_bucket < 1.0) {
if (rate_limited) {
++num_rate_limit;
continue;
}
} else {
peer.m_addr_token_bucket -= 1.0;
}
// We only bother storing full nodes, though this may include
// things which we would not make an outbound connection to, in
// part because we may make feeler connections to them.
if (!MayHaveUsefulAddressDB(addr.nServices) && !HasAllDesirableServiceFlags(addr.nServices))
continue;
if (addr.nTime <= NodeSeconds{100000000s} || addr.nTime > current_a_time + 10min) {
addr.nTime = current_a_time - 5 * 24h;
}
AddAddressKnown(peer, addr);
if (m_banman && (m_banman->IsDiscouraged(addr) || m_banman->IsBanned(addr))) {
// Do not process banned/discouraged addresses beyond remembering we received them
continue;
}
++num_proc;
const bool reachable{g_reachable_nets.Contains(addr)};
if (addr.nTime > current_a_time - 10min && !peer.m_getaddr_sent && vAddr.size() <= 10 && addr.IsRoutable()) {
// Relay to a limited number of other nodes
RelayAddress(pfrom.GetId(), addr, reachable);
}
// Do not store addresses outside our network
if (reachable) {
vAddrOk.push_back(addr);
}
}
peer.m_addr_processed += num_proc;
peer.m_addr_rate_limited += num_rate_limit;
LogDebug(BCLog::NET, "Received addr: %u addresses (%u processed, %u rate-limited) from peer=%d\n",
vAddr.size(), num_proc, num_rate_limit, pfrom.GetId());
m_addrman.Add(vAddrOk, pfrom.addr, /*time_penalty=*/2h);
if (vAddr.size() < 1000) peer.m_getaddr_sent = false;
// AddrFetch: Require multiple addresses to avoid disconnecting on self-announcements
if (pfrom.IsAddrFetchConn() && vAddr.size() > 1) {
LogDebug(BCLog::NET, "addrfetch connection completed, %s", pfrom.DisconnectMsg());
pfrom.fDisconnect = true;
}
}
bool PeerManagerImpl::SendMessages(CNode& node)
{
AssertLockNotHeld(m_tx_download_mutex);