From ced883389508b7601c37e5957f546dfb59e8bfbf Mon Sep 17 00:00:00 2001
From: yyforyongyu
Date: Thu, 24 Nov 2022 06:57:53 +0800
Subject: [PATCH 1/6] channelnotifier: add `InactiveLinkEvent`

This commit adds a new event `InactiveLinkEvent` to be used when a link
becomes inactive.
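
As an illustration only (not part of this patch), a subscriber could
watch for the new event through the existing `SubscribeChannelEvents`
API with a type assertion, the same pattern later patches in this
series use. Here `notifier` is an assumed, already-started
`*channelnotifier.ChannelNotifier`, and `log` is an assumed logger:

    sub, err := notifier.SubscribeChannelEvents()
    if err != nil {
        return err
    }
    defer sub.Cancel()

    // Updates() delivers every channel event; pick out the new
    // InactiveLinkEvent and read its channel point.
    for e := range sub.Updates() {
        if event, ok := e.(channelnotifier.InactiveLinkEvent); ok {
            log.Infof("Link inactive for channel point %v",
                event.ChannelPoint)
        }
    }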
---
 channelnotifier/channelnotifier.go | 16 ++++++++++++++++
 rpcserver.go                       | 11 ++++++++---
 2 files changed, 24 insertions(+), 3 deletions(-)

diff --git a/channelnotifier/channelnotifier.go b/channelnotifier/channelnotifier.go
index 7c6cc3095..78e1a8033 100644
--- a/channelnotifier/channelnotifier.go
+++ b/channelnotifier/channelnotifier.go
@@ -47,6 +47,13 @@ type ActiveLinkEvent struct {
 	ChannelPoint *wire.OutPoint
 }
 
+// InactiveLinkEvent represents a new event where the link becomes inactive in
+// the switch.
+type InactiveLinkEvent struct {
+	// ChannelPoint is the channel point for the inactive channel.
+	ChannelPoint *wire.OutPoint
+}
+
 // ActiveChannelEvent represents a new event where a channel becomes active.
 type ActiveChannelEvent struct {
 	// ChannelPoint is the channelpoint for the newly active channel.
@@ -193,6 +200,15 @@ func (c *ChannelNotifier) NotifyActiveChannelEvent(chanPoint wire.OutPoint) {
 	}
 }
 
+// NotifyInactiveLinkEvent notifies the channelEventNotifier goroutine that a
+// link has been removed from the switch.
+func (c *ChannelNotifier) NotifyInactiveLinkEvent(chanPoint wire.OutPoint) {
+	event := InactiveLinkEvent{ChannelPoint: &chanPoint}
+	if err := c.ntfnServer.SendUpdate(event); err != nil {
+		log.Warnf("Unable to send inactive link update: %v", err)
+	}
+}
+
 // NotifyInactiveChannelEvent notifies the channelEventNotifier goroutine that a
 // channel is inactive.
 func (c *ChannelNotifier) NotifyInactiveChannelEvent(chanPoint wire.OutPoint) {
diff --git a/rpcserver.go b/rpcserver.go
index a5bd31686..7e4ded225 100644
--- a/rpcserver.go
+++ b/rpcserver.go
@@ -4624,9 +4624,12 @@ func (r *rpcServer) SubscribeChannelEvents(req *lnrpc.ChannelEventSubscription,
 			},
 		}
 
-		// Completely ignore ActiveLinkEvent as this is explicitly not
-		// exposed to the RPC.
-		case channelnotifier.ActiveLinkEvent:
+		// Completely ignore ActiveLinkEvent and
+		// InactiveLinkEvent as these are explicitly not
+		// exposed to the RPC.
+		case channelnotifier.ActiveLinkEvent,
+			channelnotifier.InactiveLinkEvent:
+
 			continue
 
 		case channelnotifier.FullyResolvedChannelEvent:
@@ -7323,6 +7326,8 @@ func (r *rpcServer) SubscribeChannelBackups(req *lnrpc.ChannelBackupSubscription
 			continue
 		case channelnotifier.ActiveLinkEvent:
 			continue
+		case channelnotifier.InactiveLinkEvent:
+			continue
 		}
 
 		// Now that we know the channel state has changed,

From f632a58b3b37a12e88ac099f0894e15396dc2954 Mon Sep 17 00:00:00 2001
From: yyforyongyu
Date: Thu, 24 Nov 2022 06:58:33 +0800
Subject: [PATCH 2/6] htlcswitch+peer: notify inactive link event when `htlcManager` exits

---
 htlcswitch/link.go       |  9 ++++++++-
 htlcswitch/link_test.go  | 40 +++++++++++++++++++++-------------------
 htlcswitch/switch.go     |  2 ++
 htlcswitch/test_utils.go |  1 +
 peer/brontide.go         |  2 ++
 5 files changed, 34 insertions(+), 20 deletions(-)

diff --git a/htlcswitch/link.go b/htlcswitch/link.go
index 65bdd9977..88476f33e 100644
--- a/htlcswitch/link.go
+++ b/htlcswitch/link.go
@@ -293,6 +293,10 @@ type ChannelLinkConfig struct {
 	// when channels become inactive.
 	NotifyInactiveChannel func(wire.OutPoint)
 
+	// NotifyInactiveLinkEvent allows the switch to tell the
+	// ChannelNotifier when a channel link becomes inactive.
+	NotifyInactiveLinkEvent func(wire.OutPoint)
+
 	// HtlcNotifier is an instance of a htlcNotifier which we will pipe htlc
 	// events through.
 	HtlcNotifier htlcNotifier
@@ -979,8 +983,11 @@ func (l *channelLink) htlcManager() {
 	l.log.Infof("HTLC manager started, bandwidth=%v", l.Bandwidth())
 
 	// Notify any clients that the link is now in the switch via an
-	// ActiveLinkEvent.
+	// ActiveLinkEvent. We'll also defer an inactive link notification for
+	// when the link exits to ensure that every active notification is
+	// matched by an inactive one.
 	l.cfg.NotifyActiveLink(*l.ChannelPoint())
+	defer l.cfg.NotifyInactiveLinkEvent(*l.ChannelPoint())
 
 	// TODO(roasbeef): need to call wipe chan whenever D/C?
 
diff --git a/htlcswitch/link_test.go b/htlcswitch/link_test.go
index d0aac8551..55d77d13d 100644
--- a/htlcswitch/link_test.go
+++ b/htlcswitch/link_test.go
@@ -1886,16 +1886,17 @@ func newSingleLinkTestHarness(t *testing.T, chanAmt, chanReserve btcutil.Amount)
 		PendingCommitTicker: ticker.New(time.Minute),
 		// Make the BatchSize and Min/MaxFeeUpdateTimeout large enough
 		// to not trigger commit updates automatically during tests.
-		BatchSize:             10000,
-		MinFeeUpdateTimeout:   30 * time.Minute,
-		MaxFeeUpdateTimeout:   40 * time.Minute,
-		MaxOutgoingCltvExpiry: DefaultMaxOutgoingCltvExpiry,
-		MaxFeeAllocation:      DefaultMaxLinkFeeAllocation,
-		NotifyActiveLink:      func(wire.OutPoint) {},
-		NotifyActiveChannel:   func(wire.OutPoint) {},
-		NotifyInactiveChannel: func(wire.OutPoint) {},
-		HtlcNotifier:          aliceSwitch.cfg.HtlcNotifier,
-		GetAliases:            getAliases,
+		BatchSize:               10000,
+		MinFeeUpdateTimeout:     30 * time.Minute,
+		MaxFeeUpdateTimeout:     40 * time.Minute,
+		MaxOutgoingCltvExpiry:   DefaultMaxOutgoingCltvExpiry,
+		MaxFeeAllocation:        DefaultMaxLinkFeeAllocation,
+		NotifyActiveLink:        func(wire.OutPoint) {},
+		NotifyActiveChannel:     func(wire.OutPoint) {},
+		NotifyInactiveChannel:   func(wire.OutPoint) {},
+		NotifyInactiveLinkEvent: func(wire.OutPoint) {},
+		HtlcNotifier:            aliceSwitch.cfg.HtlcNotifier,
+		GetAliases:              getAliases,
 	}
 
 	aliceLink := NewChannelLink(aliceCfg, aliceLc.channel)
@@ -4329,15 +4330,16 @@ func (h *persistentLinkHarness) restartLink(
 		MinFeeUpdateTimeout: 30 * time.Minute,
 		MaxFeeUpdateTimeout: 40 * time.Minute,
 		// Set any hodl flags requested for the new link.
-		HodlMask:              hodl.MaskFromFlags(hodlFlags...),
-		MaxOutgoingCltvExpiry: DefaultMaxOutgoingCltvExpiry,
-		MaxFeeAllocation:      DefaultMaxLinkFeeAllocation,
-		NotifyActiveLink:      func(wire.OutPoint) {},
-		NotifyActiveChannel:   func(wire.OutPoint) {},
-		NotifyInactiveChannel: func(wire.OutPoint) {},
-		HtlcNotifier:          aliceSwitch.cfg.HtlcNotifier,
-		SyncStates:            syncStates,
-		GetAliases:            getAliases,
+		HodlMask:                hodl.MaskFromFlags(hodlFlags...),
+		MaxOutgoingCltvExpiry:   DefaultMaxOutgoingCltvExpiry,
+		MaxFeeAllocation:        DefaultMaxLinkFeeAllocation,
+		NotifyActiveLink:        func(wire.OutPoint) {},
+		NotifyActiveChannel:     func(wire.OutPoint) {},
+		NotifyInactiveChannel:   func(wire.OutPoint) {},
+		NotifyInactiveLinkEvent: func(wire.OutPoint) {},
+		HtlcNotifier:            aliceSwitch.cfg.HtlcNotifier,
+		SyncStates:              syncStates,
+		GetAliases:              getAliases,
 	}
 
 	aliceLink := NewChannelLink(aliceCfg, aliceChannel)
diff --git a/htlcswitch/switch.go b/htlcswitch/switch.go
index 460bf31b5..6f632be35 100644
--- a/htlcswitch/switch.go
+++ b/htlcswitch/switch.go
@@ -2293,6 +2293,8 @@ func (s *Switch) AddLink(link ChannelLink) error {
 	link.attachFailAliasUpdate(s.failAliasUpdate)
 
 	if err := link.Start(); err != nil {
+		log.Errorf("AddLink failed to start link with chanID=%v: %v",
+			chanID, err)
 		s.removeLink(chanID)
 		return err
 	}
diff --git a/htlcswitch/test_utils.go b/htlcswitch/test_utils.go
index 6e8defe3f..e1bd6783a 100644
--- a/htlcswitch/test_utils.go
+++ b/htlcswitch/test_utils.go
@@ -1159,6 +1159,7 @@ func (h *hopNetwork) createChannelLink(server, peer *mockServer,
 			NotifyActiveLink:        func(wire.OutPoint) {},
 			NotifyActiveChannel:     func(wire.OutPoint) {},
 			NotifyInactiveChannel:   func(wire.OutPoint) {},
+			NotifyInactiveLinkEvent: func(wire.OutPoint) {},
 			HtlcNotifier:            server.htlcSwitch.cfg.HtlcNotifier,
 			GetAliases:              getAliases,
 		},
diff --git a/peer/brontide.go b/peer/brontide.go
index 7ac65aaf8..6254c44d3 100644
--- a/peer/brontide.go
+++ b/peer/brontide.go
@@ -941,6 +941,7 @@ func (p *Brontide) addLink(chanPoint *wire.OutPoint,
 		towerClient = p.cfg.TowerClient
 	}
 
+	//nolint:lll
 	linkCfg := htlcswitch.ChannelLinkConfig{
 		Peer:                   p,
 		DecodeHopIterators:     p.cfg.Sphinx.DecodeHopIterators,
@@ -976,6 +977,7 @@ func (p *Brontide) addLink(chanPoint *wire.OutPoint,
 		NotifyActiveLink:       p.cfg.ChannelNotifier.NotifyActiveLinkEvent,
 		NotifyActiveChannel:    p.cfg.ChannelNotifier.NotifyActiveChannelEvent,
 		NotifyInactiveChannel:  p.cfg.ChannelNotifier.NotifyInactiveChannelEvent,
+		NotifyInactiveLinkEvent: p.cfg.ChannelNotifier.NotifyInactiveLinkEvent,
 		HtlcNotifier:           p.cfg.HtlcNotifier,
 		GetAliases:             p.cfg.GetAliases,
 	}

From a81d4e9c5c2d0f4363e99d726132f632b69d8107 Mon Sep 17 00:00:00 2001
From: yyforyongyu
Date: Wed, 23 Nov 2022 12:50:20 +0800
Subject: [PATCH 3/6] peer: add `filterChannelsToEnable` to filter channels for enabling

---
 peer/brontide.go | 67 ++++++++++++++++++++++++++++--------------------
 1 file changed, 39 insertions(+), 28 deletions(-)

diff --git a/peer/brontide.go b/peer/brontide.go
index 6254c44d3..38c60e357 100644
--- a/peer/brontide.go
+++ b/peer/brontide.go
@@ -2508,34 +2508,7 @@ out:
 func (p *Brontide) reenableActiveChannels() {
 	// First, filter all known channels with this peer for ones that are
 	// both public and not pending.
-	var activePublicChans []wire.OutPoint
-	p.activeChanMtx.RLock()
-	for chanID, lnChan := range p.activeChannels {
-		// If the lnChan is nil, continue as this is a pending channel.
-		if lnChan == nil {
-			continue
-		}
-
-		dbChan := lnChan.State()
-		isPublic := dbChan.ChannelFlags&lnwire.FFAnnounceChannel != 0
-		if !isPublic || dbChan.IsPending {
-			continue
-		}
-
-		// We'll also skip any channels added during this peer's
-		// lifecycle since they haven't waited out the timeout. Their
-		// first announcement will be enabled, and the chan status
-		// manager will begin monitoring them passively since they exist
-		// in the database.
-		if _, ok := p.addedChannels[chanID]; ok {
-			continue
-		}
-
-		activePublicChans = append(
-			activePublicChans, dbChan.FundingOutpoint,
-		)
-	}
-	p.activeChanMtx.RUnlock()
+	activePublicChans := p.filterChannelsToEnable()
 
 	// For each of the public, non-pending channels, set the channel
 	// disabled bit to false and send out a new ChannelUpdate. If this
@@ -2628,6 +2601,44 @@ func (p *Brontide) fetchActiveChanCloser(chanID lnwire.ChannelID) (
 	return chanCloser, nil
 }
 
+// filterChannelsToEnable filters a list of channels to be enabled upon start.
+// The filtered channels are active channels that are neither private nor
+// pending.
+func (p *Brontide) filterChannelsToEnable() []wire.OutPoint {
+	var activePublicChans []wire.OutPoint
+
+	p.activeChanMtx.RLock()
+	defer p.activeChanMtx.RUnlock()
+
+	for chanID, lnChan := range p.activeChannels {
+		// If the lnChan is nil, continue as this is a pending channel.
+		if lnChan == nil {
+			continue
+		}
+
+		dbChan := lnChan.State()
+		isPublic := dbChan.ChannelFlags&lnwire.FFAnnounceChannel != 0
+		if !isPublic || dbChan.IsPending {
+			continue
+		}
+
+		// We'll also skip any channels added during this peer's
+		// lifecycle since they haven't waited out the timeout. Their
+		// first announcement will be enabled, and the chan status
+		// manager will begin monitoring them passively since they exist
+		// in the database.
+		if _, ok := p.addedChannels[chanID]; ok {
+			continue
+		}
+
+		activePublicChans = append(
+			activePublicChans, dbChan.FundingOutpoint,
+		)
+	}
+
+	return activePublicChans
+}
+
 // chooseDeliveryScript takes two optionally set shutdown scripts and returns
 // a suitable script to close out to. This may be nil if neither script is
 // set. If both scripts are set, this function will error if they do not match.

From 2f9ba1594ce700d8825accff3e67d7cb93846848 Mon Sep 17 00:00:00 2001
From: yyforyongyu
Date: Wed, 23 Nov 2022 13:23:18 +0800
Subject: [PATCH 4/6] peer: retry enabling channels on `ErrEnableInactiveChan`

This commit adds retry logic for the channels that failed with
`ErrEnableInactiveChan` when requesting enabling. We now subscribe to
channel events to decide what to do with the failed channels.
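
Illustratively, the decision flow added in `reenableActiveChannels` can
be summarized as follows; this is a condensed sketch of the code in the
diff below, not an exact copy:

    err := p.cfg.ChanStatusMgr.RequestEnable(chanPoint, false)
    switch {
    case err == nil:
        // Channel enabled, nothing more to do.
    case errors.Is(err, netann.ErrEnableManuallyDisabledChan):
        // Never auto-enable a manually disabled channel.
    case errors.Is(err, netann.ErrEnableInactiveChan):
        // The link may not have finished reestablishing yet, so
        // queue the channel for one event-driven retry.
        retryChans[chanPoint] = struct{}{}
    }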
---
 peer/brontide.go      | 163 +++++++++++++++++++++++++++++++++++++++---
 peer/brontide_test.go |  12 +++-
 peer/test_utils.go    |  15 +++-
 3 files changed, 178 insertions(+), 12 deletions(-)

diff --git a/peer/brontide.go b/peer/brontide.go
index 38c60e357..aeae14144 100644
--- a/peer/brontide.go
+++ b/peer/brontide.go
@@ -39,6 +39,7 @@ import (
 	"github.com/lightningnetwork/lnd/netann"
 	"github.com/lightningnetwork/lnd/pool"
 	"github.com/lightningnetwork/lnd/queue"
+	"github.com/lightningnetwork/lnd/subscribe"
 	"github.com/lightningnetwork/lnd/ticker"
 	"github.com/lightningnetwork/lnd/watchtower/wtclient"
 )
@@ -592,6 +593,18 @@ func (p *Brontide) Start() error {
 	p.log.Debugf("Loaded %v active channels from database",
 		len(activeChans))
 
+	// Subscribe to channel events before loading channels so we won't
+	// miss events. This subscription is used to listen for active channel
+	// events when reenabling channels. Once the reenabling process is
+	// finished, this subscription will be canceled.
+	//
+	// NOTE: ChannelNotifier must be started before subscribing to events,
+	// otherwise we'd panic here.
+	sub, err := p.cfg.ChannelNotifier.SubscribeChannelEvents()
+	if err != nil {
+		return fmt.Errorf("SubscribeChannelEvents failed: %w", err)
+	}
+
 	msgs, err := p.loadActiveChannels(activeChans)
 	if err != nil {
 		return fmt.Errorf("unable to load channels: %v", err)
@@ -603,7 +616,7 @@ func (p *Brontide) Start() error {
 	go p.queueHandler()
 	go p.writeHandler()
 	go p.readHandler()
-	go p.channelManager()
+	go p.channelManager(sub)
 	go p.pingHandler()
 
 	// Signal to any external processes that the peer is now active.
@@ -2311,7 +2324,7 @@ func (p *Brontide) genDeliveryScript() ([]byte, error) {
 // channels maintained with the remote peer.
 //
 // NOTE: This method MUST be run as a goroutine.
-func (p *Brontide) channelManager() {
+func (p *Brontide) channelManager(client *subscribe.Client) {
 	defer p.wg.Done()
 
 	// reenableTimeout will fire once after the configured channel status
@@ -2472,7 +2485,7 @@ out:
 		// TODO(conner): consolidate reenables timers inside chan status
 		// manager
 		case <-reenableTimeout:
-			p.reenableActiveChannels()
+			p.reenableActiveChannels(client)
 
 			// Since this channel will never fire again during the
 			// lifecycle of the peer, we nil the channel to mark it
@@ -2482,6 +2495,11 @@ out:
 			// select will ignore this case entirely.
 			reenableTimeout = nil
 
+			// Once the reenabling is attempted, we also cancel the
+			// channel event subscription to free up the overflow
+			// queue used in channel notifier.
+			client.Cancel()
+
 		case <-p.quit:
 			// As, we've been signalled to exit, we'll reset all
 			// our active channel back to their default state.
@@ -2505,24 +2523,55 @@ out:
 // peer, and reenables each public, non-pending channel. This is done at the
 // gossip level by broadcasting a new ChannelUpdate with the disabled bit unset.
 // No message will be sent if the channel is already enabled.
-func (p *Brontide) reenableActiveChannels() {
+func (p *Brontide) reenableActiveChannels(client *subscribe.Client) {
 	// First, filter all known channels with this peer for ones that are
 	// both public and not pending.
 	activePublicChans := p.filterChannelsToEnable()
 
+	// Create a map to hold channels that need to be retried.
+	retryChans := make(map[wire.OutPoint]struct{}, len(activePublicChans))
+
 	// For each of the public, non-pending channels, set the channel
 	// disabled bit to false and send out a new ChannelUpdate. If this
 	// channel is already active, the update won't be sent.
 	for _, chanPoint := range activePublicChans {
 		err := p.cfg.ChanStatusMgr.RequestEnable(chanPoint, false)
-		if err == netann.ErrEnableManuallyDisabledChan {
-			p.log.Debugf("Channel(%v) was manually disabled, ignoring "+
-				"automatic enable request", chanPoint)
-		} else if err != nil {
-			p.log.Errorf("Unable to enable channel %v: %v",
-				chanPoint, err)
+
+		switch {
+		// No error occurred, continue to request the next channel.
+		case err == nil:
+			continue
+
+		// Cannot auto enable a manually disabled channel so we do
+		// nothing but proceed to the next channel.
+		case errors.Is(err, netann.ErrEnableManuallyDisabledChan):
+			p.log.Debugf("Channel(%v) was manually disabled, "+
+				"ignoring automatic enable request", chanPoint)
+
+			continue
+
+		// If the channel is reported as inactive, we will give it
+		// another chance. When handling the request, ChanStatusManager
+		// will check whether the link is active or not. One of the
+		// conditions is whether the link has been marked as
+		// reestablished, which happens inside a goroutine
+		// (htlcManager) after the link is started. We may get a false
+		// negative saying the link is not active because that
+		// goroutine hasn't reached the line marking the
+		// reestablishment, so we give it a second chance.
+		case errors.Is(err, netann.ErrEnableInactiveChan):
+			p.log.Warnf("Channel(%v) reported inactive by "+
+				"ChanStatusManager, retrying", chanPoint)
+
+			// Add the channel to the retry map.
+			retryChans[chanPoint] = struct{}{}
 		}
 	}
+
+	// Retry the channels if we have any.
+	if len(retryChans) != 0 {
+		p.retryRequestEnable(retryChans, client)
+	}
 }
 
@@ -2639,6 +2688,100 @@ func (p *Brontide) filterChannelsToEnable() []wire.OutPoint {
 	return activePublicChans
 }
 
+// retryRequestEnable takes a map of channel outpoints and a channel event
+// client. It listens to channel events and removes a channel from the map
+// once a matching event arrives. Upon receiving an active channel event, it
+// will send the enable request again.
+func (p *Brontide) retryRequestEnable(activeChans map[wire.OutPoint]struct{},
+	client *subscribe.Client) {
+
+	p.log.Debugf("Retry enabling %v channels", len(activeChans))
+
+	// retryEnable is a helper closure that sends an enable request and
+	// removes the channel from the map if it's matched.
+	retryEnable := func(chanPoint wire.OutPoint) error {
+		// If this is an active channel event, check whether it's in
+		// our targeted channels map.
+		_, found := activeChans[chanPoint]
+
+		// If this channel is irrelevant, return nil so the loop can
+		// jump to the next iteration.
+		if !found {
+			return nil
+		}
+
+		// Otherwise we've just received an active signal for a channel
+		// that previously failed to be enabled, so we send the request
+		// again.
+		//
+		// We only give the channel one more shot, so we delete it from
+		// our map first to keep it from being attempted again.
+		delete(activeChans, chanPoint)
+
+		// Send the request.
+		err := p.cfg.ChanStatusMgr.RequestEnable(chanPoint, false)
+		if err != nil {
+			return fmt.Errorf("request enabling channel %v "+
+				"failed: %w", chanPoint, err)
+		}
+
+		return nil
+	}
+
+	for {
+		// If activeChans is empty, we're done processing all the
+		// channels.
+		if len(activeChans) == 0 {
+			p.log.Debug("Finished retry enabling channels")
+			return
+		}
+
+		select {
+		// A new event has been sent by the ChannelNotifier. We now
+		// check whether it's an active or inactive channel event.
+		case e := <-client.Updates():
+			// If this is an active channel event, try to enable
+			// the channel, then jump to the next iteration.
+			active, ok := e.(channelnotifier.ActiveChannelEvent)
+			if ok {
+				chanPoint := *active.ChannelPoint
+
+				// If we received an error for this particular
+				// channel, we log an error and won't quit as
+				// we still want to retry other channels.
+				if err := retryEnable(chanPoint); err != nil {
+					p.log.Errorf("Retry failed: %v", err)
+				}
+
+				continue
+			}
+
+			// Otherwise check for inactive link event, and jump to
+			// the next iteration if it's not.
+			inactive, ok := e.(channelnotifier.InactiveLinkEvent)
+			if !ok {
+				continue
+			}
+
+			// Found an inactive link event; if this is our
+			// targeted channel, remove it from our map.
+			chanPoint := *inactive.ChannelPoint
+			_, found := activeChans[chanPoint]
+			if !found {
+				continue
+			}
+
+			delete(activeChans, chanPoint)
+			p.log.Warnf("Re-enable channel %v failed, received "+
+				"inactive link event", chanPoint)
+
+		case <-p.quit:
+			p.log.Debugf("Peer shutdown during retry enabling")
+			return
+		}
+	}
+}
+
 // chooseDeliveryScript takes two optionally set shutdown scripts and returns
 // a suitable script to close out to. This may be nil if neither script is
 // set. If both scripts are set, this function will error if they do not match.
diff --git a/peer/brontide_test.go b/peer/brontide_test.go
index 44e63267b..cf71e0b92 100644
--- a/peer/brontide_test.go
+++ b/peer/brontide_test.go
@@ -12,6 +12,7 @@ import (
 	"github.com/btcsuite/btcd/wire"
 	"github.com/lightningnetwork/lnd/chainntnfs"
 	"github.com/lightningnetwork/lnd/channeldb"
+	"github.com/lightningnetwork/lnd/channelnotifier"
 	"github.com/lightningnetwork/lnd/contractcourt"
 	"github.com/lightningnetwork/lnd/htlcswitch"
 	"github.com/lightningnetwork/lnd/lntest/mock"
@@ -1035,6 +1036,14 @@ func TestPeerCustomMessage(t *testing.T) {
 		ConfChan: make(chan *chainntnfs.TxConfirmation),
 	}
 
+	// TODO(yy): change ChannelNotifier to be an interface.
+	channelNotifier := channelnotifier.New(dbAlice.ChannelStateDB())
+	require.NoError(t, channelNotifier.Start())
+	t.Cleanup(func() {
+		require.NoError(t, channelNotifier.Stop(),
+			"stop channel notifier failed")
+	})
+
 	alicePeer := NewBrontide(Config{
 		PubKeyBytes: remoteKey,
 		ChannelDB:   dbAlice.ChannelStateDB(),
@@ -1057,7 +1066,8 @@ func TestPeerCustomMessage(t *testing.T) {
 			}
 			return nil
 		},
-		PongBuf: make([]byte, lnwire.MaxPongBytes),
+		PongBuf:         make([]byte, lnwire.MaxPongBytes),
+		ChannelNotifier: channelNotifier,
 	})
 
 	// Set up the init sequence.
diff --git a/peer/test_utils.go b/peer/test_utils.go
index 41f6d2c0f..223ee119f 100644
--- a/peer/test_utils.go
+++ b/peer/test_utils.go
@@ -16,6 +16,7 @@ import (
 	"github.com/btcsuite/btcd/wire"
 	"github.com/lightningnetwork/lnd/chainntnfs"
 	"github.com/lightningnetwork/lnd/channeldb"
+	"github.com/lightningnetwork/lnd/channelnotifier"
 	"github.com/lightningnetwork/lnd/htlcswitch"
 	"github.com/lightningnetwork/lnd/input"
 	"github.com/lightningnetwork/lnd/keychain"
@@ -377,6 +378,14 @@ func createTestPeer(t *testing.T, notifier chainntnfs.ChainNotifier,
 		return nil, nil, err
 	}
 
+	// TODO(yy): change ChannelNotifier to be an interface.
+	channelNotifier := channelnotifier.New(dbAlice.ChannelStateDB())
+	require.NoError(t, channelNotifier.Start())
+	t.Cleanup(func() {
+		require.NoError(t, channelNotifier.Stop(),
+			"stop channel notifier failed")
+	})
+
 	cfg := &Config{
 		Addr:        cfgAddr,
 		PubKeyBytes: pubKey,
@@ -392,6 +401,7 @@ func createTestPeer(t *testing.T, notifier chainntnfs.ChainNotifier,
 		ChanStatusMgr:   chanStatusMgr,
 		Features:        lnwire.NewFeatureVector(nil, lnwire.Features),
 		DisconnectPeer:  func(b *btcec.PublicKey) error { return nil },
+		ChannelNotifier: channelNotifier,
 	}
 
 	alicePeer := NewBrontide(*cfg)
@@ -400,8 +410,11 @@ func createTestPeer(t *testing.T, notifier chainntnfs.ChainNotifier,
 	chanID := lnwire.NewChanIDFromOutPoint(channelAlice.ChannelPoint())
 	alicePeer.activeChannels[chanID] = channelAlice
 
+	sub, err := cfg.ChannelNotifier.SubscribeChannelEvents()
+	require.NoError(t, err)
+
 	alicePeer.wg.Add(1)
-	go alicePeer.channelManager()
+	go alicePeer.channelManager(sub)
 
 	return alicePeer, channelBob, nil
 }

From a012a5e2ff4f4ed989f4e7e1fbc282c9448e312d Mon Sep 17 00:00:00 2001
From: yyforyongyu
Date: Wed, 14 Dec 2022 02:26:20 +0800
Subject: [PATCH 5/6] peer: skip retrying when `reenableTimeout` is greater than 1 min

This commit makes retrying to enable channels conditional. We now only
retry sending the enable request when `ChanActiveTimeout` is no greater
than 1 min.
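
Sketched out, the gate added below hinges on a single comparison inside
the new `attachChannelEventSubscription` helper (condensed from the
diff below, not an exact copy):

    // Only subscribe -- and therefore only retry -- when the reenable
    // timeout is short enough that the link may still be
    // reestablishing when the first enable request fires.
    if p.cfg.ChanActiveTimeout > 1*time.Minute {
        return nil
    }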
---
 peer/brontide.go   | 81 ++++++++++++++++++++++++++++++++++++----------
 peer/test_utils.go |  5 +--
 2 files changed, 65 insertions(+), 21 deletions(-)

diff --git a/peer/brontide.go b/peer/brontide.go
index aeae14144..4c56626ea 100644
--- a/peer/brontide.go
+++ b/peer/brontide.go
@@ -462,6 +462,15 @@ type Brontide struct {
 	// peer's chansync message with its own over and over again.
 	resentChanSyncMsg map[lnwire.ChannelID]struct{}
 
+	// channelEventClient is the channel event subscription client that's
+	// used to assist in retrying to enable the channels. This client is
+	// only created when the reenableTimeout is no greater than 1 minute.
+	// Once created, it is canceled when the reenabling process finishes.
+	//
+	// NOTE: we choose to create the client conditionally to avoid
+	// potentially holding lots of un-consumed events.
+	channelEventClient *subscribe.Client
+
 	queueQuit chan struct{}
 	quit      chan struct{}
 	wg        sync.WaitGroup
@@ -593,16 +602,15 @@ func (p *Brontide) Start() error {
 	p.log.Debugf("Loaded %v active channels from database",
 		len(activeChans))
 
-	// Subscribe to channel events before loading channels so we won't
-	// miss events. This subscription is used to listen for active channel
-	// events when reenabling channels. Once the reenabling process is
-	// finished, this subscription will be canceled.
+	// Conditionally subscribe to channel events before loading channels
+	// so we won't miss events. This subscription is used to listen for
+	// active channel events when reenabling channels. Once the reenabling
+	// process is finished, this subscription will be canceled.
 	//
 	// NOTE: ChannelNotifier must be started before subscribing to events,
 	// otherwise we'd panic here.
-	sub, err := p.cfg.ChannelNotifier.SubscribeChannelEvents()
-	if err != nil {
-		return fmt.Errorf("SubscribeChannelEvents failed: %w", err)
+	if err := p.attachChannelEventSubscription(); err != nil {
+		return err
 	}
 
 	msgs, err := p.loadActiveChannels(activeChans)
@@ -616,7 +624,7 @@ func (p *Brontide) Start() error {
 	go p.queueHandler()
 	go p.writeHandler()
 	go p.readHandler()
-	go p.channelManager(sub)
+	go p.channelManager()
 	go p.pingHandler()
 
 	// Signal to any external processes that the peer is now active.
@@ -2324,7 +2332,7 @@ func (p *Brontide) genDeliveryScript() ([]byte, error) {
 // channels maintained with the remote peer.
 //
 // NOTE: This method MUST be run as a goroutine.
-func (p *Brontide) channelManager(client *subscribe.Client) {
+func (p *Brontide) channelManager() {
 	defer p.wg.Done()
 
 	// reenableTimeout will fire once after the configured channel status
@@ -2485,7 +2493,7 @@ out:
 		// TODO(conner): consolidate reenables timers inside chan status
 		// manager
 		case <-reenableTimeout:
-			p.reenableActiveChannels(client)
+			p.reenableActiveChannels()
 
 			// Since this channel will never fire again during the
 			// lifecycle of the peer, we nil the channel to mark it
@@ -2498,7 +2506,12 @@ out:
 			// Once the reenabling is attempted, we also cancel the
 			// channel event subscription to free up the overflow
 			// queue used in channel notifier.
-			client.Cancel()
+			//
+			// NOTE: channelEventClient will be nil if the
+			// reenableTimeout is greater than 1 minute.
+			if p.channelEventClient != nil {
+				p.channelEventClient.Cancel()
+			}
 
 		case <-p.quit:
 			// As, we've been signalled to exit, we'll reset all
@@ -2523,7 +2536,7 @@ out:
 // peer, and reenables each public, non-pending channel. This is done at the
 // gossip level by broadcasting a new ChannelUpdate with the disabled bit unset.
 // No message will be sent if the channel is already enabled.
-func (p *Brontide) reenableActiveChannels(client *subscribe.Client) {
+func (p *Brontide) reenableActiveChannels() {
 	// First, filter all known channels with this peer for ones that are
 	// both public and not pending.
 	activePublicChans := p.filterChannelsToEnable()
@@ -2560,6 +2573,16 @@ func (p *Brontide) reenableActiveChannels() {
 		// goroutine hasn't reached the line marking the
 		// reestablishment, so we give it a second chance.
 		case errors.Is(err, netann.ErrEnableInactiveChan):
+			// If we don't have a client created, it means we
+			// shouldn't retry enabling the channel.
+			if p.channelEventClient == nil {
+				p.log.Errorf("Channel(%v) request enabling "+
+					"failed due to inactive link",
+					chanPoint)
+
+				continue
+			}
+
 			p.log.Warnf("Channel(%v) reported inactive by "+
 				"ChanStatusManager, retrying", chanPoint)
 
@@ -2570,7 +2593,7 @@ func (p *Brontide) reenableActiveChannels() {
 
 	// Retry the channels if we have any.
 	if len(retryChans) != 0 {
-		p.retryRequestEnable(retryChans, client)
+		p.retryRequestEnable(retryChans)
 	}
 }
 
@@ -2692,9 +2715,7 @@ func (p *Brontide) filterChannelsToEnable() []wire.OutPoint {
 // retryRequestEnable takes a map of channel outpoints and a channel event
 // client. It listens to channel events and removes a channel from the map
 // once a matching event arrives. Upon receiving an active channel event, it
 // will send the enable request again.
-func (p *Brontide) retryRequestEnable(activeChans map[wire.OutPoint]struct{},
-	client *subscribe.Client) {
-
+func (p *Brontide) retryRequestEnable(activeChans map[wire.OutPoint]struct{}) {
 	p.log.Debugf("Retry enabling %v channels", len(activeChans))
 
 	// retryEnable is a helper closure that sends an enable request and
@@ -2739,7 +2760,7 @@ func (p *Brontide) retryRequestEnable(activeChans map[wire.OutPoint]struct{},
 		select {
 		// A new event has been sent by the ChannelNotifier. We now
 		// check whether it's an active or inactive channel event.
-		case e := <-client.Updates():
+		case e := <-p.channelEventClient.Updates():
 			// If this is an active channel event, try to enable
 			// the channel, then jump to the next iteration.
 			active, ok := e.(channelnotifier.ActiveChannelEvent)
@@ -3680,3 +3701,29 @@ func (p *Brontide) LastRemotePingPayload() []byte {
 
 	return pingBytes
 }
+
+// attachChannelEventSubscription creates a channel event subscription and
+// attaches the client to Brontide if the reenableTimeout is no greater than
+// 1 minute.
+func (p *Brontide) attachChannelEventSubscription() error {
+	// If the timeout is greater than 1 minute, the link has likely
+	// finished its reestablishment already and there is no need to
+	// retry. Return nil without creating the client.
+	if p.cfg.ChanActiveTimeout > 1*time.Minute {
+		return nil
+	}
+
+	// When the reenable timeout is less than 1 minute, it's likely the
+	// channel link hasn't finished its reestablishment yet. In that case,
+	// we'll give it a second chance by subscribing to the channel update
+	// events. Upon receiving the `ActiveChannelEvent`, we'll then request
+	// enabling the channel again.
+	sub, err := p.cfg.ChannelNotifier.SubscribeChannelEvents()
+	if err != nil {
+		return fmt.Errorf("SubscribeChannelEvents failed: %w", err)
+	}
+
+	p.channelEventClient = sub
+
+	return nil
+}
diff --git a/peer/test_utils.go b/peer/test_utils.go
index 223ee119f..a997411df 100644
--- a/peer/test_utils.go
+++ b/peer/test_utils.go
@@ -410,11 +410,8 @@ func createTestPeer(t *testing.T, notifier chainntnfs.ChainNotifier,
 	chanID := lnwire.NewChanIDFromOutPoint(channelAlice.ChannelPoint())
 	alicePeer.activeChannels[chanID] = channelAlice
 
-	sub, err := cfg.ChannelNotifier.SubscribeChannelEvents()
-	require.NoError(t, err)
-
 	alicePeer.wg.Add(1)
-	go alicePeer.channelManager(sub)
+	go alicePeer.channelManager()
 
 	return alicePeer, channelBob, nil
 }

From b558c9c23e310454fa2a0030ee85f715142ccac0 Mon Sep 17 00:00:00 2001
From: yyforyongyu
Date: Tue, 15 Nov 2022 17:09:33 +0800
Subject: [PATCH 6/6] docs: update release notes

---
 docs/release-notes/release-notes-0.16.0.md | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/docs/release-notes/release-notes-0.16.0.md b/docs/release-notes/release-notes-0.16.0.md
index 1146051de..d62fd0977 100644
--- a/docs/release-notes/release-notes-0.16.0.md
+++ b/docs/release-notes/release-notes-0.16.0.md
@@ -269,6 +269,10 @@ data.
 * [Fixed a bug](https://github.com/lightningnetwork/lnd/pull/7186) that might
   lead to channel updates being missed, causing channel graph being incomplete.
 
+* During reconnection, enabling channels might fail because the link's startup
+  falls behind, which is now fixed by [retrying the enable
+  request](https://github.com/lightningnetwork/lnd/pull/7157).
+
 ## Code Health
 
 * [test: use `T.TempDir` to create temporary test