protofsm: add new package for driving generic protocol FSMs

In this PR, we create a new package, `protofsm` which is intended to
abstract away from something we've done dozens of time in the daemon:
create a new event-drive protocol FSM. One example of this is the co-op
close state machine, and also the channel state machine itself.

This packages picks out the common themes of:

  * clear states and transitions between them
  * calling out to special daemon adapters for I/O such as transaction
    broadcast or sending a message to a peer
  * cleaning up after state machine execution
  * notifying relevant callers of updates to the state machine

The goal of this PR, is that devs can now implement a state machine
based off of this primary interface:
```go
// State defines an abstract state along, namely its state transition function
// that takes as input an event and an environment, and returns a state
// transition (next state, and set of events to emit). As state can also either
// be terminal, or not, a terminal event causes state execution to halt.
type State[Event any, Env Environment] interface {
	// ProcessEvent takes an event and an environment, and returns a new
	// state transition. This will be iteratively called until either a
	// terminal state is reached, or no further internal events are
	// emitted.
	ProcessEvent(event Event, env Env) (*StateTransition[Event, Env], error)

	// IsTerminal returns true if this state is terminal, and false otherwise.
	IsTerminal() bool
}
```

With their focus being _only_ on each state transition, rather than all
the boiler plate involved (processing new events, advancing to
completion, doing I/O, etc, etc).

Instead, they just make their states, then create the state machine
given the starting state and env. The only other custom component needed
is something capable of mapping wire messages or other events from the
"outside world" into the domain of the state machine.

The set of types is based on a pseudo sum type system wherein you
declare an interface, make the sole method private, then create other
instances based on that interface. This restricts call sites (must pass
in that interface) type, and with some tooling, exhaustive matching can
also be enforced via a linter.

The best way to get a hang of the pattern proposed here is to check out
the tests. They make a mock state machine, and then use the new executor
to drive it to completion. You'll also get a view of how the code will
actually look, with the focus being on the: input event, current state,
and output transition (can also emit events to drive itself forward).
This commit is contained in:
Olaoluwa Osuntokun
2024-01-02 17:30:20 -08:00
parent 95b248a1ef
commit 3bae7f32cd
4 changed files with 779 additions and 0 deletions

66
protofsm/daemon_events.go Normal file
View File

@@ -0,0 +1,66 @@
package protofsm
import (
"github.com/btcsuite/btcd/btcec/v2"
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/fn"
"github.com/lightningnetwork/lnd/lnwire"
)
// DaemonEvent is a special event that can be emmitted by a state transition
// function. A state machine can use this to perform side effects, such as
// sending a message to a peer, or broadcasting a transaction.
type DaemonEvent interface {
daemonSealed()
}
// DaemonEventSet is a set of daemon events that can be emitted by a state
// transition.
type DaemonEventSet []DaemonEvent
// DaemonEvents is a special type constraint that enumerates all the possible
// types of daemon events.
type DaemonEvents interface {
SendMsgEvent[any] | BroadcastTxn
}
// SendPredicate is a function that returns true if the target message should
// sent.
type SendPredicate = func() bool
// SendMsgEvent is a special event that can be emitted by a state transition
// that instructs the daemon to send the contained message to the target peer.
type SendMsgEvent[Event any] struct {
// TargetPeer is the peer to send the message to.
TargetPeer btcec.PublicKey
// Msgs is the set of messages to send to the target peer.
Msgs []lnwire.Message
// SendWhen implements a system for a conditional send once a special
// send predicate has been met.
//
// TODO(roasbeef): contrast with usage of OnCommitFlush, etc
SendWhen fn.Option[SendPredicate]
// PostSendEvent is an optional event that is to be emitted after the
// message has been sent. If a SendWhen is specified, then this will
// only be executed after that returns true to unblock the send.
PostSendEvent fn.Option[Event]
}
// daemonSealed indicates that this struct is a DaemonEvent instance.
func (s *SendMsgEvent[E]) daemonSealed() {}
// BroadcastTxn indicates the target transaction should be broadcast to the
// network.
type BroadcastTxn struct {
// Tx is the transaction to broadcast.
Tx *wire.MsgTx
// Label is an optional label to attach to the transaction.
Label string
}
// daemonSealed indicates that this struct is a DaemonEvent instance.
func (b *BroadcastTxn) daemonSealed() {}

