From 77f3b356408753b5286cb90bf95e2fbcffd1118c Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Wed, 10 Sep 2025 18:23:15 -0700 Subject: [PATCH] discovery: update ApplyGossipFilter to use lazy iterator with Pull2 In this commit, we update ApplyGossipFilter to leverage the new iterator-based UpdatesInHorizon method. The key innovation here is using iter.Pull2 to create a pull-based iterator that allows us to check if any updates exist before launching the background goroutine. This approach provides several benefits over the previous implementation. First, we avoid the overhead of launching a goroutine when there are no updates to send, which was previously unavoidable without materializing the entire result set. Second, we maintain lazy loading throughout the sending process, only pulling messages from the database as they're needed for transmission. The implementation uses Pull2 to peek at the first message, determining whether to proceed with sending updates. If updates exist, ownership of the iterator is transferred to the goroutine, which continues pulling and sending messages until exhausted. This design ensures memory usage remains bounded regardless of the number of updates being synchronized. --- discovery/syncer.go | 59 ++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 56 insertions(+), 3 deletions(-) diff --git a/discovery/syncer.go b/discovery/syncer.go index 478e7acd3..b8c51a9a5 100644 --- a/discovery/syncer.go +++ b/discovery/syncer.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "iter" "math" "math/rand" "sort" @@ -1446,8 +1447,29 @@ func (g *GossipSyncer) ApplyGossipFilter(ctx context.Context, g.cfg.chainHash, startTime, endTime, ) + // Create a pull-based iterator so we can check if there are any + // updates before launching the goroutine. + next, stop := iter.Pull2(newUpdatestoSend) + + // Check if we have any updates to send by attempting to get the first + // message. + firstMsg, firstErr, ok := next() + if firstErr != nil { + stop() + returnSema() + return firstErr + } + log.Infof("GossipSyncer(%x): applying new remote update horizon: "+ - "start=%v, end=%v", g.cfg.peerPub[:], startTime, endTime) + "start=%v, end=%v, has_updates=%v", g.cfg.peerPub[:], + startTime, endTime, ok) + + // If we don't have any to send, then we can return early. + if !ok { + stop() + returnSema() + return nil + } // Set the atomic flag to indicate we're starting to send the backlog. // If the swap fails, it means another goroutine is already active, so @@ -1461,14 +1483,45 @@ func (g *GossipSyncer) ApplyGossipFilter(ctx context.Context, } // We'll conclude by launching a goroutine to send out any updates. + // The goroutine takes ownership of the iterator. g.cg.WgAdd(1) go func() { defer g.cg.WgDone() defer returnSema() defer g.isSendingBacklog.Store(false) + defer stop() - for msg := range newUpdatestoSend { - err := g.sendToPeerSync(ctx, msg) + // Send the first message we already pulled. + err := g.sendToPeerSync(ctx, firstMsg) + switch { + case errors.Is(err, ErrGossipSyncerExiting): + return + + case errors.Is(err, lnpeer.ErrPeerExiting): + return + + case err != nil: + log.Errorf("Unable to send message for "+ + "peer catch up: %v", err) + } + + // Continue with the rest of the messages using the same pull + // iterator. + for { + msg, err, ok := next() + if !ok { + return + } + + // If the iterator yielded an error, log it and + // continue. + if err != nil { + log.Errorf("Error fetching update for peer "+ + "catch up: %v", err) + continue + } + + err = g.sendToPeerSync(ctx, msg) switch { case err == ErrGossipSyncerExiting: return