From a012a5e2ff4f4ed989f4e7e1fbc282c9448e312d Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Wed, 14 Dec 2022 02:26:20 +0800 Subject: [PATCH] peer: skip retrying when `reenableTimeout` is greater than 1 min This commit makes retrying enabling channels conditional. We now would only retry sending the enable request when the `ChanActiveTimeout` is no greater than 1 min. --- peer/brontide.go | 81 ++++++++++++++++++++++++++++++++++++---------- peer/test_utils.go | 5 +-- 2 files changed, 65 insertions(+), 21 deletions(-) diff --git a/peer/brontide.go b/peer/brontide.go index aeae14144..4c56626ea 100644 --- a/peer/brontide.go +++ b/peer/brontide.go @@ -462,6 +462,15 @@ type Brontide struct { // peer's chansync message with its own over and over again. resentChanSyncMsg map[lnwire.ChannelID]struct{} + // channelEventClient is the channel event subscription client that's + // used to assist retry enabling the channels. This client is only + // created when the reenableTimeout is no greater than 1 minute. Once + // created, it is canceled once the reenabling has been finished. + // + // NOTE: we choose to create the client conditionally to avoid + // potentially holding lots of un-consumed events. + channelEventClient *subscribe.Client + queueQuit chan struct{} quit chan struct{} wg sync.WaitGroup @@ -593,16 +602,15 @@ func (p *Brontide) Start() error { p.log.Debugf("Loaded %v active channels from database", len(activeChans)) - // Subscribe channel events before loading channels so we won't miss - // events. This subscription is used to listen to active channel event - // when reenabling channels. Once the reenabling process is finished, - // this subscription will be canceled. + // Conditionally subscribe to channel events before loading channels so + // we won't miss events. This subscription is used to listen to active + // channel event when reenabling channels. Once the reenabling process + // is finished, this subscription will be canceled. // // NOTE: ChannelNotifier must be started before subscribing events // otherwise we'd panic here. - sub, err := p.cfg.ChannelNotifier.SubscribeChannelEvents() - if err != nil { - return fmt.Errorf("SubscribeChannelEvents failed: %w", err) + if err := p.attachChannelEventSubscription(); err != nil { + return err } msgs, err := p.loadActiveChannels(activeChans) @@ -616,7 +624,7 @@ func (p *Brontide) Start() error { go p.queueHandler() go p.writeHandler() go p.readHandler() - go p.channelManager(sub) + go p.channelManager() go p.pingHandler() // Signal to any external processes that the peer is now active. @@ -2324,7 +2332,7 @@ func (p *Brontide) genDeliveryScript() ([]byte, error) { // channels maintained with the remote peer. // // NOTE: This method MUST be run as a goroutine. -func (p *Brontide) channelManager(client *subscribe.Client) { +func (p *Brontide) channelManager() { defer p.wg.Done() // reenableTimeout will fire once after the configured channel status @@ -2485,7 +2493,7 @@ out: // TODO(conner): consolidate reenables timers inside chan status // manager case <-reenableTimeout: - p.reenableActiveChannels(client) + p.reenableActiveChannels() // Since this channel will never fire again during the // lifecycle of the peer, we nil the channel to mark it @@ -2498,7 +2506,12 @@ out: // Once the reenabling is attempted, we also cancel the // channel event subscription to free up the overflow // queue used in channel notifier. - client.Cancel() + // + // NOTE: channelEventClient will be nil if the + // reenableTimeout is greater than 1 minute. + if p.channelEventClient != nil { + p.channelEventClient.Cancel() + } case <-p.quit: // As, we've been signalled to exit, we'll reset all @@ -2523,7 +2536,7 @@ out: // peer, and reenables each public, non-pending channel. This is done at the // gossip level by broadcasting a new ChannelUpdate with the disabled bit unset. // No message will be sent if the channel is already enabled. -func (p *Brontide) reenableActiveChannels(client *subscribe.Client) { +func (p *Brontide) reenableActiveChannels() { // First, filter all known channels with this peer for ones that are // both public and not pending. activePublicChans := p.filterChannelsToEnable() @@ -2560,6 +2573,16 @@ func (p *Brontide) reenableActiveChannels(client *subscribe.Client) { // reached the line to mark the reestablishment. Thus we give // it a second chance to send the request. case errors.Is(err, netann.ErrEnableInactiveChan): + // If we don't have a client created, it means we + // shouldn't retry enabling the channel. + if p.channelEventClient == nil { + p.log.Errorf("Channel(%v) request enabling "+ + "failed due to inactive link", + chanPoint) + + continue + } + p.log.Warnf("Channel(%v) cannot be enabled as " + "ChanStatusManager reported inactive, retrying") @@ -2570,7 +2593,7 @@ func (p *Brontide) reenableActiveChannels(client *subscribe.Client) { // Retry the channels if we have any. if len(retryChans) != 0 { - p.retryRequestEnable(retryChans, client) + p.retryRequestEnable(retryChans) } } @@ -2692,9 +2715,7 @@ func (p *Brontide) filterChannelsToEnable() []wire.OutPoint { // client. It listens to the channel events and removes a channel from the map // if it's matched to the event. Upon receiving an active channel event, it // will send the enabling request again. -func (p *Brontide) retryRequestEnable(activeChans map[wire.OutPoint]struct{}, - client *subscribe.Client) { - +func (p *Brontide) retryRequestEnable(activeChans map[wire.OutPoint]struct{}) { p.log.Debugf("Retry enabling %v channels", len(activeChans)) // retryEnable is a helper closure that sends an enable request and @@ -2739,7 +2760,7 @@ func (p *Brontide) retryRequestEnable(activeChans map[wire.OutPoint]struct{}, select { // A new event has been sent by the ChannelNotifier. We now // check whether it's an active or inactive channel event. - case e := <-client.Updates(): + case e := <-p.channelEventClient.Updates(): // If this is an active channel event, try enable the // channel then jump to the next iteration. active, ok := e.(channelnotifier.ActiveChannelEvent) @@ -3680,3 +3701,29 @@ func (p *Brontide) LastRemotePingPayload() []byte { return pingBytes } + +// attachChannelEventSubscription creates a channel event subscription and +// attaches to client to Brontide if the reenableTimeout is no greater than 1 +// minute. +func (p *Brontide) attachChannelEventSubscription() error { + // If the timeout is greater than 1 minute, it's unlikely that the link + // hasn't yet finished its reestablishment. Return a nil without + // creating the client to specify that we don't want to retry. + if p.cfg.ChanActiveTimeout > 1*time.Minute { + return nil + } + + // When the reenable timeout is less than 1 minute, it's likely the + // channel link hasn't finished its reestablishment yet. In that case, + // we'll give it a second chance by subscribing to the channel update + // events. Upon receiving the `ActiveLinkEvent`, we'll then request + // enabling the channel again. + sub, err := p.cfg.ChannelNotifier.SubscribeChannelEvents() + if err != nil { + return fmt.Errorf("SubscribeChannelEvents failed: %w", err) + } + + p.channelEventClient = sub + + return nil +} diff --git a/peer/test_utils.go b/peer/test_utils.go index 223ee119f..a997411df 100644 --- a/peer/test_utils.go +++ b/peer/test_utils.go @@ -410,11 +410,8 @@ func createTestPeer(t *testing.T, notifier chainntnfs.ChainNotifier, chanID := lnwire.NewChanIDFromOutPoint(channelAlice.ChannelPoint()) alicePeer.activeChannels[chanID] = channelAlice - sub, err := cfg.ChannelNotifier.SubscribeChannelEvents() - require.NoError(t, err) - alicePeer.wg.Add(1) - go alicePeer.channelManager(sub) + go alicePeer.channelManager() return alicePeer, channelBob, nil }