test: prep manual equality assert conversions

The later scripted diff only handles plain `assert x == y` lines.
Some remaining tests still use equality inside comprehensions, parenthesized asserts, and other shapes that the line-based rewrite would misread.
Rewrite those sites by hand first so the later mechanical conversion stays safe.
The commit also simplifies the dead `len(["errors"]) == 0` branch in `blocktools.py`, which can never be true.
This commit is contained in:
Lőrinc
2026-03-08 21:22:43 +00:00
parent 4f4516e3f6
commit dcd90fbe54
16 changed files with 49 additions and 34 deletions

View File

@@ -81,7 +81,8 @@ class MempoolClusterTest(BitcoinTestFramework):
last_val = {"weight": 0, "fee": 0}
for x in feeratediagram:
# The weight is always positive, except for the first iteration
assert x['weight'] > 0 or x['fee'] == 0
assert (x['weight'] > 0
or x['fee'] == 0)
# Monotonically decreasing fee per weight
assert_greater_than_or_equal(last_val['fee'] * x['weight'], x['fee'] * last_val['weight'])
last_val = x

View File

@@ -179,7 +179,7 @@ class MempoolLimitTest(BitcoinTestFramework):
# Package should be submitted, temporarily exceeding maxmempool, and then evicted.
res = node.submitpackage(package_hex)
assert_equal(res["package_msg"], "transaction failed")
assert len([tx_res for _, tx_res in res["tx-results"].items() if "error" in tx_res and tx_res["error"] == "bad-txns-inputs-missingorspent"])
assert "bad-txns-inputs-missingorspent" in [tx_res["error"] for _, tx_res in res["tx-results"].items() if "error" in tx_res]
# Maximum size must never be exceeded.
assert_greater_than(node.getmempoolinfo()["maxmempool"], node.getmempoolinfo()["bytes"])

View File

@@ -294,7 +294,8 @@ class MempoolTRUC(BitcoinTestFramework):
tx_v3_child = self.wallet.create_self_transfer(utxo_to_spend=tx_v3_parent["new_utxo"], version=3)
tx_v3_grandchild = self.wallet.create_self_transfer(utxo_to_spend=tx_v3_child["new_utxo"], version=3)
result = node.testmempoolaccept([tx_v3_parent["hex"], tx_v3_child["hex"], tx_v3_grandchild["hex"]])
assert all([txresult["package-error"] == f"TRUC-violation, tx {tx_v3_grandchild['txid']} (wtxid={tx_v3_grandchild['wtxid']}) would have too many ancestors" for txresult in result])
for txresult in result:
assert_equal(txresult["package-error"], f"TRUC-violation, tx {tx_v3_grandchild['txid']} (wtxid={tx_v3_grandchild['wtxid']}) would have too many ancestors")
@cleanup(extra_args=None)
def test_truc_ancestors_package_and_mempool(self):
@@ -440,11 +441,13 @@ class MempoolTRUC(BitcoinTestFramework):
test_accept_v3_from_v2 = node.testmempoolaccept([tx_v2["hex"], tx_v3_from_v2["hex"]])
expected_error_v3_from_v2 = f"TRUC-violation, version=3 tx {tx_v3_from_v2['txid']} (wtxid={tx_v3_from_v2['wtxid']}) cannot spend from non-version=3 tx {tx_v2['txid']} (wtxid={tx_v2['wtxid']})"
assert all([result["package-error"] == expected_error_v3_from_v2 for result in test_accept_v3_from_v2])
for result in test_accept_v3_from_v2:
assert_equal(result["package-error"], expected_error_v3_from_v2)
test_accept_v2_from_v3 = node.testmempoolaccept([tx_v3["hex"], tx_v2_from_v3["hex"]])
expected_error_v2_from_v3 = f"TRUC-violation, non-version=3 tx {tx_v2_from_v3['txid']} (wtxid={tx_v2_from_v3['wtxid']}) cannot spend from version=3 tx {tx_v3['txid']} (wtxid={tx_v3['wtxid']})"
assert all([result["package-error"] == expected_error_v2_from_v3 for result in test_accept_v2_from_v3])
for result in test_accept_v2_from_v3:
assert_equal(result["package-error"], expected_error_v2_from_v3)
test_accept_pairs = node.testmempoolaccept([tx_v2["hex"], tx_v3["hex"], tx_v2_from_v2["hex"], tx_v3_from_v3["hex"]])
assert all([result["allowed"] for result in test_accept_pairs])
@@ -456,7 +459,8 @@ class MempoolTRUC(BitcoinTestFramework):
tx_v3_child_2 = self.wallet.create_self_transfer(utxo_to_spend=tx_v3_parent["new_utxos"][1], version=3)
test_accept_2children = node.testmempoolaccept([tx_v3_parent["hex"], tx_v3_child_1["hex"], tx_v3_child_2["hex"]])
expected_error_2children = f"TRUC-violation, tx {tx_v3_parent['txid']} (wtxid={tx_v3_parent['wtxid']}) would exceed descendant count limit"
assert all([result["package-error"] == expected_error_2children for result in test_accept_2children])
for result in test_accept_2children:
assert_equal(result["package-error"], expected_error_2children)
# Extra TRUC transaction does not get incorrectly marked as extra descendant
test_accept_1child_with_exra = node.testmempoolaccept([tx_v3_parent["hex"], tx_v3_child_1["hex"], tx_v3_independent["hex"]])
@@ -465,11 +469,13 @@ class MempoolTRUC(BitcoinTestFramework):
# Extra TRUC transaction does not make us ignore the extra descendant
test_accept_2children_with_exra = node.testmempoolaccept([tx_v3_parent["hex"], tx_v3_child_1["hex"], tx_v3_child_2["hex"], tx_v3_independent["hex"]])
expected_error_extra = f"TRUC-violation, tx {tx_v3_parent['txid']} (wtxid={tx_v3_parent['wtxid']}) would exceed descendant count limit"
assert all([result["package-error"] == expected_error_extra for result in test_accept_2children_with_exra])
for result in test_accept_2children_with_exra:
assert_equal(result["package-error"], expected_error_extra)
# Same result if the parent is already in mempool
node.sendrawtransaction(tx_v3_parent["hex"])
test_accept_2children_with_in_mempool_parent = node.testmempoolaccept([tx_v3_child_1["hex"], tx_v3_child_2["hex"]])
assert all([result["package-error"] == expected_error_extra for result in test_accept_2children_with_in_mempool_parent])
for result in test_accept_2children_with_in_mempool_parent:
assert_equal(result["package-error"], expected_error_extra)
@cleanup(extra_args=None)
def test_reorg_2child_rbf(self):

