Merge bitcoin/bitcoin#29130: wallet: Add createwalletdescriptor and gethdkeys RPCs for adding new automatically generated descriptors

746b6d8839 test: Add test for createwalletdescriptor (Ava Chow)
2402b63062 wallet: Test upgrade of pre-taproot wallet to have tr() descriptors (Ava Chow)
460ae1bf67 wallet, rpc: Add createwalletdescriptor RPC (Ava Chow)
8e1a475062 wallet: Be able to retrieve single key from descriptors (Ava Chow)
85b1fb19dd wallet: Add GetActiveHDPubKeys to retrieve xpubs from active descriptors (Ava Chow)
73926f2d31 wallet, descspkm: Refactor wallet descriptor generation to standalone func (Andrew Chow)
54e74f46ea wallet: Refactor function for single DescSPKM setup (Andrew Chow)
3b09d0eb7f tests: Test for gethdkeys (Ava Chow)
5febe28c9e wallet, rpc: Add gethdkeys RPC (Ava Chow)
66632e5c24 wallet: Add IsActiveScriptPubKeyMan (Ava Chow)
fa6a259985 desc spkm: Add functions to retrieve specific private keys (Ava Chow)
fe67841464 descriptor: Be able to get the pubkeys involved in a descriptor (Ava Chow)
ef6745879d key: Add constructor for CExtKey that takes CExtPubKey and CKey (Ava Chow)

Pull request description:

  This PR adds a `createwalletdescriptor` RPC which allows users to add new automatically generated descriptors to their wallet, e.g. to upgrade a 0.21.x wallet to contain a taproot descriptor. This RPC takes 3 arguments: the output type to create a descriptor for, whether the descriptor will be internal or external, and the HD key to use if the user wishes to use a specific key. The HD key is an optional parameter. If it is not specified, the wallet will use the key shared by the active descriptors, if they are all single key. For most users in the expected upgrade scenario, this should be sufficient. In more advanced cases, the user must specify the HD key to use.

  Currently, specified HD keys must already exist in the wallet. To make it easier for the user to know, `gethdkeys` is also added to list out the HD keys in use by all of the descriptors in the wallet. This will include all HD keys, whether we have the private key, for it, which descriptors use it and their activeness, and optionally the extended private key. In this way, users with more complex wallets will be still be able to get HD keys from their wallet for use in other scenarios, and if they want to use `createwalletdescriptor`, they can easily get the keys that they can specify to it.

  See also https://github.com/bitcoin/bitcoin/pull/26728#issuecomment-1866961865

ACKs for top commit:
  Sjors:
    re-utACK 746b6d8839
  furszy:
    ACK 746b6d8
  ryanofsky:
    Code review ACK 746b6d8839, and this looks ready to merge. There were various suggested changes since last review where main change seems to be switching `gethdkeys` output to use normalized descriptors (removing hardened path components).

Tree-SHA512: f2849101e6fbf1f59cb031eaaaee97af5b1ae92aaab54c5716940d210f08ab4fc952df2725b636596cd5747b8f5beb1a7a533425bc10d09da02659473516fbda
This commit is contained in:
Ryan Ofsky
2024-03-29 06:27:43 -04:00
16 changed files with 775 additions and 63 deletions

View File

