test: improve rpc_gettxspendingprevout.py code

Grouped changes to improve the overall readability and maintainability of the test.
A lot more can be done, but this is a good first step.

1) Use for-loops instead of duplicating lines to perform the same checks for each
   node.

2) The {'txid': x, 'vout': y} dict is repeated everywhere in the test, both as
   input to gettxspendingprevout and as part of its result when an output has no
   known spender, making the test tedious to read and maintain.

   This introduces a prevout(txid, vout) query helper and an unspent_out(txid, vout)
   result helper to reduce the repetition. These two helpers are intentionally kept
   separate to make it immediately clear whether a dict is an input to
   gettxspendingprevout or an assertion on its result.

3) The same repetition problem mentioned above applies to other gettxspendingprevout
   possible results:
   Spent outputs returns {'txid': x, 'vout': y, 'spendingtxid': z} and
   Spent outputs when requesting spending tx returns {'txid': x, 'vout': y,
   'spendingtxid': z, 'blockhash': w, 'spendingtx': v}

   To fix it, this introduces:
   - spent_out(txid, vout, spending_tx_id): for outputs with a known spender
   - spent_out_in_block(txid, vout, spending_tx_id, blockhash, spending_tx): for
     outputs spent in a confirmed block, when full tx data is requested

4) Rename overloaded confirmed_utxo variable (used in three different tests) to more
   descriptive names: root_utxo, reorg_replace_utxo, reorg_cancel_utxo to clarify
   their roles in each of the tests.
This commit is contained in:
furszy
2026-02-22 02:08:06 -03:00
parent d9c7364ac5
commit ac3bea07cd

View File

