itest+lntest: migrate lntemp to lntest

This commit completes the takeover: `lntemp` is now promoted to be
`lntest`, and the scaffolding is removed as all the refactoring is
finished!
yyforyongyu
2022-08-12 17:03:44 +08:00
parent ee0790493c
commit 9d1d629001
76 changed files with 590 additions and 584 deletions

lntest/node/config.go (new file, 398 lines)

@@ -0,0 +1,398 @@
package node
import (
"flag"
"fmt"
"io"
"net"
"os"
"path"
"path/filepath"
"sync/atomic"
"github.com/btcsuite/btcd/chaincfg"
"github.com/lightningnetwork/lnd/chanbackup"
"github.com/lightningnetwork/lnd/kvdb/etcd"
"github.com/lightningnetwork/lnd/lntest/wait"
)
const (
// ListenerFormat is the format string that is used to generate local
// listener addresses.
ListenerFormat = "127.0.0.1:%d"
// DefaultCSV is the CSV delay (remotedelay) we will start our test
// nodes with.
DefaultCSV = 4
// defaultNodePort is the start of the range for listening ports of
// harness nodes. Ports are monotonically increasing starting from this
// number and are determined by the results of NextAvailablePort().
defaultNodePort = 5555
)
var (
// lastPort is the last port determined to be free for use by a new
// node. It should be used atomically.
lastPort uint32 = defaultNodePort
// logOutput is a flag that can be set to append the output from the
// seed nodes to log files.
logOutput = flag.Bool("logoutput", false,
"log output from node n to file output-n.log")
// logSubDir is the default directory where the logs are written to if
// logOutput is true.
logSubDir = flag.String("logdir", ".", "default dir to write logs to")
// btcdExecutable is the full path to the btcd binary.
btcdExecutable = flag.String(
"btcdexec", "", "full path to btcd binary",
)
)
// DatabaseBackend is the database backend to be used by the node.
type DatabaseBackend int
const (
BackendBbolt DatabaseBackend = iota
BackendEtcd
BackendPostgres
BackendSqlite
)
// Option is a function for updating a node's configuration.
type Option func(*BaseNodeConfig)
// BackendConfig is an interface that abstracts away the specific chain backend
// node implementation.
type BackendConfig interface {
// GenArgs returns the arguments needed to be passed to LND at startup
// for using this node as a chain backend.
GenArgs() []string
// ConnectMiner is called to establish a connection to the test miner.
ConnectMiner() error
// DisconnectMiner is called to disconnect the miner.
DisconnectMiner() error
// Name returns the name of the backend type.
Name() string
// Credentials returns the rpc username, password and host for the
// backend.
Credentials() (string, string, string, error)
}
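// Illustrative sketch only (hypothetical, not part of this commit): a
// minimal stub satisfying BackendConfig could look as follows; the real
// btcd/bitcoind/neutrino implementations live elsewhere in lntest.
//
//	type stubBackend struct{}
//
//	func (s *stubBackend) GenArgs() []string      { return []string{"--bitcoin.node=btcd"} }
//	func (s *stubBackend) ConnectMiner() error    { return nil }
//	func (s *stubBackend) DisconnectMiner() error { return nil }
//	func (s *stubBackend) Name() string           { return "stub" }
//	func (s *stubBackend) Credentials() (string, string, string, error) {
//		return "user", "pass", "127.0.0.1:18556", nil
//	}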
// BaseNodeConfig is the base node configuration.
type BaseNodeConfig struct {
Name string
// LogFilenamePrefix is used to prefix node log files. Can be used to
// store the current test case for simpler postmortem debugging.
LogFilenamePrefix string
NetParams *chaincfg.Params
BackendCfg BackendConfig
BaseDir string
ExtraArgs []string
OriginalExtraArgs []string
DataDir string
LogDir string
TLSCertPath string
TLSKeyPath string
AdminMacPath string
ReadMacPath string
InvoiceMacPath string
SkipUnlock bool
Password []byte
P2PPort int
RPCPort int
RESTPort int
ProfilePort int
FeeURL string
DbBackend DatabaseBackend
PostgresDsn string
// NodeID is a unique ID used to identify the node.
NodeID uint32
// LndBinary is the full path to the lnd binary that was specifically
// compiled with all required itest flags.
LndBinary string
// backupDbDir is the path where a database backup is stored, if any.
backupDbDir string
// postgresDbName is the name of the postgres database where lnd data
// is stored.
postgresDbName string
}
// P2PAddr returns the node's P2P listening address.
func (cfg BaseNodeConfig) P2PAddr() string {
return fmt.Sprintf(ListenerFormat, cfg.P2PPort)
}
// RPCAddr returns the node's RPC listening address.
func (cfg BaseNodeConfig) RPCAddr() string {
return fmt.Sprintf(ListenerFormat, cfg.RPCPort)
}
// RESTAddr returns the node's REST listening address.
func (cfg BaseNodeConfig) RESTAddr() string {
return fmt.Sprintf(ListenerFormat, cfg.RESTPort)
}
// DBDir returns the holding directory path of the graph database.
func (cfg BaseNodeConfig) DBDir() string {
return filepath.Join(cfg.DataDir, "graph", cfg.NetParams.Name)
}
// DBPath returns the file path of the channel database.
func (cfg BaseNodeConfig) DBPath() string {
return filepath.Join(cfg.DBDir(), "channel.db")
}
// ChanBackupPath returns the file path of the channel backup.
func (cfg BaseNodeConfig) ChanBackupPath() string {
return filepath.Join(
cfg.DataDir, "chain", "bitcoin",
fmt.Sprintf(
"%v/%v", cfg.NetParams.Name,
chanbackup.DefaultBackupFileName,
),
)
}
// GenerateListeningPorts generates the listening ports designated for the
// current lightning network test.
func (cfg *BaseNodeConfig) GenerateListeningPorts() {
if cfg.P2PPort == 0 {
cfg.P2PPort = NextAvailablePort()
}
if cfg.RPCPort == 0 {
cfg.RPCPort = NextAvailablePort()
}
if cfg.RESTPort == 0 {
cfg.RESTPort = NextAvailablePort()
}
if cfg.ProfilePort == 0 {
cfg.ProfilePort = NextAvailablePort()
}
}
// BaseConfig returns the base node configuration struct.
func (cfg *BaseNodeConfig) BaseConfig() *BaseNodeConfig {
return cfg
}
// GenArgs generates a slice of command line arguments from the lightning node
// config struct.
func (cfg *BaseNodeConfig) GenArgs() []string {
var args []string
switch cfg.NetParams {
case &chaincfg.TestNet3Params:
args = append(args, "--bitcoin.testnet")
case &chaincfg.SimNetParams:
args = append(args, "--bitcoin.simnet")
case &chaincfg.RegressionNetParams:
args = append(args, "--bitcoin.regtest")
}
backendArgs := cfg.BackendCfg.GenArgs()
args = append(args, backendArgs...)
nodeArgs := []string{
"--bitcoin.active",
"--nobootstrap",
"--debuglevel=debug",
"--bitcoin.defaultchanconfs=1",
"--accept-keysend",
"--keep-failed-payment-attempts",
fmt.Sprintf("--db.batch-commit-interval=%v", commitInterval),
fmt.Sprintf("--bitcoin.defaultremotedelay=%v", DefaultCSV),
fmt.Sprintf("--rpclisten=%v", cfg.RPCAddr()),
fmt.Sprintf("--restlisten=%v", cfg.RESTAddr()),
fmt.Sprintf("--restcors=https://%v", cfg.RESTAddr()),
fmt.Sprintf("--listen=%v", cfg.P2PAddr()),
fmt.Sprintf("--externalip=%v", cfg.P2PAddr()),
fmt.Sprintf("--lnddir=%v", cfg.BaseDir),
fmt.Sprintf("--adminmacaroonpath=%v", cfg.AdminMacPath),
fmt.Sprintf("--readonlymacaroonpath=%v", cfg.ReadMacPath),
fmt.Sprintf("--invoicemacaroonpath=%v", cfg.InvoiceMacPath),
fmt.Sprintf("--trickledelay=%v", trickleDelay),
fmt.Sprintf("--profile=%d", cfg.ProfilePort),
fmt.Sprintf("--caches.rpc-graph-cache-duration=%d", 0),
// Use a small batch window so we can broadcast our sweep
// transactions faster.
"--sweeper.batchwindowduration=5s",
// Use a small batch delay so we can broadcast the
// announcements quickly in the tests.
"--gossip.sub-batch-delay=5ms",
}
args = append(args, nodeArgs...)
if cfg.Password == nil {
args = append(args, "--noseedbackup")
}
switch cfg.DbBackend {
case BackendEtcd:
args = append(args, "--db.backend=etcd")
args = append(args, "--db.etcd.embedded")
args = append(
args, fmt.Sprintf(
"--db.etcd.embedded_client_port=%v",
NextAvailablePort(),
),
)
args = append(
args, fmt.Sprintf(
"--db.etcd.embedded_peer_port=%v",
NextAvailablePort(),
),
)
args = append(
args, fmt.Sprintf(
"--db.etcd.embedded_log_file=%v",
path.Join(cfg.LogDir, "etcd.log"),
),
)
case BackendPostgres:
args = append(args, "--db.backend=postgres")
args = append(args, "--db.postgres.dsn="+cfg.PostgresDsn)
case BackendSqlite:
args = append(args, "--db.backend=sqlite")
args = append(args, fmt.Sprintf("--db.sqlite.busytimeout=%v",
wait.SqliteBusyTimeout))
}
if cfg.FeeURL != "" {
args = append(args, "--feeurl="+cfg.FeeURL)
}
// Put extra args at the end so that earlier args can be overwritten.
if cfg.ExtraArgs != nil {
args = append(args, cfg.ExtraArgs...)
}
return args
}
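// For orientation, a hedged sketch (the exec wiring is assumed, not shown
// in this commit): the harness ultimately hands the generated arguments to
// the lnd binary, conceptually along the lines of:
//
//	args := cfg.GenArgs()
//	cmd := exec.Command(cfg.LndBinary, args...)
//	err := cmd.Start()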
// ExtraArgsEtcd returns extra args for configuring LND to use an external etcd
// database (for remote channel DB and wallet DB).
func ExtraArgsEtcd(etcdCfg *etcd.Config, name string, cluster bool,
leaderSessionTTL int) []string {
extraArgs := []string{
"--db.backend=etcd",
fmt.Sprintf("--db.etcd.host=%v", etcdCfg.Host),
fmt.Sprintf("--db.etcd.user=%v", etcdCfg.User),
fmt.Sprintf("--db.etcd.pass=%v", etcdCfg.Pass),
fmt.Sprintf("--db.etcd.namespace=%v", etcdCfg.Namespace),
}
if etcdCfg.InsecureSkipVerify {
extraArgs = append(extraArgs, "--db.etcd.insecure_skip_verify")
}
if cluster {
clusterArgs := []string{
"--cluster.enable-leader-election",
fmt.Sprintf("--cluster.id=%v", name),
fmt.Sprintf("--cluster.leader-session-ttl=%v",
leaderSessionTTL),
}
extraArgs = append(extraArgs, clusterArgs...)
}
return extraArgs
}
// NextAvailablePort returns the first port that is available for listening by
// a new node. It panics when no free port can be found and the maximum TCP
// port number is reached.
func NextAvailablePort() int {
port := atomic.AddUint32(&lastPort, 1)
for port < 65535 {
// If there are no errors while attempting to listen on this
// port, close the socket and return it as available. While it
// could be the case that some other process picks up this port
// between the time the socket is closed and it's reopened in
// the harness node, in practice in CI servers this seems much
// less likely than simply some other process already being
// bound at the start of the tests.
addr := fmt.Sprintf(ListenerFormat, port)
l, err := net.Listen("tcp4", addr)
if err == nil {
err := l.Close()
if err == nil {
return int(port)
}
}
port = atomic.AddUint32(&lastPort, 1)
}
// No ports available? Must be a mistake.
panic("no ports available for listening")
}
// GetLogDir returns the passed --logdir flag or the default value if it wasn't
// set.
func GetLogDir() string {
if logSubDir != nil && *logSubDir != "" {
return *logSubDir
}
return "."
}
// CopyFile copies the file src to dest.
func CopyFile(dest, src string) error {
s, err := os.Open(src)
if err != nil {
return err
}
defer s.Close()
d, err := os.Create(dest)
if err != nil {
return err
}
if _, err := io.Copy(d, s); err != nil {
d.Close()
return err
}
return d.Close()
}
// GetBtcdBinary returns the full path to the binary of the custom built btcd
// executable or an empty string if none is set.
func GetBtcdBinary() string {
if btcdExecutable != nil {
return *btcdExecutable
}
return ""
}
// GenerateBtcdListenerAddresses returns two listener addresses with unique
// ports. It should be used to overwrite rpctest's default generator, which is
// prone to using colliding ports.
func GenerateBtcdListenerAddresses() (string, string) {
return fmt.Sprintf(ListenerFormat, NextAvailablePort()),
fmt.Sprintf(ListenerFormat, NextAvailablePort())
}
// ApplyPortOffset adds the given offset to the lastPort variable, making it
// possible to run the tests in parallel without colliding on the same ports.
func ApplyPortOffset(offset uint32) {
_ = atomic.AddUint32(&lastPort, offset)
}
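// Usage sketch (trancheIndex is hypothetical): when itest tranches run in
// parallel, each tranche first shifts the shared port range so that
// NextAvailablePort never hands out colliding ports across processes:
//
//	node.ApplyPortOffset(uint32(trancheIndex) * 1000)
//	addr := fmt.Sprintf(node.ListenerFormat, node.NextAvailablePort())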

lntest/node/harness_node.go (new file, 1034 lines)

File diff suppressed because it is too large.

lntest/node/state.go (new file, 365 lines)

@@ -0,0 +1,365 @@
package node
import (
"encoding/json"
"fmt"
"math"
"time"
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lnrpc/walletrpc"
"github.com/lightningnetwork/lnd/lntest/rpc"
"github.com/lightningnetwork/lnd/lnutils"
)
type (
// PolicyUpdate defines a type to store channel policy updates for a
// given advertisingNode. It has the format,
// {"advertisingNode": [policy1, policy2, ...]}.
PolicyUpdate map[string][]*PolicyUpdateInfo
// policyUpdateMap defines a type to store channel policy updates. It
// has the format,
// {
// "chanPoint1": {
// "advertisingNode1": [
// policy1, policy2, ...
// ],
// "advertisingNode2": [
// policy1, policy2, ...
// ]
// },
// "chanPoint2": ...
// }.
policyUpdateMap map[string]map[string][]*lnrpc.RoutingPolicy
)
// PolicyUpdateInfo stores the RoutingPolicy plus the connecting node info.
type PolicyUpdateInfo struct {
*lnrpc.RoutingPolicy
// ConnectingNode specifies the node that is connected with the
// advertising node.
ConnectingNode string `json:"connecting_node"`
// Timestamp records the time the policy update is made.
Timestamp time.Time `json:"timestamp"`
}
// OpenChannelUpdate stores the open channel updates.
type OpenChannelUpdate struct {
// AdvertisingNode specifies the node that advertised this update.
AdvertisingNode string `json:"advertising_node"`
// ConnectingNode specifies the node that is connected with the
// advertising node.
ConnectingNode string `json:"connecting_node"`
// Timestamp records the time the policy update is made.
Timestamp time.Time `json:"timestamp"`
}
// openChannelCount stores the counts of open channel related fields.
type openChannelCount struct {
Active int
Inactive int
Pending int
Public int
Private int
NumUpdates uint64
}
// closedChannelCount stores the total number of closed, waiting and pending
// force close channels.
type closedChannelCount struct {
PendingForceClose int
WaitingClose int
Closed int
}
// utxoCount counts the total confirmed and unconfirmed UTXOs.
type utxoCount struct {
Confirmed int
Unconfirmed int
}
// edgeCount counts the total and public edges.
type edgeCount struct {
Total int
Public int
}
// paymentCount counts the complete (settled/failed) and incomplete payments.
type paymentCount struct {
Total int
Completed int
LastIndexOffset uint64
}
// invoiceCount counts the complete (settled/canceled) and incomplete invoices.
type invoiceCount struct {
Total int
Completed int
LastIndexOffset uint64
}
// walletBalance provides a summary of the balances related to the node's
// wallet.
type walletBalance struct {
TotalBalance int64
ConfirmedBalance int64
UnconfirmedBalance int64
AccountBalance map[string]*lnrpc.WalletAccountBalance
}
// State records the current state for a given node. It provides simple counts
// over the node so that a test can track its state. For a channel-specific
// state check, use a dedicated function to query the channel, as each channel
// is meant to be unique.
type State struct {
// rpc is the RPC clients used for the current node.
rpc *rpc.HarnessRPC
// OpenChannel gives the summary of open channel related counts.
OpenChannel openChannelCount
// CloseChannel gives the summary of close channel related counts.
CloseChannel closedChannelCount
// Wallet gives the summary of the wallet balance.
Wallet walletBalance
// HTLC counts the total active HTLCs.
HTLC int
// Edge counts the total private/public edges.
Edge edgeCount
// ChannelUpdate counts the total channel updates seen from the graph
// subscription.
ChannelUpdate int
// NodeUpdate counts the total node announcements seen from the graph
// subscription.
NodeUpdate int
// UTXO counts the total active UTXOs.
UTXO utxoCount
// Payment counts the total payments made by the node.
Payment paymentCount
// Invoice counts the total invoices made by the node.
Invoice invoiceCount
// openChans records each opened channel and how many times it has
// heard the announcements from its graph subscription.
openChans *lnutils.SyncMap[wire.OutPoint, []*OpenChannelUpdate]
// closedChans records each closed channel and its close channel update
// message received from its graph subscription.
closedChans *lnutils.SyncMap[wire.OutPoint, *lnrpc.ClosedChannelUpdate]
// numChanUpdates records the number of channel updates seen by each
// channel.
numChanUpdates *lnutils.SyncMap[wire.OutPoint, int]
// nodeUpdates records the node announcements seen by each node.
nodeUpdates *lnutils.SyncMap[string, []*lnrpc.NodeUpdate]
// policyUpdates defines a type to store channel policy updates. It has
// the format,
// {
// "chanPoint1": {
// "advertisingNode1": [
// policy1, policy2, ...
// ],
// "advertisingNode2": [
// policy1, policy2, ...
// ]
// },
// "chanPoint2": ...
// }
policyUpdates *lnutils.SyncMap[wire.OutPoint, PolicyUpdate]
}
// newState initializes a new state with every field set to its zero value.
func newState(rpc *rpc.HarnessRPC) *State {
return &State{
rpc: rpc,
openChans: &lnutils.SyncMap[
wire.OutPoint, []*OpenChannelUpdate,
]{},
closedChans: &lnutils.SyncMap[
wire.OutPoint, *lnrpc.ClosedChannelUpdate,
]{},
numChanUpdates: &lnutils.SyncMap[wire.OutPoint, int]{},
nodeUpdates: &lnutils.SyncMap[string, []*lnrpc.NodeUpdate]{},
policyUpdates: &lnutils.SyncMap[wire.OutPoint, PolicyUpdate]{},
}
}
// updateChannelStats gives the stats on open channel related fields.
func (s *State) updateChannelStats() {
req := &lnrpc.ListChannelsRequest{}
resp := s.rpc.ListChannels(req)
for _, channel := range resp.Channels {
if channel.Active {
s.OpenChannel.Active++
} else {
s.OpenChannel.Inactive++
}
if channel.Private {
s.OpenChannel.Private++
} else {
s.OpenChannel.Public++
}
s.OpenChannel.NumUpdates += channel.NumUpdates
s.HTLC += len(channel.PendingHtlcs)
}
}
// updateCloseChannelStats gives the stats on close channel related fields.
func (s *State) updateCloseChannelStats() {
resp := s.rpc.PendingChannels()
s.CloseChannel.PendingForceClose += len(
resp.PendingForceClosingChannels,
)
s.CloseChannel.WaitingClose += len(resp.WaitingCloseChannels)
closeReq := &lnrpc.ClosedChannelsRequest{}
closed := s.rpc.ClosedChannels(closeReq)
s.CloseChannel.Closed += len(closed.Channels)
s.OpenChannel.Pending += len(resp.PendingOpenChannels)
}
// updatePaymentStats counts the total payments made.
func (s *State) updatePaymentStats() {
req := &lnrpc.ListPaymentsRequest{
IndexOffset: s.Payment.LastIndexOffset,
}
resp := s.rpc.ListPayments(req)
// Exit early when there's no payment.
//
// NOTE: we need to exit early here because when there's no payment the
// `LastIndexOffset` will be zero.
if len(resp.Payments) == 0 {
return
}
s.Payment.LastIndexOffset = resp.LastIndexOffset
for _, payment := range resp.Payments {
if payment.Status == lnrpc.Payment_FAILED ||
payment.Status == lnrpc.Payment_SUCCEEDED {
s.Payment.Completed++
}
}
s.Payment.Total += len(resp.Payments)
}
// updateInvoiceStats counts the total invoices made.
func (s *State) updateInvoiceStats() {
req := &lnrpc.ListInvoiceRequest{
NumMaxInvoices: math.MaxUint64,
IndexOffset: s.Invoice.LastIndexOffset,
}
resp := s.rpc.ListInvoices(req)
// Exit early when there's no invoice.
//
// NOTE: we need to exit early here because when there's no invoice the
// `LastIndexOffset` will be zero.
if len(resp.Invoices) == 0 {
return
}
s.Invoice.LastIndexOffset = resp.LastIndexOffset
for _, invoice := range resp.Invoices {
if invoice.State == lnrpc.Invoice_SETTLED ||
invoice.State == lnrpc.Invoice_CANCELED {
s.Invoice.Completed++
}
}
s.Invoice.Total += len(resp.Invoices)
}
// updateUTXOStats counts the node's confirmed and unconfirmed UTXOs.
func (s *State) updateUTXOStats() {
req := &walletrpc.ListUnspentRequest{}
resp := s.rpc.ListUnspent(req)
for _, utxo := range resp.Utxos {
if utxo.Confirmations > 0 {
s.UTXO.Confirmed++
} else {
s.UTXO.Unconfirmed++
}
}
}
// updateEdgeStats counts the total edges.
func (s *State) updateEdgeStats() {
req := &lnrpc.ChannelGraphRequest{IncludeUnannounced: true}
resp := s.rpc.DescribeGraph(req)
s.Edge.Total = len(resp.Edges)
req = &lnrpc.ChannelGraphRequest{IncludeUnannounced: false}
resp = s.rpc.DescribeGraph(req)
s.Edge.Public = len(resp.Edges)
}
// updateWalletBalance creates stats for the node's wallet balance.
func (s *State) updateWalletBalance() {
resp := s.rpc.WalletBalance()
s.Wallet.TotalBalance = resp.TotalBalance
s.Wallet.ConfirmedBalance = resp.ConfirmedBalance
s.Wallet.UnconfirmedBalance = resp.UnconfirmedBalance
s.Wallet.AccountBalance = resp.AccountBalance
}
// updateState updates the internal state of the node.
func (s *State) updateState() {
s.updateChannelStats()
s.updateCloseChannelStats()
s.updatePaymentStats()
s.updateInvoiceStats()
s.updateUTXOStats()
s.updateEdgeStats()
s.updateWalletBalance()
}
// String encodes the node's state for debugging.
func (s *State) String() string {
stateBytes, err := json.MarshalIndent(s, "", "\t")
if err != nil {
return fmt.Sprintf("\n encode node state with err: %v", err)
}
return fmt.Sprintf("\n%s", stateBytes)
}
// resetEphermalStates resets the current state with a new HarnessRPC and empty
// private fields which are used to track state only valid for the last test.
func (s *State) resetEphermalStates(rpc *rpc.HarnessRPC) {
s.rpc = rpc
// Reset ephemeral states which are used to record info from finished
// tests.
s.openChans = &lnutils.SyncMap[wire.OutPoint, []*OpenChannelUpdate]{}
s.closedChans = &lnutils.SyncMap[
wire.OutPoint, *lnrpc.ClosedChannelUpdate,
]{}
s.numChanUpdates = &lnutils.SyncMap[wire.OutPoint, int]{}
s.nodeUpdates = &lnutils.SyncMap[string, []*lnrpc.NodeUpdate]{}
s.policyUpdates = &lnutils.SyncMap[wire.OutPoint, PolicyUpdate]{}
}
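// Debugging sketch (the harness wiring is assumed): within this package, a
// node's counters can be refreshed and dumped for postmortem logs, since
// String renders the exported fields as indented JSON:
//
//	state.updateState()
//	fmt.Printf("node state: %s", state)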

lntest/node/watcher.go (new file, 689 lines)

@@ -0,0 +1,689 @@
package node
import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"
"sync"
"time"
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lntest/rpc"
"github.com/lightningnetwork/lnd/lntest/wait"
"github.com/lightningnetwork/lnd/lnutils"
)
type chanWatchType uint8
const (
// watchOpenChannel specifies that this is a request to watch an open
// channel event.
watchOpenChannel chanWatchType = iota
// watchCloseChannel specifies that this is a request to watch a close
// channel event.
watchCloseChannel
// watchPolicyUpdate specifies that this is a request to watch a policy
// update event.
watchPolicyUpdate
)
// chanWatchRequest is a request to the nodeWatcher to be notified once it's
// detected within the test Lightning Network that a channel has either been
// added or closed.
type chanWatchRequest struct {
chanPoint wire.OutPoint
chanWatchType chanWatchType
eventChan chan struct{}
advertisingNode string
policy *lnrpc.RoutingPolicy
includeUnannounced bool
}
// nodeWatcher is a topology watcher for a HarnessNode. It keeps track of all
// the topology updates seen in a given node, including NodeUpdate,
// ChannelEdgeUpdate, and ClosedChannelUpdate.
type nodeWatcher struct {
// rpc is the RPC clients used for the current node.
rpc *rpc.HarnessRPC
// state is the node's current state.
state *State
// chanWatchRequests receives a request for watching a particular event
// for a given channel.
chanWatchRequests chan *chanWatchRequest
// For each outpoint, we'll track an integer which denotes the number
// of edges seen for that channel within the network. When this number
// reaches 2, it means that both edge advertisements have propagated
// through the network.
openChanWatchers *lnutils.SyncMap[wire.OutPoint, []chan struct{}]
closeChanWatchers *lnutils.SyncMap[wire.OutPoint, []chan struct{}]
wg sync.WaitGroup
}
func newNodeWatcher(rpc *rpc.HarnessRPC, state *State) *nodeWatcher {
return &nodeWatcher{
rpc: rpc,
state: state,
chanWatchRequests: make(chan *chanWatchRequest, 100),
openChanWatchers: &lnutils.SyncMap[
wire.OutPoint, []chan struct{},
]{},
closeChanWatchers: &lnutils.SyncMap[
wire.OutPoint, []chan struct{},
]{},
}
}
// GetNumChannelUpdates reads the num of channel updates inside a lock and
// returns the value.
func (nw *nodeWatcher) GetNumChannelUpdates(op wire.OutPoint) int {
result, _ := nw.state.numChanUpdates.Load(op)
return result
}
// GetPolicyUpdates returns the node's policyUpdates state.
func (nw *nodeWatcher) GetPolicyUpdates(op wire.OutPoint) PolicyUpdate {
result, _ := nw.state.policyUpdates.Load(op)
return result
}
// GetNodeUpdates reads the node updates inside a lock and returns the value.
func (nw *nodeWatcher) GetNodeUpdates(pubkey string) []*lnrpc.NodeUpdate {
result, _ := nw.state.nodeUpdates.Load(pubkey)
return result
}
// WaitForNumChannelUpdates will block until a given number of updates has been
// seen in the node's network topology.
func (nw *nodeWatcher) WaitForNumChannelUpdates(op wire.OutPoint,
expected int) error {
checkNumUpdates := func() error {
num := nw.GetNumChannelUpdates(op)
if num >= expected {
return nil
}
return fmt.Errorf("timeout waiting for num channel updates, "+
"want %d, got %d", expected, num)
}
return wait.NoError(checkNumUpdates, wait.DefaultTimeout)
}
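// The wait.NoError pattern used above (and throughout this file) simply
// polls a func() error until it returns nil or the timeout expires, e.g.
// (conditionMet is a hypothetical placeholder):
//
//	err := wait.NoError(func() error {
//		if conditionMet() {
//			return nil
//		}
//		return fmt.Errorf("condition not met yet")
//	}, wait.DefaultTimeout)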
// WaitForNumNodeUpdates will block until a given number of node updates has
// been seen in the node's network topology.
func (nw *nodeWatcher) WaitForNumNodeUpdates(pubkey string,
expected int) ([]*lnrpc.NodeUpdate, error) {
updates := make([]*lnrpc.NodeUpdate, 0)
checkNumUpdates := func() error {
updates = nw.GetNodeUpdates(pubkey)
num := len(updates)
if num >= expected {
return nil
}
return fmt.Errorf("timeout waiting for num node updates, "+
"want %d, got %d", expected, num)
}
err := wait.NoError(checkNumUpdates, wait.DefaultTimeout)
return updates, err
}
// WaitForChannelOpen will block until a channel with the target outpoint is
// seen as being fully advertised within the network. A channel is considered
// "fully advertised" once both of its directional edges has been advertised in
// the node's network topology.
func (nw *nodeWatcher) WaitForChannelOpen(chanPoint *lnrpc.ChannelPoint) error {
op := nw.rpc.MakeOutpoint(chanPoint)
eventChan := make(chan struct{})
nw.chanWatchRequests <- &chanWatchRequest{
chanPoint: op,
eventChan: eventChan,
chanWatchType: watchOpenChannel,
}
timer := time.After(wait.DefaultTimeout)
select {
case <-eventChan:
return nil
case <-timer:
updates, err := syncMapToJSON(&nw.state.openChans.Map)
if err != nil {
return err
}
return fmt.Errorf("channel:%s not heard before timeout: "+
"node has heard: %s", op, updates)
}
}
// WaitForChannelClose will block until a channel with the target outpoint is
// seen as closed within the node's network topology. A channel is considered
// closed once a transaction spending the funding outpoint is seen within a
// confirmed block.
func (nw *nodeWatcher) WaitForChannelClose(
chanPoint *lnrpc.ChannelPoint) (*lnrpc.ClosedChannelUpdate, error) {
op := nw.rpc.MakeOutpoint(chanPoint)
eventChan := make(chan struct{})
nw.chanWatchRequests <- &chanWatchRequest{
chanPoint: op,
eventChan: eventChan,
chanWatchType: watchCloseChannel,
}
timer := time.After(wait.DefaultTimeout)
select {
case <-eventChan:
closedChan, ok := nw.state.closedChans.Load(op)
if !ok {
return nil, fmt.Errorf("channel:%s expected to find "+
"a closed channel in node's state:%s", op,
nw.state)
}
return closedChan, nil
case <-timer:
return nil, fmt.Errorf("channel:%s not closed before timeout: "+
"%s", op, nw.state)
}
}
// WaitForChannelPolicyUpdate will block until a channel policy with the target
// outpoint and advertisingNode is seen within the network.
func (nw *nodeWatcher) WaitForChannelPolicyUpdate(
advertisingNode *HarnessNode, policy *lnrpc.RoutingPolicy,
chanPoint *lnrpc.ChannelPoint, includeUnannounced bool) error {
op := nw.rpc.MakeOutpoint(chanPoint)
ticker := time.NewTicker(wait.PollInterval)
timer := time.After(wait.DefaultTimeout)
defer ticker.Stop()
eventChan := make(chan struct{})
for {
select {
// Send a watch request on every ticker interval.
case <-ticker.C:
// Did the event chan close in the meantime? We want to
// avoid a "close of closed channel" panic since we're
// re-using the same event chan for multiple requests.
select {
case <-eventChan:
return nil
default:
}
nw.chanWatchRequests <- &chanWatchRequest{
chanPoint: op,
eventChan: eventChan,
chanWatchType: watchPolicyUpdate,
policy: policy,
advertisingNode: advertisingNode.PubKeyStr,
includeUnannounced: includeUnannounced,
}
case <-eventChan:
return nil
case <-timer:
expected, err := json.MarshalIndent(policy, "", "\t")
if err != nil {
return fmt.Errorf("encode policy err: %v", err)
}
policies, err := syncMapToJSON(
&nw.state.policyUpdates.Map,
)
if err != nil {
return err
}
return fmt.Errorf("policy not updated before timeout:"+
"\nchannel: %v \nadvertisingNode: %s:%v"+
"\nwant policy:%s\nhave updates:%s", op,
advertisingNode.Name(),
advertisingNode.PubKeyStr, expected, policies)
}
}
}
// syncMapToJSON is a helper function that creates JSON bytes from the
// sync.Map used in the node. It expects the map keys to be wire.OutPoint
// values.
func syncMapToJSON(state *sync.Map) ([]byte, error) {
m := map[string]interface{}{}
state.Range(func(k, v interface{}) bool {
op := k.(wire.OutPoint)
m[op.String()] = v
return true
})
policies, err := json.MarshalIndent(m, "", "\t")
if err != nil {
return nil, fmt.Errorf("encode polices err: %v", err)
}
return policies, nil
}
// topologyWatcher is a goroutine which is able to dispatch notifications once
// it has been observed that a target channel has been closed or opened within
// the network. In order to dispatch these notifications, the
// GraphTopologySubscription client exposed as part of the gRPC interface is
// used.
//
// NOTE: must be run as a goroutine.
func (nw *nodeWatcher) topologyWatcher(ctxb context.Context,
started chan error) {
graphUpdates := make(chan *lnrpc.GraphTopologyUpdate)
client, err := nw.newTopologyClient(ctxb)
started <- err
// Exit if there's an error.
if err != nil {
return
}
// Start a goroutine to receive graph updates.
nw.wg.Add(1)
go func() {
defer nw.wg.Done()
// With the client being created, we now start receiving the
// updates.
err = nw.receiveTopologyClientStream(ctxb, client, graphUpdates)
if err != nil {
started <- fmt.Errorf("receiveTopologyClientStream "+
"got err: %v", err)
}
}()
for {
select {
// A new graph update has just been received, so we'll examine
// the current set of registered clients to see if we can
// dispatch any requests.
case graphUpdate := <-graphUpdates:
nw.handleChannelEdgeUpdates(graphUpdate.ChannelUpdates)
nw.handleClosedChannelUpdate(graphUpdate.ClosedChans)
nw.handleNodeUpdates(graphUpdate.NodeUpdates)
// A new watch request has just arrived. We'll either be able
// to dispatch immediately, or need to add the client for
// processing later.
case watchRequest := <-nw.chanWatchRequests:
switch watchRequest.chanWatchType {
case watchOpenChannel:
// TODO(roasbeef): add update type also, checks
// for multiple of 2
nw.handleOpenChannelWatchRequest(watchRequest)
case watchCloseChannel:
nw.handleCloseChannelWatchRequest(watchRequest)
case watchPolicyUpdate:
nw.handlePolicyUpdateWatchRequest(watchRequest)
}
case <-ctxb.Done():
return
}
}
}
func (nw *nodeWatcher) handleNodeUpdates(updates []*lnrpc.NodeUpdate) {
for _, nodeUpdate := range updates {
nw.updateNodeStateNodeUpdates(nodeUpdate)
}
}
// handleChannelEdgeUpdates takes a series of channel edge updates, extracts
// the outpoints, and saves them to harness node's internal state.
func (nw *nodeWatcher) handleChannelEdgeUpdates(
updates []*lnrpc.ChannelEdgeUpdate) {
// For each new channel, we'll increment the number of edges seen by
// one.
for _, newChan := range updates {
op := nw.rpc.MakeOutpoint(newChan.ChanPoint)
// Update the num of channel updates.
nw.updateNodeStateNumChanUpdates(op)
// Update the open channels.
nw.updateNodeStateOpenChannel(op, newChan)
// Check whether there's a routing policy update. If so, save
// it to the node state.
if newChan.RoutingPolicy != nil {
nw.updateNodeStatePolicy(op, newChan)
}
}
}
// updateNodeStateNumChanUpdates updates the internal state of the node
// regarding the number of channel updates seen.
func (nw *nodeWatcher) updateNodeStateNumChanUpdates(op wire.OutPoint) {
oldNum, _ := nw.state.numChanUpdates.Load(op)
nw.state.numChanUpdates.Store(op, oldNum+1)
}
// updateNodeStateNodeUpdates updates the internal state of the node regarding
// the node updates seen.
func (nw *nodeWatcher) updateNodeStateNodeUpdates(update *lnrpc.NodeUpdate) {
oldUpdates, _ := nw.state.nodeUpdates.Load(update.IdentityKey)
nw.state.nodeUpdates.Store(
update.IdentityKey, append(oldUpdates, update),
)
}
// updateNodeStateOpenChannel updates the internal state of the node regarding
// the open channels.
func (nw *nodeWatcher) updateNodeStateOpenChannel(op wire.OutPoint,
newChan *lnrpc.ChannelEdgeUpdate) {
// Load the old updates the node has heard so far.
updates, _ := nw.state.openChans.Load(op)
// Create a new update based on this newChan.
newUpdate := &OpenChannelUpdate{
AdvertisingNode: newChan.AdvertisingNode,
ConnectingNode: newChan.ConnectingNode,
Timestamp: time.Now(),
}
// Update the node's state.
updates = append(updates, newUpdate)
nw.state.openChans.Store(op, updates)
// For this new channel, if the number of edges seen is less
// than two, then the channel hasn't been fully announced yet.
if len(updates) < 2 {
return
}
// Otherwise, we'll notify all the registered watchers and
// remove the dispatched watchers.
watcherResult, loaded := nw.openChanWatchers.LoadAndDelete(op)
if !loaded {
return
}
for _, eventChan := range watcherResult {
close(eventChan)
}
}
// updateNodeStatePolicy updates the internal state of the node regarding the
// policy updates.
func (nw *nodeWatcher) updateNodeStatePolicy(op wire.OutPoint,
newChan *lnrpc.ChannelEdgeUpdate) {
// Init an empty policy map and overwrite it if the channel point can
// be found in the node's policyUpdates.
policies, ok := nw.state.policyUpdates.Load(op)
if !ok {
policies = make(PolicyUpdate)
}
node := newChan.AdvertisingNode
// Append the policy to the slice and update the node's state.
newPolicy := PolicyUpdateInfo{
newChan.RoutingPolicy, newChan.ConnectingNode, time.Now(),
}
policies[node] = append(policies[node], &newPolicy)
nw.state.policyUpdates.Store(op, policies)
}
// handleOpenChannelWatchRequest processes a watch open channel request by
// checking the number of edges seen for a given channel point. If the number
// is no less than 2, the channel is considered open and the request is
// dispatched immediately. Otherwise, the request is added to a watch request
// list that will be handled by handleChannelEdgeUpdates.
func (nw *nodeWatcher) handleOpenChannelWatchRequest(req *chanWatchRequest) {
targetChan := req.chanPoint
// If this is an open request, then it can be dispatched if the number
// of edges seen for the channel is at least two.
result, _ := nw.state.openChans.Load(targetChan)
if len(result) >= 2 {
close(req.eventChan)
return
}
// Otherwise, we'll add this to the list of open channel watchers for
// this out point.
watchers, _ := nw.openChanWatchers.Load(targetChan)
nw.openChanWatchers.Store(
targetChan, append(watchers, req.eventChan),
)
}
// handleClosedChannelUpdate takes a series of closed channel updates, extracts
// the outpoints, saves them to harness node's internal state, and notifies all
// registered clients.
func (nw *nodeWatcher) handleClosedChannelUpdate(
updates []*lnrpc.ClosedChannelUpdate) {
// For each channel closed, we'll mark that we've detected a channel
// closure while lnd was pruning the channel graph.
for _, closedChan := range updates {
op := nw.rpc.MakeOutpoint(closedChan.ChanPoint)
nw.state.closedChans.Store(op, closedChan)
// As the channel has been closed, we'll notify all
// registered watchers.
watchers, loaded := nw.closeChanWatchers.LoadAndDelete(op)
if !loaded {
continue
}
for _, eventChan := range watchers {
close(eventChan)
}
}
}
// handleCloseChannelWatchRequest processes a watch close channel request by
// checking whether the given channel point can be found in the node's
// internal state. If not, the request is added to a watch request list that
// will be handled by handleClosedChannelUpdate.
func (nw *nodeWatcher) handleCloseChannelWatchRequest(req *chanWatchRequest) {
targetChan := req.chanPoint
// If this is a close request, then it can be immediately dispatched if
// we've already seen a channel closure for this channel.
if _, ok := nw.state.closedChans.Load(targetChan); ok {
close(req.eventChan)
return
}
// Otherwise, we'll add this to the list of close channel watchers for
// this out point.
oldWatchers, _ := nw.closeChanWatchers.Load(targetChan)
nw.closeChanWatchers.Store(
targetChan, append(oldWatchers, req.eventChan),
)
}
// handlePolicyUpdateWatchRequest checks whether the expected policy can be
// found either in the node's internal state or in its DescribeGraph response.
// If found, it signals the request by closing the event channel. Otherwise it
// does nothing and returns.
func (nw *nodeWatcher) handlePolicyUpdateWatchRequest(req *chanWatchRequest) {
op := req.chanPoint
var policies []*PolicyUpdateInfo
// Get a list of known policies for this chanPoint+advertisingNode
// combination. Start searching in the node state first.
policyMap, ok := nw.state.policyUpdates.Load(op)
if ok {
policies, ok = policyMap[req.advertisingNode]
if !ok {
return
}
} else {
// If it cannot be found in the node state, try searching it
// from the node's DescribeGraph.
policyMap := nw.getChannelPolicies(req.includeUnannounced)
result, ok := policyMap[op.String()][req.advertisingNode]
if !ok {
return
}
for _, policy := range result {
// Use an empty ConnectingNode to mark the policy as being
// loaded from DescribeGraph.
policies = append(
policies, &PolicyUpdateInfo{
policy, "", time.Now(),
},
)
}
}
// Check if the latest policy is matched.
policy := policies[len(policies)-1]
if CheckChannelPolicy(policy.RoutingPolicy, req.policy) == nil {
close(req.eventChan)
return
}
}
type topologyClient lnrpc.Lightning_SubscribeChannelGraphClient
// newTopologyClient creates a topology client.
func (nw *nodeWatcher) newTopologyClient(
ctx context.Context) (topologyClient, error) {
req := &lnrpc.GraphTopologySubscription{}
client, err := nw.rpc.LN.SubscribeChannelGraph(ctx, req)
if err != nil {
return nil, fmt.Errorf("%s: unable to create topology client: "+
"%v (%s)", nw.rpc.Name, err, time.Now().String())
}
return client, nil
}
// receiveTopologyClientStream takes a topologyClient and receives graph
// updates.
//
// NOTE: must be run as a goroutine.
func (nw *nodeWatcher) receiveTopologyClientStream(ctxb context.Context,
client topologyClient,
receiver chan *lnrpc.GraphTopologyUpdate) error {
for {
update, err := client.Recv()
switch {
case err == nil:
// Good case. We will send the update to the receiver.
case strings.Contains(err.Error(), "EOF"):
// End of subscription stream. Do nothing and quit.
return nil
case strings.Contains(err.Error(), context.Canceled.Error()):
// End of subscription stream. Do nothing and quit.
return nil
default:
// An unexpected error is returned; return and leave it
// to be handled by the caller.
return fmt.Errorf("graph subscription err: %w", err)
}
// Send the update or quit.
select {
case receiver <- update:
case <-ctxb.Done():
return nil
}
}
}
// getChannelPolicies queries the channel graph and formats the policies into
// the format defined in type policyUpdateMap.
func (nw *nodeWatcher) getChannelPolicies(include bool) policyUpdateMap {
req := &lnrpc.ChannelGraphRequest{IncludeUnannounced: include}
graph := nw.rpc.DescribeGraph(req)
policyUpdates := policyUpdateMap{}
for _, e := range graph.Edges {
policies := policyUpdates[e.ChanPoint]
// If the map[op] is nil, we need to initialize the map first.
if policies == nil {
policies = make(map[string][]*lnrpc.RoutingPolicy)
}
if e.Node1Policy != nil {
policies[e.Node1Pub] = append(
policies[e.Node1Pub], e.Node1Policy,
)
}
if e.Node2Policy != nil {
policies[e.Node2Pub] = append(
policies[e.Node2Pub], e.Node2Policy,
)
}
policyUpdates[e.ChanPoint] = policies
}
return policyUpdates
}
// CheckChannelPolicy checks that the policy matches the expected one.
func CheckChannelPolicy(policy, expectedPolicy *lnrpc.RoutingPolicy) error {
if policy.FeeBaseMsat != expectedPolicy.FeeBaseMsat {
return fmt.Errorf("expected base fee %v, got %v",
expectedPolicy.FeeBaseMsat, policy.FeeBaseMsat)
}
if policy.FeeRateMilliMsat != expectedPolicy.FeeRateMilliMsat {
return fmt.Errorf("expected fee rate %v, got %v",
expectedPolicy.FeeRateMilliMsat,
policy.FeeRateMilliMsat)
}
if policy.TimeLockDelta != expectedPolicy.TimeLockDelta {
return fmt.Errorf("expected time lock delta %v, got %v",
expectedPolicy.TimeLockDelta,
policy.TimeLockDelta)
}
if policy.MinHtlc != expectedPolicy.MinHtlc {
return fmt.Errorf("expected min htlc %v, got %v",
expectedPolicy.MinHtlc, policy.MinHtlc)
}
if policy.MaxHtlcMsat != expectedPolicy.MaxHtlcMsat {
return fmt.Errorf("expected max htlc %v, got %v",
expectedPolicy.MaxHtlcMsat, policy.MaxHtlcMsat)
}
if policy.Disabled != expectedPolicy.Disabled {
return fmt.Errorf("expected disabled %v, got %v",
expectedPolicy.Disabled, policy.Disabled)
}
return nil
}
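// Usage sketch (values are hypothetical): a test asserting that a received
// policy matches the expected one could do:
//
//	expected := &lnrpc.RoutingPolicy{
//		FeeBaseMsat:      1000,
//		FeeRateMilliMsat: 1,
//		TimeLockDelta:    40,
//	}
//	if err := node.CheckChannelPolicy(gotPolicy, expected); err != nil {
//		t.Fatalf("policy mismatch: %v", err)
//	}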