Merge bitcoin/bitcoin#34059: refactor: Use NodeClock::time_point for m_addr_token_timestamp

faaea7895f refactor: Use current_time over redundant call to Now() (MarcoFalke)
3333c5023f refactor: Use NodeClock::time_point for m_addr_token_timestamp (MarcoFalke)
fa55723b8f move-only: Extract ProcessAddrs() helper (MarcoFalke)

Pull request description:

  It is a bit confusing to have some code use the deprecated `GetTime`, which returns a duration and not a time point, and other code to use `NodeClock` time points.

  Fix one place `m_addr_token_timestamp` to use `NodeClock::time_point`.

  Also:

  * Extract a `ProcessAddrs` helper, similar to the other `Process*()` helpers, to cut down the `ProcessMessage` with a massive scope.
  * Rename the confusing `current_a_time` to `now_seconds`. (The `a` in this context refers to the removed "adjusted" time, see commit fadd8b2676, which removed adjusted time here)

ACKs for top commit:
  l0rinc:
    ACK faaea7895f
  ajtowns:
    reACK faaea7895f
  sedited:
    Re-ACK faaea7895f

Tree-SHA512: 67ad13e9d7b88e08e3d723e6b7cd598b38df2a004f5c2338b24f2992e25ae9d8fb8e5325c9c94171e551fe86d87e3e3ec1fe6baae64edbf6b5c125f408ee64e4
This commit is contained in:
merge-script
2026-03-20 09:37:30 +01:00

View File

