From 2dae0ba96ebc4575aca4483324abcd20fe4764be Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Wed, 31 Jan 2024 19:48:15 -0800 Subject: [PATCH] peer: add new composite chanCloserFsm type In this commit, we add a new composite chanCloserFsm type. This'll allow us to store a single value that might be a negotiator or and rbf-er. In a follow up commit, we'll use this to conditionally create the new rbf closer. --- peer/brontide.go | 64 ++++++++++++++++++++++++++++++++++++------------ 1 file changed, 49 insertions(+), 15 deletions(-) diff --git a/peer/brontide.go b/peer/brontide.go index 5214120a0..816fd37d7 100644 --- a/peer/brontide.go +++ b/peer/brontide.go @@ -445,6 +445,18 @@ type Config struct { Quit chan struct{} } +// chanCloserFsm is a union-like type that can hold the two versions of co-op +// close we support: negotiation, and RBF based. +// +// TODO(roasbeef): rename to chancloser.Negotiator and chancloser.RBF? +type chanCloserFsm = fn.Either[*chancloser.ChanCloser, *chancloser.RbfChanCloser] //nolint:ll + +func makeNegotiateCloser(chanCloser *chancloser.ChanCloser) chanCloserFsm { + return fn.NewLeft[*chancloser.ChanCloser, *chancloser.RbfChanCloser]( + chanCloser, + ) +} + // Brontide is an active peer on the Lightning Network. This struct is responsible // for managing any channel state related to this peer. To do so, it has // several helper goroutines to handle events such as HTLC timeouts, new @@ -543,7 +555,7 @@ type Brontide struct { // cooperative channel closures. Any channel closing messages are directed // to one of these active state machines. Once the channel has been closed, // the state machine will be deleted from the map. - activeChanCloses map[lnwire.ChannelID]*chancloser.ChanCloser + activeChanCloses map[lnwire.ChannelID]chanCloserFsm // localCloseChanReqs is a channel in which any local requests to close // a particular channel are sent over. @@ -629,7 +641,7 @@ func NewBrontide(cfg Config) *Brontide { removePendingChannel: make(chan *newChannelMsg), activeMsgStreams: make(map[lnwire.ChannelID]*msgStream), - activeChanCloses: make(map[lnwire.ChannelID]*chancloser.ChanCloser), + activeChanCloses: make(map[lnwire.ChannelID]chanCloserFsm), localCloseChanReqs: make(chan *htlcswitch.ChanClose), linkFailures: make(chan linkFailureReport), chanCloseMsgs: make(chan *closeMsg), @@ -1218,8 +1230,9 @@ func (p *Brontide) loadActiveChannels(chans []*channeldb.OpenChannel) ( "delivery addr: %w", err) return } - chanCloser, err := p.createChanCloser( - lnChan, addr, feePerKw, nil, info.Closer(), + negotiateChanCloser, err := p.createChanCloser( + lnChan, addr, feePerKw, nil, + info.Closer(), ) if err != nil { shutdownInfoErr = fmt.Errorf("unable to "+ @@ -1232,10 +1245,14 @@ func (p *Brontide) loadActiveChannels(chans []*channeldb.OpenChannel) ( lnChan.State().FundingOutpoint, ) + chanCloser := makeNegotiateCloser( + negotiateChanCloser, + ) + p.activeChanCloses[chanID] = chanCloser // Create the Shutdown message. - shutdown, err := chanCloser.ShutdownChan() + shutdown, err := negotiateChanCloser.ShutdownChan() if err != nil { delete(p.activeChanCloses, chanID) shutdownInfoErr = err @@ -2947,7 +2964,7 @@ func (p *Brontide) reenableActiveChannels() { // Otherwise, either an existing state machine will be returned, or a new one // will be created. func (p *Brontide) fetchActiveChanCloser(chanID lnwire.ChannelID) ( - *chancloser.ChanCloser, error) { + *chanCloserFsm, error) { chanCloser, found := p.activeChanCloses[chanID] if found { @@ -2955,7 +2972,7 @@ func (p *Brontide) fetchActiveChanCloser(chanID lnwire.ChannelID) ( // created for a non-pending channel or for a channel that had // previously started the shutdown process but the connection // was restarted. - return chanCloser, nil + return &chanCloser, nil } // First, we'll ensure that we actually know of the target channel. If @@ -3001,7 +3018,7 @@ func (p *Brontide) fetchActiveChanCloser(chanID lnwire.ChannelID) ( if err != nil { return nil, fmt.Errorf("unable to parse addr: %w", err) } - chanCloser, err = p.createChanCloser( + negotiateChanCloser, err := p.createChanCloser( channel, addr, feePerKw, nil, lntypes.Remote, ) if err != nil { @@ -3009,9 +3026,11 @@ func (p *Brontide) fetchActiveChanCloser(chanID lnwire.ChannelID) ( return nil, fmt.Errorf("unable to create chan closer") } + chanCloser = makeNegotiateCloser(negotiateChanCloser) + p.activeChanCloses[chanID] = chanCloser - return chanCloser, nil + return &chanCloser, nil } // filterChannelsToEnable filters a list of channels to be enabled upon start. @@ -3265,7 +3284,7 @@ func (p *Brontide) restartCoopClose(lnChan *lnwallet.LightningChannel) ( // goroutine since this is done before the channelManager goroutine is // created. chanID := lnwire.NewChanIDFromOutPoint(c.FundingOutpoint) - p.activeChanCloses[chanID] = chanCloser + p.activeChanCloses[chanID] = makeNegotiateCloser(chanCloser) // Create the Shutdown message. shutdownMsg, err := chanCloser.ShutdownChan() @@ -3365,7 +3384,7 @@ func (p *Brontide) initNegotiateChanCloser(req *htlcswitch.ChanClose, } chanID := lnwire.NewChanIDFromOutPoint(*req.ChanPoint) - p.activeChanCloses[chanID] = chanCloser + p.activeChanCloses[chanID] = makeNegotiateCloser(chanCloser) // Finally, we'll initiate the channel shutdown within the // chanCloser, and send the shutdown message to the remote @@ -3469,6 +3488,7 @@ func (p *Brontide) initRbfChanCloser( return p.genDeliveryScript() }, FeeEstimator: &chancloser.SimpleCoopFeeEstimator{}, + CloseSigner: channel, ChanObserver: newChanObserver( channel, link, p.cfg.ChanStatusMgr, ), @@ -4165,7 +4185,7 @@ func (p *Brontide) handleCloseMsg(msg *closeMsg) { // We'll now fetch the matching closing state machine in order to continue, // or finalize the channel closure process. - chanCloser, err := p.fetchActiveChanCloser(msg.cid) + chanCloserE, err := p.fetchActiveChanCloser(msg.cid) if err != nil { // If the channel is not known to us, we'll simply ignore this message. if err == ErrChannelNotFound { @@ -4182,17 +4202,31 @@ func (p *Brontide) handleCloseMsg(msg *closeMsg) { return } + if chanCloserE.IsRight() { + // TODO(roasbeef): assert? + return + } + + // At this point, we'll only enter this call path if a negotiate chan + // closer was used. So we'll extract that from the either now. + // + // TODO(roabeef): need extra helper func for either to make cleaner + var chanCloser *chancloser.ChanCloser + chanCloserE.WhenLeft(func(c *chancloser.ChanCloser) { + chanCloser = c + }) + handleErr := func(err error) { err = fmt.Errorf("unable to process close msg: %w", err) p.log.Error(err) - // As the negotiations failed, we'll reset the channel state machine to - // ensure we act to on-chain events as normal. + // As the negotiations failed, we'll reset the channel state + // machine to ensure we act to on-chain events as normal. chanCloser.Channel().ResetState() - if chanCloser.CloseRequest() != nil { chanCloser.CloseRequest().Err <- err } + delete(p.activeChanCloses, msg.cid) p.Disconnect(err)