29
protofsm/log.go Normal file
View File

@@ -0,0 +1,29 @@
package protofsm
import (
"github.com/btcsuite/btclog"
"github.com/lightningnetwork/lnd/build"
)
// log is a logger that is initialized with no output filters. This
// means the package will not perform any logging by default until the caller
// requests it.
var log btclog.Logger
// The default amount of logging is none.
func init() {
UseLogger(build.NewSubLogger("PRCL", nil))
}
// DisableLog disables all library log output. Logging output is disabled
// by default until UseLogger is called.
func DisableLog() {
UseLogger(btclog.Disabled)
}
// UseLogger uses a specified Logger to output package logging info.
// This should be used in preference to SetLogWriter if the caller is also
// using btclog.
func UseLogger(logger btclog.Logger) {
log = logger
}

403
protofsm/state_machine.go Normal file
View File

@@ -0,0 +1,403 @@
package protofsm
import (
"fmt"
"sync"
"time"
"github.com/btcsuite/btcd/btcec/v2"
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/fn"
"github.com/lightningnetwork/lnd/lnwire"
)
const (
// pollInterval is the interval at which we'll poll the SendWhen
// predicate if specified.
pollInterval = time.Millisecond * 100
)
// EmittedEvent is a special type that can be emitted by a state transition.
// This can container internal events which are to be routed back to the state,
// or external events which are to be sent to the daemon.
type EmittedEvent[Event any] struct {
// InternalEvent is an optional internal event that is to be routed
// back to the target state. This enables state to trigger one or many
// state transitions without a new external event.
InternalEvent fn.Option[Event]
// ExternalEvent is an optional external event that is to be sent to
// the daemon for dispatch. Usually, this is some form of I/O.
ExternalEvents fn.Option[DaemonEventSet]
}
// StateTransition is a state transition type. It denotes the next state to go
// to, and also the set of events to emit.
type StateTransition[Event any, Env Environment] struct {
// NextState is the next state to transition to.
NextState State[Event, Env]
// NewEvents is the set of events to emit.
NewEvents fn.Option[EmittedEvent[Event]]
}
// Environment is an abstract interface that represents the environment that
// the state machine will execute using. From the PoV of the main state machine
// executor, we just care about being able to clean up any resources that were
// allocated by the environment.
type Environment interface {
// Name returns the name of the environment. This is used to uniquely
// identify the environment of related state machines.
Name() string
}
// State defines an abstract state along, namely its state transition function
// that takes as input an event and an environment, and returns a state
// transition (next state, and set of events to emit). As state can also either
// be terminal, or not, a terminal event causes state execution to halt.
type State[Event any, Env Environment] interface {
// ProcessEvent takes an event and an environment, and returns a new
// state transition. This will be iteratively called until either a
// terminal state is reached, or no further internal events are
// emitted.
ProcessEvent(event Event, env Env) (*StateTransition[Event, Env], error)
// IsTerminal returns true if this state is terminal, and false otherwise.
IsTerminal() bool
// TODO(roasbeef): also add state serialization?
}
// DaemonAdapters is a set of methods that server as adapters to bridge the
// pure world of the FSM to the real world of the daemon. These will be used to
// do things like broadcast transactions, or send messages to peers.
type DaemonAdapters interface {
// SendMessages sends the target set of messages to the target peer.
SendMessages(btcec.PublicKey, []lnwire.Message) error
// BroadcastTransaction broadcasts a transaction with the target label.
BroadcastTransaction(*wire.MsgTx, string) error
}
// stateQuery is used by outside callers to query the internal state of the
// state machine.
type stateQuery[Event any, Env Environment] struct {
// CurrentState is a channel that will be sent the current state of the
// state machine.
CurrentState chan State[Event, Env]
}
// StateMachine represents an abstract FSM that is able to process new incoming
// events and drive a state machine to termination. This implementation uses
// type params to abstract over the types of events and environment. Events
// trigger new state transitions, that use the environment to perform some
// action.
//
// TODO(roasbeef): terminal check, daemon event execution, init?
type StateMachine[Event any, Env Environment] struct {
currentState State[Event, Env]
env Env
daemon DaemonAdapters
events chan Event
quit chan struct{}
wg sync.WaitGroup
// newStateEvents is an EventDistributor that will be used to notify
// any relevant callers of new state transitions that occur.
newStateEvents *fn.EventDistributor[State[Event, Env]]
stateQuery chan stateQuery[Event, Env]
startOnce sync.Once
stopOnce sync.Once
// TODO(roasbeef): also use that context guard here?
}
// NewStateMachine creates a new state machine given a set of daemon adapters,
// an initial state, and an environment.
func NewStateMachine[Event any, Env Environment](adapters DaemonAdapters,
initialState State[Event, Env],
env Env) StateMachine[Event, Env] {
return StateMachine[Event, Env]{
daemon: adapters,
events: make(chan Event, 1),
currentState: initialState,
stateQuery: make(chan stateQuery[Event, Env]),
quit: make(chan struct{}),
env: env,
newStateEvents: fn.NewEventDistributor[State[Event, Env]](),
}
}
// Start starts the state machine. This will spawn a goroutine that will drive
// the state machine to completion.
func (s *StateMachine[Event, Env]) Start() {
s.startOnce.Do(func() {
s.wg.Add(1)
go s.driveMachine()
})
}
// Stop stops the state machine. This will block until the state machine has
// reached a stopping point.
func (s *StateMachine[Event, Env]) Stop() {
s.stopOnce.Do(func() {
close(s.quit)
s.wg.Wait()
})
}
// SendEvent sends a new event to the state machine.
//
// TODO(roasbeef): bool if processed?
func (s *StateMachine[Event, Env]) SendEvent(event Event) {
select {
case s.events <- event:
case <-s.quit:
return
}
}
// CurrentState returns the current state of the state machine.
func (s *StateMachine[Event, Env]) CurrentState() (State[Event, Env], error) {
query := stateQuery[Event, Env]{
CurrentState: make(chan State[Event, Env], 1),
}
if !fn.SendOrQuit(s.stateQuery, query, s.quit) {
return nil, fmt.Errorf("state machine is shutting down")
}
return fn.RecvOrTimeout(query.CurrentState, time.Second)
}
// StateSubscriber represents an active subscription to be notified of new
// state transitions.
type StateSubscriber[E any, F Environment] *fn.EventReceiver[State[E, F]]
// RegisterStateEvents registers a new event listener that will be notified of
// new state transitions.
func (s *StateMachine[Event, Env]) RegisterStateEvents() StateSubscriber[Event, Env] {
subscriber := fn.NewEventReceiver[State[Event, Env]](10)
// TODO(roasbeef): instead give the state and the input event?
s.newStateEvents.RegisterSubscriber(subscriber)
return subscriber
}
// RemoveStateSub removes the target state subscriber from the set of active
// subscribers.
func (s *StateMachine[Event, Env]) RemoveStateSub(sub StateSubscriber[Event, Env]) {
s.newStateEvents.RemoveSubscriber(sub)
}
// executeDaemonEvent executes a daemon event, which is a special type of event
// that can be emitted as part of the state transition function of the state
// machine. An error is returned if the type of event is unknown.
func (s *StateMachine[Event, Env]) executeDaemonEvent(event DaemonEvent) error {
switch daemonEvent := event.(type) {
// This is a send message event, so we'll send the event, and also mind
// any preconditions as well as post-send events.
case *SendMsgEvent[Event]:
sendAndCleanUp := func() error {
err := s.daemon.SendMessages(
daemonEvent.TargetPeer, daemonEvent.Msgs,
)
if err != nil {
return fmt.Errorf("unable to send msgs: %w", err)
}
// If a post-send event was specified, then we'll
// funnel that back into the main state machine now as
// well.
daemonEvent.PostSendEvent.WhenSome(func(event Event) {
s.wg.Add(1)
go func() {
defer s.wg.Done()
s.SendEvent(event)
}()
})
return nil
}
// If this doesn't have a SendWhen predicate, then we can just
// send it off right away.
if !daemonEvent.SendWhen.IsSome() {
return sendAndCleanUp()
}
// Otherwise, this has a SendWhen predicate, so we'll need
// launch a goroutine to poll the SendWhen, then send only once
// the predicate is true.
s.wg.Add(1)
go func() {
defer s.wg.Done()
predicateTicker := time.NewTicker(pollInterval)
defer predicateTicker.Stop()
for {
select {
case <-predicateTicker.C:
canSend := fn.MapOptionZ(
daemonEvent.SendWhen,
func(pred SendPredicate) bool {
return pred()
},
)
if canSend {
sendAndCleanUp()
return
}
case <-s.quit:
return
}
}
}()
return nil
// If this is a broadcast transaction event, then we'll broadcast with
// the label attached.
case *BroadcastTxn:
err := s.daemon.BroadcastTransaction(
daemonEvent.Tx, daemonEvent.Label,
)
if err != nil {
// TODO(roasbeef): hook has channel read event event is
// hit?
return fmt.Errorf("unable to broadcast txn: %w", err)
}
return nil
}
return fmt.Errorf("unknown daemon event: %T", event)
}
// applyEvents applies a new event to the state machine. This will continue
// until no further events are emitted by the state machine. Along the way,
// we'll also ensure to execute any daemon events that are emitted.
func (s *StateMachine[Event, Env]) applyEvents(newEvent Event) (State[Event, Env], error) {
// TODO(roasbeef): make starting state as part of env?
currentState := s.currentState
eventQueue := fn.NewQueue(newEvent)
// Given the next event to handle, we'll process the event, then add
// any new emitted internal events to our event queue. This continues
// until we reach a terminal state, or we run out of internal events to
// process.
for nextEvent := eventQueue.Dequeue(); nextEvent.IsSome(); nextEvent = eventQueue.Dequeue() {
err := fn.MapOptionZ(nextEvent, func(event Event) error {
// Apply the state transition function of the current
// state given this new event and our existing env.
transition, err := currentState.ProcessEvent(
event, s.env,
)
if err != nil {
return err
}
newEvents := transition.NewEvents
err = fn.MapOptionZ(newEvents, func(events EmittedEvent[Event]) error {
// With the event processed, we'll process any
// new daemon events that were emitted as part
// of this new state transition.
err := fn.MapOptionZ(events.ExternalEvents, func(dEvents DaemonEventSet) error {
for _, dEvent := range dEvents {
err := s.executeDaemonEvent(dEvent)
if err != nil {
return err
}
}
return nil
})
if err != nil {
return err
}
// Next, we'll add any new emitted events to
// our event queue.
events.InternalEvent.WhenSome(func(inEvent Event) {
eventQueue.Enqueue(inEvent)
})
return nil
})
if err != nil {
return err
}
// With our events processed, we'll now update our
// internal state.
currentState = transition.NextState
// Notify our subscribers of the new state transition.
s.newStateEvents.NotifySubscribers(currentState)
return nil
})
if err != nil {
return currentState, err
}
}
return currentState, nil
}
// driveMachine is the main event loop of the state machine. It accepts any new
// incoming events, and then drives the state machine forward until it reaches
// a terminal state.
func (s *StateMachine[Event, Env]) driveMachine() {
defer s.wg.Done()
// TODO(roasbeef): move into env? read only to start with
currentState := s.currentState
// We just started driving the state machine, so we'll notify our
// subscribers of this starting state.
s.newStateEvents.NotifySubscribers(currentState)
for {
select {
// We have a new external event, so we'll drive the state
// machine forward until we either run out of internal events,
// or we reach a terminal state.
case newEvent := <-s.events:
newState, err := s.applyEvents(newEvent)
if err != nil {
// TODO(roasbeef): hard error?
log.Errorf("unable to apply event: %v", err)
continue
}
currentState = newState
// An outside caller is querying our state, so we'll return the
// latest state.
case stateQuery := <-s.stateQuery:
if !fn.SendOrQuit(stateQuery.CurrentState, currentState, s.quit) {
return
}
case <-s.quit:
// TODO(roasbeef): logs, etc
// * something in env?
return
}
}
}

