itest: use node.rpc namespace inside harness net

This commit adds a new struct RPCClients to better handle rpc clients.
A private field, rpc, is added to HarnessNode to prevent direct access
to its clients. Inside RPCClients, all clients are exported in case a
test case need access to a specific client.
This commit is contained in:
yyforyongyu 2021-09-18 13:45:55 +08:00
parent f8cf7c8775
commit f5f1289dab
No known key found for this signature in database
GPG Key ID: 9BCD95C4FF296868

View File

@ -296,8 +296,8 @@ type HarnessNode struct {
PubKey [33]byte PubKey [33]byte
PubKeyStr string PubKeyStr string
cmd *exec.Cmd // rpc holds a list of RPC clients.
logFile *os.File rpc *RPCClients
// chanWatchRequests receives a request for watching a particular event // chanWatchRequests receives a request for watching a particular event
// for a given channel. // for a given channel.
@ -317,41 +317,50 @@ type HarnessNode struct {
// node and the outpoint. // node and the outpoint.
policyUpdates policyUpdateMap policyUpdates policyUpdateMap
// runCtx is a context with cancel method. It's used to signal when the
// node needs to quit, and used as the parent context when spawning
// children contexts for RPC requests.
runCtx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
lnrpc.LightningClient
lnrpc.WalletUnlockerClient
invoicesrpc.InvoicesClient
// SignerClient cannot be embedded because the name collisions of the
// methods SignMessage and VerifyMessage.
SignerClient signrpc.SignerClient
// conn is the underlying connection to the grpc endpoint of the node.
conn *grpc.ClientConn
// RouterClient, WalletKitClient, WatchtowerClient cannot be embedded,
// because a name collision would occur with LightningClient.
RouterClient routerrpc.RouterClient
WalletKitClient walletrpc.WalletKitClient
Watchtower watchtowerrpc.WatchtowerClient
WatchtowerClient wtclientrpc.WatchtowerClientClient
StateClient lnrpc.StateClient
// backupDbDir is the path where a database backup is stored, if any. // backupDbDir is the path where a database backup is stored, if any.
backupDbDir string backupDbDir string
// postgresDbName is the name of the postgres database where lnd data is // postgresDbName is the name of the postgres database where lnd data is
// stored in. // stored in.
postgresDbName string postgresDbName string
// runCtx is a context with cancel method. It's used to signal when the
// node needs to quit, and used as the parent context when spawning
// children contexts for RPC requests.
runCtx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
cmd *exec.Cmd
logFile *os.File
// TODO(yy): remove
lnrpc.LightningClient
lnrpc.WalletUnlockerClient
invoicesrpc.InvoicesClient
SignerClient signrpc.SignerClient
RouterClient routerrpc.RouterClient
WalletKitClient walletrpc.WalletKitClient
Watchtower watchtowerrpc.WatchtowerClient
WatchtowerClient wtclientrpc.WatchtowerClientClient
StateClient lnrpc.StateClient
}
// RPCClients wraps a list of RPC clients into a single struct for easier
// access.
type RPCClients struct {
// conn is the underlying connection to the grpc endpoint of the node.
conn *grpc.ClientConn
LN lnrpc.LightningClient
WalletUnlocker lnrpc.WalletUnlockerClient
Invoice invoicesrpc.InvoicesClient
Signer signrpc.SignerClient
Router routerrpc.RouterClient
WalletKit walletrpc.WalletKitClient
Watchtower watchtowerrpc.WatchtowerClient
WatchtowerClient wtclientrpc.WatchtowerClientClient
State lnrpc.StateClient
} }
// Assert *HarnessNode implements the lnrpc.LightningClient interface. // Assert *HarnessNode implements the lnrpc.LightningClient interface.
@ -719,16 +728,20 @@ func (hn *HarnessNode) start(lndBinary string, lndError chan<- error,
return err return err
} }
// Init all the RPC clients.
hn.initRPCClients(conn)
// If the node was created with a seed, we will need to perform an // If the node was created with a seed, we will need to perform an
// additional step to unlock the wallet. The connection returned will // additional step to unlock the wallet. The connection returned will
// only use the TLS certs, and can only perform operations necessary to // only use the TLS certs, and can only perform operations necessary to
// unlock the daemon. // unlock the daemon.
if hn.Cfg.HasSeed { if hn.Cfg.HasSeed {
// TODO(yy): remove
hn.WalletUnlockerClient = lnrpc.NewWalletUnlockerClient(conn) hn.WalletUnlockerClient = lnrpc.NewWalletUnlockerClient(conn)
return nil return nil
} }
return hn.initLightningClient(conn) return hn.initLightningClient()
} }
// WaitUntilStarted waits until the wallet state flips from "WAITING_TO_START". // WaitUntilStarted waits until the wallet state flips from "WAITING_TO_START".
@ -757,7 +770,7 @@ func (hn *HarnessNode) waitForState(conn grpc.ClientConnInterface,
predicate func(state lnrpc.WalletState) bool) error { predicate func(state lnrpc.WalletState) bool) error {
stateClient := lnrpc.NewStateClient(conn) stateClient := lnrpc.NewStateClient(conn)
ctx, cancel := context.WithCancel(hn.runCtx) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
stateStream, err := stateClient.SubscribeState( stateStream, err := stateClient.SubscribeState(
@ -820,16 +833,21 @@ func (hn *HarnessNode) WaitUntilLeader(timeout time.Duration) error {
return err return err
} }
// Init all the RPC clients.
hn.initRPCClients(conn)
// If the node was created with a seed, we will need to perform an // If the node was created with a seed, we will need to perform an
// additional step to unlock the wallet. The connection returned will // additional step to unlock the wallet. The connection returned will
// only use the TLS certs, and can only perform operations necessary to // only use the TLS certs, and can only perform operations necessary to
// unlock the daemon. // unlock the daemon.
if hn.Cfg.HasSeed { if hn.Cfg.HasSeed {
// TODO(yy): remove
hn.WalletUnlockerClient = lnrpc.NewWalletUnlockerClient(conn) hn.WalletUnlockerClient = lnrpc.NewWalletUnlockerClient(conn)
return nil return nil
} }
return hn.initLightningClient(conn) return hn.initLightningClient()
} }
// initClientWhenReady waits until the main gRPC server is detected as active, // initClientWhenReady waits until the main gRPC server is detected as active,
@ -847,7 +865,10 @@ func (hn *HarnessNode) initClientWhenReady(timeout time.Duration) error {
return err return err
} }
return hn.initLightningClient(conn) // Init all the RPC clients.
hn.initRPCClients(conn)
return hn.initLightningClient()
} }
// Init initializes a harness node by passing the init request via rpc. After // Init initializes a harness node by passing the init request via rpc. After
@ -889,7 +910,10 @@ func (hn *HarnessNode) Init(
return nil, err return nil, err
} }
return response, hn.initLightningClient(conn) // Init all the RPC clients.
hn.initRPCClients(conn)
return response, hn.initLightningClient()
} }
// InitChangePassword initializes a harness node by passing the change password // InitChangePassword initializes a harness node by passing the change password
@ -932,7 +956,10 @@ func (hn *HarnessNode) InitChangePassword(
return nil, err return nil, err
} }
return response, hn.initLightningClient(conn) // Init all the RPC clients.
hn.initRPCClients(conn)
return response, hn.initLightningClient()
} }
// Unlock attempts to unlock the wallet of the target HarnessNode. This method // Unlock attempts to unlock the wallet of the target HarnessNode. This method
@ -945,7 +972,8 @@ func (hn *HarnessNode) Unlock(unlockReq *lnrpc.UnlockWalletRequest) error {
// Otherwise, we'll need to unlock the node before it's able to start // Otherwise, we'll need to unlock the node before it's able to start
// up properly. // up properly.
if _, err := hn.UnlockWallet(ctxt, unlockReq); err != nil { _, err := hn.rpc.WalletUnlocker.UnlockWallet(ctxt, unlockReq)
if err != nil {
return err return err
} }
@ -960,7 +988,7 @@ func (hn *HarnessNode) waitTillServerStarted() error {
ctxt, cancel := context.WithTimeout(hn.runCtx, NodeStartTimeout) ctxt, cancel := context.WithTimeout(hn.runCtx, NodeStartTimeout)
defer cancel() defer cancel()
client, err := hn.StateClient.SubscribeState( client, err := hn.rpc.State.SubscribeState(
ctxt, &lnrpc.SubscribeStateRequest{}, ctxt, &lnrpc.SubscribeStateRequest{},
) )
if err != nil { if err != nil {
@ -978,17 +1006,33 @@ func (hn *HarnessNode) waitTillServerStarted() error {
return nil return nil
} }
} }
} }
// initLightningClient constructs the grpc LightningClient from the given client // initRPCClients initializes a list of RPC clients for the node.
// connection and subscribes the harness node to graph topology updates. func (hn *HarnessNode) initRPCClients(c *grpc.ClientConn) {
// This method also spawns a lightning network watcher for this node, hn.rpc = &RPCClients{
// which watches for topology changes. conn: c,
func (hn *HarnessNode) initLightningClient(conn *grpc.ClientConn) error { LN: lnrpc.NewLightningClient(c),
Invoice: invoicesrpc.NewInvoicesClient(c),
Router: routerrpc.NewRouterClient(c),
WalletKit: walletrpc.NewWalletKitClient(c),
WalletUnlocker: lnrpc.NewWalletUnlockerClient(c),
Watchtower: watchtowerrpc.NewWatchtowerClient(c),
WatchtowerClient: wtclientrpc.NewWatchtowerClientClient(c),
Signer: signrpc.NewSignerClient(c),
State: lnrpc.NewStateClient(c),
}
}
// initLightningClient blocks until the lnd server is fully started and
// subscribes the harness node to graph topology updates. This method also
// spawns a lightning network watcher for this node, which watches for topology
// changes.
func (hn *HarnessNode) initLightningClient() error {
// TODO(yy): remove
// Construct the LightningClient that will allow us to use the // Construct the LightningClient that will allow us to use the
// HarnessNode directly for normal rpc operations. // HarnessNode directly for normal rpc operations.
hn.conn = conn conn := hn.rpc.conn
hn.LightningClient = lnrpc.NewLightningClient(conn) hn.LightningClient = lnrpc.NewLightningClient(conn)
hn.InvoicesClient = invoicesrpc.NewInvoicesClient(conn) hn.InvoicesClient = invoicesrpc.NewInvoicesClient(conn)
hn.RouterClient = routerrpc.NewRouterClient(conn) hn.RouterClient = routerrpc.NewRouterClient(conn)
@ -1021,8 +1065,7 @@ func (hn *HarnessNode) initLightningClient(conn *grpc.ClientConn) error {
// FetchNodeInfo queries an unlocked node to retrieve its public key. // FetchNodeInfo queries an unlocked node to retrieve its public key.
func (hn *HarnessNode) FetchNodeInfo() error { func (hn *HarnessNode) FetchNodeInfo() error {
// Obtain the lnid of this node for quick identification purposes. // Obtain the lnid of this node for quick identification purposes.
ctxb := context.Background() info, err := hn.rpc.LN.GetInfo(hn.runCtx, &lnrpc.GetInfoRequest{})
info, err := hn.GetInfo(ctxb, &lnrpc.GetInfoRequest{})
if err != nil { if err != nil {
return err return err
} }
@ -1163,21 +1206,23 @@ func (hn *HarnessNode) stop() error {
return nil return nil
} }
// If start() failed before creating a client, we will just wait for the // If start() failed before creating clients, we will just wait for the
// child process to die. // child process to die.
if hn.LightningClient != nil { if hn.rpc != nil && hn.rpc.LN != nil {
// Don't watch for error because sometimes the RPC connection gets // Don't watch for error because sometimes the RPC connection
// closed before a response is returned. // gets closed before a response is returned.
req := lnrpc.StopRequest{} req := lnrpc.StopRequest{}
err := wait.NoError(func() error { err := wait.NoError(func() error {
_, err := hn.LightningClient.StopDaemon(hn.runCtx, &req) _, err := hn.rpc.LN.StopDaemon(hn.runCtx, &req)
switch { switch {
case err == nil: case err == nil:
return nil return nil
// Try again if a recovery/rescan is in progress. // Try again if a recovery/rescan is in progress.
case strings.Contains(err.Error(), "recovery in progress"): case strings.Contains(
err.Error(), "recovery in progress",
):
return err return err
default: default:
@ -1217,8 +1262,8 @@ func (hn *HarnessNode) stop() error {
hn.WatchtowerClient = nil hn.WatchtowerClient = nil
// Close any attempts at further grpc connections. // Close any attempts at further grpc connections.
if hn.conn != nil { if hn.rpc.conn != nil {
err := status.Code(hn.conn.Close()) err := status.Code(hn.rpc.conn.Close())
switch err { switch err {
case codes.OK: case codes.OK:
return nil return nil
@ -1497,7 +1542,7 @@ func (hn *HarnessNode) WaitForBlockchainSync() error {
defer ticker.Stop() defer ticker.Stop()
for { for {
resp, err := hn.GetInfo(ctxt, &lnrpc.GetInfoRequest{}) resp, err := hn.rpc.LN.GetInfo(ctxt, &lnrpc.GetInfoRequest{})
if err != nil { if err != nil {
return err return err
} }
@ -1525,7 +1570,7 @@ func (hn *HarnessNode) WaitForBalance(expectedBalance btcutil.Amount,
var lastBalance btcutil.Amount var lastBalance btcutil.Amount
doesBalanceMatch := func() bool { doesBalanceMatch := func() bool {
balance, err := hn.WalletBalance(hn.runCtx, req) balance, err := hn.rpc.LN.WalletBalance(hn.runCtx, req)
if err != nil { if err != nil {
return false return false
} }
@ -1695,7 +1740,7 @@ func (hn *HarnessNode) newTopologyClient(
ctx context.Context) (topologyClient, error) { ctx context.Context) (topologyClient, error) {
req := &lnrpc.GraphTopologySubscription{} req := &lnrpc.GraphTopologySubscription{}
client, err := hn.SubscribeChannelGraph(ctx, req) client, err := hn.rpc.LN.SubscribeChannelGraph(ctx, req)
if err != nil { if err != nil {
return nil, fmt.Errorf("%s(%d): unable to create topology "+ return nil, fmt.Errorf("%s(%d): unable to create topology "+
"client: %v (%s)", hn.Name(), hn.NodeID, err, "client: %v (%s)", hn.Name(), hn.NodeID, err,
@ -1765,7 +1810,7 @@ func (hn *HarnessNode) receiveTopologyClientStream(
default: default:
// An expected error is returned, return and leave it // An expected error is returned, return and leave it
// to be handled by the caller. // to be handled by the caller.
return fmt.Errorf("graph subscription err: %v", err) return fmt.Errorf("graph subscription err: %w", err)
} }
// Send the update or quit. // Send the update or quit.
@ -1814,7 +1859,7 @@ func (hn *HarnessNode) getChannelPolicies(include bool) policyUpdateMap {
ctxt, cancel := context.WithTimeout(hn.runCtx, DefaultTimeout) ctxt, cancel := context.WithTimeout(hn.runCtx, DefaultTimeout)
defer cancel() defer cancel()
graph, err := hn.DescribeGraph(ctxt, &lnrpc.ChannelGraphRequest{ graph, err := hn.rpc.LN.DescribeGraph(ctxt, &lnrpc.ChannelGraphRequest{
IncludeUnannounced: include, IncludeUnannounced: include,
}) })
if err != nil { if err != nil {