diff --git a/protofsm/daemon_events.go b/protofsm/daemon_events.go new file mode 100644 index 000000000..39719d65f --- /dev/null +++ b/protofsm/daemon_events.go @@ -0,0 +1,66 @@ +package protofsm + +import ( + "github.com/btcsuite/btcd/btcec/v2" + "github.com/btcsuite/btcd/wire" + "github.com/lightningnetwork/lnd/fn" + "github.com/lightningnetwork/lnd/lnwire" +) + +// DaemonEvent is a special event that can be emmitted by a state transition +// function. A state machine can use this to perform side effects, such as +// sending a message to a peer, or broadcasting a transaction. +type DaemonEvent interface { + daemonSealed() +} + +// DaemonEventSet is a set of daemon events that can be emitted by a state +// transition. +type DaemonEventSet []DaemonEvent + +// DaemonEvents is a special type constraint that enumerates all the possible +// types of daemon events. +type DaemonEvents interface { + SendMsgEvent[any] | BroadcastTxn +} + +// SendPredicate is a function that returns true if the target message should +// sent. +type SendPredicate = func() bool + +// SendMsgEvent is a special event that can be emitted by a state transition +// that instructs the daemon to send the contained message to the target peer. +type SendMsgEvent[Event any] struct { + // TargetPeer is the peer to send the message to. + TargetPeer btcec.PublicKey + + // Msgs is the set of messages to send to the target peer. + Msgs []lnwire.Message + + // SendWhen implements a system for a conditional send once a special + // send predicate has been met. + // + // TODO(roasbeef): contrast with usage of OnCommitFlush, etc + SendWhen fn.Option[SendPredicate] + + // PostSendEvent is an optional event that is to be emitted after the + // message has been sent. If a SendWhen is specified, then this will + // only be executed after that returns true to unblock the send. + PostSendEvent fn.Option[Event] +} + +// daemonSealed indicates that this struct is a DaemonEvent instance. +func (s *SendMsgEvent[E]) daemonSealed() {} + +// BroadcastTxn indicates the target transaction should be broadcast to the +// network. +type BroadcastTxn struct { + // Tx is the transaction to broadcast. + Tx *wire.MsgTx + + // Label is an optional label to attach to the transaction. + Label string +} + +// daemonSealed indicates that this struct is a DaemonEvent instance. +func (b *BroadcastTxn) daemonSealed() {} diff --git a/protofsm/log.go b/protofsm/log.go new file mode 100644 index 000000000..ea1728ab8 --- /dev/null +++ b/protofsm/log.go @@ -0,0 +1,29 @@ +package protofsm + +import ( + "github.com/btcsuite/btclog" + "github.com/lightningnetwork/lnd/build" +) + +// log is a logger that is initialized with no output filters. This +// means the package will not perform any logging by default until the caller +// requests it. +var log btclog.Logger + +// The default amount of logging is none. +func init() { + UseLogger(build.NewSubLogger("PRCL", nil)) +} + +// DisableLog disables all library log output. Logging output is disabled +// by default until UseLogger is called. +func DisableLog() { + UseLogger(btclog.Disabled) +} + +// UseLogger uses a specified Logger to output package logging info. +// This should be used in preference to SetLogWriter if the caller is also +// using btclog. +func UseLogger(logger btclog.Logger) { + log = logger +} diff --git a/protofsm/state_machine.go b/protofsm/state_machine.go new file mode 100644 index 000000000..f1a1e3b54 --- /dev/null +++ b/protofsm/state_machine.go @@ -0,0 +1,403 @@ +package protofsm + +import ( + "fmt" + "sync" + "time" + + "github.com/btcsuite/btcd/btcec/v2" + "github.com/btcsuite/btcd/wire" + "github.com/lightningnetwork/lnd/fn" + "github.com/lightningnetwork/lnd/lnwire" +) + +const ( + // pollInterval is the interval at which we'll poll the SendWhen + // predicate if specified. + pollInterval = time.Millisecond * 100 +) + +// EmittedEvent is a special type that can be emitted by a state transition. +// This can container internal events which are to be routed back to the state, +// or external events which are to be sent to the daemon. +type EmittedEvent[Event any] struct { + // InternalEvent is an optional internal event that is to be routed + // back to the target state. This enables state to trigger one or many + // state transitions without a new external event. + InternalEvent fn.Option[Event] + + // ExternalEvent is an optional external event that is to be sent to + // the daemon for dispatch. Usually, this is some form of I/O. + ExternalEvents fn.Option[DaemonEventSet] +} + +// StateTransition is a state transition type. It denotes the next state to go +// to, and also the set of events to emit. +type StateTransition[Event any, Env Environment] struct { + // NextState is the next state to transition to. + NextState State[Event, Env] + + // NewEvents is the set of events to emit. + NewEvents fn.Option[EmittedEvent[Event]] +} + +// Environment is an abstract interface that represents the environment that +// the state machine will execute using. From the PoV of the main state machine +// executor, we just care about being able to clean up any resources that were +// allocated by the environment. +type Environment interface { + // Name returns the name of the environment. This is used to uniquely + // identify the environment of related state machines. + Name() string +} + +// State defines an abstract state along, namely its state transition function +// that takes as input an event and an environment, and returns a state +// transition (next state, and set of events to emit). As state can also either +// be terminal, or not, a terminal event causes state execution to halt. +type State[Event any, Env Environment] interface { + // ProcessEvent takes an event and an environment, and returns a new + // state transition. This will be iteratively called until either a + // terminal state is reached, or no further internal events are + // emitted. + ProcessEvent(event Event, env Env) (*StateTransition[Event, Env], error) + + // IsTerminal returns true if this state is terminal, and false otherwise. + IsTerminal() bool + + // TODO(roasbeef): also add state serialization? +} + +// DaemonAdapters is a set of methods that server as adapters to bridge the +// pure world of the FSM to the real world of the daemon. These will be used to +// do things like broadcast transactions, or send messages to peers. +type DaemonAdapters interface { + // SendMessages sends the target set of messages to the target peer. + SendMessages(btcec.PublicKey, []lnwire.Message) error + + // BroadcastTransaction broadcasts a transaction with the target label. + BroadcastTransaction(*wire.MsgTx, string) error +} + +// stateQuery is used by outside callers to query the internal state of the +// state machine. +type stateQuery[Event any, Env Environment] struct { + // CurrentState is a channel that will be sent the current state of the + // state machine. + CurrentState chan State[Event, Env] +} + +// StateMachine represents an abstract FSM that is able to process new incoming +// events and drive a state machine to termination. This implementation uses +// type params to abstract over the types of events and environment. Events +// trigger new state transitions, that use the environment to perform some +// action. +// +// TODO(roasbeef): terminal check, daemon event execution, init? +type StateMachine[Event any, Env Environment] struct { + currentState State[Event, Env] + env Env + + daemon DaemonAdapters + + events chan Event + + quit chan struct{} + wg sync.WaitGroup + + // newStateEvents is an EventDistributor that will be used to notify + // any relevant callers of new state transitions that occur. + newStateEvents *fn.EventDistributor[State[Event, Env]] + + stateQuery chan stateQuery[Event, Env] + + startOnce sync.Once + stopOnce sync.Once + + // TODO(roasbeef): also use that context guard here? +} + +// NewStateMachine creates a new state machine given a set of daemon adapters, +// an initial state, and an environment. +func NewStateMachine[Event any, Env Environment](adapters DaemonAdapters, + initialState State[Event, Env], + env Env) StateMachine[Event, Env] { + + return StateMachine[Event, Env]{ + daemon: adapters, + events: make(chan Event, 1), + currentState: initialState, + stateQuery: make(chan stateQuery[Event, Env]), + quit: make(chan struct{}), + env: env, + newStateEvents: fn.NewEventDistributor[State[Event, Env]](), + } +} + +// Start starts the state machine. This will spawn a goroutine that will drive +// the state machine to completion. +func (s *StateMachine[Event, Env]) Start() { + s.startOnce.Do(func() { + s.wg.Add(1) + go s.driveMachine() + }) +} + +// Stop stops the state machine. This will block until the state machine has +// reached a stopping point. +func (s *StateMachine[Event, Env]) Stop() { + s.stopOnce.Do(func() { + close(s.quit) + s.wg.Wait() + }) +} + +// SendEvent sends a new event to the state machine. +// +// TODO(roasbeef): bool if processed? +func (s *StateMachine[Event, Env]) SendEvent(event Event) { + select { + case s.events <- event: + case <-s.quit: + return + } +} + +// CurrentState returns the current state of the state machine. +func (s *StateMachine[Event, Env]) CurrentState() (State[Event, Env], error) { + query := stateQuery[Event, Env]{ + CurrentState: make(chan State[Event, Env], 1), + } + + if !fn.SendOrQuit(s.stateQuery, query, s.quit) { + return nil, fmt.Errorf("state machine is shutting down") + } + + return fn.RecvOrTimeout(query.CurrentState, time.Second) +} + +// StateSubscriber represents an active subscription to be notified of new +// state transitions. +type StateSubscriber[E any, F Environment] *fn.EventReceiver[State[E, F]] + +// RegisterStateEvents registers a new event listener that will be notified of +// new state transitions. +func (s *StateMachine[Event, Env]) RegisterStateEvents() StateSubscriber[Event, Env] { + subscriber := fn.NewEventReceiver[State[Event, Env]](10) + + // TODO(roasbeef): instead give the state and the input event? + + s.newStateEvents.RegisterSubscriber(subscriber) + + return subscriber +} + +// RemoveStateSub removes the target state subscriber from the set of active +// subscribers. +func (s *StateMachine[Event, Env]) RemoveStateSub(sub StateSubscriber[Event, Env]) { + s.newStateEvents.RemoveSubscriber(sub) +} + +// executeDaemonEvent executes a daemon event, which is a special type of event +// that can be emitted as part of the state transition function of the state +// machine. An error is returned if the type of event is unknown. +func (s *StateMachine[Event, Env]) executeDaemonEvent(event DaemonEvent) error { + switch daemonEvent := event.(type) { + + // This is a send message event, so we'll send the event, and also mind + // any preconditions as well as post-send events. + case *SendMsgEvent[Event]: + sendAndCleanUp := func() error { + err := s.daemon.SendMessages( + daemonEvent.TargetPeer, daemonEvent.Msgs, + ) + if err != nil { + return fmt.Errorf("unable to send msgs: %w", err) + } + + // If a post-send event was specified, then we'll + // funnel that back into the main state machine now as + // well. + daemonEvent.PostSendEvent.WhenSome(func(event Event) { + s.wg.Add(1) + go func() { + defer s.wg.Done() + + s.SendEvent(event) + }() + }) + + return nil + } + + // If this doesn't have a SendWhen predicate, then we can just + // send it off right away. + if !daemonEvent.SendWhen.IsSome() { + return sendAndCleanUp() + } + + // Otherwise, this has a SendWhen predicate, so we'll need + // launch a goroutine to poll the SendWhen, then send only once + // the predicate is true. + s.wg.Add(1) + go func() { + defer s.wg.Done() + + predicateTicker := time.NewTicker(pollInterval) + defer predicateTicker.Stop() + + for { + select { + case <-predicateTicker.C: + canSend := fn.MapOptionZ( + daemonEvent.SendWhen, + func(pred SendPredicate) bool { + return pred() + }, + ) + + if canSend { + sendAndCleanUp() + return + } + + case <-s.quit: + return + } + } + }() + + return nil + + // If this is a broadcast transaction event, then we'll broadcast with + // the label attached. + case *BroadcastTxn: + err := s.daemon.BroadcastTransaction( + daemonEvent.Tx, daemonEvent.Label, + ) + if err != nil { + // TODO(roasbeef): hook has channel read event event is + // hit? + return fmt.Errorf("unable to broadcast txn: %w", err) + } + + return nil + } + + return fmt.Errorf("unknown daemon event: %T", event) +} + +// applyEvents applies a new event to the state machine. This will continue +// until no further events are emitted by the state machine. Along the way, +// we'll also ensure to execute any daemon events that are emitted. +func (s *StateMachine[Event, Env]) applyEvents(newEvent Event) (State[Event, Env], error) { + // TODO(roasbeef): make starting state as part of env? + currentState := s.currentState + + eventQueue := fn.NewQueue(newEvent) + + // Given the next event to handle, we'll process the event, then add + // any new emitted internal events to our event queue. This continues + // until we reach a terminal state, or we run out of internal events to + // process. + for nextEvent := eventQueue.Dequeue(); nextEvent.IsSome(); nextEvent = eventQueue.Dequeue() { + err := fn.MapOptionZ(nextEvent, func(event Event) error { + // Apply the state transition function of the current + // state given this new event and our existing env. + transition, err := currentState.ProcessEvent( + event, s.env, + ) + if err != nil { + return err + } + + newEvents := transition.NewEvents + err = fn.MapOptionZ(newEvents, func(events EmittedEvent[Event]) error { + // With the event processed, we'll process any + // new daemon events that were emitted as part + // of this new state transition. + err := fn.MapOptionZ(events.ExternalEvents, func(dEvents DaemonEventSet) error { + for _, dEvent := range dEvents { + err := s.executeDaemonEvent(dEvent) + if err != nil { + return err + } + } + + return nil + }) + if err != nil { + return err + } + + // Next, we'll add any new emitted events to + // our event queue. + events.InternalEvent.WhenSome(func(inEvent Event) { + eventQueue.Enqueue(inEvent) + }) + + return nil + }) + if err != nil { + return err + } + + // With our events processed, we'll now update our + // internal state. + currentState = transition.NextState + + // Notify our subscribers of the new state transition. + s.newStateEvents.NotifySubscribers(currentState) + + return nil + }) + if err != nil { + return currentState, err + } + } + + return currentState, nil +} + +// driveMachine is the main event loop of the state machine. It accepts any new +// incoming events, and then drives the state machine forward until it reaches +// a terminal state. +func (s *StateMachine[Event, Env]) driveMachine() { + defer s.wg.Done() + + // TODO(roasbeef): move into env? read only to start with + currentState := s.currentState + + // We just started driving the state machine, so we'll notify our + // subscribers of this starting state. + s.newStateEvents.NotifySubscribers(currentState) + + for { + select { + // We have a new external event, so we'll drive the state + // machine forward until we either run out of internal events, + // or we reach a terminal state. + case newEvent := <-s.events: + newState, err := s.applyEvents(newEvent) + if err != nil { + // TODO(roasbeef): hard error? + log.Errorf("unable to apply event: %v", err) + continue + } + + currentState = newState + + // An outside caller is querying our state, so we'll return the + // latest state. + case stateQuery := <-s.stateQuery: + if !fn.SendOrQuit(stateQuery.CurrentState, currentState, s.quit) { + return + } + + case <-s.quit: + // TODO(roasbeef): logs, etc + // * something in env? + return + } + } +} diff --git a/protofsm/state_machine_test.go b/protofsm/state_machine_test.go new file mode 100644 index 000000000..71760726e --- /dev/null +++ b/protofsm/state_machine_test.go @@ -0,0 +1,281 @@ +package protofsm + +import ( + "encoding/hex" + "fmt" + "sync/atomic" + "testing" + + "github.com/btcsuite/btcd/btcec/v2" + "github.com/btcsuite/btcd/wire" + "github.com/lightningnetwork/lnd/fn" + "github.com/lightningnetwork/lnd/lnwire" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +type dummyEvents interface { + dummy() +} + +type goToFin struct { +} + +func (g *goToFin) dummy() { +} + +type emitInternal struct { +} + +func (e *emitInternal) dummy() { +} + +type daemonEvents struct { +} + +func (s *daemonEvents) dummy() { +} + +type dummyEnv struct { + mock.Mock +} + +func (d *dummyEnv) Name() string { + return "test" +} + +type dummyStateStart struct { + canSend *atomic.Bool +} + +var ( + hexDecode = func(keyStr string) []byte { + keyBytes, _ := hex.DecodeString(keyStr) + return keyBytes + } + pub1, _ = btcec.ParsePubKey(hexDecode( + "02ec95e4e8ad994861b95fc5986eedaac24739e5ea3d0634db4c8ccd44cd" + + "a126ea", + )) + pub2, _ = btcec.ParsePubKey(hexDecode( + "0356167ba3e54ac542e86e906d4186aba9ca0b9df45001c62b753d33fe06" + + "f5b4e8", + )) +) + +func (d *dummyStateStart) ProcessEvent(event dummyEvents, env *dummyEnv, +) (*StateTransition[dummyEvents, *dummyEnv], error) { + + switch event.(type) { + case *goToFin: + return &StateTransition[dummyEvents, *dummyEnv]{ + NextState: &dummyStateFin{}, + }, nil + + // This state will loop back upon itself, but will also emit an event + // to head to the terminal state. + case *emitInternal: + return &StateTransition[dummyEvents, *dummyEnv]{ + NextState: &dummyStateStart{}, + NewEvents: fn.Some(EmittedEvent[dummyEvents]{ + InternalEvent: fn.Some(dummyEvents(&goToFin{})), + }), + }, nil + + // This state will proceed to the terminal state, but will emit all the + // possible daemon events. + case *daemonEvents: + // This send event can only succeed once the bool turns to + // true. After that, then we'll expect another event to take us + // to the final state. + sendEvent := &SendMsgEvent[dummyEvents]{ + TargetPeer: *pub1, + SendWhen: fn.Some(func() bool { + return d.canSend.Load() + }), + PostSendEvent: fn.Some(dummyEvents(&goToFin{})), + } + + // We'll also send out a normal send event that doesn't have + // any preconditions. + sendEvent2 := &SendMsgEvent[dummyEvents]{ + TargetPeer: *pub2, + } + + return &StateTransition[dummyEvents, *dummyEnv]{ + // We'll state in this state until the send succeeds + // based on our predicate. Then it'll transition to the + // final state. + NextState: &dummyStateStart{ + canSend: d.canSend, + }, + NewEvents: fn.Some(EmittedEvent[dummyEvents]{ + ExternalEvents: fn.Some(DaemonEventSet{ + sendEvent, &BroadcastTxn{}, sendEvent2, + }), + }), + }, nil + } + + return nil, fmt.Errorf("unknown event: %T", event) +} + +func (d *dummyStateStart) IsTerminal() bool { + return false +} + +type dummyStateFin struct { +} + +func (d *dummyStateFin) ProcessEvent(event dummyEvents, env *dummyEnv, +) (*StateTransition[dummyEvents, *dummyEnv], error) { + + return &StateTransition[dummyEvents, *dummyEnv]{ + NextState: &dummyStateFin{}, + }, nil +} + +func (d *dummyStateFin) IsTerminal() bool { + return true +} + +func assertState[Event any, Env Environment](t *testing.T, + m *StateMachine[Event, Env], expectedState State[Event, Env]) { + + state, err := m.CurrentState() + require.NoError(t, err) + require.IsType(t, expectedState, state) +} + +func assertStateTransitions[Event any, Env Environment]( + t *testing.T, stateSub StateSubscriber[Event, Env], + expectedStates []State[Event, Env]) { + + for _, expectedState := range expectedStates { + newState := <-stateSub.NewItemCreated.ChanOut() + + require.IsType(t, expectedState, newState) + } +} + +type dummyAdapters struct { + mock.Mock +} + +func (d *dummyAdapters) SendMessages(pub btcec.PublicKey, msgs []lnwire.Message) error { + args := d.Called(pub, msgs) + + return args.Error(0) +} + +func (d *dummyAdapters) BroadcastTransaction(tx *wire.MsgTx, label string) error { + args := d.Called(tx, label) + + return args.Error(0) +} + +// TestStateMachineInternalEvents tests that the state machine is able to add +// new internal events to the event queue for further processing during a state +// transition. +func TestStateMachineInternalEvents(t *testing.T) { + t.Parallel() + + // First, we'll create our state machine given the env, and our + // starting state. + env := &dummyEnv{} + startingState := &dummyStateStart{} + + adapters := &dummyAdapters{} + + stateMachine := NewStateMachine[dummyEvents, *dummyEnv]( + adapters, startingState, env, + ) + stateMachine.Start() + defer stateMachine.Stop() + + // As we're triggering internal events, we'll also subscribe to the set + // of new states so we can assert as we go. + stateSub := stateMachine.RegisterStateEvents() + defer stateMachine.RemoveStateSub(stateSub) + + // For this transition, we'll send in the emitInternal event, which'll + // send us back to the starting event, but emit an internal event. + stateMachine.SendEvent(&emitInternal{}) + + // We'll now also assert the path we took to get here to ensure the + // internal events were processed. + expectedStates := []State[dummyEvents, *dummyEnv]{ + &dummyStateStart{}, &dummyStateStart{}, &dummyStateFin{}, + } + assertStateTransitions( + t, stateSub, expectedStates, + ) + + // We should ultimately end up in the terminal state. + assertState[dummyEvents, *dummyEnv](t, &stateMachine, &dummyStateFin{}) + + // Make sure all the env expectations were met. + env.AssertExpectations(t) +} + +// TestStateMachineDaemonEvents tests that the state machine is able to process +// daemon emitted as part of the state transition process. +func TestStateMachineDaemonEvents(t *testing.T) { + t.Parallel() + + // First, we'll create our state machine given the env, and our + // starting state. + env := &dummyEnv{} + + var boolTrigger atomic.Bool + startingState := &dummyStateStart{ + canSend: &boolTrigger, + } + + adapters := &dummyAdapters{} + + stateMachine := NewStateMachine[dummyEvents, *dummyEnv]( + adapters, startingState, env, + ) + stateMachine.Start() + defer stateMachine.Stop() + + // As we're triggering internal events, we'll also subscribe to the set + // of new states so we can assert as we go. + stateSub := stateMachine.RegisterStateEvents() + defer stateMachine.RemoveStateSub(stateSub) + + // As soon as we send in the daemon event, we expect the + // disable+broadcast events to be processed, as they are unconditional. + adapters.On("BroadcastTransaction", mock.Anything, mock.Anything).Return(nil) + adapters.On("SendMessages", *pub2, mock.Anything).Return(nil) + + // We'll start off by sending in the daemon event, which'll trigger the + // state machine to execute the series of daemon events. + stateMachine.SendEvent(&daemonEvents{}) + + // We should transition back to the starting state now, after we + // started from the very same state. + expectedStates := []State[dummyEvents, *dummyEnv]{ + &dummyStateStart{}, &dummyStateStart{}, + } + assertStateTransitions(t, stateSub, expectedStates) + + // At this point, we expect that the two methods above were called. + adapters.AssertExpectations(t) + + // However, we don't expect the SendMessages for the first peer target + // to be called yet, as the condition hasn't yet been met. + adapters.AssertNotCalled(t, "SendMessages", *pub1) + + // We'll now flip the bool to true, which should cause the SendMessages + // method to be called, and for us to transition to the final state. + boolTrigger.Store(true) + adapters.On("SendMessages", *pub1, mock.Anything).Return(nil) + + expectedStates = []State[dummyEvents, *dummyEnv]{&dummyStateFin{}} + assertStateTransitions(t, stateSub, expectedStates) + + adapters.AssertExpectations(t) + env.AssertExpectations(t) +}