mirror of
https://github.com/lightningnetwork/lnd.git
synced 2025-11-10 14:17:56 +01:00
discovery: update ApplyGossipFilter to use lazy iterator with Pull2
In this commit, we update ApplyGossipFilter to leverage the new iterator-based UpdatesInHorizon method. The key innovation here is using iter.Pull2 to create a pull-based iterator that allows us to check if any updates exist before launching the background goroutine. This approach provides several benefits over the previous implementation. First, we avoid the overhead of launching a goroutine when there are no updates to send, which was previously unavoidable without materializing the entire result set. Second, we maintain lazy loading throughout the sending process, only pulling messages from the database as they're needed for transmission. The implementation uses Pull2 to peek at the first message, determining whether to proceed with sending updates. If updates exist, ownership of the iterator is transferred to the goroutine, which continues pulling and sending messages until exhausted. This design ensures memory usage remains bounded regardless of the number of updates being synchronized.
This commit is contained in:
@@ -4,6 +4,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"iter"
|
||||||
"math"
|
"math"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"sort"
|
"sort"
|
||||||
@@ -1446,8 +1447,29 @@ func (g *GossipSyncer) ApplyGossipFilter(ctx context.Context,
|
|||||||
g.cfg.chainHash, startTime, endTime,
|
g.cfg.chainHash, startTime, endTime,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Create a pull-based iterator so we can check if there are any
|
||||||
|
// updates before launching the goroutine.
|
||||||
|
next, stop := iter.Pull2(newUpdatestoSend)
|
||||||
|
|
||||||
|
// Check if we have any updates to send by attempting to get the first
|
||||||
|
// message.
|
||||||
|
firstMsg, firstErr, ok := next()
|
||||||
|
if firstErr != nil {
|
||||||
|
stop()
|
||||||
|
returnSema()
|
||||||
|
return firstErr
|
||||||
|
}
|
||||||
|
|
||||||
log.Infof("GossipSyncer(%x): applying new remote update horizon: "+
|
log.Infof("GossipSyncer(%x): applying new remote update horizon: "+
|
||||||
"start=%v, end=%v", g.cfg.peerPub[:], startTime, endTime)
|
"start=%v, end=%v, has_updates=%v", g.cfg.peerPub[:],
|
||||||
|
startTime, endTime, ok)
|
||||||
|
|
||||||
|
// If we don't have any to send, then we can return early.
|
||||||
|
if !ok {
|
||||||
|
stop()
|
||||||
|
returnSema()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// Set the atomic flag to indicate we're starting to send the backlog.
|
// Set the atomic flag to indicate we're starting to send the backlog.
|
||||||
// If the swap fails, it means another goroutine is already active, so
|
// If the swap fails, it means another goroutine is already active, so
|
||||||
@@ -1461,14 +1483,45 @@ func (g *GossipSyncer) ApplyGossipFilter(ctx context.Context,
|
|||||||
}
|
}
|
||||||
|
|
||||||
// We'll conclude by launching a goroutine to send out any updates.
|
// We'll conclude by launching a goroutine to send out any updates.
|
||||||
|
// The goroutine takes ownership of the iterator.
|
||||||
g.cg.WgAdd(1)
|
g.cg.WgAdd(1)
|
||||||
go func() {
|
go func() {
|
||||||
defer g.cg.WgDone()
|
defer g.cg.WgDone()
|
||||||
defer returnSema()
|
defer returnSema()
|
||||||
defer g.isSendingBacklog.Store(false)
|
defer g.isSendingBacklog.Store(false)
|
||||||
|
defer stop()
|
||||||
|
|
||||||
for msg := range newUpdatestoSend {
|
// Send the first message we already pulled.
|
||||||
err := g.sendToPeerSync(ctx, msg)
|
err := g.sendToPeerSync(ctx, firstMsg)
|
||||||
|
switch {
|
||||||
|
case errors.Is(err, ErrGossipSyncerExiting):
|
||||||
|
return
|
||||||
|
|
||||||
|
case errors.Is(err, lnpeer.ErrPeerExiting):
|
||||||
|
return
|
||||||
|
|
||||||
|
case err != nil:
|
||||||
|
log.Errorf("Unable to send message for "+
|
||||||
|
"peer catch up: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Continue with the rest of the messages using the same pull
|
||||||
|
// iterator.
|
||||||
|
for {
|
||||||
|
msg, err, ok := next()
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// If the iterator yielded an error, log it and
|
||||||
|
// continue.
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error fetching update for peer "+
|
||||||
|
"catch up: %v", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
err = g.sendToPeerSync(ctx, msg)
|
||||||
switch {
|
switch {
|
||||||
case err == ErrGossipSyncerExiting:
|
case err == ErrGossipSyncerExiting:
|
||||||
return
|
return
|
||||||
|
|||||||
Reference in New Issue
Block a user