mirror of
https://github.com/lightningnetwork/lnd.git
synced 2025-08-26 21:51:27 +02:00
multi: update to fn v2
This commit is contained in:
@@ -5,7 +5,7 @@ import (
|
||||
"github.com/btcsuite/btcd/chaincfg/chainhash"
|
||||
"github.com/btcsuite/btcd/wire"
|
||||
"github.com/lightningnetwork/lnd/chainntnfs"
|
||||
"github.com/lightningnetwork/lnd/fn"
|
||||
"github.com/lightningnetwork/lnd/fn/v2"
|
||||
"github.com/lightningnetwork/lnd/lnwire"
|
||||
)
|
||||
|
||||
|
@@ -1,7 +1,7 @@
|
||||
package protofsm
|
||||
|
||||
import (
|
||||
"github.com/lightningnetwork/lnd/fn"
|
||||
"github.com/lightningnetwork/lnd/fn/v2"
|
||||
"github.com/lightningnetwork/lnd/lnwire"
|
||||
)
|
||||
|
||||
|
@@ -10,7 +10,7 @@ import (
|
||||
"github.com/btcsuite/btcd/chaincfg/chainhash"
|
||||
"github.com/btcsuite/btcd/wire"
|
||||
"github.com/lightningnetwork/lnd/chainntnfs"
|
||||
"github.com/lightningnetwork/lnd/fn"
|
||||
"github.com/lightningnetwork/lnd/fn/v2"
|
||||
"github.com/lightningnetwork/lnd/lnutils"
|
||||
"github.com/lightningnetwork/lnd/lnwire"
|
||||
)
|
||||
@@ -21,6 +21,12 @@ const (
|
||||
pollInterval = time.Millisecond * 100
|
||||
)
|
||||
|
||||
var (
|
||||
// ErrStateMachineShutdown occurs when trying to feed an event to a
|
||||
// StateMachine that has been asked to Stop.
|
||||
ErrStateMachineShutdown = fmt.Errorf("StateMachine is shutting down")
|
||||
)
|
||||
|
||||
// EmittedEvent is a special type that can be emitted by a state transition.
|
||||
// This can container internal events which are to be routed back to the state,
|
||||
// or external events which are to be sent to the daemon.
|
||||
@@ -287,7 +293,7 @@ func (s *StateMachine[Event, Env]) CurrentState() (State[Event, Env], error) {
|
||||
}
|
||||
|
||||
if !fn.SendOrQuit(s.stateQuery, query, s.quit) {
|
||||
return nil, fmt.Errorf("state machine is shutting down")
|
||||
return nil, ErrStateMachineShutdown
|
||||
}
|
||||
|
||||
return fn.RecvOrTimeout(query.CurrentState, time.Second)
|
||||
@@ -322,6 +328,8 @@ func (s *StateMachine[Event, Env]) RemoveStateSub(sub StateSubscriber[
|
||||
// executeDaemonEvent executes a daemon event, which is a special type of event
|
||||
// that can be emitted as part of the state transition function of the state
|
||||
// machine. An error is returned if the type of event is unknown.
|
||||
//
|
||||
//nolint:funlen
|
||||
func (s *StateMachine[Event, Env]) executeDaemonEvent(
|
||||
event DaemonEvent) error {
|
||||
|
||||
@@ -347,7 +355,7 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(
|
||||
// If a post-send event was specified, then we'll funnel
|
||||
// that back into the main state machine now as well.
|
||||
return fn.MapOptionZ(daemonEvent.PostSendEvent, func(event Event) error { //nolint:ll
|
||||
return s.wg.Go(func(ctx context.Context) {
|
||||
launched := s.wg.Go(func(ctx context.Context) {
|
||||
log.Debugf("FSM(%v): sending "+
|
||||
"post-send event: %v",
|
||||
s.cfg.Env.Name(),
|
||||
@@ -356,6 +364,12 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(
|
||||
|
||||
s.SendEvent(event)
|
||||
})
|
||||
|
||||
if !launched {
|
||||
return ErrStateMachineShutdown
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
@@ -368,7 +382,7 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(
|
||||
// Otherwise, this has a SendWhen predicate, so we'll need
|
||||
// launch a goroutine to poll the SendWhen, then send only once
|
||||
// the predicate is true.
|
||||
return s.wg.Go(func(ctx context.Context) {
|
||||
launched := s.wg.Go(func(ctx context.Context) {
|
||||
predicateTicker := time.NewTicker(
|
||||
s.cfg.CustomPollInterval.UnwrapOr(pollInterval),
|
||||
)
|
||||
@@ -407,6 +421,12 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(
|
||||
}
|
||||
})
|
||||
|
||||
if !launched {
|
||||
return ErrStateMachineShutdown
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
// If this is a broadcast transaction event, then we'll broadcast with
|
||||
// the label attached.
|
||||
case *BroadcastTxn:
|
||||
@@ -436,7 +456,7 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(
|
||||
return fmt.Errorf("unable to register spend: %w", err)
|
||||
}
|
||||
|
||||
return s.wg.Go(func(ctx context.Context) {
|
||||
launched := s.wg.Go(func(ctx context.Context) {
|
||||
for {
|
||||
select {
|
||||
case spend, ok := <-spendEvent.Spend:
|
||||
@@ -461,6 +481,12 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(
|
||||
}
|
||||
})
|
||||
|
||||
if !launched {
|
||||
return ErrStateMachineShutdown
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
// The state machine has requested a new event to be sent once a
|
||||
// specified txid+pkScript pair has confirmed.
|
||||
case *RegisterConf[Event]:
|
||||
@@ -476,7 +502,7 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(
|
||||
return fmt.Errorf("unable to register conf: %w", err)
|
||||
}
|
||||
|
||||
return s.wg.Go(func(ctx context.Context) {
|
||||
launched := s.wg.Go(func(ctx context.Context) {
|
||||
for {
|
||||
select {
|
||||
case <-confEvent.Confirmed:
|
||||
@@ -498,6 +524,12 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
if !launched {
|
||||
return ErrStateMachineShutdown
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
return fmt.Errorf("unknown daemon event: %T", event)
|
||||
|
@@ -10,7 +10,7 @@ import (
|
||||
"github.com/btcsuite/btcd/chaincfg/chainhash"
|
||||
"github.com/btcsuite/btcd/wire"
|
||||
"github.com/lightningnetwork/lnd/chainntnfs"
|
||||
"github.com/lightningnetwork/lnd/fn"
|
||||
"github.com/lightningnetwork/lnd/fn/v2"
|
||||
"github.com/lightningnetwork/lnd/lnwire"
|
||||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
Reference in New Issue
Block a user