test: Allow rpc_bind.py --usecli

This allows to run the test under bitcoin-cli, except for one small
part, which is skipped for now.

This also inlines rpc_url directly into create_new_rpc_connection,
because this is the only place where it is needed.
This commit is contained in:
MarcoFalke
2026-05-20 14:30:34 +02:00
parent fa8f25118c
commit fa00c7c7a4
3 changed files with 19 additions and 19 deletions

View File

@@ -7,14 +7,13 @@
from test_framework.netutil import all_interfaces, addr_to_hex, get_bind_addrs, test_ipv6_local
from test_framework.test_framework import BitcoinTestFramework, SkipTest
from test_framework.test_node import ErrorMatch
from test_framework.util import assert_equal, assert_raises_rpc_error, get_rpc_proxy, rpc_port, rpc_url
from test_framework.util import assert_equal, assert_raises_rpc_error, rpc_port
class RPCBindTest(BitcoinTestFramework):
def set_test_params(self):
self.setup_clean_chain = True
self.bind_to_localhost_only = False
self.num_nodes = 1
self.supports_cli = False
def skip_test_if_missing_module(self):
self.skip_if_platform_not_posix()
@@ -40,6 +39,7 @@ class RPCBindTest(BitcoinTestFramework):
base_args += ['-rpcallowip=' + x for x in allow_ips]
binds = ['-rpcbind='+addr for addr in addresses]
self.nodes[0].rpchost = connect_to
self.nodes[0].cli = self.nodes[0].create_new_rpc_connection(mode="CLI") # Pass `connect_to` to cli.
self.start_node(0, base_args + binds)
pid = self.nodes[0].process.pid
assert_equal(set(get_bind_addrs(pid)), set(expected))
@@ -70,8 +70,9 @@ class RPCBindTest(BitcoinTestFramework):
['-rpcbind='+addr for addr in ['127.0.0.1', "%s:%d" % (rpchost, rpcport)]] # Bind to localhost as well so start_nodes doesn't hang
self.nodes[0].rpchost = None
self.start_nodes([node_args])
self.nodes[0].rpchost = f"{rpchost}:{rpcport}"
# connect to node through non-loopback interface
node = get_rpc_proxy(rpc_url(self.nodes[0].datadir_path, 0, self.chain, "%s:%d" % (rpchost, rpcport)), 0, coveragedir=self.options.coveragedir)
node = self.nodes[0].create_new_rpc_connection()
node.getnetworkinfo()
self.stop_nodes()
@@ -164,6 +165,9 @@ class RPCBindTest(BitcoinTestFramework):
# Check that with invalid rpcallowip, we are denied
self.run_allowip_test([self.non_loopback_ip], self.non_loopback_ip, self.defaultport)
if self.options.usecli:
self.log.info("Skip negative IP test with CLI, because the CLI can not throw the tested exception type")
return
assert_raises_rpc_error(-342, "non-JSON HTTP response with '403 Forbidden' from server", self.run_allowip_test, ['1.1.1.1'], self.non_loopback_ip, self.defaultport)
if __name__ == '__main__':

View File

@@ -37,7 +37,7 @@ from .util import (
delete_cookie_file,
get_auth_cookie,
get_rpc_proxy,
rpc_url,
rpc_port,
wait_until_helper_internal,
p2p_port,
tor_port,
@@ -310,9 +310,18 @@ class TestNode():
if mode == RPCConnectionType.AUTO:
mode = RPCConnectionType.CLI if self.use_cli else RPCConnectionType.AUTHPROXY
client_timeout = client_timeout or (self.rpc_timeout // 2) # Shorter timeout to allow for one retry in case of ETIMEDOUT
host = "127.0.0.1"
port = rpc_port(self.index)
if self.rpchost:
parts = self.rpchost.split(":")
if len(parts) == 2:
host, port = parts
else:
host = self.rpchost
if mode == RPCConnectionType.AUTHPROXY:
rpc_u, rpc_p = get_auth_cookie(self.datadir_path, self.chain)
rpc = get_rpc_proxy(
rpc_url(self.datadir_path, self.index, self.chain, self.rpchost),
f"http://{rpc_u}:{rpc_p}@{host}:{port}",
self.index,
timeout=client_timeout,
coveragedir=self.coverage_dir,
@@ -320,7 +329,7 @@ class TestNode():
rpc.auth_service_proxy_instance.reuse_http_connections = self.reuse_http_connections
return rpc
else: # mode==CLI
return self.cli(f"-rpcclienttimeout={client_timeout}")
return self.cli(f"-rpcclienttimeout={client_timeout}", f"-rpcconnect={host}", f"-rpcport={port}")
def wait_for_rpc_connection(self, *, wait_for_import=True):
"""Sets up an RPC connection to the bitcoind process. Returns False if unable to connect."""

View File

@@ -517,19 +517,6 @@ def tor_port(n):
return p2p_port(n) + PORT_RANGE * 2
def rpc_url(datadir, i, chain, rpchost):
rpc_u, rpc_p = get_auth_cookie(datadir, chain)
host = '127.0.0.1'
port = rpc_port(i)
if rpchost:
parts = rpchost.split(':')
if len(parts) == 2:
host, port = parts
else:
host = rpchost
return "http://%s:%s@%s:%d" % (rpc_u, rpc_p, host, int(port))
# Node functions
################