diff --git a/lnwallet/chancloser/rbf_coop_states.go b/lnwallet/chancloser/rbf_coop_states.go index a195334f2..86de10c9f 100644 --- a/lnwallet/chancloser/rbf_coop_states.go +++ b/lnwallet/chancloser/rbf_coop_states.go @@ -959,3 +959,7 @@ type RbfState = protofsm.State[ProtocolEvent, *Environment] // RbfEvent is a type alias for the event type of the RBF channel closer. type RbfEvent = protofsm.EmittedEvent[ProtocolEvent] + +// RbfStateSub is a type alias for the state subscription type of the RBF chan +// closer. +type RbfStateSub = protofsm.StateSubscriber[ProtocolEvent, *Environment] diff --git a/peer/brontide.go b/peer/brontide.go index 7023707d8..09c3018bd 100644 --- a/peer/brontide.go +++ b/peer/brontide.go @@ -1338,7 +1338,8 @@ func (p *Brontide) loadActiveChannels(chans []*channeldb.OpenChannel) ( // continue the old shutdown flow. restartShutdown := func(s channeldb.ShutdownInfo) error { return p.startRbfChanCloser( - newRestartShutdownInit(s), lnChan, + newRestartShutdownInit(s), + lnChan.ChannelPoint(), ) } err = fn.MapOptionZ(shutdownInfo, restartShutdown) @@ -3332,7 +3333,7 @@ func (p *Brontide) restartCoopClose(lnChan *lnwallet.LightningChannel) ( )(shutdownInfo) err = p.startRbfChanCloser( - fn.FlattenOption(shutdownDesc), lnChan, + fn.FlattenOption(shutdownDesc), lnChan.ChannelPoint(), ) return nil, err @@ -3366,9 +3367,6 @@ func (p *Brontide) restartCoopClose(lnChan *lnwallet.LightningChannel) ( return nil, fmt.Errorf("unable to create chan closer") } - // This does not need a mutex even though it is in a different - // goroutine since this is done before the channelManager goroutine is - // created. p.activeChanCloses.Store(chanID, makeNegotiateCloser(chanCloser)) // Create the Shutdown message. @@ -3523,20 +3521,40 @@ func chooseAddr(addr lnwire.DeliveryAddress) fn.Option[lnwire.DeliveryAddress] { // indicate that a new txid has been broadcasted, or the channel fully closed // on chain. func (p *Brontide) observeRbfCloseUpdates(chanCloser *chancloser.RbfChanCloser, - closeReq *htlcswitch.ChanClose) { - - coopCloseStates := chanCloser.RegisterStateEvents() - defer chanCloser.RemoveStateSub(coopCloseStates) + closeReq *htlcswitch.ChanClose, + coopCloseStates chancloser.RbfStateSub) { newStateChan := coopCloseStates.NewItemCreated.ChanOut() + defer chanCloser.RemoveStateSub(coopCloseStates) var ( - lastLocalTxid, lastRemoteTxid chainhash.Hash - lastFeeRate chainfee.SatPerVByte + lastTxids lntypes.Dual[chainhash.Hash] + lastFeeRates lntypes.Dual[chainfee.SatPerVByte] ) maybeNotifyTxBroadcast := func(state chancloser.AsymmetricPeerState, - local bool) { + party lntypes.ChannelParty) { + + // First, check to see if we have an error to report to the + // caller. If so, then we''ll return that error and exit, as the + // stream will exit as well. + if closeErr, ok := state.(*chancloser.CloseErr); ok { + // We hit an error during the last state transition, so + // we'll extract the error then send it to the + // user. + err := closeErr.Err() + + peerLog.Warnf("ChannelPoint(%v): encountered close "+ + "err: %v", closeReq.ChanPoint, err) + + select { + case closeReq.Err <- err: + case <-closeReq.Ctx.Done(): + case <-p.cg.Done(): + } + + return + } closePending, ok := state.(*chancloser.ClosePending) @@ -3547,37 +3565,50 @@ func (p *Brontide) observeRbfCloseUpdates(chanCloser *chancloser.RbfChanCloser, } // Only notify if the fee rate is greater. - if closePending.FeeRate <= lastFeeRate { + newFeeRate := closePending.FeeRate + lastFeeRate := lastFeeRates.GetForParty(party) + if newFeeRate <= lastFeeRate { + peerLog.Debugf("ChannelPoint(%v): remote party made "+ + "update for fee rate %v, but we already have "+ + "a higher fee rate of %v", closeReq.ChanPoint, + newFeeRate, lastFeeRate) + return } - lastFeeRate = closePending.FeeRate + feeRate := closePending.FeeRate + lastFeeRates.SetForParty(party, feeRate) - // We'll also only notify if the transaction was actually able - // to enter the mempool. - err := p.cfg.Wallet.PublishTransaction(closePending.CloseTx, "") - if err != nil { - return - } - - lastTxid := lastLocalTxid - if !local { - lastTxid = lastRemoteTxid - } - - // Otherwise, we'll have a txid that we can use to notify the - // client, but only if it's different from the last one we - // sent. + // At this point, we'll have a txid that we can use to notify + // the client, but only if it's different from the last one we + // sent. If the user attempted to bump, but was rejected due to + // RBF, then we'll send a redundant update. closingTxid := closePending.CloseTx.TxHash() + lastTxid := lastTxids.GetForParty(party) if closeReq != nil && closingTxid != lastTxid { - closeReq.Updates <- &PendingUpdate{ - Txid: closingTxid[:], - FeePerVbyte: fn.Some(closePending.FeeRate), - IsLocalCloseTx: fn.Some(local), + select { + case closeReq.Updates <- &PendingUpdate{ + Txid: closingTxid[:], + FeePerVbyte: fn.Some(closePending.FeeRate), + IsLocalCloseTx: fn.Some( + party == lntypes.Local, + ), + }: + + case <-closeReq.Ctx.Done(): + return + + case <-p.cg.Done(): + return } } + + lastTxids.SetForParty(party, closingTxid) } + peerLog.Infof("Observing RBF close updates for channel %v", + closeReq.ChanPoint) + // We'll consume each new incoming state to send out the appropriate // RPC update. for { @@ -3595,11 +3626,11 @@ func (p *Brontide) observeRbfCloseUpdates(chanCloser *chancloser.RbfChanCloser, // changed. maybeNotifyTxBroadcast( peerState.GetForParty(lntypes.Local), - true, + lntypes.Local, ) maybeNotifyTxBroadcast( peerState.GetForParty(lntypes.Remote), - false, + lntypes.Remote, ) // Otherwise, if we're transition to CloseFin, then we @@ -3615,9 +3646,6 @@ func (p *Brontide) observeRbfCloseUpdates(chanCloser *chancloser.RbfChanCloser, Success: true, } } - - // TODO(roasbeef): race, make to sync map? - // other clean up? chanID := lnwire.NewChanIDFromOutPoint( *closeReq.ChanPoint, ) @@ -3791,7 +3819,9 @@ func (p *Brontide) initRbfChanCloser( peerPub := *p.IdentityKey() - msgMapper := chancloser.NewRbfMsgMapper(uint32(startingHeight), chanID, peerPub) + msgMapper := chancloser.NewRbfMsgMapper( + uint32(startingHeight), chanID, peerPub, + ) initialState := chancloser.ChannelActive{} @@ -3927,9 +3957,9 @@ func shutdownStartAddr(s shutdownInit) fn.Option[lnwire.DeliveryAddress] { return fn.FlattenOption(addrOpt) } -// whenRpcShutdown registers a callback to be executed when the shutdown init +// whenRPCShutdown registers a callback to be executed when the shutdown init // type is and RPC request. -func whenRpcShutdown(s shutdownInit, f func(r *htlcswitch.ChanClose)) { +func whenRPCShutdown(s shutdownInit, f func(r *htlcswitch.ChanClose)) { s.WhenSome(func(init fn.Either[*htlcswitch.ChanClose, channeldb.ShutdownInfo]) { @@ -3957,15 +3987,15 @@ func newRPCShutdownInit(req *htlcswitch.ChanClose) shutdownInit { // // TODO(roasbeef): just accept the two shutdown pointer params instead?? func (p *Brontide) startRbfChanCloser(shutdown shutdownInit, - channel *lnwallet.LightningChannel) error { + chanPoint wire.OutPoint) error { // Unlike the old negotiate chan closer, we'll always create the RBF // chan closer on startup, so we can skip init here. - chanID := lnwire.NewChanIDFromOutPoint(channel.ChannelPoint()) + chanID := lnwire.NewChanIDFromOutPoint(chanPoint) chanCloser, found := p.activeChanCloses.Load(chanID) if !found { return fmt.Errorf("rbf can closer not found for channel %v", - channel.ChannelPoint()) + chanPoint) } defaultFeePerKw, err := shutdownStartFeeRate( @@ -3981,32 +4011,76 @@ func (p *Brontide) startRbfChanCloser(shutdown shutdownInit, chanCloser.WhenRight(func(rbfCloser *chancloser.RbfChanCloser) { peerLog.Infof("ChannelPoint(%v): rbf-coop close requested, "+ - "sending shutdown", channel.ChannelPoint()) + "sending shutdown", chanPoint) - // With the chan closer created, we'll now kick off the co-op - // close process by instructing it to send a shutdown message - // to the remote party. - ctx := context.Background() - rbfCloser.SendEvent(ctx, &chancloser.SendShutdown{ - IdealFeeRate: defaultFeePerKw.FeePerVByte(), - DeliveryAddr: shutdownStartAddr(shutdown), - }) + rbfState, err := rbfCloser.CurrentState() + if err != nil { + peerLog.Warnf("ChannelPoint(%v): unable to get "+ + "current state for rbf-coop close: %v", + chanPoint, err) + + return + } + + coopCloseStates := rbfCloser.RegisterStateEvents() + + // Before we send our event below, we'll launch a goroutine to + // watch for the final terminal state to send updates to the RPC + // client. We only need to do this if there's an RPC caller. + var rpcShutdown bool + whenRPCShutdown(shutdown, func(req *htlcswitch.ChanClose) { + rpcShutdown = true - // Now that the channel is active, we'll launch a goroutine to - // watch for the final terminal state to send updates to the - // RPC client. We only need to do this if there's an RPC - // caller. - // - // TODO(roasbeef): make into a do once? otherwise new one for - // each RBF loop. - whenRpcShutdown(shutdown, func(req *htlcswitch.ChanClose) { p.cg.WgAdd(1) go func() { defer p.cg.WgDone() - p.observeRbfCloseUpdates(rbfCloser, req) + p.observeRbfCloseUpdates( + rbfCloser, req, coopCloseStates, + ) }() }) + + if !rpcShutdown { + defer rbfCloser.RemoveStateSub(coopCloseStates) + } + + ctx, _ := p.cg.Create(context.Background()) + feeRate := defaultFeePerKw.FeePerVByte() + + // Depending on the state of the state machine, we'll either + // kick things off by sending shutdown, or attempt to send a new + // offer to the remote party. + switch rbfState.(type) { + // The channel is still active, so we'll now kick off the co-op + // close process by instructing it to send a shutdown message to + // the remote party. + case *chancloser.ChannelActive: + rbfCloser.SendEvent( + context.Background(), + &chancloser.SendShutdown{ + IdealFeeRate: feeRate, + DeliveryAddr: shutdownStartAddr( + shutdown, + ), + }, + ) + + // If we haven't yet sent an offer (didn't have enough funds at + // the prior fee rate), or we've sent an offer, then we'll + // trigger a new offer event. + case *chancloser.ClosingNegotiation: + event := chancloser.ProtocolEvent( + &chancloser.SendOfferEvent{ + TargetFeeRate: feeRate, + }, + ) + rbfCloser.SendEvent(ctx, event) + + default: + peerLog.Warnf("ChannelPoint(%v): unexpected state "+ + "for rbf-coop close: %T", chanPoint, rbfState) + } }) return nil @@ -4042,7 +4116,7 @@ func (p *Brontide) handleLocalCloseReq(req *htlcswitch.ChanClose) { // transaction w/ a higher fee rate. case p.rbfCoopCloseAllowed(): err = p.startRbfChanCloser( - newRPCShutdownInit(req), channel, + newRPCShutdownInit(req), channel.ChannelPoint(), ) default: err = p.initNegotiateChanCloser(req, channel)