lnwallet/chancloser: update RBF state machine to handle early offer case

In this commit, we update the RBF state machine to handle early offer
cases. This can happen if after we send out shutdown (to kick things
off), the remote party sends their offer early. This can also happen if
their outgoing shutdown (to ACK ours) was delayed for w/e reason, and we
get their offer first.

The alternative was to modify the state machine itself, but we feel that
handling this early case is better in line with the Robustness principle.
This commit is contained in:
Olaoluwa Osuntokun 2025-02-26 18:04:16 -08:00
parent 333492e657
commit 7370617aaa
3 changed files with 149 additions and 13 deletions

View File

@ -454,6 +454,11 @@ type ShutdownPending struct {
// IdealFeeRate is the ideal fee rate we'd like to use for the closing
// attempt.
IdealFeeRate fn.Option[chainfee.SatPerVByte]
// EarlyRemoteOffer is the offer we received from the remote party
// before we received their shutdown message. We'll stash it to process
// later.
EarlyRemoteOffer fn.Option[OfferReceivedEvent]
}
// String returns the name of the state for ShutdownPending.

View File

@ -251,6 +251,8 @@ func (r *rbfCloserTestHarness) assertNoStateTransitions() {
}
func (r *rbfCloserTestHarness) assertStateTransitions(states ...RbfState) {
r.T.Helper()
assertStateTransitions(r.T, r.stateSub, states)
}
@ -1035,6 +1037,101 @@ func TestRbfShutdownPendingTransitions(t *testing.T) {
closeHarness.assertStateTransitions(&ChannelFlushing{})
})
// If we an early offer from the remote party, then we should stash
// that, transition to the channel flushing state. Once there, another
// self transition should emit the stashed offer.
t.Run("early_remote_offer_shutdown_complete", func(t *testing.T) {
firstState := *startingState
firstState.IdealFeeRate = fn.Some(
chainfee.FeePerKwFloor.FeePerVByte(),
)
firstState.ShutdownScripts = ShutdownScripts{
LocalDeliveryScript: localAddr,
RemoteDeliveryScript: remoteAddr,
}
closeHarness := newCloser(t, &harnessCfg{
initialState: fn.Some[ProtocolState](
&firstState,
),
})
defer closeHarness.stopAndAssert()
// In this case we're doing the shutdown dance for the first
// time, so we'll mark the channel as not being flushed.
closeHarness.expectFinalBalances(fn.None[ShutdownBalances]())
// Before we send the shutdown complete event, we'll send in an
// early offer from the remote party.
closeHarness.chanCloser.SendEvent(ctx, &OfferReceivedEvent{})
// This will cause a self transition back to ShutdownPending.
closeHarness.assertStateTransitions(&ShutdownPending{})
// Next, we'll send in a shutdown complete event.
closeHarness.chanCloser.SendEvent(ctx, &ShutdownComplete{})
// We should transition to the channel flushing state, then the
// self event to have this state cache he early offer should
// follow.
closeHarness.assertStateTransitions(
&ChannelFlushing{}, &ChannelFlushing{},
)
// If we get the current state, we should see that the offer is
// cached.
currentState := assertStateT[*ChannelFlushing](closeHarness)
require.NotNil(t, currentState.EarlyRemoteOffer)
})
// If we an early offer from the remote party, then we should stash
// that, transition to the channel flushing state. Once there, another
// self transition should emit the stashed offer.
t.Run("early_remote_offer_shutdown_received", func(t *testing.T) {
firstState := *startingState
firstState.IdealFeeRate = fn.Some(
chainfee.FeePerKwFloor.FeePerVByte(),
)
firstState.ShutdownScripts = ShutdownScripts{
LocalDeliveryScript: localAddr,
RemoteDeliveryScript: remoteAddr,
}
closeHarness := newCloser(t, &harnessCfg{
initialState: fn.Some[ProtocolState](
&firstState,
),
})
defer closeHarness.stopAndAssert()
// In this case we're doing the shutdown dance for the first
// time, so we'll mark the channel as not being flushed.
closeHarness.expectFinalBalances(fn.None[ShutdownBalances]())
closeHarness.expectIncomingAddsDisabled()
// Before we send the shutdown complete event, we'll send in an
// early offer from the remote party.
closeHarness.chanCloser.SendEvent(ctx, &OfferReceivedEvent{})
// This will cause a self transition back to ShutdownPending.
closeHarness.assertStateTransitions(&ShutdownPending{})
// Next, we'll send in a shutdown complete event.
closeHarness.chanCloser.SendEvent(ctx, &ShutdownReceived{})
// We should transition to the channel flushing state, then the
// self event to have this state cache he early offer should
// follow.
closeHarness.assertStateTransitions(
&ChannelFlushing{}, &ChannelFlushing{},
)
// If we get the current state, we should see that the offer is
// cached.
currentState := assertStateT[*ChannelFlushing](closeHarness)
require.NotNil(t, currentState.EarlyRemoteOffer)
})
// Any other event should be ignored.
assertUnknownEventFail(t, startingState)
}

