peer: conditionally create rbf coop close fsm based on feature bits

In this commit, we fully integrate the new RBF close state machine into
the peer.

For the restart case after shutdown, we can short-circuit the existing
logic, as the new FSM will handle retransmitting the shutdown message
itself and doesn't need to delegate that duty to the link.
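
A rough sketch of that branch (the helpers here, `resumeShutdown`,
`startRbfCloser`, and `delegateShutdownToLink`, are made-up stand-ins for
illustration only; the real wiring lives in the peer diff below):

    package main

    import "fmt"

    // shutdownInfo stands in for the persisted channeldb.ShutdownInfo.
    type shutdownInfo struct{ deliveryScript string }

    // startRbfCloser stands in for handing the persisted shutdown to the
    // new FSM, which retransmits the shutdown message on its own.
    func startRbfCloser(info shutdownInfo) error {
        fmt.Println("rbf closer restarted, resending shutdown to", info.deliveryScript)
        return nil
    }

    // delegateShutdownToLink stands in for the legacy path where the link
    // is asked to retransmit shutdown once it reestablishes.
    func delegateShutdownToLink(info shutdownInfo) error {
        fmt.Println("legacy path: link retransmits shutdown to", info.deliveryScript)
        return nil
    }

    // resumeShutdown short-circuits the legacy retransmission logic
    // whenever the RBF close feature was negotiated with this peer.
    func resumeShutdown(rbfNegotiated bool, info shutdownInfo) error {
        if rbfNegotiated {
            return startRbfCloser(info)
        }
        return delegateShutdownToLink(info)
    }

    func main() {
        _ = resumeShutdown(true, shutdownInfo{deliveryScript: "p2wpkh-addr"})
    }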

Unlike the existing state machine, we're able to restart the flow to
sign a coop close with a new, higher fee rate. In this case, we can now
send multiple updates to the RPC caller, one for each newly signed coop
close transaction.
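
A minimal sketch of that update pattern (the names `pendingUpdate` and
`notifyNewCloseTxns` are illustrative stand-ins, not the actual RPC
plumbing): one update is emitted per distinct close txid, so each RBF
re-sign at a higher fee rate produces a fresh update while repeated
states are suppressed.

    package main

    import "fmt"

    // pendingUpdate stands in for the update message streamed back to the
    // RPC close caller.
    type pendingUpdate struct{ txid string }

    // notifyNewCloseTxns emits one update per distinct closing txid.
    func notifyNewCloseTxns(txids <-chan string, updates chan<- pendingUpdate) {
        var lastTxid string
        for txid := range txids {
            if txid == lastTxid {
                continue // same close tx as before, nothing to report
            }
            lastTxid = txid
            updates <- pendingUpdate{txid: txid}
        }
        close(updates)
    }

    func main() {
        txids := make(chan string, 3)
        updates := make(chan pendingUpdate, 3)

        // Two iterations: the second txid replaces the first at a higher
        // fee rate, and the duplicate notification is suppressed.
        txids <- "txid-low-fee"
        txids <- "txid-low-fee"
        txids <- "txid-high-fee"
        close(txids)

        go notifyNewCloseTxns(txids, updates)
        for u := range updates {
            fmt.Println("pending close update:", u.txid)
        }
    }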

To implement the async flush case, we'll launch a new goroutine to wait
until the state machine reaches the `ChannelFlushing` state, then we'll
register the hook. We don't do this at startup, as otherwise the
channel may _already_ be flushed, triggering an invalid state
transition.
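
The shape of that sentinel, as a self-contained sketch (the `link` and
state types here are simplified stand-ins, not the real htlcswitch or
chancloser APIs): the one-shot flush hook is only registered once the
FSM is actually waiting for the flush.

    package main

    import "fmt"

    type closeState int

    const (
        stateShutdownPending closeState = iota
        stateChannelFlushing
    )

    // link mimics a channel link that exposes a one-shot "flushed" hook.
    type link struct{ onFlushed func() }

    func (l *link) OnFlushedOnce(cb func()) { l.onFlushed = cb }

    // flushSentinel waits for the FSM to reach the flushing state before it
    // registers the hook, so a channel that is already flushed at startup
    // can't fire the event into the wrong state.
    func flushSentinel(states <-chan closeState, l *link, sendFlushed func()) {
        for s := range states {
            if s == stateChannelFlushing {
                l.OnFlushedOnce(sendFlushed)
                return
            }
        }
    }

    func main() {
        states := make(chan closeState, 2)
        l := &link{}
        done := make(chan struct{})

        go func() {
            flushSentinel(states, l, func() {
                fmt.Println("ChannelFlushed event sent to the closer")
            })
            close(done)
        }()

        states <- stateShutdownPending
        states <- stateChannelFlushing
        <-done

        // The link later reports the channel as flushed.
        l.onFlushed()
    }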
Olaoluwa Osuntokun
2024-03-05 00:15:46 -06:00
parent f22cba9de1
commit 6364e98f0e
5 changed files with 425 additions and 67 deletions


@@ -1,6 +1,7 @@
 package msgmux
 
 import (
+    "context"
     "fmt"
     "maps"
     "sync"
@@ -46,7 +47,7 @@ type Endpoint interface {
     // SendMessage handles the target message, and returns true if the
     // message was able being processed.
-    SendMessage(msg PeerMsg) bool
+    SendMessage(ctx context.Context, msg PeerMsg) bool
 }
 
 // MsgRouter is an interface that represents a message router, which is generic
@@ -66,7 +67,7 @@ type Router interface {
     RouteMsg(PeerMsg) error
 
     // Start starts the peer message router.
-    Start()
+    Start(ctx context.Context)
 
     // Stop stops the peer message router.
     Stop()
@@ -137,12 +138,12 @@ func NewMultiMsgRouter() *MultiMsgRouter {
 }
 
 // Start starts the peer message router.
-func (p *MultiMsgRouter) Start() {
+func (p *MultiMsgRouter) Start(ctx context.Context) {
     log.Infof("Starting Router")
 
     p.startOnce.Do(func() {
         p.wg.Add(1)
-        go p.msgRouter()
+        go p.msgRouter(ctx)
     })
 }
@@ -179,7 +180,7 @@ func (p *MultiMsgRouter) endpoints() fn.Result[EndpointsMap] {
 }
 
 // msgRouter is the main goroutine that handles all incoming messages.
-func (p *MultiMsgRouter) msgRouter() {
+func (p *MultiMsgRouter) msgRouter(ctx context.Context) {
     defer p.wg.Done()
 
     // endpoints is a map of all registered endpoints.
@@ -235,7 +236,7 @@ func (p *MultiMsgRouter) msgRouter() {
                 "msg %T to endpoint %s", msg,
                 endpoint.Name())
 
-            sent := endpoint.SendMessage(msg)
+            sent := endpoint.SendMessage(ctx, msg)
             couldSend = couldSend || sent
         }
     }


@@ -1,6 +1,7 @@
 package msgmux
 
 import (
+    "context"
     "testing"
 
     "github.com/lightningnetwork/lnd/lnwire"
@@ -24,7 +25,7 @@ func (m *mockEndpoint) CanHandle(msg PeerMsg) bool {
     return args.Bool(0)
 }
 
-func (m *mockEndpoint) SendMessage(msg PeerMsg) bool {
+func (m *mockEndpoint) SendMessage(ctx context.Context, msg PeerMsg) bool {
     args := m.Called(msg)
 
     return args.Bool(0)
@@ -33,8 +34,9 @@ func (m *mockEndpoint) SendMessage(msg PeerMsg) bool {
 // TestMessageRouterOperation tests the basic operation of the message router:
 // add new endpoints, route to them, remove, them, etc.
 func TestMessageRouterOperation(t *testing.T) {
+    ctx := context.Background()
     msgRouter := NewMultiMsgRouter()
-    msgRouter.Start()
+    msgRouter.Start(ctx)
     defer msgRouter.Stop()
 
     openChanMsg := PeerMsg{
@@ -57,7 +59,7 @@ func TestMessageRouterOperation(t *testing.T) {
     fundingEndpoint.On("CanHandle", openChanMsg).Return(true)
     fundingEndpoint.On("CanHandle", errorMsg).Return(false)
     fundingEndpoint.On("CanHandle", commitSigMsg).Return(false)
-    fundingEndpoint.On("SendMessage", openChanMsg).Return(true)
+    fundingEndpoint.On("SendMessage", ctx, openChanMsg).Return(true)
 
     commitEndpoint := &mockEndpoint{}
     commitEndpointName := "commit"
@@ -65,7 +67,7 @@ func TestMessageRouterOperation(t *testing.T) {
     commitEndpoint.On("CanHandle", commitSigMsg).Return(true)
     commitEndpoint.On("CanHandle", openChanMsg).Return(false)
     commitEndpoint.On("CanHandle", errorMsg).Return(false)
-    commitEndpoint.On("SendMessage", commitSigMsg).Return(true)
+    commitEndpoint.On("SendMessage", ctx, commitSigMsg).Return(true)
 
     t.Run("add endpoints", func(t *testing.T) {
         // First, we'll add the funding endpoint to the router.
@@ -113,8 +115,10 @@ func TestMessageRouterOperation(t *testing.T) {
         fundingEndpoint.AssertCalled(t, "CanHandle", openChanMsg)
         commitEndpoint.AssertCalled(t, "CanHandle", openChanMsg)
-        fundingEndpoint.AssertCalled(t, "SendMessage", openChanMsg)
-        commitEndpoint.AssertNotCalled(t, "SendMessage", openChanMsg)
+        fundingEndpoint.AssertCalled(t, "SendMessage", ctx, openChanMsg)
+        commitEndpoint.AssertNotCalled(
+            t, "SendMessage", ctx, openChanMsg,
+        )
 
         // We'll do the same for the commit sig message.
         require.NoError(t, msgRouter.RouteMsg(commitSigMsg))
@@ -122,8 +126,10 @@ func TestMessageRouterOperation(t *testing.T) {
         fundingEndpoint.AssertCalled(t, "CanHandle", commitSigMsg)
         commitEndpoint.AssertCalled(t, "CanHandle", commitSigMsg)
-        commitEndpoint.AssertCalled(t, "SendMessage", commitSigMsg)
-        fundingEndpoint.AssertNotCalled(t, "SendMessage", commitSigMsg)
+        commitEndpoint.AssertCalled(t, "SendMessage", ctx, commitSigMsg)
+        fundingEndpoint.AssertNotCalled(
+            t, "SendMessage", ctx, commitSigMsg,
+        )
     })
 
     t.Run("remove endpoints", func(t *testing.T) {


@@ -451,12 +451,15 @@ type Config struct {
 // TODO(roasbeef): rename to chancloser.Negotiator and chancloser.RBF?
 type chanCloserFsm = fn.Either[*chancloser.ChanCloser, *chancloser.RbfChanCloser] //nolint:ll
 
+// makeNegotiateCloser creates a new negotiate closer from a
+// chancloser.ChanCloser.
 func makeNegotiateCloser(chanCloser *chancloser.ChanCloser) chanCloserFsm {
     return fn.NewLeft[*chancloser.ChanCloser, *chancloser.RbfChanCloser](
         chanCloser,
     )
 }
 
+// makeRbfCloser creates a new RBF closer from a chancloser.RbfChanCloser.
 func makeRbfCloser(rbfCloser *chancloser.RbfChanCloser) chanCloserFsm {
     return fn.NewRight[*chancloser.ChanCloser](
         rbfCloser,
@@ -468,7 +471,6 @@ func makeRbfCloser(rbfCloser *chancloser.RbfChanCloser) chanCloserFsm {
 // several helper goroutines to handle events such as HTLC timeouts, new
 // funding workflow, and detecting an uncooperative closure of any active
 // channels.
-// TODO(roasbeef): proper reconnection logic.
 type Brontide struct {
     // MUST be used atomically.
     started int32
@@ -615,7 +617,8 @@
     log btclog.Logger
 }
 
-// A compile-time check to ensure that Brontide satisfies the lnpeer.Peer interface.
+// A compile-time check to ensure that Brontide satisfies the lnpeer.Peer
+// interface.
 var _ lnpeer.Peer = (*Brontide)(nil)
 
 // NewBrontide creates a new Brontide from a peer.Config struct.
@@ -839,7 +842,7 @@ func (p *Brontide) Start() error {
     // Register the message router now as we may need to register some
     // endpoints while loading the channels below.
     p.msgRouter.WhenSome(func(router msgmux.Router) {
-        router.Start()
+        router.Start(context.Background())
     })
 
     msgs, err := p.loadActiveChannels(activeChans)
@@ -1217,6 +1220,12 @@ func (p *Brontide) loadActiveChannels(chans []*channeldb.OpenChannel) (
             shutdownInfoErr error
         )
         shutdownInfo.WhenSome(func(info channeldb.ShutdownInfo) {
+            // If we can use the new RBF close feature, we don't
+            // need to create the legacy closer.
+            if p.rbfCoopCloseAllowed() {
+                return
+            }
+
             // Compute an ideal fee.
             feePerKw, err := p.cfg.FeeEstimator.EstimateFeePerKW(
                 p.cfg.CoopCloseTargetConfs,
@@ -1290,6 +1299,39 @@
         }
 
         p.activeChannels.Store(chanID, lnChan)
+
+        // We're using the old co-op close, so we don't need to init
+        // the new RBF chan closer.
+        if !p.rbfCoopCloseAllowed() {
+            continue
+        }
+
+        // Now that the link has been added above, we'll also init an
+        // RBF chan closer for this channel, but only if the new close
+        // feature is negotiated.
+        //
+        // Creating this here ensures that any shutdown messages sent
+        // will be automatically routed by the msg router.
+        if _, err := p.initRbfChanCloser(lnChan); err != nil {
+            delete(p.activeChanCloses, chanID)
+
+            return nil, fmt.Errorf("unable to init RBF chan "+
+                "closer during peer connect: %w", err)
+        }
+
+        // If the shutdown info isn't blank, then we should kick things
+        // off by sending a shutdown message to the remote party to
+        // continue the old shutdown flow.
+        restartShutdown := func(s channeldb.ShutdownInfo) error {
+            return p.startRbfChanCloser(
+                newRestartShutdownInit(s), lnChan,
+            )
+        }
+        err = fn.MapOptionZ(shutdownInfo, restartShutdown)
+        if err != nil {
+            return nil, fmt.Errorf("unable to start RBF "+
+                "chan closer: %w", err)
+        }
     }
 
     return msgs, nil
@@ -3223,12 +3265,15 @@ func (p *Brontide) restartCoopClose(lnChan *lnwallet.LightningChannel) (
     if err != nil && err != channeldb.ErrNoCloseTx {
         // An error other than ErrNoCloseTx was encountered.
         return nil, err
-    } else if err == nil {
-        // This channel has already completed the coop close
-        // negotiation.
+    } else if err == nil && !p.rbfCoopCloseAllowed() {
+        // This is a channel that doesn't support RBF coop close, and it
+        // already had a coop close txn broadcast. As a result, we can
+        // just exit here as all we can do is wait for it to confirm.
         return nil, nil
     }
 
+    chanID := lnwire.NewChanIDFromOutPoint(c.FundingOutpoint)
+
     var deliveryScript []byte
 
     shutdownInfo, err := c.ShutdownInfo()
@@ -3258,6 +3303,27 @@
         }
     }
 
+    // If the new RBF co-op close is negotiated, then we'll init and start
+    // that state machine, skipping the steps for the negotiate machine
+    // below.
+    if p.rbfCoopCloseAllowed() {
+        _, err := p.initRbfChanCloser(lnChan)
+        if err != nil {
+            return nil, fmt.Errorf("unable to init rbf chan "+
+                "closer during restart: %w", err)
+        }
+
+        shutdownDesc := fn.MapOption(
+            newRestartShutdownInit,
+        )(shutdownInfo)
+
+        err = p.startRbfChanCloser(
+            fn.FlattenOption(shutdownDesc), lnChan,
+        )
+
+        return nil, err
+    }
+
     // Compute an ideal fee.
     feePerKw, err := p.cfg.FeeEstimator.EstimateFeePerKW(
         p.cfg.CoopCloseTargetConfs,
@@ -3289,7 +3355,6 @@
     // This does not need a mutex even though it is in a different
     // goroutine since this is done before the channelManager goroutine is
    // created.
-    chanID := lnwire.NewChanIDFromOutPoint(c.FundingOutpoint)
     p.activeChanCloses[chanID] = makeNegotiateCloser(chanCloser)
 
     // Create the Shutdown message.
@@ -3430,6 +3495,8 @@ func (p *Brontide) initNegotiateChanCloser(req *htlcswitch.ChanClose,
     return nil
 }
 
+// chooseAddr returns the provided address if it is non-zero length, otherwise
+// None.
 func chooseAddr(addr lnwire.DeliveryAddress) fn.Option[lnwire.DeliveryAddress] {
     if len(addr) == 0 {
         return fn.None[lnwire.DeliveryAddress]()
@@ -3438,6 +3505,101 @@ func chooseAddr(addr lnwire.DeliveryAddress) fn.Option[lnwire.DeliveryAddress] {
     return fn.Some(addr)
 }
 
+// observeRbfCloseUpdates observes the channel for any updates that may
+// indicate that a new txid has been broadcasted, or the channel fully closed
+// on chain.
+func (p *Brontide) observeRbfCloseUpdates(chanCloser *chancloser.RbfChanCloser,
+    closeReq *htlcswitch.ChanClose) {
+
+    coopCloseStates := chanCloser.RegisterStateEvents()
+    defer chanCloser.RemoveStateSub(coopCloseStates)
+
+    newStateChan := coopCloseStates.NewItemCreated.ChanOut()
+
+    var lastLocalTxid, lastRemoteTxid chainhash.Hash
+
+    maybeNotifyTxBroadcast := func(state chancloser.AsymmetricPeerState,
+        local bool) {
+
+        closePending, ok := state.(*chancloser.ClosePending)
+
+        // If this isn't the close pending state, we aren't at the
+        // terminal state yet.
+        if !ok {
+            return
+        }
+
+        lastTxid := lastLocalTxid
+        if !local {
+            lastTxid = lastRemoteTxid
+        }
+
+        // Otherwise, we'll have a txid that we can use to notify the
+        // client, but only if it's different from the last one we
+        // sent.
+        closingTxid := closePending.CloseTx.TxHash()
+        if closeReq != nil && closingTxid != lastTxid {
+            closeReq.Updates <- &PendingUpdate{
+                Txid: closingTxid[:],
+            }
+        }
+    }
+
+    // We'll consume each new incoming state to send out the appropriate
+    // RPC update.
+    for {
+        select {
+        case newState := <-newStateChan:
+            switch closeState := newState.(type) {
+            // Once we've reached the state of pending close, we
+            // have a txid that we broadcasted.
+            case *chancloser.ClosingNegotiation:
+                peerState := closeState.PeerState
+
+                // Each side may have gained a new co-op close
+                // tx, so we'll examine both to see if they've
+                // changed.
+                maybeNotifyTxBroadcast(
+                    peerState.GetForParty(lntypes.Local),
+                    true,
+                )
+                maybeNotifyTxBroadcast(
+                    peerState.GetForParty(lntypes.Remote),
+                    false,
+                )
+
+            // Otherwise, if we're transition to CloseFin, then we
+            // know that we're done.
+            case *chancloser.CloseFin:
+                // To clean up, we'll remove the chan closer
+                // from the active map, and send the final
+                // update to the client.
+                closingTxid := closeState.ConfirmedTx.TxHash()
+                if closeReq != nil {
+                    closeReq.Updates <- &ChannelCloseUpdate{
+                        ClosingTxid: closingTxid[:],
+                        Success:     true,
+                    }
+                }
+
+                // TODO(roasbeef): race, make to sync map?
+                chanID := lnwire.NewChanIDFromOutPoint(
+                    *closeReq.ChanPoint,
+                )
+                delete(p.activeChanCloses, chanID)
+            }
+
+        case <-closeReq.Ctx.Done():
+            return
+
+        case <-p.cg.Done():
+            return
+        }
+    }
+}
+
 // chanErrorReporter is a simple implementation of the
 // chancloser.ErrorReporter. This is bound to a single channel by the channel
 // ID.
@@ -3502,6 +3664,66 @@ func (c *chanErrorReporter) ReportError(chanErr error) {
     }
 }
 
+// chanFlushEventSentinel is used to send the RBF coop close state machine the
+// channel flushed event. We'll wait until the state machine enters the
+// ChannelFlushing state, then request the link to send the event once flushed.
+//
+// NOTE: This MUST be run as a goroutine.
+func (p *Brontide) chanFlushEventSentinel(chanCloser *chancloser.RbfChanCloser,
+    link htlcswitch.ChannelUpdateHandler,
+    channel *lnwallet.LightningChannel) {
+
+    defer p.cg.WgDone()
+
+    // If there's no link, then the channel has already been flushed, so we
+    // don't need to continue.
+    if link == nil {
+        return
+    }
+
+    coopCloseStates := chanCloser.RegisterStateEvents()
+    defer chanCloser.RemoveStateSub(coopCloseStates)
+
+    newStateChan := coopCloseStates.NewItemCreated.ChanOut()
+
+    sendChanFlushed := func() {
+        chanState := channel.StateSnapshot()
+
+        peerLog.Infof("ChannelPoint(%v) has been flushed for co-op "+
+            "close, sending event to chan closer",
+            channel.ChannelPoint())
+
+        chanBalances := chancloser.ShutdownBalances{
+            LocalBalance:  chanState.LocalBalance,
+            RemoteBalance: chanState.RemoteBalance,
+        }
+        ctx := context.Background()
+        chanCloser.SendEvent(ctx, &chancloser.ChannelFlushed{
+            ShutdownBalances: chanBalances,
+            FreshFlush:       true,
+        })
+    }
+
+    // We'll wait until the channel enters the ChannelFlushing state. We
+    // exit after a success loop. As after the first RBF iteration, the
+    // channel will always be flushed.
+    for newState := range newStateChan {
+        if _, ok := newState.(*chancloser.ChannelFlushing); ok {
+            peerLog.Infof("ChannelPoint(%v): rbf coop "+
+                "close is awaiting a flushed state, "+
+                "registering with link..., ",
+                channel.ChannelPoint())
+
+            // Request the link to send the event once the channel
+            // is flushed. We only need this event sent once, so we
+            // can exit now.
+            link.OnFlushedOnce(sendChanFlushed)
+
+            return
+        }
+    }
+}
+
 // initRbfChanCloser initializes the channel closer for a channel that
 // is using the new RBF based co-op close protocol. This only creates the chan
 // closer, but doesn't attempt to trigger any manual state transitions.
@@ -3590,49 +3812,162 @@ func (p *Brontide) initRbfChanCloser(
         ),
     }
 
+    ctx := context.Background()
     chanCloser := protofsm.NewStateMachine(protoCfg)
+    chanCloser.Start(ctx)
+
+    // Finally, we'll register this new endpoint with the message router so
+    // future co-op close messages are handled by this state machine.
+    err = fn.MapOptionZ(p.msgRouter, func(r msgmux.Router) error {
+        return r.RegisterEndpoint(&chanCloser)
+    })
+    if err != nil {
+        chanCloser.Stop()
+        delete(p.activeChanCloses, chanID)
+
+        return nil, fmt.Errorf("unable to register endpoint for co-op "+
+            "close: %w", err)
+    }
 
     p.activeChanCloses[chanID] = makeRbfCloser(&chanCloser)
 
-    // Now that we've created the channel state machine, we'll register for
-    // a hook to be sent once the channel has been flushed.
-    link.OnFlushedOnce(func() {
-        commitState := channel.StateSnapshot()
-
-        ctx := context.Background()
-        chanCloser.SendEvent(ctx, &chancloser.ChannelFlushed{
-            ShutdownBalances: chancloser.ShutdownBalances{
-                LocalBalance:  commitState.LocalBalance,
-                RemoteBalance: commitState.RemoteBalance,
-            },
-        })
-    })
+    // Now that we've created the rbf closer state machine, we'll launch a
+    // new goroutine to eventually send in the ChannelFlushed event once
+    // needed.
+    p.cg.WgAdd(1)
+    go p.chanFlushEventSentinel(&chanCloser, link, channel)
 
     return &chanCloser, nil
 }
 
-// initAndStartRbfChanCloser initializes the channel closer for a channel that
-// is using the new RBF based co-op close protocol. This is called when we're
-// the one that's initiating the cooperative channel close.
-func (p *Brontide) initAndStartRbfChanCloser(req *htlcswitch.ChanClose,
+// shutdownInit describes the two ways we can initiate a new shutdown. Either we
+// got an RPC request to do so (left), or we sent a shutdown message to the
+// party (for w/e reason), but crashed before the close was complete.
+//
+//nolint:ll
+type shutdownInit = fn.Option[fn.Either[*htlcswitch.ChanClose, channeldb.ShutdownInfo]]
+
+// shutdownStartFeeRate returns the fee rate that should be used for the
+// shutdown. This returns a doubly wrapped option as the shutdown info might
+// be none, and the fee rate is only defined for the user initiated shutdown.
+func shutdownStartFeeRate(s shutdownInit) fn.Option[chainfee.SatPerKWeight] {
+    feeRateOpt := fn.MapOption(func(init fn.Either[*htlcswitch.ChanClose,
+        channeldb.ShutdownInfo]) fn.Option[chainfee.SatPerKWeight] {
+
+        var feeRate fn.Option[chainfee.SatPerKWeight]
+        init.WhenLeft(func(req *htlcswitch.ChanClose) {
+            feeRate = fn.Some(req.TargetFeePerKw)
+        })
+
+        return feeRate
+    })(s)
+
+    return fn.FlattenOption(feeRateOpt)
+}
+
+// shutdownStartAddr returns the delivery address that should be used when
+// restarting the shutdown process. If we didn't send a shutdown before we
+// restarted, and the user didn't initiate one either, then None is returned.
+func shutdownStartAddr(s shutdownInit) fn.Option[lnwire.DeliveryAddress] {
+    addrOpt := fn.MapOption(func(init fn.Either[*htlcswitch.ChanClose,
+        channeldb.ShutdownInfo]) fn.Option[lnwire.DeliveryAddress] {
+
+        var addr fn.Option[lnwire.DeliveryAddress]
+        init.WhenLeft(func(req *htlcswitch.ChanClose) {
+            if len(req.DeliveryScript) != 0 {
+                addr = fn.Some(req.DeliveryScript)
+            }
+        })
+        init.WhenRight(func(info channeldb.ShutdownInfo) {
+            addr = fn.Some(info.DeliveryScript.Val)
+        })
+
+        return addr
+    })(s)
+
+    return fn.FlattenOption(addrOpt)
+}
+
+// whenRpcShutdown registers a callback to be executed when the shutdown init
+// type is and RPC request.
+func whenRpcShutdown(s shutdownInit, f func(r *htlcswitch.ChanClose)) {
+    s.WhenSome(func(init fn.Either[*htlcswitch.ChanClose,
+        channeldb.ShutdownInfo]) {
+
+        init.WhenLeft(f)
+    })
+}
+
+// newRestartShutdownInit creates a new shutdownInit for the case where we need
+// to restart the shutdown flow after a restart.
+func newRestartShutdownInit(info channeldb.ShutdownInfo) shutdownInit {
+    return fn.Some(fn.NewRight[*htlcswitch.ChanClose](info))
+}
+
+// newRPCShutdownInit creates a new shutdownInit for the case where we
+// initiated the shutdown via an RPC client.
+func newRPCShutdownInit(req *htlcswitch.ChanClose) shutdownInit {
+    return fn.Some(
+        fn.NewLeft[*htlcswitch.ChanClose, channeldb.ShutdownInfo](req),
+    )
+}
+
+// startRbfChanCloser kicks off the co-op close process using the new RBF based
+// co-op close protocol. This is called when we're the one that's initiating
+// the cooperative channel close.
+//
+// TODO(roasbeef): just accept the two shutdown pointer params instead??
+func (p *Brontide) startRbfChanCloser(shutdown shutdownInit,
     channel *lnwallet.LightningChannel) error {
 
-    // TODO(roasbeef): either kick off sent shutdown or shutdown recv'd
-    // * can also send the NoDangling in as new event?
-
-    // First, we'll create the channel closer for this channel.
-    chanCloser, err := p.initRbfChanCloser(channel)
-    if err != nil {
-        return err
-    }
-
-    // With the chan closer created, we'll now kick off the co-op close
-    // process by instructing it to send a shutdown message to the remote
-    // party.
-    ctx := context.Background()
-    chanCloser.SendEvent(ctx, &chancloser.SendShutdown{
-        IdealFeeRate: req.TargetFeePerKw.FeePerVByte(),
-        DeliveryAddr: chooseAddr(req.DeliveryScript),
-    })
+    // Unlike the old negotiate chan closer, we'll always create the RBF
+    // chan closer on startup, so we can skip init here.
+    chanID := lnwire.NewChanIDFromOutPoint(channel.ChannelPoint())
+    chanCloser, found := p.activeChanCloses[chanID]
+    if !found {
+        return fmt.Errorf("rbf can closer not found for channel %v",
+            channel.ChannelPoint())
+    }
+
+    defaultFeePerKw, err := shutdownStartFeeRate(
+        shutdown,
+    ).UnwrapOrFuncErr(func() (chainfee.SatPerKWeight, error) {
+        return p.cfg.FeeEstimator.EstimateFeePerKW(
+            p.cfg.CoopCloseTargetConfs,
+        )
+    })
+    if err != nil {
+        return fmt.Errorf("unable to estimate fee: %w", err)
+    }
+
+    chanCloser.WhenRight(func(rbfCloser *chancloser.RbfChanCloser) {
+        peerLog.Infof("ChannelPoint(%v): rbf-coop close requested, "+
+            "sending shutdown", channel.ChannelPoint())
+
+        // With the chan closer created, we'll now kick off the co-op
+        // close process by instructing it to send a shutdown message
+        // to the remote party.
+        ctx := context.Background()
+        rbfCloser.SendEvent(ctx, &chancloser.SendShutdown{
+            IdealFeeRate: defaultFeePerKw.FeePerVByte(),
+            DeliveryAddr: shutdownStartAddr(shutdown),
+        })
+
+        // Now that the channel is active, we'll launch a goroutine to
+        // watch for the final terminal state to send updates to the
+        // RPC client. We only need to do this if there's an RPC
+        // caller.
+        //
+        // TODO(roasbeef): make into a do once? otherwise new one for
+        // each RBF loop.
+        whenRpcShutdown(shutdown, func(req *htlcswitch.ChanClose) {
+            p.cg.WgAdd(1)
+            go func() {
+                defer p.cg.WgDone()
+
+                p.observeRbfCloseUpdates(rbfCloser, req)
+            }()
+        })
+    })
 
     return nil
@@ -3662,8 +3997,14 @@ func (p *Brontide) handleLocalCloseReq(req *htlcswitch.ChanClose) {
     case contractcourt.CloseRegular:
         var err error
         switch {
+        // If this is the RBF coop state machine, then we'll instruct
+        // it to send the shutdown message. This also might be an RBF
+        // iteration, in which case we'll be obtaining a new
+        // transaction w/ a higher fee rate.
         case p.rbfCoopCloseAllowed():
-            err = p.initAndStartRbfChanCloser(req, channel)
+            err = p.startRbfChanCloser(
+                newRPCShutdownInit(req), channel,
+            )
         default:
            err = p.initNegotiateChanCloser(req, channel)
         }
@@ -3815,7 +4156,7 @@ func (p *Brontide) finalizeChanClosure(chanCloser *chancloser.ChanCloser) {
     // Also clear the activeChanCloses map of this channel.
     cid := lnwire.NewChanIDFromOutPoint(chanPoint)
-    delete(p.activeChanCloses, cid)
+    delete(p.activeChanCloses, cid) // TODO(roasbeef): existing race
 
     // Next, we'll launch a goroutine which will request to be notified by
     // the ChainNotifier once the closure transaction obtains a single
@@ -4260,11 +4601,12 @@ func (p *Brontide) StartTime() time.Time {
 func (p *Brontide) handleCloseMsg(msg *closeMsg) {
     link := p.fetchLinkFromKeyAndCid(msg.cid)
 
-    // We'll now fetch the matching closing state machine in order to continue,
-    // or finalize the channel closure process.
+    // We'll now fetch the matching closing state machine in order to
+    // continue, or finalize the channel closure process.
     chanCloserE, err := p.fetchActiveChanCloser(msg.cid)
     if err != nil {
-        // If the channel is not known to us, we'll simply ignore this message.
+        // If the channel is not known to us, we'll simply ignore this
+        // message.
         if err == ErrChannelNotFound {
             return
         }
@@ -4618,6 +4960,25 @@ func (p *Brontide) addActiveChannel(c *lnpeer.NewChannel) error {
             "peer", chanPoint)
     }
 
+    // We're using the old co-op close, so we don't need to init the new
+    // RBF chan closer.
+    if !p.rbfCoopCloseAllowed() {
+        return nil
+    }
+
+    // Now that the link has been added above, we'll also init an RBF chan
+    // closer for this channel, but only if the new close feature is
+    // negotiated.
+    //
+    // Creating this here ensures that any shutdown messages sent will be
+    // automatically routed by the msg router.
+    if _, err := p.initRbfChanCloser(lnChan); err != nil {
+        delete(p.activeChanCloses, chanID)
+
+        return fmt.Errorf("unable to init RBF chan closer for new "+
+            "chan: %w", err)
+    }
+
     return nil
 }


@@ -68,8 +68,6 @@ type chanObserver struct {
 // newChanObserver creates a new instance of a chanObserver from an active
 // channelView.
-//
-//nolint:unused
 func newChanObserver(chanView channelView,
     link linkController, linkNetwork linkNetworkController) *chanObserver {


@@ -23,8 +23,6 @@ type MessageSender interface {
 // flexMessageSender is a message sender-like interface that is aware of
 // sync/async semantics, and is bound to a single peer.
-//
-//nolint:unused
 type flexMessageSender interface {
     // SendMessage sends a variadic number of high-priority messages to the
     // remote peer. The first argument denotes if the method should block
@@ -36,16 +34,12 @@
 // peerMsgSender implements the MessageSender interface for a single peer.
 // It'll return an error if the target public isn't equal to public key of the
 // backing peer.
-//
-//nolint:unused
 type peerMsgSender struct {
     sender  flexMessageSender
     peerPub btcec.PublicKey
 }
 
 // newPeerMsgSender creates a new instance of a peerMsgSender.
-//
-//nolint:unused
 func newPeerMsgSender(peerPub btcec.PublicKey,
     msgSender flexMessageSender) *peerMsgSender {
@@ -59,8 +53,6 @@ func newPeerMsgSender(peerPub btcec.PublicKey,
 //
 // TODO(roasbeef): current impl bound to single peer, need server pointer
 // otherwise?
-//
-//nolint:unused
 func (p *peerMsgSender) SendMessages(pub btcec.PublicKey,
     msgs []lnwire.Message) error {