itest: add run context to harness node

This commit adds a running context to HarnessNode which replaces all the
background context used and also serves as a way to signal quit when the
test is shutting down.
This commit is contained in:
yyforyongyu
2021-09-18 12:10:01 +08:00
parent 98d7e64dc1
commit f81bcdf4db

View File

@@ -6,6 +6,7 @@ import (
"crypto/rand" "crypto/rand"
"encoding/hex" "encoding/hex"
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"io" "io"
"io/ioutil" "io/ioutil"
@@ -23,7 +24,6 @@ import (
"github.com/btcsuite/btcd/rpcclient" "github.com/btcsuite/btcd/rpcclient"
"github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil" "github.com/btcsuite/btcutil"
"github.com/go-errors/errors"
"github.com/jackc/pgx/v4/pgxpool" "github.com/jackc/pgx/v4/pgxpool"
"github.com/lightningnetwork/lnd/chanbackup" "github.com/lightningnetwork/lnd/chanbackup"
"github.com/lightningnetwork/lnd/lnrpc" "github.com/lightningnetwork/lnd/lnrpc"
@@ -36,7 +36,9 @@ import (
"github.com/lightningnetwork/lnd/lntest/wait" "github.com/lightningnetwork/lnd/lntest/wait"
"github.com/lightningnetwork/lnd/macaroons" "github.com/lightningnetwork/lnd/macaroons"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials"
"google.golang.org/grpc/status"
"gopkg.in/macaroon.v2" "gopkg.in/macaroon.v2"
) )
@@ -316,8 +318,13 @@ type HarnessNode struct {
// node and the outpoint. // node and the outpoint.
policyUpdates policyUpdateMap policyUpdates policyUpdateMap
quit chan struct{} // runCtx is a context with cancel method. It's used to signal when the
wg sync.WaitGroup // node needs to quit, and used as the parent context when spawning
// children contexts for RPC requests.
runCtx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
lnrpc.LightningClient lnrpc.LightningClient
@@ -634,7 +641,10 @@ func renameFile(fromFileName, toFileName string) {
func (hn *HarnessNode) start(lndBinary string, lndError chan<- error, func (hn *HarnessNode) start(lndBinary string, lndError chan<- error,
wait bool) error { wait bool) error {
hn.quit = make(chan struct{}) // Init the runCtx.
ctxt, cancel := context.WithCancel(context.Background())
hn.runCtx = ctxt
hn.cancel = cancel
args := hn.Cfg.genArgs() args := hn.Cfg.genArgs()
hn.cmd = exec.Command(lndBinary, args...) hn.cmd = exec.Command(lndBinary, args...)
@@ -739,7 +749,7 @@ func (hn *HarnessNode) start(lndBinary string, lndError chan<- error,
err := hn.cmd.Wait() err := hn.cmd.Wait()
if err != nil { if err != nil {
lndError <- errors.Errorf("%v\n%v\n", err, errb.String()) lndError <- fmt.Errorf("%v\n%v", err, errb.String())
} }
// Signal any onlookers that this process has exited. // Signal any onlookers that this process has exited.
@@ -816,7 +826,7 @@ func (hn *HarnessNode) waitForState(conn grpc.ClientConnInterface,
predicate func(state lnrpc.WalletState) bool) error { predicate func(state lnrpc.WalletState) bool) error {
stateClient := lnrpc.NewStateClient(conn) stateClient := lnrpc.NewStateClient(conn)
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(hn.runCtx)
defer cancel() defer cancel()
stateStream, err := stateClient.SubscribeState( stateStream, err := stateClient.SubscribeState(
@@ -1016,8 +1026,7 @@ func (hn *HarnessNode) Unlock(ctx context.Context,
// waitTillServerStarted makes a subscription to the server's state change and // waitTillServerStarted makes a subscription to the server's state change and
// blocks until the server is in state ServerActive. // blocks until the server is in state ServerActive.
func (hn *HarnessNode) waitTillServerStarted() error { func (hn *HarnessNode) waitTillServerStarted() error {
ctxb := context.Background() ctxt, cancel := context.WithTimeout(hn.runCtx, NodeStartTimeout)
ctxt, cancel := context.WithTimeout(ctxb, NodeStartTimeout)
defer cancel() defer cancel()
client, err := hn.StateClient.SubscribeState( client, err := hn.StateClient.SubscribeState(
@@ -1183,7 +1192,7 @@ func (hn *HarnessNode) ConnectRPCWithMacaroon(mac *macaroon.Macaroon) (
grpc.WithTransportCredentials(tlsCreds), grpc.WithTransportCredentials(tlsCreds),
} }
ctx, cancel := context.WithTimeout(context.Background(), DefaultTimeout) ctx, cancel := context.WithTimeout(hn.runCtx, DefaultTimeout)
defer cancel() defer cancel()
if mac == nil { if mac == nil {
@@ -1247,10 +1256,9 @@ func (hn *HarnessNode) stop() error {
// Don't watch for error because sometimes the RPC connection gets // Don't watch for error because sometimes the RPC connection gets
// closed before a response is returned. // closed before a response is returned.
req := lnrpc.StopRequest{} req := lnrpc.StopRequest{}
ctx := context.Background()
err := wait.NoError(func() error { err := wait.NoError(func() error {
_, err := hn.LightningClient.StopDaemon(ctx, &req) _, err := hn.LightningClient.StopDaemon(hn.runCtx, &req)
switch { switch {
case err == nil: case err == nil:
return nil return nil
@@ -1275,10 +1283,10 @@ func (hn *HarnessNode) stop() error {
return fmt.Errorf("process did not exit") return fmt.Errorf("process did not exit")
} }
close(hn.quit) // Stop the runCtx and wait for goroutines to finish.
hn.cancel()
hn.wg.Wait() hn.wg.Wait()
hn.quit = nil
hn.processExit = nil hn.processExit = nil
hn.LightningClient = nil hn.LightningClient = nil
hn.WalletUnlockerClient = nil hn.WalletUnlockerClient = nil
@@ -1287,13 +1295,25 @@ func (hn *HarnessNode) stop() error {
// Close any attempts at further grpc connections. // Close any attempts at further grpc connections.
if hn.conn != nil { if hn.conn != nil {
err := hn.conn.Close() err := status.Code(hn.conn.Close())
if err != nil && switch err {
!strings.Contains(err.Error(), "connection is closing") { case codes.OK:
return nil
return fmt.Errorf("error attempting to stop grpc "+ // When the context is canceled above, we might get the
"client: %v", err) // following error as the context is no longer active.
case codes.Canceled:
return nil
case codes.Unknown:
return fmt.Errorf("unknown error attempting to stop "+
"grpc client: %v", err)
default:
return fmt.Errorf("error attempting to stop "+
"grpc client: %v", err)
} }
} }
return nil return nil
@@ -1440,7 +1460,7 @@ func (hn *HarnessNode) lightningNetworkWatcher() {
hn.handlePolicyUpdateWatchRequest(watchRequest) hn.handlePolicyUpdateWatchRequest(watchRequest)
} }
case <-hn.quit: case <-hn.runCtx.Done():
return return
} }
} }
@@ -1577,7 +1597,7 @@ func (hn *HarnessNode) WaitForBlockchainSync(ctx context.Context) error {
case <-ctx.Done(): case <-ctx.Done():
return fmt.Errorf("timeout while waiting for " + return fmt.Errorf("timeout while waiting for " +
"blockchain sync") "blockchain sync")
case <-hn.quit: case <-hn.runCtx.Done():
return nil return nil
case <-ticker.C: case <-ticker.C:
} }
@@ -1586,13 +1606,14 @@ func (hn *HarnessNode) WaitForBlockchainSync(ctx context.Context) error {
// WaitForBalance waits until the node sees the expected confirmed/unconfirmed // WaitForBalance waits until the node sees the expected confirmed/unconfirmed
// balance within their wallet. // balance within their wallet.
func (hn *HarnessNode) WaitForBalance(expectedBalance btcutil.Amount, confirmed bool) error { func (hn *HarnessNode) WaitForBalance(expectedBalance btcutil.Amount,
ctx := context.Background() confirmed bool) error {
req := &lnrpc.WalletBalanceRequest{} req := &lnrpc.WalletBalanceRequest{}
var lastBalance btcutil.Amount var lastBalance btcutil.Amount
doesBalanceMatch := func() bool { doesBalanceMatch := func() bool {
balance, err := hn.WalletBalance(ctx, req) balance, err := hn.WalletBalance(hn.runCtx, req)
if err != nil { if err != nil {
return false return false
} }
@@ -1705,7 +1726,7 @@ func (hn *HarnessNode) handleOpenChannelWatchRequest(req *chanWatchRequest) {
// node. This lets us handle the case where a node has already seen a // node. This lets us handle the case where a node has already seen a
// channel before a notification has been requested, causing us to miss // channel before a notification has been requested, causing us to miss
// it. // it.
chanFound := checkChanPointInGraph(context.Background(), hn, targetChan) chanFound := checkChanPointInGraph(hn.runCtx, hn, targetChan)
if chanFound { if chanFound {
close(req.eventChan) close(req.eventChan)
return return
@@ -1795,16 +1816,14 @@ func (hn *HarnessNode) newTopologyClient(
func (hn *HarnessNode) receiveTopologyClientStream( func (hn *HarnessNode) receiveTopologyClientStream(
receiver chan *lnrpc.GraphTopologyUpdate) error { receiver chan *lnrpc.GraphTopologyUpdate) error {
ctxb := context.Background()
// Create a topology client to receive graph updates. // Create a topology client to receive graph updates.
client, err := hn.newTopologyClient(ctxb) client, err := hn.newTopologyClient(hn.runCtx)
if err != nil { if err != nil {
return fmt.Errorf("create topologyClient failed: %v", err) return fmt.Errorf("create topologyClient failed: %v", err)
} }
// We use the context to time out when retrying graph subscription. // We use the context to time out when retrying graph subscription.
ctxt, cancel := context.WithTimeout(ctxb, DefaultTimeout) ctxt, cancel := context.WithTimeout(hn.runCtx, DefaultTimeout)
defer cancel() defer cancel()
for { for {
@@ -1823,12 +1842,12 @@ func (hn *HarnessNode) receiveTopologyClientStream(
return fmt.Errorf("graph subscription: " + return fmt.Errorf("graph subscription: " +
"router not started before timeout") "router not started before timeout")
case <-time.After(wait.PollInterval): case <-time.After(wait.PollInterval):
case <-hn.quit: case <-hn.runCtx.Done():
return nil return nil
} }
// Re-create the topology client. // Re-create the topology client.
client, err = hn.newTopologyClient(ctxb) client, err = hn.newTopologyClient(hn.runCtx)
if err != nil { if err != nil {
return fmt.Errorf("create topologyClient "+ return fmt.Errorf("create topologyClient "+
"failed: %v", err) "failed: %v", err)
@@ -1840,6 +1859,10 @@ func (hn *HarnessNode) receiveTopologyClientStream(
// End of subscription stream. Do nothing and quit. // End of subscription stream. Do nothing and quit.
return nil return nil
case strings.Contains(err.Error(), context.Canceled.Error()):
// End of subscription stream. Do nothing and quit.
return nil
default: default:
// An expected error is returned, return and leave it // An expected error is returned, return and leave it
// to be handled by the caller. // to be handled by the caller.
@@ -1849,7 +1872,7 @@ func (hn *HarnessNode) receiveTopologyClientStream(
// Send the update or quit. // Send the update or quit.
select { select {
case receiver <- update: case receiver <- update:
case <-hn.quit: case <-hn.runCtx.Done():
return nil return nil
} }
} }
@@ -1920,9 +1943,7 @@ func (hn *HarnessNode) handlePolicyUpdateWatchRequest(req *chanWatchRequest) {
// the format defined in type policyUpdateMap. // the format defined in type policyUpdateMap.
func (hn *HarnessNode) getChannelPolicies(include bool) policyUpdateMap { func (hn *HarnessNode) getChannelPolicies(include bool) policyUpdateMap {
ctxt, cancel := context.WithTimeout( ctxt, cancel := context.WithTimeout(hn.runCtx, DefaultTimeout)
context.Background(), DefaultTimeout,
)
defer cancel() defer cancel()
graph, err := hn.DescribeGraph(ctxt, &lnrpc.ChannelGraphRequest{ graph, err := hn.DescribeGraph(ctxt, &lnrpc.ChannelGraphRequest{