mirror of
https://github.com/lightningnetwork/lnd.git
synced 2025-08-29 15:11:09 +02:00
discovery: add async timestamp range queue to prevent blocking
In this commit, we introduce an asynchronous processing queue for GossipTimestampRange messages in the GossipSyncer. This change addresses a critical issue where the gossiper could block indefinitely when processing timestamp range messages during periods of high load. Previously, when a peer sent a GossipTimestampRange message, the gossiper would synchronously call ApplyGossipFilter, which could block on semaphore acquisition, database queries, and rate limiting. This synchronous processing created a bottleneck where the entire peer message processing pipeline would stall, potentially causing timeouts and disconnections. The new design adds a timestampRangeQueue channel with a capacity of 1 message and a dedicated goroutine for processing these messages asynchronously. This follows the established pattern used for other message types in the syncer. When the queue is full, we drop messages and log a warning rather than blocking indefinitely, providing graceful degradation under extreme load conditions.
This commit is contained in:
committed by
Oliver Gugger
parent
57872b9cff
commit
80e0ea0d40
@@ -54,6 +54,12 @@ const (
|
|||||||
PinnedSync
|
PinnedSync
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// defaultTimestampQueueSize is the size of the timestamp range queue
|
||||||
|
// used.
|
||||||
|
defaultTimestampQueueSize = 1
|
||||||
|
)
|
||||||
|
|
||||||
// String returns a human readable string describing the target SyncerType.
|
// String returns a human readable string describing the target SyncerType.
|
||||||
func (t SyncerType) String() string {
|
func (t SyncerType) String() string {
|
||||||
switch t {
|
switch t {
|
||||||
@@ -285,6 +291,10 @@ type gossipSyncerCfg struct {
|
|||||||
// updates for a channel and returns true if the channel should be
|
// updates for a channel and returns true if the channel should be
|
||||||
// considered a zombie based on these timestamps.
|
// considered a zombie based on these timestamps.
|
||||||
isStillZombieChannel func(time.Time, time.Time) bool
|
isStillZombieChannel func(time.Time, time.Time) bool
|
||||||
|
|
||||||
|
// timestampQueueSize is the size of the timestamp range queue. If not
|
||||||
|
// set, defaults to the global timestampQueueSize constant.
|
||||||
|
timestampQueueSize int
|
||||||
}
|
}
|
||||||
|
|
||||||
// GossipSyncer is a struct that handles synchronizing the channel graph state
|
// GossipSyncer is a struct that handles synchronizing the channel graph state
|
||||||
@@ -381,6 +391,11 @@ type GossipSyncer struct {
|
|||||||
// respond to gossip timestamp range messages.
|
// respond to gossip timestamp range messages.
|
||||||
syncerSema chan struct{}
|
syncerSema chan struct{}
|
||||||
|
|
||||||
|
// timestampRangeQueue is a buffered channel for queuing timestamp range
|
||||||
|
// messages that need to be processed asynchronously. This prevents the
|
||||||
|
// gossiper from blocking when ApplyGossipFilter is called.
|
||||||
|
timestampRangeQueue chan *lnwire.GossipTimestampRange
|
||||||
|
|
||||||
sync.Mutex
|
sync.Mutex
|
||||||
|
|
||||||
// cg is a helper that encapsulates a wait group and quit channel and
|
// cg is a helper that encapsulates a wait group and quit channel and
|
||||||
@@ -392,14 +407,23 @@ type GossipSyncer struct {
|
|||||||
// newGossipSyncer returns a new instance of the GossipSyncer populated using
|
// newGossipSyncer returns a new instance of the GossipSyncer populated using
|
||||||
// the passed config.
|
// the passed config.
|
||||||
func newGossipSyncer(cfg gossipSyncerCfg, sema chan struct{}) *GossipSyncer {
|
func newGossipSyncer(cfg gossipSyncerCfg, sema chan struct{}) *GossipSyncer {
|
||||||
|
// Use the configured queue size if set, otherwise use the default.
|
||||||
|
queueSize := cfg.timestampQueueSize
|
||||||
|
if queueSize == 0 {
|
||||||
|
queueSize = defaultTimestampQueueSize
|
||||||
|
}
|
||||||
|
|
||||||
return &GossipSyncer{
|
return &GossipSyncer{
|
||||||
cfg: cfg,
|
cfg: cfg,
|
||||||
syncTransitionReqs: make(chan *syncTransitionReq),
|
syncTransitionReqs: make(chan *syncTransitionReq),
|
||||||
historicalSyncReqs: make(chan *historicalSyncReq),
|
historicalSyncReqs: make(chan *historicalSyncReq),
|
||||||
gossipMsgs: make(chan lnwire.Message, syncerBufferSize),
|
gossipMsgs: make(chan lnwire.Message, syncerBufferSize),
|
||||||
queryMsgs: make(chan lnwire.Message, syncerBufferSize),
|
queryMsgs: make(chan lnwire.Message, syncerBufferSize),
|
||||||
syncerSema: sema,
|
timestampRangeQueue: make(
|
||||||
cg: fn.NewContextGuard(),
|
chan *lnwire.GossipTimestampRange, queueSize,
|
||||||
|
),
|
||||||
|
syncerSema: sema,
|
||||||
|
cg: fn.NewContextGuard(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -424,6 +448,13 @@ func (g *GossipSyncer) Start() {
|
|||||||
g.cg.WgAdd(1)
|
g.cg.WgAdd(1)
|
||||||
go g.replyHandler(ctx)
|
go g.replyHandler(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Start the timestamp range queue processor to handle gossip
|
||||||
|
// filter applications asynchronously.
|
||||||
|
if !g.cfg.noTimestampQueryOption {
|
||||||
|
g.cg.WgAdd(1)
|
||||||
|
go g.processTimestampRangeQueue(ctx)
|
||||||
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -674,6 +705,63 @@ func (g *GossipSyncer) replyHandler(ctx context.Context) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// processTimestampRangeQueue handles timestamp range messages from the queue
|
||||||
|
// asynchronously. This prevents blocking the gossiper when rate limiting is
|
||||||
|
// active and multiple peers are trying to apply gossip filters.
|
||||||
|
func (g *GossipSyncer) processTimestampRangeQueue(ctx context.Context) {
|
||||||
|
defer g.cg.WgDone()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case msg := <-g.timestampRangeQueue:
|
||||||
|
// Process the timestamp range message. If we hit an
|
||||||
|
// error, log it but continue processing to avoid
|
||||||
|
// blocking the queue.
|
||||||
|
err := g.ApplyGossipFilter(ctx, msg)
|
||||||
|
switch {
|
||||||
|
case errors.Is(err, ErrGossipSyncerExiting):
|
||||||
|
return
|
||||||
|
|
||||||
|
case errors.Is(err, lnpeer.ErrPeerExiting):
|
||||||
|
return
|
||||||
|
|
||||||
|
case err != nil:
|
||||||
|
log.Errorf("Unable to apply gossip filter: %v",
|
||||||
|
err)
|
||||||
|
}
|
||||||
|
|
||||||
|
case <-g.cg.Done():
|
||||||
|
return
|
||||||
|
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// QueueTimestampRange attempts to queue a timestamp range message for
|
||||||
|
// asynchronous processing. If the queue is full, it returns false to indicate
|
||||||
|
// the message was dropped.
|
||||||
|
func (g *GossipSyncer) QueueTimestampRange(
|
||||||
|
msg *lnwire.GossipTimestampRange) bool {
|
||||||
|
|
||||||
|
// If timestamp queries are disabled, don't queue the message.
|
||||||
|
if g.cfg.noTimestampQueryOption {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case g.timestampRangeQueue <- msg:
|
||||||
|
return true
|
||||||
|
|
||||||
|
// Queue is full, drop the message to prevent blocking.
|
||||||
|
default:
|
||||||
|
log.Warnf("Timestamp range queue full for peer %x, "+
|
||||||
|
"dropping message", g.cfg.peerPub[:])
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// sendGossipTimestampRange constructs and sets a GossipTimestampRange for the
|
// sendGossipTimestampRange constructs and sets a GossipTimestampRange for the
|
||||||
// syncer and sends it to the remote peer.
|
// syncer and sends it to the remote peer.
|
||||||
func (g *GossipSyncer) sendGossipTimestampRange(ctx context.Context,
|
func (g *GossipSyncer) sendGossipTimestampRange(ctx context.Context,
|
||||||
|
Reference in New Issue
Block a user