peer: add new composite chanCloserFsm type

In this commit, we add a new composite chanCloserFsm type. This'll allow
us to store a single value that might be a negotiator or and rbf-er.

In a follow up commit, we'll use this to conditionally create the new
rbf closer.
This commit is contained in:
Olaoluwa Osuntokun 2024-01-31 19:48:15 -08:00
parent 23a9a761e3
commit 2dae0ba96e

View File

@ -445,6 +445,18 @@ type Config struct {
Quit chan struct{}
}
// chanCloserFsm is a union-like type that can hold the two versions of co-op
// close we support: negotiation, and RBF based.
//
// TODO(roasbeef): rename to chancloser.Negotiator and chancloser.RBF?
type chanCloserFsm = fn.Either[*chancloser.ChanCloser, *chancloser.RbfChanCloser] //nolint:ll
func makeNegotiateCloser(chanCloser *chancloser.ChanCloser) chanCloserFsm {
return fn.NewLeft[*chancloser.ChanCloser, *chancloser.RbfChanCloser](
chanCloser,
)
}
// Brontide is an active peer on the Lightning Network. This struct is responsible
// for managing any channel state related to this peer. To do so, it has
// several helper goroutines to handle events such as HTLC timeouts, new
@ -543,7 +555,7 @@ type Brontide struct {
// cooperative channel closures. Any channel closing messages are directed
// to one of these active state machines. Once the channel has been closed,
// the state machine will be deleted from the map.
activeChanCloses map[lnwire.ChannelID]*chancloser.ChanCloser
activeChanCloses map[lnwire.ChannelID]chanCloserFsm
// localCloseChanReqs is a channel in which any local requests to close
// a particular channel are sent over.
@ -629,7 +641,7 @@ func NewBrontide(cfg Config) *Brontide {
removePendingChannel: make(chan *newChannelMsg),
activeMsgStreams: make(map[lnwire.ChannelID]*msgStream),
activeChanCloses: make(map[lnwire.ChannelID]*chancloser.ChanCloser),
activeChanCloses: make(map[lnwire.ChannelID]chanCloserFsm),
localCloseChanReqs: make(chan *htlcswitch.ChanClose),
linkFailures: make(chan linkFailureReport),
chanCloseMsgs: make(chan *closeMsg),
@ -1218,8 +1230,9 @@ func (p *Brontide) loadActiveChannels(chans []*channeldb.OpenChannel) (
"delivery addr: %w", err)
return
}
chanCloser, err := p.createChanCloser(
lnChan, addr, feePerKw, nil, info.Closer(),
negotiateChanCloser, err := p.createChanCloser(
lnChan, addr, feePerKw, nil,
info.Closer(),
)
if err != nil {
shutdownInfoErr = fmt.Errorf("unable to "+
@ -1232,10 +1245,14 @@ func (p *Brontide) loadActiveChannels(chans []*channeldb.OpenChannel) (
lnChan.State().FundingOutpoint,
)
chanCloser := makeNegotiateCloser(
negotiateChanCloser,
)
p.activeChanCloses[chanID] = chanCloser
// Create the Shutdown message.
shutdown, err := chanCloser.ShutdownChan()
shutdown, err := negotiateChanCloser.ShutdownChan()
if err != nil {
delete(p.activeChanCloses, chanID)
shutdownInfoErr = err
@ -2947,7 +2964,7 @@ func (p *Brontide) reenableActiveChannels() {
// Otherwise, either an existing state machine will be returned, or a new one
// will be created.
func (p *Brontide) fetchActiveChanCloser(chanID lnwire.ChannelID) (
*chancloser.ChanCloser, error) {
*chanCloserFsm, error) {
chanCloser, found := p.activeChanCloses[chanID]
if found {
@ -2955,7 +2972,7 @@ func (p *Brontide) fetchActiveChanCloser(chanID lnwire.ChannelID) (
// created for a non-pending channel or for a channel that had
// previously started the shutdown process but the connection
// was restarted.
return chanCloser, nil
return &chanCloser, nil
}
// First, we'll ensure that we actually know of the target channel. If
@ -3001,7 +3018,7 @@ func (p *Brontide) fetchActiveChanCloser(chanID lnwire.ChannelID) (
if err != nil {
return nil, fmt.Errorf("unable to parse addr: %w", err)
}
chanCloser, err = p.createChanCloser(
negotiateChanCloser, err := p.createChanCloser(
channel, addr, feePerKw, nil, lntypes.Remote,
)
if err != nil {
@ -3009,9 +3026,11 @@ func (p *Brontide) fetchActiveChanCloser(chanID lnwire.ChannelID) (
return nil, fmt.Errorf("unable to create chan closer")
}
chanCloser = makeNegotiateCloser(negotiateChanCloser)
p.activeChanCloses[chanID] = chanCloser
return chanCloser, nil
return &chanCloser, nil
}
// filterChannelsToEnable filters a list of channels to be enabled upon start.
@ -3265,7 +3284,7 @@ func (p *Brontide) restartCoopClose(lnChan *lnwallet.LightningChannel) (
// goroutine since this is done before the channelManager goroutine is
// created.
chanID := lnwire.NewChanIDFromOutPoint(c.FundingOutpoint)
p.activeChanCloses[chanID] = chanCloser
p.activeChanCloses[chanID] = makeNegotiateCloser(chanCloser)
// Create the Shutdown message.
shutdownMsg, err := chanCloser.ShutdownChan()
@ -3365,7 +3384,7 @@ func (p *Brontide) initNegotiateChanCloser(req *htlcswitch.ChanClose,
}
chanID := lnwire.NewChanIDFromOutPoint(*req.ChanPoint)
p.activeChanCloses[chanID] = chanCloser
p.activeChanCloses[chanID] = makeNegotiateCloser(chanCloser)
// Finally, we'll initiate the channel shutdown within the
// chanCloser, and send the shutdown message to the remote
@ -3469,6 +3488,7 @@ func (p *Brontide) initRbfChanCloser(
return p.genDeliveryScript()
},
FeeEstimator: &chancloser.SimpleCoopFeeEstimator{},
CloseSigner: channel,
ChanObserver: newChanObserver(
channel, link, p.cfg.ChanStatusMgr,
),
@ -4165,7 +4185,7 @@ func (p *Brontide) handleCloseMsg(msg *closeMsg) {
// We'll now fetch the matching closing state machine in order to continue,
// or finalize the channel closure process.
chanCloser, err := p.fetchActiveChanCloser(msg.cid)
chanCloserE, err := p.fetchActiveChanCloser(msg.cid)
if err != nil {
// If the channel is not known to us, we'll simply ignore this message.
if err == ErrChannelNotFound {
@ -4182,17 +4202,31 @@ func (p *Brontide) handleCloseMsg(msg *closeMsg) {
return
}
if chanCloserE.IsRight() {
// TODO(roasbeef): assert?
return
}
// At this point, we'll only enter this call path if a negotiate chan
// closer was used. So we'll extract that from the either now.
//
// TODO(roabeef): need extra helper func for either to make cleaner
var chanCloser *chancloser.ChanCloser
chanCloserE.WhenLeft(func(c *chancloser.ChanCloser) {
chanCloser = c
})
handleErr := func(err error) {
err = fmt.Errorf("unable to process close msg: %w", err)
p.log.Error(err)
// As the negotiations failed, we'll reset the channel state machine to
// ensure we act to on-chain events as normal.
// As the negotiations failed, we'll reset the channel state
// machine to ensure we act to on-chain events as normal.
chanCloser.Channel().ResetState()
if chanCloser.CloseRequest() != nil {
chanCloser.CloseRequest().Err <- err
}
delete(p.activeChanCloses, msg.cid)
p.Disconnect(err)