lnrpc: add StateService

This commit is contained in:
Johan T. Halseth
2021-02-09 15:12:21 +01:00
parent 419ad86209
commit 9ef556624e
8 changed files with 715 additions and 2 deletions

View File

@@ -10,6 +10,7 @@ import (
"github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/macaroons"
"github.com/lightningnetwork/lnd/monitoring"
"github.com/lightningnetwork/lnd/subscribe"
"google.golang.org/grpc"
"gopkg.in/macaroon-bakery.v2/bakery"
)
@@ -66,6 +67,10 @@ var (
"/lnrpc.WalletUnlocker/InitWallet": {},
"/lnrpc.WalletUnlocker/UnlockWallet": {},
"/lnrpc.WalletUnlocker/ChangePassword": {},
// The State service must be available at all times, even
// before we can check macaroons, so we whitelist it.
"/lnrpc.State/SubscribeState": {},
}
)
@@ -73,9 +78,16 @@ var (
// intercepting API calls. This is useful for logging, enforcing permissions
// etc.
type InterceptorChain struct {
started sync.Once
stopped sync.Once
// state is the current RPC state of our RPC server.
state rpcState
// ntfnServer is a subscription server we use to notify clients of the
// State service when the state changes.
ntfnServer *subscribe.Server
// noMacaroons should be set true if we don't want to check macaroons.
noMacaroons bool
@@ -89,9 +101,14 @@ type InterceptorChain struct {
// rpcsLog is the logger used to log calles to the RPCs intercepted.
rpcsLog btclog.Logger
quit chan struct{}
sync.RWMutex
}
// A compile time check to ensure that InterceptorChain fully implements the
// StateServer gRPC service.
var _ lnrpc.StateServer = (*InterceptorChain)(nil)
// NewInterceptorChain creates a new InterceptorChain.
func NewInterceptorChain(log btclog.Logger, noMacaroons,
walletExists bool) *InterceptorChain {
@@ -103,12 +120,36 @@ func NewInterceptorChain(log btclog.Logger, noMacaroons,
return &InterceptorChain{
state: startState,
ntfnServer: subscribe.NewServer(),
noMacaroons: noMacaroons,
permissionMap: make(map[string][]bakery.Op),
rpcsLog: log,
quit: make(chan struct{}),
}
}
// Start starts the InterceptorChain, which is needed to start the state
// subscription server it powers.
func (r *InterceptorChain) Start() error {
var err error
r.started.Do(func() {
err = r.ntfnServer.Start()
})
return err
}
// Stop stops the InterceptorChain and its internal state subscription server.
func (r *InterceptorChain) Stop() error {
var err error
r.stopped.Do(func() {
close(r.quit)
err = r.ntfnServer.Stop()
})
return err
}
// SetWalletUnlocked moves the RPC state from either walletNotCreated or
// walletLocked to walletUnlocked.
func (r *InterceptorChain) SetWalletUnlocked() {
@@ -116,6 +157,7 @@ func (r *InterceptorChain) SetWalletUnlocked() {
defer r.Unlock()
r.state = walletUnlocked
_ = r.ntfnServer.SendUpdate(r.state)
}
// SetRPCActive moves the RPC state from walletUnlocked to rpcActive.
@@ -124,6 +166,75 @@ func (r *InterceptorChain) SetRPCActive() {
defer r.Unlock()
r.state = rpcActive
_ = r.ntfnServer.SendUpdate(r.state)
}
// SubscribeState subscribes to the state of the wallet. The current wallet
// state will always be delivered immediately.
//
// NOTE: Part of the StateService interface.
func (r *InterceptorChain) SubscribeState(req *lnrpc.SubscribeStateRequest,
stream lnrpc.State_SubscribeStateServer) error {
sendStateUpdate := func(state rpcState) error {
resp := &lnrpc.SubscribeStateResponse{}
switch state {
case walletNotCreated:
resp.State = lnrpc.WalletState_NON_EXISTING
case walletLocked:
resp.State = lnrpc.WalletState_LOCKED
case walletUnlocked:
resp.State = lnrpc.WalletState_UNLOCKED
case rpcActive:
resp.State = lnrpc.WalletState_RPC_ACTIVE
default:
return fmt.Errorf("unknown wallet "+
"state %v", state)
}
return stream.Send(resp)
}
// Subscribe to state updates.
client, err := r.ntfnServer.Subscribe()
if err != nil {
return err
}
defer client.Cancel()
// Always start by sending the current state.
r.RLock()
state := r.state
r.RUnlock()
if err := sendStateUpdate(state); err != nil {
return err
}
for {
select {
case e := <-client.Updates():
newState := e.(rpcState)
// Ignore already sent state.
if newState == state {
continue
}
state = newState
err := sendStateUpdate(state)
if err != nil {
return err
}
case <-stream.Context().Done():
return stream.Context().Err()
case <-r.quit:
return fmt.Errorf("server exiting")
}
}
}
// AddMacaroonService adds a macaroon service to the interceptor. After this is
@@ -335,6 +446,13 @@ func (r *InterceptorChain) MacaroonStreamServerInterceptor() grpc.StreamServerIn
// checkRPCState checks whether a call to the given server is allowed in the
// current RPC state.
func (r *InterceptorChain) checkRPCState(srv interface{}) error {
// The StateService is being accessed, we allow the call regardless of
// the current state.
_, ok := srv.(lnrpc.StateServer)
if ok {
return nil
}
r.RLock()
state := r.state
r.RUnlock()