View File

@@ -132,18 +132,18 @@ class DisconnectBanTest(BitcoinTestFramework):
address1 = self.nodes[0].getpeerinfo()[0]['addr']
self.nodes[0].disconnectnode(address=address1)
self.wait_until(lambda: len(self.nodes[1].getpeerinfo()) == 1, timeout=10)
assert not [node for node in self.nodes[0].getpeerinfo() if node['addr'] == address1]
assert address1 not in [node['addr'] for node in self.nodes[0].getpeerinfo()]
self.log.info("disconnectnode: successfully reconnect node")
self.connect_nodes(0, 1) # reconnect the node
assert_equal(len(self.nodes[0].getpeerinfo()), 2)
assert [node for node in self.nodes[0].getpeerinfo() if node['addr'] == address1]
assert address1 in [node['addr'] for node in self.nodes[0].getpeerinfo()]
self.log.info("disconnectnode: successfully disconnect node by node id")
id1 = self.nodes[0].getpeerinfo()[0]['id']
self.nodes[0].disconnectnode(nodeid=id1)
self.wait_until(lambda: len(self.nodes[1].getpeerinfo()) == 1, timeout=10)
assert not [node for node in self.nodes[0].getpeerinfo() if node['id'] == id1]
assert id1 not in [node['id'] for node in self.nodes[0].getpeerinfo()]
if __name__ == '__main__':
DisconnectBanTest(__file__).main()

View File

@@ -499,7 +499,7 @@ class P2PPrivateBroadcast(BitcoinTestFramework):
self.log.info("Checking abortprivatebroadcast removes a pending private-broadcast transaction")
tx_abort = wallet.create_self_transfer()
tx_originator.sendrawtransaction(hexstring=tx_abort["hex"], maxfeerate=0.1)
assert any(t["wtxid"] == tx_abort["wtxid"] for t in tx_originator.getprivatebroadcastinfo()["transactions"])
assert tx_abort["wtxid"] in [t["wtxid"] for t in tx_originator.getprivatebroadcastinfo()["transactions"]]
abort_res = tx_originator.abortprivatebroadcast(tx_abort["txid"])
assert_equal(len(abort_res["removed_transactions"]), 1)
assert_equal(abort_res["removed_transactions"][0]["txid"], tx_abort["txid"])

View File

@@ -166,7 +166,7 @@ class GetBlocksActivityTest(BitcoinTestFramework):
assert 'blockhash' not in unconfirmed
assert 'height' not in unconfirmed
assert any(a['txid'] == txid_2 for a in activity if not a.get('blockhash'))
assert txid_2 in [a['txid'] for a in activity if not a.get('blockhash')]
def test_receive_then_spend(self, node, wallet):
"""Also important because this tests multiple blockhashes."""

View File

