discovery: only permit a single gossip backlog goroutine per peer

In this commit, we add a new atomic bool to only permit a single gossip
backlog goroutine per peer. If we get a new reuqest that needs a backlog
while we're still processing the other, then we'll drop that request.
This commit is contained in:
Olaoluwa Osuntokun
2025-07-24 18:58:10 -05:00
committed by Oliver Gugger
parent bb5825387e
commit ce4fdd3117
2 changed files with 197 additions and 0 deletions

View File

@@ -396,6 +396,11 @@ type GossipSyncer struct {
// gossiper from blocking when ApplyGossipFilter is called. // gossiper from blocking when ApplyGossipFilter is called.
timestampRangeQueue chan *lnwire.GossipTimestampRange timestampRangeQueue chan *lnwire.GossipTimestampRange
// isSendingBacklog is an atomic flag that indicates whether a goroutine
// is currently sending the backlog of messages. This ensures only one
// goroutine is active at a time.
isSendingBacklog atomic.Bool
sync.Mutex sync.Mutex
// cg is a helper that encapsulates a wait group and quit channel and // cg is a helper that encapsulates a wait group and quit channel and
@@ -1396,6 +1401,14 @@ func (g *GossipSyncer) ApplyGossipFilter(ctx context.Context,
return nil return nil
} }
// Check if a goroutine is already sending the backlog. If so, return
// early without attempting to acquire the semaphore.
if g.isSendingBacklog.Load() {
log.Debugf("GossipSyncer(%x): skipping ApplyGossipFilter, "+
"backlog send already in progress", g.cfg.peerPub[:])
return nil
}
select { select {
case <-g.syncerSema: case <-g.syncerSema:
case <-g.cg.Done(): case <-g.cg.Done():
@@ -1430,11 +1443,23 @@ func (g *GossipSyncer) ApplyGossipFilter(ctx context.Context,
return nil return nil
} }
// Set the atomic flag to indicate we're starting to send the backlog.
// If the swap fails, it means another goroutine is already active, so
// we return early.
if !g.isSendingBacklog.CompareAndSwap(false, true) {
returnSema()
log.Debugf("GossipSyncer(%x): another goroutine already "+
"sending backlog, skipping", g.cfg.peerPub[:])
return nil
}
// We'll conclude by launching a goroutine to send out any updates. // We'll conclude by launching a goroutine to send out any updates.
g.cg.WgAdd(1) g.cg.WgAdd(1)
go func() { go func() {
defer g.cg.WgDone() defer g.cg.WgDone()
defer returnSema() defer returnSema()
defer g.isSendingBacklog.Store(false)
for _, msg := range newUpdatestoSend { for _, msg := range newUpdatestoSend {
err := g.cfg.sendToPeerSync(ctx, msg) err := g.cfg.sendToPeerSync(ctx, msg)

View File

@@ -0,0 +1,172 @@
package discovery
import (
"context"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/stretchr/testify/require"
)
// TestGossipSyncerSingleBacklogSend tests that only one goroutine can send the
// backlog at a time using the atomic flag.
func TestGossipSyncerSingleBacklogSend(t *testing.T) {
t.Parallel()
ctx := context.Background()
// Track how many goroutines are actively sending.
var (
activeGoroutines atomic.Int32
totalGoroutinesLaunched atomic.Int32
)
// Create a blocking sendToPeerSync function. We'll use this to simulate
// sending a large backlog.
blockingSendChan := make(chan struct{})
sendToPeerSync := func(_ context.Context,
msgs ...lnwire.Message) error {
// Track that we're in a send goroutine.
count := activeGoroutines.Add(1)
totalGoroutinesLaunched.Add(1)
// Verify only one goroutine is active.
require.Equal(
t, int32(1), count,
"only one goroutine should be sending at a time",
)
// We'll now block to simulate slow sending.
<-blockingSendChan
// When we exit, we should decrement the count on the way out
activeGoroutines.Add(-1)
return nil
}
// Now we'll kick off the test by making a syncer that uses our blocking
// send function.
msgChan, syncer, chanSeries := newTestSyncer(
lnwire.NewShortChanIDFromInt(10), defaultEncoding,
defaultChunkSize, true, true, true,
)
// Override the sendToPeerSync to use our blocking version.
syncer.cfg.sendToPeerSync = sendToPeerSync
syncer.cfg.ignoreHistoricalFilters = false
syncer.Start()
defer syncer.Stop()
// Next, we'll launch a goroutine to send out a backlog of messages.
go func() {
for {
select {
case <-chanSeries.horizonReq:
cid := lnwire.NewShortChanIDFromInt(1)
chanSeries.horizonResp <- []lnwire.Message{
&lnwire.ChannelUpdate1{
ShortChannelID: cid,
Timestamp: uint32(
time.Now().Unix(),
),
},
}
case <-time.After(5 * time.Second):
return
}
}
}()
// Now we'll create a filter, then apply it in a goroutine.
filter := &lnwire.GossipTimestampRange{
FirstTimestamp: uint32(time.Now().Unix() - 3600),
TimestampRange: 7200,
}
go func() {
err := syncer.ApplyGossipFilter(ctx, filter)
require.NoError(t, err)
}()
// Wait for the first goroutine to start and block.
time.Sleep(100 * time.Millisecond)
// Verify the atomic flag is set, as the first goroutine should be
// blocked on the send.
require.True(
t, syncer.isSendingBacklog.Load(),
"isSendingBacklog should be true while first goroutine "+
"is active",
)
// Now apply more filters concurrently - they should all return early as
// we're still sending out the first backlog.
var (
wg sync.WaitGroup
earlyReturns atomic.Int32
)
for i := 0; i < 5; i++ {
wg.Add(1)
go func() {
defer wg.Done()
// Record the flag state before calling.
flagWasSet := syncer.isSendingBacklog.Load()
err := syncer.ApplyGossipFilter(ctx, filter)
require.NoError(t, err)
// If the flag was already set, we should have returned
// early.
if flagWasSet {
earlyReturns.Add(1)
}
}()
}
// Give time for the concurrent attempts to execute.
time.Sleep(100 * time.Millisecond)
// There should still be only a single active goroutine.
require.Equal(
t, int32(1), activeGoroutines.Load(),
"only one goroutine should be active despite multiple attempts",
)
// Now we'll unblock the first goroutine, then wait for them all to
// exit.
close(blockingSendChan)
wg.Wait()
// Give time for cleanup.
time.Sleep(100 * time.Millisecond)
// At this point, only a single goroutine should have been launched,
require.Equal(
t, int32(1), totalGoroutinesLaunched.Load(),
"only one goroutine should have been launched total",
)
require.GreaterOrEqual(
t, earlyReturns.Load(), int32(4),
"at least 4 calls should have returned early due to atomic "+
"flag",
)
// The atomic flag should be cleared now.
require.False(
t, syncer.isSendingBacklog.Load(),
"isSendingBacklog should be false after goroutine completes",
)
// Drain any messages.
select {
case <-msgChan:
case <-time.After(100 * time.Millisecond):
}
}