@@ -379,7 +379,7 @@ struct Peer {
* permit self-announcement. */
double m_addr_token_bucket GUARDED_BY(NetEventsInterface::g_msgproc_mutex){1.0};
/** When m_addr_token_bucket was last updated */
std::chrono::microseconds m_addr_token_timestamp GUARDED_BY(NetEventsInterface::g_msgproc_mutex){GetTime<std::chrono::microseconds>()};
NodeClock::time_point m_addr_token_timestamp GUARDED_BY(NetEventsInterface::g_msgproc_mutex){NodeClock::now()};
/** Total number of addresses that were dropped due to rate limiting. */
std::atomic<uint64_t> m_addr_rate_limited{0};
/** Total number of addresses that were processed (excludes rate-limited ones). */
@@ -1083,6 +1083,9 @@ private:
*/
bool SetupAddressRelay(const CNode& node, Peer& peer) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex);
void ProcessAddrs(std::string_view msg_type, CNode& pfrom, Peer& peer, std::vector<CAddress>&& vAddr, const std::atomic<bool>& interruptMsgProc)
EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex, !m_peer_mutex);
void AddAddressKnown(Peer& peer, const CAddress& addr) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex);
void PushAddress(Peer& peer, const CAddress& addr) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex);
@@ -4026,90 +4029,8 @@ void PeerManagerImpl::ProcessMessage(Peer& peer, CNode& pfrom, const std::string
};
std::vector<CAddress> vAddr;
vRecv >> ser_params(vAddr);
if (!SetupAddressRelay(pfrom, peer)) {
LogDebug(BCLog::NET, "ignoring %s message from %s peer=%d\n", msg_type, pfrom.ConnectionTypeAsString(), pfrom.GetId());
return;
}
if (vAddr.size() > MAX_ADDR_TO_SEND)
{
Misbehaving(peer, strprintf("%s message size = %u", msg_type, vAddr.size()));
return;
}
// Store the new addresses
std::vector<CAddress> vAddrOk;
const auto current_a_time{Now<NodeSeconds>()};
// Update/increment addr rate limiting bucket.
const auto current_time{GetTime<std::chrono::microseconds>()};
if (peer.m_addr_token_bucket < MAX_ADDR_PROCESSING_TOKEN_BUCKET) {
// Don't increment bucket if it's already full
const auto time_diff = std::max(current_time - peer.m_addr_token_timestamp, 0us);
const double increment = Ticks<SecondsDouble>(time_diff) * MAX_ADDR_RATE_PER_SECOND;
peer.m_addr_token_bucket = std::min<double>(peer.m_addr_token_bucket + increment, MAX_ADDR_PROCESSING_TOKEN_BUCKET);
}
peer.m_addr_token_timestamp = current_time;
const bool rate_limited = !pfrom.HasPermission(NetPermissionFlags::Addr);
uint64_t num_proc = 0;
uint64_t num_rate_limit = 0;
std::shuffle(vAddr.begin(), vAddr.end(), m_rng);
for (CAddress& addr : vAddr)
{
if (interruptMsgProc)
return;
// Apply rate limiting.
if (peer.m_addr_token_bucket < 1.0) {
if (rate_limited) {
++num_rate_limit;
continue;
}
} else {
peer.m_addr_token_bucket -= 1.0;
}
// We only bother storing full nodes, though this may include
// things which we would not make an outbound connection to, in
// part because we may make feeler connections to them.
if (!MayHaveUsefulAddressDB(addr.nServices) && !HasAllDesirableServiceFlags(addr.nServices))
continue;
if (addr.nTime <= NodeSeconds{100000000s} || addr.nTime > current_a_time + 10min) {
addr.nTime = current_a_time - 5 * 24h;
}
AddAddressKnown(peer, addr);
if (m_banman && (m_banman->IsDiscouraged(addr) || m_banman->IsBanned(addr))) {
// Do not process banned/discouraged addresses beyond remembering we received them
continue;
}
++num_proc;
const bool reachable{g_reachable_nets.Contains(addr)};
if (addr.nTime > current_a_time - 10min && !peer.m_getaddr_sent && vAddr.size() <= 10 && addr.IsRoutable()) {
// Relay to a limited number of other nodes
RelayAddress(pfrom.GetId(), addr, reachable);
}
// Do not store addresses outside our network
if (reachable) {
vAddrOk.push_back(addr);
}
}
peer.m_addr_processed += num_proc;
peer.m_addr_rate_limited += num_rate_limit;
LogDebug(BCLog::NET, "Received addr: %u addresses (%u processed, %u rate-limited) from peer=%d\n",
vAddr.size(), num_proc, num_rate_limit, pfrom.GetId());
m_addrman.Add(vAddrOk, pfrom.addr, /*time_penalty=*/2h);
if (vAddr.size() < 1000) peer.m_getaddr_sent = false;
// AddrFetch: Require multiple addresses to avoid disconnecting on self-announcements
if (pfrom.IsAddrFetchConn() && vAddr.size() > 1) {
LogDebug(BCLog::NET, "addrfetch connection completed, %s", pfrom.DisconnectMsg());
pfrom.fDisconnect = true;
}
ProcessAddrs(msg_type, pfrom, peer, std::move(vAddr), interruptMsgProc);
return;
}
@@ -5701,6 +5622,93 @@ bool PeerManagerImpl::SetupAddressRelay(const CNode& node, Peer& peer)
return true;
}
void PeerManagerImpl::ProcessAddrs(std::string_view msg_type, CNode& pfrom, Peer& peer, std::vector<CAddress>&& vAddr, const std::atomic<bool>& interruptMsgProc)
{
AssertLockNotHeld(m_peer_mutex);
AssertLockHeld(g_msgproc_mutex);
if (!SetupAddressRelay(pfrom, peer)) {
LogDebug(BCLog::NET, "ignoring %s message from %s peer=%d\n", msg_type, pfrom.ConnectionTypeAsString(), pfrom.GetId());
return;
}
if (vAddr.size() > MAX_ADDR_TO_SEND)
{
Misbehaving(peer, strprintf("%s message size = %u", msg_type, vAddr.size()));
return;
}
// Store the new addresses
std::vector<CAddress> vAddrOk;
// Update/increment addr rate limiting bucket.
const auto current_time{NodeClock::now()};
if (peer.m_addr_token_bucket < MAX_ADDR_PROCESSING_TOKEN_BUCKET) {
// Don't increment bucket if it's already full
const auto time_diff{current_time - peer.m_addr_token_timestamp};
const double increment{std::max(Ticks<SecondsDouble>(time_diff), 0.0) * MAX_ADDR_RATE_PER_SECOND};
peer.m_addr_token_bucket = std::min<double>(peer.m_addr_token_bucket + increment, MAX_ADDR_PROCESSING_TOKEN_BUCKET);
}
peer.m_addr_token_timestamp = current_time;
const bool rate_limited = !pfrom.HasPermission(NetPermissionFlags::Addr);
uint64_t num_proc = 0;
uint64_t num_rate_limit = 0;
std::shuffle(vAddr.begin(), vAddr.end(), m_rng);
for (CAddress& addr : vAddr)
{
if (interruptMsgProc)
return;
// Apply rate limiting.
if (peer.m_addr_token_bucket < 1.0) {
if (rate_limited) {
++num_rate_limit;
continue;
}
} else {
peer.m_addr_token_bucket -= 1.0;
}
// We only bother storing full nodes, though this may include
// things which we would not make an outbound connection to, in
// part because we may make feeler connections to them.
if (!MayHaveUsefulAddressDB(addr.nServices) && !HasAllDesirableServiceFlags(addr.nServices))
continue;
if (addr.nTime <= NodeSeconds{100000000s} || addr.nTime > current_time + 10min) {
addr.nTime = std::chrono::time_point_cast<std::chrono::seconds>(current_time - 5 * 24h);
}
AddAddressKnown(peer, addr);
if (m_banman && (m_banman->IsDiscouraged(addr) || m_banman->IsBanned(addr))) {
// Do not process banned/discouraged addresses beyond remembering we received them
continue;
}
++num_proc;
const bool reachable{g_reachable_nets.Contains(addr)};
if (addr.nTime > current_time - 10min && !peer.m_getaddr_sent && vAddr.size() <= 10 && addr.IsRoutable()) {
// Relay to a limited number of other nodes
RelayAddress(pfrom.GetId(), addr, reachable);
}
// Do not store addresses outside our network
if (reachable) {
vAddrOk.push_back(addr);
}
}
peer.m_addr_processed += num_proc;
peer.m_addr_rate_limited += num_rate_limit;
LogDebug(BCLog::NET, "Received addr: %u addresses (%u processed, %u rate-limited) from peer=%d\n",
vAddr.size(), num_proc, num_rate_limit, pfrom.GetId());
m_addrman.Add(vAddrOk, pfrom.addr, /*time_penalty=*/2h);
if (vAddr.size() < 1000) peer.m_getaddr_sent = false;
// AddrFetch: Require multiple addresses to avoid disconnecting on self-announcements
if (pfrom.IsAddrFetchConn() && vAddr.size() > 1) {
LogDebug(BCLog::NET, "addrfetch connection completed, %s", pfrom.DisconnectMsg());
pfrom.fDisconnect = true;
}
}
bool PeerManagerImpl::SendMessages(CNode& node)
{
AssertLockNotHeld(m_tx_download_mutex);