lntest: Rename structs with proper visibility so lnd_test runs.

This commit is contained in:
Jim Posen
2017-11-03 11:52:02 -07:00
committed by Olaoluwa Osuntokun
parent 3cb0705b8e
commit 43e501feb9
3 changed files with 250 additions and 177 deletions

View File

@@ -1,5 +1,34 @@
package lntest
import (
"bytes"
"encoding/hex"
"flag"
"fmt"
"io"
"io/ioutil"
"net"
"os"
"os/exec"
"path/filepath"
"strconv"
"sync"
"time"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
macaroon "gopkg.in/macaroon.v1"
"github.com/go-errors/errors"
"github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/macaroons"
"github.com/roasbeef/btcd/chaincfg"
"github.com/roasbeef/btcd/chaincfg/chainhash"
"github.com/roasbeef/btcd/rpcclient"
"github.com/roasbeef/btcd/wire"
)
var (
// numActiveNodes is the number of active nodes within the test network.
numActiveNodes = 0
@@ -18,8 +47,6 @@ var (
// as such: defaultP2pPort + (2 * harness.nodeNum).
defaultClientPort = 19556
harnessNetParams = &chaincfg.SimNetParams
// logOutput is a flag that can be set to append the output from the
// seed nodes to log files.
logOutput = flag.Bool("logoutput", false,
@@ -119,13 +146,14 @@ func (cfg nodeConfig) genArgs() []string {
return args
}
// lightningNode represents an instance of lnd running within our test network
// harness. Each lightningNode instance also fully embedds an RPC client in
// HarnessNode represents an instance of lnd running within our test network
// harness. Each HarnessNode instance also fully embedds an RPC client in
// order to pragmatically drive the node.
type lightningNode struct {
type HarnessNode struct {
cfg *nodeConfig
nodeID int
// NodeID is a unique identifier for the node within a NetworkHarness.
NodeID int
// PubKey is the serialized compressed identity public key of the node.
// This field will only be populated once the node itself has been
@@ -137,7 +165,7 @@ type lightningNode struct {
pidFile string
// processExit is a channel that's closed once it's detected that the
// process this instance of lightningNode is bound to has exited.
// process this instance of HarnessNode is bound to has exited.
processExit chan struct{}
chanWatchRequests chan *chanWatchRequest
@@ -148,9 +176,11 @@ type lightningNode struct {
lnrpc.LightningClient
}
// newLightningNode creates a new test lightning node instance from the passed
// rpc config and slice of extra arguments.
func newLightningNode(cfg nodeConfig) (*lightningNode, error) {
// Assert *HarnessNode implements the lnrpc.LightningClient interface.
var _ lnrpc.LightningClient = (*HarnessNode)(nil)
// newNode creates a new test lightning node instance from the passed config.
func newNode(cfg nodeConfig) (*HarnessNode, error) {
if cfg.BaseDir == "" {
var err error
cfg.BaseDir, err = ioutil.TempDir("", "lndtest-node")
@@ -170,30 +200,35 @@ func newLightningNode(cfg nodeConfig) (*lightningNode, error) {
nodeNum := numActiveNodes
numActiveNodes++
return &lightningNode{
return &HarnessNode{
cfg: &cfg,
nodeID: nodeNum,
NodeID: nodeNum,
chanWatchRequests: make(chan *chanWatchRequest),
processExit: make(chan struct{}),
quit: make(chan struct{}),
}, nil
}
// DBPath returns the filepath to the channeldb database file for this node.
func (hn *HarnessNode) DBPath() string {
return hn.cfg.DBPath()
}
// Start launches a new process running lnd. Additionally, the PID of the
// launched process is saved in order to possibly kill the process forcibly
// later.
func (l *lightningNode) Start(lndError chan<- error) error {
args := l.cfg.genArgs()
l.cmd = exec.Command("lnd", args...)
func (hn *HarnessNode) start(lndError chan<- error) error {
args := hn.cfg.genArgs()
hn.cmd = exec.Command("lnd", args...)
// Redirect stderr output to buffer
var errb bytes.Buffer
l.cmd.Stderr = &errb
hn.cmd.Stderr = &errb
// If the logoutput flag is passed, redirect output from the nodes to
// log files.
if *logOutput {
logFile := fmt.Sprintf("output%d.log", l.nodeID)
logFile := fmt.Sprintf("output%d.log", hn.NodeID)
// Create file if not exists, otherwise append.
file, err := os.OpenFile(logFile,
@@ -204,69 +239,70 @@ func (l *lightningNode) Start(lndError chan<- error) error {
// Pass node's stderr to both errb and the file.
w := io.MultiWriter(&errb, file)
l.cmd.Stderr = w
hn.cmd.Stderr = w
// Pass the node's stdout only to the file.
l.cmd.Stdout = file
hn.cmd.Stdout = file
}
if err := l.cmd.Start(); err != nil {
if err := hn.cmd.Start(); err != nil {
return err
}
// Launch a new goroutine which that bubbles up any potential fatal
// process errors to the goroutine running the tests.
go func() {
err := l.cmd.Wait()
err := hn.cmd.Wait()
if err != nil {
lndError <- errors.Errorf("%v\n%v\n", err, errb.String())
}
// Signal any onlookers that this process has exited.
close(l.processExit)
close(hn.processExit)
}()
// Write process ID to a file.
if err := l.writePidFile(); err != nil {
l.cmd.Process.Kill()
if err := hn.writePidFile(); err != nil {
hn.cmd.Process.Kill()
return err
}
// Since Stop uses the LightningClient to stop the node, if we fail to get a
// connected client, we have to kill the process.
conn, err := l.connectRPC()
conn, err := hn.connectRPC()
if err != nil {
l.cmd.Process.Kill()
hn.cmd.Process.Kill()
return err
}
l.LightningClient = lnrpc.NewLightningClient(conn)
hn.LightningClient = lnrpc.NewLightningClient(conn)
// Obtain the lnid of this node for quick identification purposes.
ctxb := context.Background()
info, err := l.GetInfo(ctxb, &lnrpc.GetInfoRequest{})
info, err := hn.GetInfo(ctxb, &lnrpc.GetInfoRequest{})
if err != nil {
return err
}
l.PubKeyStr = info.IdentityPubkey
hn.PubKeyStr = info.IdentityPubkey
pubkey, err := hex.DecodeString(info.IdentityPubkey)
if err != nil {
return err
}
copy(l.PubKey[:], pubkey)
copy(hn.PubKey[:], pubkey)
// Launch the watcher that'll hook into graph related topology change
// from the PoV of this node.
l.wg.Add(1)
go l.lightningNetworkWatcher()
hn.wg.Add(1)
go hn.lightningNetworkWatcher()
return nil
}
// writePidFile writes the process ID of the running lnd process to a .pid file.
func (l *lightningNode) writePidFile() error {
filePath := filepath.Join(l.cfg.BaseDir, fmt.Sprintf("%v.pid", l.nodeID))
func (hn *HarnessNode) writePidFile() error {
filePath := filepath.Join(hn.cfg.BaseDir, fmt.Sprintf("%v.pid", hn.NodeID))
pid, err := os.Create(filePath)
if err != nil {
@@ -274,22 +310,22 @@ func (l *lightningNode) writePidFile() error {
}
defer pid.Close()
_, err = fmt.Fprintf(pid, "%v\n", l.cmd.Process.Pid)
_, err = fmt.Fprintf(pid, "%v\n", hn.cmd.Process.Pid)
if err != nil {
return err
}
l.pidFile = filePath
hn.pidFile = filePath
return nil
}
// connectRPC uses the TLS certificate and admin macaroon files written by the
// lnd node to create a gRPC client connection.
func (l *lightningNode) connectRPC() (*grpc.ClientConn, error) {
func (hn *HarnessNode) connectRPC() (*grpc.ClientConn, error) {
// Wait until TLS certificate and admin macaroon are created before
// using them, up to 20 sec.
tlsTimeout := time.After(30 * time.Second)
for !fileExists(l.cfg.TLSCertPath) || !fileExists(l.cfg.AdminMacPath) {
for !fileExists(hn.cfg.TLSCertPath) || !fileExists(hn.cfg.AdminMacPath) {
select {
case <-tlsTimeout:
return nil, fmt.Errorf("timeout waiting for TLS cert file " +
@@ -299,11 +335,11 @@ func (l *lightningNode) connectRPC() (*grpc.ClientConn, error) {
}
}
tlsCreds, err := credentials.NewClientTLSFromFile(l.cfg.TLSCertPath, "")
tlsCreds, err := credentials.NewClientTLSFromFile(hn.cfg.TLSCertPath, "")
if err != nil {
return nil, err
}
macBytes, err := ioutil.ReadFile(l.cfg.AdminMacPath)
macBytes, err := ioutil.ReadFile(hn.cfg.AdminMacPath)
if err != nil {
return nil, err
}
@@ -317,26 +353,26 @@ func (l *lightningNode) connectRPC() (*grpc.ClientConn, error) {
grpc.WithBlock(),
grpc.WithTimeout(time.Second * 20),
}
return grpc.Dial(l.rpcAddr, opts...)
return grpc.Dial(hn.cfg.RPCAddr(), opts...)
}
// cleanup cleans up all the temporary files created by the node's process.
func (l *lightningNode) cleanup() error {
return os.RemoveAll(l.cfg.BaseDir)
func (hn *HarnessNode) cleanup() error {
return os.RemoveAll(hn.cfg.BaseDir)
}
// Stop attempts to stop the active lnd process.
func (l *lightningNode) Stop() error {
func (hn *HarnessNode) stop() error {
// Do nothing if the process never started successfully.
if l.LightningClient == nil {
if hn.LightningClient == nil {
return nil
}
// Do nothing if the process already finished.
select {
case <-l.quit:
case <-hn.quit:
return nil
case <-l.processExit:
case <-hn.processExit:
return nil
default:
}
@@ -345,10 +381,10 @@ func (l *lightningNode) Stop() error {
// closed before a response is returned.
req := lnrpc.StopRequest{}
ctx := context.Background()
l.LightningClient.StopDaemon(ctx, &req)
hn.LightningClient.StopDaemon(ctx, &req)
close(l.quit)
l.wg.Wait()
close(hn.quit)
hn.wg.Wait()
return nil
}
@@ -358,17 +394,17 @@ func (l *lightningNode) Stop() error {
// connection attempt is successful. Additionally, if a callback is passed, the
// closure will be executed after the node has been shutdown, but before the
// process has been started up again.
func (l *lightningNode) Restart(errChan chan error, callback func() error) error {
if err := l.Stop(); err != nil {
func (hn *HarnessNode) restart(errChan chan error, callback func() error) error {
if err := hn.stop(); err != nil {
return err
}
<-l.processExit
<-hn.processExit
l.LightningClient = nil
l.processExit = make(chan struct{})
l.quit = make(chan struct{})
l.wg = sync.WaitGroup{}
hn.LightningClient = nil
hn.processExit = make(chan struct{})
hn.quit = make(chan struct{})
hn.wg = sync.WaitGroup{}
if callback != nil {
if err := callback(); err != nil {
@@ -376,16 +412,16 @@ func (l *lightningNode) Restart(errChan chan error, callback func() error) error
}
}
return l.Start(errChan)
return hn.start(errChan)
}
// Shutdown stops the active lnd process and clean up any temporary directories
// created along the way.
func (l *lightningNode) Shutdown() error {
if err := l.Stop(); err != nil {
func (hn *HarnessNode) Shutdown() error {
if err := hn.stop(); err != nil {
return err
}
if err := l.cleanup(); err != nil {
if err := hn.cleanup(); err != nil {
return err
}
return nil
@@ -407,17 +443,17 @@ type chanWatchRequest struct {
// closed or opened within the network. In order to dispatch these
// notifications, the GraphTopologySubscription client exposed as part of the
// gRPC interface is used.
func (l *lightningNode) lightningNetworkWatcher() {
defer l.wg.Done()
func (hn *HarnessNode) lightningNetworkWatcher() {
defer hn.wg.Done()
graphUpdates := make(chan *lnrpc.GraphTopologyUpdate)
l.wg.Add(1)
hn.wg.Add(1)
go func() {
defer l.wg.Done()
defer hn.wg.Done()
ctxb := context.Background()
req := &lnrpc.GraphTopologySubscription{}
topologyClient, err := l.SubscribeChannelGraph(ctxb, req)
topologyClient, err := hn.SubscribeChannelGraph(ctxb, req)
if err != nil {
// We panic here in case of an error as failure to
// create the topology client will cause all subsequent
@@ -436,7 +472,7 @@ func (l *lightningNode) lightningNetworkWatcher() {
select {
case graphUpdates <- update:
case <-l.quit:
case <-hn.quit:
return
}
}
@@ -506,7 +542,7 @@ func (l *lightningNode) lightningNetworkWatcher() {
// A new watch request, has just arrived. We'll either be able
// to dispatch immediately, or need to add the client for
// processing later.
case watchRequest := <-l.chanWatchRequests:
case watchRequest := <-hn.chanWatchRequests:
targetChan := watchRequest.chanPoint
// TODO(roasbeef): add update type also, checks for
@@ -540,7 +576,7 @@ func (l *lightningNode) lightningNetworkWatcher() {
closeClients[targetChan] = append(closeClients[targetChan],
watchRequest.eventChan)
case <-l.quit:
case <-hn.quit:
return
}
}
@@ -550,7 +586,7 @@ func (l *lightningNode) lightningNetworkWatcher() {
// outpoint is seen as being fully advertised within the network. A channel is
// considered "fully advertised" once both of its directional edges has been
// advertised within the test Lightning Network.
func (l *lightningNode) WaitForNetworkChannelOpen(ctx context.Context,
func (hn *HarnessNode) WaitForNetworkChannelOpen(ctx context.Context,
op *lnrpc.ChannelPoint) error {
eventChan := make(chan struct{})
@@ -560,7 +596,7 @@ func (l *lightningNode) WaitForNetworkChannelOpen(ctx context.Context,
return err
}
l.chanWatchRequests <- &chanWatchRequest{
hn.chanWatchRequests <- &chanWatchRequest{
chanPoint: wire.OutPoint{
Hash: *txid,
Index: op.OutputIndex,
@@ -581,7 +617,7 @@ func (l *lightningNode) WaitForNetworkChannelOpen(ctx context.Context,
// outpoint is seen as closed within the network. A channel is considered
// closed once a transaction spending the funding outpoint is seen within a
// confirmed block.
func (l *lightningNode) WaitForNetworkChannelClose(ctx context.Context,
func (hn *HarnessNode) WaitForNetworkChannelClose(ctx context.Context,
op *lnrpc.ChannelPoint) error {
eventChan := make(chan struct{})
@@ -591,7 +627,7 @@ func (l *lightningNode) WaitForNetworkChannelClose(ctx context.Context,
return err
}
l.chanWatchRequests <- &chanWatchRequest{
hn.chanWatchRequests <- &chanWatchRequest{
chanPoint: wire.OutPoint{
Hash: *txid,
Index: op.OutputIndex,
@@ -613,7 +649,7 @@ func (l *lightningNode) WaitForNetworkChannelClose(ctx context.Context,
// timeout, then the goroutine will continually poll until the timeout has
// elapsed. In the case that the chain isn't synced before the timeout is up,
// then this function will return an error.
func (l *lightningNode) WaitForBlockchainSync(ctx context.Context) error {
func (hn *HarnessNode) WaitForBlockchainSync(ctx context.Context) error {
errChan := make(chan error, 1)
retryDelay := time.Millisecond * 100
@@ -621,13 +657,13 @@ func (l *lightningNode) WaitForBlockchainSync(ctx context.Context) error {
for {
select {
case <-ctx.Done():
case <-l.quit:
case <-hn.quit:
return
default:
}
getInfoReq := &lnrpc.GetInfoRequest{}
getInfoResp, err := l.GetInfo(ctx, getInfoReq)
getInfoResp, err := hn.GetInfo(ctx, getInfoReq)
if err != nil {
errChan <- err
return
@@ -646,7 +682,7 @@ func (l *lightningNode) WaitForBlockchainSync(ctx context.Context) error {
}()
select {
case <-l.quit:
case <-hn.quit:
return nil
case err := <-errChan:
return err
@@ -654,3 +690,14 @@ func (l *lightningNode) WaitForBlockchainSync(ctx context.Context) error {
return fmt.Errorf("Timeout while waiting for blockchain sync")
}
}
// fileExists reports whether the named file or directory exists.
// This function is taken from https://github.com/btcsuite/btcd
func fileExists(name string) bool {
if _, err := os.Stat(name); err != nil {
if os.IsNotExist(err) {
return false
}
}
return true
}