discovery: fix endless loop in gossip syncer on context cancellation

This commit fixes a critical bug where the channelGraphSyncer goroutine
would enter an endless loop when context cancellation or peer disconnect
errors occurred during the syncingChans or queryNewChannels states.

The root cause was that state handler functions (handleSyncingChans and
synchronizeChanIDs) did not return errors to the main goroutine loop.
When these functions encountered fatal errors like context cancellation,
they would log the error and return early without changing the syncer's
state. This caused the main loop to immediately re-enter the same state
handler, encounter the same error, and loop indefinitely while spamming
error logs.

The fix makes error handling explicit by having state handlers return
errors. The main channelGraphSyncer loop now checks these errors and
exits cleanly when fatal errors occur. We return any error (not just
context cancellation) because fatal errors can manifest in multiple
forms: context.Canceled, ErrGossipSyncerExiting from the rate limiter,
lnpeer.ErrPeerExiting from Brontide, or network errors like connection
closed. This approach matches the error handling pattern already used in
other goroutines like replyHandler.
This commit is contained in:
Olaoluwa Osuntokun
2025-10-29 12:52:42 -07:00
parent f938e40afe
commit 06c7d60452
2 changed files with 45 additions and 12 deletions

View File

@@ -492,15 +492,19 @@ func (g *GossipSyncer) Stop() {
// handleSyncingChans handles the state syncingChans for the GossipSyncer. When // handleSyncingChans handles the state syncingChans for the GossipSyncer. When
// in this state, we will send a QueryChannelRange msg to our peer and advance // in this state, we will send a QueryChannelRange msg to our peer and advance
// the syncer's state to waitingQueryRangeReply. // the syncer's state to waitingQueryRangeReply. Returns an error if a fatal
func (g *GossipSyncer) handleSyncingChans(ctx context.Context) { // error occurs that should cause the goroutine to exit.
func (g *GossipSyncer) handleSyncingChans(ctx context.Context) error {
// Prepare the query msg. // Prepare the query msg.
queryRangeMsg, err := g.genChanRangeQuery( queryRangeMsg, err := g.genChanRangeQuery(
ctx, g.genHistoricalChanRangeQuery, ctx, g.genHistoricalChanRangeQuery,
) )
if err != nil { if err != nil {
log.Errorf("Unable to gen chan range query: %v", err) log.Errorf("Unable to gen chan range query: %v", err)
return
// Any error here is likely fatal (context cancelled, db error,
// etc.), so return it to exit the goroutine cleanly.
return err
} }
// Acquire a lock so the following state transition is atomic. // Acquire a lock so the following state transition is atomic.
@@ -517,12 +521,18 @@ func (g *GossipSyncer) handleSyncingChans(ctx context.Context) {
err = g.sendToPeer(ctx, queryRangeMsg) err = g.sendToPeer(ctx, queryRangeMsg)
if err != nil { if err != nil {
log.Errorf("Unable to send chan range query: %v", err) log.Errorf("Unable to send chan range query: %v", err)
return
// Any send error (peer exiting, connection closed, rate
// limiter signaling exit, etc.) is fatal, so return it to
// exit the goroutine cleanly.
return err
} }
// With the message sent successfully, we'll transition into the next // With the message sent successfully, we'll transition into the next
// state where we wait for their reply. // state where we wait for their reply.
g.setSyncState(waitingQueryRangeReply) g.setSyncState(waitingQueryRangeReply)
return nil
} }
// channelGraphSyncer is the main goroutine responsible for ensuring that we // channelGraphSyncer is the main goroutine responsible for ensuring that we
@@ -545,7 +555,14 @@ func (g *GossipSyncer) channelGraphSyncer(ctx context.Context) {
// understand, as well as responding to any other queries by // understand, as well as responding to any other queries by
// them. // them.
case syncingChans: case syncingChans:
g.handleSyncingChans(ctx) err := g.handleSyncingChans(ctx)
if err != nil {
log.Debugf("GossipSyncer(%x): exiting due to "+
"error in syncingChans: %v",
g.cfg.peerPub[:], err)
return
}
// In this state, we've sent out our initial channel range // In this state, we've sent out our initial channel range
// query and are waiting for the final response from the remote // query and are waiting for the final response from the remote
@@ -593,7 +610,14 @@ func (g *GossipSyncer) channelGraphSyncer(ctx context.Context) {
// First, we'll attempt to continue our channel // First, we'll attempt to continue our channel
// synchronization by continuing to send off another // synchronization by continuing to send off another
// query chunk. // query chunk.
done := g.synchronizeChanIDs(ctx) done, err := g.synchronizeChanIDs(ctx)
if err != nil {
log.Debugf("GossipSyncer(%x): exiting due to "+
"error in queryNewChannels: %v",
g.cfg.peerPub[:], err)
return
}
// If this wasn't our last query, then we'll need to // If this wasn't our last query, then we'll need to
// transition to our waiting state. // transition to our waiting state.
@@ -819,8 +843,10 @@ func (g *GossipSyncer) sendGossipTimestampRange(ctx context.Context,
// range. This method will be called continually until the entire range has // range. This method will be called continually until the entire range has
// been queried for with a response received. We'll chunk our requests as // been queried for with a response received. We'll chunk our requests as
// required to ensure they fit into a single message. We may re-enter this // required to ensure they fit into a single message. We may re-enter this
// state in the case that chunking is required. // state in the case that chunking is required. Returns true if synchronization
func (g *GossipSyncer) synchronizeChanIDs(ctx context.Context) bool { // is complete, and an error if a fatal error occurs that should cause the
// goroutine to exit.
func (g *GossipSyncer) synchronizeChanIDs(ctx context.Context) (bool, error) {
// If we're in this state yet there are no more new channels to query // If we're in this state yet there are no more new channels to query
// for, then we'll transition to our final synced state and return true // for, then we'll transition to our final synced state and return true
// to signal that we're fully synchronized. // to signal that we're fully synchronized.
@@ -828,7 +854,7 @@ func (g *GossipSyncer) synchronizeChanIDs(ctx context.Context) bool {
log.Infof("GossipSyncer(%x): no more chans to query", log.Infof("GossipSyncer(%x): no more chans to query",
g.cfg.peerPub[:]) g.cfg.peerPub[:])
return true return true, nil
} }
// Otherwise, we'll issue our next chunked query to receive replies // Otherwise, we'll issue our next chunked query to receive replies
@@ -864,9 +890,14 @@ func (g *GossipSyncer) synchronizeChanIDs(ctx context.Context) bool {
}) })
if err != nil { if err != nil {
log.Errorf("Unable to sync chan IDs: %v", err) log.Errorf("Unable to sync chan IDs: %v", err)
// Any send error (peer exiting, connection closed, rate
// limiter signaling exit, etc.) is fatal, so return it to
// exit the goroutine cleanly.
return false, err
} }
return false return false, nil
} }
// isLegacyReplyChannelRange determines whether a ReplyChannelRange message is // isLegacyReplyChannelRange determines whether a ReplyChannelRange message is

View File

@@ -1518,7 +1518,8 @@ func TestGossipSyncerSynchronizeChanIDs(t *testing.T) {
for i := 0; i < chunkSize*2; i += 2 { for i := 0; i < chunkSize*2; i += 2 {
// With our set up complete, we'll request a sync of chan ID's. // With our set up complete, we'll request a sync of chan ID's.
done := syncer.synchronizeChanIDs(t.Context()) done, err := syncer.synchronizeChanIDs(t.Context())
require.NoError(t, err)
// At this point, we shouldn't yet be done as only 2 items // At this point, we shouldn't yet be done as only 2 items
// should have been queried for. // should have been queried for.
@@ -1565,7 +1566,8 @@ func TestGossipSyncerSynchronizeChanIDs(t *testing.T) {
} }
// If we issue another query, the syncer should tell us that it's done. // If we issue another query, the syncer should tell us that it's done.
done := syncer.synchronizeChanIDs(t.Context()) done, err := syncer.synchronizeChanIDs(t.Context())
require.NoError(t, err)
if done { if done {
t.Fatalf("syncer should be finished!") t.Fatalf("syncer should be finished!")
} }