@@ -11,6 +11,24 @@ from test_framework.util import (
)
from test_framework.wallet import MiniWallet
#### Query Helpers ####
def prevout(txid, vout):
"""Build a prevout query dict for use with gettxspendingprevout"""
return {'txid': txid, 'vout': vout}
#### Result Helpers ####
def unspent_out(txid, vout):
"""Expected result for an available output (not spent)"""
return {'txid': txid, 'vout': vout}
def spent_out(txid, vout, spending_tx_id):
"""Expected result for an output with a known spender"""
return {'txid': txid, 'vout': vout, 'spendingtxid': spending_tx_id}
def spent_out_in_block(txid, vout, spending_tx_id, blockhash, spending_tx):
"""Expected result for an output spent in a confirmed block, with full tx data"""
return {'txid': txid, 'vout': vout, 'spendingtxid': spending_tx_id, 'blockhash': blockhash, 'spendingtx': spending_tx}
class GetTxSpendingPrevoutTest(BitcoinTestFramework):
def set_test_params(self):
@@ -23,8 +41,13 @@ class GetTxSpendingPrevoutTest(BitcoinTestFramework):
]
def run_test(self):
self.wallet = MiniWallet(self.nodes[0])
confirmed_utxo = self.wallet.get_utxo()
node0, node1, node2 = self.nodes
self.wallet = MiniWallet(node0)
root_utxo = self.wallet.get_utxo()
txid_root_utxo = root_utxo['txid']
def create_tx(**kwargs):
return self.wallet.send_self_transfer_multi(from_node=node0, **kwargs)
# Create a tree of unconfirmed transactions in the mempool:
# txA
@@ -41,145 +64,150 @@ class GetTxSpendingPrevoutTest(BitcoinTestFramework):
# \ /
# txH
def create_tx(**kwargs):
return self.wallet.send_self_transfer_multi(
from_node=self.nodes[0],
**kwargs,
)
txA = create_tx(utxos_to_spend=[confirmed_utxo], num_outputs=2)
txA = create_tx(utxos_to_spend=[root_utxo], num_outputs=2)
txB = create_tx(utxos_to_spend=[txA["new_utxos"][0]], num_outputs=2)
txC = create_tx(utxos_to_spend=[txA["new_utxos"][1]], num_outputs=2)
txD = create_tx(utxos_to_spend=[txB["new_utxos"][0]], num_outputs=1)
txE = create_tx(utxos_to_spend=[txB["new_utxos"][1]], num_outputs=1)
txF = create_tx(utxos_to_spend=[txC["new_utxos"][0]], num_outputs=2)
txG = create_tx(utxos_to_spend=[txC["new_utxos"][1]], num_outputs=1)
txH = create_tx(utxos_to_spend=[txE["new_utxos"][0],txF["new_utxos"][0]], num_outputs=1)
txidA, txidB, txidC, txidD, txidE, txidF, txidG, txidH = [
tx["txid"] for tx in [txA, txB, txC, txD, txE, txF, txG, txH]
]
txH = create_tx(utxos_to_spend=[txE["new_utxos"][0], txF["new_utxos"][0]], num_outputs=1)
mempool = self.nodes[0].getrawmempool()
txs = [txA, txB, txC, txD, txE, txF, txG, txH]
txidA, txidB, txidC, txidD, txidE, txidF, txidG, txidH = [tx["txid"] for tx in txs]
mempool = node0.getrawmempool()
assert_equal(len(mempool), 8)
for txid in [txidA, txidB, txidC, txidD, txidE, txidF, txidG, txidH]:
assert_equal(txid in mempool, True)
for tx in txs:
assert tx["txid"] in mempool
self.log.info("Find transactions spending outputs")
# wait until spending transactions are found in the mempool of node 0, 1 and 2
result = self.nodes[0].gettxspendingprevout([ {'txid' : confirmed_utxo['txid'], 'vout' : 0}, {'txid' : txidA, 'vout' : 1} ])
assert_equal(result, [ {'txid' : confirmed_utxo['txid'], 'vout' : 0, 'spendingtxid' : txidA}, {'txid' : txidA, 'vout' : 1, 'spendingtxid' : txidC} ])
self.wait_until(lambda: self.nodes[1].gettxspendingprevout([ {'txid' : confirmed_utxo['txid'], 'vout' : 0}, {'txid' : txidA, 'vout' : 1} ]) == result)
self.wait_until(lambda: self.nodes[2].gettxspendingprevout([ {'txid' : confirmed_utxo['txid'], 'vout' : 0}, {'txid' : txidA, 'vout' : 1} ], mempool_only=True) == result)
result = node0.gettxspendingprevout([prevout(txid_root_utxo, vout=0), prevout(txidA, vout=1)])
assert_equal(result, [spent_out(txid_root_utxo, vout=0, spending_tx_id=txidA),
spent_out(txidA, vout=1, spending_tx_id=txidC)])
self.wait_until(lambda: node1.gettxspendingprevout([prevout(txid_root_utxo, vout=0), prevout(txidA, vout=1)]) == result)
self.wait_until(lambda: node2.gettxspendingprevout([prevout(txid_root_utxo, vout=0), prevout(txidA, vout=1)], mempool_only=True) == result)
self.log.info("Find transaction spending multiple outputs")
result = self.nodes[0].gettxspendingprevout([ {'txid' : txidE, 'vout' : 0}, {'txid' : txidF, 'vout' : 0} ])
assert_equal(result, [ {'txid' : txidE, 'vout' : 0, 'spendingtxid' : txidH}, {'txid' : txidF, 'vout' : 0, 'spendingtxid' : txidH} ])
result = node0.gettxspendingprevout([prevout(txidE, vout=0), prevout(txidF, vout=0)])
assert_equal(result, [spent_out(txidE, vout=0, spending_tx_id=txidH),
spent_out(txidF, vout=0, spending_tx_id=txidH)])
self.log.info("Find no transaction when output is unspent")
result = self.nodes[0].gettxspendingprevout([ {'txid' : txidH, 'vout' : 0} ])
assert_equal(result, [ {'txid' : txidH, 'vout' : 0} ])
result = self.nodes[0].gettxspendingprevout([ {'txid' : txidA, 'vout' : 5} ])
assert_equal(result, [ {'txid' : txidA, 'vout' : 5} ])
result = self.nodes[1].gettxspendingprevout([ {'txid' : txidA, 'vout' : 5} ])
assert_equal(result, [ {'txid' : txidA, 'vout' : 5} ])
result = self.nodes[2].gettxspendingprevout([ {'txid' : txidA, 'vout' : 5} ])
assert_equal(result, [ {'txid' : txidA, 'vout' : 5} ])
assert_equal(node0.gettxspendingprevout([prevout(txidH, vout=0)]), [unspent_out(txidH, vout=0)])
for node in self.nodes:
assert_equal(node.gettxspendingprevout([prevout(txidA, vout=5)]), [unspent_out(txidA, vout=5)])
self.log.info("Mixed spent and unspent outputs")
result = self.nodes[0].gettxspendingprevout([ {'txid' : txidB, 'vout' : 0}, {'txid' : txidG, 'vout' : 3} ])
assert_equal(result, [ {'txid' : txidB, 'vout' : 0, 'spendingtxid' : txidD}, {'txid' : txidG, 'vout' : 3} ])
result = node0.gettxspendingprevout([prevout(txidB, vout=0), prevout(txidG, vout=3)])
assert_equal(result, [spent_out(txidB, vout=0, spending_tx_id=txidD),
unspent_out(txidG, vout=3)])
self.log.info("Unknown input fields")
assert_raises_rpc_error(-3, "Unexpected key unknown", self.nodes[0].gettxspendingprevout, [{'txid' : txidC, 'vout' : 1, 'unknown' : 42}])
assert_raises_rpc_error(-3, "Unexpected key unknown", node0.gettxspendingprevout, [{'txid' : txidC, 'vout' : 1, 'unknown' : 42}])
self.log.info("Invalid vout provided")
assert_raises_rpc_error(-8, "Invalid parameter, vout cannot be negative", self.nodes[0].gettxspendingprevout, [{'txid' : txidA, 'vout' : -1}])
assert_raises_rpc_error(-8, "Invalid parameter, vout cannot be negative", node0.gettxspendingprevout, [prevout(txidA, vout=-1)])
self.log.info("Invalid txid provided")
assert_raises_rpc_error(-3, "JSON value of type number for field txid is not of expected type string", self.nodes[0].gettxspendingprevout, [{'txid' : 42, 'vout' : 0}])
assert_raises_rpc_error(-3, "JSON value of type number for field txid is not of expected type string", node0.gettxspendingprevout, [prevout(42, vout=0)])
self.log.info("Missing outputs")
assert_raises_rpc_error(-8, "Invalid parameter, outputs are missing", self.nodes[0].gettxspendingprevout, [])
assert_raises_rpc_error(-8, "Invalid parameter, outputs are missing", node0.gettxspendingprevout, [])
self.log.info("Missing vout")
assert_raises_rpc_error(-3, "Missing vout", self.nodes[0].gettxspendingprevout, [{'txid' : txidA}])
assert_raises_rpc_error(-3, "Missing vout", node0.gettxspendingprevout, [{'txid' : txidA}])
self.log.info("Missing txid")
assert_raises_rpc_error(-3, "Missing txid", self.nodes[0].gettxspendingprevout, [{'vout' : 3}])
assert_raises_rpc_error(-3, "Missing txid", node0.gettxspendingprevout, [{'vout' : 3}])
blockhash = self.generate(self.wallet, 1)[0]
# spending transactions are found in the index of nodes 0 and 1 but not node 2
result = self.nodes[0].gettxspendingprevout([ {'txid' : confirmed_utxo['txid'], 'vout' : 0}, {'txid' : txidA, 'vout' : 1} ], return_spending_tx=True)
assert_equal(result, [ {'txid' : confirmed_utxo['txid'], 'vout' : 0, 'spendingtxid' : txidA, 'blockhash' : blockhash, 'spendingtx' : txA['hex']}, {'txid' : txidA, 'vout' : 1, 'spendingtxid' : txidC, 'blockhash' : blockhash, 'spendingtx' : txC['hex']} ])
result = self.nodes[1].gettxspendingprevout([ {'txid' : confirmed_utxo['txid'], 'vout' : 0}, {'txid' : txidA, 'vout' : 1} ], return_spending_tx=True)
assert_equal(result, [ {'txid' : confirmed_utxo['txid'], 'vout' : 0, 'spendingtxid' : txidA, 'blockhash' : blockhash, 'spendingtx' : txA['hex']}, {'txid' : txidA, 'vout' : 1, 'spendingtxid' : txidC, 'blockhash' : blockhash, 'spendingtx' : txC['hex']} ])
result = self.nodes[2].gettxspendingprevout([{ 'txid' : confirmed_utxo['txid'], 'vout' : 0}, {'txid' : txidA, 'vout' : 1} ], return_spending_tx=True)
assert_equal(result, [ {'txid' : confirmed_utxo['txid'], 'vout' : 0}, {'txid' : txidA, 'vout' : 1}])
for node in [node0, node1]:
result = node.gettxspendingprevout([prevout(txid_root_utxo, vout=0), prevout(txidA, vout=1)], return_spending_tx=True)
assert_equal(result, [spent_out_in_block(txid_root_utxo, vout=0, spending_tx_id=txidA, blockhash=blockhash, spending_tx=txA['hex']),
spent_out_in_block(txidA, vout=1, spending_tx_id=txidC, blockhash=blockhash, spending_tx=txC['hex'])])
# Spending transactions not in node 2 (no spending index)
result = node2.gettxspendingprevout([prevout(txid_root_utxo, vout=0), prevout(txidA, vout=1)], return_spending_tx=True)
assert_equal(result, [unspent_out(txid_root_utxo, vout=0), unspent_out(txidA, vout=1)])
# spending transaction is not found if we only search the mempool
result = self.nodes[0].gettxspendingprevout([ {'txid' : confirmed_utxo['txid'], 'vout' : 0}, {'txid' : txidA, 'vout' : 1} ], return_spending_tx=True, mempool_only=True)
assert_equal(result, [ {'txid' : confirmed_utxo['txid'], 'vout' : 0}, {'txid' : txidA, 'vout' : 1}])
result = node0.gettxspendingprevout([prevout(txid_root_utxo, vout=0), prevout(txidA, vout=1)], return_spending_tx=True, mempool_only=True)
assert_equal(result, [unspent_out(txid_root_utxo, vout=0), unspent_out(txidA, vout=1)])
self.log.info("Check that our txospenderindex is updated when a reorg replaces a spending transaction")
confirmed_utxo = self.wallet.get_utxo(mark_as_spent = False)
tx1 = create_tx(utxos_to_spend=[confirmed_utxo], num_outputs=1)
reorg_replace_utxo = self.wallet.get_utxo(mark_as_spent=False)
txid_reorg_replace_utxo = reorg_replace_utxo['txid']
tx1 = create_tx(utxos_to_spend=[reorg_replace_utxo], num_outputs=1)
blockhash = self.generate(self.wallet, 1)[0]
# tx1 is confirmed, and indexed in txospenderindex as spending our utxo
assert tx1["txid"] not in self.nodes[0].getrawmempool()
result = self.nodes[0].gettxspendingprevout([ {'txid' : confirmed_utxo['txid'], 'vout' : 0} ], return_spending_tx=True)
assert_equal(result, [ {'txid' : confirmed_utxo['txid'], 'vout' : 0, 'spendingtxid' : tx1["txid"], 'blockhash' : blockhash, 'spendingtx' : tx1['hex']} ])
# replace tx1 with tx2
self.nodes[0].invalidateblock(self.nodes[0].getbestblockhash())
self.nodes[1].invalidateblock(self.nodes[1].getbestblockhash())
self.nodes[2].invalidateblock(self.nodes[2].getbestblockhash())
assert tx1["txid"] in self.nodes[0].getrawmempool()
assert tx1["txid"] in self.nodes[1].getrawmempool()
tx2 = create_tx(utxos_to_spend=[confirmed_utxo], num_outputs=2)
assert tx2["txid"] in self.nodes[0].getrawmempool()
assert tx1["txid"] not in node0.getrawmempool()
result = node0.gettxspendingprevout([prevout(txid_reorg_replace_utxo, vout=0)], return_spending_tx=True)
assert_equal(result, [spent_out_in_block(txid_reorg_replace_utxo, vout=0, spending_tx_id=tx1["txid"], blockhash=blockhash, spending_tx=tx1['hex'])])
# replace tx1 with tx2 triggering a "reorg"
best_block_hash = node0.getbestblockhash()
for node in self.nodes:
node.invalidateblock(best_block_hash)
assert tx1["txid"] in node.getrawmempool()
# create and submit replacement
tx2 = create_tx(utxos_to_spend=[reorg_replace_utxo], num_outputs=2)
assert tx2["txid"] in node0.getrawmempool()
# check that when we find tx2 when we look in the mempool for a tx spending our output
result = self.nodes[0].gettxspendingprevout([ {'txid' : confirmed_utxo['txid'], 'vout' : 0} ], return_spending_tx=True)
assert_equal(result, [ {'txid' : confirmed_utxo['txid'], 'vout' : 0, 'spendingtxid' : tx2["txid"], 'spendingtx' : tx2['hex']} ])
result = node0.gettxspendingprevout([prevout(txid_reorg_replace_utxo, vout=0)], return_spending_tx=True)
assert_equal(result, [ {'txid' : txid_reorg_replace_utxo, 'vout' : 0, 'spendingtxid' : tx2["txid"], 'spendingtx' : tx2['hex']} ])
# check that our txospenderindex has been updated
blockhash = self.generate(self.wallet, 1)[0]
result = self.nodes[0].gettxspendingprevout([ {'txid' : confirmed_utxo['txid'], 'vout' : 0} ], return_spending_tx=True)
assert_equal(result, [ {'txid' : confirmed_utxo['txid'], 'vout' : 0, 'spendingtxid' : tx2["txid"], 'blockhash' : blockhash, 'spendingtx' : tx2['hex']} ])
result = node0.gettxspendingprevout([prevout(txid_reorg_replace_utxo, vout=0)], return_spending_tx=True)
assert_equal(result, [spent_out_in_block(txid_reorg_replace_utxo, vout=0, spending_tx_id=tx2["txid"], blockhash=blockhash, spending_tx=tx2['hex'])])
self.log.info("Check that our txospenderindex is updated when a reorg cancels a spending transaction")
confirmed_utxo = self.wallet.get_utxo(mark_as_spent = False)
tx1 = create_tx(utxos_to_spend=[confirmed_utxo], num_outputs=1)
reorg_cancel_utxo = self.wallet.get_utxo(mark_as_spent=False)
txid_reorg_cancel_utxo = reorg_cancel_utxo['txid']
tx1 = create_tx(utxos_to_spend=[reorg_cancel_utxo], num_outputs=1)
tx2 = create_tx(utxos_to_spend=[tx1["new_utxos"][0]], num_outputs=1)
# tx1 spends our utxo, tx2 spends tx1
blockhash = self.generate(self.wallet, 1)[0]
# tx1 and tx2 are confirmed, and indexed in txospenderindex
result = self.nodes[0].gettxspendingprevout([ {'txid' : confirmed_utxo['txid'], 'vout' : 0} ], return_spending_tx=True)
assert_equal(result, [ {'txid' : confirmed_utxo['txid'], 'vout' : 0, 'spendingtxid' : tx1["txid"], 'blockhash' : blockhash, 'spendingtx' : tx1['hex']} ])
result = self.nodes[0].gettxspendingprevout([ {'txid' : tx1['txid'], 'vout' : 0} ], return_spending_tx=True)
assert_equal(result, [ {'txid' : tx1['txid'], 'vout' : 0, 'spendingtxid' : tx2["txid"], 'blockhash' : blockhash, 'spendingtx' : tx2['hex']} ])
result = node0.gettxspendingprevout([prevout(txid_reorg_cancel_utxo, vout=0)], return_spending_tx=True)
assert_equal(result, [spent_out_in_block(txid_reorg_cancel_utxo, vout=0, spending_tx_id=tx1["txid"], blockhash=blockhash, spending_tx=tx1['hex'])])
result = node0.gettxspendingprevout([prevout(tx1['txid'], vout=0)], return_spending_tx=True)
assert_equal(result, [spent_out_in_block(tx1['txid'], vout=0, spending_tx_id=tx2["txid"], blockhash=blockhash, spending_tx=tx2['hex'])])
# replace tx1 with tx3
blockhash= self.nodes[0].getbestblockhash()
self.nodes[0].invalidateblock(blockhash)
self.nodes[1].invalidateblock(blockhash)
self.nodes[2].invalidateblock(blockhash)
tx3 = create_tx(utxos_to_spend=[confirmed_utxo], num_outputs=2, fee_per_output=2000)
assert tx3["txid"] in self.nodes[0].getrawmempool()
assert tx1["txid"] not in self.nodes[0].getrawmempool()
assert tx2["txid"] not in self.nodes[0].getrawmempool()
blockhash = node0.getbestblockhash()
for node in self.nodes:
node.invalidateblock(blockhash)
tx3 = create_tx(utxos_to_spend=[reorg_cancel_utxo], num_outputs=2, fee_per_output=2000)
node0_mempool = node0.getrawmempool()
assert tx3["txid"] in node0_mempool
assert tx1["txid"] not in node0_mempool
assert tx2["txid"] not in node0_mempool
# tx2 is not in the mempool anymore, but still in txospender index which has not been rewound yet
result = self.nodes[0].gettxspendingprevout([ {'txid' : tx1['txid'], 'vout' : 0} ], return_spending_tx=True)
assert_equal(result, [ {'txid' : tx1['txid'], 'vout' : 0, 'spendingtxid' : tx2["txid"], 'blockhash' : blockhash, 'spendingtx' : tx2['hex']} ])
txinfo = self.nodes[0].getrawtransaction(tx2["txid"], verbose = True, blockhash = blockhash)
result = node0.gettxspendingprevout([prevout(tx1['txid'], vout=0)], return_spending_tx=True)
assert_equal(result, [spent_out_in_block(tx1['txid'], vout=0, spending_tx_id=tx2["txid"], blockhash=blockhash, spending_tx=tx2['hex'])])
txinfo = node0.getrawtransaction(tx2["txid"], verbose = True, blockhash = blockhash)
assert_equal(txinfo["confirmations"], 0)
assert_equal(txinfo["in_active_chain"], False)
blockhash = self.generate(self.wallet, 1)[0]
# we check that the spending tx for tx1 is now tx3
result = self.nodes[0].gettxspendingprevout([ {'txid' : confirmed_utxo['txid'], 'vout' : 0} ], return_spending_tx=True)
assert_equal(result, [ {'txid' : confirmed_utxo['txid'], 'vout' : 0, 'spendingtxid' : tx3["txid"], 'blockhash' : blockhash, 'spendingtx' : tx3['hex']} ])
result = node0.gettxspendingprevout([prevout(txid_reorg_cancel_utxo, vout=0)], return_spending_tx=True)
assert_equal(result, [spent_out_in_block(txid_reorg_cancel_utxo, vout=0, spending_tx_id=tx3["txid"], blockhash=blockhash, spending_tx=tx3['hex'])])
# we check that there is no more spending tx for tx1
result = self.nodes[0].gettxspendingprevout([ {'txid' : tx1['txid'], 'vout' : 0} ], return_spending_tx=True)
assert_equal(result, [ {'txid' : tx1['txid'], 'vout' : 0} ])
result = node0.gettxspendingprevout([prevout(tx1['txid'], vout=0)], return_spending_tx=True)
assert_equal(result, [unspent_out(tx1['txid'], vout=0)])
if __name__ == '__main__':