View File

@ -291,6 +291,22 @@ func (s *ShutdownPending) ProcessEvent(event ProtocolEvent, env *Environment,
},
}, nil
// The remote party sent an offer early. We'll go to the ChannelFlushing
// case, and then emit the offer as a internal event, which'll be
// handled as an early offer.
case *OfferReceivedEvent:
chancloserLog.Infof("ChannelPoint(%v): got an early offer "+
"in ShutdownPending, emitting as external event",
env.ChanPoint)
s.EarlyRemoteOffer = fn.Some(*msg)
// We'll perform a noop update so we can wait for the actual
// channel flushed event.
return &CloseStateTransition{
NextState: s,
}, nil
// When we receive a shutdown from the remote party, we'll validate the
// shutdown message, then transition to the ChannelFlushing state.
case *ShutdownReceived:
@ -314,7 +330,7 @@ func (s *ShutdownPending) ProcessEvent(event ProtocolEvent, env *Environment,
// If the channel is *already* flushed, and the close is
// go straight into negotiation, as this is the RBF loop.
// already in progress, then we can skip the flushing state and
var eventsToEmit fn.Option[protofsm.EmittedEvent[ProtocolEvent]]
var eventsToEmit []ProtocolEvent
finalBalances := env.ChanObserver.FinalBalances().UnwrapOr(
unknownBalance,
)
@ -322,11 +338,7 @@ func (s *ShutdownPending) ProcessEvent(event ProtocolEvent, env *Environment,
channelFlushed := ProtocolEvent(&ChannelFlushed{
ShutdownBalances: finalBalances,
})
eventsToEmit = fn.Some(RbfEvent{
InternalEvent: []ProtocolEvent{
channelFlushed,
},
})
eventsToEmit = append(eventsToEmit, channelFlushed)
}
chancloserLog.Infof("ChannelPoint(%v): disabling incoming adds",
@ -342,6 +354,19 @@ func (s *ShutdownPending) ProcessEvent(event ProtocolEvent, env *Environment,
chancloserLog.Infof("ChannelPoint(%v): waiting for channel to "+
"be flushed...", env.ChanPoint)
// If we received a remote offer early from the remote party,
// then we'll add that to the set of internal events to emit.
s.EarlyRemoteOffer.WhenSome(func(offer OfferReceivedEvent) {
eventsToEmit = append(eventsToEmit, &offer)
})
var newEvents fn.Option[RbfEvent]
if len(eventsToEmit) > 0 {
newEvents = fn.Some(RbfEvent{
InternalEvent: eventsToEmit,
})
}
// We transition to the ChannelFlushing state, where we await
// the ChannelFlushed event.
return &CloseStateTransition{
@ -352,7 +377,7 @@ func (s *ShutdownPending) ProcessEvent(event ProtocolEvent, env *Environment,
RemoteDeliveryScript: msg.ShutdownScript, //nolint:ll
},
},
NewEvents: eventsToEmit,
NewEvents: newEvents,
}, nil
// If we get this message, then this means that we were finally able to
@ -365,7 +390,7 @@ func (s *ShutdownPending) ProcessEvent(event ProtocolEvent, env *Environment,
// If the channel is *already* flushed, and the close is
// already in progress, then we can skip the flushing state and
// go straight into negotiation, as this is the RBF loop.
var eventsToEmit fn.Option[protofsm.EmittedEvent[ProtocolEvent]]
var eventsToEmit []ProtocolEvent
finalBalances := env.ChanObserver.FinalBalances().UnwrapOr(
unknownBalance,
)
@ -373,10 +398,19 @@ func (s *ShutdownPending) ProcessEvent(event ProtocolEvent, env *Environment,
channelFlushed := ProtocolEvent(&ChannelFlushed{
ShutdownBalances: finalBalances,
})
eventsToEmit = fn.Some(RbfEvent{
InternalEvent: []ProtocolEvent{
channelFlushed,
},
eventsToEmit = append(eventsToEmit, channelFlushed)
}
// If we received a remote offer early from the remote party,
// then we'll add that to the set of internal events to emit.
s.EarlyRemoteOffer.WhenSome(func(offer OfferReceivedEvent) {
eventsToEmit = append(eventsToEmit, &offer)
})
var newEvents fn.Option[RbfEvent]
if len(eventsToEmit) > 0 {
newEvents = fn.Some(RbfEvent{
InternalEvent: eventsToEmit,
})
}
@ -387,7 +421,7 @@ func (s *ShutdownPending) ProcessEvent(event ProtocolEvent, env *Environment,
IdealFeeRate: s.IdealFeeRate,
ShutdownScripts: s.ShutdownScripts,
},
NewEvents: eventsToEmit,
NewEvents: newEvents,
}, nil
// Any other messages in this state will result in an error, as this is