@@ -181,6 +181,8 @@ BASE_SCRIPTS = [
'wallet_keypool_topup.py --legacy-wallet',
'wallet_keypool_topup.py --descriptors',
'wallet_fast_rescan.py --descriptors',
'wallet_gethdkeys.py --descriptors',
'wallet_createwalletdescriptor.py --descriptors',
'interface_zmq.py',
'rpc_invalid_address_message.py',
'rpc_validateaddress.py',

View File

@@ -355,6 +355,25 @@ class BackwardsCompatibilityTest(BitcoinTestFramework):
down_wallet_name = f"re_down_{node.version}"
down_backup_path = os.path.join(self.options.tmpdir, f"{down_wallet_name}.dat")
wallet.backupwallet(down_backup_path)
# Check that taproot descriptors can be added to 0.21 wallets
# This must be done after the backup is created so that 0.21 can still load
# the backup
if self.options.descriptors and self.major_version_equals(node, 21):
assert_raises_rpc_error(-12, "No bech32m addresses available", wallet.getnewaddress, address_type="bech32m")
xpubs = wallet.gethdkeys(active_only=True)
assert_equal(len(xpubs), 1)
assert_equal(len(xpubs[0]["descriptors"]), 6)
wallet.createwalletdescriptor("bech32m")
xpubs = wallet.gethdkeys(active_only=True)
assert_equal(len(xpubs), 1)
assert_equal(len(xpubs[0]["descriptors"]), 8)
tr_descs = [desc["desc"] for desc in xpubs[0]["descriptors"] if desc["desc"].startswith("tr(")]
assert_equal(len(tr_descs), 2)
for desc in tr_descs:
assert info["hdmasterfingerprint"] in desc
wallet.getnewaddress(address_type="bech32m")
wallet.unloadwallet()
# Check that no automatic upgrade broke the downgrading the wallet

View File

@@ -0,0 +1,123 @@
#!/usr/bin/env python3
# Copyright (c) 2023 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Test wallet createwalletdescriptor RPC."""
from test_framework.descriptors import descsum_create
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import (
assert_equal,
assert_raises_rpc_error,
)
from test_framework.wallet_util import WalletUnlock
class WalletCreateDescriptorTest(BitcoinTestFramework):
def add_options(self, parser):
self.add_wallet_options(parser, descriptors=True, legacy=False)
def set_test_params(self):
self.setup_clean_chain = True
self.num_nodes = 1
def skip_test_if_missing_module(self):
self.skip_if_no_wallet()
def run_test(self):
self.test_basic()
self.test_imported_other_keys()
self.test_encrypted()
def test_basic(self):
def_wallet = self.nodes[0].get_wallet_rpc(self.default_wallet_name)
self.nodes[0].createwallet("blank", blank=True)
wallet = self.nodes[0].get_wallet_rpc("blank")
xpub_info = def_wallet.gethdkeys(private=True)
xpub = xpub_info[0]["xpub"]
xprv = xpub_info[0]["xprv"]
expected_descs = []
for desc in def_wallet.listdescriptors()["descriptors"]:
if desc["desc"].startswith("wpkh("):
expected_descs.append(desc["desc"])
assert_raises_rpc_error(-5, "Unable to determine which HD key to use from active descriptors. Please specify with 'hdkey'", wallet.createwalletdescriptor, "bech32")
assert_raises_rpc_error(-5, f"Private key for {xpub} is not known", wallet.createwalletdescriptor, type="bech32", hdkey=xpub)
self.log.info("Test createwalletdescriptor after importing active descriptor to blank wallet")
# Import one active descriptor
assert_equal(wallet.importdescriptors([{"desc": descsum_create(f"pkh({xprv}/44h/2h/0h/0/0/*)"), "timestamp": "now", "active": True}])[0]["success"], True)
assert_equal(len(wallet.listdescriptors()["descriptors"]), 1)
assert_equal(len(wallet.gethdkeys()), 1)
new_descs = wallet.createwalletdescriptor("bech32")["descs"]
assert_equal(len(new_descs), 2)
assert_equal(len(wallet.gethdkeys()), 1)
assert_equal(new_descs, expected_descs)
self.log.info("Test descriptor creation options")
old_descs = set([(d["desc"], d["active"], d["internal"]) for d in wallet.listdescriptors(private=True)["descriptors"]])
wallet.createwalletdescriptor(type="bech32m", internal=False)
curr_descs = set([(d["desc"], d["active"], d["internal"]) for d in wallet.listdescriptors(private=True)["descriptors"]])
new_descs = list(curr_descs - old_descs)
assert_equal(len(new_descs), 1)
assert_equal(len(wallet.gethdkeys()), 1)
assert_equal(new_descs[0][0], descsum_create(f"tr({xprv}/86h/1h/0h/0/*)"))
assert_equal(new_descs[0][1], True)
assert_equal(new_descs[0][2], False)
old_descs = curr_descs
wallet.createwalletdescriptor(type="bech32m", internal=True)
curr_descs = set([(d["desc"], d["active"], d["internal"]) for d in wallet.listdescriptors(private=True)["descriptors"]])
new_descs = list(curr_descs - old_descs)
assert_equal(len(new_descs), 1)
assert_equal(len(wallet.gethdkeys()), 1)
assert_equal(new_descs[0][0], descsum_create(f"tr({xprv}/86h/1h/0h/1/*)"))
assert_equal(new_descs[0][1], True)
assert_equal(new_descs[0][2], True)
def test_imported_other_keys(self):
self.log.info("Test createwalletdescriptor with multiple keys in active descriptors")
def_wallet = self.nodes[0].get_wallet_rpc(self.default_wallet_name)
self.nodes[0].createwallet("multiple_keys")
wallet = self.nodes[0].get_wallet_rpc("multiple_keys")
wallet_xpub = wallet.gethdkeys()[0]["xpub"]
xpub_info = def_wallet.gethdkeys(private=True)
xpub = xpub_info[0]["xpub"]
xprv = xpub_info[0]["xprv"]
assert_equal(wallet.importdescriptors([{"desc": descsum_create(f"wpkh({xprv}/0/0/*)"), "timestamp": "now", "active": True}])[0]["success"], True)
assert_equal(len(wallet.gethdkeys()), 2)
assert_raises_rpc_error(-5, "Unable to determine which HD key to use from active descriptors. Please specify with 'hdkey'", wallet.createwalletdescriptor, "bech32")
assert_raises_rpc_error(-4, "Descriptor already exists", wallet.createwalletdescriptor, type="bech32m", hdkey=wallet_xpub)
assert_raises_rpc_error(-5, "Unable to parse HD key. Please provide a valid xpub", wallet.createwalletdescriptor, type="bech32m", hdkey=xprv)
# Able to replace tr() descriptor with other hd key
wallet.createwalletdescriptor(type="bech32m", hdkey=xpub)
def test_encrypted(self):
self.log.info("Test createwalletdescriptor with encrypted wallets")
def_wallet = self.nodes[0].get_wallet_rpc(self.default_wallet_name)
self.nodes[0].createwallet("encrypted", blank=True, passphrase="pass")
wallet = self.nodes[0].get_wallet_rpc("encrypted")
xpub_info = def_wallet.gethdkeys(private=True)
xprv = xpub_info[0]["xprv"]
with WalletUnlock(wallet, "pass"):
assert_equal(wallet.importdescriptors([{"desc": descsum_create(f"wpkh({xprv}/0/0/*)"), "timestamp": "now", "active": True}])[0]["success"], True)
assert_equal(len(wallet.gethdkeys()), 1)
assert_raises_rpc_error(-13, "Error: Please enter the wallet passphrase with walletpassphrase first.", wallet.createwalletdescriptor, type="bech32m")
with WalletUnlock(wallet, "pass"):
wallet.createwalletdescriptor(type="bech32m")
if __name__ == '__main__':
WalletCreateDescriptorTest().main()

View File

@@ -0,0 +1,185 @@
#!/usr/bin/env python3
# Copyright (c) 2023 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Test wallet gethdkeys RPC."""
from test_framework.descriptors import descsum_create
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import (
assert_equal,
assert_raises_rpc_error,
)
from test_framework.wallet_util import WalletUnlock
class WalletGetHDKeyTest(BitcoinTestFramework):
def add_options(self, parser):
self.add_wallet_options(parser, descriptors=True, legacy=False)
def set_test_params(self):
self.setup_clean_chain = True
self.num_nodes = 1
def skip_test_if_missing_module(self):
self.skip_if_no_wallet()
def run_test(self):
self.test_basic_gethdkeys()
self.test_ranged_imports()
self.test_lone_key_imports()
self.test_ranged_multisig()
self.test_mixed_multisig()
def test_basic_gethdkeys(self):
self.log.info("Test gethdkeys basics")
self.nodes[0].createwallet("basic")
wallet = self.nodes[0].get_wallet_rpc("basic")
xpub_info = wallet.gethdkeys()
assert_equal(len(xpub_info), 1)
assert_equal(xpub_info[0]["has_private"], True)
assert "xprv" not in xpub_info[0]
xpub = xpub_info[0]["xpub"]
xpub_info = wallet.gethdkeys(private=True)
xprv = xpub_info[0]["xprv"]
assert_equal(xpub_info[0]["xpub"], xpub)
assert_equal(xpub_info[0]["has_private"], True)
descs = wallet.listdescriptors(True)
for desc in descs["descriptors"]:
assert xprv in desc["desc"]
self.log.info("HD pubkey can be retrieved from encrypted wallets")
prev_xprv = xprv
wallet.encryptwallet("pass")
# HD key is rotated on encryption, there should now be 2 HD keys
assert_equal(len(wallet.gethdkeys()), 2)
# New key is active, should be able to get only that one and its descriptors
xpub_info = wallet.gethdkeys(active_only=True)
assert_equal(len(xpub_info), 1)
assert xpub_info[0]["xpub"] != xpub
assert "xprv" not in xpub_info[0]
assert_equal(xpub_info[0]["has_private"], True)
self.log.info("HD privkey can be retrieved from encrypted wallets")
assert_raises_rpc_error(-13, "Error: Please enter the wallet passphrase with walletpassphrase first", wallet.gethdkeys, private=True)
with WalletUnlock(wallet, "pass"):
xpub_info = wallet.gethdkeys(active_only=True, private=True)[0]
assert xpub_info["xprv"] != xprv
for desc in wallet.listdescriptors(True)["descriptors"]:
if desc["active"]:
# After encrypting, HD key was rotated and should appear in all active descriptors
assert xpub_info["xprv"] in desc["desc"]
else:
# Inactive descriptors should have the previous HD key
assert prev_xprv in desc["desc"]
def test_ranged_imports(self):
self.log.info("Keys of imported ranged descriptors appear in gethdkeys")
def_wallet = self.nodes[0].get_wallet_rpc(self.default_wallet_name)
self.nodes[0].createwallet("imports")
wallet = self.nodes[0].get_wallet_rpc("imports")
xpub_info = wallet.gethdkeys()
assert_equal(len(xpub_info), 1)
active_xpub = xpub_info[0]["xpub"]
import_xpub = def_wallet.gethdkeys(active_only=True)[0]["xpub"]
desc_import = def_wallet.listdescriptors(True)["descriptors"]
for desc in desc_import:
desc["active"] = False
wallet.importdescriptors(desc_import)
assert_equal(wallet.gethdkeys(active_only=True), xpub_info)
xpub_info = wallet.gethdkeys()
assert_equal(len(xpub_info), 2)
for x in xpub_info:
if x["xpub"] == active_xpub:
for desc in x["descriptors"]:
assert_equal(desc["active"], True)
elif x["xpub"] == import_xpub:
for desc in x["descriptors"]:
assert_equal(desc["active"], False)
else:
assert False
def test_lone_key_imports(self):
self.log.info("Non-HD keys do not appear in gethdkeys")
self.nodes[0].createwallet("lonekey", blank=True)
wallet = self.nodes[0].get_wallet_rpc("lonekey")
assert_equal(wallet.gethdkeys(), [])
wallet.importdescriptors([{"desc": descsum_create("wpkh(cTe1f5rdT8A8DFgVWTjyPwACsDPJM9ff4QngFxUixCSvvbg1x6sh)"), "timestamp": "now"}])
assert_equal(wallet.gethdkeys(), [])
self.log.info("HD keys of non-ranged descriptors should appear in gethdkeys")
def_wallet = self.nodes[0].get_wallet_rpc(self.default_wallet_name)
xpub_info = def_wallet.gethdkeys(private=True)
xpub = xpub_info[0]["xpub"]
xprv = xpub_info[0]["xprv"]
prv_desc = descsum_create(f"wpkh({xprv})")
pub_desc = descsum_create(f"wpkh({xpub})")
assert_equal(wallet.importdescriptors([{"desc": prv_desc, "timestamp": "now"}])[0]["success"], True)
xpub_info = wallet.gethdkeys()
assert_equal(len(xpub_info), 1)
assert_equal(xpub_info[0]["xpub"], xpub)
assert_equal(len(xpub_info[0]["descriptors"]), 1)
assert_equal(xpub_info[0]["descriptors"][0]["desc"], pub_desc)
assert_equal(xpub_info[0]["descriptors"][0]["active"], False)
def test_ranged_multisig(self):
self.log.info("HD keys of a multisig appear in gethdkeys")
def_wallet = self.nodes[0].get_wallet_rpc(self.default_wallet_name)
self.nodes[0].createwallet("ranged_multisig")
wallet = self.nodes[0].get_wallet_rpc("ranged_multisig")
xpub1 = wallet.gethdkeys()[0]["xpub"]
xprv1 = wallet.gethdkeys(private=True)[0]["xprv"]
xpub2 = def_wallet.gethdkeys()[0]["xpub"]
prv_multi_desc = descsum_create(f"wsh(multi(2,{xprv1}/*,{xpub2}/*))")
pub_multi_desc = descsum_create(f"wsh(multi(2,{xpub1}/*,{xpub2}/*))")
assert_equal(wallet.importdescriptors([{"desc": prv_multi_desc, "timestamp": "now"}])[0]["success"], True)
xpub_info = wallet.gethdkeys()
assert_equal(len(xpub_info), 2)
for x in xpub_info:
if x["xpub"] == xpub1:
found_desc = next((d for d in xpub_info[0]["descriptors"] if d["desc"] == pub_multi_desc), None)
assert found_desc is not None
assert_equal(found_desc["active"], False)
elif x["xpub"] == xpub2:
assert_equal(len(x["descriptors"]), 1)
assert_equal(x["descriptors"][0]["desc"], pub_multi_desc)
assert_equal(x["descriptors"][0]["active"], False)
else:
assert False
def test_mixed_multisig(self):
self.log.info("Non-HD keys of a multisig do not appear in gethdkeys")
def_wallet = self.nodes[0].get_wallet_rpc(self.default_wallet_name)
self.nodes[0].createwallet("single_multisig")
wallet = self.nodes[0].get_wallet_rpc("single_multisig")
xpub = wallet.gethdkeys()[0]["xpub"]
xprv = wallet.gethdkeys(private=True)[0]["xprv"]
pub = def_wallet.getaddressinfo(def_wallet.getnewaddress())["pubkey"]
prv_multi_desc = descsum_create(f"wsh(multi(2,{xprv},{pub}))")
pub_multi_desc = descsum_create(f"wsh(multi(2,{xpub},{pub}))")
import_res = wallet.importdescriptors([{"desc": prv_multi_desc, "timestamp": "now"}])
assert_equal(import_res[0]["success"], True)
xpub_info = wallet.gethdkeys()
assert_equal(len(xpub_info), 1)
assert_equal(xpub_info[0]["xpub"], xpub)
found_desc = next((d for d in xpub_info[0]["descriptors"] if d["desc"] == pub_multi_desc), None)
assert found_desc is not None
assert_equal(found_desc["active"], False)
if __name__ == '__main__':
WalletGetHDKeyTest().main()