discovery: add async timestamp range queue to prevent blocking

In this commit, we introduce an asynchronous processing queue for
GossipTimestampRange messages in the GossipSyncer. This change addresses
a critical issue where the gossiper could block indefinitely when
processing timestamp range messages during periods of high load.

Previously, when a peer sent a GossipTimestampRange message, the
gossiper would synchronously call ApplyGossipFilter, which could block
on semaphore acquisition, database queries, and rate limiting. This
synchronous processing created a bottleneck where the entire peer
message processing pipeline would stall, potentially causing timeouts
and disconnections.

The new design adds a timestampRangeQueue channel with a capacity of 1
message and a dedicated goroutine for processing these messages
asynchronously. This follows the established pattern used for other
message types in the syncer. When the queue is full, we drop messages
and log a warning rather than blocking indefinitely, providing graceful
degradation under extreme load conditions.
This commit is contained in:
Olaoluwa Osuntokun
2025-07-21 11:51:57 -05:00
parent 694cc15a73
commit 7fb289f24f

View File

@@ -54,6 +54,12 @@ const (
PinnedSync
)
const (
// defaultTimestampQueueSize is the size of the timestamp range queue
// used.
defaultTimestampQueueSize = 1
)
// String returns a human readable string describing the target SyncerType.
func (t SyncerType) String() string {
switch t {
@@ -285,6 +291,10 @@ type gossipSyncerCfg struct {
// updates for a channel and returns true if the channel should be
// considered a zombie based on these timestamps.
isStillZombieChannel func(time.Time, time.Time) bool
// timestampQueueSize is the size of the timestamp range queue. If not
// set, defaults to the global timestampQueueSize constant.
timestampQueueSize int
}
// GossipSyncer is a struct that handles synchronizing the channel graph state
@@ -381,6 +391,11 @@ type GossipSyncer struct {
// respond to gossip timestamp range messages.
syncerSema chan struct{}
// timestampRangeQueue is a buffered channel for queuing timestamp range
// messages that need to be processed asynchronously. This prevents the
// gossiper from blocking when ApplyGossipFilter is called.
timestampRangeQueue chan *lnwire.GossipTimestampRange
sync.Mutex
// cg is a helper that encapsulates a wait group and quit channel and
@@ -392,14 +407,23 @@ type GossipSyncer struct {
// newGossipSyncer returns a new instance of the GossipSyncer populated using
// the passed config.
func newGossipSyncer(cfg gossipSyncerCfg, sema chan struct{}) *GossipSyncer {
// Use the configured queue size if set, otherwise use the default.
queueSize := cfg.timestampQueueSize
if queueSize == 0 {
queueSize = defaultTimestampQueueSize
}
return &GossipSyncer{
cfg: cfg,
syncTransitionReqs: make(chan *syncTransitionReq),
historicalSyncReqs: make(chan *historicalSyncReq),
gossipMsgs: make(chan lnwire.Message, syncerBufferSize),
queryMsgs: make(chan lnwire.Message, syncerBufferSize),
syncerSema: sema,
cg: fn.NewContextGuard(),
timestampRangeQueue: make(
chan *lnwire.GossipTimestampRange, queueSize,
),
syncerSema: sema,
cg: fn.NewContextGuard(),
}
}
@@ -422,6 +446,13 @@ func (g *GossipSyncer) Start() {
g.cg.WgAdd(1)
go g.replyHandler(ctx)
}
// Start the timestamp range queue processor to handle gossip
// filter applications asynchronously.
if !g.cfg.noTimestampQueryOption {
g.cg.WgAdd(1)
go g.processTimestampRangeQueue(ctx)
}
})
}
@@ -672,6 +703,63 @@ func (g *GossipSyncer) replyHandler(ctx context.Context) {
}
}
// processTimestampRangeQueue handles timestamp range messages from the queue
// asynchronously. This prevents blocking the gossiper when rate limiting is
// active and multiple peers are trying to apply gossip filters.
func (g *GossipSyncer) processTimestampRangeQueue(ctx context.Context) {
defer g.cg.WgDone()
for {
select {
case msg := <-g.timestampRangeQueue:
// Process the timestamp range message. If we hit an
// error, log it but continue processing to avoid
// blocking the queue.
err := g.ApplyGossipFilter(ctx, msg)
switch {
case errors.Is(err, ErrGossipSyncerExiting):
return
case errors.Is(err, lnpeer.ErrPeerExiting):
return
case err != nil:
log.Errorf("Unable to apply gossip filter: %v",
err)
}
case <-g.cg.Done():
return
case <-ctx.Done():
return
}
}
}
// QueueTimestampRange attempts to queue a timestamp range message for
// asynchronous processing. If the queue is full, it returns false to indicate
// the message was dropped.
func (g *GossipSyncer) QueueTimestampRange(
msg *lnwire.GossipTimestampRange) bool {
// If timestamp queries are disabled, don't queue the message.
if g.cfg.noTimestampQueryOption {
return false
}
select {
case g.timestampRangeQueue <- msg:
return true
// Queue is full, drop the message to prevent blocking.
default:
log.Warnf("Timestamp range queue full for peer %x, "+
"dropping message", g.cfg.peerPub[:])
return false
}
}
// sendGossipTimestampRange constructs and sets a GossipTimestampRange for the
// syncer and sends it to the remote peer.
func (g *GossipSyncer) sendGossipTimestampRange(ctx context.Context,