test: let connections happen in any order in p2p_private_broadcast.py

If the following two events happen:

* (likely) the automatic 10 initial connections are not made to all
  networks
* (unlikely) the network-specific logic kicks in almost immediately.
  It is using exponential distribution with a mean of 5 minutes
  (`rng.rand_exp_duration(EXTRA_NETWORK_PEER_INTERVAL)`).

So if both happen, then the 11th connection may not be the expected
private broadcast, but a network-specific connection.

Fix this by retrieving the connection type from
`destinations_factory()`. This is more flexible because it allows
connections to happen in any order and does not break if e.g. the 11th
connection is not the expected first private broadcast.

This also makes the test run faster:
before: 19-44 sec
now: 10-25 sec
because for example there is no need to wait for the initial 10
automatic outbound connections to be made in order to proceed.

Fixes: https://github.com/bitcoin/bitcoin/issues/34387
This commit is contained in:
Vasil Dimov
2026-01-26 12:45:43 +01:00
parent 67696b207f
commit a8ebcfd34c

View File

@@ -6,6 +6,7 @@
Test how locally submitted transactions are sent to the network when private broadcast is used.
"""
import re
import time
import threading
@@ -46,9 +47,6 @@ from test_framework.wallet import (
MiniWallet,
)
MAX_OUTBOUND_FULL_RELAY_CONNECTIONS = 8
MAX_BLOCK_RELAY_ONLY_CONNECTIONS = 2
NUM_INITIAL_CONNECTIONS = MAX_OUTBOUND_FULL_RELAY_CONNECTIONS + MAX_BLOCK_RELAY_ONLY_CONNECTIONS
NUM_PRIVATE_BROADCAST_PER_TX = 3
# Fill addrman with these addresses. Must have enough Tor addresses, so that even
@@ -184,24 +182,56 @@ class P2PPrivateBroadcast(BitcoinTestFramework):
self.destinations_lock = threading.Lock()
def find_connection_type_in_debug_log(to_addr, to_port):
"""
Scan the debug log of tx_originator for a connection attempt to to_addr:to_port.
Return the connection type (outbound-full-relay, private-broadcast, etc) or
None if there is no connection attempt to to_addr:to_port.
"""
with open(self.tx_originator_debug_log_path, mode="r", encoding="utf-8") as debug_log:
for line in debug_log.readlines():
match = re.match(f".*trying v. connection \\((.+)\\) to \\[?{to_addr}]?:{to_port},.*", line)
if match:
return match.group(1)
return None
def destinations_factory(requested_to_addr, requested_to_port):
"""
Instruct the SOCKS5 proxy to redirect connections:
* The first automatic outbound connection -> P2PDataStore
* The first private broadcast connection -> nodes[1]
* Anything else -> P2PInterface
"""
conn_type = None
def found_connection_in_debug_log():
nonlocal conn_type
conn_type = find_connection_type_in_debug_log(requested_to_addr, requested_to_port)
return conn_type is not None
self.wait_until(found_connection_in_debug_log)
with self.destinations_lock:
i = len(self.destinations)
actual_to_addr = ""
actual_to_port = 0
listener = None
if i == NUM_INITIAL_CONNECTIONS:
target_name = ""
if conn_type == "private-broadcast" and not any(dest["conn_type"] == "private-broadcast" for dest in self.destinations):
# Instruct the SOCKS5 server to redirect the first private
# broadcast connection from nodes[0] to nodes[1]
actual_to_addr = "127.0.0.1" # nodes[1] listen address
actual_to_port = tor_port(1) # nodes[1] listen port for Tor
self.log.debug(f"Instructing the SOCKS5 proxy to redirect connection i={i} to "
f"{format_addr_port(actual_to_addr, actual_to_port)} (nodes[1])")
target_name = "nodes[1]"
else:
# Create a Python P2P listening node and instruct the SOCKS5 proxy to
# redirect the connection to it. The first outbound connection is used
# later to serve GETDATA, thus make it P2PDataStore().
listener = P2PDataStore() if i == 0 else P2PInterface()
if conn_type == "outbound-full-relay" and not any(dest["conn_type"] == "outbound-full-relay" for dest in self.destinations):
listener = P2PDataStore()
target_name = "Python P2PDataStore"
else:
listener = P2PInterface()
target_name = "Python P2PInterface"
listener.peer_connect_helper(dstaddr="0.0.0.0", dstport=0, net=self.chain, timeout_factor=self.options.timeout_factor)
listener.peer_connect_send_version(services=P2P_SERVICES)
@@ -221,11 +251,14 @@ class P2PPrivateBroadcast(BitcoinTestFramework):
callback=on_listen_done)
# Wait until the callback has been called.
self.wait_until(lambda: actual_to_port != 0)
self.log.debug(f"Instructing the SOCKS5 proxy to redirect connection i={i} to "
f"{format_addr_port(actual_to_addr, actual_to_port)} (a Python node)")
self.log.debug(f"Instructing the SOCKS5 proxy to redirect connection i={i} ({conn_type}) for "
f"{format_addr_port(requested_to_addr, requested_to_port)} to "
f"{format_addr_port(actual_to_addr, actual_to_port)} ({target_name})")
self.destinations.append({
"requested_to": format_addr_port(requested_to_addr, requested_to_port),
"conn_type": conn_type,
"node": listener,
})
assert_equal(len(self.destinations), i + 1)
@@ -263,6 +296,15 @@ class P2PPrivateBroadcast(BitcoinTestFramework):
self.setup_nodes()
def check_broadcasts(self, label, tx, broadcasts_to_expect, skip_destinations):
def wait_and_get_destination(n):
"""Wait for self.destinations[] to have at least n elements and return the 'n'th."""
def get_destinations_len():
with self.destinations_lock:
return len(self.destinations)
self.wait_until(lambda: get_destinations_len() > n)
with self.destinations_lock:
return self.destinations[n]
broadcasts_done = 0
i = skip_destinations - 1
while broadcasts_done < broadcasts_to_expect:
@@ -270,9 +312,10 @@ class P2PPrivateBroadcast(BitcoinTestFramework):
self.log.debug(f"{label}: waiting for outbound connection i={i}")
# At this point the connection may not yet have been established (A),
# may be active (B), or may have already been closed (C).
self.wait_until(lambda: len(self.destinations) > i)
dest = self.destinations[i]
dest = wait_and_get_destination(i)
peer = dest["node"]
if peer is None:
continue # That is the first private broadcast connection, redirected to nodes[1]
peer.wait_until(lambda: peer.message_count["version"] == 1, check_connected=False)
# Now it is either (B) or (C).
if peer.last_message["version"].nServices != 0:
@@ -304,6 +347,7 @@ class P2PPrivateBroadcast(BitcoinTestFramework):
def run_test(self):
tx_originator = self.nodes[0]
self.tx_originator_debug_log_path = tx_originator.debug_log_path
tx_receiver = self.nodes[1]
far_observer = tx_receiver.add_p2p_connection(P2PInterface())
@@ -315,26 +359,20 @@ class P2PPrivateBroadcast(BitcoinTestFramework):
if not res["success"]:
self.log.debug(f"Could not add {addr} to tx_originator's addrman (collision?)")
self.wait_until(lambda: len(self.destinations) == NUM_INITIAL_CONNECTIONS)
# The next opened connection by tx_originator should be "private broadcast"
# for sending the transaction. The SOCKS5 proxy should redirect it to tx_receiver.
txs = wallet.create_self_transfer_chain(chain_length=3)
self.log.info(f"Created txid={txs[0]['txid']}: for basic test")
self.log.info(f"Created txid={txs[1]['txid']}: for broadcast with dependency in mempool + rebroadcast")
self.log.info(f"Created txid={txs[2]['txid']}: for broadcast with dependency not in mempool")
tx_originator.sendrawtransaction(hexstring=txs[0]["hex"], maxfeerate=0.1)
self.log.debug(f"Waiting for outbound connection i={NUM_INITIAL_CONNECTIONS}, "
"must be the first private broadcast connection")
self.log.info("First private broadcast: waiting for the transaction to reach the recipient")
self.wait_until(lambda: len(tx_receiver.getrawmempool()) > 0)
self.log.info("First private broadcast: the recipient received the transaction")
far_observer.wait_for_tx(txs[0]["txid"])
self.log.info(f"Outbound connection i={NUM_INITIAL_CONNECTIONS}: "
"the private broadcast target received and further relayed the transaction")
self.log.info("First private broadcast: the recipient further relayed the transaction")
# One already checked above, check the other NUM_PRIVATE_BROADCAST_PER_TX - 1 broadcasts.
self.check_broadcasts("Basic", txs[0], NUM_PRIVATE_BROADCAST_PER_TX - 1, NUM_INITIAL_CONNECTIONS + 1)
self.check_broadcasts("Basic", txs[0], NUM_PRIVATE_BROADCAST_PER_TX - 1, 0)
self.log.info("Resending the same transaction via RPC again (it is not in the mempool yet)")
ignoring_msg = f"Ignoring unnecessary request to schedule an already scheduled transaction: txid={txs[0]['txid']}, wtxid={txs[0]['wtxid']}"
@@ -354,8 +392,32 @@ class P2PPrivateBroadcast(BitcoinTestFramework):
wtxid_int = int(txs[0]["wtxid"], 16)
inv = CInv(MSG_WTX, wtxid_int)
tx_returner = None # First outbound-full-relay, will be P2PDataStore.
other_peer = None # Any other outbound-full-relay, we use the second one.
def set_tx_returner_and_other():
nonlocal tx_returner
nonlocal other_peer
tx_returner = None
other_peer = None
with self.destinations_lock:
for dest in self.destinations:
if dest["conn_type"] == "outbound-full-relay" and dest["node"] is not None:
if tx_returner is None:
assert(type(dest["node"]) is P2PDataStore)
tx_returner = dest["node"]
else:
assert(type(dest["node"]) is P2PInterface)
other_peer = dest["node"]
return True
return False
self.wait_until(set_tx_returner_and_other)
tx_returner.wait_for_connect()
other_peer.wait_for_connect()
self.log.info("Sending INV and waiting for GETDATA from node")
tx_returner = self.destinations[0]["node"] # Will return the transaction back to the originator.
tx_returner.tx_store[wtxid_int] = txs[0]["tx"]
assert "getdata" not in tx_returner.last_message
received_back_msg = f"Received our privately broadcast transaction (txid={txs[0]['txid']}) from the network"
@@ -365,7 +427,7 @@ class P2PPrivateBroadcast(BitcoinTestFramework):
self.wait_until(lambda: len(tx_originator.getrawmempool()) > 0)
self.log.info("Waiting for normal broadcast to another peer")
self.destinations[1]["node"].wait_for_inv([inv])
other_peer.wait_for_inv([inv])
self.log.info("Sending a transaction that is already in the mempool")
skip_destinations = len(self.destinations)