diff --git a/discovery/syncer.go b/discovery/syncer.go index 1f120b3f8..ce970eeef 100644 --- a/discovery/syncer.go +++ b/discovery/syncer.go @@ -492,15 +492,19 @@ func (g *GossipSyncer) Stop() { // handleSyncingChans handles the state syncingChans for the GossipSyncer. When // in this state, we will send a QueryChannelRange msg to our peer and advance -// the syncer's state to waitingQueryRangeReply. -func (g *GossipSyncer) handleSyncingChans(ctx context.Context) { +// the syncer's state to waitingQueryRangeReply. Returns an error if a fatal +// error occurs that should cause the goroutine to exit. +func (g *GossipSyncer) handleSyncingChans(ctx context.Context) error { // Prepare the query msg. queryRangeMsg, err := g.genChanRangeQuery( ctx, g.genHistoricalChanRangeQuery, ) if err != nil { log.Errorf("Unable to gen chan range query: %v", err) - return + + // Any error here is likely fatal (context cancelled, db error, + // etc.), so return it to exit the goroutine cleanly. + return err } // Acquire a lock so the following state transition is atomic. @@ -517,12 +521,18 @@ func (g *GossipSyncer) handleSyncingChans(ctx context.Context) { err = g.sendToPeer(ctx, queryRangeMsg) if err != nil { log.Errorf("Unable to send chan range query: %v", err) - return + + // Any send error (peer exiting, connection closed, rate + // limiter signaling exit, etc.) is fatal, so return it to + // exit the goroutine cleanly. + return err } // With the message sent successfully, we'll transition into the next // state where we wait for their reply. g.setSyncState(waitingQueryRangeReply) + + return nil } // channelGraphSyncer is the main goroutine responsible for ensuring that we @@ -545,7 +555,14 @@ func (g *GossipSyncer) channelGraphSyncer(ctx context.Context) { // understand, as we'll as responding to any other queries by // them. case syncingChans: - g.handleSyncingChans(ctx) + err := g.handleSyncingChans(ctx) + if err != nil { + log.Debugf("GossipSyncer(%x): exiting due to "+ + "error in syncingChans: %v", + g.cfg.peerPub[:], err) + + return + } // In this state, we've sent out our initial channel range // query and are waiting for the final response from the remote @@ -593,7 +610,14 @@ func (g *GossipSyncer) channelGraphSyncer(ctx context.Context) { // First, we'll attempt to continue our channel // synchronization by continuing to send off another // query chunk. - done := g.synchronizeChanIDs(ctx) + done, err := g.synchronizeChanIDs(ctx) + if err != nil { + log.Debugf("GossipSyncer(%x): exiting due to "+ + "error in queryNewChannels: %v", + g.cfg.peerPub[:], err) + + return + } // If this wasn't our last query, then we'll need to // transition to our waiting state. @@ -819,8 +843,10 @@ func (g *GossipSyncer) sendGossipTimestampRange(ctx context.Context, // range. This method will be called continually until the entire range has // been queried for with a response received. We'll chunk our requests as // required to ensure they fit into a single message. We may re-renter this -// state in the case that chunking is required. -func (g *GossipSyncer) synchronizeChanIDs(ctx context.Context) bool { +// state in the case that chunking is required. Returns true if synchronization +// is complete, and an error if a fatal error occurs that should cause the +// goroutine to exit. +func (g *GossipSyncer) synchronizeChanIDs(ctx context.Context) (bool, error) { // If we're in this state yet there are no more new channels to query // for, then we'll transition to our final synced state and return true // to signal that we're fully synchronized. @@ -828,7 +854,7 @@ func (g *GossipSyncer) synchronizeChanIDs(ctx context.Context) bool { log.Infof("GossipSyncer(%x): no more chans to query", g.cfg.peerPub[:]) - return true + return true, nil } // Otherwise, we'll issue our next chunked query to receive replies @@ -864,9 +890,14 @@ func (g *GossipSyncer) synchronizeChanIDs(ctx context.Context) bool { }) if err != nil { log.Errorf("Unable to sync chan IDs: %v", err) + + // Any send error (peer exiting, connection closed, rate + // limiter signaling exit, etc.) is fatal, so return it to + // exit the goroutine cleanly. + return false, err } - return false + return false, nil } // isLegacyReplyChannelRange determines where a ReplyChannelRange message is diff --git a/discovery/syncer_test.go b/discovery/syncer_test.go index 4420f4aa4..d9b5c199e 100644 --- a/discovery/syncer_test.go +++ b/discovery/syncer_test.go @@ -1518,7 +1518,8 @@ func TestGossipSyncerSynchronizeChanIDs(t *testing.T) { for i := 0; i < chunkSize*2; i += 2 { // With our set up complete, we'll request a sync of chan ID's. - done := syncer.synchronizeChanIDs(t.Context()) + done, err := syncer.synchronizeChanIDs(t.Context()) + require.NoError(t, err) // At this point, we shouldn't yet be done as only 2 items // should have been queried for. @@ -1565,7 +1566,8 @@ func TestGossipSyncerSynchronizeChanIDs(t *testing.T) { } // If we issue another query, the syncer should tell us that it's done. - done := syncer.synchronizeChanIDs(t.Context()) + done, err := syncer.synchronizeChanIDs(t.Context()) + require.NoError(t, err) if done { t.Fatalf("syncer should be finished!") }