diff --git a/discovery/syncer.go b/discovery/syncer.go index 41cc525f0..e3e773eca 100644 --- a/discovery/syncer.go +++ b/discovery/syncer.go @@ -54,6 +54,12 @@ const ( PinnedSync ) +const ( + // defaultTimestampQueueSize is the size of the timestamp range queue + // used. + defaultTimestampQueueSize = 1 +) + // String returns a human readable string describing the target SyncerType. func (t SyncerType) String() string { switch t { @@ -285,6 +291,10 @@ type gossipSyncerCfg struct { // updates for a channel and returns true if the channel should be // considered a zombie based on these timestamps. isStillZombieChannel func(time.Time, time.Time) bool + + // timestampQueueSize is the size of the timestamp range queue. If not + // set, defaults to the global timestampQueueSize constant. + timestampQueueSize int } // GossipSyncer is a struct that handles synchronizing the channel graph state @@ -381,6 +391,11 @@ type GossipSyncer struct { // respond to gossip timestamp range messages. syncerSema chan struct{} + // timestampRangeQueue is a buffered channel for queuing timestamp range + // messages that need to be processed asynchronously. This prevents the + // gossiper from blocking when ApplyGossipFilter is called. + timestampRangeQueue chan *lnwire.GossipTimestampRange + sync.Mutex // cg is a helper that encapsulates a wait group and quit channel and @@ -392,14 +407,23 @@ type GossipSyncer struct { // newGossipSyncer returns a new instance of the GossipSyncer populated using // the passed config. func newGossipSyncer(cfg gossipSyncerCfg, sema chan struct{}) *GossipSyncer { + // Use the configured queue size if set, otherwise use the default. + queueSize := cfg.timestampQueueSize + if queueSize == 0 { + queueSize = defaultTimestampQueueSize + } + return &GossipSyncer{ cfg: cfg, syncTransitionReqs: make(chan *syncTransitionReq), historicalSyncReqs: make(chan *historicalSyncReq), gossipMsgs: make(chan lnwire.Message, syncerBufferSize), queryMsgs: make(chan lnwire.Message, syncerBufferSize), - syncerSema: sema, - cg: fn.NewContextGuard(), + timestampRangeQueue: make( + chan *lnwire.GossipTimestampRange, queueSize, + ), + syncerSema: sema, + cg: fn.NewContextGuard(), } } @@ -424,6 +448,13 @@ func (g *GossipSyncer) Start() { g.cg.WgAdd(1) go g.replyHandler(ctx) } + + // Start the timestamp range queue processor to handle gossip + // filter applications asynchronously. + if !g.cfg.noTimestampQueryOption { + g.cg.WgAdd(1) + go g.processTimestampRangeQueue(ctx) + } }) } @@ -674,6 +705,63 @@ func (g *GossipSyncer) replyHandler(ctx context.Context) { } } +// processTimestampRangeQueue handles timestamp range messages from the queue +// asynchronously. This prevents blocking the gossiper when rate limiting is +// active and multiple peers are trying to apply gossip filters. +func (g *GossipSyncer) processTimestampRangeQueue(ctx context.Context) { + defer g.cg.WgDone() + + for { + select { + case msg := <-g.timestampRangeQueue: + // Process the timestamp range message. If we hit an + // error, log it but continue processing to avoid + // blocking the queue. + err := g.ApplyGossipFilter(ctx, msg) + switch { + case errors.Is(err, ErrGossipSyncerExiting): + return + + case errors.Is(err, lnpeer.ErrPeerExiting): + return + + case err != nil: + log.Errorf("Unable to apply gossip filter: %v", + err) + } + + case <-g.cg.Done(): + return + + case <-ctx.Done(): + return + } + } +} + +// QueueTimestampRange attempts to queue a timestamp range message for +// asynchronous processing. If the queue is full, it returns false to indicate +// the message was dropped. +func (g *GossipSyncer) QueueTimestampRange( + msg *lnwire.GossipTimestampRange) bool { + + // If timestamp queries are disabled, don't queue the message. + if g.cfg.noTimestampQueryOption { + return false + } + + select { + case g.timestampRangeQueue <- msg: + return true + + // Queue is full, drop the message to prevent blocking. + default: + log.Warnf("Timestamp range queue full for peer %x, "+ + "dropping message", g.cfg.peerPub[:]) + return false + } +} + // sendGossipTimestampRange constructs and sets a GossipTimestampRange for the // syncer and sends it to the remote peer. func (g *GossipSyncer) sendGossipTimestampRange(ctx context.Context,