@@ -267,7 +267,7 @@ def send_to_witness(use_p2wsh, node, utxo, pubkey, encode_p2sh, amount, sign=Tru
tx_to_witness = create_witness_tx(node, use_p2wsh, utxo, pubkey, encode_p2sh, amount)
if (sign):
signed = node.signrawtransactionwithwallet(tx_to_witness)
assert "errors" not in signed or len(["errors"]) == 0
assert "errors" not in signed
return node.sendrawtransaction(signed["hex"])
else:
if (insert_redeem_script):

View File

@@ -21,6 +21,7 @@ from .messages import (
)
from .crypto.ripemd160 import ripemd160
from .util import assert_equal
MAX_SCRIPT_ELEMENT_SIZE = 520
MAX_SCRIPT_SIZE = 10000
@@ -814,8 +815,8 @@ def BIP341_sha_outputs(txTo):
return sha256(b"".join(o.serialize() for o in txTo.vout))
def TaprootSignatureMsg(txTo, spent_utxos, hash_type, input_index=0, *, scriptpath=False, leaf_script=None, codeseparator_pos=-1, annex=None, leaf_ver=LEAF_VERSION_TAPSCRIPT):
assert (len(txTo.vin) == len(spent_utxos))
assert (input_index < len(txTo.vin))
assert_equal(len(txTo.vin), len(spent_utxos))
assert input_index < len(txTo.vin)
out_type = SIGHASH_ALL if hash_type == 0 else hash_type & 3
in_type = hash_type & SIGHASH_ANYONECANPAY
spk = spent_utxos[input_index].scriptPubKey

View File

@@ -673,7 +673,8 @@ class BitcoinTestFramework(metaclass=BitcoinTestMetaClass):
`[{"txid": txid, "vout": vout1}, {"txid": txid, "vout": vout2}, ...]`.
The result can be used to specify inputs for RPCs like `createrawtransaction`,
`createpsbt`, `lockunspent` etc."""
assert all(len(output.keys()) == 1 for output in outputs)
for output in outputs:
assert_equal(len(output.keys()), 1)
send_res = node.send(outputs)
assert send_res["complete"]
utxos = []

View File

@@ -35,7 +35,8 @@ class ToolBitcoinTest(BitcoinTestFramework):
self.add_nodes(self.num_nodes, self.extra_args)
node_argv = self.get_binaries().node_argv()
self.node_options = [node.args[len(node_argv):] for node in self.nodes]
assert all(node.args[:len(node_argv)] == node_argv for node in self.nodes)
for node in self.nodes:
assert_equal(node.args[:len(node_argv)], node_argv)
def set_cmd_args(self, node, args):
"""Set up node so it will be started through bitcoin wrapper command with specified arguments."""

View File

@@ -146,7 +146,8 @@ class TxConflicts(BitcoinTestFramework):
unspents = alice.listunspent()
assert_equal(len(unspents), 3)
assert all([tx["amount"] == 25 for tx in unspents])
for tx in unspents:
assert_equal(tx["amount"], 25)
# tx1 spends unspent[0] and unspent[1]
raw_tx = alice.createrawtransaction(inputs=[unspents[0], unspents[1]], outputs=[{bob.getnewaddress() : 49.9999}])
@@ -336,7 +337,8 @@ class TxConflicts(BitcoinTestFramework):
unspents = alice.listunspent()
assert_equal(len(unspents), 2)
assert all([tx["amount"] == 25 for tx in unspents])
for tx in unspents:
assert_equal(tx["amount"], 25)
assert_equal(alice.getrawmempool(), [])

View File

@@ -90,13 +90,13 @@ class ImportPrunedFundsTest(BitcoinTestFramework):
wwatch = self.nodes[1].get_wallet_rpc('wwatch')
wwatch.importdescriptors([{"desc": self.nodes[0].getaddressinfo(address2)["desc"], "timestamp": "now"}])
wwatch.importprunedfunds(rawtransaction=rawtxn2, txoutproof=proof2)
assert [tx for tx in wwatch.listtransactions() if tx['txid'] == txnid2]
assert txnid2 in [tx['txid'] for tx in wwatch.listtransactions()]
# Import with private key with no rescan
w1 = self.nodes[1].get_wallet_rpc(self.default_wallet_name)
wallet_importprivkey(w1, address3_privkey, "now")
w1.importprunedfunds(rawtxn3, proof3)
assert [tx for tx in w1.listtransactions() if tx['txid'] == txnid3]
assert txnid3 in [tx['txid'] for tx in w1.listtransactions()]
balance3 = w1.getbalance()
assert_equal(balance3, Decimal('0.025'))
@@ -110,13 +110,13 @@ class ImportPrunedFundsTest(BitcoinTestFramework):
# Remove transactions
assert_raises_rpc_error(-4, f'Transaction {txnid1} does not belong to this wallet', w1.removeprunedfunds, txnid1)
assert not [tx for tx in w1.listtransactions() if tx['txid'] == txnid1]
assert txnid1 not in [tx['txid'] for tx in w1.listtransactions()]
wwatch.removeprunedfunds(txnid2)
assert not [tx for tx in wwatch.listtransactions() if tx['txid'] == txnid2]
assert txnid2 not in [tx['txid'] for tx in wwatch.listtransactions()]
w1.removeprunedfunds(txnid3)
assert not [tx for tx in w1.listtransactions() if tx['txid'] == txnid3]
assert txnid3 not in [tx['txid'] for tx in w1.listtransactions()]
# Check various RPC parameter validation errors
assert_raises_rpc_error(-22, "TX decode failed", w1.importprunedfunds, b'invalid tx'.hex(), proof1)

View File

@@ -272,7 +272,7 @@ class ListSinceBlockTest(BitcoinTestFramework):
# listsinceblock(lastblockhash) should now include txid1, as seen from nodes[0]
lsbres = self.nodes[0].listsinceblock(lastblockhash)
assert any(tx['txid'] == txid1 for tx in lsbres['removed'])
assert txid1 in [tx['txid'] for tx in lsbres['removed']]
# but it should not include 'removed' if include_removed=false
lsbres2 = self.nodes[0].listsinceblock(blockhash=lastblockhash, include_removed=False)
@@ -354,8 +354,8 @@ class ListSinceBlockTest(BitcoinTestFramework):
# listsinceblock(lastblockhash) should now include txid1 in transactions
# as well as in removed
lsbres = self.nodes[0].listsinceblock(lastblockhash)
assert any(tx['txid'] == txid1 for tx in lsbres['transactions'])
assert any(tx['txid'] == txid1 for tx in lsbres['removed'])
assert txid1 in [tx['txid'] for tx in lsbres['transactions']]
assert txid1 in [tx['txid'] for tx in lsbres['removed']]
# find transaction and ensure confirmations is valid
for tx in lsbres['transactions']:
@@ -463,14 +463,14 @@ class ListSinceBlockTest(BitcoinTestFramework):
# If we don't list change, we won't have an entry for it.
coins = self.nodes[2].listsinceblock(blockhash=block_hash)["transactions"]
assert not any(c["address"] == addr for c in coins)
assert addr not in [c["address"] for c in coins]
# Now if we list change, we'll get both the send (to a change address) and
# the actual change.
res = self.nodes[2].listsinceblock(blockhash=block_hash, include_change=True)
coins = [entry for entry in res["transactions"] if entry["category"] == "receive"]
assert_equal(len(coins), 2)
assert any(c["address"] == addr for c in coins)
assert addr in [c["address"] for c in coins]
assert all(self.nodes[2].getaddressinfo(c["address"])["ischange"] for c in coins)
def test_op_return(self):

View File

@@ -982,7 +982,7 @@ class WalletMigrationTest(BitcoinTestFramework):
# Also, the watch-only wallet should have the descriptor for the standard sh(pkh())
desc = descsum_create(f"addr({addy_script_sh_pkh})")
assert next(it['desc'] for it in wallet_wo.listdescriptors()['descriptors'] if it['desc'] == desc)
assert desc in [it['desc'] for it in wallet_wo.listdescriptors()['descriptors']]
# And doesn't have a descriptor for the invalid one
desc_invalid = descsum_create(f"addr({addy_script_double_sh_pkh})")
assert_equal(next((it['desc'] for it in wallet_wo.listdescriptors()['descriptors'] if it['desc'] == desc_invalid), None), None)

View File

@@ -86,9 +86,11 @@ class WalletMultisigDescriptorPSBTTest(BitcoinTestFramework):
self.log.info("Check that every participant's multisig generates the same addresses...")
for _ in range(10): # we check that the first 10 generated addresses are the same for all participant's multisigs
receive_addresses = [multisig.getnewaddress() for multisig in participants["multisigs"]]
assert all(address == receive_addresses[0] for address in receive_addresses)
for address in receive_addresses:
assert_equal(address, receive_addresses[0])
change_addresses = [multisig.getrawchangeaddress() for multisig in participants["multisigs"]]
assert all(address == change_addresses[0] for address in change_addresses)
for address in change_addresses:
assert_equal(address, change_addresses[0])
self.log.info("Get a mature utxo to send to the multisig...")
coordinator_wallet = participants["signers"][0]

View File

@@ -42,7 +42,8 @@ class ReorgsRestoreTest(BitcoinTestFramework):
self.connect_nodes(1, 0)
self.sync_blocks(self.nodes[:2])
self.disconnect_nodes(1, 0)
assert all(len(node.getpeerinfo()) == 0 for node in self.nodes[:2])
for node in self.nodes[:2]:
assert_equal(len(node.getpeerinfo()), 0)
# Create a new block in node0, coinbase going to wallet0
self.nodes[0].createwallet(wallet_name="w0", load_on_startup=True)