diff --git a/discovery/gossiper.go b/discovery/gossiper.go index c4677bbf5..2473eda24 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -399,6 +399,10 @@ type Config struct { // MsgBurstBytes is the allotted burst amount in bytes. This is the // number of starting tokens in our token bucket algorithm. MsgBurstBytes uint64 + + // FilterConcurrency is the maximum number of concurrent gossip filter + // applications that can be processed. + FilterConcurrency int } // processedNetworkMsg is a wrapper around networkMsg and a boolean. It is @@ -600,6 +604,7 @@ func New(cfg Config, selfKeyDesc *keychain.KeyDescriptor) *AuthenticatedGossiper IsStillZombieChannel: cfg.IsStillZombieChannel, AllotedMsgBytesPerSecond: cfg.MsgRateBytes, AllotedMsgBytesBurst: cfg.MsgBurstBytes, + FilterConcurrency: cfg.FilterConcurrency, }) gossiper.reliableSender = newReliableSender(&reliableSenderCfg{ @@ -907,13 +912,16 @@ func (d *AuthenticatedGossiper) ProcessRemoteAnnouncement(ctx context.Context, return errChan } - // If we've found the message target, then we'll dispatch the - // message directly to it. - if err := syncer.ApplyGossipFilter(ctx, m); err != nil { - log.Warnf("Unable to apply gossip filter for peer=%x: "+ - "%v", peer.PubKey(), err) + // Queue the message for asynchronous processing to prevent + // blocking the gossiper when rate limiting is active. + if !syncer.QueueTimestampRange(m) { + log.Warnf("Unable to queue gossip filter for peer=%x: "+ + "queue full", peer.PubKey()) - errChan <- err + // Return nil to indicate we've handled the message, + // even though it was dropped. This prevents the peer + // from being disconnected. + errChan <- nil return errChan }