View File

@@ -0,0 +1,281 @@
package protofsm
import (
"encoding/hex"
"fmt"
"sync/atomic"
"testing"
"github.com/btcsuite/btcd/btcec/v2"
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/fn"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)
type dummyEvents interface {
dummy()
}
type goToFin struct {
}
func (g *goToFin) dummy() {
}
type emitInternal struct {
}
func (e *emitInternal) dummy() {
}
type daemonEvents struct {
}
func (s *daemonEvents) dummy() {
}
type dummyEnv struct {
mock.Mock
}
func (d *dummyEnv) Name() string {
return "test"
}
type dummyStateStart struct {
canSend *atomic.Bool
}
var (
hexDecode = func(keyStr string) []byte {
keyBytes, _ := hex.DecodeString(keyStr)
return keyBytes
}
pub1, _ = btcec.ParsePubKey(hexDecode(
"02ec95e4e8ad994861b95fc5986eedaac24739e5ea3d0634db4c8ccd44cd" +
"a126ea",
))
pub2, _ = btcec.ParsePubKey(hexDecode(
"0356167ba3e54ac542e86e906d4186aba9ca0b9df45001c62b753d33fe06" +
"f5b4e8",
))
)
func (d *dummyStateStart) ProcessEvent(event dummyEvents, env *dummyEnv,
) (*StateTransition[dummyEvents, *dummyEnv], error) {
switch event.(type) {
case *goToFin:
return &StateTransition[dummyEvents, *dummyEnv]{
NextState: &dummyStateFin{},
}, nil
// This state will loop back upon itself, but will also emit an event
// to head to the terminal state.
case *emitInternal:
return &StateTransition[dummyEvents, *dummyEnv]{
NextState: &dummyStateStart{},
NewEvents: fn.Some(EmittedEvent[dummyEvents]{
InternalEvent: fn.Some(dummyEvents(&goToFin{})),
}),
}, nil
// This state will proceed to the terminal state, but will emit all the
// possible daemon events.
case *daemonEvents:
// This send event can only succeed once the bool turns to
// true. After that, then we'll expect another event to take us
// to the final state.
sendEvent := &SendMsgEvent[dummyEvents]{
TargetPeer: *pub1,
SendWhen: fn.Some(func() bool {
return d.canSend.Load()
}),
PostSendEvent: fn.Some(dummyEvents(&goToFin{})),
}
// We'll also send out a normal send event that doesn't have
// any preconditions.
sendEvent2 := &SendMsgEvent[dummyEvents]{
TargetPeer: *pub2,
}
return &StateTransition[dummyEvents, *dummyEnv]{
// We'll state in this state until the send succeeds
// based on our predicate. Then it'll transition to the
// final state.
NextState: &dummyStateStart{
canSend: d.canSend,
},
NewEvents: fn.Some(EmittedEvent[dummyEvents]{
ExternalEvents: fn.Some(DaemonEventSet{
sendEvent, &BroadcastTxn{}, sendEvent2,
}),
}),
}, nil
}
return nil, fmt.Errorf("unknown event: %T", event)
}
func (d *dummyStateStart) IsTerminal() bool {
return false
}
type dummyStateFin struct {
}
func (d *dummyStateFin) ProcessEvent(event dummyEvents, env *dummyEnv,
) (*StateTransition[dummyEvents, *dummyEnv], error) {
return &StateTransition[dummyEvents, *dummyEnv]{
NextState: &dummyStateFin{},
}, nil
}
func (d *dummyStateFin) IsTerminal() bool {
return true
}
func assertState[Event any, Env Environment](t *testing.T,
m *StateMachine[Event, Env], expectedState State[Event, Env]) {
state, err := m.CurrentState()
require.NoError(t, err)
require.IsType(t, expectedState, state)
}
func assertStateTransitions[Event any, Env Environment](
t *testing.T, stateSub StateSubscriber[Event, Env],
expectedStates []State[Event, Env]) {
for _, expectedState := range expectedStates {
newState := <-stateSub.NewItemCreated.ChanOut()
require.IsType(t, expectedState, newState)
}
}
type dummyAdapters struct {
mock.Mock
}
func (d *dummyAdapters) SendMessages(pub btcec.PublicKey, msgs []lnwire.Message) error {
args := d.Called(pub, msgs)
return args.Error(0)
}
func (d *dummyAdapters) BroadcastTransaction(tx *wire.MsgTx, label string) error {
args := d.Called(tx, label)
return args.Error(0)
}
// TestStateMachineInternalEvents tests that the state machine is able to add
// new internal events to the event queue for further processing during a state
// transition.
func TestStateMachineInternalEvents(t *testing.T) {
t.Parallel()
// First, we'll create our state machine given the env, and our
// starting state.
env := &dummyEnv{}
startingState := &dummyStateStart{}
adapters := &dummyAdapters{}
stateMachine := NewStateMachine[dummyEvents, *dummyEnv](
adapters, startingState, env,
)
stateMachine.Start()
defer stateMachine.Stop()
// As we're triggering internal events, we'll also subscribe to the set
// of new states so we can assert as we go.
stateSub := stateMachine.RegisterStateEvents()
defer stateMachine.RemoveStateSub(stateSub)
// For this transition, we'll send in the emitInternal event, which'll
// send us back to the starting event, but emit an internal event.
stateMachine.SendEvent(&emitInternal{})
// We'll now also assert the path we took to get here to ensure the
// internal events were processed.
expectedStates := []State[dummyEvents, *dummyEnv]{
&dummyStateStart{}, &dummyStateStart{}, &dummyStateFin{},
}
assertStateTransitions(
t, stateSub, expectedStates,
)
// We should ultimately end up in the terminal state.
assertState[dummyEvents, *dummyEnv](t, &stateMachine, &dummyStateFin{})
// Make sure all the env expectations were met.
env.AssertExpectations(t)
}
// TestStateMachineDaemonEvents tests that the state machine is able to process
// daemon emitted as part of the state transition process.
func TestStateMachineDaemonEvents(t *testing.T) {
t.Parallel()
// First, we'll create our state machine given the env, and our
// starting state.
env := &dummyEnv{}
var boolTrigger atomic.Bool
startingState := &dummyStateStart{
canSend: &boolTrigger,
}
adapters := &dummyAdapters{}
stateMachine := NewStateMachine[dummyEvents, *dummyEnv](
adapters, startingState, env,
)
stateMachine.Start()
defer stateMachine.Stop()
// As we're triggering internal events, we'll also subscribe to the set
// of new states so we can assert as we go.
stateSub := stateMachine.RegisterStateEvents()
defer stateMachine.RemoveStateSub(stateSub)
// As soon as we send in the daemon event, we expect the
// disable+broadcast events to be processed, as they are unconditional.
adapters.On("BroadcastTransaction", mock.Anything, mock.Anything).Return(nil)
adapters.On("SendMessages", *pub2, mock.Anything).Return(nil)
// We'll start off by sending in the daemon event, which'll trigger the
// state machine to execute the series of daemon events.
stateMachine.SendEvent(&daemonEvents{})
// We should transition back to the starting state now, after we
// started from the very same state.
expectedStates := []State[dummyEvents, *dummyEnv]{
&dummyStateStart{}, &dummyStateStart{},
}
assertStateTransitions(t, stateSub, expectedStates)
// At this point, we expect that the two methods above were called.
adapters.AssertExpectations(t)
// However, we don't expect the SendMessages for the first peer target
// to be called yet, as the condition hasn't yet been met.
adapters.AssertNotCalled(t, "SendMessages", *pub1)
// We'll now flip the bool to true, which should cause the SendMessages
// method to be called, and for us to transition to the final state.
boolTrigger.Store(true)
adapters.On("SendMessages", *pub1, mock.Anything).Return(nil)
expectedStates = []State[dummyEvents, *dummyEnv]{&dummyStateFin{}}
assertStateTransitions(t, stateSub, expectedStates)
adapters.AssertExpectations(t)
env.AssertExpectations(t)
}