Merge pull request #10330 from Roasbeef/fix-error-handling-gossiper
discovery: fix potential infinite loop bug re context cancel error handling in gossip syncer
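To make the diff easier to follow, here is a minimal, hypothetical sketch of the failure mode being fixed (illustrative names only, not lnd's actual code): a state-machine goroutine keeps retrying a handler that logs and swallows a fatal error, so a cancelled context makes it spin forever. Returning the error instead lets the loop exit cleanly.

```go
package main

import (
	"context"
	"log"
)

// Pre-fix shape: the handler hides fatal errors from its caller. With a
// cancelled context every retry fails instantly and the state never
// advances, so the driving loop spins.
func handleStateOld(ctx context.Context) {
	if err := ctx.Err(); err != nil {
		log.Printf("unable to send query: %v", err)
		return
	}
	// ... send the query and advance the state ...
}

// Post-fix shape: fatal errors propagate to the caller.
func handleStateNew(ctx context.Context) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	// ... send the query and advance the state ...
	return nil
}

// stateLoop plays the role of channelGraphSyncer: it dispatches on the
// current state until told to stop. Post-fix, a handler error terminates
// the goroutine instead of re-entering the same failing state.
func stateLoop(ctx context.Context) {
	for {
		if err := handleStateNew(ctx); err != nil {
			log.Printf("exiting state machine: %v", err)
			return
		}
		return // sketch only: a real loop would dispatch the next state
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // simulate the peer disconnecting
	stateLoop(ctx)
}
```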
@@ -492,15 +492,19 @@ func (g *GossipSyncer) Stop() {
 
 // handleSyncingChans handles the state syncingChans for the GossipSyncer. When
 // in this state, we will send a QueryChannelRange msg to our peer and advance
-// the syncer's state to waitingQueryRangeReply.
-func (g *GossipSyncer) handleSyncingChans(ctx context.Context) {
+// the syncer's state to waitingQueryRangeReply. Returns an error if a fatal
+// error occurs that should cause the goroutine to exit.
+func (g *GossipSyncer) handleSyncingChans(ctx context.Context) error {
 	// Prepare the query msg.
 	queryRangeMsg, err := g.genChanRangeQuery(
 		ctx, g.genHistoricalChanRangeQuery,
 	)
 	if err != nil {
-		log.Errorf("Unable to gen chan range query: %v", err)
-		return
+		// Any error here is likely fatal (context cancelled, db error,
+		// etc.), so return it to exit the goroutine cleanly.
+		return err
 	}
 
 	// Acquire a lock so the following state transition is atomic.
@@ -517,12 +521,18 @@ func (g *GossipSyncer) handleSyncingChans(ctx context.Context) {
 	err = g.sendToPeer(ctx, queryRangeMsg)
 	if err != nil {
-		log.Errorf("Unable to send chan range query: %v", err)
-		return
+		// Any send error (peer exiting, connection closed, rate
+		// limiter signaling exit, etc.) is fatal, so return it to
+		// exit the goroutine cleanly.
+		return err
 	}
 
 	// With the message sent successfully, we'll transition into the next
 	// state where we wait for their reply.
 	g.setSyncState(waitingQueryRangeReply)
+
+	return nil
 }
 
 // channelGraphSyncer is the main goroutine responsible for ensuring that we
@@ -545,7 +555,14 @@ func (g *GossipSyncer) channelGraphSyncer(ctx context.Context) {
 		// understand, as well as responding to any other queries by
 		// them.
 		case syncingChans:
-			g.handleSyncingChans(ctx)
+			err := g.handleSyncingChans(ctx)
+			if err != nil {
+				log.Debugf("GossipSyncer(%x): exiting due to "+
+					"error in syncingChans: %v",
+					g.cfg.peerPub[:], err)
+
+				return
+			}
 
 		// In this state, we've sent out our initial channel range
 		// query and are waiting for the final response from the remote
@@ -593,7 +610,14 @@ func (g *GossipSyncer) channelGraphSyncer(ctx context.Context) {
 			// First, we'll attempt to continue our channel
 			// synchronization by continuing to send off another
 			// query chunk.
-			done := g.synchronizeChanIDs(ctx)
+			done, err := g.synchronizeChanIDs(ctx)
+			if err != nil {
+				log.Debugf("GossipSyncer(%x): exiting due to "+
+					"error in queryNewChannels: %v",
+					g.cfg.peerPub[:], err)
+
+				return
+			}
 
 			// If this wasn't our last query, then we'll need to
 			// transition to our waiting state.
@@ -819,8 +843,10 @@ func (g *GossipSyncer) sendGossipTimestampRange(ctx context.Context,
 // range. This method will be called continually until the entire range has
 // been queried for with a response received. We'll chunk our requests as
 // required to ensure they fit into a single message. We may re-enter this
-// state in the case that chunking is required.
-func (g *GossipSyncer) synchronizeChanIDs(ctx context.Context) bool {
+// state in the case that chunking is required. Returns true if synchronization
+// is complete, and an error if a fatal error occurs that should cause the
+// goroutine to exit.
+func (g *GossipSyncer) synchronizeChanIDs(ctx context.Context) (bool, error) {
 	// If we're in this state yet there are no more new channels to query
 	// for, then we'll transition to our final synced state and return true
 	// to signal that we're fully synchronized.
@@ -828,7 +854,7 @@ func (g *GossipSyncer) synchronizeChanIDs(ctx context.Context) bool {
 		log.Infof("GossipSyncer(%x): no more chans to query",
 			g.cfg.peerPub[:])
 
-		return true
+		return true, nil
 	}
 
 	// Otherwise, we'll issue our next chunked query to receive replies
@@ -864,9 +890,14 @@ func (g *GossipSyncer) synchronizeChanIDs(ctx context.Context) bool {
 	})
 	if err != nil {
-		log.Errorf("Unable to sync chan IDs: %v", err)
+		// Any send error (peer exiting, connection closed, rate
+		// limiter signaling exit, etc.) is fatal, so return it to
+		// exit the goroutine cleanly.
+		return false, err
 	}
 
-	return false
+	return false, nil
 }
 
 // isLegacyReplyChannelRange determines whether a ReplyChannelRange message is
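That concludes the changes to the syncer itself; the hunks below patch its unit tests. As a quick recap of the new calling convention, here is a hedged, self-contained sketch (stub names, not lnd's code) of how a caller is expected to consume the (done, err) pair that synchronizeChanIDs now returns: any error is fatal, and only a nil error with done == false means "keep driving the state machine".

```go
package main

import (
	"context"
	"fmt"
)

// syncChunk stands in for the new synchronizeChanIDs contract: it returns
// (done, err), where any error is fatal and done reports full sync.
func syncChunk(ctx context.Context, remaining *int) (bool, error) {
	if err := ctx.Err(); err != nil {
		return false, err // fatal: exit the goroutine, do not retry
	}
	if *remaining == 0 {
		return true, nil // fully synchronized
	}
	*remaining-- // "send" one chunk; the real syncer then awaits a reply
	return false, nil
}

func main() {
	remaining := 3
	for {
		done, err := syncChunk(context.Background(), &remaining)
		if err != nil {
			fmt.Println("exiting:", err)
			return
		}
		if done {
			fmt.Println("synced")
			return
		}
	}
}
```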
@@ -16,6 +16,7 @@ import (
 	"github.com/btcsuite/btcd/chaincfg/chainhash"
 	"github.com/davecgh/go-spew/spew"
 	graphdb "github.com/lightningnetwork/lnd/graph/db"
+	"github.com/lightningnetwork/lnd/lnpeer"
 	"github.com/lightningnetwork/lnd/lnwire"
 	"github.com/stretchr/testify/require"
 )
@@ -229,9 +230,111 @@ func newTestSyncer(hID lnwire.ShortChannelID,
 
 	syncer := newGossipSyncer(cfg, syncerSema)
 
 	//nolint:forcetypeassert
 	return msgChan, syncer, cfg.channelSeries.(*mockChannelGraphTimeSeries)
 }
 
+// errorInjector provides thread-safe error injection for test syncers and
+// tracks the number of send attempts to detect endless loops.
+type errorInjector struct {
+	mu           sync.Mutex
+	err          error
+	attemptCount int
+}
+
+// setError sets the error that will be returned by sendMsg calls.
+func (ei *errorInjector) setError(err error) {
+	ei.mu.Lock()
+	defer ei.mu.Unlock()
+	ei.err = err
+}
+
+// getError retrieves the current error in a thread-safe manner and increments
+// the attempt counter.
+func (ei *errorInjector) getError() error {
+	ei.mu.Lock()
+	defer ei.mu.Unlock()
+	ei.attemptCount++
+
+	return ei.err
+}
+
+// getAttemptCount returns the number of times sendMsg was called.
+func (ei *errorInjector) getAttemptCount() int {
+	ei.mu.Lock()
+	defer ei.mu.Unlock()
+
+	return ei.attemptCount
+}
+
+// newErrorInjectingSyncer creates a GossipSyncer with controllable error
+// injection for testing error handling. The returned errorInjector can be used
+// to inject errors into sendMsg calls.
+func newErrorInjectingSyncer(hID lnwire.ShortChannelID, chunkSize int32) (
+	*GossipSyncer, *errorInjector, chan []lnwire.Message) {
+
+	ei := &errorInjector{}
+	msgChan := make(chan []lnwire.Message, 20)
+
+	cfg := gossipSyncerCfg{
+		channelSeries:          newMockChannelGraphTimeSeries(hID),
+		encodingType:           defaultEncoding,
+		chunkSize:              chunkSize,
+		batchSize:              chunkSize,
+		noSyncChannels:         false,
+		noReplyQueries:         true,
+		noTimestampQueryOption: false,
+		sendMsg: func(_ context.Context, _ bool,
+			msgs ...lnwire.Message) error {

+			// Check if we should inject an error.
+			if err := ei.getError(); err != nil {
+				return err
+			}
+
+			msgChan <- msgs
+			return nil
+		},
+		bestHeight: func() uint32 {
+			return latestKnownHeight
+		},
+		markGraphSynced:          func() {},
+		maxQueryChanRangeReplies: maxQueryChanRangeReplies,
+		timestampQueueSize:       10,
+	}
+
+	syncerSema := make(chan struct{}, 1)
+	syncerSema <- struct{}{}
+
+	syncer := newGossipSyncer(cfg, syncerSema)
+
+	return syncer, ei, msgChan
+}
+
+// assertSyncerExitsCleanly verifies that a syncer stops cleanly within the
+// given timeout. This is used to ensure error handling doesn't cause endless
+// loops.
+func assertSyncerExitsCleanly(t *testing.T, syncer *GossipSyncer,
+	timeout time.Duration) {
+
+	t.Helper()
+
+	stopChan := make(chan struct{})
+	go func() {
+		syncer.Stop()
+		close(stopChan)
+	}()
+
+	select {
+	case <-stopChan:
+		// Success - syncer stopped cleanly.
+	case <-time.After(timeout):
+		t.Fatal(
+			"syncer did not stop within timeout - possible " +
+				"endless loop",
+		)
+	}
+}
+
 // TestGossipSyncerFilterGossipMsgsNoHorizon tests that if the remote peer
 // doesn't have a horizon set, then we won't send any incoming messages to it.
 func TestGossipSyncerFilterGossipMsgsNoHorizon(t *testing.T) {
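The helpers above compose naturally. As a hypothetical minimal test in the same _test.go file (the table-driven test later in this diff covers the same ground with more cases), the injector and the shutdown assertion could be used like this:

```go
// Hypothetical sketch, not part of the diff: exercise the fix with a
// single injected error.
func TestSyncerExitsOnInjectedErrorSketch(t *testing.T) {
	syncer, errInj, _ := newErrorInjectingSyncer(
		lnwire.NewShortChanIDFromInt(10), defaultChunkSize,
	)

	// Every sendMsg call now fails as if the context had been cancelled
	// on disconnect.
	errInj.setError(context.Canceled)

	// The syncer starts in syncingChans, so its first send attempt hits
	// the injected error.
	syncer.Start()

	// With the fix, the failing handler terminates the goroutine, so
	// Stop returns promptly and only a handful of attempts are recorded.
	assertSyncerExitsCleanly(t, syncer, 2*time.Second)
	require.LessOrEqual(t, errInj.getAttemptCount(), 5)
}
```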
@@ -1518,7 +1621,8 @@ func TestGossipSyncerSynchronizeChanIDs(t *testing.T) {
 
 	for i := 0; i < chunkSize*2; i += 2 {
 		// With our set up complete, we'll request a sync of chan ID's.
-		done := syncer.synchronizeChanIDs(t.Context())
+		done, err := syncer.synchronizeChanIDs(t.Context())
+		require.NoError(t, err)
 
 		// At this point, we shouldn't yet be done as only 2 items
 		// should have been queried for.
@@ -1565,7 +1669,8 @@ func TestGossipSyncerSynchronizeChanIDs(t *testing.T) {
 	}
 
 	// If we issue another query, the syncer should tell us that it's done.
-	done := syncer.synchronizeChanIDs(t.Context())
+	done, err := syncer.synchronizeChanIDs(t.Context())
+	require.NoError(t, err)
 	if !done {
 		t.Fatalf("syncer should be finished!")
 	}
@@ -2409,3 +2514,107 @@ func TestGossipSyncerMaxChannelRangeReplies(t *testing.T) {
 		},
 	}, nil))
 }
+
+// TestGossipSyncerStateHandlerErrors tests that errors in state handlers cause
+// the channelGraphSyncer goroutine to exit cleanly without endless retry loops.
+// This is a table-driven test covering various error types and states.
+func TestGossipSyncerStateHandlerErrors(t *testing.T) {
+	t.Parallel()
+
+	tests := []struct {
+		name        string
+		state       syncerState
+		setupState  func(*GossipSyncer)
+		chunkSize   int32
+		injectedErr error
+	}{
+		{
+			name:        "context cancel during syncingChans",
+			state:       syncingChans,
+			chunkSize:   defaultChunkSize,
+			injectedErr: context.Canceled,
+			setupState:  func(s *GossipSyncer) {},
+		},
+		{
+			name:        "peer exit during syncingChans",
+			state:       syncingChans,
+			chunkSize:   defaultChunkSize,
+			injectedErr: lnpeer.ErrPeerExiting,
+			setupState:  func(s *GossipSyncer) {},
+		},
+		{
+			name:        "context cancel during queryNewChannels",
+			state:       queryNewChannels,
+			chunkSize:   2,
+			injectedErr: context.Canceled,
+			setupState: func(s *GossipSyncer) {
+				s.newChansToQuery = []lnwire.ShortChannelID{
+					lnwire.NewShortChanIDFromInt(1),
+					lnwire.NewShortChanIDFromInt(2),
+					lnwire.NewShortChanIDFromInt(3),
+				}
+			},
+		},
+		{
+			name:        "network error during queryNewChannels",
+			state:       queryNewChannels,
+			chunkSize:   2,
+			injectedErr: errors.New("connection closed"),
+			setupState: func(s *GossipSyncer) {
+				s.newChansToQuery = []lnwire.ShortChannelID{
+					lnwire.NewShortChanIDFromInt(1),
+					lnwire.NewShortChanIDFromInt(2),
+				}
+			},
+		},
+	}
+
+	for _, tt := range tests {
+		tt := tt
+		t.Run(tt.name, func(t *testing.T) {
+			t.Parallel()
+
+			// Create syncer with error injection capability.
+			hID := lnwire.NewShortChanIDFromInt(10)
+			syncer, errInj, _ := newErrorInjectingSyncer(
+				hID, tt.chunkSize,
+			)
+
+			// Set up the initial state and any required state data.
+			syncer.setSyncState(tt.state)
+			tt.setupState(syncer)
+
+			// Inject the error that should cause the goroutine to
+			// exit.
+			errInj.setError(tt.injectedErr)
+
+			// Start the syncer which spawns the channelGraphSyncer
+			// goroutine.
+			syncer.Start()
+
+			// Wait long enough that an endless loop would
+			// accumulate many attempts. With the fix, we should
+			// only see 1-3 attempts. Without the fix, we'd see
+			// 50-100+ attempts.
+			time.Sleep(500 * time.Millisecond)
+
+			// Check how many send attempts were made. This verifies
+			// that the state handler doesn't loop endlessly.
+			attemptCount := errInj.getAttemptCount()
+			require.GreaterOrEqual(
+				t, attemptCount, 1,
+				"state handler was not called - test "+
+					"setup issue",
+			)
+			require.LessOrEqual(
+				t, attemptCount, 5,
+				"too many attempts (%d) - endless loop "+
+					"not fixed",
+				attemptCount,
+			)
+
+			// Verify the syncer exits cleanly without hanging.
+			assertSyncerExitsCleanly(t, syncer, 2*time.Second)
		})
+	}
+}
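Assuming a standard repository checkout, the regression test above can be run in isolation with `go test ./discovery -run TestGossipSyncerStateHandlerErrors -v`; without the syncer.go changes earlier in this diff, it should fail either on the attempt-count upper bound or on the clean-shutdown assertion.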
@@ -56,6 +56,9 @@
   utxonursery (the legacy sweeper) where htlcs with a locktime of 0 would not
   be swept.
 
+- [Fixed a bug](https://github.com/lightningnetwork/lnd/pull/10330) to ensure that goroutine resources are properly freed in the case
+  of a disconnection or other failure event.
+
 # New Features
 
 * Use persisted [nodeannouncement](https://github.com/lightningnetwork/lnd/pull/8825)