Merge pull request #7157 from yyforyongyu/fix-peer

brontide: retry enabling channels
This commit is contained in:
Oliver Gugger 2023-01-17 23:37:44 +01:00 committed by GitHub
commit de94a4ea5e
10 changed files with 317 additions and 57 deletions

View File

@ -47,6 +47,13 @@ type ActiveLinkEvent struct {
ChannelPoint *wire.OutPoint
}
// InactiveLinkEvent represents a new event where the link becomes inactive in
// the switch.
type InactiveLinkEvent struct {
// ChannelPoint is the channel point for the inactive channel.
ChannelPoint *wire.OutPoint
}
// ActiveChannelEvent represents a new event where a channel becomes active.
type ActiveChannelEvent struct {
// ChannelPoint is the channelpoint for the newly active channel.
@ -193,6 +200,15 @@ func (c *ChannelNotifier) NotifyActiveChannelEvent(chanPoint wire.OutPoint) {
}
}
// NotifyInactiveLinkEvent notifies the channelEventNotifier goroutine that a
// link has been removed from the switch.
func (c *ChannelNotifier) NotifyInactiveLinkEvent(chanPoint wire.OutPoint) {
event := InactiveLinkEvent{ChannelPoint: &chanPoint}
if err := c.ntfnServer.SendUpdate(event); err != nil {
log.Warnf("Unable to send inactive link update: %v", err)
}
}
// NotifyInactiveChannelEvent notifies the channelEventNotifier goroutine that a
// channel is inactive.
func (c *ChannelNotifier) NotifyInactiveChannelEvent(chanPoint wire.OutPoint) {
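For context, events published through NotifyInactiveLinkEvent reach subscribers via lnd's subscribe package. A minimal sketch of a hypothetical consumer (error handling elided; `notifier` is assumed to be a started *ChannelNotifier):

client, err := notifier.SubscribeChannelEvents()
if err != nil {
	return err
}
// Free the notifier's overflow queue once we're done listening.
defer client.Cancel()

for event := range client.Updates() {
	switch e := event.(type) {
	case channelnotifier.ActiveLinkEvent:
		log.Infof("link active: %v", e.ChannelPoint)
	case channelnotifier.InactiveLinkEvent:
		log.Infof("link inactive: %v", e.ChannelPoint)
	}
}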

View File

@ -269,6 +269,10 @@ data.
* [Fixed a bug](https://github.com/lightningnetwork/lnd/pull/7186) that might
lead to channel updates being missed, causing the channel graph to be
incomplete.
* During reconnection, enabling channels might fail because the link's startup
is lagging behind; this is now fixed by [retrying the enable
request](https://github.com/lightningnetwork/lnd/pull/7157).
## Code Health
* [test: use `T.TempDir` to create temporary test

View File

@ -293,6 +293,10 @@ type ChannelLinkConfig struct {
// when channels become inactive.
NotifyInactiveChannel func(wire.OutPoint)
// NotifyInactiveLinkEvent allows the switch to tell the
// ChannelNotifier when a channel link becomes inactive.
NotifyInactiveLinkEvent func(wire.OutPoint)
// HtlcNotifier is an instance of a htlcNotifier which we will pipe htlc
// events through.
HtlcNotifier htlcNotifier
@ -979,8 +983,11 @@ func (l *channelLink) htlcManager() {
l.log.Infof("HTLC manager started, bandwidth=%v", l.Bandwidth())
// Notify any clients that the link is now in the switch via an
// ActiveLinkEvent.
// ActiveLinkEvent. We'll also defer an inactive link notification for
// when the link exits to ensure that every active notification is
// matched by an inactive one.
l.cfg.NotifyActiveLink(*l.ChannelPoint())
defer l.cfg.NotifyInactiveLinkEvent(*l.ChannelPoint())
// TODO(roasbeef): need to call wipe chan whenever D/C?
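The defer above is the crux: htlcManager has several return paths (quit signal, link failure, peer disconnect), and a deferred call fires on all of them, so subscribers never observe an active link event without a matching inactive one. The same pattern in miniature (hypothetical names, illustration only):

func runLink(up, down func(), quit <-chan struct{}, errChan <-chan error) {
	up()         // e.g. NotifyActiveLink
	defer down() // e.g. NotifyInactiveLinkEvent; fires on every return

	for {
		select {
		case <-quit: // shutdown path
			return
		case <-errChan: // failure path
			return
		}
	}
}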

View File

@ -1886,16 +1886,17 @@ func newSingleLinkTestHarness(t *testing.T, chanAmt, chanReserve btcutil.Amount)
PendingCommitTicker: ticker.New(time.Minute),
// Make the BatchSize and Min/MaxFeeUpdateTimeout large enough
// to not trigger commit updates automatically during tests.
BatchSize: 10000,
MinFeeUpdateTimeout: 30 * time.Minute,
MaxFeeUpdateTimeout: 40 * time.Minute,
MaxOutgoingCltvExpiry: DefaultMaxOutgoingCltvExpiry,
MaxFeeAllocation: DefaultMaxLinkFeeAllocation,
NotifyActiveLink: func(wire.OutPoint) {},
NotifyActiveChannel: func(wire.OutPoint) {},
NotifyInactiveChannel: func(wire.OutPoint) {},
NotifyInactiveLinkEvent: func(wire.OutPoint) {},
HtlcNotifier: aliceSwitch.cfg.HtlcNotifier,
GetAliases: getAliases,
}
aliceLink := NewChannelLink(aliceCfg, aliceLc.channel)
@ -4329,15 +4330,16 @@ func (h *persistentLinkHarness) restartLink(
MinFeeUpdateTimeout: 30 * time.Minute,
MaxFeeUpdateTimeout: 40 * time.Minute,
// Set any hodl flags requested for the new link.
HodlMask: hodl.MaskFromFlags(hodlFlags...),
MaxOutgoingCltvExpiry: DefaultMaxOutgoingCltvExpiry,
MaxFeeAllocation: DefaultMaxLinkFeeAllocation,
NotifyActiveLink: func(wire.OutPoint) {},
NotifyActiveChannel: func(wire.OutPoint) {},
NotifyInactiveChannel: func(wire.OutPoint) {},
NotifyInactiveLinkEvent: func(wire.OutPoint) {},
HtlcNotifier: aliceSwitch.cfg.HtlcNotifier,
SyncStates: syncStates,
GetAliases: getAliases,
}
aliceLink := NewChannelLink(aliceCfg, aliceChannel)

View File

@ -2293,6 +2293,8 @@ func (s *Switch) AddLink(link ChannelLink) error {
link.attachFailAliasUpdate(s.failAliasUpdate)
if err := link.Start(); err != nil {
log.Errorf("AddLink failed to start link with chanID=%v: %v",
chanID, err)
s.removeLink(chanID)
return err
}

View File

@ -1159,6 +1159,7 @@ func (h *hopNetwork) createChannelLink(server, peer *mockServer,
NotifyActiveLink: func(wire.OutPoint) {},
NotifyActiveChannel: func(wire.OutPoint) {},
NotifyInactiveChannel: func(wire.OutPoint) {},
NotifyInactiveLinkEvent: func(wire.OutPoint) {},
HtlcNotifier: server.htlcSwitch.cfg.HtlcNotifier,
GetAliases: getAliases,
},

View File

@ -39,6 +39,7 @@ import (
"github.com/lightningnetwork/lnd/netann"
"github.com/lightningnetwork/lnd/pool"
"github.com/lightningnetwork/lnd/queue"
"github.com/lightningnetwork/lnd/subscribe"
"github.com/lightningnetwork/lnd/ticker"
"github.com/lightningnetwork/lnd/watchtower/wtclient"
)
@ -461,6 +462,15 @@ type Brontide struct {
// peer's chansync message with its own over and over again.
resentChanSyncMsg map[lnwire.ChannelID]struct{}
// channelEventClient is the channel event subscription client that's
// used to assist in retrying to enable the channels. This client is
// only created when the reenableTimeout is no greater than 1 minute.
// Once created, it is canceled after the reenabling has finished.
//
// NOTE: we choose to create the client conditionally to avoid
// potentially holding lots of un-consumed events.
channelEventClient *subscribe.Client
queueQuit chan struct{}
quit chan struct{}
wg sync.WaitGroup
@ -592,6 +602,17 @@ func (p *Brontide) Start() error {
p.log.Debugf("Loaded %v active channels from database",
len(activeChans))
// Conditionally subscribe to channel events before loading channels so
// we won't miss events. This subscription is used to listen for active
// channel events when reenabling channels. Once the reenabling process
// is finished, this subscription will be canceled.
//
// NOTE: the ChannelNotifier must be started before subscribing to
// events, otherwise we'd panic here.
if err := p.attachChannelEventSubscription(); err != nil {
return err
}
msgs, err := p.loadActiveChannels(activeChans)
if err != nil {
return fmt.Errorf("unable to load channels: %v", err)
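The subscribe-before-load ordering matters: loading channels eventually starts links, which emit channel events immediately, and a subscription created only afterwards would miss exactly the events the retry logic waits on. Schematically (hypothetical startLinks helper, illustration only):

// Correct: the subscription exists before the event source fires.
sub, _ := notifier.SubscribeChannelEvents()
startLinks()

// Racy: any event emitted between startLinks() and the
// subscription below is silently lost to this client.
startLinks()
sub, _ = notifier.SubscribeChannelEvents()
_ = sub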
@ -941,6 +962,7 @@ func (p *Brontide) addLink(chanPoint *wire.OutPoint,
towerClient = p.cfg.TowerClient
}
//nolint:lll
linkCfg := htlcswitch.ChannelLinkConfig{
Peer: p,
DecodeHopIterators: p.cfg.Sphinx.DecodeHopIterators,
@ -976,6 +998,7 @@ func (p *Brontide) addLink(chanPoint *wire.OutPoint,
NotifyActiveLink: p.cfg.ChannelNotifier.NotifyActiveLinkEvent,
NotifyActiveChannel: p.cfg.ChannelNotifier.NotifyActiveChannelEvent,
NotifyInactiveChannel: p.cfg.ChannelNotifier.NotifyInactiveChannelEvent,
NotifyInactiveLinkEvent: p.cfg.ChannelNotifier.NotifyInactiveLinkEvent,
HtlcNotifier: p.cfg.HtlcNotifier,
GetAliases: p.cfg.GetAliases,
}
@ -2480,6 +2503,16 @@ out:
// select will ignore this case entirely.
reenableTimeout = nil
// Once the reenabling is attempted, we also cancel the
// channel event subscription to free up the overflow
// queue used in the channel notifier.
//
// NOTE: channelEventClient will be nil if the
// reenableTimeout is greater than 1 minute.
if p.channelEventClient != nil {
p.channelEventClient.Cancel()
}
case <-p.quit:
// As we've been signalled to exit, we'll reset all
// our active channels back to their default state.
@ -2506,48 +2539,62 @@ out:
func (p *Brontide) reenableActiveChannels() {
// First, filter all known channels with this peer for ones that are
// both public and not pending.
activePublicChans := p.filterChannelsToEnable()
// Create a map to hold channels that need to be retried.
retryChans := make(map[wire.OutPoint]struct{}, len(activePublicChans))
// For each of the public, non-pending channels, set the channel
// disabled bit to false and send out a new ChannelUpdate. If this
// channel is already active, the update won't be sent.
for _, chanPoint := range activePublicChans {
err := p.cfg.ChanStatusMgr.RequestEnable(chanPoint, false)
switch {
// No error occurred, continue to request the next channel.
case err == nil:
continue
// Cannot auto enable a manually disabled channel so we do
// nothing but proceed to the next channel.
case errors.Is(err, netann.ErrEnableManuallyDisabledChan):
p.log.Debugf("Channel(%v) was manually disabled, "+
"ignoring automatic enable request", chanPoint)
continue
// If the channel is reported as inactive, we will give it
// another chance. When handling the request, ChanStatusManager
// will check whether the link is active or not. One of the
// conditions is whether the link has been marked as
// reestablished, which happens inside a goroutine (htlcManager)
// after the link is started. We may therefore get a false
// negative saying the link is not active simply because that
// goroutine hasn't yet reached the line that marks the
// reestablishment. Thus we give the request a second chance.
case errors.Is(err, netann.ErrEnableInactiveChan):
// If we don't have a client created, it means we
// shouldn't retry enabling the channel.
if p.channelEventClient == nil {
p.log.Errorf("Channel(%v) request enabling "+
"failed due to inactive link",
chanPoint)
continue
}
p.log.Warnf("Channel(%v) cannot be enabled as " +
"ChanStatusManager reported inactive, retrying")
// Add the channel to the retry map.
retryChans[chanPoint] = struct{}{}
}
}
// Retry the channels if we have any.
if len(retryChans) != 0 {
p.retryRequestEnable(retryChans)
}
}
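Note the switch from direct comparison (`err == netann.ErrEnableManuallyDisabledChan` in the removed branch) to `errors.Is`: the latter also matches a sentinel error that has been wrapped along the way. A quick illustration:

base := netann.ErrEnableInactiveChan
wrapped := fmt.Errorf("enable request: %w", base)

fmt.Println(wrapped == base)          // false: misses the wrapping
fmt.Println(errors.Is(wrapped, base)) // true: unwraps the %w chain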
// fetchActiveChanCloser attempts to fetch the active chan closer state machine
@ -2626,6 +2673,136 @@ func (p *Brontide) fetchActiveChanCloser(chanID lnwire.ChannelID) (
return chanCloser, nil
}
// filterChannelsToEnable filters a list of channels to be enabled upon start.
// The filtered channels are active channels that are neither private nor
// pending.
func (p *Brontide) filterChannelsToEnable() []wire.OutPoint {
var activePublicChans []wire.OutPoint
p.activeChanMtx.RLock()
defer p.activeChanMtx.RUnlock()
for chanID, lnChan := range p.activeChannels {
// If the lnChan is nil, continue as this is a pending channel.
if lnChan == nil {
continue
}
dbChan := lnChan.State()
isPublic := dbChan.ChannelFlags&lnwire.FFAnnounceChannel != 0
if !isPublic || dbChan.IsPending {
continue
}
// We'll also skip any channels added during this peer's
// lifecycle since they haven't waited out the timeout. Their
// first announcement will be enabled, and the chan status
// manager will begin monitoring them passively since they exist
// in the database.
if _, ok := p.addedChannels[chanID]; ok {
continue
}
activePublicChans = append(
activePublicChans, dbChan.FundingOutpoint,
)
}
return activePublicChans
}
// retryRequestEnable takes a map of channel outpoints and listens to the
// channel events delivered via the peer's channel event client, removing a
// channel from the map once it's matched to an event. Upon receiving an
// active channel event, it will send the enable request again.
func (p *Brontide) retryRequestEnable(activeChans map[wire.OutPoint]struct{}) {
p.log.Debugf("Retry enabling %v channels", len(activeChans))
// retryEnable is a helper closure that removes a matched channel from
// the map and sends the enable request again.
retryEnable := func(chanPoint wire.OutPoint) error {
// If this is an active channel event, check whether it's in
// our targeted channels map.
_, found := activeChans[chanPoint]
// If this channel is irrelevant, return nil so the loop can
// jump to the next iteration.
if !found {
return nil
}
// Otherwise we've just received an active signal for a channel
// that previously failed to be enabled, so we send the request
// again.
//
// We only give the channel one more shot, so we delete it from
// our map first to keep it from being attempted again.
delete(activeChans, chanPoint)
// Send the request.
err := p.cfg.ChanStatusMgr.RequestEnable(chanPoint, false)
if err != nil {
return fmt.Errorf("request enabling channel %v "+
"failed: %w", chanPoint, err)
}
return nil
}
for {
// If activeChans is empty, we're done processing all the
// channels.
if len(activeChans) == 0 {
p.log.Debug("Finished retry enabling channels")
return
}
select {
// A new event has been sent by the ChannelNotifier. We now
// check whether it's an active channel or inactive link
// event.
case e := <-p.channelEventClient.Updates():
// If this is an active channel event, try to enable the
// channel and then jump to the next iteration.
active, ok := e.(channelnotifier.ActiveChannelEvent)
if ok {
chanPoint := *active.ChannelPoint
// If we received an error for this particular
// channel, we log an error and won't quit as
// we still want to retry other channels.
if err := retryEnable(chanPoint); err != nil {
p.log.Errorf("Retry failed: %v", err)
}
continue
}
// Otherwise check for an inactive link event, and jump to the
// next iteration if it's not one.
inactive, ok := e.(channelnotifier.InactiveLinkEvent)
if !ok {
continue
}
// We found an inactive link event; if this is one of our
// targeted channels, remove it from our map.
chanPoint := *inactive.ChannelPoint
_, found := activeChans[chanPoint]
if !found {
continue
}
delete(activeChans, chanPoint)
p.log.Warnf("Re-enable channel %v failed, received "+
"inactive link event", chanPoint)
case <-p.quit:
p.log.Debugf("Peer shutdown during retry enabling")
return
}
}
}
// chooseDeliveryScript takes two optionally set shutdown scripts and returns
// a suitable script to close out to. This may be nil if neither script is
// set. If both scripts are set, this function will error if they do not match.
@ -3524,3 +3701,29 @@ func (p *Brontide) LastRemotePingPayload() []byte {
return pingBytes
}
// attachChannelEventSubscription creates a channel event subscription and
// attaches the client to the Brontide instance if the reenableTimeout is no
// greater than 1 minute.
func (p *Brontide) attachChannelEventSubscription() error {
// If the timeout is greater than 1 minute, it's unlikely that the link
// hasn't yet finished its reestablishment. Return nil without creating
// the client to indicate that we don't want to retry.
if p.cfg.ChanActiveTimeout > 1*time.Minute {
return nil
}
// When the reenable timeout is less than 1 minute, it's likely the
// channel link hasn't finished its reestablishment yet. In that case,
// we'll give it a second chance by subscribing to the channel update
// events. Upon receiving the `ActiveChannelEvent`, we'll then request
// enabling the channel again.
sub, err := p.cfg.ChannelNotifier.SubscribeChannelEvents()
if err != nil {
return fmt.Errorf("SubscribeChannelEvents failed: %w", err)
}
p.channelEventClient = sub
return nil
}
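Taken together, the retry machinery's lifetime within a peer looks roughly as follows (a condensed, hypothetical outline of the calls added in this diff, not actual lnd code):

func retryLifecycleSketch(p *Brontide) error {
	// 1. Start(): subscribe before channels load so no event is missed.
	//    No-op (nil client) when ChanActiveTimeout > 1 minute.
	if err := p.attachChannelEventSubscription(); err != nil {
		return err
	}

	// 2. After ChanActiveTimeout fires: first enable pass; channels
	//    failing with ErrEnableInactiveChan get queued for a retry
	//    that waits on ActiveChannelEvent notifications.
	p.reenableActiveChannels()

	// 3. Finally, cancel the subscription so the notifier's overflow
	//    queue stops accumulating events for this client.
	if p.channelEventClient != nil {
		p.channelEventClient.Cancel()
	}
	return nil
}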

View File

@ -12,6 +12,7 @@ import (
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/channelnotifier"
"github.com/lightningnetwork/lnd/contractcourt"
"github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/lntest/mock"
@ -1035,6 +1036,14 @@ func TestPeerCustomMessage(t *testing.T) {
ConfChan: make(chan *chainntnfs.TxConfirmation),
}
// TODO(yy): change ChannelNotifier to be an interface.
channelNotifier := channelnotifier.New(dbAlice.ChannelStateDB())
require.NoError(t, channelNotifier.Start())
t.Cleanup(func() {
require.NoError(t, channelNotifier.Stop(),
"stop channel notifier failed")
})
alicePeer := NewBrontide(Config{
PubKeyBytes: remoteKey,
ChannelDB: dbAlice.ChannelStateDB(),
@ -1057,7 +1066,8 @@ func TestPeerCustomMessage(t *testing.T) {
}
return nil
},
PongBuf: make([]byte, lnwire.MaxPongBytes),
ChannelNotifier: channelNotifier,
})
// Set up the init sequence.

View File

@ -16,6 +16,7 @@ import (
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/channelnotifier"
"github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/keychain"
@ -377,6 +378,14 @@ func createTestPeer(t *testing.T, notifier chainntnfs.ChainNotifier,
return nil, nil, err
}
// TODO(yy): change ChannelNotifier to be an interface.
channelNotifier := channelnotifier.New(dbAlice.ChannelStateDB())
require.NoError(t, channelNotifier.Start())
t.Cleanup(func() {
require.NoError(t, channelNotifier.Stop(),
"stop channel notifier failed")
})
cfg := &Config{
Addr: cfgAddr,
PubKeyBytes: pubKey,
@ -392,6 +401,7 @@ func createTestPeer(t *testing.T, notifier chainntnfs.ChainNotifier,
ChanStatusMgr: chanStatusMgr,
Features: lnwire.NewFeatureVector(nil, lnwire.Features),
DisconnectPeer: func(b *btcec.PublicKey) error { return nil },
ChannelNotifier: channelNotifier,
}
alicePeer := NewBrontide(*cfg)

View File

@ -4624,9 +4624,12 @@ func (r *rpcServer) SubscribeChannelEvents(req *lnrpc.ChannelEventSubscription,
},
}
// Completely ignore ActiveLinkEvent and
// InactiveLinkEvent as these are explicitly not
// exposed to the RPC.
case channelnotifier.ActiveLinkEvent,
channelnotifier.InactiveLinkEvent:
continue
case channelnotifier.FullyResolvedChannelEvent:
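One Go detail behind the combined case above: when a type-switch case lists more than one type, the case-scoped variable keeps the static type of the switch expression instead of a concrete event type, so such a case can only treat the value generically, which is exactly what the bare continue does. Illustration (OpenChannelEvent used only as a contrasting single-type case):

switch e := event.(type) {
case channelnotifier.OpenChannelEvent:
	// Single type: e is concrete, fields are accessible.
	_ = e.Channel
case channelnotifier.ActiveLinkEvent, channelnotifier.InactiveLinkEvent:
	// Multiple types: e keeps the interface type of `event`,
	// so there's nothing type-specific to do but skip it.
	_ = e
}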
@ -7323,6 +7326,8 @@ func (r *rpcServer) SubscribeChannelBackups(req *lnrpc.ChannelBackupSubscription
continue
case channelnotifier.ActiveLinkEvent:
continue
case channelnotifier.InactiveLinkEvent:
continue
}
// Now that we know the channel state has changed,