diff --git a/itest/lnd_etcd_failover_test.go b/itest/lnd_etcd_failover_test.go
index 9ec6bd8a8..c2a345889 100644
--- a/itest/lnd_etcd_failover_test.go
+++ b/itest/lnd_etcd_failover_test.go
@@ -13,7 +13,7 @@ import (
 	"github.com/lightningnetwork/lnd/lncfg"
 	"github.com/lightningnetwork/lnd/lnrpc"
 	"github.com/lightningnetwork/lnd/lntest"
-	"github.com/lightningnetwork/lnd/lntest/node"
+	"github.com/lightningnetwork/lnd/lntest/port"
 	"github.com/stretchr/testify/require"
 )
@@ -56,8 +56,8 @@ func testEtcdFailover(ht *lntest.HarnessTest) {
 func testEtcdFailoverCase(ht *lntest.HarnessTest, kill bool) {
 	etcdCfg, cleanup, err := kvdb.StartEtcdTestBackend(
-		ht.T.TempDir(), uint16(node.NextAvailablePort()),
-		uint16(node.NextAvailablePort()), "",
+		ht.T.TempDir(), uint16(port.NextAvailablePort()),
+		uint16(port.NextAvailablePort()), "",
 	)
 	require.NoError(ht, err, "Failed to start etcd instance")
 	defer cleanup()
diff --git a/itest/lnd_network_test.go b/itest/lnd_network_test.go
index b4c73af60..fd17d0465 100644
--- a/itest/lnd_network_test.go
+++ b/itest/lnd_network_test.go
@@ -9,6 +9,7 @@ import (
 	"github.com/lightningnetwork/lnd/lnrpc"
 	"github.com/lightningnetwork/lnd/lntest"
 	"github.com/lightningnetwork/lnd/lntest/node"
+	"github.com/lightningnetwork/lnd/lntest/port"
 	"github.com/lightningnetwork/lnd/lntest/wait"
 	"github.com/stretchr/testify/require"
 )
@@ -111,7 +112,7 @@ func testReconnectAfterIPChange(ht *lntest.HarnessTest) {
 	// We derive an extra port for Dave, and we initialise his node with
 	// the port advertised as `--externalip` arguments.
-	ip2 := node.NextAvailablePort()
+	ip2 := port.NextAvailablePort()

 	// Create a new node, Dave, which will initialize a P2P port for him.
 	daveArgs := []string{fmt.Sprintf("--externalip=127.0.0.1:%d", ip2)}
@@ -190,7 +191,7 @@ func testReconnectAfterIPChange(ht *lntest.HarnessTest) {
 	// address.

 	// Change Dave's listening port and restart.
-	dave.Cfg.P2PPort = node.NextAvailablePort()
+	dave.Cfg.P2PPort = port.NextAvailablePort()
 	dave.Cfg.ExtraArgs = []string{
 		fmt.Sprintf(
 			"--externalip=127.0.0.1:%d", dave.Cfg.P2PPort,
diff --git a/itest/lnd_test.go b/itest/lnd_test.go
index 9c823b5a8..b5886bc29 100644
--- a/itest/lnd_test.go
+++ b/itest/lnd_test.go
@@ -17,6 +17,7 @@ import (
 	"github.com/lightningnetwork/lnd/lnrpc"
 	"github.com/lightningnetwork/lnd/lntest"
 	"github.com/lightningnetwork/lnd/lntest/node"
+	"github.com/lightningnetwork/lnd/lntest/port"
 	"github.com/lightningnetwork/lnd/lntest/wait"
 	"github.com/stretchr/testify/require"
 	"google.golang.org/grpc/grpclog"
@@ -90,7 +91,6 @@ func TestLightningNetworkDaemon(t *testing.T) {
 	// Get the test cases to be run in this tranche.
 	testCases, trancheIndex, trancheOffset := getTestCaseSplitTranche()
-	node.ApplyPortOffset(uint32(trancheIndex) * 1000)

 	// Create a simple fee service.
 	feeService := lntest.NewFeeService(t)
@@ -216,9 +216,10 @@ func init() {
 	// Before we start any node, we need to make sure that any btcd node
 	// that is started through the RPC harness uses a unique port as well
 	// to avoid any port collisions.
-	rpctest.ListenAddressGenerator = node.GenerateBtcdListenerAddresses
+	rpctest.ListenAddressGenerator =
+		port.GenerateSystemUniqueListenerAddresses

-	// Swap out grpc's default logger with out fake logger which drops the
+	// Swap out grpc's default logger with our fake logger which drops the
 	// statements on the floor.
 	fakeLogger := grpclog.NewLoggerV2(io.Discard, io.Discard, io.Discard)
 	grpclog.SetLoggerV2(fakeLogger)
diff --git a/itest/lnd_watchtower_test.go b/itest/lnd_watchtower_test.go
index bfd1425a5..ad724e45a 100644
--- a/itest/lnd_watchtower_test.go
+++ b/itest/lnd_watchtower_test.go
@@ -13,6 +13,7 @@ import (
 	"github.com/lightningnetwork/lnd/lnrpc/wtclientrpc"
 	"github.com/lightningnetwork/lnd/lntest"
 	"github.com/lightningnetwork/lnd/lntest/node"
+	"github.com/lightningnetwork/lnd/lntest/port"
 	"github.com/lightningnetwork/lnd/lntest/rpc"
 	"github.com/lightningnetwork/lnd/lntest/wait"
 	"github.com/stretchr/testify/require"
@@ -595,7 +596,7 @@ func testRevokedCloseRetributionAltruistWatchtowerCase(ht *lntest.HarnessTest,
 func setUpNewTower(ht *lntest.HarnessTest, name, externalIP string) ([]byte,
 	string, *rpc.HarnessRPC) {

-	port := node.NextAvailablePort()
+	port := port.NextAvailablePort()

 	listenAddr := fmt.Sprintf("0.0.0.0:%d", port)
diff --git a/lntest/bitcoind_common.go b/lntest/bitcoind_common.go
index 9ce962dc2..b5c43c448 100644
--- a/lntest/bitcoind_common.go
+++ b/lntest/bitcoind_common.go
@@ -15,6 +15,7 @@ import (
 	"github.com/btcsuite/btcd/chaincfg"
 	"github.com/btcsuite/btcd/rpcclient"
 	"github.com/lightningnetwork/lnd/lntest/node"
+	"github.com/lightningnetwork/lnd/lntest/port"
 )

 // logDirPattern is the pattern of the name of the temporary log directory.
@@ -111,11 +112,10 @@ func newBackend(miner string, netParams *chaincfg.Params, extraArgs []string,
 	}

 	zmqBlockAddr := fmt.Sprintf("tcp://127.0.0.1:%d",
-		node.NextAvailablePort())
-	zmqTxAddr := fmt.Sprintf("tcp://127.0.0.1:%d",
-		node.NextAvailablePort())
-	rpcPort := node.NextAvailablePort()
-	p2pPort := node.NextAvailablePort()
+		port.NextAvailablePort())
+	zmqTxAddr := fmt.Sprintf("tcp://127.0.0.1:%d", port.NextAvailablePort())
+	rpcPort := port.NextAvailablePort()
+	p2pPort := port.NextAvailablePort()

 	cmdArgs := []string{
 		"-datadir=" + tempBitcoindDir,
diff --git a/lntest/fee_service.go b/lntest/fee_service.go
index 592430b8e..49bd953ac 100644
--- a/lntest/fee_service.go
+++ b/lntest/fee_service.go
@@ -10,7 +10,7 @@ import (
 	"testing"

 	"github.com/lightningnetwork/lnd"
-	"github.com/lightningnetwork/lnd/lntest/node"
+	"github.com/lightningnetwork/lnd/lntest/port"
 	"github.com/lightningnetwork/lnd/lnwallet/chainfee"
 	"github.com/stretchr/testify/require"
 )
@@ -60,11 +60,11 @@ type FeeService struct {
 // Compile-time check for the WebFeeService interface.
 var _ WebFeeService = (*FeeService)(nil)

-// Start spins up a go-routine to serve fee estimates.
+// NewFeeService spins up a go-routine to serve fee estimates.
 func NewFeeService(t *testing.T) *FeeService {
 	t.Helper()

-	port := node.NextAvailablePort()
+	port := port.NextAvailablePort()
 	f := FeeService{
 		T: t,
 		url: fmt.Sprintf(
diff --git a/lntest/node/config.go b/lntest/node/config.go
index 1573af42e..5a7013a21 100644
--- a/lntest/node/config.go
+++ b/lntest/node/config.go
@@ -4,16 +4,16 @@ import (
 	"flag"
 	"fmt"
 	"io"
-	"net"
 	"os"
 	"path"
 	"path/filepath"
-	"sync/atomic"

 	"github.com/btcsuite/btcd/chaincfg"
+	"github.com/btcsuite/btcd/integration/rpctest"
 	"github.com/lightningnetwork/lnd"
 	"github.com/lightningnetwork/lnd/chanbackup"
 	"github.com/lightningnetwork/lnd/kvdb/etcd"
+	"github.com/lightningnetwork/lnd/lntest/port"
 	"github.com/lightningnetwork/lnd/lntest/wait"
 )

@@ -25,18 +25,9 @@ const (
 	// DefaultCSV is the CSV delay (remotedelay) we will start our test
 	// nodes with.
 	DefaultCSV = 4
-
-	// defaultNodePort is the start of the range for listening ports of
-	// harness nodes. Ports are monotonically increasing starting from this
-	// number and are determined by the results of NextAvailablePort().
-	defaultNodePort = 5555
 )

 var (
-	// lastPort is the last port determined to be free for use by a new
-	// node. It should be used atomically.
-	lastPort uint32 = defaultNodePort
-
 	// logOutput is a flag that can be set to append the output from the
 	// seed nodes to log files.
 	logOutput = flag.Bool("logoutput", false,
@@ -171,16 +162,16 @@ func (cfg BaseNodeConfig) ChanBackupPath() string {
 // current lightning network test.
 func (cfg *BaseNodeConfig) GenerateListeningPorts() {
 	if cfg.P2PPort == 0 {
-		cfg.P2PPort = NextAvailablePort()
+		cfg.P2PPort = port.NextAvailablePort()
 	}
 	if cfg.RPCPort == 0 {
-		cfg.RPCPort = NextAvailablePort()
+		cfg.RPCPort = port.NextAvailablePort()
 	}
 	if cfg.RESTPort == 0 {
-		cfg.RESTPort = NextAvailablePort()
+		cfg.RESTPort = port.NextAvailablePort()
 	}
 	if cfg.ProfilePort == 0 {
-		cfg.ProfilePort = NextAvailablePort()
+		cfg.ProfilePort = port.NextAvailablePort()
 	}
 }

@@ -259,13 +250,13 @@ func (cfg *BaseNodeConfig) GenArgs() []string {
 		args = append(
 			args, fmt.Sprintf(
 				"--db.etcd.embedded_client_port=%v",
-				NextAvailablePort(),
+				port.NextAvailablePort(),
 			),
 		)
 		args = append(
 			args, fmt.Sprintf(
 				"--db.etcd.embedded_peer_port=%v",
-				NextAvailablePort(),
+				port.NextAvailablePort(),
 			),
 		)
 		args = append(
@@ -333,34 +324,6 @@ func ExtraArgsEtcd(etcdCfg *etcd.Config, name string, cluster bool,
 	return extraArgs
 }

-// NextAvailablePort returns the first port that is available for listening by
-// a new node. It panics if no port is found and the maximum available TCP port
-// is reached.
-func NextAvailablePort() int {
-	port := atomic.AddUint32(&lastPort, 1)
-	for port < 65535 {
-		// If there are no errors while attempting to listen on this
-		// port, close the socket and return it as available. While it
-		// could be the case that some other process picks up this port
-		// between the time the socket is closed and it's reopened in
-		// the harness node, in practice in CI servers this seems much
-		// less likely than simply some other process already being
-		// bound at the start of the tests.
-		addr := fmt.Sprintf(ListenerFormat, port)
-		l, err := net.Listen("tcp4", addr)
-		if err == nil {
-			err := l.Close()
-			if err == nil {
-				return int(port)
-			}
-		}
-		port = atomic.AddUint32(&lastPort, 1)
-	}
-
-	// No ports available? Must be a mistake.
-	panic("no ports available for listening")
-}
-
 // GetLogDir returns the passed --logdir flag or the default value if it wasn't
 // set.
 func GetLogDir() string {
@@ -402,16 +365,10 @@ func GetBtcdBinary() string {
 	return ""
 }

-// GenerateBtcdListenerAddresses is a function that returns two listener
-// addresses with unique ports and should be used to overwrite rpctest's
-// default generator which is prone to use colliding ports.
-func GenerateBtcdListenerAddresses() (string, string) {
-	return fmt.Sprintf(ListenerFormat, NextAvailablePort()),
-		fmt.Sprintf(ListenerFormat, NextAvailablePort())
-}
-
-// ApplyPortOffset adds the given offset to the lastPort variable, making it
-// possible to run the tests in parallel without colliding on the same ports.
-func ApplyPortOffset(offset uint32) {
-	_ = atomic.AddUint32(&lastPort, offset)
+func init() {
+	// Before we start any node, we need to make sure that any btcd or
+	// bitcoind node that is started through the RPC harness uses a unique
+	// port as well to avoid any port collisions.
+	rpctest.ListenAddressGenerator =
+		port.GenerateSystemUniqueListenerAddresses
 }
diff --git a/lntest/port/port.go b/lntest/port/port.go
new file mode 100644
index 000000000..28db1918d
--- /dev/null
+++ b/lntest/port/port.go
@@ -0,0 +1,150 @@
+package port
+
+import (
+	"fmt"
+	"net"
+	"os"
+	"path/filepath"
+	"strconv"
+	"sync"
+	"time"
+)
+
+const (
+	// ListenerFormat is the format string that is used to generate local
+	// listener addresses.
+	ListenerFormat = "127.0.0.1:%d"
+
+	// defaultNodePort is the start of the range for listening ports of
+	// harness nodes. Ports are monotonically increasing starting from this
+	// number and are determined by the results of NextAvailablePort().
+	defaultNodePort int = 10000
+
+	// uniquePortFile is the name of the file that is used to store the
+	// last port that was used by a node. This is used to make sure that
+	// the same port is not used by multiple nodes at the same time. The
+	// file is located in the system's temp directory.
+	uniquePortFile = "rpctest-port"
+)
+
+var (
+	// portFileMutex is a mutex that is used to make sure that the port file
+	// is not accessed by multiple goroutines of the same process at the
+	// same time. This is used in conjunction with the lock file to make
+	// sure that the port file is not accessed by multiple processes at the
+	// same time either. So the lock file is to guard between processes and
+	// the mutex is to guard between goroutines of the same process.
+	portFileMutex sync.Mutex
+)
+
+// NextAvailablePort returns the first port that is available for listening
+// by a new node, using a lock file to make sure parallel tasks on the same
+// system don't re-use the same port.
+func NextAvailablePort() int {
+	portFileMutex.Lock()
+	defer portFileMutex.Unlock()
+
+	lockFile := filepath.Join(os.TempDir(), uniquePortFile+".lock")
+	timeout := time.After(time.Second)
+
+	var (
+		lockFileHandle *os.File
+		err            error
+	)
+	for {
+		// Attempt to acquire the lock file. If it already exists, wait
+		// for a bit and retry.
+		lockFileHandle, err = os.OpenFile(
+			lockFile, os.O_CREATE|os.O_EXCL, 0600,
+		)
+		if err == nil {
+			// Lock acquired.
+			break
+		}
+
+		// Wait for a bit and retry.
+		select {
+		case <-timeout:
+			panic("timeout waiting for lock file")
+		case <-time.After(10 * time.Millisecond):
+		}
+	}
+
+	// Release the lock file when we're done.
+	defer func() {
+		// Always close the file first, Windows won't allow us to
+		// remove it otherwise.
+		_ = lockFileHandle.Close()
+		err := os.Remove(lockFile)
+		if err != nil {
+			panic(fmt.Errorf("couldn't remove lock file: %w", err))
+		}
+	}()
+
+	portFile := filepath.Join(os.TempDir(), uniquePortFile)
+	port, err := os.ReadFile(portFile)
+	if err != nil {
+		if !os.IsNotExist(err) {
+			panic(fmt.Errorf("error reading port file: %w", err))
+		}
+		port = []byte(strconv.Itoa(defaultNodePort))
+	}
+
+	lastPort, err := strconv.Atoi(string(port))
+	if err != nil {
+		panic(fmt.Errorf("error parsing port: %w", err))
+	}
+
+	// We take the next one.
+	lastPort++
+	for lastPort < 65535 {
+		// If there are no errors while attempting to listen on this
+		// port, close the socket and return it as available. While it
+		// could be the case that some other process picks up this port
+		// between the time the socket is closed and it's reopened in
+		// the harness node, in practice in CI servers this seems much
+		// less likely than simply some other process already being
+		// bound at the start of the tests.
+		addr := fmt.Sprintf(ListenerFormat, lastPort)
+		l, err := net.Listen("tcp4", addr)
+		if err == nil {
+			err := l.Close()
+			if err == nil {
+				err := os.WriteFile(
+					portFile,
+					[]byte(strconv.Itoa(lastPort)), 0600,
+				)
+				if err != nil {
+					panic(fmt.Errorf("error updating "+
+						"port file: %w", err))
+				}
+
+				return lastPort
+			}
+		}
+		lastPort++
+
+		// Start from the beginning if we reached the end of the port
+		// range. We need to do this because the port file is now
+		// persistent across runs on the same machine during the same
+		// boot/uptime cycle. So in order to make this work on
+		// developers' machines, we need to reset the port to the
+		// default value when we reach the end of the range.
+		if lastPort == 65535 {
+			lastPort = defaultNodePort
+		}
+	}
+
+	// No ports available? Must be a mistake.
+	panic("no ports available for listening")
+}
+
+// GenerateSystemUniqueListenerAddresses is a function that returns two
+// listener addresses with unique ports per system and should be used to
+// overwrite rpctest's default generator which is prone to use colliding
+// ports.
+func GenerateSystemUniqueListenerAddresses() (string, string) {
+	port1 := NextAvailablePort()
+	port2 := NextAvailablePort()
+
+	return fmt.Sprintf(ListenerFormat, port1),
+		fmt.Sprintf(ListenerFormat, port2)
+}
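For reviewers who want to exercise the new allocation scheme in isolation, here is a minimal usage sketch (not part of the patch): it relies only on the exports introduced above, port.NextAvailablePort and port.ListenerFormat, plus the standard library. The helper newTestListener and the test are hypothetical names invented for illustration; binding to the port immediately after allocation narrows the window in which an unrelated process could grab it.

package port_test

import (
	"fmt"
	"net"
	"testing"

	"github.com/lightningnetwork/lnd/lntest/port"
)

// newTestListener is a hypothetical helper that asks the port package for a
// system-unique port and binds to it right away, keeping the port reserved
// for the lifetime of the test.
func newTestListener(t *testing.T) net.Listener {
	t.Helper()

	addr := fmt.Sprintf(port.ListenerFormat, port.NextAvailablePort())
	l, err := net.Listen("tcp4", addr)
	if err != nil {
		t.Fatalf("unable to listen on %s: %v", addr, err)
	}
	t.Cleanup(func() { _ = l.Close() })

	return l
}

// TestUniquePorts sanity-checks that consecutive allocations never hand out
// the same port, even though the allocation state lives in a shared temp
// file rather than in a process-local counter.
func TestUniquePorts(t *testing.T) {
	l1 := newTestListener(t)
	l2 := newTestListener(t)

	if l1.Addr().String() == l2.Addr().String() {
		t.Fatalf("duplicate port handed out: %v", l1.Addr())
	}
}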