mirror of
https://github.com/bitcoin/bitcoin.git
synced 2025-08-23 13:12:22 +02:00
tests: Add RPCOverloadWrapper which overloads some disabled RPCs
RPCOverloadWrapper overloads some deprecated or disabled RPCs with an implementation using other RPCs to avoid having a ton of code churn around replacing those RPCs.
This commit is contained in:
@@ -22,6 +22,7 @@ import shlex
|
||||
import sys
|
||||
|
||||
from .authproxy import JSONRPCException
|
||||
from .descriptors import descsum_create
|
||||
from .util import (
|
||||
MAX_NODES,
|
||||
append_config,
|
||||
@@ -170,10 +171,10 @@ class TestNode():
|
||||
def __getattr__(self, name):
|
||||
"""Dispatches any unrecognised messages to the RPC connection or a CLI instance."""
|
||||
if self.use_cli:
|
||||
return getattr(self.cli, name)
|
||||
return getattr(RPCOverloadWrapper(self.cli, True), name)
|
||||
else:
|
||||
assert self.rpc_connected and self.rpc is not None, self._node_msg("Error: no RPC connection")
|
||||
return getattr(self.rpc, name)
|
||||
return getattr(RPCOverloadWrapper(self.rpc), name)
|
||||
|
||||
def start(self, extra_args=None, *, cwd=None, stdout=None, stderr=None, **kwargs):
|
||||
"""Start the node."""
|
||||
@@ -265,11 +266,11 @@ class TestNode():
|
||||
|
||||
def get_wallet_rpc(self, wallet_name):
|
||||
if self.use_cli:
|
||||
return self.cli("-rpcwallet={}".format(wallet_name))
|
||||
return RPCOverloadWrapper(self.cli("-rpcwallet={}".format(wallet_name)), True)
|
||||
else:
|
||||
assert self.rpc_connected and self.rpc, self._node_msg("RPC not connected")
|
||||
wallet_path = "wallet/{}".format(urllib.parse.quote(wallet_name))
|
||||
return self.rpc / wallet_path
|
||||
return RPCOverloadWrapper(self.rpc / wallet_path)
|
||||
|
||||
def stop_node(self, expected_stderr='', wait=0):
|
||||
"""Stop the node."""
|
||||
@@ -595,3 +596,103 @@ class TestNodeCLI():
|
||||
return json.loads(cli_stdout, parse_float=decimal.Decimal)
|
||||
except json.JSONDecodeError:
|
||||
return cli_stdout.rstrip("\n")
|
||||
|
||||
class RPCOverloadWrapper():
|
||||
def __init__(self, rpc, cli=False):
|
||||
self.rpc = rpc
|
||||
self.is_cli = cli
|
||||
|
||||
def __getattr__(self, name):
|
||||
return getattr(self.rpc, name)
|
||||
|
||||
def importprivkey(self, privkey, label=None, rescan=None):
|
||||
wallet_info = self.getwalletinfo()
|
||||
if self.is_cli:
|
||||
if label is None:
|
||||
label = 'null'
|
||||
if rescan is None:
|
||||
rescan = 'null'
|
||||
if 'descriptors' not in wallet_info or ('descriptors' in wallet_info and not wallet_info['descriptors']):
|
||||
return self.__getattr__('importprivkey')(privkey, label, rescan)
|
||||
desc = descsum_create('combo(' + privkey + ')')
|
||||
req = [{
|
||||
'desc': desc,
|
||||
'timestamp': 0 if rescan else 'now',
|
||||
'label': label if label else ''
|
||||
}]
|
||||
import_res = self.importdescriptors(req)
|
||||
if not import_res[0]['success']:
|
||||
raise JSONRPCException(import_res[0]['error'])
|
||||
|
||||
def addmultisigaddress(self, nrequired, keys, label=None, address_type=None):
|
||||
wallet_info = self.getwalletinfo()
|
||||
if self.is_cli:
|
||||
if label is None:
|
||||
label = 'null'
|
||||
if address_type is None:
|
||||
address_type = 'null'
|
||||
if 'descriptors' not in wallet_info or ('descriptors' in wallet_info and not wallet_info['descriptors']):
|
||||
return self.__getattr__('addmultisigaddress')(nrequired, keys, label, address_type)
|
||||
cms = self.createmultisig(nrequired, keys, address_type)
|
||||
req = [{
|
||||
'desc': cms['descriptor'],
|
||||
'timestamp': 0,
|
||||
'label': label if label else ''
|
||||
}]
|
||||
import_res = self.importdescriptors(req)
|
||||
if not import_res[0]['success']:
|
||||
raise JSONRPCException(import_res[0]['error'])
|
||||
return cms
|
||||
|
||||
def importpubkey(self, pubkey, label=None, rescan=None):
|
||||
wallet_info = self.getwalletinfo()
|
||||
if self.is_cli:
|
||||
if label is None:
|
||||
label = 'null'
|
||||
if rescan is None:
|
||||
rescan = 'null'
|
||||
if 'descriptors' not in wallet_info or ('descriptors' in wallet_info and not wallet_info['descriptors']):
|
||||
return self.__getattr__('importpubkey')(pubkey, label, rescan)
|
||||
desc = descsum_create('combo(' + pubkey + ')')
|
||||
req = [{
|
||||
'desc': desc,
|
||||
'timestamp': 0 if rescan else 'now',
|
||||
'label': label if label else ''
|
||||
}]
|
||||
import_res = self.importdescriptors(req)
|
||||
if not import_res[0]['success']:
|
||||
raise JSONRPCException(import_res[0]['error'])
|
||||
|
||||
def importaddress(self, address, label=None, rescan=None, p2sh=None):
|
||||
wallet_info = self.getwalletinfo()
|
||||
if self.is_cli:
|
||||
if label is None:
|
||||
label = 'null'
|
||||
if rescan is None:
|
||||
rescan = 'null'
|
||||
if p2sh is None:
|
||||
p2sh = 'null'
|
||||
if 'descriptors' not in wallet_info or ('descriptors' in wallet_info and not wallet_info['descriptors']):
|
||||
return self.__getattr__('importaddress')(address, label, rescan, p2sh)
|
||||
is_hex = False
|
||||
try:
|
||||
int(address ,16)
|
||||
is_hex = True
|
||||
desc = descsum_create('raw(' + address + ')')
|
||||
except:
|
||||
desc = descsum_create('addr(' + address + ')')
|
||||
reqs = [{
|
||||
'desc': desc,
|
||||
'timestamp': 0 if rescan else 'now',
|
||||
'label': label if label else ''
|
||||
}]
|
||||
if is_hex and p2sh:
|
||||
reqs.append({
|
||||
'desc': descsum_create('p2sh(raw(' + address + '))'),
|
||||
'timestamp': 0 if rescan else 'now',
|
||||
'label': label if label else ''
|
||||
})
|
||||
import_res = self.importdescriptors(reqs)
|
||||
for res in import_res:
|
||||
if not res['success']:
|
||||
raise JSONRPCException(res['error'])
|
||||
|
Reference in New Issue
Block a user