diff --git a/channelnotifier/channelnotifier.go b/channelnotifier/channelnotifier.go
index 7c6cc3095..78e1a8033 100644
--- a/channelnotifier/channelnotifier.go
+++ b/channelnotifier/channelnotifier.go
@@ -47,6 +47,13 @@ type ActiveLinkEvent struct {
 	ChannelPoint *wire.OutPoint
 }
 
+// InactiveLinkEvent represents a new event where the link becomes inactive in
+// the switch.
+type InactiveLinkEvent struct {
+	// ChannelPoint is the channel point for the inactive channel.
+	ChannelPoint *wire.OutPoint
+}
+
 // ActiveChannelEvent represents a new event where a channel becomes active.
 type ActiveChannelEvent struct {
 	// ChannelPoint is the channelpoint for the newly active channel.
@@ -193,6 +200,15 @@ func (c *ChannelNotifier) NotifyActiveChannelEvent(chanPoint wire.OutPoint) {
 	}
 }
 
+// NotifyInactiveLinkEvent notifies the channelEventNotifier goroutine that a
+// link has been removed from the switch.
+func (c *ChannelNotifier) NotifyInactiveLinkEvent(chanPoint wire.OutPoint) {
+	event := InactiveLinkEvent{ChannelPoint: &chanPoint}
+	if err := c.ntfnServer.SendUpdate(event); err != nil {
+		log.Warnf("Unable to send inactive link update: %v", err)
+	}
+}
+
 // NotifyInactiveChannelEvent notifies the channelEventNotifier goroutine that a
 // channel is inactive.
 func (c *ChannelNotifier) NotifyInactiveChannelEvent(chanPoint wire.OutPoint) {
diff --git a/docs/release-notes/release-notes-0.16.0.md b/docs/release-notes/release-notes-0.16.0.md
index 1146051de..d62fd0977 100644
--- a/docs/release-notes/release-notes-0.16.0.md
+++ b/docs/release-notes/release-notes-0.16.0.md
@@ -269,6 +269,10 @@ data.
 * [Fixed a bug](https://github.com/lightningnetwork/lnd/pull/7186) that might
   lead to channel updates being missed, causing channel graph being incomplete.
 
+* During reconnection, enabling channels could fail because the link's startup
+  was lagging behind. This is now fixed by [retrying the enable
+  request](https://github.com/lightningnetwork/lnd/pull/7157).
+
 ## Code Health
 
 * [test: use `T.TempDir` to create temporary test
diff --git a/htlcswitch/link.go b/htlcswitch/link.go
index 65bdd9977..88476f33e 100644
--- a/htlcswitch/link.go
+++ b/htlcswitch/link.go
@@ -293,6 +293,10 @@ type ChannelLinkConfig struct {
 	// when channels become inactive.
 	NotifyInactiveChannel func(wire.OutPoint)
 
+	// NotifyInactiveLinkEvent allows the switch to tell the
+	// ChannelNotifier when a channel link becomes inactive.
+	NotifyInactiveLinkEvent func(wire.OutPoint)
+
 	// HtlcNotifier is an instance of a htlcNotifier which we will pipe htlc
 	// events through.
 	HtlcNotifier htlcNotifier
@@ -979,8 +983,11 @@ func (l *channelLink) htlcManager() {
 	l.log.Infof("HTLC manager started, bandwidth=%v", l.Bandwidth())
 
 	// Notify any clients that the link is now in the switch via an
-	// ActiveLinkEvent.
+	// ActiveLinkEvent. We'll also defer an inactive link notification for
+	// when the link exits to ensure that every active notification is
+	// matched by an inactive one.
 	l.cfg.NotifyActiveLink(*l.ChannelPoint())
+	defer l.cfg.NotifyInactiveLinkEvent(*l.ChannelPoint())
 
 	// TODO(roasbeef): need to call wipe chan whenever D/C?
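Reviewer note: the hunk above pairs every `NotifyActiveLink` call with a deferred `NotifyInactiveLinkEvent`, so subscribers can rely on active and inactive link events arriving in matched pairs. The following self-contained sketch (not part of this diff; `ActiveLinkEvent`, `InactiveLinkEvent`, and `trackLinkLiveness` are local stand-ins mirroring the `channelnotifier` types added above) shows how a consumer might fold such a stream into a liveness set under that assumption:

```go
package main

import (
	"fmt"

	"github.com/btcsuite/btcd/wire"
)

// ActiveLinkEvent and InactiveLinkEvent are local stand-ins for the
// channelnotifier types added in this PR; they exist here only so the sketch
// compiles on its own.
type ActiveLinkEvent struct {
	ChannelPoint *wire.OutPoint
}

type InactiveLinkEvent struct {
	ChannelPoint *wire.OutPoint
}

// trackLinkLiveness drains a stream of link events and returns the set of
// channel points whose links are still active. Because the link defers an
// inactive notification for every active one, the set stays consistent
// across link restarts.
func trackLinkLiveness(events <-chan interface{}) map[wire.OutPoint]struct{} {
	active := make(map[wire.OutPoint]struct{})

	for e := range events {
		switch event := e.(type) {
		case ActiveLinkEvent:
			active[*event.ChannelPoint] = struct{}{}

		case InactiveLinkEvent:
			delete(active, *event.ChannelPoint)
		}
	}

	return active
}

func main() {
	events := make(chan interface{}, 3)
	op := wire.OutPoint{Index: 1}

	// Simulate a link that starts, exits, and starts again: the two
	// active events are matched by exactly one inactive event in between.
	events <- ActiveLinkEvent{ChannelPoint: &op}
	events <- InactiveLinkEvent{ChannelPoint: &op}
	events <- ActiveLinkEvent{ChannelPoint: &op}
	close(events)

	// The link is active at the end, so the set has one entry.
	fmt.Println(len(trackLinkLiveness(events)))
}
```

Without the deferred notification, a link that exited on an error path would leave a stale entry in such a set; the pairing is what makes this bookkeeping safe.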
diff --git a/htlcswitch/link_test.go b/htlcswitch/link_test.go index d0aac8551..55d77d13d 100644 --- a/htlcswitch/link_test.go +++ b/htlcswitch/link_test.go @@ -1886,16 +1886,17 @@ func newSingleLinkTestHarness(t *testing.T, chanAmt, chanReserve btcutil.Amount) PendingCommitTicker: ticker.New(time.Minute), // Make the BatchSize and Min/MaxFeeUpdateTimeout large enough // to not trigger commit updates automatically during tests. - BatchSize: 10000, - MinFeeUpdateTimeout: 30 * time.Minute, - MaxFeeUpdateTimeout: 40 * time.Minute, - MaxOutgoingCltvExpiry: DefaultMaxOutgoingCltvExpiry, - MaxFeeAllocation: DefaultMaxLinkFeeAllocation, - NotifyActiveLink: func(wire.OutPoint) {}, - NotifyActiveChannel: func(wire.OutPoint) {}, - NotifyInactiveChannel: func(wire.OutPoint) {}, - HtlcNotifier: aliceSwitch.cfg.HtlcNotifier, - GetAliases: getAliases, + BatchSize: 10000, + MinFeeUpdateTimeout: 30 * time.Minute, + MaxFeeUpdateTimeout: 40 * time.Minute, + MaxOutgoingCltvExpiry: DefaultMaxOutgoingCltvExpiry, + MaxFeeAllocation: DefaultMaxLinkFeeAllocation, + NotifyActiveLink: func(wire.OutPoint) {}, + NotifyActiveChannel: func(wire.OutPoint) {}, + NotifyInactiveChannel: func(wire.OutPoint) {}, + NotifyInactiveLinkEvent: func(wire.OutPoint) {}, + HtlcNotifier: aliceSwitch.cfg.HtlcNotifier, + GetAliases: getAliases, } aliceLink := NewChannelLink(aliceCfg, aliceLc.channel) @@ -4329,15 +4330,16 @@ func (h *persistentLinkHarness) restartLink( MinFeeUpdateTimeout: 30 * time.Minute, MaxFeeUpdateTimeout: 40 * time.Minute, // Set any hodl flags requested for the new link. - HodlMask: hodl.MaskFromFlags(hodlFlags...), - MaxOutgoingCltvExpiry: DefaultMaxOutgoingCltvExpiry, - MaxFeeAllocation: DefaultMaxLinkFeeAllocation, - NotifyActiveLink: func(wire.OutPoint) {}, - NotifyActiveChannel: func(wire.OutPoint) {}, - NotifyInactiveChannel: func(wire.OutPoint) {}, - HtlcNotifier: aliceSwitch.cfg.HtlcNotifier, - SyncStates: syncStates, - GetAliases: getAliases, + HodlMask: hodl.MaskFromFlags(hodlFlags...), + MaxOutgoingCltvExpiry: DefaultMaxOutgoingCltvExpiry, + MaxFeeAllocation: DefaultMaxLinkFeeAllocation, + NotifyActiveLink: func(wire.OutPoint) {}, + NotifyActiveChannel: func(wire.OutPoint) {}, + NotifyInactiveChannel: func(wire.OutPoint) {}, + NotifyInactiveLinkEvent: func(wire.OutPoint) {}, + HtlcNotifier: aliceSwitch.cfg.HtlcNotifier, + SyncStates: syncStates, + GetAliases: getAliases, } aliceLink := NewChannelLink(aliceCfg, aliceChannel) diff --git a/htlcswitch/switch.go b/htlcswitch/switch.go index 460bf31b5..6f632be35 100644 --- a/htlcswitch/switch.go +++ b/htlcswitch/switch.go @@ -2293,6 +2293,8 @@ func (s *Switch) AddLink(link ChannelLink) error { link.attachFailAliasUpdate(s.failAliasUpdate) if err := link.Start(); err != nil { + log.Errorf("AddLink failed to start link with chanID=%v: %v", + chanID, err) s.removeLink(chanID) return err } diff --git a/htlcswitch/test_utils.go b/htlcswitch/test_utils.go index 6e8defe3f..e1bd6783a 100644 --- a/htlcswitch/test_utils.go +++ b/htlcswitch/test_utils.go @@ -1159,6 +1159,7 @@ func (h *hopNetwork) createChannelLink(server, peer *mockServer, NotifyActiveLink: func(wire.OutPoint) {}, NotifyActiveChannel: func(wire.OutPoint) {}, NotifyInactiveChannel: func(wire.OutPoint) {}, + NotifyInactiveLinkEvent: func(wire.OutPoint) {}, HtlcNotifier: server.htlcSwitch.cfg.HtlcNotifier, GetAliases: getAliases, }, diff --git a/peer/brontide.go b/peer/brontide.go index 7ac65aaf8..4c56626ea 100644 --- a/peer/brontide.go +++ b/peer/brontide.go @@ -39,6 +39,7 @@ import ( 
"github.com/lightningnetwork/lnd/netann" "github.com/lightningnetwork/lnd/pool" "github.com/lightningnetwork/lnd/queue" + "github.com/lightningnetwork/lnd/subscribe" "github.com/lightningnetwork/lnd/ticker" "github.com/lightningnetwork/lnd/watchtower/wtclient" ) @@ -461,6 +462,15 @@ type Brontide struct { // peer's chansync message with its own over and over again. resentChanSyncMsg map[lnwire.ChannelID]struct{} + // channelEventClient is the channel event subscription client that's + // used to assist retry enabling the channels. This client is only + // created when the reenableTimeout is no greater than 1 minute. Once + // created, it is canceled once the reenabling has been finished. + // + // NOTE: we choose to create the client conditionally to avoid + // potentially holding lots of un-consumed events. + channelEventClient *subscribe.Client + queueQuit chan struct{} quit chan struct{} wg sync.WaitGroup @@ -592,6 +602,17 @@ func (p *Brontide) Start() error { p.log.Debugf("Loaded %v active channels from database", len(activeChans)) + // Conditionally subscribe to channel events before loading channels so + // we won't miss events. This subscription is used to listen to active + // channel event when reenabling channels. Once the reenabling process + // is finished, this subscription will be canceled. + // + // NOTE: ChannelNotifier must be started before subscribing events + // otherwise we'd panic here. + if err := p.attachChannelEventSubscription(); err != nil { + return err + } + msgs, err := p.loadActiveChannels(activeChans) if err != nil { return fmt.Errorf("unable to load channels: %v", err) @@ -941,6 +962,7 @@ func (p *Brontide) addLink(chanPoint *wire.OutPoint, towerClient = p.cfg.TowerClient } + //nolint:lll linkCfg := htlcswitch.ChannelLinkConfig{ Peer: p, DecodeHopIterators: p.cfg.Sphinx.DecodeHopIterators, @@ -976,6 +998,7 @@ func (p *Brontide) addLink(chanPoint *wire.OutPoint, NotifyActiveLink: p.cfg.ChannelNotifier.NotifyActiveLinkEvent, NotifyActiveChannel: p.cfg.ChannelNotifier.NotifyActiveChannelEvent, NotifyInactiveChannel: p.cfg.ChannelNotifier.NotifyInactiveChannelEvent, + NotifyInactiveLinkEvent: p.cfg.ChannelNotifier.NotifyInactiveLinkEvent, HtlcNotifier: p.cfg.HtlcNotifier, GetAliases: p.cfg.GetAliases, } @@ -2480,6 +2503,16 @@ out: // select will ignore this case entirely. reenableTimeout = nil + // Once the reenabling is attempted, we also cancel the + // channel event subscription to free up the overflow + // queue used in channel notifier. + // + // NOTE: channelEventClient will be nil if the + // reenableTimeout is greater than 1 minute. + if p.channelEventClient != nil { + p.channelEventClient.Cancel() + } + case <-p.quit: // As, we've been signalled to exit, we'll reset all // our active channel back to their default state. @@ -2506,48 +2539,62 @@ out: func (p *Brontide) reenableActiveChannels() { // First, filter all known channels with this peer for ones that are // both public and not pending. - var activePublicChans []wire.OutPoint - p.activeChanMtx.RLock() - for chanID, lnChan := range p.activeChannels { - // If the lnChan is nil, continue as this is a pending channel. - if lnChan == nil { - continue - } + activePublicChans := p.filterChannelsToEnable() - dbChan := lnChan.State() - isPublic := dbChan.ChannelFlags&lnwire.FFAnnounceChannel != 0 - if !isPublic || dbChan.IsPending { - continue - } - - // We'll also skip any channels added during this peer's - // lifecycle since they haven't waited out the timeout. 
-		// first announcement will be enabled, and the chan status
-		// manager will begin monitoring them passively since they exist
-		// in the database.
-		if _, ok := p.addedChannels[chanID]; ok {
-			continue
-		}
-
-		activePublicChans = append(
-			activePublicChans, dbChan.FundingOutpoint,
-		)
-	}
-	p.activeChanMtx.RUnlock()
+	// Create a map to hold channels that need to be retried.
+	retryChans := make(map[wire.OutPoint]struct{}, len(activePublicChans))
 
 	// For each of the public, non-pending channels, set the channel
 	// disabled bit to false and send out a new ChannelUpdate. If this
 	// channel is already active, the update won't be sent.
 	for _, chanPoint := range activePublicChans {
 		err := p.cfg.ChanStatusMgr.RequestEnable(chanPoint, false)
-		if err == netann.ErrEnableManuallyDisabledChan {
-			p.log.Debugf("Channel(%v) was manually disabled, ignoring "+
-				"automatic enable request", chanPoint)
-		} else if err != nil {
-			p.log.Errorf("Unable to enable channel %v: %v",
-				chanPoint, err)
+
+		switch {
+		// No error occurred, continue to request the next channel.
+		case err == nil:
+			continue
+
+		// Cannot auto-enable a manually disabled channel, so we do
+		// nothing but proceed to the next channel.
+		case errors.Is(err, netann.ErrEnableManuallyDisabledChan):
+			p.log.Debugf("Channel(%v) was manually disabled, "+
+				"ignoring automatic enable request", chanPoint)
+
+			continue
+
+		// If the channel is reported as inactive, we will give it
+		// another chance. When handling the request, ChanStatusManager
+		// will check whether the link is active or not. One of the
+		// conditions is whether the link has been marked as
+		// reestablished, which happens inside a goroutine (htlcManager)
+		// after the link is started. And we may get a false negative
+		// saying the link is not active because that goroutine hasn't
+		// reached the line to mark the reestablishment. Thus we give
+		// it a second chance to send the request.
+		case errors.Is(err, netann.ErrEnableInactiveChan):
+			// If we don't have a client created, it means we
+			// shouldn't retry enabling the channel.
+			if p.channelEventClient == nil {
+				p.log.Errorf("Channel(%v) enable request "+
+					"failed due to inactive link",
+					chanPoint)
+
+				continue
+			}
+
+			p.log.Warnf("Channel(%v) cannot be enabled as "+
+				"ChanStatusManager reported inactive, "+
+				"retrying", chanPoint)
+
+			// Add the channel to the retry map.
+			retryChans[chanPoint] = struct{}{}
+		}
 	}
+
+	// Retry the channels if we have any.
+	if len(retryChans) != 0 {
+		p.retryRequestEnable(retryChans)
+	}
 }
 
 // fetchActiveChanCloser attempts to fetch the active chan closer state machine
@@ -2626,6 +2673,136 @@ func (p *Brontide) fetchActiveChanCloser(chanID lnwire.ChannelID) (
 	return chanCloser, nil
 }
 
+// filterChannelsToEnable filters a list of channels to be enabled upon start.
+// The filtered channels are active channels that are neither private nor
+// pending.
+func (p *Brontide) filterChannelsToEnable() []wire.OutPoint {
+	var activePublicChans []wire.OutPoint
+
+	p.activeChanMtx.RLock()
+	defer p.activeChanMtx.RUnlock()
+
+	for chanID, lnChan := range p.activeChannels {
+		// If the lnChan is nil, continue as this is a pending channel.
+		if lnChan == nil {
+			continue
+		}
+
+		dbChan := lnChan.State()
+		isPublic := dbChan.ChannelFlags&lnwire.FFAnnounceChannel != 0
+		if !isPublic || dbChan.IsPending {
+			continue
+		}
+
+		// We'll also skip any channels added during this peer's
+		// lifecycle since they haven't waited out the timeout. Their
+		// first announcement will be enabled, and the chan status
+		// manager will begin monitoring them passively since they exist
+		// in the database.
+		if _, ok := p.addedChannels[chanID]; ok {
+			continue
+		}
+
+		activePublicChans = append(
+			activePublicChans, dbChan.FundingOutpoint,
+		)
+	}
+
+	return activePublicChans
+}
+
+// retryRequestEnable takes a map of channel outpoints and listens to the
+// channel events received via the subscription client, removing a channel
+// from the map once it's matched to an event. Upon receiving an active
+// channel event, it will send the enable request again.
+func (p *Brontide) retryRequestEnable(activeChans map[wire.OutPoint]struct{}) {
+	p.log.Debugf("Retry enabling %v channels", len(activeChans))
+
+	// retryEnable is a helper closure that checks whether the given
+	// channel is in our targeted map and, if so, removes it and sends the
+	// enable request again.
+	retryEnable := func(chanPoint wire.OutPoint) error {
+		// If this is an active channel event, check whether it's in
+		// our targeted channels map.
+		_, found := activeChans[chanPoint]
+
+		// If this channel is irrelevant, return nil so the loop can
+		// jump to the next iteration.
+		if !found {
+			return nil
+		}
+
+		// Otherwise we've just received an active signal for a channel
+		// that previously failed to be enabled, so we send the request
+		// again.
+		//
+		// We only give the channel one more shot, so we delete it from
+		// our map first to keep it from being attempted again.
+		delete(activeChans, chanPoint)
+
+		// Send the request.
+		err := p.cfg.ChanStatusMgr.RequestEnable(chanPoint, false)
+		if err != nil {
+			return fmt.Errorf("request enabling channel %v "+
+				"failed: %w", chanPoint, err)
+		}
+
+		return nil
+	}
+
+	for {
+		// If activeChans is empty, we're done processing all the
+		// channels.
+		if len(activeChans) == 0 {
+			p.log.Debug("Finished retry enabling channels")
+			return
+		}
+
+		select {
+		// A new event has been sent by the ChannelNotifier. We now
+		// check whether it's an active or inactive channel event.
+		case e := <-p.channelEventClient.Updates():
+			// If this is an active channel event, try enabling the
+			// channel, then jump to the next iteration.
+			active, ok := e.(channelnotifier.ActiveChannelEvent)
+			if ok {
+				chanPoint := *active.ChannelPoint
+
+				// If we received an error for this particular
+				// channel, we log an error and won't quit as
+				// we still want to retry other channels.
+				if err := retryEnable(chanPoint); err != nil {
+					p.log.Errorf("Retry failed: %v", err)
+				}
+
+				continue
+			}
+
+			// Otherwise check for an inactive link event, and jump
+			// to the next iteration if it's not one.
+			inactive, ok := e.(channelnotifier.InactiveLinkEvent)
+			if !ok {
+				continue
+			}
+
+			// Found an inactive link event. If this is our
+			// targeted channel, remove it from our map.
+			chanPoint := *inactive.ChannelPoint
+			_, found := activeChans[chanPoint]
+			if !found {
+				continue
+			}
+
+			delete(activeChans, chanPoint)
+			p.log.Warnf("Re-enabling channel %v failed, received "+
+				"inactive link event", chanPoint)
+
+		case <-p.quit:
+			p.log.Debugf("Peer shutdown during retry enabling")
+			return
+		}
+	}
+}
+
 // chooseDeliveryScript takes two optionally set shutdown scripts and returns
 // a suitable script to close out to. This may be nil if neither script is
 // set. If both scripts are set, this function will error if they do not match.
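Reviewer note: to illustrate the race that `retryRequestEnable` above closes, here is a minimal, self-contained sketch (not from this diff). `requestEnable` and `errEnableInactiveChan` are hypothetical stand-ins for `ChanStatusMgr.RequestEnable` and `netann.ErrEnableInactiveChan`, and `ActiveChannelEvent` mirrors the `channelnotifier` type. The first enable attempt loses the race against the link's reestablishment; the retry, triggered by the matching active channel event, succeeds:

```go
package main

import (
	"errors"
	"fmt"

	"github.com/btcsuite/btcd/wire"
)

// errEnableInactiveChan is a hypothetical stand-in for
// netann.ErrEnableInactiveChan.
var errEnableInactiveChan = errors.New("channel link not active")

// ActiveChannelEvent mirrors channelnotifier.ActiveChannelEvent for the
// purposes of this sketch.
type ActiveChannelEvent struct {
	ChannelPoint *wire.OutPoint
}

func main() {
	op := wire.OutPoint{Index: 0}

	// linkActive flips to true once the (simulated) htlcManager marks the
	// link as reestablished.
	linkActive := false

	// requestEnable is a hypothetical stand-in for
	// ChanStatusMgr.RequestEnable: it fails while the link is inactive.
	requestEnable := func(chanPoint wire.OutPoint) error {
		if !linkActive {
			return errEnableInactiveChan
		}

		return nil
	}

	events := make(chan interface{}, 1)

	// The first attempt races the link's startup and loses.
	if err := requestEnable(op); errors.Is(err, errEnableInactiveChan) {
		fmt.Println("first attempt failed, waiting for active event")
	}

	// The link finishes reestablishing and the notifier fires an active
	// channel event.
	linkActive = true
	events <- ActiveChannelEvent{ChannelPoint: &op}

	// Retry once, and only for the matching channel point.
	if e, ok := (<-events).(ActiveChannelEvent); ok && *e.ChannelPoint == op {
		if err := requestEnable(op); err == nil {
			fmt.Println("retry succeeded")
		}
	}
}
```

The PR's version of this loop additionally gives up on a channel when an `InactiveLinkEvent` arrives for it, since the deferred notification added in `htlcManager` signals that the link has exited.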
@@ -3524,3 +3701,29 @@ func (p *Brontide) LastRemotePingPayload() []byte {
 
 	return pingBytes
 }
+
+// attachChannelEventSubscription creates a channel event subscription and
+// attaches the client to the Brontide if the reenableTimeout is no greater
+// than 1 minute.
+func (p *Brontide) attachChannelEventSubscription() error {
+	// If the timeout is greater than 1 minute, the link will likely have
+	// finished its reestablishment by the time we attempt the enable.
+	// Return nil without creating the client to indicate that we don't
+	// want to retry.
+	if p.cfg.ChanActiveTimeout > 1*time.Minute {
+		return nil
+	}
+
+	// When the reenable timeout is less than 1 minute, it's likely the
+	// channel link hasn't finished its reestablishment yet. In that case,
+	// we'll give it a second chance by subscribing to the channel update
+	// events. Upon receiving the `ActiveLinkEvent`, we'll then request
+	// enabling the channel again.
+	sub, err := p.cfg.ChannelNotifier.SubscribeChannelEvents()
+	if err != nil {
+		return fmt.Errorf("SubscribeChannelEvents failed: %w", err)
+	}
+
+	p.channelEventClient = sub
+
+	return nil
+}
diff --git a/peer/brontide_test.go b/peer/brontide_test.go
index 44e63267b..cf71e0b92 100644
--- a/peer/brontide_test.go
+++ b/peer/brontide_test.go
@@ -12,6 +12,7 @@ import (
 	"github.com/btcsuite/btcd/wire"
 	"github.com/lightningnetwork/lnd/chainntnfs"
 	"github.com/lightningnetwork/lnd/channeldb"
+	"github.com/lightningnetwork/lnd/channelnotifier"
 	"github.com/lightningnetwork/lnd/contractcourt"
 	"github.com/lightningnetwork/lnd/htlcswitch"
 	"github.com/lightningnetwork/lnd/lntest/mock"
@@ -1035,6 +1036,14 @@ func TestPeerCustomMessage(t *testing.T) {
 		ConfChan: make(chan *chainntnfs.TxConfirmation),
 	}
 
+	// TODO(yy): change ChannelNotifier to be an interface.
+	channelNotifier := channelnotifier.New(dbAlice.ChannelStateDB())
+	require.NoError(t, channelNotifier.Start())
+	t.Cleanup(func() {
+		require.NoError(t, channelNotifier.Stop(),
+			"stop channel notifier failed")
+	})
+
 	alicePeer := NewBrontide(Config{
 		PubKeyBytes: remoteKey,
 		ChannelDB:   dbAlice.ChannelStateDB(),
@@ -1057,7 +1066,8 @@ func TestPeerCustomMessage(t *testing.T) {
 			}
 			return nil
 		},
-		PongBuf: make([]byte, lnwire.MaxPongBytes),
+		PongBuf:         make([]byte, lnwire.MaxPongBytes),
+		ChannelNotifier: channelNotifier,
 	})
 
 	// Set up the init sequence.
diff --git a/peer/test_utils.go b/peer/test_utils.go
index 41f6d2c0f..a997411df 100644
--- a/peer/test_utils.go
+++ b/peer/test_utils.go
@@ -16,6 +16,7 @@ import (
 	"github.com/btcsuite/btcd/wire"
 	"github.com/lightningnetwork/lnd/chainntnfs"
 	"github.com/lightningnetwork/lnd/channeldb"
+	"github.com/lightningnetwork/lnd/channelnotifier"
 	"github.com/lightningnetwork/lnd/htlcswitch"
 	"github.com/lightningnetwork/lnd/input"
 	"github.com/lightningnetwork/lnd/keychain"
@@ -377,6 +378,14 @@ func createTestPeer(t *testing.T, notifier chainntnfs.ChainNotifier,
 		return nil, nil, err
 	}
 
+	// TODO(yy): change ChannelNotifier to be an interface.
+	channelNotifier := channelnotifier.New(dbAlice.ChannelStateDB())
+	require.NoError(t, channelNotifier.Start())
+	t.Cleanup(func() {
+		require.NoError(t, channelNotifier.Stop(),
+			"stop channel notifier failed")
+	})
+
 	cfg := &Config{
 		Addr:        cfgAddr,
 		PubKeyBytes: pubKey,
@@ -392,6 +401,7 @@ func createTestPeer(t *testing.T, notifier chainntnfs.ChainNotifier,
 		ChanStatusMgr:   chanStatusMgr,
 		Features:        lnwire.NewFeatureVector(nil, lnwire.Features),
 		DisconnectPeer:  func(b *btcec.PublicKey) error { return nil },
+		ChannelNotifier: channelNotifier,
 	}
 
 	alicePeer := NewBrontide(*cfg)
diff --git a/rpcserver.go b/rpcserver.go
index a5bd31686..7e4ded225 100644
--- a/rpcserver.go
+++ b/rpcserver.go
@@ -4624,9 +4624,12 @@ func (r *rpcServer) SubscribeChannelEvents(req *lnrpc.ChannelEventSubscription,
 				},
 			}
 
-		// Completely ignore ActiveLinkEvent as this is explicitly not
-		// exposed to the RPC.
-		case channelnotifier.ActiveLinkEvent:
+		// Completely ignore ActiveLinkEvent and InactiveLinkEvent as
+		// these are explicitly not exposed to the RPC.
+		case channelnotifier.ActiveLinkEvent,
+			channelnotifier.InactiveLinkEvent:
+
+			continue
 
 		case channelnotifier.FullyResolvedChannelEvent:
 
@@ -7323,6 +7326,8 @@ func (r *rpcServer) SubscribeChannelBackups(req *lnrpc.ChannelBackupSubscription
 			continue
 		case channelnotifier.ActiveLinkEvent:
 			continue
+		case channelnotifier.InactiveLinkEvent:
+			continue
 		}
 
 		// Now that we know the channel state has changed,