peer: retry enabling channels on ErrEnableInactiveChan
This commit adds retry logic for channels that failed with `ErrEnableInactiveChan` when enabling was requested. We now subscribe to channel events to decide what to do with the failed channels.
This commit is contained in:
parent a81d4e9c5c
commit 2f9ba1594c

peer/brontide.go | 163 lines changed
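Before the diff itself, here is a condensed, self-contained sketch of the flow this commit implements: attempt to enable every eligible channel, collect the ones that fail as inactive, then give each exactly one more attempt when a matching active-channel event arrives. All names here (`enableAll`, `activeEvent`, `errInactive`) are simplified stand-ins for illustration, not lnd's actual `wire.OutPoint`, `netann`, or `channelnotifier` types.

```go
package main

import (
	"errors"
	"fmt"
)

// errInactive stands in for netann.ErrEnableInactiveChan.
var errInactive = errors.New("channel inactive")

// activeEvent stands in for channelnotifier.ActiveChannelEvent.
type activeEvent struct{ chanPoint string }

// enableAll mirrors reenableActiveChannels: request an enable for every
// channel, and give the ones that report inactive one more attempt when a
// matching active event arrives.
func enableAll(chans []string, enable func(string) error,
	events <-chan interface{}, quit <-chan struct{}) {

	retry := make(map[string]struct{})
	for _, cp := range chans {
		err := enable(cp)
		switch {
		case err == nil:
		case errors.Is(err, errInactive):
			// Queue the channel for a single retry below.
			retry[cp] = struct{}{}
		default:
			fmt.Println("enable failed:", err)
		}
	}

	for len(retry) > 0 {
		select {
		case e := <-events:
			a, ok := e.(activeEvent)
			if !ok {
				continue
			}
			if _, found := retry[a.chanPoint]; !found {
				continue
			}
			// One shot only: remove before retrying.
			delete(retry, a.chanPoint)
			if err := enable(a.chanPoint); err != nil {
				fmt.Println("retry failed:", err)
			}
		case <-quit:
			return
		}
	}
}

func main() {
	events := make(chan interface{}, 1)
	quit := make(chan struct{})

	attempts := 0
	enable := func(cp string) error {
		attempts++
		if attempts == 1 {
			// First attempt races the link's reestablish mark.
			return errInactive
		}
		return nil
	}

	// The link comes up and the notifier reports the channel active.
	events <- activeEvent{chanPoint: "chan-1"}
	enableAll([]string{"chan-1"}, enable, events, quit)
	fmt.Println("attempts:", attempts) // attempts: 2
}
```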
@@ -39,6 +39,7 @@ import (
 	"github.com/lightningnetwork/lnd/netann"
 	"github.com/lightningnetwork/lnd/pool"
 	"github.com/lightningnetwork/lnd/queue"
+	"github.com/lightningnetwork/lnd/subscribe"
 	"github.com/lightningnetwork/lnd/ticker"
 	"github.com/lightningnetwork/lnd/watchtower/wtclient"
 )
@@ -592,6 +593,18 @@ func (p *Brontide) Start() error {
 	p.log.Debugf("Loaded %v active channels from database",
 		len(activeChans))
 
+	// Subscribe channel events before loading channels so we won't miss
+	// events. This subscription is used to listen to active channel events
+	// when reenabling channels. Once the reenabling process is finished,
+	// this subscription will be canceled.
+	//
+	// NOTE: ChannelNotifier must be started before subscribing events
+	// otherwise we'd panic here.
+	sub, err := p.cfg.ChannelNotifier.SubscribeChannelEvents()
+	if err != nil {
+		return fmt.Errorf("SubscribeChannelEvents failed: %w", err)
+	}
+
 	msgs, err := p.loadActiveChannels(activeChans)
 	if err != nil {
 		return fmt.Errorf("unable to load channels: %v", err)
@@ -603,7 +616,7 @@ func (p *Brontide) Start() error {
 	go p.queueHandler()
 	go p.writeHandler()
 	go p.readHandler()
-	go p.channelManager()
+	go p.channelManager(sub)
 	go p.pingHandler()
 
 	// Signal to any external processes that the peer is now active.
@@ -2311,7 +2324,7 @@ func (p *Brontide) genDeliveryScript() ([]byte, error) {
 // channels maintained with the remote peer.
 //
 // NOTE: This method MUST be run as a goroutine.
-func (p *Brontide) channelManager() {
+func (p *Brontide) channelManager(client *subscribe.Client) {
 	defer p.wg.Done()
 
 	// reenableTimeout will fire once after the configured channel status
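Not part of the diff, but worth pinning down: the NOTE above means the caller performs `wg.Add(1)` before launching `channelManager` as a goroutine, while the method itself defers `wg.Done()`. A minimal self-contained sketch of that idiom (all names illustrative, not lnd's):

```go
package main

import (
	"fmt"
	"sync"
)

type worker struct {
	wg   sync.WaitGroup
	quit chan struct{}
}

// manager MUST be run as a goroutine, mirroring channelManager's contract:
// the caller increments the WaitGroup, the method decrements it on exit.
func (w *worker) manager(updates <-chan string) {
	defer w.wg.Done()

	for {
		select {
		case u := <-updates:
			fmt.Println("got update:", u)
		case <-w.quit:
			return
		}
	}
}

func main() {
	w := &worker{quit: make(chan struct{})}
	updates := make(chan string) // unbuffered for a deterministic handoff

	w.wg.Add(1) // Add before the goroutine starts, never inside it.
	go w.manager(updates)

	updates <- "channel active" // returns once the manager received it
	close(w.quit)               // then ask the manager to exit
	w.wg.Wait()
}
```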
@@ -2472,7 +2485,7 @@ out:
 		// TODO(conner): consolidate reenables timers inside chan status
 		// manager
 		case <-reenableTimeout:
-			p.reenableActiveChannels()
+			p.reenableActiveChannels(client)
 
 			// Since this channel will never fire again during the
 			// lifecycle of the peer, we nil the channel to mark it
@@ -2482,6 +2495,11 @@ out:
 			// select will ignore this case entirely.
 			reenableTimeout = nil
 
+			// Once the reenabling is attempted, we also cancel the
+			// channel event subscription to free up the overflow
+			// queue used in channel notifier.
+			client.Cancel()
+
 		case <-p.quit:
 			// As, we've been signalled to exit, we'll reset all
 			// our active channel back to their default state.
@@ -2505,24 +2523,55 @@ out:
 // peer, and reenables each public, non-pending channel. This is done at the
 // gossip level by broadcasting a new ChannelUpdate with the disabled bit unset.
 // No message will be sent if the channel is already enabled.
-func (p *Brontide) reenableActiveChannels() {
+func (p *Brontide) reenableActiveChannels(client *subscribe.Client) {
 	// First, filter all known channels with this peer for ones that are
 	// both public and not pending.
 	activePublicChans := p.filterChannelsToEnable()
 
+	// Create a map to hold channels that need to be retried.
+	retryChans := make(map[wire.OutPoint]struct{}, len(activePublicChans))
+
 	// For each of the public, non-pending channels, set the channel
 	// disabled bit to false and send out a new ChannelUpdate. If this
 	// channel is already active, the update won't be sent.
 	for _, chanPoint := range activePublicChans {
 		err := p.cfg.ChanStatusMgr.RequestEnable(chanPoint, false)
-		if err == netann.ErrEnableManuallyDisabledChan {
-			p.log.Debugf("Channel(%v) was manually disabled, ignoring "+
-				"automatic enable request", chanPoint)
-		} else if err != nil {
-			p.log.Errorf("Unable to enable channel %v: %v",
-				chanPoint, err)
+
+		switch {
+		// No error occurred, continue to request the next channel.
+		case err == nil:
+			continue
+
+		// Cannot auto enable a manually disabled channel so we do
+		// nothing but proceed to the next channel.
+		case errors.Is(err, netann.ErrEnableManuallyDisabledChan):
+			p.log.Debugf("Channel(%v) was manually disabled, "+
+				"ignoring automatic enable request", chanPoint)
+
+			continue
+
+		// If the channel is reported as inactive, we will give it
+		// another chance. When handling the request, ChanStatusManager
+		// will check whether the link is active or not. One of the
+		// conditions is whether the link has been marked as
+		// reestablished, which happens inside a goroutine(htlcManager)
+		// after the link is started. And we may get a false negative
+		// saying the link is not active because that goroutine hasn't
+		// reached the line to mark the reestablishment. Thus we give
+		// it a second chance to send the request.
+		case errors.Is(err, netann.ErrEnableInactiveChan):
+			p.log.Warnf("Channel(%v) cannot be enabled as "+
+				"ChanStatusManager reported inactive, retrying", chanPoint)
+
+			// Add the channel to the retry map.
+			retryChans[chanPoint] = struct{}{}
 		}
 	}
+
+	// Retry the channels if we have any.
+	if len(retryChans) != 0 {
+		p.retryRequestEnable(retryChans, client)
+	}
 }
 
 // fetchActiveChanCloser attempts to fetch the active chan closer state machine
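Note that the hunk above also moves from direct equality checks on the sentinel errors to `errors.Is`, which keeps matching even if some layer in between wraps the sentinel with `%w`. A minimal standalone illustration of the difference (the sentinel name here is a stand-in, not lnd's):

```go
package main

import (
	"errors"
	"fmt"
)

// Stand-in sentinel, analogous to netann.ErrEnableManuallyDisabledChan.
var errManuallyDisabled = errors.New("manually disabled")

func main() {
	// A caller wraps the sentinel, as Go code commonly does with %w.
	err := fmt.Errorf("request enable failed: %w", errManuallyDisabled)

	fmt.Println(err == errManuallyDisabled)          // false: the wrapped value differs
	fmt.Println(errors.Is(err, errManuallyDisabled)) // true: errors.Is unwraps
}
```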
@@ -2639,6 +2688,100 @@ func (p *Brontide) filterChannelsToEnable() []wire.OutPoint {
 	return activePublicChans
 }
 
+// retryRequestEnable takes a map of channel outpoints and a channel event
+// client. It listens to the channel events and removes a channel from the map
+// if it's matched to the event. Upon receiving an active channel event, it
+// will send the enabling request again.
+func (p *Brontide) retryRequestEnable(activeChans map[wire.OutPoint]struct{},
+	client *subscribe.Client) {
+
+	p.log.Debugf("Retry enabling %v channels", len(activeChans))
+
+	// retryEnable is a helper closure that sends an enable request and
+	// removes the channel from the map if it's matched.
+	retryEnable := func(chanPoint wire.OutPoint) error {
+		// If this is an active channel event, check whether it's in
+		// our targeted channels map.
+		_, found := activeChans[chanPoint]
+
+		// If this channel is irrelevant, return nil so the loop can
+		// jump to next iteration.
+		if !found {
+			return nil
+		}
+
+		// Otherwise we've just received an active signal for a channel
+		// that previously failed to be enabled, so we send the request
+		// again.
+		//
+		// We only give the channel one more shot, so we delete it from
+		// our map first to keep it from being attempted again.
+		delete(activeChans, chanPoint)
+
+		// Send the request.
+		err := p.cfg.ChanStatusMgr.RequestEnable(chanPoint, false)
+		if err != nil {
+			return fmt.Errorf("request enabling channel %v "+
+				"failed: %w", chanPoint, err)
+		}
+
+		return nil
+	}
+
+	for {
+		// If activeChans is empty, we've finished processing all the
+		// channels.
+		if len(activeChans) == 0 {
+			p.log.Debug("Finished retry enabling channels")
+			return
+		}
+
+		select {
+		// A new event has been sent by the ChannelNotifier. We now
+		// check whether it's an active or inactive channel event.
+		case e := <-client.Updates():
+			// If this is an active channel event, try to enable
+			// the channel then jump to the next iteration.
+			active, ok := e.(channelnotifier.ActiveChannelEvent)
+			if ok {
+				chanPoint := *active.ChannelPoint
+
+				// If we received an error for this particular
+				// channel, we log an error and won't quit as
+				// we still want to retry other channels.
+				if err := retryEnable(chanPoint); err != nil {
+					p.log.Errorf("Retry failed: %v", err)
+				}
+
+				continue
+			}
+
+			// Otherwise check for inactive link event, and jump to
+			// next iteration if it's not.
+			inactive, ok := e.(channelnotifier.InactiveLinkEvent)
+			if !ok {
+				continue
+			}
+
+			// Found an inactive link event, if this is our
+			// targeted channel, remove it from our map.
+			chanPoint := *inactive.ChannelPoint
+			_, found := activeChans[chanPoint]
+			if !found {
+				continue
+			}
+
+			delete(activeChans, chanPoint)
+			p.log.Warnf("Re-enable channel %v failed, received "+
+				"inactive link event", chanPoint)
+
+		case <-p.quit:
+			p.log.Debugf("Peer shutdown during retry enabling")
+			return
+		}
+	}
+}
+
 // chooseDeliveryScript takes two optionally set shutdown scripts and returns
 // a suitable script to close out to. This may be nil if neither script is
 // set. If both scripts are set, this function will error if they do not match.
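The hunks above treat `subscribe.Client` as a cancellable event stream: `Updates()` exposes a receive channel, and `Cancel()` (called once the retry pass ends) tears the subscription down so the notifier's overflow queue stops accumulating events on the peer's behalf. As a rough mental model only, not lnd's actual implementation, a cancellable stream with that shape looks like:

```go
package main

import "fmt"

// client is a toy stand-in for lnd's subscribe.Client: a buffered event
// stream plus a cancel signal that the producer watches.
type client struct {
	updates chan interface{}
	quit    chan struct{}
}

func (c *client) Updates() <-chan interface{} { return c.updates }
func (c *client) Cancel()                     { close(c.quit) }

func main() {
	c := &client{
		updates: make(chan interface{}, 4),
		quit:    make(chan struct{}),
	}

	// Producer: stops queueing once the subscription is cancelled.
	go func() {
		for i := 0; ; i++ {
			select {
			case c.updates <- fmt.Sprintf("event-%d", i):
			case <-c.quit:
				return
			}
		}
	}()

	// Consumer: read two events, then cancel the subscription.
	fmt.Println(<-c.Updates())
	fmt.Println(<-c.Updates())
	c.Cancel()
}
```

The remaining hunks wire a real ChannelNotifier into the peer's unit tests so that channelManager can be handed a genuine subscription.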
@@ -12,6 +12,7 @@ import (
 	"github.com/btcsuite/btcd/wire"
 	"github.com/lightningnetwork/lnd/chainntnfs"
 	"github.com/lightningnetwork/lnd/channeldb"
+	"github.com/lightningnetwork/lnd/channelnotifier"
 	"github.com/lightningnetwork/lnd/contractcourt"
 	"github.com/lightningnetwork/lnd/htlcswitch"
 	"github.com/lightningnetwork/lnd/lntest/mock"
@@ -1035,6 +1036,14 @@ func TestPeerCustomMessage(t *testing.T) {
 		ConfChan: make(chan *chainntnfs.TxConfirmation),
 	}
 
+	// TODO(yy): change ChannelNotifier to be an interface.
+	channelNotifier := channelnotifier.New(dbAlice.ChannelStateDB())
+	require.NoError(t, channelNotifier.Start())
+	t.Cleanup(func() {
+		require.NoError(t, channelNotifier.Stop(),
+			"stop channel notifier failed")
+	})
+
 	alicePeer := NewBrontide(Config{
 		PubKeyBytes: remoteKey,
 		ChannelDB:   dbAlice.ChannelStateDB(),
@@ -1057,7 +1066,8 @@ func TestPeerCustomMessage(t *testing.T) {
 			}
 			return nil
 		},
 		PongBuf: make([]byte, lnwire.MaxPongBytes),
+		ChannelNotifier: channelNotifier,
 	})
 
 	// Set up the init sequence.
@@ -16,6 +16,7 @@ import (
 	"github.com/btcsuite/btcd/wire"
 	"github.com/lightningnetwork/lnd/chainntnfs"
 	"github.com/lightningnetwork/lnd/channeldb"
+	"github.com/lightningnetwork/lnd/channelnotifier"
 	"github.com/lightningnetwork/lnd/htlcswitch"
 	"github.com/lightningnetwork/lnd/input"
 	"github.com/lightningnetwork/lnd/keychain"
@@ -377,6 +378,14 @@ func createTestPeer(t *testing.T, notifier chainntnfs.ChainNotifier,
 		return nil, nil, err
 	}
 
+	// TODO(yy): change ChannelNotifier to be an interface.
+	channelNotifier := channelnotifier.New(dbAlice.ChannelStateDB())
+	require.NoError(t, channelNotifier.Start())
+	t.Cleanup(func() {
+		require.NoError(t, channelNotifier.Stop(),
+			"stop channel notifier failed")
+	})
+
 	cfg := &Config{
 		Addr:        cfgAddr,
 		PubKeyBytes: pubKey,
@@ -392,6 +401,7 @@ func createTestPeer(t *testing.T, notifier chainntnfs.ChainNotifier,
 		ChanStatusMgr:  chanStatusMgr,
 		Features:       lnwire.NewFeatureVector(nil, lnwire.Features),
 		DisconnectPeer: func(b *btcec.PublicKey) error { return nil },
+		ChannelNotifier: channelNotifier,
 	}
 
 	alicePeer := NewBrontide(*cfg)
@@ -400,8 +410,11 @@ func createTestPeer(t *testing.T, notifier chainntnfs.ChainNotifier,
 	chanID := lnwire.NewChanIDFromOutPoint(channelAlice.ChannelPoint())
 	alicePeer.activeChannels[chanID] = channelAlice
 
+	sub, err := cfg.ChannelNotifier.SubscribeChannelEvents()
+	require.NoError(t, err)
+
 	alicePeer.wg.Add(1)
-	go alicePeer.channelManager()
+	go alicePeer.channelManager(sub)
 
 	return alicePeer, channelBob, nil
 }
|
Loading…
x
Reference in New Issue
Block a user