From 3bae7f32cd61ef5ee111acdbbb06671f6975f2ae Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Tue, 2 Jan 2024 17:30:20 -0800 Subject: [PATCH 01/12] protofsm: add new package for driving generic protocol FSMs In this PR, we create a new package, `protofsm` which is intended to abstract away from something we've done dozens of time in the daemon: create a new event-drive protocol FSM. One example of this is the co-op close state machine, and also the channel state machine itself. This packages picks out the common themes of: * clear states and transitions between them * calling out to special daemon adapters for I/O such as transaction broadcast or sending a message to a peer * cleaning up after state machine execution * notifying relevant callers of updates to the state machine The goal of this PR, is that devs can now implement a state machine based off of this primary interface: ```go // State defines an abstract state along, namely its state transition function // that takes as input an event and an environment, and returns a state // transition (next state, and set of events to emit). As state can also either // be terminal, or not, a terminal event causes state execution to halt. type State[Event any, Env Environment] interface { // ProcessEvent takes an event and an environment, and returns a new // state transition. This will be iteratively called until either a // terminal state is reached, or no further internal events are // emitted. ProcessEvent(event Event, env Env) (*StateTransition[Event, Env], error) // IsTerminal returns true if this state is terminal, and false otherwise. IsTerminal() bool } ``` With their focus being _only_ on each state transition, rather than all the boiler plate involved (processing new events, advancing to completion, doing I/O, etc, etc). Instead, they just make their states, then create the state machine given the starting state and env. The only other custom component needed is something capable of mapping wire messages or other events from the "outside world" into the domain of the state machine. The set of types is based on a pseudo sum type system wherein you declare an interface, make the sole method private, then create other instances based on that interface. This restricts call sites (must pass in that interface) type, and with some tooling, exhaustive matching can also be enforced via a linter. The best way to get a hang of the pattern proposed here is to check out the tests. They make a mock state machine, and then use the new executor to drive it to completion. You'll also get a view of how the code will actually look, with the focus being on the: input event, current state, and output transition (can also emit events to drive itself forward). --- protofsm/daemon_events.go | 66 ++++++ protofsm/log.go | 29 +++ protofsm/state_machine.go | 403 +++++++++++++++++++++++++++++++++ protofsm/state_machine_test.go | 281 +++++++++++++++++++++++ 4 files changed, 779 insertions(+) create mode 100644 protofsm/daemon_events.go create mode 100644 protofsm/log.go create mode 100644 protofsm/state_machine.go create mode 100644 protofsm/state_machine_test.go diff --git a/protofsm/daemon_events.go b/protofsm/daemon_events.go new file mode 100644 index 000000000..39719d65f --- /dev/null +++ b/protofsm/daemon_events.go @@ -0,0 +1,66 @@ +package protofsm + +import ( + "github.com/btcsuite/btcd/btcec/v2" + "github.com/btcsuite/btcd/wire" + "github.com/lightningnetwork/lnd/fn" + "github.com/lightningnetwork/lnd/lnwire" +) + +// DaemonEvent is a special event that can be emmitted by a state transition +// function. A state machine can use this to perform side effects, such as +// sending a message to a peer, or broadcasting a transaction. +type DaemonEvent interface { + daemonSealed() +} + +// DaemonEventSet is a set of daemon events that can be emitted by a state +// transition. +type DaemonEventSet []DaemonEvent + +// DaemonEvents is a special type constraint that enumerates all the possible +// types of daemon events. +type DaemonEvents interface { + SendMsgEvent[any] | BroadcastTxn +} + +// SendPredicate is a function that returns true if the target message should +// sent. +type SendPredicate = func() bool + +// SendMsgEvent is a special event that can be emitted by a state transition +// that instructs the daemon to send the contained message to the target peer. +type SendMsgEvent[Event any] struct { + // TargetPeer is the peer to send the message to. + TargetPeer btcec.PublicKey + + // Msgs is the set of messages to send to the target peer. + Msgs []lnwire.Message + + // SendWhen implements a system for a conditional send once a special + // send predicate has been met. + // + // TODO(roasbeef): contrast with usage of OnCommitFlush, etc + SendWhen fn.Option[SendPredicate] + + // PostSendEvent is an optional event that is to be emitted after the + // message has been sent. If a SendWhen is specified, then this will + // only be executed after that returns true to unblock the send. + PostSendEvent fn.Option[Event] +} + +// daemonSealed indicates that this struct is a DaemonEvent instance. +func (s *SendMsgEvent[E]) daemonSealed() {} + +// BroadcastTxn indicates the target transaction should be broadcast to the +// network. +type BroadcastTxn struct { + // Tx is the transaction to broadcast. + Tx *wire.MsgTx + + // Label is an optional label to attach to the transaction. + Label string +} + +// daemonSealed indicates that this struct is a DaemonEvent instance. +func (b *BroadcastTxn) daemonSealed() {} diff --git a/protofsm/log.go b/protofsm/log.go new file mode 100644 index 000000000..ea1728ab8 --- /dev/null +++ b/protofsm/log.go @@ -0,0 +1,29 @@ +package protofsm + +import ( + "github.com/btcsuite/btclog" + "github.com/lightningnetwork/lnd/build" +) + +// log is a logger that is initialized with no output filters. This +// means the package will not perform any logging by default until the caller +// requests it. +var log btclog.Logger + +// The default amount of logging is none. +func init() { + UseLogger(build.NewSubLogger("PRCL", nil)) +} + +// DisableLog disables all library log output. Logging output is disabled +// by default until UseLogger is called. +func DisableLog() { + UseLogger(btclog.Disabled) +} + +// UseLogger uses a specified Logger to output package logging info. +// This should be used in preference to SetLogWriter if the caller is also +// using btclog. +func UseLogger(logger btclog.Logger) { + log = logger +} diff --git a/protofsm/state_machine.go b/protofsm/state_machine.go new file mode 100644 index 000000000..f1a1e3b54 --- /dev/null +++ b/protofsm/state_machine.go @@ -0,0 +1,403 @@ +package protofsm + +import ( + "fmt" + "sync" + "time" + + "github.com/btcsuite/btcd/btcec/v2" + "github.com/btcsuite/btcd/wire" + "github.com/lightningnetwork/lnd/fn" + "github.com/lightningnetwork/lnd/lnwire" +) + +const ( + // pollInterval is the interval at which we'll poll the SendWhen + // predicate if specified. + pollInterval = time.Millisecond * 100 +) + +// EmittedEvent is a special type that can be emitted by a state transition. +// This can container internal events which are to be routed back to the state, +// or external events which are to be sent to the daemon. +type EmittedEvent[Event any] struct { + // InternalEvent is an optional internal event that is to be routed + // back to the target state. This enables state to trigger one or many + // state transitions without a new external event. + InternalEvent fn.Option[Event] + + // ExternalEvent is an optional external event that is to be sent to + // the daemon for dispatch. Usually, this is some form of I/O. + ExternalEvents fn.Option[DaemonEventSet] +} + +// StateTransition is a state transition type. It denotes the next state to go +// to, and also the set of events to emit. +type StateTransition[Event any, Env Environment] struct { + // NextState is the next state to transition to. + NextState State[Event, Env] + + // NewEvents is the set of events to emit. + NewEvents fn.Option[EmittedEvent[Event]] +} + +// Environment is an abstract interface that represents the environment that +// the state machine will execute using. From the PoV of the main state machine +// executor, we just care about being able to clean up any resources that were +// allocated by the environment. +type Environment interface { + // Name returns the name of the environment. This is used to uniquely + // identify the environment of related state machines. + Name() string +} + +// State defines an abstract state along, namely its state transition function +// that takes as input an event and an environment, and returns a state +// transition (next state, and set of events to emit). As state can also either +// be terminal, or not, a terminal event causes state execution to halt. +type State[Event any, Env Environment] interface { + // ProcessEvent takes an event and an environment, and returns a new + // state transition. This will be iteratively called until either a + // terminal state is reached, or no further internal events are + // emitted. + ProcessEvent(event Event, env Env) (*StateTransition[Event, Env], error) + + // IsTerminal returns true if this state is terminal, and false otherwise. + IsTerminal() bool + + // TODO(roasbeef): also add state serialization? +} + +// DaemonAdapters is a set of methods that server as adapters to bridge the +// pure world of the FSM to the real world of the daemon. These will be used to +// do things like broadcast transactions, or send messages to peers. +type DaemonAdapters interface { + // SendMessages sends the target set of messages to the target peer. + SendMessages(btcec.PublicKey, []lnwire.Message) error + + // BroadcastTransaction broadcasts a transaction with the target label. + BroadcastTransaction(*wire.MsgTx, string) error +} + +// stateQuery is used by outside callers to query the internal state of the +// state machine. +type stateQuery[Event any, Env Environment] struct { + // CurrentState is a channel that will be sent the current state of the + // state machine. + CurrentState chan State[Event, Env] +} + +// StateMachine represents an abstract FSM that is able to process new incoming +// events and drive a state machine to termination. This implementation uses +// type params to abstract over the types of events and environment. Events +// trigger new state transitions, that use the environment to perform some +// action. +// +// TODO(roasbeef): terminal check, daemon event execution, init? +type StateMachine[Event any, Env Environment] struct { + currentState State[Event, Env] + env Env + + daemon DaemonAdapters + + events chan Event + + quit chan struct{} + wg sync.WaitGroup + + // newStateEvents is an EventDistributor that will be used to notify + // any relevant callers of new state transitions that occur. + newStateEvents *fn.EventDistributor[State[Event, Env]] + + stateQuery chan stateQuery[Event, Env] + + startOnce sync.Once + stopOnce sync.Once + + // TODO(roasbeef): also use that context guard here? +} + +// NewStateMachine creates a new state machine given a set of daemon adapters, +// an initial state, and an environment. +func NewStateMachine[Event any, Env Environment](adapters DaemonAdapters, + initialState State[Event, Env], + env Env) StateMachine[Event, Env] { + + return StateMachine[Event, Env]{ + daemon: adapters, + events: make(chan Event, 1), + currentState: initialState, + stateQuery: make(chan stateQuery[Event, Env]), + quit: make(chan struct{}), + env: env, + newStateEvents: fn.NewEventDistributor[State[Event, Env]](), + } +} + +// Start starts the state machine. This will spawn a goroutine that will drive +// the state machine to completion. +func (s *StateMachine[Event, Env]) Start() { + s.startOnce.Do(func() { + s.wg.Add(1) + go s.driveMachine() + }) +} + +// Stop stops the state machine. This will block until the state machine has +// reached a stopping point. +func (s *StateMachine[Event, Env]) Stop() { + s.stopOnce.Do(func() { + close(s.quit) + s.wg.Wait() + }) +} + +// SendEvent sends a new event to the state machine. +// +// TODO(roasbeef): bool if processed? +func (s *StateMachine[Event, Env]) SendEvent(event Event) { + select { + case s.events <- event: + case <-s.quit: + return + } +} + +// CurrentState returns the current state of the state machine. +func (s *StateMachine[Event, Env]) CurrentState() (State[Event, Env], error) { + query := stateQuery[Event, Env]{ + CurrentState: make(chan State[Event, Env], 1), + } + + if !fn.SendOrQuit(s.stateQuery, query, s.quit) { + return nil, fmt.Errorf("state machine is shutting down") + } + + return fn.RecvOrTimeout(query.CurrentState, time.Second) +} + +// StateSubscriber represents an active subscription to be notified of new +// state transitions. +type StateSubscriber[E any, F Environment] *fn.EventReceiver[State[E, F]] + +// RegisterStateEvents registers a new event listener that will be notified of +// new state transitions. +func (s *StateMachine[Event, Env]) RegisterStateEvents() StateSubscriber[Event, Env] { + subscriber := fn.NewEventReceiver[State[Event, Env]](10) + + // TODO(roasbeef): instead give the state and the input event? + + s.newStateEvents.RegisterSubscriber(subscriber) + + return subscriber +} + +// RemoveStateSub removes the target state subscriber from the set of active +// subscribers. +func (s *StateMachine[Event, Env]) RemoveStateSub(sub StateSubscriber[Event, Env]) { + s.newStateEvents.RemoveSubscriber(sub) +} + +// executeDaemonEvent executes a daemon event, which is a special type of event +// that can be emitted as part of the state transition function of the state +// machine. An error is returned if the type of event is unknown. +func (s *StateMachine[Event, Env]) executeDaemonEvent(event DaemonEvent) error { + switch daemonEvent := event.(type) { + + // This is a send message event, so we'll send the event, and also mind + // any preconditions as well as post-send events. + case *SendMsgEvent[Event]: + sendAndCleanUp := func() error { + err := s.daemon.SendMessages( + daemonEvent.TargetPeer, daemonEvent.Msgs, + ) + if err != nil { + return fmt.Errorf("unable to send msgs: %w", err) + } + + // If a post-send event was specified, then we'll + // funnel that back into the main state machine now as + // well. + daemonEvent.PostSendEvent.WhenSome(func(event Event) { + s.wg.Add(1) + go func() { + defer s.wg.Done() + + s.SendEvent(event) + }() + }) + + return nil + } + + // If this doesn't have a SendWhen predicate, then we can just + // send it off right away. + if !daemonEvent.SendWhen.IsSome() { + return sendAndCleanUp() + } + + // Otherwise, this has a SendWhen predicate, so we'll need + // launch a goroutine to poll the SendWhen, then send only once + // the predicate is true. + s.wg.Add(1) + go func() { + defer s.wg.Done() + + predicateTicker := time.NewTicker(pollInterval) + defer predicateTicker.Stop() + + for { + select { + case <-predicateTicker.C: + canSend := fn.MapOptionZ( + daemonEvent.SendWhen, + func(pred SendPredicate) bool { + return pred() + }, + ) + + if canSend { + sendAndCleanUp() + return + } + + case <-s.quit: + return + } + } + }() + + return nil + + // If this is a broadcast transaction event, then we'll broadcast with + // the label attached. + case *BroadcastTxn: + err := s.daemon.BroadcastTransaction( + daemonEvent.Tx, daemonEvent.Label, + ) + if err != nil { + // TODO(roasbeef): hook has channel read event event is + // hit? + return fmt.Errorf("unable to broadcast txn: %w", err) + } + + return nil + } + + return fmt.Errorf("unknown daemon event: %T", event) +} + +// applyEvents applies a new event to the state machine. This will continue +// until no further events are emitted by the state machine. Along the way, +// we'll also ensure to execute any daemon events that are emitted. +func (s *StateMachine[Event, Env]) applyEvents(newEvent Event) (State[Event, Env], error) { + // TODO(roasbeef): make starting state as part of env? + currentState := s.currentState + + eventQueue := fn.NewQueue(newEvent) + + // Given the next event to handle, we'll process the event, then add + // any new emitted internal events to our event queue. This continues + // until we reach a terminal state, or we run out of internal events to + // process. + for nextEvent := eventQueue.Dequeue(); nextEvent.IsSome(); nextEvent = eventQueue.Dequeue() { + err := fn.MapOptionZ(nextEvent, func(event Event) error { + // Apply the state transition function of the current + // state given this new event and our existing env. + transition, err := currentState.ProcessEvent( + event, s.env, + ) + if err != nil { + return err + } + + newEvents := transition.NewEvents + err = fn.MapOptionZ(newEvents, func(events EmittedEvent[Event]) error { + // With the event processed, we'll process any + // new daemon events that were emitted as part + // of this new state transition. + err := fn.MapOptionZ(events.ExternalEvents, func(dEvents DaemonEventSet) error { + for _, dEvent := range dEvents { + err := s.executeDaemonEvent(dEvent) + if err != nil { + return err + } + } + + return nil + }) + if err != nil { + return err + } + + // Next, we'll add any new emitted events to + // our event queue. + events.InternalEvent.WhenSome(func(inEvent Event) { + eventQueue.Enqueue(inEvent) + }) + + return nil + }) + if err != nil { + return err + } + + // With our events processed, we'll now update our + // internal state. + currentState = transition.NextState + + // Notify our subscribers of the new state transition. + s.newStateEvents.NotifySubscribers(currentState) + + return nil + }) + if err != nil { + return currentState, err + } + } + + return currentState, nil +} + +// driveMachine is the main event loop of the state machine. It accepts any new +// incoming events, and then drives the state machine forward until it reaches +// a terminal state. +func (s *StateMachine[Event, Env]) driveMachine() { + defer s.wg.Done() + + // TODO(roasbeef): move into env? read only to start with + currentState := s.currentState + + // We just started driving the state machine, so we'll notify our + // subscribers of this starting state. + s.newStateEvents.NotifySubscribers(currentState) + + for { + select { + // We have a new external event, so we'll drive the state + // machine forward until we either run out of internal events, + // or we reach a terminal state. + case newEvent := <-s.events: + newState, err := s.applyEvents(newEvent) + if err != nil { + // TODO(roasbeef): hard error? + log.Errorf("unable to apply event: %v", err) + continue + } + + currentState = newState + + // An outside caller is querying our state, so we'll return the + // latest state. + case stateQuery := <-s.stateQuery: + if !fn.SendOrQuit(stateQuery.CurrentState, currentState, s.quit) { + return + } + + case <-s.quit: + // TODO(roasbeef): logs, etc + // * something in env? + return + } + } +} diff --git a/protofsm/state_machine_test.go b/protofsm/state_machine_test.go new file mode 100644 index 000000000..71760726e --- /dev/null +++ b/protofsm/state_machine_test.go @@ -0,0 +1,281 @@ +package protofsm + +import ( + "encoding/hex" + "fmt" + "sync/atomic" + "testing" + + "github.com/btcsuite/btcd/btcec/v2" + "github.com/btcsuite/btcd/wire" + "github.com/lightningnetwork/lnd/fn" + "github.com/lightningnetwork/lnd/lnwire" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +type dummyEvents interface { + dummy() +} + +type goToFin struct { +} + +func (g *goToFin) dummy() { +} + +type emitInternal struct { +} + +func (e *emitInternal) dummy() { +} + +type daemonEvents struct { +} + +func (s *daemonEvents) dummy() { +} + +type dummyEnv struct { + mock.Mock +} + +func (d *dummyEnv) Name() string { + return "test" +} + +type dummyStateStart struct { + canSend *atomic.Bool +} + +var ( + hexDecode = func(keyStr string) []byte { + keyBytes, _ := hex.DecodeString(keyStr) + return keyBytes + } + pub1, _ = btcec.ParsePubKey(hexDecode( + "02ec95e4e8ad994861b95fc5986eedaac24739e5ea3d0634db4c8ccd44cd" + + "a126ea", + )) + pub2, _ = btcec.ParsePubKey(hexDecode( + "0356167ba3e54ac542e86e906d4186aba9ca0b9df45001c62b753d33fe06" + + "f5b4e8", + )) +) + +func (d *dummyStateStart) ProcessEvent(event dummyEvents, env *dummyEnv, +) (*StateTransition[dummyEvents, *dummyEnv], error) { + + switch event.(type) { + case *goToFin: + return &StateTransition[dummyEvents, *dummyEnv]{ + NextState: &dummyStateFin{}, + }, nil + + // This state will loop back upon itself, but will also emit an event + // to head to the terminal state. + case *emitInternal: + return &StateTransition[dummyEvents, *dummyEnv]{ + NextState: &dummyStateStart{}, + NewEvents: fn.Some(EmittedEvent[dummyEvents]{ + InternalEvent: fn.Some(dummyEvents(&goToFin{})), + }), + }, nil + + // This state will proceed to the terminal state, but will emit all the + // possible daemon events. + case *daemonEvents: + // This send event can only succeed once the bool turns to + // true. After that, then we'll expect another event to take us + // to the final state. + sendEvent := &SendMsgEvent[dummyEvents]{ + TargetPeer: *pub1, + SendWhen: fn.Some(func() bool { + return d.canSend.Load() + }), + PostSendEvent: fn.Some(dummyEvents(&goToFin{})), + } + + // We'll also send out a normal send event that doesn't have + // any preconditions. + sendEvent2 := &SendMsgEvent[dummyEvents]{ + TargetPeer: *pub2, + } + + return &StateTransition[dummyEvents, *dummyEnv]{ + // We'll state in this state until the send succeeds + // based on our predicate. Then it'll transition to the + // final state. + NextState: &dummyStateStart{ + canSend: d.canSend, + }, + NewEvents: fn.Some(EmittedEvent[dummyEvents]{ + ExternalEvents: fn.Some(DaemonEventSet{ + sendEvent, &BroadcastTxn{}, sendEvent2, + }), + }), + }, nil + } + + return nil, fmt.Errorf("unknown event: %T", event) +} + +func (d *dummyStateStart) IsTerminal() bool { + return false +} + +type dummyStateFin struct { +} + +func (d *dummyStateFin) ProcessEvent(event dummyEvents, env *dummyEnv, +) (*StateTransition[dummyEvents, *dummyEnv], error) { + + return &StateTransition[dummyEvents, *dummyEnv]{ + NextState: &dummyStateFin{}, + }, nil +} + +func (d *dummyStateFin) IsTerminal() bool { + return true +} + +func assertState[Event any, Env Environment](t *testing.T, + m *StateMachine[Event, Env], expectedState State[Event, Env]) { + + state, err := m.CurrentState() + require.NoError(t, err) + require.IsType(t, expectedState, state) +} + +func assertStateTransitions[Event any, Env Environment]( + t *testing.T, stateSub StateSubscriber[Event, Env], + expectedStates []State[Event, Env]) { + + for _, expectedState := range expectedStates { + newState := <-stateSub.NewItemCreated.ChanOut() + + require.IsType(t, expectedState, newState) + } +} + +type dummyAdapters struct { + mock.Mock +} + +func (d *dummyAdapters) SendMessages(pub btcec.PublicKey, msgs []lnwire.Message) error { + args := d.Called(pub, msgs) + + return args.Error(0) +} + +func (d *dummyAdapters) BroadcastTransaction(tx *wire.MsgTx, label string) error { + args := d.Called(tx, label) + + return args.Error(0) +} + +// TestStateMachineInternalEvents tests that the state machine is able to add +// new internal events to the event queue for further processing during a state +// transition. +func TestStateMachineInternalEvents(t *testing.T) { + t.Parallel() + + // First, we'll create our state machine given the env, and our + // starting state. + env := &dummyEnv{} + startingState := &dummyStateStart{} + + adapters := &dummyAdapters{} + + stateMachine := NewStateMachine[dummyEvents, *dummyEnv]( + adapters, startingState, env, + ) + stateMachine.Start() + defer stateMachine.Stop() + + // As we're triggering internal events, we'll also subscribe to the set + // of new states so we can assert as we go. + stateSub := stateMachine.RegisterStateEvents() + defer stateMachine.RemoveStateSub(stateSub) + + // For this transition, we'll send in the emitInternal event, which'll + // send us back to the starting event, but emit an internal event. + stateMachine.SendEvent(&emitInternal{}) + + // We'll now also assert the path we took to get here to ensure the + // internal events were processed. + expectedStates := []State[dummyEvents, *dummyEnv]{ + &dummyStateStart{}, &dummyStateStart{}, &dummyStateFin{}, + } + assertStateTransitions( + t, stateSub, expectedStates, + ) + + // We should ultimately end up in the terminal state. + assertState[dummyEvents, *dummyEnv](t, &stateMachine, &dummyStateFin{}) + + // Make sure all the env expectations were met. + env.AssertExpectations(t) +} + +// TestStateMachineDaemonEvents tests that the state machine is able to process +// daemon emitted as part of the state transition process. +func TestStateMachineDaemonEvents(t *testing.T) { + t.Parallel() + + // First, we'll create our state machine given the env, and our + // starting state. + env := &dummyEnv{} + + var boolTrigger atomic.Bool + startingState := &dummyStateStart{ + canSend: &boolTrigger, + } + + adapters := &dummyAdapters{} + + stateMachine := NewStateMachine[dummyEvents, *dummyEnv]( + adapters, startingState, env, + ) + stateMachine.Start() + defer stateMachine.Stop() + + // As we're triggering internal events, we'll also subscribe to the set + // of new states so we can assert as we go. + stateSub := stateMachine.RegisterStateEvents() + defer stateMachine.RemoveStateSub(stateSub) + + // As soon as we send in the daemon event, we expect the + // disable+broadcast events to be processed, as they are unconditional. + adapters.On("BroadcastTransaction", mock.Anything, mock.Anything).Return(nil) + adapters.On("SendMessages", *pub2, mock.Anything).Return(nil) + + // We'll start off by sending in the daemon event, which'll trigger the + // state machine to execute the series of daemon events. + stateMachine.SendEvent(&daemonEvents{}) + + // We should transition back to the starting state now, after we + // started from the very same state. + expectedStates := []State[dummyEvents, *dummyEnv]{ + &dummyStateStart{}, &dummyStateStart{}, + } + assertStateTransitions(t, stateSub, expectedStates) + + // At this point, we expect that the two methods above were called. + adapters.AssertExpectations(t) + + // However, we don't expect the SendMessages for the first peer target + // to be called yet, as the condition hasn't yet been met. + adapters.AssertNotCalled(t, "SendMessages", *pub1) + + // We'll now flip the bool to true, which should cause the SendMessages + // method to be called, and for us to transition to the final state. + boolTrigger.Store(true) + adapters.On("SendMessages", *pub1, mock.Anything).Return(nil) + + expectedStates = []State[dummyEvents, *dummyEnv]{&dummyStateFin{}} + assertStateTransitions(t, stateSub, expectedStates) + + adapters.AssertExpectations(t) + env.AssertExpectations(t) +} From 7f69ceb2d43bd9aca79354eb96608e0a891db126 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Tue, 23 Jan 2024 18:54:46 -0800 Subject: [PATCH 02/12] protofsm: add daemon events for spend+conf registration --- protofsm/daemon_events.go | 54 ++++++++++++++++++- protofsm/state_machine.go | 96 ++++++++++++++++++++++++++++++++++ protofsm/state_machine_test.go | 37 +++++++++++++ 3 files changed, 186 insertions(+), 1 deletion(-) diff --git a/protofsm/daemon_events.go b/protofsm/daemon_events.go index 39719d65f..5a269c7f1 100644 --- a/protofsm/daemon_events.go +++ b/protofsm/daemon_events.go @@ -2,6 +2,7 @@ package protofsm import ( "github.com/btcsuite/btcd/btcec/v2" + "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/wire" "github.com/lightningnetwork/lnd/fn" "github.com/lightningnetwork/lnd/lnwire" @@ -21,7 +22,8 @@ type DaemonEventSet []DaemonEvent // DaemonEvents is a special type constraint that enumerates all the possible // types of daemon events. type DaemonEvents interface { - SendMsgEvent[any] | BroadcastTxn + SendMsgEvent[any] | BroadcastTxn | RegisterSpend[any] | + RegisterConf[any] } // SendPredicate is a function that returns true if the target message should @@ -64,3 +66,53 @@ type BroadcastTxn struct { // daemonSealed indicates that this struct is a DaemonEvent instance. func (b *BroadcastTxn) daemonSealed() {} + +// RegisterSpend is used to request that a certain event is sent into the state +// machien once the specified outpoint has been spent. +type RegisterSpend[Event any] struct { + // OutPoint is the outpoint on chain to watch. + OutPoint wire.OutPoint + + // PkScript is the script that we expect to be spent along with the + // outpoint. + PkScript []byte + + // HeightHint is a value used to give the chain scanner a hint on how + // far back it needs to start its search. + HeightHint uint32 + + // PostSpendEvent is an event that's sent back to the requester once a + // transaction spending the outpoint has been confirmed in the main + // chain. + PostSpendEvent fn.Option[Event] +} + +// daemonSealed indicates that this struct is a DaemonEvent instance. +func (r *RegisterSpend[E]) daemonSealed() {} + +// RegisterConf is used to request that a certain event is sent into the state +// machien once the specified outpoint has been spent. +type RegisterConf[Event any] struct { + // Txid is the txid of the txn we want to watch the chain for. + Txid chainhash.Hash + + // PkScript is the script that we expect to be created along with the + // outpoint. + PkScript []byte + + // HeightHint is a value used to give the chain scanner a hint on how + // far back it needs to start its search. + HeightHint uint32 + + // NumConfs is the number of confirmations that the spending + // transaction needs to dispatch an event. + NumConfs fn.Option[uint32] + + // PostConfEvent is an event that's sent back to the requester once the + // transaction specified above has confirmed in the chain with + // sufficient depth. + PostConfEvent fn.Option[Event] +} + +// daemonSealed indicates that this struct is a DaemonEvent instance. +func (r *RegisterConf[E]) daemonSealed() {} diff --git a/protofsm/state_machine.go b/protofsm/state_machine.go index f1a1e3b54..e25d82515 100644 --- a/protofsm/state_machine.go +++ b/protofsm/state_machine.go @@ -6,7 +6,9 @@ import ( "time" "github.com/btcsuite/btcd/btcec/v2" + "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/wire" + "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/fn" "github.com/lightningnetwork/lnd/lnwire" ) @@ -77,6 +79,28 @@ type DaemonAdapters interface { // BroadcastTransaction broadcasts a transaction with the target label. BroadcastTransaction(*wire.MsgTx, string) error + + // RegisterConfirmationsNtfn registers an intent to be notified once + // txid reaches numConfs confirmations. We also pass in the pkScript as + // the default light client instead needs to match on scripts created + // in the block. If a nil txid is passed in, then not only should we + // match on the script, but we should also dispatch once the + // transaction containing the script reaches numConfs confirmations. + // This can be useful in instances where we only know the script in + // advance, but not the transaction containing it. + // + // TODO(roasbeef): could abstract further? + RegisterConfirmationsNtfn(txid *chainhash.Hash, pkScript []byte, + numConfs, heightHint uint32, + opts ...chainntnfs.NotifierOption, + ) (*chainntnfs.ConfirmationEvent, error) + + // RegisterSpendNtfn registers an intent to be notified once the target + // outpoint is successfully spent within a transaction. The script that + // the outpoint creates must also be specified. This allows this + // interface to be implemented by BIP 158-like filtering. + RegisterSpendNtfn(outpoint *wire.OutPoint, pkScript []byte, + heightHint uint32) (*chainntnfs.SpendEvent, error) } // stateQuery is used by outside callers to query the internal state of the @@ -282,6 +306,78 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(event DaemonEvent) error { } return nil + + // The state machine has requested a new event to be sent once a + // transaction spending a specified outpoint has confirmed. + case *RegisterSpend[Event]: + spendEvent, err := s.daemon.RegisterSpendNtfn( + &daemonEvent.OutPoint, daemonEvent.PkScript, + daemonEvent.HeightHint, + ) + if err != nil { + return fmt.Errorf("unable to register spend: %w", err) + } + + s.wg.Add(1) + go func() { + defer s.wg.Done() + for { + select { + case <-spendEvent.Spend: + // If there's a post-send event, then + // we'll send that into the current + // state now. + postSpend := daemonEvent.PostSpendEvent + postSpend.WhenSome(func(e Event) { + s.SendEvent(e) + }) + + return + + case <-s.quit: + return + } + } + }() + + return nil + + // The state machine has requested a new event to be sent once a + // specified txid+pkScript pair has confirmed. + case *RegisterConf[Event]: + numConfs := daemonEvent.NumConfs.UnwrapOr(1) + confEvent, err := s.daemon.RegisterConfirmationsNtfn( + &daemonEvent.Txid, daemonEvent.PkScript, + numConfs, daemonEvent.HeightHint, + ) + if err != nil { + return fmt.Errorf("unable to register conf: %w", err) + } + + s.wg.Add(1) + go func() { + defer s.wg.Done() + for { + select { + case <-confEvent.Confirmed: + // If there's a post-conf event, then + // we'll send that into the current + // state now. + // + // TODO(roasbeef): refactor to + // dispatchAfterRecv w/ above + postConf := daemonEvent.PostConfEvent + postConf.WhenSome(func(e Event) { + s.SendEvent(e) + }) + + return + + case <-s.quit: + return + } + } + }() } return fmt.Errorf("unknown daemon event: %T", event) diff --git a/protofsm/state_machine_test.go b/protofsm/state_machine_test.go index 71760726e..17a5a32c4 100644 --- a/protofsm/state_machine_test.go +++ b/protofsm/state_machine_test.go @@ -7,7 +7,9 @@ import ( "testing" "github.com/btcsuite/btcd/btcec/v2" + "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/wire" + "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/fn" "github.com/lightningnetwork/lnd/lnwire" "github.com/stretchr/testify/mock" @@ -160,6 +162,16 @@ func assertStateTransitions[Event any, Env Environment]( type dummyAdapters struct { mock.Mock + + confChan chan *chainntnfs.TxConfirmation + spendChan chan *chainntnfs.SpendDetail +} + +func newDaemonAdapters() *dummyAdapters { + return &dummyAdapters{ + confChan: make(chan *chainntnfs.TxConfirmation, 1), + spendChan: make(chan *chainntnfs.SpendDetail, 1), + } } func (d *dummyAdapters) SendMessages(pub btcec.PublicKey, msgs []lnwire.Message) error { @@ -174,6 +186,31 @@ func (d *dummyAdapters) BroadcastTransaction(tx *wire.MsgTx, label string) error return args.Error(0) } +func (d *dummyAdapters) RegisterConfirmationsNtfn(txid *chainhash.Hash, + pkScript []byte, numConfs, heightHint uint32, + opts ...chainntnfs.NotifierOption, +) (*chainntnfs.ConfirmationEvent, error) { + + args := d.Called(txid, pkScript, numConfs) + + err := args.Error(0) + return &chainntnfs.ConfirmationEvent{ + Confirmed: d.confChan, + }, err +} + +func (d *dummyAdapters) RegisterSpendNtfn(outpoint *wire.OutPoint, + pkScript []byte, heightHint uint32) (*chainntnfs.SpendEvent, error) { + + args := d.Called(outpoint, pkScript, heightHint) + + err := args.Error(0) + + return &chainntnfs.SpendEvent{ + Spend: d.spendChan, + }, err +} + // TestStateMachineInternalEvents tests that the state machine is able to add // new internal events to the event queue for further processing during a state // transition. From d17e7375582a3c85f771ac1b1821e0bd29d72a7d Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Tue, 23 Jan 2024 18:57:00 -0800 Subject: [PATCH 03/12] protofsm: add optional daemon event on init In this commit, we add an optional daemon event that can be specified to dispatch during init. This is useful for instances where before we start, we want to make sure we have a registered spend/conf notification before normal operation starts. We also add new unit tests to cover this, and the prior spend/conf event additions. --- protofsm/state_machine.go | 24 +++++++++++++-- protofsm/state_machine_test.go | 54 +++++++++++++++++++++++++++++++--- 2 files changed, 71 insertions(+), 7 deletions(-) diff --git a/protofsm/state_machine.go b/protofsm/state_machine.go index e25d82515..9c4db8dc1 100644 --- a/protofsm/state_machine.go +++ b/protofsm/state_machine.go @@ -135,6 +135,8 @@ type StateMachine[Event any, Env Environment] struct { stateQuery chan stateQuery[Event, Env] + initEvent fn.Option[DaemonEvent] + startOnce sync.Once stopOnce sync.Once @@ -142,10 +144,12 @@ type StateMachine[Event any, Env Environment] struct { } // NewStateMachine creates a new state machine given a set of daemon adapters, -// an initial state, and an environment. +// an initial state, an environment, and an event to process as if emitted at +// the onset of the state machine. Such an event can be used to set up tracking +// state such as a txid confirmation event. func NewStateMachine[Event any, Env Environment](adapters DaemonAdapters, - initialState State[Event, Env], - env Env) StateMachine[Event, Env] { + initialState State[Event, Env], env Env, + initEvent fn.Option[DaemonEvent]) StateMachine[Event, Env] { return StateMachine[Event, Env]{ daemon: adapters, @@ -154,6 +158,7 @@ func NewStateMachine[Event any, Env Environment](adapters DaemonAdapters, stateQuery: make(chan stateQuery[Event, Env]), quit: make(chan struct{}), env: env, + initEvent: initEvent, newStateEvents: fn.NewEventDistributor[State[Event, Env]](), } } @@ -443,6 +448,9 @@ func (s *StateMachine[Event, Env]) applyEvents(newEvent Event) (State[Event, Env currentState = transition.NextState // Notify our subscribers of the new state transition. + // + // TODO(roasbeef): will only give us the outer state? + // * let FSMs choose which state to emit? s.newStateEvents.NotifySubscribers(currentState) return nil @@ -464,6 +472,16 @@ func (s *StateMachine[Event, Env]) driveMachine() { // TODO(roasbeef): move into env? read only to start with currentState := s.currentState + // Before we start, if we have an init daemon event specified, then + // we'll handle that now. + err := fn.MapOptionZ(s.initEvent, func(event DaemonEvent) error { + return s.executeDaemonEvent(event) + }) + if err != nil { + log.Errorf("unable to execute init event: %w", err) + return + } + // We just started driving the state machine, so we'll notify our // subscribers of this starting state. s.newStateEvents.NotifySubscribers(currentState) diff --git a/protofsm/state_machine_test.go b/protofsm/state_machine_test.go index 17a5a32c4..da765574e 100644 --- a/protofsm/state_machine_test.go +++ b/protofsm/state_machine_test.go @@ -211,6 +211,52 @@ func (d *dummyAdapters) RegisterSpendNtfn(outpoint *wire.OutPoint, }, err } +// TestStateMachineOnInitDaemonEvent tests that the state machine will properly +// execute any init-level daemon events passed into it. +func TestStateMachineOnInitDaemonEvent(t *testing.T) { + // First, we'll create our state machine given the env, and our + // starting state. + env := &dummyEnv{} + startingState := &dummyStateStart{} + + adapters := newDaemonAdapters() + + // We'll make an init event that'll send to a peer, then transition us + // to our terminal state. + initEvent := &SendMsgEvent[dummyEvents]{ + TargetPeer: *pub1, + PostSendEvent: fn.Some(dummyEvents(&goToFin{})), + } + + stateMachine := NewStateMachine[dummyEvents, *dummyEnv]( + adapters, startingState, env, fn.Some[DaemonEvent](initEvent), + ) + + // Before we start up the state machine, we'll assert that the send + // message adapter is called on start up. + adapters.On("SendMessages", *pub1, mock.Anything).Return(nil) + + stateMachine.Start() + defer stateMachine.Stop() + + // As we're triggering internal events, we'll also subscribe to the set + // of new states so we can assert as we go. + stateSub := stateMachine.RegisterStateEvents() + defer stateMachine.RemoveStateSub(stateSub) + + // Assert that we go from the starting state to the final state. The + // state machine should now also be on the final terminal state. + expectedStates := []State[dummyEvents, *dummyEnv]{ + &dummyStateStart{}, &dummyStateFin{}, + } + assertStateTransitions(t, stateSub, expectedStates) + + // We'll now assert that after the daemon was started, the send message + // adapter was called above as specified in the init event. + adapters.AssertExpectations(t) + env.AssertExpectations(t) +} + // TestStateMachineInternalEvents tests that the state machine is able to add // new internal events to the event queue for further processing during a state // transition. @@ -222,10 +268,10 @@ func TestStateMachineInternalEvents(t *testing.T) { env := &dummyEnv{} startingState := &dummyStateStart{} - adapters := &dummyAdapters{} + adapters := newDaemonAdapters() stateMachine := NewStateMachine[dummyEvents, *dummyEnv]( - adapters, startingState, env, + adapters, startingState, env, fn.None[DaemonEvent](), ) stateMachine.Start() defer stateMachine.Stop() @@ -269,10 +315,10 @@ func TestStateMachineDaemonEvents(t *testing.T) { canSend: &boolTrigger, } - adapters := &dummyAdapters{} + adapters := newDaemonAdapters() stateMachine := NewStateMachine[dummyEvents, *dummyEnv]( - adapters, startingState, env, + adapters, startingState, env, fn.None[DaemonEvent](), ) stateMachine.Start() defer stateMachine.Stop() From bf10e31167970c490a516d770f32c62790a01a8c Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Thu, 25 Jan 2024 16:56:21 -0800 Subject: [PATCH 04/12] protofsm: convert state machine args into config --- protofsm/state_machine.go | 68 ++++++++++++++++++++-------------- protofsm/state_machine_test.go | 30 ++++++++++----- 2 files changed, 61 insertions(+), 37 deletions(-) diff --git a/protofsm/state_machine.go b/protofsm/state_machine.go index 9c4db8dc1..6968e36e9 100644 --- a/protofsm/state_machine.go +++ b/protofsm/state_machine.go @@ -119,46 +119,60 @@ type stateQuery[Event any, Env Environment] struct { // // TODO(roasbeef): terminal check, daemon event execution, init? type StateMachine[Event any, Env Environment] struct { - currentState State[Event, Env] - env Env - - daemon DaemonAdapters + cfg StateMachineCfg[Event, Env] + // events is the channel that will be used to send new events to the + // FSM. events chan Event - quit chan struct{} - wg sync.WaitGroup - // newStateEvents is an EventDistributor that will be used to notify // any relevant callers of new state transitions that occur. newStateEvents *fn.EventDistributor[State[Event, Env]] + // stateQuery is a channel that will be used by outside callers to + // query the internal state machine state. stateQuery chan stateQuery[Event, Env] - initEvent fn.Option[DaemonEvent] - startOnce sync.Once stopOnce sync.Once // TODO(roasbeef): also use that context guard here? + quit chan struct{} + wg sync.WaitGroup +} + +// StateMachineCfg is a configuration struct that's used to create a new state +// machine. +type StateMachineCfg[Event any, Env Environment] struct { + // Daemon is a set of adapters that will be used to bridge the FSM to + // the daemon. + Daemon DaemonAdapters + + // InitialState is the initial state of the state machine. + InitialState State[Event, Env] + + // Env is the environment that the state machine will use to execute. + Env Env + + // InitEvent is an optional event that will be sent to the state + // machine as if it was emitted at the onset of the state machine. This + // can be used to set up tracking state such as a txid confirmation + // event. + InitEvent fn.Option[DaemonEvent] } // NewStateMachine creates a new state machine given a set of daemon adapters, // an initial state, an environment, and an event to process as if emitted at // the onset of the state machine. Such an event can be used to set up tracking // state such as a txid confirmation event. -func NewStateMachine[Event any, Env Environment](adapters DaemonAdapters, - initialState State[Event, Env], env Env, - initEvent fn.Option[DaemonEvent]) StateMachine[Event, Env] { +func NewStateMachine[Event any, Env Environment](cfg StateMachineCfg[Event, Env], +) StateMachine[Event, Env] { return StateMachine[Event, Env]{ - daemon: adapters, + cfg: cfg, events: make(chan Event, 1), - currentState: initialState, stateQuery: make(chan stateQuery[Event, Env]), quit: make(chan struct{}), - env: env, - initEvent: initEvent, newStateEvents: fn.NewEventDistributor[State[Event, Env]](), } } @@ -237,7 +251,7 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(event DaemonEvent) error { // any preconditions as well as post-send events. case *SendMsgEvent[Event]: sendAndCleanUp := func() error { - err := s.daemon.SendMessages( + err := s.cfg.Daemon.SendMessages( daemonEvent.TargetPeer, daemonEvent.Msgs, ) if err != nil { @@ -301,7 +315,7 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(event DaemonEvent) error { // If this is a broadcast transaction event, then we'll broadcast with // the label attached. case *BroadcastTxn: - err := s.daemon.BroadcastTransaction( + err := s.cfg.Daemon.BroadcastTransaction( daemonEvent.Tx, daemonEvent.Label, ) if err != nil { @@ -315,7 +329,7 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(event DaemonEvent) error { // The state machine has requested a new event to be sent once a // transaction spending a specified outpoint has confirmed. case *RegisterSpend[Event]: - spendEvent, err := s.daemon.RegisterSpendNtfn( + spendEvent, err := s.cfg.Daemon.RegisterSpendNtfn( &daemonEvent.OutPoint, daemonEvent.PkScript, daemonEvent.HeightHint, ) @@ -351,7 +365,7 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(event DaemonEvent) error { // specified txid+pkScript pair has confirmed. case *RegisterConf[Event]: numConfs := daemonEvent.NumConfs.UnwrapOr(1) - confEvent, err := s.daemon.RegisterConfirmationsNtfn( + confEvent, err := s.cfg.Daemon.RegisterConfirmationsNtfn( &daemonEvent.Txid, daemonEvent.PkScript, numConfs, daemonEvent.HeightHint, ) @@ -391,9 +405,8 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(event DaemonEvent) error { // applyEvents applies a new event to the state machine. This will continue // until no further events are emitted by the state machine. Along the way, // we'll also ensure to execute any daemon events that are emitted. -func (s *StateMachine[Event, Env]) applyEvents(newEvent Event) (State[Event, Env], error) { - // TODO(roasbeef): make starting state as part of env? - currentState := s.currentState +func (s *StateMachine[Event, Env]) applyEvents(currentState State[Event, Env], + newEvent Event) (State[Event, Env], error) { eventQueue := fn.NewQueue(newEvent) @@ -406,7 +419,7 @@ func (s *StateMachine[Event, Env]) applyEvents(newEvent Event) (State[Event, Env // Apply the state transition function of the current // state given this new event and our existing env. transition, err := currentState.ProcessEvent( - event, s.env, + event, s.cfg.Env, ) if err != nil { return err @@ -469,12 +482,11 @@ func (s *StateMachine[Event, Env]) applyEvents(newEvent Event) (State[Event, Env func (s *StateMachine[Event, Env]) driveMachine() { defer s.wg.Done() - // TODO(roasbeef): move into env? read only to start with - currentState := s.currentState + currentState := s.cfg.InitialState // Before we start, if we have an init daemon event specified, then // we'll handle that now. - err := fn.MapOptionZ(s.initEvent, func(event DaemonEvent) error { + err := fn.MapOptionZ(s.cfg.InitEvent, func(event DaemonEvent) error { return s.executeDaemonEvent(event) }) if err != nil { @@ -492,7 +504,7 @@ func (s *StateMachine[Event, Env]) driveMachine() { // machine forward until we either run out of internal events, // or we reach a terminal state. case newEvent := <-s.events: - newState, err := s.applyEvents(newEvent) + newState, err := s.applyEvents(currentState, newEvent) if err != nil { // TODO(roasbeef): hard error? log.Errorf("unable to apply event: %v", err) diff --git a/protofsm/state_machine_test.go b/protofsm/state_machine_test.go index da765574e..82d4431f2 100644 --- a/protofsm/state_machine_test.go +++ b/protofsm/state_machine_test.go @@ -228,9 +228,13 @@ func TestStateMachineOnInitDaemonEvent(t *testing.T) { PostSendEvent: fn.Some(dummyEvents(&goToFin{})), } - stateMachine := NewStateMachine[dummyEvents, *dummyEnv]( - adapters, startingState, env, fn.Some[DaemonEvent](initEvent), - ) + cfg := StateMachineCfg[dummyEvents, *dummyEnv]{ + Daemon: adapters, + InitialState: startingState, + Env: env, + InitEvent: fn.Some[DaemonEvent](initEvent), + } + stateMachine := NewStateMachine(cfg) // Before we start up the state machine, we'll assert that the send // message adapter is called on start up. @@ -270,9 +274,13 @@ func TestStateMachineInternalEvents(t *testing.T) { adapters := newDaemonAdapters() - stateMachine := NewStateMachine[dummyEvents, *dummyEnv]( - adapters, startingState, env, fn.None[DaemonEvent](), - ) + cfg := StateMachineCfg[dummyEvents, *dummyEnv]{ + Daemon: adapters, + InitialState: startingState, + Env: env, + InitEvent: fn.None[DaemonEvent](), + } + stateMachine := NewStateMachine(cfg) stateMachine.Start() defer stateMachine.Stop() @@ -317,9 +325,13 @@ func TestStateMachineDaemonEvents(t *testing.T) { adapters := newDaemonAdapters() - stateMachine := NewStateMachine[dummyEvents, *dummyEnv]( - adapters, startingState, env, fn.None[DaemonEvent](), - ) + cfg := StateMachineCfg[dummyEvents, *dummyEnv]{ + Daemon: adapters, + InitialState: startingState, + Env: env, + InitEvent: fn.None[DaemonEvent](), + } + stateMachine := NewStateMachine(cfg) stateMachine.Start() defer stateMachine.Stop() From 424ae0963155d416d558346e49fbccbb0254012f Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Thu, 25 Jan 2024 17:22:00 -0800 Subject: [PATCH 05/12] protofsm: add ability for state machine to consume wire msgs In this commit, we add the ability for the state machine to consume wire messages. This'll allow the creation of a new generic message router that takes the place of the current peer `readHandler` in an upcoming commit. --- protofsm/daemon_events.go | 2 +- protofsm/msg_mapper.go | 15 ++++++ protofsm/state_machine.go | 86 +++++++++++++++++++++++++++++----- protofsm/state_machine_test.go | 80 +++++++++++++++++++++++++++++-- 4 files changed, 166 insertions(+), 17 deletions(-) create mode 100644 protofsm/msg_mapper.go diff --git a/protofsm/daemon_events.go b/protofsm/daemon_events.go index 5a269c7f1..b65adf012 100644 --- a/protofsm/daemon_events.go +++ b/protofsm/daemon_events.go @@ -8,7 +8,7 @@ import ( "github.com/lightningnetwork/lnd/lnwire" ) -// DaemonEvent is a special event that can be emmitted by a state transition +// DaemonEvent is a special event that can be emitted by a state transition // function. A state machine can use this to perform side effects, such as // sending a message to a peer, or broadcasting a transaction. type DaemonEvent interface { diff --git a/protofsm/msg_mapper.go b/protofsm/msg_mapper.go new file mode 100644 index 000000000..b96d677e6 --- /dev/null +++ b/protofsm/msg_mapper.go @@ -0,0 +1,15 @@ +package protofsm + +import ( + "github.com/lightningnetwork/lnd/fn" + "github.com/lightningnetwork/lnd/lnwire" +) + +// MsgMapper is used to map incoming wire messages into a FSM event. This is +// useful to decouple the translation of an outside or wire message into an +// event type that can be understood by the FSM. +type MsgMapper[Event any] interface { + // MapMsg maps a wire message into a FSM event. If the message is not + // mappable, then an None is returned. + MapMsg(msg lnwire.Message) fn.Option[Event] +} diff --git a/protofsm/state_machine.go b/protofsm/state_machine.go index 6968e36e9..0dfa8e73e 100644 --- a/protofsm/state_machine.go +++ b/protofsm/state_machine.go @@ -64,7 +64,8 @@ type State[Event any, Env Environment] interface { // emitted. ProcessEvent(event Event, env Env) (*StateTransition[Event, Env], error) - // IsTerminal returns true if this state is terminal, and false otherwise. + // IsTerminal returns true if this state is terminal, and false + // otherwise. IsTerminal() bool // TODO(roasbeef): also add state serialization? @@ -159,13 +160,17 @@ type StateMachineCfg[Event any, Env Environment] struct { // can be used to set up tracking state such as a txid confirmation // event. InitEvent fn.Option[DaemonEvent] + + // MsgMapper is an optional message mapper that can be used to map + // normal wire messages into FSM events. + MsgMapper fn.Option[MsgMapper[Event]] } // NewStateMachine creates a new state machine given a set of daemon adapters, // an initial state, an environment, and an event to process as if emitted at // the onset of the state machine. Such an event can be used to set up tracking // state such as a txid confirmation event. -func NewStateMachine[Event any, Env Environment](cfg StateMachineCfg[Event, Env], +func NewStateMachine[Event any, Env Environment](cfg StateMachineCfg[Event, Env], //nolint:lll ) StateMachine[Event, Env] { return StateMachine[Event, Env]{ @@ -206,6 +211,43 @@ func (s *StateMachine[Event, Env]) SendEvent(event Event) { } } +// CanHandle returns true if the target message can be routed to the state +// machine. +func (s *StateMachine[Event, Env]) CanHandle(msg lnwire.Message) bool { + cfgMapper := s.cfg.MsgMapper + return fn.MapOptionZ(cfgMapper, func(mapper MsgMapper[Event]) bool { + return mapper.MapMsg(msg).IsSome() + }) +} + +// SendMessage attempts to send a wire message to the state machine. If the +// message can be mapped using the default message mapper, then true is +// returned indicating that the message was processed. Otherwise, false is +// returned. +func (s *StateMachine[Event, Env]) SendMessage(msg lnwire.Message) bool { + // If we have no message mapper, then return false as we can't process + // this message. + if !s.cfg.MsgMapper.IsSome() { + return false + } + + // Otherwise, try to map the message using the default message mapper. + // If we can't extract an event, then we'll return false to indicate + // that the message wasn't processed. + var processed bool + s.cfg.MsgMapper.WhenSome(func(mapper MsgMapper[Event]) { + event := mapper.MapMsg(msg) + + event.WhenSome(func(event Event) { + s.SendEvent(event) + + processed = true + }) + }) + + return processed +} + // CurrentState returns the current state of the state machine. func (s *StateMachine[Event, Env]) CurrentState() (State[Event, Env], error) { query := stateQuery[Event, Env]{ @@ -225,7 +267,9 @@ type StateSubscriber[E any, F Environment] *fn.EventReceiver[State[E, F]] // RegisterStateEvents registers a new event listener that will be notified of // new state transitions. -func (s *StateMachine[Event, Env]) RegisterStateEvents() StateSubscriber[Event, Env] { +func (s *StateMachine[Event, Env]) RegisterStateEvents() StateSubscriber[ + Event, Env] { + subscriber := fn.NewEventReceiver[State[Event, Env]](10) // TODO(roasbeef): instead give the state and the input event? @@ -237,8 +281,10 @@ func (s *StateMachine[Event, Env]) RegisterStateEvents() StateSubscriber[Event, // RemoveStateSub removes the target state subscriber from the set of active // subscribers. -func (s *StateMachine[Event, Env]) RemoveStateSub(sub StateSubscriber[Event, Env]) { - s.newStateEvents.RemoveSubscriber(sub) +func (s *StateMachine[Event, Env]) RemoveStateSub(sub StateSubscriber[ + Event, Env]) { + + _ = s.newStateEvents.RemoveSubscriber(sub) } // executeDaemonEvent executes a daemon event, which is a special type of event @@ -246,7 +292,6 @@ func (s *StateMachine[Event, Env]) RemoveStateSub(sub StateSubscriber[Event, Env // machine. An error is returned if the type of event is unknown. func (s *StateMachine[Event, Env]) executeDaemonEvent(event DaemonEvent) error { switch daemonEvent := event.(type) { - // This is a send message event, so we'll send the event, and also mind // any preconditions as well as post-send events. case *SendMsgEvent[Event]: @@ -255,7 +300,8 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(event DaemonEvent) error { daemonEvent.TargetPeer, daemonEvent.Msgs, ) if err != nil { - return fmt.Errorf("unable to send msgs: %w", err) + return fmt.Errorf("unable to send msgs: %w", + err) } // If a post-send event was specified, then we'll @@ -300,7 +346,12 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(event DaemonEvent) error { ) if canSend { - sendAndCleanUp() + err := sendAndCleanUp() + if err != nil { + //nolint:lll + log.Errorf("FSM(%v): unable to send message: %v", err) + } + return } @@ -319,8 +370,6 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(event DaemonEvent) error { daemonEvent.Tx, daemonEvent.Label, ) if err != nil { - // TODO(roasbeef): hook has channel read event event is - // hit? return fmt.Errorf("unable to broadcast txn: %w", err) } @@ -414,6 +463,8 @@ func (s *StateMachine[Event, Env]) applyEvents(currentState State[Event, Env], // any new emitted internal events to our event queue. This continues // until we reach a terminal state, or we run out of internal events to // process. + // + //nolint:lll for nextEvent := eventQueue.Dequeue(); nextEvent.IsSome(); nextEvent = eventQueue.Dequeue() { err := fn.MapOptionZ(nextEvent, func(event Event) error { // Apply the state transition function of the current @@ -426,13 +477,17 @@ func (s *StateMachine[Event, Env]) applyEvents(currentState State[Event, Env], } newEvents := transition.NewEvents - err = fn.MapOptionZ(newEvents, func(events EmittedEvent[Event]) error { + err = fn.MapOptionZ(newEvents, func(events EmittedEvent[Event]) error { //nolint:lll // With the event processed, we'll process any // new daemon events that were emitted as part // of this new state transition. + // + //nolint:lll err := fn.MapOptionZ(events.ExternalEvents, func(dEvents DaemonEventSet) error { for _, dEvent := range dEvents { - err := s.executeDaemonEvent(dEvent) + err := s.executeDaemonEvent( + dEvent, + ) if err != nil { return err } @@ -446,6 +501,8 @@ func (s *StateMachine[Event, Env]) applyEvents(currentState State[Event, Env], // Next, we'll add any new emitted events to // our event queue. + // + //nolint:lll events.InternalEvent.WhenSome(func(inEvent Event) { eventQueue.Enqueue(inEvent) }) @@ -516,7 +573,10 @@ func (s *StateMachine[Event, Env]) driveMachine() { // An outside caller is querying our state, so we'll return the // latest state. case stateQuery := <-s.stateQuery: - if !fn.SendOrQuit(stateQuery.CurrentState, currentState, s.quit) { + if !fn.SendOrQuit( + stateQuery.CurrentState, currentState, s.quit, + ) { + return } diff --git a/protofsm/state_machine_test.go b/protofsm/state_machine_test.go index 82d4431f2..bf7026f4e 100644 --- a/protofsm/state_machine_test.go +++ b/protofsm/state_machine_test.go @@ -174,13 +174,17 @@ func newDaemonAdapters() *dummyAdapters { } } -func (d *dummyAdapters) SendMessages(pub btcec.PublicKey, msgs []lnwire.Message) error { +func (d *dummyAdapters) SendMessages(pub btcec.PublicKey, + msgs []lnwire.Message) error { + args := d.Called(pub, msgs) return args.Error(0) } -func (d *dummyAdapters) BroadcastTransaction(tx *wire.MsgTx, label string) error { +func (d *dummyAdapters) BroadcastTransaction(tx *wire.MsgTx, + label string) error { + args := d.Called(tx, label) return args.Error(0) @@ -194,6 +198,7 @@ func (d *dummyAdapters) RegisterConfirmationsNtfn(txid *chainhash.Hash, args := d.Called(txid, pkScript, numConfs) err := args.Error(0) + return &chainntnfs.ConfirmationEvent{ Confirmed: d.confChan, }, err @@ -342,7 +347,9 @@ func TestStateMachineDaemonEvents(t *testing.T) { // As soon as we send in the daemon event, we expect the // disable+broadcast events to be processed, as they are unconditional. - adapters.On("BroadcastTransaction", mock.Anything, mock.Anything).Return(nil) + adapters.On( + "BroadcastTransaction", mock.Anything, mock.Anything, + ).Return(nil) adapters.On("SendMessages", *pub2, mock.Anything).Return(nil) // We'll start off by sending in the daemon event, which'll trigger the @@ -374,3 +381,70 @@ func TestStateMachineDaemonEvents(t *testing.T) { adapters.AssertExpectations(t) env.AssertExpectations(t) } + +type dummyMsgMapper struct { + mock.Mock +} + +func (d *dummyMsgMapper) MapMsg(wireMsg lnwire.Message) fn.Option[dummyEvents] { + args := d.Called(wireMsg) + + //nolint:forcetypeassert + return args.Get(0).(fn.Option[dummyEvents]) +} + +// TestStateMachineMsgMapper tests that given a message mapper, we can properly +// send in wire messages get mapped to FSM events. +func TestStateMachineMsgMapper(t *testing.T) { + // First, we'll create our state machine given the env, and our + // starting state. + env := &dummyEnv{} + startingState := &dummyStateStart{} + adapters := newDaemonAdapters() + + // We'll also provide a message mapper that only knows how to map a + // single wire message (error). + dummyMapper := &dummyMsgMapper{} + + // The only thing we know how to map is the error message, which'll + // terminate the state machine. + wireError := &lnwire.Error{} + initMsg := &lnwire.Init{} + dummyMapper.On("MapMsg", wireError).Return( + fn.Some(dummyEvents(&goToFin{})), + ) + dummyMapper.On("MapMsg", initMsg).Return(fn.None[dummyEvents]()) + + cfg := StateMachineCfg[dummyEvents, *dummyEnv]{ + Daemon: adapters, + InitialState: startingState, + Env: env, + MsgMapper: fn.Some[MsgMapper[dummyEvents]](dummyMapper), + } + stateMachine := NewStateMachine(cfg) + stateMachine.Start() + defer stateMachine.Stop() + + // As we're triggering internal events, we'll also subscribe to the set + // of new states so we can assert as we go. + stateSub := stateMachine.RegisterStateEvents() + defer stateMachine.RemoveStateSub(stateSub) + + // First, we'll verify that the CanHandle method works as expected. + require.True(t, stateMachine.CanHandle(wireError)) + require.False(t, stateMachine.CanHandle(&lnwire.Init{})) + + // Next, we'll attempt to send the wire message into the state machine. + // We should transition to the final state. + require.True(t, stateMachine.SendMessage(wireError)) + + // We should transition to the final state. + expectedStates := []State[dummyEvents, *dummyEnv]{ + &dummyStateStart{}, &dummyStateFin{}, + } + assertStateTransitions(t, stateSub, expectedStates) + + dummyMapper.AssertExpectations(t) + adapters.AssertExpectations(t) + env.AssertExpectations(t) +} From 44f035330b4a8d993fb365633929314f86d40162 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Thu, 25 Jan 2024 17:27:48 -0800 Subject: [PATCH 06/12] protofsm: add a Name() method to the env This'll be used later to uniquely identify state machines for routing/dispatch purposes. --- protofsm/state_machine.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/protofsm/state_machine.go b/protofsm/state_machine.go index 0dfa8e73e..2a62ac223 100644 --- a/protofsm/state_machine.go +++ b/protofsm/state_machine.go @@ -220,6 +220,11 @@ func (s *StateMachine[Event, Env]) CanHandle(msg lnwire.Message) bool { }) } +// Name returns the name of the state machine's environment. +func (s *StateMachine[Event, Env]) Name() string { + return s.cfg.Env.Name() +} + // SendMessage attempts to send a wire message to the state machine. If the // message can be mapped using the default message mapper, then true is // returned indicating that the message was processed. Otherwise, false is From 96a98bc0716e920388402333af9972400ed826fe Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Mon, 5 Feb 2024 19:05:02 -0800 Subject: [PATCH 07/12] protofsm: add logging --- protofsm/state_machine.go | 76 ++++++++++++++++++++++++++++++++-- protofsm/state_machine_test.go | 6 ++- 2 files changed, 78 insertions(+), 4 deletions(-) diff --git a/protofsm/state_machine.go b/protofsm/state_machine.go index 2a62ac223..f922f536c 100644 --- a/protofsm/state_machine.go +++ b/protofsm/state_machine.go @@ -8,8 +8,10 @@ import ( "github.com/btcsuite/btcd/btcec/v2" "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/wire" + "github.com/davecgh/go-spew/spew" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/fn" + "github.com/lightningnetwork/lnd/lnutils" "github.com/lightningnetwork/lnd/lnwire" ) @@ -204,6 +206,10 @@ func (s *StateMachine[Event, Env]) Stop() { // // TODO(roasbeef): bool if processed? func (s *StateMachine[Event, Env]) SendEvent(event Event) { + log.Debugf("FSM(%v): sending event: %v", s.cfg.Env.Name(), + lnutils.SpewLogClosure(event), + ) + select { case s.events <- event: case <-s.quit: @@ -236,6 +242,10 @@ func (s *StateMachine[Event, Env]) SendMessage(msg lnwire.Message) bool { return false } + log.Debugf("FSM(%v): sending msg: %v", s.cfg.Env.Name(), + lnutils.SpewLogClosure(msg), + ) + // Otherwise, try to map the message using the default message mapper. // If we can't extract an event, then we'll return false to indicate // that the message wasn't processed. @@ -295,12 +305,20 @@ func (s *StateMachine[Event, Env]) RemoveStateSub(sub StateSubscriber[ // executeDaemonEvent executes a daemon event, which is a special type of event // that can be emitted as part of the state transition function of the state // machine. An error is returned if the type of event is unknown. -func (s *StateMachine[Event, Env]) executeDaemonEvent(event DaemonEvent) error { +func (s *StateMachine[Event, Env]) executeDaemonEvent( //nolint:funlen + event DaemonEvent) error { + switch daemonEvent := event.(type) { // This is a send message event, so we'll send the event, and also mind // any preconditions as well as post-send events. case *SendMsgEvent[Event]: sendAndCleanUp := func() error { + log.Debugf("FSM(%v): sending message to target(%v): "+ + "%v", s.cfg.Env.Name(), + daemonEvent.TargetPeer.SerializeCompressed(), + lnutils.SpewLogClosure(daemonEvent.Msgs), + ) + err := s.cfg.Daemon.SendMessages( daemonEvent.TargetPeer, daemonEvent.Msgs, ) @@ -317,6 +335,12 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(event DaemonEvent) error { go func() { defer s.wg.Done() + log.Debugf("FSM(%v): sending "+ + "post-send event: %v", + s.cfg.Env.Name(), + lnutils.SpewLogClosure(event), + ) + s.SendEvent(event) }() }) @@ -340,6 +364,9 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(event DaemonEvent) error { predicateTicker := time.NewTicker(pollInterval) defer predicateTicker.Stop() + log.Infof("FSM(%v): waiting for send predicate to "+ + "be true", s.cfg.Env.Name()) + for { select { case <-predicateTicker.C: @@ -351,6 +378,10 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(event DaemonEvent) error { ) if canSend { + log.Infof("FSM(%v): send "+ + "active predicate", + s.cfg.Env.Name()) + err := sendAndCleanUp() if err != nil { //nolint:lll @@ -371,6 +402,9 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(event DaemonEvent) error { // If this is a broadcast transaction event, then we'll broadcast with // the label attached. case *BroadcastTxn: + log.Debugf("FSM(%v): broadcasting txn, txid=%v", + s.cfg.Env.Name(), daemonEvent.Tx.TxHash()) + err := s.cfg.Daemon.BroadcastTransaction( daemonEvent.Tx, daemonEvent.Label, ) @@ -383,6 +417,9 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(event DaemonEvent) error { // The state machine has requested a new event to be sent once a // transaction spending a specified outpoint has confirmed. case *RegisterSpend[Event]: + log.Debugf("FSM(%v): registering spend: %v", s.cfg.Env.Name(), + daemonEvent.OutPoint) + spendEvent, err := s.cfg.Daemon.RegisterSpendNtfn( &daemonEvent.OutPoint, daemonEvent.PkScript, daemonEvent.HeightHint, @@ -418,6 +455,9 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(event DaemonEvent) error { // The state machine has requested a new event to be sent once a // specified txid+pkScript pair has confirmed. case *RegisterConf[Event]: + log.Debugf("FSM(%v): registering conf: %v", s.cfg.Env.Name(), + daemonEvent.Txid) + numConfs := daemonEvent.NumConfs.UnwrapOr(1) confEvent, err := s.cfg.Daemon.RegisterConfirmationsNtfn( &daemonEvent.Txid, daemonEvent.PkScript, @@ -462,6 +502,12 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(event DaemonEvent) error { func (s *StateMachine[Event, Env]) applyEvents(currentState State[Event, Env], newEvent Event) (State[Event, Env], error) { + log.Debugf("FSM(%v): applying new event", s.cfg.Env.Name(), + lnutils.NewLogClosure(func() string { + return spew.Sdump(newEvent) + }), + ) + eventQueue := fn.NewQueue(newEvent) // Given the next event to handle, we'll process the event, then add @@ -472,6 +518,13 @@ func (s *StateMachine[Event, Env]) applyEvents(currentState State[Event, Env], //nolint:lll for nextEvent := eventQueue.Dequeue(); nextEvent.IsSome(); nextEvent = eventQueue.Dequeue() { err := fn.MapOptionZ(nextEvent, func(event Event) error { + log.Debugf("FSM(%v): processing event: %v", + s.cfg.Env.Name(), + lnutils.NewLogClosure(func() string { + return spew.Sdump(event) + }), + ) + // Apply the state transition function of the current // state given this new event and our existing env. transition, err := currentState.ProcessEvent( @@ -489,6 +542,10 @@ func (s *StateMachine[Event, Env]) applyEvents(currentState State[Event, Env], // //nolint:lll err := fn.MapOptionZ(events.ExternalEvents, func(dEvents DaemonEventSet) error { + log.Debugf("FSM(%v): processing "+ + "daemon %v daemon events", + s.cfg.Env.Name(), len(dEvents)) + for _, dEvent := range dEvents { err := s.executeDaemonEvent( dEvent, @@ -508,8 +565,19 @@ func (s *StateMachine[Event, Env]) applyEvents(currentState State[Event, Env], // our event queue. // //nolint:lll - events.InternalEvent.WhenSome(func(inEvent Event) { - eventQueue.Enqueue(inEvent) + events.InternalEvent.WhenSome(func(es Event) { + log.Debugf("FSM(%v): adding "+ + "new internal event "+ + "to queue: %v", + s.cfg.Env.Name(), + lnutils.NewLogClosure(func() string { //nolint:lll + return spew.Sdump( //nolint:lll + es, + ) + }), + ) + + eventQueue.Enqueue(es) }) return nil @@ -544,6 +612,8 @@ func (s *StateMachine[Event, Env]) applyEvents(currentState State[Event, Env], func (s *StateMachine[Event, Env]) driveMachine() { defer s.wg.Done() + log.Debugf("FSM(%v): starting state machine", s.cfg.Env.Name()) + currentState := s.cfg.InitialState // Before we start, if we have an init daemon event specified, then diff --git a/protofsm/state_machine_test.go b/protofsm/state_machine_test.go index bf7026f4e..0a09af60b 100644 --- a/protofsm/state_machine_test.go +++ b/protofsm/state_machine_test.go @@ -113,7 +113,11 @@ func (d *dummyStateStart) ProcessEvent(event dummyEvents, env *dummyEnv, }, NewEvents: fn.Some(EmittedEvent[dummyEvents]{ ExternalEvents: fn.Some(DaemonEventSet{ - sendEvent, &BroadcastTxn{}, sendEvent2, + sendEvent, sendEvent2, + &BroadcastTxn{ + Tx: &wire.MsgTx{}, + Label: "test", + }, }), }), }, nil From 35ea05d5dc8e961b27b57613e36352c8aef1d5b8 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Thu, 29 Feb 2024 16:08:03 -0600 Subject: [PATCH 08/12] protofsm: add ErrorReporter interface We'll use this to be able to signal to a caller that a critical error occurred during the state transition. --- protofsm/state_machine.go | 27 +++++++++++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/protofsm/state_machine.go b/protofsm/state_machine.go index f922f536c..2fb661c9c 100644 --- a/protofsm/state_machine.go +++ b/protofsm/state_machine.go @@ -144,9 +144,21 @@ type StateMachine[Event any, Env Environment] struct { wg sync.WaitGroup } +// ErrorReporter is an interface that's used to report errors that occur during +// state machine execution. +type ErrorReporter interface { + // ReportError is a method that's used to report an error that occurred + // during state machine execution. + ReportError(err error) +} + // StateMachineCfg is a configuration struct that's used to create a new state // machine. type StateMachineCfg[Event any, Env Environment] struct { + // ErrorReporter is used to report errors that occur during state + // transitions. + ErrorReporter ErrorReporter + // Daemon is a set of adapters that will be used to bridge the FSM to // the daemon. Daemon DaemonAdapters @@ -586,6 +598,11 @@ func (s *StateMachine[Event, Env]) applyEvents(currentState State[Event, Env], return err } + log.Infof("FSM(%v): state transition: from_state=%T, "+ + "to_state=%T", + s.cfg.Env.Name(), currentState, + transition.NextState) + // With our events processed, we'll now update our // internal state. currentState = transition.NextState @@ -638,9 +655,15 @@ func (s *StateMachine[Event, Env]) driveMachine() { case newEvent := <-s.events: newState, err := s.applyEvents(currentState, newEvent) if err != nil { - // TODO(roasbeef): hard error? + s.cfg.ErrorReporter.ReportError(err) + log.Errorf("unable to apply event: %v", err) - continue + + // An error occurred, so we'll tear down the + // entire state machine as we can't proceed. + go s.Stop() + + return } currentState = newState From d805c0fc3c1c244c5cee39a40dcc05f270c0dada Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Thu, 29 Feb 2024 16:08:54 -0600 Subject: [PATCH 09/12] protofsm: add CustomPollInterval for mocking purposes Adding this makes a state machine easier to unit test, as the caller can specify a custom polling interval. --- protofsm/state_machine.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/protofsm/state_machine.go b/protofsm/state_machine.go index 2fb661c9c..71cf54dd7 100644 --- a/protofsm/state_machine.go +++ b/protofsm/state_machine.go @@ -178,6 +178,10 @@ type StateMachineCfg[Event any, Env Environment] struct { // MsgMapper is an optional message mapper that can be used to map // normal wire messages into FSM events. MsgMapper fn.Option[MsgMapper[Event]] + + // CustomPollInterval is an optional custom poll interval that can be + // used to set a quicker interval for tests. + CustomPollInterval fn.Option[time.Duration] } // NewStateMachine creates a new state machine given a set of daemon adapters, @@ -373,7 +377,9 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent( //nolint:funlen go func() { defer s.wg.Done() - predicateTicker := time.NewTicker(pollInterval) + predicateTicker := time.NewTicker( + s.cfg.CustomPollInterval.UnwrapOr(pollInterval), + ) defer predicateTicker.Stop() log.Infof("FSM(%v): waiting for send predicate to "+ From 847c1a789d726f9fb31fb7931b4b1cc2b5045d24 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Thu, 29 Feb 2024 16:10:48 -0600 Subject: [PATCH 10/12] protofsm: add SpendMapper to craft custom spend events In this commit, we add the SpendMapper which allows callers to create custom spent events. Before this commit, the caller would be able to have an event sent to them in the case a spend happens, but that event wouldn't have any of the relevant spend details. With this new addition, the caller can specify how to take a generic spend event, and transform it into the state machine specific spend event. --- protofsm/daemon_events.go | 14 +++++++++----- protofsm/state_machine.go | 7 ++++--- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/protofsm/daemon_events.go b/protofsm/daemon_events.go index b65adf012..e5de0b695 100644 --- a/protofsm/daemon_events.go +++ b/protofsm/daemon_events.go @@ -4,6 +4,7 @@ import ( "github.com/btcsuite/btcd/btcec/v2" "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/wire" + "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/fn" "github.com/lightningnetwork/lnd/lnwire" ) @@ -67,8 +68,12 @@ type BroadcastTxn struct { // daemonSealed indicates that this struct is a DaemonEvent instance. func (b *BroadcastTxn) daemonSealed() {} +// SpendMapper is a function that's used to map a spend notification to a +// custom state machine event. +type SpendMapper[Event any] func(*chainntnfs.SpendDetail) Event + // RegisterSpend is used to request that a certain event is sent into the state -// machien once the specified outpoint has been spent. +// machine once the specified outpoint has been spent. type RegisterSpend[Event any] struct { // OutPoint is the outpoint on chain to watch. OutPoint wire.OutPoint @@ -81,10 +86,9 @@ type RegisterSpend[Event any] struct { // far back it needs to start its search. HeightHint uint32 - // PostSpendEvent is an event that's sent back to the requester once a - // transaction spending the outpoint has been confirmed in the main - // chain. - PostSpendEvent fn.Option[Event] + // PostSpendEvent is a special spend mapper, that if present, will be + // used to map the protofsm spend event to a custom event. + PostSpendEvent fn.Option[SpendMapper[Event]] } // daemonSealed indicates that this struct is a DaemonEvent instance. diff --git a/protofsm/state_machine.go b/protofsm/state_machine.go index 71cf54dd7..eb967f2a2 100644 --- a/protofsm/state_machine.go +++ b/protofsm/state_machine.go @@ -451,13 +451,14 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent( //nolint:funlen defer s.wg.Done() for { select { - case <-spendEvent.Spend: + case spend := <-spendEvent.Spend: // If there's a post-send event, then // we'll send that into the current // state now. postSpend := daemonEvent.PostSpendEvent - postSpend.WhenSome(func(e Event) { - s.SendEvent(e) + postSpend.WhenSome(func(f SpendMapper[Event]) { //nolint:lll + customEvent := f(spend) + s.SendEvent(customEvent) }) return From 6de0615cd57d5eb417389c9c02147c03cbc2f69d Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Mon, 4 Mar 2024 23:49:11 -0600 Subject: [PATCH 11/12] protofsm: allow multiple internal events to be emitted In this commit, we update the execution logic to allow multiple internal events to be emitted. This is useful to handle potential out of order state transitions, as they can be cached, then emitted once the relevant pre-conditions have been met. --- protofsm/log.go | 2 +- protofsm/state_machine.go | 44 ++++++++++++++++------------------ protofsm/state_machine_test.go | 4 +++- 3 files changed, 25 insertions(+), 25 deletions(-) diff --git a/protofsm/log.go b/protofsm/log.go index ea1728ab8..8ff9c1b62 100644 --- a/protofsm/log.go +++ b/protofsm/log.go @@ -12,7 +12,7 @@ var log btclog.Logger // The default amount of logging is none. func init() { - UseLogger(build.NewSubLogger("PRCL", nil)) + UseLogger(build.NewSubLogger("PFSM", nil)) } // DisableLog disables all library log output. Logging output is disabled diff --git a/protofsm/state_machine.go b/protofsm/state_machine.go index eb967f2a2..b583d4a8d 100644 --- a/protofsm/state_machine.go +++ b/protofsm/state_machine.go @@ -8,7 +8,6 @@ import ( "github.com/btcsuite/btcd/btcec/v2" "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/wire" - "github.com/davecgh/go-spew/spew" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/fn" "github.com/lightningnetwork/lnd/lnutils" @@ -28,7 +27,7 @@ type EmittedEvent[Event any] struct { // InternalEvent is an optional internal event that is to be routed // back to the target state. This enables state to trigger one or many // state transitions without a new external event. - InternalEvent fn.Option[Event] + InternalEvent fn.Option[[]Event] // ExternalEvent is an optional external event that is to be sent to // the daemon for dispatch. Usually, this is some form of I/O. @@ -329,7 +328,7 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent( //nolint:funlen // any preconditions as well as post-send events. case *SendMsgEvent[Event]: sendAndCleanUp := func() error { - log.Debugf("FSM(%v): sending message to target(%v): "+ + log.Debugf("FSM(%v): sending message to target(%x): "+ "%v", s.cfg.Env.Name(), daemonEvent.TargetPeer.SerializeCompressed(), lnutils.SpewLogClosure(daemonEvent.Msgs), @@ -451,7 +450,11 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent( //nolint:funlen defer s.wg.Done() for { select { - case spend := <-spendEvent.Spend: + case spend, ok := <-spendEvent.Spend: + if !ok { + return + } + // If there's a post-send event, then // we'll send that into the current // state now. @@ -522,11 +525,8 @@ func (s *StateMachine[Event, Env]) applyEvents(currentState State[Event, Env], newEvent Event) (State[Event, Env], error) { log.Debugf("FSM(%v): applying new event", s.cfg.Env.Name(), - lnutils.NewLogClosure(func() string { - return spew.Sdump(newEvent) - }), + lnutils.SpewLogClosure(newEvent), ) - eventQueue := fn.NewQueue(newEvent) // Given the next event to handle, we'll process the event, then add @@ -539,9 +539,7 @@ func (s *StateMachine[Event, Env]) applyEvents(currentState State[Event, Env], err := fn.MapOptionZ(nextEvent, func(event Event) error { log.Debugf("FSM(%v): processing event: %v", s.cfg.Env.Name(), - lnutils.NewLogClosure(func() string { - return spew.Sdump(event) - }), + lnutils.SpewLogClosure(event), ) // Apply the state transition function of the current @@ -584,19 +582,19 @@ func (s *StateMachine[Event, Env]) applyEvents(currentState State[Event, Env], // our event queue. // //nolint:lll - events.InternalEvent.WhenSome(func(es Event) { - log.Debugf("FSM(%v): adding "+ - "new internal event "+ - "to queue: %v", - s.cfg.Env.Name(), - lnutils.NewLogClosure(func() string { //nolint:lll - return spew.Sdump( //nolint:lll - es, - ) - }), - ) + events.InternalEvent.WhenSome(func(es []Event) { + for _, inEvent := range es { + log.Debugf("FSM(%v): adding "+ + "new internal event "+ + "to queue: %v", + s.cfg.Env.Name(), + lnutils.SpewLogClosure( + inEvent, + ), + ) - eventQueue.Enqueue(es) + eventQueue.Enqueue(inEvent) + } }) return nil diff --git a/protofsm/state_machine_test.go b/protofsm/state_machine_test.go index 0a09af60b..0432f386b 100644 --- a/protofsm/state_machine_test.go +++ b/protofsm/state_machine_test.go @@ -80,7 +80,9 @@ func (d *dummyStateStart) ProcessEvent(event dummyEvents, env *dummyEnv, return &StateTransition[dummyEvents, *dummyEnv]{ NextState: &dummyStateStart{}, NewEvents: fn.Some(EmittedEvent[dummyEvents]{ - InternalEvent: fn.Some(dummyEvents(&goToFin{})), + InternalEvent: fn.Some( + []dummyEvents{&goToFin{}}, + ), }), }, nil From 2e3c0b2a7d45effece44057744bf40e7ab1de428 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Wed, 13 Nov 2024 17:10:30 -0800 Subject: [PATCH 12/12] protofsm: use new fn.GoroutineManager to manage goroutines This fixes an isuse that can occur when we have concurrent calls to `Stop` while the state machine is driving forward. --- protofsm/state_machine.go | 76 ++++++++++++++------------------------- 1 file changed, 27 insertions(+), 49 deletions(-) diff --git a/protofsm/state_machine.go b/protofsm/state_machine.go index b583d4a8d..ecbd74834 100644 --- a/protofsm/state_machine.go +++ b/protofsm/state_machine.go @@ -1,6 +1,7 @@ package protofsm import ( + "context" "fmt" "sync" "time" @@ -135,12 +136,11 @@ type StateMachine[Event any, Env Environment] struct { // query the internal state machine state. stateQuery chan stateQuery[Event, Env] + wg fn.GoroutineManager + quit chan struct{} + startOnce sync.Once stopOnce sync.Once - - // TODO(roasbeef): also use that context guard here? - quit chan struct{} - wg sync.WaitGroup } // ErrorReporter is an interface that's used to report errors that occur during @@ -194,8 +194,9 @@ func NewStateMachine[Event any, Env Environment](cfg StateMachineCfg[Event, Env] cfg: cfg, events: make(chan Event, 1), stateQuery: make(chan stateQuery[Event, Env]), - quit: make(chan struct{}), + wg: *fn.NewGoroutineManager(context.Background()), newStateEvents: fn.NewEventDistributor[State[Event, Env]](), + quit: make(chan struct{}), } } @@ -203,8 +204,9 @@ func NewStateMachine[Event any, Env Environment](cfg StateMachineCfg[Event, Env] // the state machine to completion. func (s *StateMachine[Event, Env]) Start() { s.startOnce.Do(func() { - s.wg.Add(1) - go s.driveMachine() + _ = s.wg.Go(func(ctx context.Context) { + s.driveMachine() + }) }) } @@ -213,7 +215,7 @@ func (s *StateMachine[Event, Env]) Start() { func (s *StateMachine[Event, Env]) Stop() { s.stopOnce.Do(func() { close(s.quit) - s.wg.Wait() + s.wg.Stop() }) } @@ -320,7 +322,7 @@ func (s *StateMachine[Event, Env]) RemoveStateSub(sub StateSubscriber[ // executeDaemonEvent executes a daemon event, which is a special type of event // that can be emitted as part of the state transition function of the state // machine. An error is returned if the type of event is unknown. -func (s *StateMachine[Event, Env]) executeDaemonEvent( //nolint:funlen +func (s *StateMachine[Event, Env]) executeDaemonEvent( event DaemonEvent) error { switch daemonEvent := event.(type) { @@ -342,14 +344,10 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent( //nolint:funlen err) } - // If a post-send event was specified, then we'll - // funnel that back into the main state machine now as - // well. - daemonEvent.PostSendEvent.WhenSome(func(event Event) { - s.wg.Add(1) - go func() { - defer s.wg.Done() - + // If a post-send event was specified, then we'll funnel + // that back into the main state machine now as well. + return fn.MapOptionZ(daemonEvent.PostSendEvent, func(event Event) error { //nolint:lll + return s.wg.Go(func(ctx context.Context) { log.Debugf("FSM(%v): sending "+ "post-send event: %v", s.cfg.Env.Name(), @@ -357,10 +355,8 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent( //nolint:funlen ) s.SendEvent(event) - }() + }) }) - - return nil } // If this doesn't have a SendWhen predicate, then we can just @@ -372,10 +368,7 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent( //nolint:funlen // Otherwise, this has a SendWhen predicate, so we'll need // launch a goroutine to poll the SendWhen, then send only once // the predicate is true. - s.wg.Add(1) - go func() { - defer s.wg.Done() - + return s.wg.Go(func(ctx context.Context) { predicateTicker := time.NewTicker( s.cfg.CustomPollInterval.UnwrapOr(pollInterval), ) @@ -408,13 +401,11 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent( //nolint:funlen return } - case <-s.quit: + case <-ctx.Done(): return } } - }() - - return nil + }) // If this is a broadcast transaction event, then we'll broadcast with // the label attached. @@ -445,9 +436,7 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent( //nolint:funlen return fmt.Errorf("unable to register spend: %w", err) } - s.wg.Add(1) - go func() { - defer s.wg.Done() + return s.wg.Go(func(ctx context.Context) { for { select { case spend, ok := <-spendEvent.Spend: @@ -466,13 +455,11 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent( //nolint:funlen return - case <-s.quit: + case <-ctx.Done(): return } } - }() - - return nil + }) // The state machine has requested a new event to be sent once a // specified txid+pkScript pair has confirmed. @@ -489,9 +476,7 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent( //nolint:funlen return fmt.Errorf("unable to register conf: %w", err) } - s.wg.Add(1) - go func() { - defer s.wg.Done() + return s.wg.Go(func(ctx context.Context) { for { select { case <-confEvent.Confirmed: @@ -508,11 +493,11 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent( //nolint:funlen return - case <-s.quit: + case <-ctx.Done(): return } } - }() + }) } return fmt.Errorf("unknown daemon event: %T", event) @@ -632,8 +617,6 @@ func (s *StateMachine[Event, Env]) applyEvents(currentState State[Event, Env], // incoming events, and then drives the state machine forward until it reaches // a terminal state. func (s *StateMachine[Event, Env]) driveMachine() { - defer s.wg.Done() - log.Debugf("FSM(%v): starting state machine", s.cfg.Env.Name()) currentState := s.cfg.InitialState @@ -676,16 +659,11 @@ func (s *StateMachine[Event, Env]) driveMachine() { // An outside caller is querying our state, so we'll return the // latest state. case stateQuery := <-s.stateQuery: - if !fn.SendOrQuit( - stateQuery.CurrentState, currentState, s.quit, - ) { - + if !fn.SendOrQuit(stateQuery.CurrentState, currentState, s.quit) { //nolint:lll return } - case <-s.quit: - // TODO(roasbeef): logs, etc - // * something in env? + case <-s.wg.Done(): return } }