discovery: integrate async queue in ProcessRemoteAnnouncement

In this commit, we complete the integration of the asynchronous
timestamp range queue by modifying ProcessRemoteAnnouncement to use
the new queuing mechanism instead of calling ApplyGossipFilter
synchronously.

This change ensures that when a peer sends a GossipTimestampRange
message, it is queued for asynchronous processing rather than
blocking the gossiper's main message processing loop. The modification
prevents the peer's readHandler from blocking on potentially slow
gossip filter operations, maintaining connection stability during
periods of high synchronization activity.

If the queue is full when attempting to enqueue a message, we log
a warning but return success to prevent peer disconnection. This
design choice prioritizes connection stability over guaranteed
delivery of every gossip filter request, which is acceptable since
peers can always resend timestamp range messages if needed.
This commit is contained in:
Olaoluwa Osuntokun
2025-07-21 11:52:15 -05:00
parent 7fb289f24f
commit f3ba372441

View File

@@ -399,6 +399,10 @@ type Config struct {
// MsgBurstBytes is the allotted burst amount in bytes. This is the
// number of starting tokens in our token bucket algorithm.
MsgBurstBytes uint64
// FilterConcurrency is the maximum number of concurrent gossip filter
// applications that can be processed.
FilterConcurrency int
}
// processedNetworkMsg is a wrapper around networkMsg and a boolean. It is
@@ -600,6 +604,7 @@ func New(cfg Config, selfKeyDesc *keychain.KeyDescriptor) *AuthenticatedGossiper
IsStillZombieChannel: cfg.IsStillZombieChannel,
AllotedMsgBytesPerSecond: cfg.MsgRateBytes,
AllotedMsgBytesBurst: cfg.MsgBurstBytes,
FilterConcurrency: cfg.FilterConcurrency,
})
gossiper.reliableSender = newReliableSender(&reliableSenderCfg{
@@ -907,13 +912,16 @@ func (d *AuthenticatedGossiper) ProcessRemoteAnnouncement(ctx context.Context,
return errChan
}
// If we've found the message target, then we'll dispatch the
// message directly to it.
if err := syncer.ApplyGossipFilter(ctx, m); err != nil {
log.Warnf("Unable to apply gossip filter for peer=%x: "+
"%v", peer.PubKey(), err)
// Queue the message for asynchronous processing to prevent
// blocking the gossiper when rate limiting is active.
if !syncer.QueueTimestampRange(m) {
log.Warnf("Unable to queue gossip filter for peer=%x: "+
"queue full", peer.PubKey())
errChan <- err
// Return nil to indicate we've handled the message,
// even though it was dropped. This prevents the peer
// from being disconnected.
errChan <- nil
return errChan
}