peer: update rbf close client logic w/ error and iteration awareness

We'll properly handle a protocol error due to user input by halting, and
sending the error back to the user.

When a user goes to issue a new update, based on which state we're in,
we'll either kick off the shutdown, or attempt a new offer. This matches
the new spec update where we'll only send `Shutdown` once per
connection.
This commit is contained in:
Olaoluwa Osuntokun 2025-02-07 18:25:07 -08:00
parent fbc67f7610
commit 80b48fb49b
2 changed files with 141 additions and 63 deletions

View File

@ -959,3 +959,7 @@ type RbfState = protofsm.State[ProtocolEvent, *Environment]
// RbfEvent is a type alias for the event type of the RBF channel closer.
type RbfEvent = protofsm.EmittedEvent[ProtocolEvent]
// RbfStateSub is a type alias for the state subscription type of the RBF chan
// closer.
type RbfStateSub = protofsm.StateSubscriber[ProtocolEvent, *Environment]

View File

@ -1338,7 +1338,8 @@ func (p *Brontide) loadActiveChannels(chans []*channeldb.OpenChannel) (
// continue the old shutdown flow.
restartShutdown := func(s channeldb.ShutdownInfo) error {
return p.startRbfChanCloser(
newRestartShutdownInit(s), lnChan,
newRestartShutdownInit(s),
lnChan.ChannelPoint(),
)
}
err = fn.MapOptionZ(shutdownInfo, restartShutdown)
@ -3332,7 +3333,7 @@ func (p *Brontide) restartCoopClose(lnChan *lnwallet.LightningChannel) (
)(shutdownInfo)
err = p.startRbfChanCloser(
fn.FlattenOption(shutdownDesc), lnChan,
fn.FlattenOption(shutdownDesc), lnChan.ChannelPoint(),
)
return nil, err
@ -3366,9 +3367,6 @@ func (p *Brontide) restartCoopClose(lnChan *lnwallet.LightningChannel) (
return nil, fmt.Errorf("unable to create chan closer")
}
// This does not need a mutex even though it is in a different
// goroutine since this is done before the channelManager goroutine is
// created.
p.activeChanCloses.Store(chanID, makeNegotiateCloser(chanCloser))
// Create the Shutdown message.
@ -3523,20 +3521,40 @@ func chooseAddr(addr lnwire.DeliveryAddress) fn.Option[lnwire.DeliveryAddress] {
// indicate that a new txid has been broadcasted, or the channel fully closed
// on chain.
func (p *Brontide) observeRbfCloseUpdates(chanCloser *chancloser.RbfChanCloser,
closeReq *htlcswitch.ChanClose) {
coopCloseStates := chanCloser.RegisterStateEvents()
defer chanCloser.RemoveStateSub(coopCloseStates)
closeReq *htlcswitch.ChanClose,
coopCloseStates chancloser.RbfStateSub) {
newStateChan := coopCloseStates.NewItemCreated.ChanOut()
defer chanCloser.RemoveStateSub(coopCloseStates)
var (
lastLocalTxid, lastRemoteTxid chainhash.Hash
lastFeeRate chainfee.SatPerVByte
lastTxids lntypes.Dual[chainhash.Hash]
lastFeeRates lntypes.Dual[chainfee.SatPerVByte]
)
maybeNotifyTxBroadcast := func(state chancloser.AsymmetricPeerState,
local bool) {
party lntypes.ChannelParty) {
// First, check to see if we have an error to report to the
// caller. If so, then we''ll return that error and exit, as the
// stream will exit as well.
if closeErr, ok := state.(*chancloser.CloseErr); ok {
// We hit an error during the last state transition, so
// we'll extract the error then send it to the
// user.
err := closeErr.Err()
peerLog.Warnf("ChannelPoint(%v): encountered close "+
"err: %v", closeReq.ChanPoint, err)
select {
case closeReq.Err <- err:
case <-closeReq.Ctx.Done():
case <-p.cg.Done():
}
return
}
closePending, ok := state.(*chancloser.ClosePending)
@ -3547,37 +3565,50 @@ func (p *Brontide) observeRbfCloseUpdates(chanCloser *chancloser.RbfChanCloser,
}
// Only notify if the fee rate is greater.
if closePending.FeeRate <= lastFeeRate {
newFeeRate := closePending.FeeRate
lastFeeRate := lastFeeRates.GetForParty(party)
if newFeeRate <= lastFeeRate {
peerLog.Debugf("ChannelPoint(%v): remote party made "+
"update for fee rate %v, but we already have "+
"a higher fee rate of %v", closeReq.ChanPoint,
newFeeRate, lastFeeRate)
return
}
lastFeeRate = closePending.FeeRate
feeRate := closePending.FeeRate
lastFeeRates.SetForParty(party, feeRate)
// We'll also only notify if the transaction was actually able
// to enter the mempool.
err := p.cfg.Wallet.PublishTransaction(closePending.CloseTx, "")
if err != nil {
return
}
lastTxid := lastLocalTxid
if !local {
lastTxid = lastRemoteTxid
}
// Otherwise, we'll have a txid that we can use to notify the
// client, but only if it's different from the last one we
// sent.
// At this point, we'll have a txid that we can use to notify
// the client, but only if it's different from the last one we
// sent. If the user attempted to bump, but was rejected due to
// RBF, then we'll send a redundant update.
closingTxid := closePending.CloseTx.TxHash()
lastTxid := lastTxids.GetForParty(party)
if closeReq != nil && closingTxid != lastTxid {
closeReq.Updates <- &PendingUpdate{
Txid: closingTxid[:],
FeePerVbyte: fn.Some(closePending.FeeRate),
IsLocalCloseTx: fn.Some(local),
select {
case closeReq.Updates <- &PendingUpdate{
Txid: closingTxid[:],
FeePerVbyte: fn.Some(closePending.FeeRate),
IsLocalCloseTx: fn.Some(
party == lntypes.Local,
),
}:
case <-closeReq.Ctx.Done():
return
case <-p.cg.Done():
return
}
}
lastTxids.SetForParty(party, closingTxid)
}
peerLog.Infof("Observing RBF close updates for channel %v",
closeReq.ChanPoint)
// We'll consume each new incoming state to send out the appropriate
// RPC update.
for {
@ -3595,11 +3626,11 @@ func (p *Brontide) observeRbfCloseUpdates(chanCloser *chancloser.RbfChanCloser,
// changed.
maybeNotifyTxBroadcast(
peerState.GetForParty(lntypes.Local),
true,
lntypes.Local,
)
maybeNotifyTxBroadcast(
peerState.GetForParty(lntypes.Remote),
false,
lntypes.Remote,
)
// Otherwise, if we're transition to CloseFin, then we
@ -3615,9 +3646,6 @@ func (p *Brontide) observeRbfCloseUpdates(chanCloser *chancloser.RbfChanCloser,
Success: true,
}
}
// TODO(roasbeef): race, make to sync map?
// other clean up?
chanID := lnwire.NewChanIDFromOutPoint(
*closeReq.ChanPoint,
)
@ -3791,7 +3819,9 @@ func (p *Brontide) initRbfChanCloser(
peerPub := *p.IdentityKey()
msgMapper := chancloser.NewRbfMsgMapper(uint32(startingHeight), chanID, peerPub)
msgMapper := chancloser.NewRbfMsgMapper(
uint32(startingHeight), chanID, peerPub,
)
initialState := chancloser.ChannelActive{}
@ -3927,9 +3957,9 @@ func shutdownStartAddr(s shutdownInit) fn.Option[lnwire.DeliveryAddress] {
return fn.FlattenOption(addrOpt)
}
// whenRpcShutdown registers a callback to be executed when the shutdown init
// whenRPCShutdown registers a callback to be executed when the shutdown init
// type is and RPC request.
func whenRpcShutdown(s shutdownInit, f func(r *htlcswitch.ChanClose)) {
func whenRPCShutdown(s shutdownInit, f func(r *htlcswitch.ChanClose)) {
s.WhenSome(func(init fn.Either[*htlcswitch.ChanClose,
channeldb.ShutdownInfo]) {
@ -3957,15 +3987,15 @@ func newRPCShutdownInit(req *htlcswitch.ChanClose) shutdownInit {
//
// TODO(roasbeef): just accept the two shutdown pointer params instead??
func (p *Brontide) startRbfChanCloser(shutdown shutdownInit,
channel *lnwallet.LightningChannel) error {
chanPoint wire.OutPoint) error {
// Unlike the old negotiate chan closer, we'll always create the RBF
// chan closer on startup, so we can skip init here.
chanID := lnwire.NewChanIDFromOutPoint(channel.ChannelPoint())
chanID := lnwire.NewChanIDFromOutPoint(chanPoint)
chanCloser, found := p.activeChanCloses.Load(chanID)
if !found {
return fmt.Errorf("rbf can closer not found for channel %v",
channel.ChannelPoint())
chanPoint)
}
defaultFeePerKw, err := shutdownStartFeeRate(
@ -3981,32 +4011,76 @@ func (p *Brontide) startRbfChanCloser(shutdown shutdownInit,
chanCloser.WhenRight(func(rbfCloser *chancloser.RbfChanCloser) {
peerLog.Infof("ChannelPoint(%v): rbf-coop close requested, "+
"sending shutdown", channel.ChannelPoint())
"sending shutdown", chanPoint)
// With the chan closer created, we'll now kick off the co-op
// close process by instructing it to send a shutdown message
// to the remote party.
ctx := context.Background()
rbfCloser.SendEvent(ctx, &chancloser.SendShutdown{
IdealFeeRate: defaultFeePerKw.FeePerVByte(),
DeliveryAddr: shutdownStartAddr(shutdown),
})
rbfState, err := rbfCloser.CurrentState()
if err != nil {
peerLog.Warnf("ChannelPoint(%v): unable to get "+
"current state for rbf-coop close: %v",
chanPoint, err)
return
}
coopCloseStates := rbfCloser.RegisterStateEvents()
// Before we send our event below, we'll launch a goroutine to
// watch for the final terminal state to send updates to the RPC
// client. We only need to do this if there's an RPC caller.
var rpcShutdown bool
whenRPCShutdown(shutdown, func(req *htlcswitch.ChanClose) {
rpcShutdown = true
// Now that the channel is active, we'll launch a goroutine to
// watch for the final terminal state to send updates to the
// RPC client. We only need to do this if there's an RPC
// caller.
//
// TODO(roasbeef): make into a do once? otherwise new one for
// each RBF loop.
whenRpcShutdown(shutdown, func(req *htlcswitch.ChanClose) {
p.cg.WgAdd(1)
go func() {
defer p.cg.WgDone()
p.observeRbfCloseUpdates(rbfCloser, req)
p.observeRbfCloseUpdates(
rbfCloser, req, coopCloseStates,
)
}()
})
if !rpcShutdown {
defer rbfCloser.RemoveStateSub(coopCloseStates)
}
ctx, _ := p.cg.Create(context.Background())
feeRate := defaultFeePerKw.FeePerVByte()
// Depending on the state of the state machine, we'll either
// kick things off by sending shutdown, or attempt to send a new
// offer to the remote party.
switch rbfState.(type) {
// The channel is still active, so we'll now kick off the co-op
// close process by instructing it to send a shutdown message to
// the remote party.
case *chancloser.ChannelActive:
rbfCloser.SendEvent(
context.Background(),
&chancloser.SendShutdown{
IdealFeeRate: feeRate,
DeliveryAddr: shutdownStartAddr(
shutdown,
),
},
)
// If we haven't yet sent an offer (didn't have enough funds at
// the prior fee rate), or we've sent an offer, then we'll
// trigger a new offer event.
case *chancloser.ClosingNegotiation:
event := chancloser.ProtocolEvent(
&chancloser.SendOfferEvent{
TargetFeeRate: feeRate,
},
)
rbfCloser.SendEvent(ctx, event)
default:
peerLog.Warnf("ChannelPoint(%v): unexpected state "+
"for rbf-coop close: %T", chanPoint, rbfState)
}
})
return nil
@ -4042,7 +4116,7 @@ func (p *Brontide) handleLocalCloseReq(req *htlcswitch.ChanClose) {
// transaction w/ a higher fee rate.
case p.rbfCoopCloseAllowed():
err = p.startRbfChanCloser(
newRPCShutdownInit(req), channel,
newRPCShutdownInit(req), channel.ChannelPoint(),
)
default:
err = p.initNegotiateChanCloser(req, channel)