peer: add new composite chanCloserFsm type

In this commit, we add a new composite chanCloserFsm type. This'll allow
us to store a single value that might be a negotiator or and rbf-er.

In a follow up commit, we'll use this to conditionally create the new
rbf closer.
This commit is contained in:
Olaoluwa Osuntokun 2024-01-31 19:48:15 -08:00
parent a89784a7fb
commit 5a41487775
2 changed files with 80 additions and 29 deletions

View File

@ -397,6 +397,18 @@ type Config struct {
Quit chan struct{}
}
// chanCloserFsm is a union-like type that can hold the two versions of co-op
// close we support: negotiation, and RBF based.
//
// TODO(roasbeef): rename to chancloser.Negotiator and chancloser.RBF?
type chanCloserFsm = fn.Either[*chancloser.ChanCloser, *chancloser.RbfChanCloser]
func makeNegotiateCloser(chanCloser *chancloser.ChanCloser) chanCloserFsm {
return fn.NewLeft[*chancloser.ChanCloser, *chancloser.RbfChanCloser](
chanCloser,
)
}
// Brontide is an active peer on the Lightning Network. This struct is responsible
// for managing any channel state related to this peer. To do so, it has
// several helper goroutines to handle events such as HTLC timeouts, new
@ -495,7 +507,7 @@ type Brontide struct {
// cooperative channel closures. Any channel closing messages are directed
// to one of these active state machines. Once the channel has been closed,
// the state machine will be deleted from the map.
activeChanCloses map[lnwire.ChannelID]*chancloser.ChanCloser
activeChanCloses map[lnwire.ChannelID]chanCloserFsm
// localCloseChanReqs is a channel in which any local requests to close
// a particular channel are sent over.
@ -578,7 +590,7 @@ func NewBrontide(cfg Config) *Brontide {
removePendingChannel: make(chan *newChannelMsg),
activeMsgStreams: make(map[lnwire.ChannelID]*msgStream),
activeChanCloses: make(map[lnwire.ChannelID]*chancloser.ChanCloser),
activeChanCloses: make(map[lnwire.ChannelID]chanCloserFsm),
localCloseChanReqs: make(chan *htlcswitch.ChanClose),
linkFailures: make(chan linkFailureReport),
chanCloseMsgs: make(chan *closeMsg),
@ -1113,7 +1125,7 @@ func (p *Brontide) loadActiveChannels(chans []*channeldb.OpenChannel) (
return
}
chanCloser, err := p.createChanCloser(
negotiateChanCloser, err := p.createChanCloser(
lnChan, info.DeliveryScript.Val, feePerKw, nil,
info.Closer(),
)
@ -1128,10 +1140,14 @@ func (p *Brontide) loadActiveChannels(chans []*channeldb.OpenChannel) (
lnChan.State().FundingOutpoint,
)
chanCloser := makeNegotiateCloser(
negotiateChanCloser,
)
p.activeChanCloses[chanID] = chanCloser
// Create the Shutdown message.
shutdown, err := chanCloser.ShutdownChan()
shutdown, err := negotiateChanCloser.ShutdownChan()
if err != nil {
delete(p.activeChanCloses, chanID)
shutdownInfoErr = err
@ -2823,7 +2839,7 @@ func (p *Brontide) reenableActiveChannels() {
// Otherwise, either an existing state machine will be returned, or a new one
// will be created.
func (p *Brontide) fetchActiveChanCloser(chanID lnwire.ChannelID) (
*chancloser.ChanCloser, error) {
*chanCloserFsm, error) {
chanCloser, found := p.activeChanCloses[chanID]
if found {
@ -2831,7 +2847,7 @@ func (p *Brontide) fetchActiveChanCloser(chanID lnwire.ChannelID) (
// created for a non-pending channel or for a channel that had
// previously started the shutdown process but the connection
// was restarted.
return chanCloser, nil
return &chanCloser, nil
}
// First, we'll ensure that we actually know of the target channel. If
@ -2873,7 +2889,7 @@ func (p *Brontide) fetchActiveChanCloser(chanID lnwire.ChannelID) (
return nil, fmt.Errorf("unable to estimate fee")
}
chanCloser, err = p.createChanCloser(
negotiateChanCloser, err := p.createChanCloser(
channel, deliveryScript, feePerKw, nil, lntypes.Remote,
)
if err != nil {
@ -2881,9 +2897,11 @@ func (p *Brontide) fetchActiveChanCloser(chanID lnwire.ChannelID) (
return nil, fmt.Errorf("unable to create chan closer")
}
chanCloser = makeNegotiateCloser(negotiateChanCloser)
p.activeChanCloses[chanID] = chanCloser
return chanCloser, nil
return &chanCloser, nil
}
// filterChannelsToEnable filters a list of channels to be enabled upon start.
@ -3128,7 +3146,7 @@ func (p *Brontide) restartCoopClose(lnChan *lnwallet.LightningChannel) (
// goroutine since this is done before the channelManager goroutine is
// created.
chanID := lnwire.NewChanIDFromOutPoint(c.FundingOutpoint)
p.activeChanCloses[chanID] = chanCloser
p.activeChanCloses[chanID] = makeNegotiateCloser(chanCloser)
// Create the Shutdown message.
shutdownMsg, err := chanCloser.ShutdownChan()
@ -3230,7 +3248,7 @@ func (p *Brontide) initNegotiateChanCloser(req *htlcswitch.ChanClose,
}
chanID := lnwire.NewChanIDFromOutPoint(*req.ChanPoint)
p.activeChanCloses[chanID] = chanCloser
p.activeChanCloses[chanID] = makeNegotiateCloser(chanCloser)
// Finally, we'll initiate the channel shutdown within the
// chanCloser, and send the shutdown message to the remote
@ -3284,7 +3302,7 @@ func chooseAddr(addr lnwire.DeliveryAddress) fn.Option[lnwire.DeliveryAddress] {
func (p *Brontide) initRbfChanCloser(req *htlcswitch.ChanClose,
channel *lnwallet.LightningChannel) (*chancloser.RbfChanCloser, error) {
chanID := lnwire.NewChanIDFromOutPoint(req.ChanPoint)
chanID := lnwire.NewChanIDFromOutPoint(*req.ChanPoint)
link := p.fetchLinkFromKeyAndCid(chanID)
@ -3305,7 +3323,7 @@ func (p *Brontide) initRbfChanCloser(req *htlcswitch.ChanClose,
return nil, fmt.Errorf("unable to get thaw height: %w", err)
}
msgMapper := chancloser.NewRbfMsgMapper(uint32(startingHeight))
msgMapper := chancloser.NewRbfMsgMapper(uint32(startingHeight), chanID)
initialState := chancloser.ChannelActive{}
@ -3335,14 +3353,16 @@ func (p *Brontide) initRbfChanCloser(req *htlcswitch.ChanClose,
return p.genDeliveryScript()
},
FeeEstimator: &chancloser.SimpleCoopFeeEstimator{},
ChanObserver: newChanObserver(channel, link),
ChanObserver: newChanObserver(channel, link, p.cfg.ChanStatusMgr),
}
spendEvent := protofsm.RegisterSpend[chancloser.SpendEvent]{
OutPoint: *req.ChanPoint,
PkScript: channel.FundingTxOut().PkScript,
HeightHint: scid.BlockHeight,
PostSpendEvent: fn.Some(chancloser.SpendEvent{}),
spendEvent := protofsm.RegisterSpend[chancloser.ProtocolEvent]{
OutPoint: channel.ChannelPoint(),
PkScript: channel.FundingTxOut().PkScript,
HeightHint: scid.BlockHeight,
PostSpendEvent: fn.Some[chancloser.RbfSpendMapper](
chancloser.SpendMapper,
),
}
// TODO(roasbeef): edge case here to re-enable a channel before both
@ -4022,7 +4042,7 @@ func (p *Brontide) handleCloseMsg(msg *closeMsg) {
// We'll now fetch the matching closing state machine in order to continue,
// or finalize the channel closure process.
chanCloser, err := p.fetchActiveChanCloser(msg.cid)
chanCloserE, err := p.fetchActiveChanCloser(msg.cid)
if err != nil {
// If the channel is not known to us, we'll simply ignore this message.
if err == ErrChannelNotFound {
@ -4039,17 +4059,31 @@ func (p *Brontide) handleCloseMsg(msg *closeMsg) {
return
}
if chanCloserE.IsRight() {
// TODO(roasbeef): assert?
return
}
// At this point, we'll only enter this call path if a negotiate chan
// closer was used. So we'll extract that from the either now.
//
// TODO(roabeef): need extra helper func for either to make cleaner
var chanCloser *chancloser.ChanCloser
chanCloserE.WhenLeft(func(c *chancloser.ChanCloser) {
chanCloser = c
})
handleErr := func(err error) {
err = fmt.Errorf("unable to process close msg: %w", err)
p.log.Error(err)
// As the negotiations failed, we'll reset the channel state machine to
// ensure we act to on-chain events as normal.
// As the negotiations failed, we'll reset the channel state
// machine to ensure we act to on-chain events as normal.
chanCloser.Channel().ResetState()
if chanCloser.CloseRequest() != nil {
chanCloser.CloseRequest().Err <- err
}
delete(p.activeChanCloses, msg.cid)
p.Disconnect(err)

View File

@ -5,6 +5,7 @@ import (
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/fn"
"github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/lntypes"
"github.com/lightningnetwork/lnd/lnwallet/chancloser"
)
@ -24,7 +25,7 @@ type channelView interface {
// MarkCoopBroadcasted persistently marks that the channel close
// transaction has been broadcast.
MarkCoopBroadcasted(*wire.MsgTx, bool) error
MarkCoopBroadcasted(*wire.MsgTx, lntypes.ChannelParty) error
// StateSnapshot returns a snapshot of the current fully committed
// state within the channel.
@ -50,21 +51,31 @@ type linkController interface {
IsFlushing(direction bool) bool
}
// linkNetworkController is an interface that represents an object capable of
// managing interactions with the active channel links from the PoV of the
// gossip network.
type linkNetworkController interface {
// RequestDisable disables a channel by its channel point.
RequestDisable(wire.OutPoint, bool) error
}
// chanObserver implements the chancloser.ChanObserver interface for the
// existing LightningChannel struct/instance.
type chanObserver struct {
chanView channelView
link linkController
chanView channelView
link linkController
linkNetwork linkNetworkController
}
// newChanObserver creates a new instance of a chanObserver from an active
// channelView.
func newChanObserver(chanView channelView,
link linkController) *chanObserver {
link linkController, linkNetwork linkNetworkController) *chanObserver {
return &chanObserver{
chanView: chanView,
link: link,
chanView: chanView,
link: link,
linkNetwork: linkNetwork,
}
}
@ -102,7 +113,7 @@ func (l *chanObserver) DisableOutgoingAdds() error {
// MarkCoopBroadcasted persistently marks that the channel close transaction
// has been broadcast.
func (l *chanObserver) MarkCoopBroadcasted(tx *wire.MsgTx, local bool) error {
return l.chanView.MarkCoopBroadcasted(tx, local)
return l.chanView.MarkCoopBroadcasted(tx, lntypes.Local)
}
// MarkShutdownSent persists the given ShutdownInfo. The existence of the
@ -149,6 +160,12 @@ func (l *chanObserver) FinalBalances() fn.Option[chancloser.ShutdownBalances] {
}
}
// DisableChannel disables the target channel.
func (l *chanObserver) DisableChannel() error {
op := l.chanView.StateSnapshot().ChannelPoint
return l.linkNetwork.RequestDisable(op, false)
}
// A compile-time assertion to ensure that chanObserver meets the
// chancloser.ChanStateObserver interface.
var _ chancloser.ChanStateObserver = (*chanObserver)(nil)