From f39c568c9477d9cfe979b1f3f67b90a6968e1d8a Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Thu, 16 Mar 2023 21:24:04 +0800 Subject: [PATCH 01/11] peer: track pending channels in `Brontide` This commit adds a new channel `newPendingChannel` and its dedicated handler `handleNewPendingChannel` to keep track of pending open channels. This should not affect the original handling of new active channels, except `addedChannels` is now updated in `handleNewPendingChannel` such that this new pending channel won't be reestablished in link. --- peer/brontide.go | 96 ++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 93 insertions(+), 3 deletions(-) diff --git a/peer/brontide.go b/peer/brontide.go index 591f6601a..d430f41b0 100644 --- a/peer/brontide.go +++ b/peer/brontide.go @@ -88,8 +88,13 @@ type outgoingMsg struct { // the receiver of the request to report when the channel creation process has // completed. type newChannelMsg struct { + // channel is used when the pending channel becomes active. channel *channeldb.OpenChannel - err chan error + + // channelID is used when there's a new pending channel. + channelID lnwire.ChannelID + + err chan error } type customMsg struct { @@ -427,6 +432,10 @@ type Brontide struct { // channels to the source peer which handled the funding workflow. newActiveChannel chan *newChannelMsg + // newPendingChannel is used by the fundingManager to send pending open + // channels to the source peer which handled the funding workflow. + newPendingChannel chan *newChannelMsg + // activeMsgStreams is a map from channel id to the channel streams that // proxy messages to individual, active links. activeMsgStreams map[lnwire.ChannelID]*msgStream @@ -493,7 +502,8 @@ func NewBrontide(cfg Config) *Brontide { activeChannels: &lnutils.SyncMap[ lnwire.ChannelID, *lnwallet.LightningChannel, ]{}, - newActiveChannel: make(chan *newChannelMsg, 1), + newActiveChannel: make(chan *newChannelMsg, 1), + newPendingChannel: make(chan *newChannelMsg, 1), activeMsgStreams: make(map[lnwire.ChannelID]*msgStream), activeChanCloses: make(map[lnwire.ChannelID]*chancloser.ChanCloser), @@ -2382,6 +2392,14 @@ func (p *Brontide) channelManager() { out: for { select { + // A new pending channel has arrived which means we are about + // to complete a funding workflow and is waiting for the final + // `ChannelReady` messages to be exchanged. We will add this + // channel to the `activeChannels` with a nil value to indicate + // this is a pending channel. + case req := <-p.newPendingChannel: + p.handleNewPendingChannel(req) + // A new channel has arrived which means we've just completed a // funding workflow. We'll initialize the necessary local // state, and notify the htlc switch of a new link. @@ -3484,6 +3502,44 @@ func (p *Brontide) AddNewChannel(channel *channeldb.OpenChannel, } } +// AddPendingChannel adds a pending open channel to the peer. The channel +// should fail to be added if the cancel channel is closed. +// +// NOTE: Part of the lnpeer.Peer interface. +func (p *Brontide) AddPendingChannel(cid lnwire.ChannelID, + cancel <-chan struct{}) error { + + errChan := make(chan error, 1) + newChanMsg := &newChannelMsg{ + channelID: cid, + err: errChan, + } + + select { + case p.newPendingChannel <- newChanMsg: + + case <-cancel: + return errors.New("canceled adding pending channel") + + case <-p.quit: + return lnpeer.ErrPeerExiting + } + + // We pause here to wait for the peer to recognize the new pending + // channel before we close the channel barrier corresponding to the + // channel. + select { + case err := <-errChan: + return err + + case <-cancel: + return errors.New("canceled adding pending channel") + + case <-p.quit: + return lnpeer.ErrPeerExiting + } +} + // StartTime returns the time at which the connection was established if the // peer started successfully, and zero otherwise. func (p *Brontide) StartTime() time.Time { @@ -3718,7 +3774,6 @@ func (p *Brontide) addActiveChannel(c *channeldb.OpenChannel) error { // Store the channel in the activeChannels map. p.activeChannels.Store(chanID, lnChan) - p.addedChannels.Store(chanID, struct{}{}) p.log.Infof("New channel active ChannelPoint(%v) with peer", chanPoint) @@ -3793,3 +3848,38 @@ func (p *Brontide) handleNewActiveChannel(req *newChannelMsg) { // Close the err chan if everything went fine. close(req.err) } + +// handleNewPendingChannel takes a `newChannelMsg` request and add it to +// `activeChannels` map with nil value. This pending channel will be saved as +// it may become active in the future. Once active, the funding manager will +// send it again via `AddNewChannel`, and we'd handle the link creation there. +func (p *Brontide) handleNewPendingChannel(req *newChannelMsg) { + defer close(req.err) + + chanID := req.channelID + + // If we already have this channel, something is wrong with the funding + // flow as it will only be marked as active after `ChannelReady` is + // handled. In this case, we will do nothing but log an error, just in + // case this is a legit channel. + if p.isActiveChannel(chanID) { + p.log.Errorf("Channel(%v) is already active, ignoring "+ + "pending channel request", chanID) + + return + } + + // The channel has already been added, we will do nothing and return. + if p.isPendingChannel(chanID) { + p.log.Infof("Channel(%v) is already added, ignoring "+ + "pending channel request", chanID) + + return + } + + // This is a new channel, we now add it to the map `activeChannels` + // with nil value and mark it as a newly added channel in + // `addedChannels`. + p.activeChannels.Store(chanID, nil) + p.addedChannels.Store(chanID, struct{}{}) +} From e46bd8e1779d70bd1ec41b51d3d67393f4a72910 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Thu, 16 Mar 2023 21:25:17 +0800 Subject: [PATCH 02/11] multi: add `AddPendingChannel` to peer interface The funding manager has been updated to use `AddPendingChannel`. Note that we track the pending channel before it's confirmed as the peer may have a block height in the future(from our view), thus they may start operating in this channel before we consider it as fully open. The mocked peers have been updated to implement the new interface method. --- discovery/mock_test.go | 6 ++++++ funding/manager.go | 19 ++++++++++++++++++- funding/manager_test.go | 6 ++++++ htlcswitch/link_test.go | 6 ++++++ htlcswitch/mock.go | 6 ++++++ lnpeer/peer.go | 4 ++++ 6 files changed, 46 insertions(+), 1 deletion(-) diff --git a/discovery/mock_test.go b/discovery/mock_test.go index 0040ab8b6..3d3ee8229 100644 --- a/discovery/mock_test.go +++ b/discovery/mock_test.go @@ -63,6 +63,12 @@ func (p *mockPeer) RemoteFeatures() *lnwire.FeatureVector { return nil } +func (p *mockPeer) AddPendingChannel(_ lnwire.ChannelID, + _ <-chan struct{}) error { + + return nil +} + // mockMessageStore is an in-memory implementation of the MessageStore interface // used for the gossiper's unit tests. type mockMessageStore struct { diff --git a/funding/manager.go b/funding/manager.go index f243a8037..554c3ab06 100644 --- a/funding/manager.go +++ b/funding/manager.go @@ -2172,7 +2172,16 @@ func (f *Manager) continueFundingAccept(resCtx *reservationWithCtx, log.Infof("Generated ChannelPoint(%v) for pending_id(%x)", outPoint, pendingChanID[:]) - var err error + // Before sending FundingCreated sent, we notify Brontide to keep track + // of this pending open channel. + err := resCtx.peer.AddPendingChannel(channelID, f.quit) + if err != nil { + pubKey := resCtx.peer.IdentityKey().SerializeCompressed() + log.Errorf("Unable to add pending channel %v with peer %x: %v", + channelID, pubKey, err) + } + + // Send the FundingCreated msg. fundingCreated := &lnwire.FundingCreated{ PendingChannelID: pendingChanID, FundingPoint: *outPoint, @@ -2294,6 +2303,14 @@ func (f *Manager) handleFundingCreated(peer lnpeer.Peer, return } + // Before sending FundingSigned, we notify Brontide first to keep track + // of this pending open channel. + if err := peer.AddPendingChannel(channelID, f.quit); err != nil { + pubKey := peer.IdentityKey().SerializeCompressed() + log.Errorf("Unable to add pending channel %v with peer %x: %v", + channelID, pubKey, err) + } + fundingSigned := &lnwire.FundingSigned{ ChanID: channelID, CommitSig: ourCommitSig, diff --git a/funding/manager_test.go b/funding/manager_test.go index 1ca66904b..0360bd4b5 100644 --- a/funding/manager_test.go +++ b/funding/manager_test.go @@ -321,6 +321,12 @@ func (n *testNode) AddNewChannel(channel *channeldb.OpenChannel, } } +func (n *testNode) AddPendingChannel(_ lnwire.ChannelID, + quit <-chan struct{}) error { + + return nil +} + func createTestWallet(cdb *channeldb.ChannelStateDB, netParams *chaincfg.Params, notifier chainntnfs.ChainNotifier, wc lnwallet.WalletController, signer input.Signer, keyRing keychain.SecretKeyRing, diff --git a/htlcswitch/link_test.go b/htlcswitch/link_test.go index 6817ff930..e691ecec1 100644 --- a/htlcswitch/link_test.go +++ b/htlcswitch/link_test.go @@ -1888,6 +1888,12 @@ func (m *mockPeer) RemoteFeatures() *lnwire.FeatureVector { return nil } +func (m *mockPeer) AddPendingChannel(_ lnwire.ChannelID, + _ <-chan struct{}) error { + + return nil +} + func newSingleLinkTestHarness(t *testing.T, chanAmt, chanReserve btcutil.Amount) ( ChannelLink, *lnwallet.LightningChannel, chan time.Time, func() error, func() (*lnwallet.LightningChannel, error), error) { diff --git a/htlcswitch/mock.go b/htlcswitch/mock.go index bb36eda20..e339d8d52 100644 --- a/htlcswitch/mock.go +++ b/htlcswitch/mock.go @@ -672,6 +672,12 @@ func (s *mockServer) AddNewChannel(channel *channeldb.OpenChannel, return nil } +func (s *mockServer) AddPendingChannel(_ lnwire.ChannelID, + cancel <-chan struct{}) error { + + return nil +} + func (s *mockServer) WipeChannel(*wire.OutPoint) {} func (s *mockServer) LocalFeatures() *lnwire.FeatureVector { diff --git a/lnpeer/peer.go b/lnpeer/peer.go index 465a41cb9..c9217ce82 100644 --- a/lnpeer/peer.go +++ b/lnpeer/peer.go @@ -27,6 +27,10 @@ type Peer interface { // to be added if the cancel channel is closed. AddNewChannel(channel *channeldb.OpenChannel, cancel <-chan struct{}) error + // AddPendingChannel adds a pending open channel ID to the peer. The + // channel should fail to be added if the cancel chan is closed. + AddPendingChannel(cid lnwire.ChannelID, cancel <-chan struct{}) error + // WipeChannel removes the channel uniquely identified by its channel // point from all indexes associated with the peer. WipeChannel(*wire.OutPoint) From 3eb7f54a6d635fb3236d251eabc56767348b52ad Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Thu, 16 Mar 2023 12:32:42 +0800 Subject: [PATCH 03/11] peer: add method `handleLinkUpdateMsg` to handle channel update msgs --- peer/brontide.go | 40 ++++++++++++++++++++++++++-------------- 1 file changed, 26 insertions(+), 14 deletions(-) diff --git a/peer/brontide.go b/peer/brontide.go index d430f41b0..21c00ff25 100644 --- a/peer/brontide.go +++ b/peer/brontide.go @@ -1657,20 +1657,7 @@ out: if isLinkUpdate { // If this is a channel update, then we need to feed it // into the channel's in-order message stream. - chanStream, ok := p.activeMsgStreams[targetChan] - if !ok { - // If a stream hasn't yet been created, then - // we'll do so, add it to the map, and finally - // start it. - chanStream = newChanMsgStream(p, targetChan) - p.activeMsgStreams[targetChan] = chanStream - chanStream.Start() - defer chanStream.Stop() - } - - // With the stream obtained, add the message to the - // stream so we can continue processing message. - chanStream.AddMsg(nextMsg) + p.sendLinkUpdateMsg(targetChan, nextMsg) } idleTimer.Reset(idleTimeout) @@ -3883,3 +3870,28 @@ func (p *Brontide) handleNewPendingChannel(req *newChannelMsg) { p.activeChannels.Store(chanID, nil) p.addedChannels.Store(chanID, struct{}{}) } + +// sendLinkUpdateMsg sends a message that updates the channel to the +// channel's message stream. +func (p *Brontide) sendLinkUpdateMsg(cid lnwire.ChannelID, msg lnwire.Message) { + p.log.Tracef("Sending link update msg=%v", msg.MsgType()) + + chanStream, ok := p.activeMsgStreams[cid] + if !ok { + // If a stream hasn't yet been created, then we'll do so, add + // it to the map, and finally start it. + chanStream = newChanMsgStream(p, cid) + p.activeMsgStreams[cid] = chanStream + chanStream.Start() + + // Stop the stream when quit. + go func() { + <-p.quit + chanStream.Stop() + }() + } + + // With the stream obtained, add the message to the stream so we can + // continue processing message. + chanStream.AddMsg(msg) +} From 048d7d7c36f7e119e6b7f83bd6f3103828a4b75c Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Thu, 16 Mar 2023 21:59:15 +0800 Subject: [PATCH 04/11] docs: update release note for race fix --- docs/release-notes/release-notes-0.17.0.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/docs/release-notes/release-notes-0.17.0.md b/docs/release-notes/release-notes-0.17.0.md index 6cd76fb79..883c46376 100644 --- a/docs/release-notes/release-notes-0.17.0.md +++ b/docs/release-notes/release-notes-0.17.0.md @@ -216,6 +216,9 @@ compare the default values between lnd and sample-lnd.conf. creation](https://github.com/lightningnetwork/lnd/pull/7856) that can arise under rare scenarios. +- A race condition found between `channel_ready` and link updates is [now + fixed](https://github.com/lightningnetwork/lnd/pull/7518). + ### Tooling and documentation * Add support for [custom `RPCHOST` and From 3ed579d06f7e0cdd58ad9183af4568071ea8d4d3 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Thu, 27 Jul 2023 19:21:39 +0200 Subject: [PATCH 05/11] funding: make `failFundingFlow` takes both channel IDs This commit adds a new struct `chanIdentifier` which wraps the pending channel ID and active channel ID. This struct is then used in `failFundingFlow` so the channel ID can be access there. --- funding/manager.go | 258 ++++++++++++++++++++++++++++----------------- 1 file changed, 160 insertions(+), 98 deletions(-) diff --git a/funding/manager.go b/funding/manager.go index 554c3ab06..df4c060ed 100644 --- a/funding/manager.go +++ b/funding/manager.go @@ -842,6 +842,48 @@ func (f *Manager) CancelPeerReservations(nodePub [33]byte) { delete(f.activeReservations, nodePub) } +// chanIdentifier wraps pending channel ID and channel ID into one struct so +// it's easier to identify a specific channel. +// +// TODO(yy): move to a different package to hide the private fields so direct +// access is disabled. +type chanIdentifier struct { + // tempChanID is the pending channel ID created by the funder when + // initializing the funding flow. For fundee, it's received from the + // `open_channel` message. + tempChanID lnwire.ChannelID + + // chanID is the channel ID created by the funder once the + // `accept_channel` message is received. For fundee, it's received from + // the `funding_created` message. + chanID lnwire.ChannelID + + // chanIDSet is a boolean indicates whether the active channel ID is + // set for this identifier. For zero conf channels, the `chanID` can be + // all-zero, which is the same as the empty value of `ChannelID`. To + // avoid the confusion, we use this boolean to explicitly signal + // whether the `chanID` is set or not. + chanIDSet bool +} + +// newChanIdentifier creates a new chanIdentifier. +func newChanIdentifier(tempChanID lnwire.ChannelID) *chanIdentifier { + return &chanIdentifier{ + tempChanID: tempChanID, + } +} + +// setChanID updates the `chanIdentifier` with the active channel ID. +func (c *chanIdentifier) setChanID(chanID lnwire.ChannelID) { + c.chanID = chanID + c.chanIDSet = true +} + +// hasChanID returns true if the active channel ID has been set. +func (c *chanIdentifier) hasChanID() bool { + return c.chanIDSet +} + // failFundingFlow will fail the active funding flow with the target peer, // identified by its unique temporary channel ID. This method will send an // error to the remote peer, and also remove the reservation from our set of @@ -849,14 +891,14 @@ func (f *Manager) CancelPeerReservations(nodePub [33]byte) { // // TODO(roasbeef): if peer disconnects, and haven't yet broadcast funding // transaction, then all reservations should be cleared. -func (f *Manager) failFundingFlow(peer lnpeer.Peer, tempChanID [32]byte, +func (f *Manager) failFundingFlow(peer lnpeer.Peer, cid *chanIdentifier, fundingErr error) { log.Debugf("Failing funding flow for pending_id=%x: %v", - tempChanID, fundingErr) + cid.tempChanID, fundingErr) ctx, err := f.cancelReservationCtx( - peer.IdentityKey(), tempChanID, false, + peer.IdentityKey(), cid.tempChanID, false, ) if err != nil { log.Errorf("unable to cancel reservation: %v", err) @@ -887,7 +929,7 @@ func (f *Manager) failFundingFlow(peer lnpeer.Peer, tempChanID [32]byte, } errMsg := &lnwire.Error{ - ChanID: tempChanID, + ChanID: cid.tempChanID, Data: msg, } @@ -1325,13 +1367,14 @@ func (f *Manager) handleFundingOpen(peer lnpeer.Peer, } f.resMtx.RUnlock() + // Create the channel identifier. + cid := newChanIdentifier(msg.PendingChannelID) + // Also count the channels that are already pending. There we don't know // the underlying intent anymore, unfortunately. channels, err := f.cfg.Wallet.Cfg.Database.FetchOpenChannels(peerPubKey) if err != nil { - f.failFundingFlow( - peer, msg.PendingChannelID, err, - ) + f.failFundingFlow(peer, cid, err) return } @@ -1350,29 +1393,20 @@ func (f *Manager) handleFundingOpen(peer lnpeer.Peer, // TODO(roasbeef): modify to only accept a _single_ pending channel per // block unless white listed if numPending >= f.cfg.MaxPendingChannels { - f.failFundingFlow( - peer, msg.PendingChannelID, - lnwire.ErrMaxPendingChannels, - ) + f.failFundingFlow(peer, cid, lnwire.ErrMaxPendingChannels) + return } // Ensure that the pendingChansLimit is respected. pendingChans, err := f.cfg.Wallet.Cfg.Database.FetchPendingChannels() if err != nil { - f.failFundingFlow( - peer, msg.PendingChannelID, err, - ) - + f.failFundingFlow(peer, cid, err) return } if len(pendingChans) > pendingChansLimit { - f.failFundingFlow( - peer, msg.PendingChannelID, - lnwire.ErrMaxPendingChannels, - ) - + f.failFundingFlow(peer, cid, lnwire.ErrMaxPendingChannels) return } @@ -1385,17 +1419,14 @@ func (f *Manager) handleFundingOpen(peer lnpeer.Peer, log.Errorf("unable to query wallet: %v", err) } err := errors.New("Synchronizing blockchain") - f.failFundingFlow( - peer, msg.PendingChannelID, - err, - ) + f.failFundingFlow(peer, cid, err) return } // Ensure that the remote party respects our maximum channel size. if amt > f.cfg.MaxChanSize { f.failFundingFlow( - peer, msg.PendingChannelID, + peer, cid, lnwallet.ErrChanTooLarge(amt, f.cfg.MaxChanSize), ) return @@ -1405,7 +1436,7 @@ func (f *Manager) handleFundingOpen(peer lnpeer.Peer, // a channel that's below our current min channel size. if amt < f.cfg.MinChanSize { f.failFundingFlow( - peer, msg.PendingChannelID, + peer, cid, lnwallet.ErrChanTooSmall(amt, f.cfg.MinChanSize), ) return @@ -1414,10 +1445,7 @@ func (f *Manager) handleFundingOpen(peer lnpeer.Peer, // If request specifies non-zero push amount and 'rejectpush' is set, // signal an error. if f.cfg.RejectPush && msg.PushAmount > 0 { - f.failFundingFlow( - peer, msg.PendingChannelID, - lnwallet.ErrNonZeroPushAmount(), - ) + f.failFundingFlow(peer, cid, lnwallet.ErrNonZeroPushAmount()) return } @@ -1432,10 +1460,7 @@ func (f *Manager) handleFundingOpen(peer lnpeer.Peer, // the channel. acceptorResp := f.cfg.OpenChannelPredicate.Accept(chanReq) if acceptorResp.RejectChannel() { - f.failFundingFlow( - peer, msg.PendingChannelID, - acceptorResp.ChanAcceptError, - ) + f.failFundingFlow(peer, cid, acceptorResp.ChanAcceptError) return } @@ -1462,7 +1487,7 @@ func (f *Manager) handleFundingOpen(peer lnpeer.Peer, if err != nil { // TODO(roasbeef): should be using soft errors log.Errorf("channel type negotiation failed: %v", err) - f.failFundingFlow(peer, msg.PendingChannelID, err) + f.failFundingFlow(peer, cid, err) return } @@ -1498,9 +1523,7 @@ func (f *Manager) handleFundingOpen(peer lnpeer.Peer, // Fail the funding flow. flowErr := fmt.Errorf("channel acceptor blocked " + "zero-conf channel negotiation") - f.failFundingFlow( - peer, msg.PendingChannelID, flowErr, - ) + f.failFundingFlow(peer, cid, flowErr) return } @@ -1514,9 +1537,7 @@ func (f *Manager) handleFundingOpen(peer lnpeer.Peer, // Fail the funding flow. flowErr := fmt.Errorf("scid-alias feature " + "must be negotiated for zero-conf") - f.failFundingFlow( - peer, msg.PendingChannelID, flowErr, - ) + f.failFundingFlow(peer, cid, flowErr) return } @@ -1532,7 +1553,7 @@ func (f *Manager) handleFundingOpen(peer lnpeer.Peer, err = fmt.Errorf("option-scid-alias chantype for public " + "channel") log.Error(err) - f.failFundingFlow(peer, msg.PendingChannelID, err) + f.failFundingFlow(peer, cid, err) return } @@ -1557,7 +1578,7 @@ func (f *Manager) handleFundingOpen(peer lnpeer.Peer, reservation, err := f.cfg.Wallet.InitChannelReservation(req) if err != nil { log.Errorf("Unable to initialize reservation: %v", err) - f.failFundingFlow(peer, msg.PendingChannelID, err) + f.failFundingFlow(peer, cid, err) return } @@ -1571,7 +1592,7 @@ func (f *Manager) handleFundingOpen(peer lnpeer.Peer, aliasScid, err := f.cfg.AliasManager.RequestAlias() if err != nil { log.Errorf("Unable to request alias: %v", err) - f.failFundingFlow(peer, msg.PendingChannelID, err) + f.failFundingFlow(peer, cid, err) return } @@ -1612,7 +1633,7 @@ func (f *Manager) handleFundingOpen(peer lnpeer.Peer, ) if err != nil { log.Errorf("Unacceptable channel constraints: %v", err) - f.failFundingFlow(peer, msg.PendingChannelID, err) + f.failFundingFlow(peer, cid, err) return } @@ -1626,7 +1647,7 @@ func (f *Manager) handleFundingOpen(peer lnpeer.Peer, ) if err != nil { f.failFundingFlow( - peer, msg.PendingChannelID, + peer, cid, fmt.Errorf("getUpfrontShutdownScript error: %v", err), ) return @@ -1638,7 +1659,7 @@ func (f *Manager) handleFundingOpen(peer lnpeer.Peer, if commitType == lnwallet.CommitmentTypeScriptEnforcedLease { if msg.LeaseExpiry == nil { err := errors.New("missing lease expiry") - f.failFundingFlow(peer, msg.PendingChannelID, err) + f.failFundingFlow(peer, cid, err) return } @@ -1651,9 +1672,7 @@ func (f *Manager) handleFundingOpen(peer lnpeer.Peer, reservation.LeaseExpiry() { err := errors.New("lease expiry mismatch") - f.failFundingFlow( - peer, msg.PendingChannelID, err, - ) + f.failFundingFlow(peer, cid, err) return } } @@ -1775,7 +1794,7 @@ func (f *Manager) handleFundingOpen(peer lnpeer.Peer, err = reservation.ProcessSingleContribution(remoteContribution) if err != nil { log.Errorf("unable to add contribution reservation: %v", err) - f.failFundingFlow(peer, msg.PendingChannelID, err) + f.failFundingFlow(peer, cid, err) return } @@ -1809,7 +1828,7 @@ func (f *Manager) handleFundingOpen(peer lnpeer.Peer, if err := peer.SendMessage(true, &fundingAccept); err != nil { log.Errorf("unable to send funding response to peer: %v", err) - f.failFundingFlow(peer, msg.PendingChannelID, err) + f.failFundingFlow(peer, cid, err) return } } @@ -1840,6 +1859,9 @@ func (f *Manager) handleFundingAccept(peer lnpeer.Peer, log.Infof("Recv'd fundingResponse for pending_id(%x)", pendingChanID[:]) + // Create the channel identifier. + cid := newChanIdentifier(msg.PendingChannelID) + // Perform some basic validation of any custom TLV records included. // // TODO: Return errors as funding.Error to give context to remote peer? @@ -1849,14 +1871,14 @@ func (f *Manager) handleFundingAccept(peer lnpeer.Peer, if msg.ChannelType == nil { err := errors.New("explicit channel type not echoed " + "back") - f.failFundingFlow(peer, msg.PendingChannelID, err) + f.failFundingFlow(peer, cid, err) return } proposedFeatures := lnwire.RawFeatureVector(*resCtx.channelType) ackedFeatures := lnwire.RawFeatureVector(*msg.ChannelType) if !proposedFeatures.Equals(&ackedFeatures) { err := errors.New("channel type mismatch") - f.failFundingFlow(peer, msg.PendingChannelID, err) + f.failFundingFlow(peer, cid, err) return } @@ -1866,18 +1888,14 @@ func (f *Manager) handleFundingAccept(peer lnpeer.Peer, if msg.LeaseExpiry == nil { err := errors.New("lease expiry not echoed " + "back") - f.failFundingFlow( - peer, msg.PendingChannelID, err, - ) + f.failFundingFlow(peer, cid, err) return } if uint32(*msg.LeaseExpiry) != resCtx.reservation.LeaseExpiry() { err := errors.New("lease expiry mismatch") - f.failFundingFlow( - peer, msg.PendingChannelID, err, - ) + f.failFundingFlow(peer, cid, err) return } } @@ -1899,7 +1917,7 @@ func (f *Manager) handleFundingAccept(peer lnpeer.Peer, ) if err != nil { err := errors.New("received unexpected channel type") - f.failFundingFlow(peer, msg.PendingChannelID, err) + f.failFundingFlow(peer, cid, err) return } @@ -1909,7 +1927,7 @@ func (f *Manager) handleFundingAccept(peer lnpeer.Peer, // it's another type, we fail the flow. if implicitChannelType != negotiatedChannelType { err := errors.New("negotiated unexpected channel type") - f.failFundingFlow(peer, msg.PendingChannelID, err) + f.failFundingFlow(peer, cid, err) return } } @@ -1922,7 +1940,7 @@ func (f *Manager) handleFundingAccept(peer lnpeer.Peer, msg.MinAcceptDepth, chainntnfs.MaxNumConfs, ) log.Warnf("Unacceptable channel constraints: %v", err) - f.failFundingFlow(peer, msg.PendingChannelID, err) + f.failFundingFlow(peer, cid, err) return } @@ -1930,7 +1948,7 @@ func (f *Manager) handleFundingAccept(peer lnpeer.Peer, if resCtx.reservation.IsZeroConf() && msg.MinAcceptDepth != 0 { err = fmt.Errorf("zero-conf channel has min_depth non-zero") log.Warn(err) - f.failFundingFlow(peer, msg.PendingChannelID, err) + f.failFundingFlow(peer, cid, err) return } @@ -1939,7 +1957,7 @@ func (f *Manager) handleFundingAccept(peer lnpeer.Peer, if !resCtx.reservation.IsZeroConf() && msg.MinAcceptDepth == 0 { err = fmt.Errorf("non-zero-conf channel has min depth zero") log.Warn(err) - f.failFundingFlow(peer, msg.PendingChannelID, err) + f.failFundingFlow(peer, cid, err) return } @@ -1960,7 +1978,7 @@ func (f *Manager) handleFundingAccept(peer lnpeer.Peer, ) if err != nil { log.Warnf("Unacceptable channel constraints: %v", err) - f.failFundingFlow(peer, msg.PendingChannelID, err) + f.failFundingFlow(peer, cid, err) return } @@ -2012,7 +2030,7 @@ func (f *Manager) handleFundingAccept(peer lnpeer.Peer, log.Errorf("Unable to process PSBT funding params "+ "for contribution from %x: %v", peerKeyBytes, err) - f.failFundingFlow(peer, msg.PendingChannelID, err) + f.failFundingFlow(peer, cid, err) return } var buf bytes.Buffer @@ -2020,7 +2038,7 @@ func (f *Manager) handleFundingAccept(peer lnpeer.Peer, if err != nil { log.Errorf("Unable to serialize PSBT for "+ "contribution from %x: %v", peerKeyBytes, err) - f.failFundingFlow(peer, msg.PendingChannelID, err) + f.failFundingFlow(peer, cid, err) return } resCtx.updates <- &lnrpc.OpenStatusUpdate{ @@ -2037,7 +2055,7 @@ func (f *Manager) handleFundingAccept(peer lnpeer.Peer, } else if err != nil { log.Errorf("Unable to process contribution from %x: %v", peerKeyBytes, err) - f.failFundingFlow(peer, msg.PendingChannelID, err) + f.failFundingFlow(peer, cid, err) return } @@ -2056,7 +2074,7 @@ func (f *Manager) handleFundingAccept(peer lnpeer.Peer, go func() { defer f.wg.Done() - f.waitForPsbt(psbtIntent, resCtx, pendingChanID) + f.waitForPsbt(psbtIntent, resCtx, cid) }() // With the new goroutine spawned, we can now exit to unblock @@ -2066,7 +2084,7 @@ func (f *Manager) handleFundingAccept(peer lnpeer.Peer, // In a normal, non-PSBT funding flow, we can jump directly to the next // step where we expect our contribution to be finalized. - f.continueFundingAccept(resCtx, pendingChanID) + f.continueFundingAccept(resCtx, cid) } // waitForPsbt blocks until either a signed PSBT arrives, an error occurs or @@ -2075,7 +2093,7 @@ func (f *Manager) handleFundingAccept(peer lnpeer.Peer, // // NOTE: This method must be called as a goroutine. func (f *Manager) waitForPsbt(intent *chanfunding.PsbtIntent, - resCtx *reservationWithCtx, pendingChanID [32]byte) { + resCtx *reservationWithCtx, cid *chanIdentifier) { // failFlow is a helper that logs an error message with the current // context and then fails the funding flow. @@ -2083,9 +2101,9 @@ func (f *Manager) waitForPsbt(intent *chanfunding.PsbtIntent, failFlow := func(errMsg string, cause error) { log.Errorf("Unable to handle funding accept message "+ "for peer_key=%x, pending_chan_id=%x: %s: %v", - peerKey.SerializeCompressed(), pendingChanID, errMsg, + peerKey.SerializeCompressed(), cid.tempChanID, errMsg, cause) - f.failFundingFlow(resCtx.peer, pendingChanID, cause) + f.failFundingFlow(resCtx.peer, cid, cause) } // We'll now wait until the intent has received the final and complete @@ -2106,7 +2124,7 @@ func (f *Manager) waitForPsbt(intent *chanfunding.PsbtIntent, case chanfunding.ErrRemoteCanceled: log.Infof("Remote canceled, aborting PSBT flow "+ "for peer_key=%x, pending_chan_id=%x", - peerKey.SerializeCompressed(), pendingChanID) + peerKey.SerializeCompressed(), cid.tempChanID) return // Nil error means the flow continues normally now. @@ -2128,7 +2146,7 @@ func (f *Manager) waitForPsbt(intent *chanfunding.PsbtIntent, } // We are now ready to continue the funding flow. - f.continueFundingAccept(resCtx, pendingChanID) + f.continueFundingAccept(resCtx, cid) // Handle a server shutdown as well because the reservation won't // survive a restart as it's in memory only. @@ -2136,7 +2154,7 @@ func (f *Manager) waitForPsbt(intent *chanfunding.PsbtIntent, log.Errorf("Unable to handle funding accept message "+ "for peer_key=%x, pending_chan_id=%x: funding manager "+ "shutting down", peerKey.SerializeCompressed(), - pendingChanID) + cid.tempChanID) return } } @@ -2145,7 +2163,7 @@ func (f *Manager) waitForPsbt(intent *chanfunding.PsbtIntent, // contribution is finalized, the channel output is known and the funding // transaction is signed. func (f *Manager) continueFundingAccept(resCtx *reservationWithCtx, - pendingChanID [32]byte) { + cid *chanIdentifier) { // Now that we have their contribution, we can extract, then send over // both the funding out point and our signature for their version of @@ -2166,11 +2184,11 @@ func (f *Manager) continueFundingAccept(resCtx *reservationWithCtx, // so we can retrieve the reservation context once we get the // FundingSigned message. f.resMtx.Lock() - f.signedReservations[channelID] = pendingChanID + f.signedReservations[channelID] = cid.tempChanID f.resMtx.Unlock() log.Infof("Generated ChannelPoint(%v) for pending_id(%x)", outPoint, - pendingChanID[:]) + cid.tempChanID[:]) // Before sending FundingCreated sent, we notify Brontide to keep track // of this pending open channel. @@ -2181,20 +2199,25 @@ func (f *Manager) continueFundingAccept(resCtx *reservationWithCtx, channelID, pubKey, err) } + // Once Brontide is aware of this channel, we need to set it in + // chanIdentifier so this channel will be removed from Brontide if the + // funding flow fails. + cid.setChanID(channelID) + // Send the FundingCreated msg. fundingCreated := &lnwire.FundingCreated{ - PendingChannelID: pendingChanID, + PendingChannelID: cid.tempChanID, FundingPoint: *outPoint, } fundingCreated.CommitSig, err = lnwire.NewSigFromSignature(sig) if err != nil { log.Errorf("Unable to parse signature: %v", err) - f.failFundingFlow(resCtx.peer, pendingChanID, err) + f.failFundingFlow(resCtx.peer, cid, err) return } if err := resCtx.peer.SendMessage(true, fundingCreated); err != nil { log.Errorf("Unable to send funding complete message: %v", err) - f.failFundingFlow(resCtx.peer, pendingChanID, err) + f.failFundingFlow(resCtx.peer, cid, err) return } } @@ -2225,10 +2248,13 @@ func (f *Manager) handleFundingCreated(peer lnpeer.Peer, log.Infof("completing pending_id(%x) with ChannelPoint(%v)", pendingChanID[:], fundingOut) + // Create the channel identifier without setting the active channel ID. + cid := newChanIdentifier(pendingChanID) + commitSig, err := msg.CommitSig.ToSignature() if err != nil { log.Errorf("unable to parse signature: %v", err) - f.failFundingFlow(peer, pendingChanID, err) + f.failFundingFlow(peer, cid, err) return } @@ -2243,7 +2269,7 @@ func (f *Manager) handleFundingCreated(peer lnpeer.Peer, if err != nil { // TODO(roasbeef): better error logging: peerID, channelID, etc. log.Errorf("unable to complete single reservation: %v", err) - f.failFundingFlow(peer, pendingChanID, err) + f.failFundingFlow(peer, cid, err) return } @@ -2252,7 +2278,7 @@ func (f *Manager) handleFundingCreated(peer lnpeer.Peer, // The channel is marked IsPending in the database, and can be removed // from the set of active reservations. - f.deleteReservationCtx(peerKey, msg.PendingChannelID) + f.deleteReservationCtx(peerKey, cid.tempChanID) // If something goes wrong before the funding transaction is confirmed, // we use this convenience method to delete the pending OpenChannel @@ -2298,7 +2324,7 @@ func (f *Manager) handleFundingCreated(peer lnpeer.Peer, ourCommitSig, err := lnwire.NewSigFromSignature(sig) if err != nil { log.Errorf("unable to parse signature: %v", err) - f.failFundingFlow(peer, pendingChanID, err) + f.failFundingFlow(peer, cid, err) deleteFromDatabase() return } @@ -2308,16 +2334,21 @@ func (f *Manager) handleFundingCreated(peer lnpeer.Peer, if err := peer.AddPendingChannel(channelID, f.quit); err != nil { pubKey := peer.IdentityKey().SerializeCompressed() log.Errorf("Unable to add pending channel %v with peer %x: %v", - channelID, pubKey, err) + cid.chanID, pubKey, err) } + // Once Brontide is aware of this channel, we need to set it in + // chanIdentifier so this channel will be removed from Brontide if the + // funding flow fails. + cid.setChanID(channelID) + fundingSigned := &lnwire.FundingSigned{ - ChanID: channelID, + ChanID: cid.chanID, CommitSig: ourCommitSig, } if err := peer.SendMessage(true, fundingSigned); err != nil { log.Errorf("unable to send FundingSigned message: %v", err) - f.failFundingFlow(peer, pendingChanID, err) + f.failFundingFlow(peer, cid, err) deleteFromDatabase() return } @@ -2325,7 +2356,7 @@ func (f *Manager) handleFundingCreated(peer lnpeer.Peer, // With a permanent channel id established we can save the respective // forwarding policy in the database. In the channel announcement phase // this forwarding policy is retrieved and applied. - err = f.saveInitialFwdingPolicy(channelID, &forwardingPolicy) + err = f.saveInitialFwdingPolicy(cid.chanID, &forwardingPolicy) if err != nil { log.Errorf("Unable to store the forwarding policy: %v", err) } @@ -2341,7 +2372,7 @@ func (f *Manager) handleFundingCreated(peer lnpeer.Peer, // Create an entry in the local discovery map so we can ensure that we // process the channel confirmation fully before we receive a // channel_ready message. - f.localDiscoverySignals.Store(channelID, make(chan struct{})) + f.localDiscoverySignals.Store(cid.chanID, make(chan struct{})) // Inform the ChannelNotifier that the channel has entered // pending open state. @@ -2382,11 +2413,29 @@ func (f *Manager) handleFundingSigned(peer lnpeer.Peer, pendingChanID, ok := f.signedReservations[msg.ChanID] delete(f.signedReservations, msg.ChanID) f.resMtx.Unlock() + + // Create the channel identifier and set the channel ID. + // + // NOTE: we may get an empty pending channel ID here if the key cannot + // be found, which means when we cancel the reservation context in + // `failFundingFlow`, we will get an error. In this case, we will send + // an error msg to our peer using the active channel ID. + // + // TODO(yy): refactor the funding flow to fix this case. + cid := newChanIdentifier(pendingChanID) + cid.setChanID(msg.ChanID) + + // If the pending channel ID is not found, fail the funding flow. if !ok { + // NOTE: we directly overwrite the pending channel ID here for + // this rare case since we don't have a valid pending channel + // ID. + cid.tempChanID = msg.ChanID + err := fmt.Errorf("unable to find signed reservation for "+ "chan_id=%x", msg.ChanID) log.Warnf(err.Error()) - f.failFundingFlow(peer, msg.ChanID, err) + f.failFundingFlow(peer, cid, err) return } @@ -2396,7 +2445,7 @@ func (f *Manager) handleFundingSigned(peer lnpeer.Peer, log.Warnf("Unable to find reservation (peer_id:%v, "+ "chan_id:%x)", peerKey, pendingChanID[:]) // TODO: add ErrChanNotFound? - f.failFundingFlow(peer, pendingChanID, err) + f.failFundingFlow(peer, cid, err) return } @@ -2421,7 +2470,7 @@ func (f *Manager) handleFundingSigned(peer lnpeer.Peer, commitSig, err := msg.CommitSig.ToSignature() if err != nil { log.Errorf("Unable to parse signature: %v", err) - f.failFundingFlow(peer, pendingChanID, err) + f.failFundingFlow(peer, cid, err) return } @@ -2431,7 +2480,7 @@ func (f *Manager) handleFundingSigned(peer lnpeer.Peer, if err != nil { log.Errorf("Unable to complete reservation sign "+ "complete: %v", err) - f.failFundingFlow(peer, pendingChanID, err) + f.failFundingFlow(peer, cid, err) return } @@ -2592,12 +2641,16 @@ func (f *Manager) fundingTimeout(c *channeldb.OpenChannel, "to come online: %v", err) } + // Create channel identifier and set the channel ID. + cid := newChanIdentifier(pendingID) + cid.setChanID(lnwire.NewChanIDFromOutPoint(&c.FundingOutpoint)) + // TODO(halseth): should this send be made // reliable? // The reservation won't exist at this point, but we'll send an // Error message over anyways with ChanID set to pendingID. - f.failFundingFlow(peer, pendingID, timeoutErr) + f.failFundingFlow(peer, cid, timeoutErr) }() return timeoutErr @@ -4403,7 +4456,16 @@ func (f *Manager) pruneZombieReservations() { resCtx.peer.IdentityKey().SerializeCompressed(), pendingChanID[:]) log.Warnf(err.Error()) - f.failFundingFlow(resCtx.peer, pendingChanID, err) + + chanID := lnwire.NewChanIDFromOutPoint( + resCtx.reservation.FundingOutpoint(), + ) + + // Create channel identifier and set the channel ID. + cid := newChanIdentifier(pendingChanID) + cid.setChanID(chanID) + + f.failFundingFlow(resCtx.peer, cid, err) } } From 927572583b04294d72d2f6a85bb3fe02f48ffc0d Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Thu, 8 Jun 2023 19:42:07 +0800 Subject: [PATCH 06/11] multi: remove pending channel from Brontide when funding flow failed This commit adds a new interface method, `RemovePendingChannel`, to be used when the funding flow is failed after calling `AddPendingChannel` such that the Brontide has the most up-to-date view of the active channels. --- discovery/mock_test.go | 4 +++ funding/manager.go | 14 ++++++++- funding/manager_test.go | 4 +++ htlcswitch/link_test.go | 4 +++ htlcswitch/mock.go | 4 +++ lnpeer/peer.go | 3 ++ peer/brontide.go | 65 +++++++++++++++++++++++++++++++++++++++++ 7 files changed, 97 insertions(+), 1 deletion(-) diff --git a/discovery/mock_test.go b/discovery/mock_test.go index 3d3ee8229..446999233 100644 --- a/discovery/mock_test.go +++ b/discovery/mock_test.go @@ -69,6 +69,10 @@ func (p *mockPeer) AddPendingChannel(_ lnwire.ChannelID, return nil } +func (p *mockPeer) RemovePendingChannel(_ lnwire.ChannelID) error { + return nil +} + // mockMessageStore is an in-memory implementation of the MessageStore interface // used for the gossiper's unit tests. type mockMessageStore struct { diff --git a/funding/manager.go b/funding/manager.go index df4c060ed..f4452470b 100644 --- a/funding/manager.go +++ b/funding/manager.go @@ -894,9 +894,21 @@ func (c *chanIdentifier) hasChanID() bool { func (f *Manager) failFundingFlow(peer lnpeer.Peer, cid *chanIdentifier, fundingErr error) { - log.Debugf("Failing funding flow for pending_id=%x: %v", + log.Debugf("Failing funding flow for pending_id=%v: %v", cid.tempChanID, fundingErr) + // First, notify Brontide to remove the pending channel. + // + // NOTE: depending on where we fail the flow, we may not have the + // active channel ID yet. + if cid.hasChanID() { + err := peer.RemovePendingChannel(cid.chanID) + if err != nil { + log.Errorf("Unable to remove channel %v with peer %x: "+ + "%v", cid, peer.IdentityKey(), err) + } + } + ctx, err := f.cancelReservationCtx( peer.IdentityKey(), cid.tempChanID, false, ) diff --git a/funding/manager_test.go b/funding/manager_test.go index 0360bd4b5..73b6fd1d3 100644 --- a/funding/manager_test.go +++ b/funding/manager_test.go @@ -327,6 +327,10 @@ func (n *testNode) AddPendingChannel(_ lnwire.ChannelID, return nil } +func (n *testNode) RemovePendingChannel(_ lnwire.ChannelID) error { + return nil +} + func createTestWallet(cdb *channeldb.ChannelStateDB, netParams *chaincfg.Params, notifier chainntnfs.ChainNotifier, wc lnwallet.WalletController, signer input.Signer, keyRing keychain.SecretKeyRing, diff --git a/htlcswitch/link_test.go b/htlcswitch/link_test.go index e691ecec1..35eb6a9ea 100644 --- a/htlcswitch/link_test.go +++ b/htlcswitch/link_test.go @@ -1894,6 +1894,10 @@ func (m *mockPeer) AddPendingChannel(_ lnwire.ChannelID, return nil } +func (m *mockPeer) RemovePendingChannel(_ lnwire.ChannelID) error { + return nil +} + func newSingleLinkTestHarness(t *testing.T, chanAmt, chanReserve btcutil.Amount) ( ChannelLink, *lnwallet.LightningChannel, chan time.Time, func() error, func() (*lnwallet.LightningChannel, error), error) { diff --git a/htlcswitch/mock.go b/htlcswitch/mock.go index e339d8d52..a17045b1e 100644 --- a/htlcswitch/mock.go +++ b/htlcswitch/mock.go @@ -678,6 +678,10 @@ func (s *mockServer) AddPendingChannel(_ lnwire.ChannelID, return nil } +func (s *mockServer) RemovePendingChannel(_ lnwire.ChannelID) error { + return nil +} + func (s *mockServer) WipeChannel(*wire.OutPoint) {} func (s *mockServer) LocalFeatures() *lnwire.FeatureVector { diff --git a/lnpeer/peer.go b/lnpeer/peer.go index c9217ce82..f66991e42 100644 --- a/lnpeer/peer.go +++ b/lnpeer/peer.go @@ -31,6 +31,9 @@ type Peer interface { // channel should fail to be added if the cancel chan is closed. AddPendingChannel(cid lnwire.ChannelID, cancel <-chan struct{}) error + // RemovePendingChannel removes a pending open channel ID to the peer. + RemovePendingChannel(cid lnwire.ChannelID) error + // WipeChannel removes the channel uniquely identified by its channel // point from all indexes associated with the peer. WipeChannel(*wire.OutPoint) diff --git a/peer/brontide.go b/peer/brontide.go index 21c00ff25..e85e3d8b0 100644 --- a/peer/brontide.go +++ b/peer/brontide.go @@ -436,6 +436,10 @@ type Brontide struct { // channels to the source peer which handled the funding workflow. newPendingChannel chan *newChannelMsg + // removePendingChannel is used by the fundingManager to cancel pending + // open channels to the source peer when the funding flow is failed. + removePendingChannel chan *newChannelMsg + // activeMsgStreams is a map from channel id to the channel streams that // proxy messages to individual, active links. activeMsgStreams map[lnwire.ChannelID]*msgStream @@ -2393,6 +2397,11 @@ out: case req := <-p.newActiveChannel: p.handleNewActiveChannel(req) + // The funding flow for a pending channel is failed, we will + // remove it from Brontide. + case req := <-p.removePendingChannel: + p.handleRemovePendingChannel(req) + // We've just received a local request to close an active // channel. It will either kick of a cooperative channel // closure negotiation, or be a notification of a breached @@ -3527,6 +3536,34 @@ func (p *Brontide) AddPendingChannel(cid lnwire.ChannelID, } } +// RemovePendingChannel removes a pending open channel from the peer. +// +// NOTE: Part of the lnpeer.Peer interface. +func (p *Brontide) RemovePendingChannel(cid lnwire.ChannelID) error { + errChan := make(chan error, 1) + newChanMsg := &newChannelMsg{ + channelID: cid, + err: errChan, + } + + select { + case p.removePendingChannel <- newChanMsg: + case <-p.quit: + return lnpeer.ErrPeerExiting + } + + // We pause here to wait for the peer to respond to the cancellation of + // the pending channel before we close the channel barrier + // corresponding to the channel. + select { + case err := <-errChan: + return err + + case <-p.quit: + return lnpeer.ErrPeerExiting + } +} + // StartTime returns the time at which the connection was established if the // peer started successfully, and zero otherwise. func (p *Brontide) StartTime() time.Time { @@ -3871,6 +3908,34 @@ func (p *Brontide) handleNewPendingChannel(req *newChannelMsg) { p.addedChannels.Store(chanID, struct{}{}) } +// handleRemovePendingChannel takes a `newChannelMsg` request and removes it +// from `activeChannels` map. The request will be ignored if the channel is +// considered active by Brontide. Noop if the channel ID cannot be found. +func (p *Brontide) handleRemovePendingChannel(req *newChannelMsg) { + defer close(req.err) + + chanID := req.channelID + + // If we already have this channel, something is wrong with the funding + // flow as it will only be marked as active after `ChannelReady` is + // handled. In this case, we will log an error and exit. + if p.isActiveChannel(chanID) { + p.log.Errorf("Channel(%v) is active, ignoring remove request", + chanID) + return + } + + // The channel has not been added yet, we will log a warning as there + // is an unexpected call from funding manager. + if !p.isPendingChannel(chanID) { + p.log.Warnf("Channel(%v) not found, removing it anyway", chanID) + } + + // Remove the record of this pending channel. + p.activeChannels.Delete(chanID) + p.addedChannels.Delete(chanID) +} + // sendLinkUpdateMsg sends a message that updates the channel to the // channel's message stream. func (p *Brontide) sendLinkUpdateMsg(cid lnwire.ChannelID, msg lnwire.Message) { From d28242c66417448774a45665f0da80f6034dac7c Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Mon, 12 Jun 2023 23:41:20 +0800 Subject: [PATCH 07/11] peer: return an error from `updateNextRevocation` and patch unit tests This commit makes the `updateNextRevocation` to return an error and further feed it through the request's error chan so it'd be handled by the caller. --- peer/brontide.go | 23 ++++++++++++------- peer/brontide_test.go | 53 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 68 insertions(+), 8 deletions(-) diff --git a/peer/brontide.go b/peer/brontide.go index e85e3d8b0..3e84042de 100644 --- a/peer/brontide.go +++ b/peer/brontide.go @@ -3734,7 +3734,7 @@ func (p *Brontide) attachChannelEventSubscription() error { // updateNextRevocation updates the existing channel's next revocation if it's // nil. -func (p *Brontide) updateNextRevocation(c *channeldb.OpenChannel) { +func (p *Brontide) updateNextRevocation(c *channeldb.OpenChannel) error { chanPoint := &c.FundingOutpoint chanID := lnwire.NewChanIDFromOutPoint(chanPoint) @@ -3744,32 +3744,35 @@ func (p *Brontide) updateNextRevocation(c *channeldb.OpenChannel) { // currentChan should exist, but we perform a check anyway to avoid nil // pointer dereference. if !loaded { - p.log.Errorf("missing active channel with chanID=%v", chanID) - return + return fmt.Errorf("missing active channel with chanID=%v", + chanID) } // currentChan should not be nil, but we perform a check anyway to // avoid nil pointer dereference. if currentChan == nil { - p.log.Errorf("found nil active channel with chanID=%v", chanID) - return + return fmt.Errorf("found nil active channel with chanID=%v", + chanID) } // If we're being sent a new channel, and our existing channel doesn't // have the next revocation, then we need to update the current // existing channel. if currentChan.RemoteNextRevocation() != nil { - return + return nil } p.log.Infof("Processing retransmitted ChannelReady for "+ "ChannelPoint(%v)", chanPoint) nextRevoke := c.RemoteNextRevocation + err := currentChan.InitNextRevocation(nextRevoke) if err != nil { - p.log.Errorf("unable to init chan revocation: %v", err) + return fmt.Errorf("unable to init next revocation: %w", err) } + + return nil } // addActiveChannel adds a new active channel to the `activeChannels` map. It @@ -3855,7 +3858,11 @@ func (p *Brontide) handleNewActiveChannel(req *newChannelMsg) { // Handle it and close the err chan on the request. close(req.err) - p.updateNextRevocation(newChan) + + // Update the next revocation point. + if err := p.updateNextRevocation(newChan); err != nil { + p.log.Errorf(err.Error()) + } return } diff --git a/peer/brontide_test.go b/peer/brontide_test.go index cf71e0b92..4b0929574 100644 --- a/peer/brontide_test.go +++ b/peer/brontide_test.go @@ -1118,3 +1118,56 @@ func TestPeerCustomMessage(t *testing.T) { require.Equal(t, remoteKey, receivedCustom.peer) require.Equal(t, receivedCustomMsg, &receivedCustom.msg) } + +// TestUpdateNextRevocation checks that the method `updateNextRevocation` is +// behave as expected. +func TestUpdateNextRevocation(t *testing.T) { + t.Parallel() + + require := require.New(t) + + // TODO(yy): create interface for lnwallet.LightningChannel so we can + // easily mock it without the following setups. + notifier := &mock.ChainNotifier{ + SpendChan: make(chan *chainntnfs.SpendDetail), + EpochChan: make(chan *chainntnfs.BlockEpoch), + ConfChan: make(chan *chainntnfs.TxConfirmation), + } + broadcastTxChan := make(chan *wire.MsgTx) + mockSwitch := &mockMessageSwitch{} + + alicePeer, bobChan, err := createTestPeer( + t, notifier, broadcastTxChan, noUpdate, mockSwitch, + ) + require.NoError(err, "unable to create test channels") + + // testChannel is used to test the updateNextRevocation function. + testChannel := bobChan.State() + + // Update the next revocation for a known channel should give us no + // error. + err = alicePeer.updateNextRevocation(testChannel) + require.NoError(err, "expected no error") + + // Test an error is returned when the chanID cannot be found in + // `activeChannels` map. + testChannel.FundingOutpoint = wire.OutPoint{Index: 0} + err = alicePeer.updateNextRevocation(testChannel) + require.Error(err, "expected an error") + + // Test an error is returned when the chanID's corresponding channel is + // nil. + testChannel.FundingOutpoint = wire.OutPoint{Index: 1} + chanID := lnwire.NewChanIDFromOutPoint(&testChannel.FundingOutpoint) + alicePeer.activeChannels.Store(chanID, nil) + + err = alicePeer.updateNextRevocation(testChannel) + require.Error(err, "expected an error") + + // TODO(yy): should also test `InitNextRevocation` is called on + // `lnwallet.LightningWallet` once it's interfaced. +} + +// TODO(yy): add test for `addActiveChannel` and `handleNewActiveChannel` once +// we have interfaced `lnwallet.LightningChannel` and +// `*contractcourt.ChainArbitrator`. From 6b41289538724443524b6db1b2057ca5ad61652c Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Tue, 13 Jun 2023 18:46:17 +0800 Subject: [PATCH 08/11] multi: patch unit tests for handling pending channels --- lnpeer/mock_peer.go | 82 ++++++++++++++++++++ peer/brontide_test.go | 169 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 251 insertions(+) create mode 100644 lnpeer/mock_peer.go diff --git a/lnpeer/mock_peer.go b/lnpeer/mock_peer.go new file mode 100644 index 000000000..a8953092c --- /dev/null +++ b/lnpeer/mock_peer.go @@ -0,0 +1,82 @@ +package lnpeer + +import ( + "net" + + "github.com/btcsuite/btcd/btcec/v2" + "github.com/btcsuite/btcd/wire" + "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/lnwire" + "github.com/stretchr/testify/mock" +) + +// MockPeer implements the `lnpeer.Peer` interface. +type MockPeer struct { + mock.Mock +} + +// Compile time assertion that MockPeer implements lnpeer.Peer. +var _ Peer = (*MockPeer)(nil) + +func (m *MockPeer) SendMessage(sync bool, msgs ...lnwire.Message) error { + args := m.Called(sync, msgs) + return args.Error(0) +} + +func (m *MockPeer) SendMessageLazy(sync bool, msgs ...lnwire.Message) error { + args := m.Called(sync, msgs) + return args.Error(0) +} + +func (m *MockPeer) AddNewChannel(channel *channeldb.OpenChannel, + cancel <-chan struct{}) error { + + args := m.Called(channel, cancel) + return args.Error(0) +} + +func (m *MockPeer) AddPendingChannel(cid lnwire.ChannelID, + cancel <-chan struct{}) error { + + args := m.Called(cid, cancel) + return args.Error(0) +} + +func (m *MockPeer) RemovePendingChannel(cid lnwire.ChannelID) error { + args := m.Called(cid) + return args.Error(0) +} + +func (m *MockPeer) WipeChannel(op *wire.OutPoint) { + m.Called(op) +} + +func (m *MockPeer) PubKey() [33]byte { + args := m.Called() + return args.Get(0).([33]byte) +} + +func (m *MockPeer) IdentityKey() *btcec.PublicKey { + args := m.Called() + return args.Get(0).(*btcec.PublicKey) +} + +func (m *MockPeer) Address() net.Addr { + args := m.Called() + return args.Get(0).(net.Addr) +} + +func (m *MockPeer) QuitSignal() <-chan struct{} { + args := m.Called() + return args.Get(0).(<-chan struct{}) +} + +func (m *MockPeer) LocalFeatures() *lnwire.FeatureVector { + args := m.Called() + return args.Get(0).(*lnwire.FeatureVector) +} + +func (m *MockPeer) RemoteFeatures() *lnwire.FeatureVector { + args := m.Called() + return args.Get(0).(*lnwire.FeatureVector) +} diff --git a/peer/brontide_test.go b/peer/brontide_test.go index 4b0929574..428ea934e 100644 --- a/peer/brontide_test.go +++ b/peer/brontide_test.go @@ -16,6 +16,7 @@ import ( "github.com/lightningnetwork/lnd/contractcourt" "github.com/lightningnetwork/lnd/htlcswitch" "github.com/lightningnetwork/lnd/lntest/mock" + "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwallet/chancloser" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/pool" @@ -1171,3 +1172,171 @@ func TestUpdateNextRevocation(t *testing.T) { // TODO(yy): add test for `addActiveChannel` and `handleNewActiveChannel` once // we have interfaced `lnwallet.LightningChannel` and // `*contractcourt.ChainArbitrator`. + +// TestHandleNewPendingChannel checks the method `handleNewPendingChannel` +// behaves as expected. +func TestHandleNewPendingChannel(t *testing.T) { + t.Parallel() + + // Create three channel IDs for testing. + chanIDActive := lnwire.ChannelID{0} + chanIDNotExist := lnwire.ChannelID{1} + chanIDPending := lnwire.ChannelID{2} + + // Create a test brontide. + dummyConfig := Config{} + peer := NewBrontide(dummyConfig) + + // Create the test state. + peer.activeChannels.Store(chanIDActive, &lnwallet.LightningChannel{}) + peer.activeChannels.Store(chanIDPending, nil) + + // Assert test state, we should have two channels store, one active and + // one pending. + require.Equal(t, 2, peer.activeChannels.Len()) + + testCases := []struct { + name string + chanID lnwire.ChannelID + + // expectChanAdded specifies whether this chanID will be added + // to the peer's state. + expectChanAdded bool + }{ + { + name: "noop on active channel", + chanID: chanIDActive, + expectChanAdded: false, + }, + { + name: "noop on pending channel", + chanID: chanIDPending, + expectChanAdded: false, + }, + { + name: "new channel should be added", + chanID: chanIDNotExist, + expectChanAdded: true, + }, + } + + for _, tc := range testCases { + tc := tc + + // Create a request for testing. + errChan := make(chan error, 1) + req := &newChannelMsg{ + channelID: tc.chanID, + err: errChan, + } + + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + require := require.New(t) + + // Get the number of channels before mutating the + // state. + numChans := peer.activeChannels.Len() + + // Call the method. + peer.handleNewPendingChannel(req) + + // Add one if we expect this channel to be added. + if tc.expectChanAdded { + numChans++ + } + + // Assert the number of channels is correct. + require.Equal(numChans, peer.activeChannels.Len()) + + // Assert the request's error chan is closed. + err, ok := <-req.err + require.False(ok, "expect err chan to be closed") + require.NoError(err, "expect no error") + }) + } +} + +// TestHandleRemovePendingChannel checks the method +// `handleRemovePendingChannel` behaves as expected. +func TestHandleRemovePendingChannel(t *testing.T) { + t.Parallel() + + // Create three channel IDs for testing. + chanIDActive := lnwire.ChannelID{0} + chanIDNotExist := lnwire.ChannelID{1} + chanIDPending := lnwire.ChannelID{2} + + // Create a test brontide. + dummyConfig := Config{} + peer := NewBrontide(dummyConfig) + + // Create the test state. + peer.activeChannels.Store(chanIDActive, &lnwallet.LightningChannel{}) + peer.activeChannels.Store(chanIDPending, nil) + + // Assert test state, we should have two channels store, one active and + // one pending. + require.Equal(t, 2, peer.activeChannels.Len()) + + testCases := []struct { + name string + chanID lnwire.ChannelID + + // expectDeleted specifies whether this chanID will be removed + // from the peer's state. + expectDeleted bool + }{ + { + name: "noop on active channel", + chanID: chanIDActive, + expectDeleted: false, + }, + { + name: "pending channel should be removed", + chanID: chanIDPending, + expectDeleted: true, + }, + { + name: "noop on non-exist channel", + chanID: chanIDNotExist, + expectDeleted: false, + }, + } + + for _, tc := range testCases { + tc := tc + + // Create a request for testing. + errChan := make(chan error, 1) + req := &newChannelMsg{ + channelID: tc.chanID, + err: errChan, + } + + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + require := require.New(t) + + // Get the number of channels before mutating the + // state. + numChans := peer.activeChannels.Len() + + // Call the method. + peer.handleRemovePendingChannel(req) + + // Minus one if we expect this channel to be removed. + if tc.expectDeleted { + numChans-- + } + + // Assert the number of channels is correct. + require.Equal(numChans, peer.activeChannels.Len()) + + // Assert the request's error chan is closed. + err, ok := <-req.err + require.False(ok, "expect err chan to be closed") + require.NoError(err, "expect no error") + }) + } +} From 0dd2aa0aef99c336d91da52718405976089ed14f Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Wed, 14 Jun 2023 06:14:17 +0800 Subject: [PATCH 09/11] multi: add itest to check race between channel_ready and update_add_htlc This commit adds a new itest case to check the race condition found in issue #7401. In order to control the funding manager's state, a new dev config for the funding manager is introduced to specify a duration we should hold before processing remote node's channel_ready message. A new development config, `DevConfig` is introduced in `lncfg` and will only have effect if built with flag `integration`. This can also be extended for future integration tests if more dev-only flags are needed. --- config.go | 4 + funding/manager.go | 30 ++++++ itest/list_on_test.go | 4 + itest/lnd_open_channel_test.go | 189 +++++++++++++++++++++++++++++++++ lncfg/dev.go | 23 ++++ lncfg/dev_integration.go | 26 +++++ server.go | 10 ++ 7 files changed, 286 insertions(+) create mode 100644 lncfg/dev.go create mode 100644 lncfg/dev_integration.go diff --git a/config.go b/config.go index 706fb7892..e2ee0ca68 100644 --- a/config.go +++ b/config.go @@ -490,6 +490,10 @@ type Config struct { // Estimator is used to estimate routing probabilities. Estimator routing.Estimator + + // Dev specifies configs used for integration tests, which is always + // empty if not built with `integration` flag. + Dev *lncfg.DevConfig `group:"dev" namespace:"dev"` } // GRPCConfig holds the configuration options for the gRPC server. diff --git a/funding/manager.go b/funding/manager.go index f4452470b..ddd92491d 100644 --- a/funding/manager.go +++ b/funding/manager.go @@ -337,10 +337,22 @@ func newSerializedKey(pubKey *btcec.PublicKey) serializedPubKey { return s } +// DevConfig specifies configs used for integration test only. +type DevConfig struct { + // ProcessChannelReadyWait is the duration to sleep before processing + // remote node's channel ready message once the channel as been marked + // as `channelReadySent`. + ProcessChannelReadyWait time.Duration +} + // Config defines the configuration for the FundingManager. All elements // within the configuration MUST be non-nil for the FundingManager to carry out // its duties. type Config struct { + // Dev specifies config values used in integration test. For + // production, this config will always be an empty struct. + Dev *DevConfig + // NoWumboChans indicates if we're to reject all incoming wumbo channel // requests, and also reject all outgoing wumbo channel requests. NoWumboChans bool @@ -3523,6 +3535,24 @@ func (f *Manager) handleChannelReady(peer lnpeer.Peer, msg *lnwire.ChannelReady) { defer f.wg.Done() + + // If we are in development mode, we'll wait for specified duration + // before processing the channel ready message. + if f.cfg.Dev != nil { + duration := f.cfg.Dev.ProcessChannelReadyWait + log.Warnf("Channel(%v): sleeping %v before processing "+ + "channel_ready", msg.ChanID, duration) + + select { + case <-time.After(duration): + log.Warnf("Channel(%v): slept %v before processing "+ + "channel_ready", msg.ChanID, duration) + case <-f.quit: + log.Warnf("Channel(%v): quit sleeping", msg.ChanID) + return + } + } + log.Debugf("Received ChannelReady for ChannelID(%v) from "+ "peer %x", msg.ChanID, peer.IdentityKey().SerializeCompressed()) diff --git a/itest/list_on_test.go b/itest/list_on_test.go index 970dc32e3..613e572fe 100644 --- a/itest/list_on_test.go +++ b/itest/list_on_test.go @@ -546,4 +546,8 @@ var allTestCases = []*lntest.TestCase{ Name: "utxo selection funding", TestFunc: testChannelUtxoSelection, }, + { + Name: "update pending open channels", + TestFunc: testUpdateOnPendingOpenChannels, + }, } diff --git a/itest/lnd_open_channel_test.go b/itest/lnd_open_channel_test.go index 12b13a14f..674d9a62b 100644 --- a/itest/lnd_open_channel_test.go +++ b/itest/lnd_open_channel_test.go @@ -10,6 +10,7 @@ import ( "github.com/lightningnetwork/lnd/chainreg" "github.com/lightningnetwork/lnd/funding" "github.com/lightningnetwork/lnd/lnrpc" + "github.com/lightningnetwork/lnd/lnrpc/routerrpc" "github.com/lightningnetwork/lnd/lntest" "github.com/lightningnetwork/lnd/lntest/node" "github.com/lightningnetwork/lnd/lntest/rpc" @@ -488,6 +489,194 @@ func runBasicChannelCreationAndUpdates(ht *lntest.HarnessTest, ) } +// testUpdateOnPendingOpenChannels checks that `update_add_htlc` followed by +// `channel_ready` is properly handled. In specific, when a node is in a state +// that it's still processing a remote `channel_ready` message, meanwhile an +// `update_add_htlc` is received, this HTLC message is cached and settled once +// processing `channel_ready` is complete. +func testUpdateOnPendingOpenChannels(ht *lntest.HarnessTest) { + // Test funder's behavior. Funder sees the channel pending, but fundee + // sees it active and sends an HTLC. + ht.Run("pending on funder side", func(t *testing.T) { + st := ht.Subtest(t) + testUpdateOnFunderPendingOpenChannels(st) + }) + + // Test fundee's behavior. Fundee sees the channel pending, but funder + // sees it active and sends an HTLC. + ht.Run("pending on fundee side", func(t *testing.T) { + st := ht.Subtest(t) + testUpdateOnFundeePendingOpenChannels(st) + }) +} + +// testUpdateOnFunderPendingOpenChannels checks that when the fundee sends an +// `update_add_htlc` followed by `channel_ready` while the funder is still +// processing the fundee's `channel_ready`, the HTLC will be cached and +// eventually settled. +func testUpdateOnFunderPendingOpenChannels(ht *lntest.HarnessTest) { + // Grab the channel participants. + alice, bob := ht.Alice, ht.Bob + + // Restart Alice with the config so she won't process Bob's + // channel_ready msg immediately. + ht.RestartNodeWithExtraArgs(alice, []string{ + "--dev.processchannelreadywait=10s", + }) + + // Make sure Alice and Bob are connected. + ht.EnsureConnected(alice, bob) + + // Create a new channel that requires 1 confs before it's considered + // open. + params := lntest.OpenChannelParams{ + Amt: funding.MaxBtcFundingAmount, + PushAmt: funding.MaxBtcFundingAmount / 2, + } + pendingChan := ht.OpenChannelAssertPending(alice, bob, params) + chanPoint := &lnrpc.ChannelPoint{ + FundingTxid: &lnrpc.ChannelPoint_FundingTxidBytes{ + FundingTxidBytes: pendingChan.Txid, + }, + OutputIndex: pendingChan.OutputIndex, + } + + // Alice and Bob should both consider the channel pending open. + ht.AssertNumPendingOpenChannels(alice, 1) + ht.AssertNumPendingOpenChannels(bob, 1) + + // Mine one block to confirm the funding transaction. + ht.MineBlocksAndAssertNumTxes(1, 1) + + // TODO(yy): we've prematurely marked the channel as open before + // processing channel ready messages. We need to mark it as open after + // we've processed channel ready messages and change the check to, + // ht.AssertNumPendingOpenChannels(alice, 1) + ht.AssertNumPendingOpenChannels(alice, 0) + + // Bob will consider the channel open as there's no wait time to send + // and receive Alice's channel_ready message. + ht.AssertNumPendingOpenChannels(bob, 0) + + // Alice and Bob now have different view of the channel. For Bob, + // since the channel_ready messages are processed, he will have a + // working link to route HTLCs. For Alice, because she hasn't handled + // Bob's channel_ready, there's no active link yet. + // + // Alice now adds an invoice. + req := &lnrpc.Invoice{ + RPreimage: ht.Random32Bytes(), + Value: 10_000, + } + invoice := alice.RPC.AddInvoice(req) + + // Bob sends an `update_add_htlc`, which would result in this message + // being cached in Alice's `peer.Brontide` and the payment will stay + // in-flight instead of being failed by Alice. + bobReq := &routerrpc.SendPaymentRequest{ + PaymentRequest: invoice.PaymentRequest, + TimeoutSeconds: 60, + FeeLimitMsat: noFeeLimitMsat, + } + bobStream := bob.RPC.SendPayment(bobReq) + ht.AssertPaymentStatusFromStream(bobStream, lnrpc.Payment_IN_FLIGHT) + + // Wait until Alice finishes processing Bob's channel_ready. + // + // NOTE: no effect before fixing the above TODO. + ht.AssertNumPendingOpenChannels(alice, 0) + + // Once Alice sees the channel as active, she will process the cached + // premature `update_add_htlc` and settles the payment. + ht.AssertPaymentStatusFromStream(bobStream, lnrpc.Payment_SUCCEEDED) + + // Close the channel. + ht.CloseChannel(alice, chanPoint) +} + +// testUpdateOnFundeePendingOpenChannels checks that when the funder sends an +// `update_add_htlc` followed by `channel_ready` while the fundee is still +// processing the funder's `channel_ready`, the HTLC will be cached and +// eventually settled. +func testUpdateOnFundeePendingOpenChannels(ht *lntest.HarnessTest) { + // Grab the channel participants. + alice, bob := ht.Alice, ht.Bob + + // Restart Bob with the config so he won't process Alice's + // channel_ready msg immediately. + ht.RestartNodeWithExtraArgs(bob, []string{ + "--dev.processchannelreadywait=10s", + }) + + // Make sure Alice and Bob are connected. + ht.EnsureConnected(alice, bob) + + // Create a new channel that requires 1 confs before it's considered + // open. + params := lntest.OpenChannelParams{ + Amt: funding.MaxBtcFundingAmount, + } + pendingChan := ht.OpenChannelAssertPending(alice, bob, params) + chanPoint := &lnrpc.ChannelPoint{ + FundingTxid: &lnrpc.ChannelPoint_FundingTxidBytes{ + FundingTxidBytes: pendingChan.Txid, + }, + OutputIndex: pendingChan.OutputIndex, + } + + // Alice and Bob should both consider the channel pending open. + ht.AssertNumPendingOpenChannels(alice, 1) + ht.AssertNumPendingOpenChannels(bob, 1) + + // Mine one block to confirm the funding transaction. + ht.MineBlocksAndAssertNumTxes(1, 1) + + // Alice will consider the channel open as there's no wait time to send + // and receive Bob's channel_ready message. + ht.AssertNumPendingOpenChannels(alice, 0) + + // TODO(yy): we've prematurely marked the channel as open before + // processing channel ready messages. We need to mark it as open after + // we've processed channel ready messages and change the check to, + // ht.AssertNumPendingOpenChannels(bob, 1) + ht.AssertNumPendingOpenChannels(bob, 0) + + // Alice and Bob now have different view of the channel. For Alice, + // since the channel_ready messages are processed, she will have a + // working link to route HTLCs. For Bob, because he hasn't handled + // Alice's channel_ready, there's no active link yet. + // + // Bob now adds an invoice. + req := &lnrpc.Invoice{ + RPreimage: ht.Random32Bytes(), + Value: 10_000, + } + bobInvoice := bob.RPC.AddInvoice(req) + + // Alice sends an `update_add_htlc`, which would result in this message + // being cached in Bob's `peer.Brontide` and the payment will stay + // in-flight instead of being failed by Bob. + aliceReq := &routerrpc.SendPaymentRequest{ + PaymentRequest: bobInvoice.PaymentRequest, + TimeoutSeconds: 60, + FeeLimitMsat: noFeeLimitMsat, + } + aliceStream := alice.RPC.SendPayment(aliceReq) + ht.AssertPaymentStatusFromStream(aliceStream, lnrpc.Payment_IN_FLIGHT) + + // Wait until Bob finishes processing Alice's channel_ready. + // + // NOTE: no effect before fixing the above TODO. + ht.AssertNumPendingOpenChannels(bob, 0) + + // Once Bob sees the channel as active, he will process the cached + // premature `update_add_htlc` and settles the payment. + ht.AssertPaymentStatusFromStream(aliceStream, lnrpc.Payment_SUCCEEDED) + + // Close the channel. + ht.CloseChannel(alice, chanPoint) +} + // verifyCloseUpdate is used to verify that a closed channel update is of the // expected type. func verifyCloseUpdate(chanUpdate *lnrpc.ChannelEventUpdate, diff --git a/lncfg/dev.go b/lncfg/dev.go new file mode 100644 index 000000000..ac47b0f5e --- /dev/null +++ b/lncfg/dev.go @@ -0,0 +1,23 @@ +//go:build !integration + +package lncfg + +import "time" + +// IsDevBuild returns a bool to indicate whether we are in a development +// environment. +// +// NOTE: always return false here. +func IsDevBuild() bool { + return false +} + +// DevConfig specifies development configs used for production. This struct +// should always remain empty. +type DevConfig struct{} + +// ChannelReadyWait returns the config value, which is always 0 for production +// build. +func (d *DevConfig) ChannelReadyWait() time.Duration { + return 0 +} diff --git a/lncfg/dev_integration.go b/lncfg/dev_integration.go new file mode 100644 index 000000000..c55b2efa7 --- /dev/null +++ b/lncfg/dev_integration.go @@ -0,0 +1,26 @@ +//go:build integration + +package lncfg + +import "time" + +// IsDevBuild returns a bool to indicate whether we are in a development +// environment. +// +// NOTE: always return true here. +func IsDevBuild() bool { + return true +} + +// DevConfig specifies configs used for integration tests. These configs can +// only be used in tests and must NOT be exported for production usage. +// +//nolint:lll +type DevConfig struct { + ProcessChannelReadyWait time.Duration `long:"processchannelreadywait" description:"Time to sleep before processing remote node's channel_ready message."` +} + +// ChannelReadyWait returns the config value `ProcessChannelReadyWait`. +func (d *DevConfig) ChannelReadyWait() time.Duration { + return d.ProcessChannelReadyWait +} diff --git a/server.go b/server.go index 5b421ae2f..e3fc80113 100644 --- a/server.go +++ b/server.go @@ -1276,8 +1276,18 @@ func newServer(cfg *Config, listenAddrs []net.Addr, return ourPolicy, err } + // Get the development config for funding manager. If we are not in + // development mode, this would be nil. + var devCfg *funding.DevConfig + if lncfg.IsDevBuild() { + devCfg = &funding.DevConfig{ + ProcessChannelReadyWait: cfg.Dev.ChannelReadyWait(), + } + } + //nolint:lll s.fundingMgr, err = funding.NewFundingManager(funding.Config{ + Dev: devCfg, NoWumboChans: !cfg.ProtocolOptions.Wumbo(), IDKey: nodeKeyDesc.PubKey, IDKeyLoc: nodeKeyDesc.KeyLocator, From a9da25b2381dc9deee9d7831ec9aba1840f1bfe9 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Wed, 14 Jun 2023 07:03:35 +0800 Subject: [PATCH 10/11] golangci: update linter settings for test files --- .golangci.yml | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/.golangci.yml b/.golangci.yml index a68dd81ee..3ac45f3c7 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -214,10 +214,17 @@ issues: - gosec - funlen - revive + # Allow duplications in tests so it's easier to follow a single unit + # test. + - dupl - path: mock* linters: - revive + # forcetypeassert is skipped for the mock because the test would fail + # if the returned value doesn't match the type, so there's no need to + # check the convert. + - forcetypeassert - path: test* linters: From f9d4212eccc1760a29ef8f1bb8c350428147bec9 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Wed, 12 Jul 2023 14:36:09 +0800 Subject: [PATCH 11/11] peer: send msgs to `chanStream` for both active and pending channels This commit now sends messages to `chanStream` for both pending and active channels. If the message is sent to a pending channel, it will be queued in `chanStream`. Once the channel link becomes active, the early messages will be processed. --- peer/brontide.go | 25 ++++++++++++++++++++++--- 1 file changed, 22 insertions(+), 3 deletions(-) diff --git a/peer/brontide.go b/peer/brontide.go index 3e84042de..43d4e57f3 100644 --- a/peer/brontide.go +++ b/peer/brontide.go @@ -1607,8 +1607,7 @@ out: case *lnwire.ChannelReestablish: targetChan = msg.ChanID - isLinkUpdate = p.isActiveChannel(targetChan) || - p.isPendingChannel(targetChan) + isLinkUpdate = p.hasChannel(targetChan) // If we failed to find the link in question, and the // message received was a channel sync message, then @@ -1625,9 +1624,22 @@ out: } } + // For messages that implement the LinkUpdater interface, we + // will consider them as link updates and send them to + // chanStream. These messages will be queued inside chanStream + // if the channel is not active yet. case LinkUpdater: targetChan = msg.TargetChanID() - isLinkUpdate = p.isActiveChannel(targetChan) + isLinkUpdate = p.hasChannel(targetChan) + + // Log an error if we don't have this channel. This + // means the peer has sent us a message with unknown + // channel ID. + if !isLinkUpdate { + p.log.Errorf("Unknown channel ID: %v found "+ + "in received msg=%s", targetChan, + nextMsg.MsgType()) + } case *lnwire.ChannelUpdate, *lnwire.ChannelAnnouncement, @@ -1729,6 +1741,13 @@ func (p *Brontide) isPendingChannel(chanID lnwire.ChannelID) bool { return channel == nil } +// hasChannel returns true if the peer has a pending/active channel specified +// by the channel ID. +func (p *Brontide) hasChannel(chanID lnwire.ChannelID) bool { + _, ok := p.activeChannels.Load(chanID) + return ok +} + // storeError stores an error in our peer's buffer of recent errors with the // current timestamp. Errors are only stored if we have at least one active // channel with the peer to mitigate a dos vector where a peer costlessly