mirror of
https://github.com/lightningnetwork/lnd.git
synced 2025-09-25 19:38:05 +02:00
peer: skip retrying when reenableTimeout
is greater than 1 min
This commit makes retrying enabling channels conditional. We now would only retry sending the enable request when the `ChanActiveTimeout` is no greater than 1 min.
This commit is contained in:
@@ -462,6 +462,15 @@ type Brontide struct {
|
|||||||
// peer's chansync message with its own over and over again.
|
// peer's chansync message with its own over and over again.
|
||||||
resentChanSyncMsg map[lnwire.ChannelID]struct{}
|
resentChanSyncMsg map[lnwire.ChannelID]struct{}
|
||||||
|
|
||||||
|
// channelEventClient is the channel event subscription client that's
|
||||||
|
// used to assist retry enabling the channels. This client is only
|
||||||
|
// created when the reenableTimeout is no greater than 1 minute. Once
|
||||||
|
// created, it is canceled once the reenabling has been finished.
|
||||||
|
//
|
||||||
|
// NOTE: we choose to create the client conditionally to avoid
|
||||||
|
// potentially holding lots of un-consumed events.
|
||||||
|
channelEventClient *subscribe.Client
|
||||||
|
|
||||||
queueQuit chan struct{}
|
queueQuit chan struct{}
|
||||||
quit chan struct{}
|
quit chan struct{}
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
@@ -593,16 +602,15 @@ func (p *Brontide) Start() error {
|
|||||||
p.log.Debugf("Loaded %v active channels from database",
|
p.log.Debugf("Loaded %v active channels from database",
|
||||||
len(activeChans))
|
len(activeChans))
|
||||||
|
|
||||||
// Subscribe channel events before loading channels so we won't miss
|
// Conditionally subscribe to channel events before loading channels so
|
||||||
// events. This subscription is used to listen to active channel event
|
// we won't miss events. This subscription is used to listen to active
|
||||||
// when reenabling channels. Once the reenabling process is finished,
|
// channel event when reenabling channels. Once the reenabling process
|
||||||
// this subscription will be canceled.
|
// is finished, this subscription will be canceled.
|
||||||
//
|
//
|
||||||
// NOTE: ChannelNotifier must be started before subscribing events
|
// NOTE: ChannelNotifier must be started before subscribing events
|
||||||
// otherwise we'd panic here.
|
// otherwise we'd panic here.
|
||||||
sub, err := p.cfg.ChannelNotifier.SubscribeChannelEvents()
|
if err := p.attachChannelEventSubscription(); err != nil {
|
||||||
if err != nil {
|
return err
|
||||||
return fmt.Errorf("SubscribeChannelEvents failed: %w", err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
msgs, err := p.loadActiveChannels(activeChans)
|
msgs, err := p.loadActiveChannels(activeChans)
|
||||||
@@ -616,7 +624,7 @@ func (p *Brontide) Start() error {
|
|||||||
go p.queueHandler()
|
go p.queueHandler()
|
||||||
go p.writeHandler()
|
go p.writeHandler()
|
||||||
go p.readHandler()
|
go p.readHandler()
|
||||||
go p.channelManager(sub)
|
go p.channelManager()
|
||||||
go p.pingHandler()
|
go p.pingHandler()
|
||||||
|
|
||||||
// Signal to any external processes that the peer is now active.
|
// Signal to any external processes that the peer is now active.
|
||||||
@@ -2324,7 +2332,7 @@ func (p *Brontide) genDeliveryScript() ([]byte, error) {
|
|||||||
// channels maintained with the remote peer.
|
// channels maintained with the remote peer.
|
||||||
//
|
//
|
||||||
// NOTE: This method MUST be run as a goroutine.
|
// NOTE: This method MUST be run as a goroutine.
|
||||||
func (p *Brontide) channelManager(client *subscribe.Client) {
|
func (p *Brontide) channelManager() {
|
||||||
defer p.wg.Done()
|
defer p.wg.Done()
|
||||||
|
|
||||||
// reenableTimeout will fire once after the configured channel status
|
// reenableTimeout will fire once after the configured channel status
|
||||||
@@ -2485,7 +2493,7 @@ out:
|
|||||||
// TODO(conner): consolidate reenables timers inside chan status
|
// TODO(conner): consolidate reenables timers inside chan status
|
||||||
// manager
|
// manager
|
||||||
case <-reenableTimeout:
|
case <-reenableTimeout:
|
||||||
p.reenableActiveChannels(client)
|
p.reenableActiveChannels()
|
||||||
|
|
||||||
// Since this channel will never fire again during the
|
// Since this channel will never fire again during the
|
||||||
// lifecycle of the peer, we nil the channel to mark it
|
// lifecycle of the peer, we nil the channel to mark it
|
||||||
@@ -2498,7 +2506,12 @@ out:
|
|||||||
// Once the reenabling is attempted, we also cancel the
|
// Once the reenabling is attempted, we also cancel the
|
||||||
// channel event subscription to free up the overflow
|
// channel event subscription to free up the overflow
|
||||||
// queue used in channel notifier.
|
// queue used in channel notifier.
|
||||||
client.Cancel()
|
//
|
||||||
|
// NOTE: channelEventClient will be nil if the
|
||||||
|
// reenableTimeout is greater than 1 minute.
|
||||||
|
if p.channelEventClient != nil {
|
||||||
|
p.channelEventClient.Cancel()
|
||||||
|
}
|
||||||
|
|
||||||
case <-p.quit:
|
case <-p.quit:
|
||||||
// As, we've been signalled to exit, we'll reset all
|
// As, we've been signalled to exit, we'll reset all
|
||||||
@@ -2523,7 +2536,7 @@ out:
|
|||||||
// peer, and reenables each public, non-pending channel. This is done at the
|
// peer, and reenables each public, non-pending channel. This is done at the
|
||||||
// gossip level by broadcasting a new ChannelUpdate with the disabled bit unset.
|
// gossip level by broadcasting a new ChannelUpdate with the disabled bit unset.
|
||||||
// No message will be sent if the channel is already enabled.
|
// No message will be sent if the channel is already enabled.
|
||||||
func (p *Brontide) reenableActiveChannels(client *subscribe.Client) {
|
func (p *Brontide) reenableActiveChannels() {
|
||||||
// First, filter all known channels with this peer for ones that are
|
// First, filter all known channels with this peer for ones that are
|
||||||
// both public and not pending.
|
// both public and not pending.
|
||||||
activePublicChans := p.filterChannelsToEnable()
|
activePublicChans := p.filterChannelsToEnable()
|
||||||
@@ -2560,6 +2573,16 @@ func (p *Brontide) reenableActiveChannels(client *subscribe.Client) {
|
|||||||
// reached the line to mark the reestablishment. Thus we give
|
// reached the line to mark the reestablishment. Thus we give
|
||||||
// it a second chance to send the request.
|
// it a second chance to send the request.
|
||||||
case errors.Is(err, netann.ErrEnableInactiveChan):
|
case errors.Is(err, netann.ErrEnableInactiveChan):
|
||||||
|
// If we don't have a client created, it means we
|
||||||
|
// shouldn't retry enabling the channel.
|
||||||
|
if p.channelEventClient == nil {
|
||||||
|
p.log.Errorf("Channel(%v) request enabling "+
|
||||||
|
"failed due to inactive link",
|
||||||
|
chanPoint)
|
||||||
|
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
p.log.Warnf("Channel(%v) cannot be enabled as " +
|
p.log.Warnf("Channel(%v) cannot be enabled as " +
|
||||||
"ChanStatusManager reported inactive, retrying")
|
"ChanStatusManager reported inactive, retrying")
|
||||||
|
|
||||||
@@ -2570,7 +2593,7 @@ func (p *Brontide) reenableActiveChannels(client *subscribe.Client) {
|
|||||||
|
|
||||||
// Retry the channels if we have any.
|
// Retry the channels if we have any.
|
||||||
if len(retryChans) != 0 {
|
if len(retryChans) != 0 {
|
||||||
p.retryRequestEnable(retryChans, client)
|
p.retryRequestEnable(retryChans)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -2692,9 +2715,7 @@ func (p *Brontide) filterChannelsToEnable() []wire.OutPoint {
|
|||||||
// client. It listens to the channel events and removes a channel from the map
|
// client. It listens to the channel events and removes a channel from the map
|
||||||
// if it's matched to the event. Upon receiving an active channel event, it
|
// if it's matched to the event. Upon receiving an active channel event, it
|
||||||
// will send the enabling request again.
|
// will send the enabling request again.
|
||||||
func (p *Brontide) retryRequestEnable(activeChans map[wire.OutPoint]struct{},
|
func (p *Brontide) retryRequestEnable(activeChans map[wire.OutPoint]struct{}) {
|
||||||
client *subscribe.Client) {
|
|
||||||
|
|
||||||
p.log.Debugf("Retry enabling %v channels", len(activeChans))
|
p.log.Debugf("Retry enabling %v channels", len(activeChans))
|
||||||
|
|
||||||
// retryEnable is a helper closure that sends an enable request and
|
// retryEnable is a helper closure that sends an enable request and
|
||||||
@@ -2739,7 +2760,7 @@ func (p *Brontide) retryRequestEnable(activeChans map[wire.OutPoint]struct{},
|
|||||||
select {
|
select {
|
||||||
// A new event has been sent by the ChannelNotifier. We now
|
// A new event has been sent by the ChannelNotifier. We now
|
||||||
// check whether it's an active or inactive channel event.
|
// check whether it's an active or inactive channel event.
|
||||||
case e := <-client.Updates():
|
case e := <-p.channelEventClient.Updates():
|
||||||
// If this is an active channel event, try enable the
|
// If this is an active channel event, try enable the
|
||||||
// channel then jump to the next iteration.
|
// channel then jump to the next iteration.
|
||||||
active, ok := e.(channelnotifier.ActiveChannelEvent)
|
active, ok := e.(channelnotifier.ActiveChannelEvent)
|
||||||
@@ -3680,3 +3701,29 @@ func (p *Brontide) LastRemotePingPayload() []byte {
|
|||||||
|
|
||||||
return pingBytes
|
return pingBytes
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// attachChannelEventSubscription creates a channel event subscription and
|
||||||
|
// attaches to client to Brontide if the reenableTimeout is no greater than 1
|
||||||
|
// minute.
|
||||||
|
func (p *Brontide) attachChannelEventSubscription() error {
|
||||||
|
// If the timeout is greater than 1 minute, it's unlikely that the link
|
||||||
|
// hasn't yet finished its reestablishment. Return a nil without
|
||||||
|
// creating the client to specify that we don't want to retry.
|
||||||
|
if p.cfg.ChanActiveTimeout > 1*time.Minute {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// When the reenable timeout is less than 1 minute, it's likely the
|
||||||
|
// channel link hasn't finished its reestablishment yet. In that case,
|
||||||
|
// we'll give it a second chance by subscribing to the channel update
|
||||||
|
// events. Upon receiving the `ActiveLinkEvent`, we'll then request
|
||||||
|
// enabling the channel again.
|
||||||
|
sub, err := p.cfg.ChannelNotifier.SubscribeChannelEvents()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("SubscribeChannelEvents failed: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
p.channelEventClient = sub
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
@@ -410,11 +410,8 @@ func createTestPeer(t *testing.T, notifier chainntnfs.ChainNotifier,
|
|||||||
chanID := lnwire.NewChanIDFromOutPoint(channelAlice.ChannelPoint())
|
chanID := lnwire.NewChanIDFromOutPoint(channelAlice.ChannelPoint())
|
||||||
alicePeer.activeChannels[chanID] = channelAlice
|
alicePeer.activeChannels[chanID] = channelAlice
|
||||||
|
|
||||||
sub, err := cfg.ChannelNotifier.SubscribeChannelEvents()
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
alicePeer.wg.Add(1)
|
alicePeer.wg.Add(1)
|
||||||
go alicePeer.channelManager(sub)
|
go alicePeer.channelManager()
|
||||||
|
|
||||||
return alicePeer, channelBob, nil
|
return alicePeer, channelBob, nil
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user