mirror of
https://github.com/lightningnetwork/lnd.git
synced 2025-09-08 23:01:53 +02:00
discovery: only permit a single gossip backlog goroutine per peer
In this commit, we add a new atomic bool to only permit a single gossip backlog goroutine per peer. If we get a new reuqest that needs a backlog while we're still processing the other, then we'll drop that request.
This commit is contained in:
@@ -396,6 +396,11 @@ type GossipSyncer struct {
|
||||
// gossiper from blocking when ApplyGossipFilter is called.
|
||||
timestampRangeQueue chan *lnwire.GossipTimestampRange
|
||||
|
||||
// isSendingBacklog is an atomic flag that indicates whether a goroutine
|
||||
// is currently sending the backlog of messages. This ensures only one
|
||||
// goroutine is active at a time.
|
||||
isSendingBacklog atomic.Bool
|
||||
|
||||
sync.Mutex
|
||||
|
||||
// cg is a helper that encapsulates a wait group and quit channel and
|
||||
@@ -1396,6 +1401,14 @@ func (g *GossipSyncer) ApplyGossipFilter(ctx context.Context,
|
||||
return nil
|
||||
}
|
||||
|
||||
// Check if a goroutine is already sending the backlog. If so, return
|
||||
// early without attempting to acquire the semaphore.
|
||||
if g.isSendingBacklog.Load() {
|
||||
log.Debugf("GossipSyncer(%x): skipping ApplyGossipFilter, "+
|
||||
"backlog send already in progress", g.cfg.peerPub[:])
|
||||
return nil
|
||||
}
|
||||
|
||||
select {
|
||||
case <-g.syncerSema:
|
||||
case <-g.cg.Done():
|
||||
@@ -1430,11 +1443,23 @@ func (g *GossipSyncer) ApplyGossipFilter(ctx context.Context,
|
||||
return nil
|
||||
}
|
||||
|
||||
// Set the atomic flag to indicate we're starting to send the backlog.
|
||||
// If the swap fails, it means another goroutine is already active, so
|
||||
// we return early.
|
||||
if !g.isSendingBacklog.CompareAndSwap(false, true) {
|
||||
returnSema()
|
||||
log.Debugf("GossipSyncer(%x): another goroutine already "+
|
||||
"sending backlog, skipping", g.cfg.peerPub[:])
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// We'll conclude by launching a goroutine to send out any updates.
|
||||
g.cg.WgAdd(1)
|
||||
go func() {
|
||||
defer g.cg.WgDone()
|
||||
defer returnSema()
|
||||
defer g.isSendingBacklog.Store(false)
|
||||
|
||||
for _, msg := range newUpdatestoSend {
|
||||
err := g.cfg.sendToPeerSync(ctx, msg)
|
||||
|
Reference in New Issue
Block a user