From ce4fdd311747ed91c5517784945e302bcec3bb78 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Thu, 24 Jul 2025 18:58:10 -0500 Subject: [PATCH] discovery: only permit a single gossip backlog goroutine per peer In this commit, we add a new atomic bool to only permit a single gossip backlog goroutine per peer. If we get a new request that needs a backlog while we're still processing the other, then we'll drop that request. --- discovery/syncer.go | 25 +++++ discovery/syncer_atomic_test.go | 172 ++++++++++++++++++++++++++++++++ 2 files changed, 197 insertions(+) create mode 100644 discovery/syncer_atomic_test.go diff --git a/discovery/syncer.go b/discovery/syncer.go index e3e773eca..82e17ee94 100644 --- a/discovery/syncer.go +++ b/discovery/syncer.go @@ -396,6 +396,11 @@ type GossipSyncer struct { // gossiper from blocking when ApplyGossipFilter is called. timestampRangeQueue chan *lnwire.GossipTimestampRange + // isSendingBacklog is an atomic flag that indicates whether a goroutine + // is currently sending the backlog of messages. This ensures only one + // goroutine is active at a time. + isSendingBacklog atomic.Bool + sync.Mutex // cg is a helper that encapsulates a wait group and quit channel and @@ -1396,6 +1401,14 @@ func (g *GossipSyncer) ApplyGossipFilter(ctx context.Context, return nil } + // Check if a goroutine is already sending the backlog. If so, return + // early without attempting to acquire the semaphore. + if g.isSendingBacklog.Load() { + log.Debugf("GossipSyncer(%x): skipping ApplyGossipFilter, "+ + "backlog send already in progress", g.cfg.peerPub[:]) + return nil + } + select { case <-g.syncerSema: case <-g.cg.Done(): @@ -1430,11 +1443,23 @@ func (g *GossipSyncer) ApplyGossipFilter(ctx context.Context, return nil } + // Set the atomic flag to indicate we're starting to send the backlog. + // If the swap fails, it means another goroutine is already active, so + // we return early. 
+ if !g.isSendingBacklog.CompareAndSwap(false, true) { + returnSema() + log.Debugf("GossipSyncer(%x): another goroutine already "+ + "sending backlog, skipping", g.cfg.peerPub[:]) + + return nil + } + // We'll conclude by launching a goroutine to send out any updates. g.cg.WgAdd(1) go func() { defer g.cg.WgDone() defer returnSema() + defer g.isSendingBacklog.Store(false) for _, msg := range newUpdatestoSend { err := g.cfg.sendToPeerSync(ctx, msg) diff --git a/discovery/syncer_atomic_test.go b/discovery/syncer_atomic_test.go new file mode 100644 index 000000000..9396dbc39 --- /dev/null +++ b/discovery/syncer_atomic_test.go @@ -0,0 +1,172 @@ +package discovery + +import ( + "context" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/lightningnetwork/lnd/lnwire" + "github.com/stretchr/testify/require" +) + +// TestGossipSyncerSingleBacklogSend tests that only one goroutine can send the +// backlog at a time using the atomic flag. +func TestGossipSyncerSingleBacklogSend(t *testing.T) { + t.Parallel() + ctx := context.Background() + + // Track how many goroutines are actively sending. + var ( + activeGoroutines atomic.Int32 + totalGoroutinesLaunched atomic.Int32 + ) + + // Create a blocking sendToPeerSync function. We'll use this to simulate + // sending a large backlog. + blockingSendChan := make(chan struct{}) + sendToPeerSync := func(_ context.Context, + msgs ...lnwire.Message) error { + + // Track that we're in a send goroutine. + count := activeGoroutines.Add(1) + totalGoroutinesLaunched.Add(1) + + // Verify only one goroutine is active. + require.Equal( + t, int32(1), count, + "only one goroutine should be sending at a time", + ) + + // We'll now block to simulate slow sending. + <-blockingSendChan + + // When we exit, we should decrement the count on the way out + activeGoroutines.Add(-1) + + return nil + } + + // Now we'll kick off the test by making a syncer that uses our blocking + // send function. 
+ msgChan, syncer, chanSeries := newTestSyncer( + lnwire.NewShortChanIDFromInt(10), defaultEncoding, + defaultChunkSize, true, true, true, + ) + + // Override the sendToPeerSync to use our blocking version. + syncer.cfg.sendToPeerSync = sendToPeerSync + syncer.cfg.ignoreHistoricalFilters = false + + syncer.Start() + defer syncer.Stop() + + // Next, we'll launch a goroutine to send out a backlog of messages. + go func() { + for { + select { + case <-chanSeries.horizonReq: + cid := lnwire.NewShortChanIDFromInt(1) + chanSeries.horizonResp <- []lnwire.Message{ + &lnwire.ChannelUpdate1{ + ShortChannelID: cid, + Timestamp: uint32( + time.Now().Unix(), + ), + }, + } + + case <-time.After(5 * time.Second): + return + } + } + }() + + // Now we'll create a filter, then apply it in a goroutine. + filter := &lnwire.GossipTimestampRange{ + FirstTimestamp: uint32(time.Now().Unix() - 3600), + TimestampRange: 7200, + } + go func() { + err := syncer.ApplyGossipFilter(ctx, filter) + require.NoError(t, err) + }() + + // Wait for the first goroutine to start and block. + time.Sleep(100 * time.Millisecond) + + // Verify the atomic flag is set, as the first goroutine should be + // blocked on the send. + require.True( + t, syncer.isSendingBacklog.Load(), + "isSendingBacklog should be true while first goroutine "+ + "is active", + ) + + // Now apply more filters concurrently - they should all return early as + // we're still sending out the first backlog. + var ( + wg sync.WaitGroup + earlyReturns atomic.Int32 + ) + + for i := 0; i < 5; i++ { + wg.Add(1) + go func() { + defer wg.Done() + + // Record the flag state before calling. + flagWasSet := syncer.isSendingBacklog.Load() + + err := syncer.ApplyGossipFilter(ctx, filter) + require.NoError(t, err) + + // If the flag was already set, we should have returned + // early. + if flagWasSet { + earlyReturns.Add(1) + } + }() + } + + // Give time for the concurrent attempts to execute. 
+ time.Sleep(100 * time.Millisecond) + + // There should still be only a single active goroutine. + require.Equal( + t, int32(1), activeGoroutines.Load(), + "only one goroutine should be active despite multiple attempts", + ) + + // Now we'll unblock the first goroutine, then wait for them all to + // exit. + close(blockingSendChan) + wg.Wait() + + // Give time for cleanup. + time.Sleep(100 * time.Millisecond) + + // At this point, only a single goroutine should have been launched, + require.Equal( + t, int32(1), totalGoroutinesLaunched.Load(), + "only one goroutine should have been launched total", + ) + require.GreaterOrEqual( + t, earlyReturns.Load(), int32(4), + "at least 4 calls should have returned early due to atomic "+ + "flag", + ) + + // The atomic flag should be cleared now. + require.False( + t, syncer.isSendingBacklog.Load(), + "isSendingBacklog should be false after goroutine completes", + ) + + // Drain any messages. + select { + case <-msgChan: + case <-time.After(100 * time.Millisecond): + } +}