From 927572583b04294d72d2f6a85bb3fe02f48ffc0d Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Thu, 8 Jun 2023 19:42:07 +0800 Subject: [PATCH] multi: remove pending channel from Brontide when funding flow failed This commit adds a new interface method, `RemovePendingChannel`, to be used when the funding flow is failed after calling `AddPendingChannel` such that the Brontide has the most up-to-date view of the active channels. --- discovery/mock_test.go | 4 +++ funding/manager.go | 14 ++++++++- funding/manager_test.go | 4 +++ htlcswitch/link_test.go | 4 +++ htlcswitch/mock.go | 4 +++ lnpeer/peer.go | 3 ++ peer/brontide.go | 65 +++++++++++++++++++++++++++++++++++++++++ 7 files changed, 97 insertions(+), 1 deletion(-) diff --git a/discovery/mock_test.go b/discovery/mock_test.go index 3d3ee8229..446999233 100644 --- a/discovery/mock_test.go +++ b/discovery/mock_test.go @@ -69,6 +69,10 @@ func (p *mockPeer) AddPendingChannel(_ lnwire.ChannelID, return nil } +func (p *mockPeer) RemovePendingChannel(_ lnwire.ChannelID) error { + return nil +} + // mockMessageStore is an in-memory implementation of the MessageStore interface // used for the gossiper's unit tests. type mockMessageStore struct { diff --git a/funding/manager.go b/funding/manager.go index df4c060ed..f4452470b 100644 --- a/funding/manager.go +++ b/funding/manager.go @@ -894,9 +894,21 @@ func (c *chanIdentifier) hasChanID() bool { func (f *Manager) failFundingFlow(peer lnpeer.Peer, cid *chanIdentifier, fundingErr error) { - log.Debugf("Failing funding flow for pending_id=%x: %v", + log.Debugf("Failing funding flow for pending_id=%v: %v", cid.tempChanID, fundingErr) + // First, notify Brontide to remove the pending channel. + // + // NOTE: depending on where we fail the flow, we may not have the + // active channel ID yet. + if cid.hasChanID() { + err := peer.RemovePendingChannel(cid.chanID) + if err != nil { + log.Errorf("Unable to remove channel %v with peer %x: "+ + "%v", cid, peer.IdentityKey(), err) + } + } + ctx, err := f.cancelReservationCtx( peer.IdentityKey(), cid.tempChanID, false, ) diff --git a/funding/manager_test.go b/funding/manager_test.go index 0360bd4b5..73b6fd1d3 100644 --- a/funding/manager_test.go +++ b/funding/manager_test.go @@ -327,6 +327,10 @@ func (n *testNode) AddPendingChannel(_ lnwire.ChannelID, return nil } +func (n *testNode) RemovePendingChannel(_ lnwire.ChannelID) error { + return nil +} + func createTestWallet(cdb *channeldb.ChannelStateDB, netParams *chaincfg.Params, notifier chainntnfs.ChainNotifier, wc lnwallet.WalletController, signer input.Signer, keyRing keychain.SecretKeyRing, diff --git a/htlcswitch/link_test.go b/htlcswitch/link_test.go index e691ecec1..35eb6a9ea 100644 --- a/htlcswitch/link_test.go +++ b/htlcswitch/link_test.go @@ -1894,6 +1894,10 @@ func (m *mockPeer) AddPendingChannel(_ lnwire.ChannelID, return nil } +func (m *mockPeer) RemovePendingChannel(_ lnwire.ChannelID) error { + return nil +} + func newSingleLinkTestHarness(t *testing.T, chanAmt, chanReserve btcutil.Amount) ( ChannelLink, *lnwallet.LightningChannel, chan time.Time, func() error, func() (*lnwallet.LightningChannel, error), error) { diff --git a/htlcswitch/mock.go b/htlcswitch/mock.go index e339d8d52..a17045b1e 100644 --- a/htlcswitch/mock.go +++ b/htlcswitch/mock.go @@ -678,6 +678,10 @@ func (s *mockServer) AddPendingChannel(_ lnwire.ChannelID, return nil } +func (s *mockServer) RemovePendingChannel(_ lnwire.ChannelID) error { + return nil +} + func (s *mockServer) WipeChannel(*wire.OutPoint) {} func (s *mockServer) LocalFeatures() *lnwire.FeatureVector { diff --git a/lnpeer/peer.go b/lnpeer/peer.go index c9217ce82..f66991e42 100644 --- a/lnpeer/peer.go +++ b/lnpeer/peer.go @@ -31,6 +31,9 @@ type Peer interface { // channel should fail to be added if the cancel chan is closed. AddPendingChannel(cid lnwire.ChannelID, cancel <-chan struct{}) error + // RemovePendingChannel removes a pending open channel ID to the peer. + RemovePendingChannel(cid lnwire.ChannelID) error + // WipeChannel removes the channel uniquely identified by its channel // point from all indexes associated with the peer. WipeChannel(*wire.OutPoint) diff --git a/peer/brontide.go b/peer/brontide.go index 21c00ff25..e85e3d8b0 100644 --- a/peer/brontide.go +++ b/peer/brontide.go @@ -436,6 +436,10 @@ type Brontide struct { // channels to the source peer which handled the funding workflow. newPendingChannel chan *newChannelMsg + // removePendingChannel is used by the fundingManager to cancel pending + // open channels to the source peer when the funding flow is failed. + removePendingChannel chan *newChannelMsg + // activeMsgStreams is a map from channel id to the channel streams that // proxy messages to individual, active links. activeMsgStreams map[lnwire.ChannelID]*msgStream @@ -2393,6 +2397,11 @@ out: case req := <-p.newActiveChannel: p.handleNewActiveChannel(req) + // The funding flow for a pending channel is failed, we will + // remove it from Brontide. + case req := <-p.removePendingChannel: + p.handleRemovePendingChannel(req) + // We've just received a local request to close an active // channel. It will either kick of a cooperative channel // closure negotiation, or be a notification of a breached @@ -3527,6 +3536,34 @@ func (p *Brontide) AddPendingChannel(cid lnwire.ChannelID, } } +// RemovePendingChannel removes a pending open channel from the peer. +// +// NOTE: Part of the lnpeer.Peer interface. +func (p *Brontide) RemovePendingChannel(cid lnwire.ChannelID) error { + errChan := make(chan error, 1) + newChanMsg := &newChannelMsg{ + channelID: cid, + err: errChan, + } + + select { + case p.removePendingChannel <- newChanMsg: + case <-p.quit: + return lnpeer.ErrPeerExiting + } + + // We pause here to wait for the peer to respond to the cancellation of + // the pending channel before we close the channel barrier + // corresponding to the channel. + select { + case err := <-errChan: + return err + + case <-p.quit: + return lnpeer.ErrPeerExiting + } +} + // StartTime returns the time at which the connection was established if the // peer started successfully, and zero otherwise. func (p *Brontide) StartTime() time.Time { @@ -3871,6 +3908,34 @@ func (p *Brontide) handleNewPendingChannel(req *newChannelMsg) { p.addedChannels.Store(chanID, struct{}{}) } +// handleRemovePendingChannel takes a `newChannelMsg` request and removes it +// from `activeChannels` map. The request will be ignored if the channel is +// considered active by Brontide. Noop if the channel ID cannot be found. +func (p *Brontide) handleRemovePendingChannel(req *newChannelMsg) { + defer close(req.err) + + chanID := req.channelID + + // If we already have this channel, something is wrong with the funding + // flow as it will only be marked as active after `ChannelReady` is + // handled. In this case, we will log an error and exit. + if p.isActiveChannel(chanID) { + p.log.Errorf("Channel(%v) is active, ignoring remove request", + chanID) + return + } + + // The channel has not been added yet, we will log a warning as there + // is an unexpected call from funding manager. + if !p.isPendingChannel(chanID) { + p.log.Warnf("Channel(%v) not found, removing it anyway", chanID) + } + + // Remove the record of this pending channel. + p.activeChannels.Delete(chanID) + p.addedChannels.Delete(chanID) +} + // sendLinkUpdateMsg sends a message that updates the channel to the // channel's message stream. func (p *Brontide) sendLinkUpdateMsg(cid lnwire.ChannelID, msg lnwire.Message) {