From 80e0ea0d4058d110bfad2ccba9721924d36da022 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Mon, 21 Jul 2025 11:51:57 -0500 Subject: [PATCH] discovery: add async timestamp range queue to prevent blocking In this commit, we introduce an asynchronous processing queue for GossipTimestampRange messages in the GossipSyncer. This change addresses a critical issue where the gossiper could block indefinitely when processing timestamp range messages during periods of high load. Previously, when a peer sent a GossipTimestampRange message, the gossiper would synchronously call ApplyGossipFilter, which could block on semaphore acquisition, database queries, and rate limiting. This synchronous processing created a bottleneck where the entire peer message processing pipeline would stall, potentially causing timeouts and disconnections. The new design adds a timestampRangeQueue channel with a capacity of 1 message and a dedicated goroutine for processing these messages asynchronously. This follows the established pattern used for other message types in the syncer. When the queue is full, we drop messages and log a warning rather than blocking indefinitely, providing graceful degradation under extreme load conditions. --- discovery/syncer.go | 92 ++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 90 insertions(+), 2 deletions(-) diff --git a/discovery/syncer.go b/discovery/syncer.go index 41cc525f0..e3e773eca 100644 --- a/discovery/syncer.go +++ b/discovery/syncer.go @@ -54,6 +54,12 @@ const ( PinnedSync ) +const ( + // defaultTimestampQueueSize is the size of the timestamp range queue + // used. + defaultTimestampQueueSize = 1 +) + // String returns a human readable string describing the target SyncerType. func (t SyncerType) String() string { switch t { @@ -285,6 +291,10 @@ type gossipSyncerCfg struct { // updates for a channel and returns true if the channel should be // considered a zombie based on these timestamps. 
isStillZombieChannel func(time.Time, time.Time) bool + + // timestampQueueSize is the size of the timestamp range queue. If not + // set, defaults to the defaultTimestampQueueSize constant. + timestampQueueSize int } // GossipSyncer is a struct that handles synchronizing the channel graph state @@ -381,6 +391,11 @@ type GossipSyncer struct { // respond to gossip timestamp range messages. syncerSema chan struct{} + // timestampRangeQueue is a buffered channel for queuing timestamp range + // messages that need to be processed asynchronously. This prevents the + // gossiper from blocking when ApplyGossipFilter is called. + timestampRangeQueue chan *lnwire.GossipTimestampRange + sync.Mutex // cg is a helper that encapsulates a wait group and quit channel and @@ -392,14 +407,23 @@ // newGossipSyncer returns a new instance of the GossipSyncer populated using // the passed config. func newGossipSyncer(cfg gossipSyncerCfg, sema chan struct{}) *GossipSyncer { + // Use the configured queue size if set, otherwise use the default. + queueSize := cfg.timestampQueueSize + if queueSize == 0 { + queueSize = defaultTimestampQueueSize + } + return &GossipSyncer{ cfg: cfg, syncTransitionReqs: make(chan *syncTransitionReq), historicalSyncReqs: make(chan *historicalSyncReq), gossipMsgs: make(chan lnwire.Message, syncerBufferSize), queryMsgs: make(chan lnwire.Message, syncerBufferSize), - syncerSema: sema, - cg: fn.NewContextGuard(), + timestampRangeQueue: make( + chan *lnwire.GossipTimestampRange, queueSize, + ), + syncerSema: sema, + cg: fn.NewContextGuard(), } } @@ -424,6 +448,13 @@ func (g *GossipSyncer) Start() { g.cg.WgAdd(1) go g.replyHandler(ctx) } + + // Start the timestamp range queue processor to handle gossip + // filter applications asynchronously. 
+ if !g.cfg.noTimestampQueryOption { + g.cg.WgAdd(1) + go g.processTimestampRangeQueue(ctx) + } }) } @@ -674,6 +705,63 @@ func (g *GossipSyncer) replyHandler(ctx context.Context) { } } +// processTimestampRangeQueue handles timestamp range messages from the queue +// asynchronously. This prevents blocking the gossiper when rate limiting is +// active and multiple peers are trying to apply gossip filters. +func (g *GossipSyncer) processTimestampRangeQueue(ctx context.Context) { + defer g.cg.WgDone() + + for { + select { + case msg := <-g.timestampRangeQueue: + // Process the timestamp range message. If we hit an + // error, log it but continue processing to avoid + // blocking the queue. + err := g.ApplyGossipFilter(ctx, msg) + switch { + case errors.Is(err, ErrGossipSyncerExiting): + return + + case errors.Is(err, lnpeer.ErrPeerExiting): + return + + case err != nil: + log.Errorf("Unable to apply gossip filter: %v", + err) + } + + case <-g.cg.Done(): + return + + case <-ctx.Done(): + return + } + } +} + +// QueueTimestampRange attempts to queue a timestamp range message for +// asynchronous processing. If the queue is full, it returns false to indicate +// the message was dropped. +func (g *GossipSyncer) QueueTimestampRange( + msg *lnwire.GossipTimestampRange) bool { + + // If timestamp queries are disabled, don't queue the message. + if g.cfg.noTimestampQueryOption { + return false + } + + select { + case g.timestampRangeQueue <- msg: + return true + + // Queue is full, drop the message to prevent blocking. + default: + log.Warnf("Timestamp range queue full for peer %x, "+ + "dropping message", g.cfg.peerPub[:]) + return false + } +} + // sendGossipTimestampRange constructs and sets a GossipTimestampRange for the // syncer and sends it to the remote peer. func (g *GossipSyncer) sendGossipTimestampRange(ctx context.Context,