diff --git a/src/net.cpp b/src/net.cpp index c967de7094e..4f88aa8aab9 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -390,8 +390,9 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, } } - LogDebug(BCLog::NET, "trying %s connection %s lastseen=%.1fhrs\n", + LogDebug(BCLog::NET, "trying %s connection (%s) to %s, lastseen=%.1fhrs\n", use_v2transport ? "v2" : "v1", + ConnectionTypeAsString(conn_type), pszDest ? pszDest : addrConnect.ToStringAddrPort(), Ticks(pszDest ? 0h : Now() - addrConnect.nTime)); diff --git a/test/functional/p2p_private_broadcast.py b/test/functional/p2p_private_broadcast.py index b81b5aa483d..77bf0e952e9 100755 --- a/test/functional/p2p_private_broadcast.py +++ b/test/functional/p2p_private_broadcast.py @@ -6,6 +6,7 @@ Test how locally submitted transactions are sent to the network when private broadcast is used. """ +import re import time import threading @@ -46,9 +47,6 @@ from test_framework.wallet import ( MiniWallet, ) -MAX_OUTBOUND_FULL_RELAY_CONNECTIONS = 8 -MAX_BLOCK_RELAY_ONLY_CONNECTIONS = 2 -NUM_INITIAL_CONNECTIONS = MAX_OUTBOUND_FULL_RELAY_CONNECTIONS + MAX_BLOCK_RELAY_ONLY_CONNECTIONS NUM_PRIVATE_BROADCAST_PER_TX = 3 # Fill addrman with these addresses. Must have enough Tor addresses, so that even @@ -184,24 +182,56 @@ class P2PPrivateBroadcast(BitcoinTestFramework): self.destinations_lock = threading.Lock() + def find_connection_type_in_debug_log(to_addr, to_port): + """ + Scan the debug log of tx_originator for a connection attempt to to_addr:to_port. + Return the connection type (outbound-full-relay, private-broadcast, etc) or + None if there is no connection attempt to to_addr:to_port. + """ + with open(self.tx_originator_debug_log_path, mode="r", encoding="utf-8") as debug_log: + for line in debug_log.readlines(): + match = re.match(f".*trying v. connection \\((.+)\\) to \\[?{to_addr}]?:{to_port},.*", line) + if match: + return match.group(1) + return None + def destinations_factory(requested_to_addr, requested_to_port): + """ + Instruct the SOCKS5 proxy to redirect connections: + * The first automatic outbound connection -> P2PDataStore + * The first private broadcast connection -> nodes[1] + * Anything else -> P2PInterface + """ + conn_type = None + def found_connection_in_debug_log(): + nonlocal conn_type + conn_type = find_connection_type_in_debug_log(requested_to_addr, requested_to_port) + return conn_type is not None + + self.wait_until(found_connection_in_debug_log) + with self.destinations_lock: i = len(self.destinations) actual_to_addr = "" actual_to_port = 0 listener = None - if i == NUM_INITIAL_CONNECTIONS: + target_name = "" + if conn_type == "private-broadcast" and not any(dest["conn_type"] == "private-broadcast" for dest in self.destinations): # Instruct the SOCKS5 server to redirect the first private # broadcast connection from nodes[0] to nodes[1] actual_to_addr = "127.0.0.1" # nodes[1] listen address actual_to_port = tor_port(1) # nodes[1] listen port for Tor - self.log.debug(f"Instructing the SOCKS5 proxy to redirect connection i={i} to " - f"{format_addr_port(actual_to_addr, actual_to_port)} (nodes[1])") + target_name = "nodes[1]" else: # Create a Python P2P listening node and instruct the SOCKS5 proxy to # redirect the connection to it. The first outbound connection is used # later to serve GETDATA, thus make it P2PDataStore(). - listener = P2PDataStore() if i == 0 else P2PInterface() + if conn_type == "outbound-full-relay" and not any(dest["conn_type"] == "outbound-full-relay" for dest in self.destinations): + listener = P2PDataStore() + target_name = "Python P2PDataStore" + else: + listener = P2PInterface() + target_name = "Python P2PInterface" listener.peer_connect_helper(dstaddr="0.0.0.0", dstport=0, net=self.chain, timeout_factor=self.options.timeout_factor) listener.peer_connect_send_version(services=P2P_SERVICES) @@ -221,11 +251,14 @@ class P2PPrivateBroadcast(BitcoinTestFramework): callback=on_listen_done) # Wait until the callback has been called. self.wait_until(lambda: actual_to_port != 0) - self.log.debug(f"Instructing the SOCKS5 proxy to redirect connection i={i} to " - f"{format_addr_port(actual_to_addr, actual_to_port)} (a Python node)") + + self.log.debug(f"Instructing the SOCKS5 proxy to redirect connection i={i} ({conn_type}) for " + f"{format_addr_port(requested_to_addr, requested_to_port)} to " + f"{format_addr_port(actual_to_addr, actual_to_port)} ({target_name})") self.destinations.append({ "requested_to": format_addr_port(requested_to_addr, requested_to_port), + "conn_type": conn_type, "node": listener, }) assert_equal(len(self.destinations), i + 1) @@ -263,6 +296,15 @@ class P2PPrivateBroadcast(BitcoinTestFramework): self.setup_nodes() def check_broadcasts(self, label, tx, broadcasts_to_expect, skip_destinations): + def wait_and_get_destination(n): + """Wait for self.destinations[] to have at least n elements and return the 'n'th.""" + def get_destinations_len(): + with self.destinations_lock: + return len(self.destinations) + self.wait_until(lambda: get_destinations_len() > n) + with self.destinations_lock: + return self.destinations[n] + broadcasts_done = 0 i = skip_destinations - 1 while broadcasts_done < broadcasts_to_expect: @@ -270,9 +312,10 @@ class P2PPrivateBroadcast(BitcoinTestFramework): self.log.debug(f"{label}: waiting for outbound connection i={i}") # At this point the connection may not yet have been established (A), # may be active (B), or may have already been closed (C). - self.wait_until(lambda: len(self.destinations) > i) - dest = self.destinations[i] + dest = wait_and_get_destination(i) peer = dest["node"] + if peer is None: + continue # That is the first private broadcast connection, redirected to nodes[1] peer.wait_until(lambda: peer.message_count["version"] == 1, check_connected=False) # Now it is either (B) or (C). if peer.last_message["version"].nServices != 0: @@ -314,6 +357,7 @@ class P2PPrivateBroadcast(BitcoinTestFramework): def run_test(self): tx_originator = self.nodes[0] + self.tx_originator_debug_log_path = tx_originator.debug_log_path tx_receiver = self.nodes[1] far_observer = tx_receiver.add_p2p_connection(P2PInterface()) @@ -321,30 +365,24 @@ class P2PPrivateBroadcast(BitcoinTestFramework): # Fill tx_originator's addrman. for addr in ADDRMAN_ADDRESSES: - res = tx_originator.addpeeraddress(address=addr, port=8333, tried=False) + res = tx_originator.addpeeraddress(address=addr, port=0 if addr.endswith(".i2p") else 8333, tried=False) if not res["success"]: self.log.debug(f"Could not add {addr} to tx_originator's addrman (collision?)") - self.wait_until(lambda: len(self.destinations) == NUM_INITIAL_CONNECTIONS) - - # The next opened connection by tx_originator should be "private broadcast" - # for sending the transaction. The SOCKS5 proxy should redirect it to tx_receiver. - txs = wallet.create_self_transfer_chain(chain_length=3) self.log.info(f"Created txid={txs[0]['txid']}: for basic test") self.log.info(f"Created txid={txs[1]['txid']}: for broadcast with dependency in mempool + rebroadcast") self.log.info(f"Created txid={txs[2]['txid']}: for broadcast with dependency not in mempool") tx_originator.sendrawtransaction(hexstring=txs[0]["hex"], maxfeerate=0.1) - self.log.debug(f"Waiting for outbound connection i={NUM_INITIAL_CONNECTIONS}, " - "must be the first private broadcast connection") + self.log.info("First private broadcast: waiting for the transaction to reach the recipient") self.wait_until(lambda: len(tx_receiver.getrawmempool()) > 0) + self.log.info("First private broadcast: the recipient received the transaction") far_observer.wait_for_tx(txs[0]["txid"]) - self.log.info(f"Outbound connection i={NUM_INITIAL_CONNECTIONS}: " - "the private broadcast target received and further relayed the transaction") + self.log.info("First private broadcast: the recipient further relayed the transaction") # One already checked above, check the other NUM_PRIVATE_BROADCAST_PER_TX - 1 broadcasts. - self.check_broadcasts("Basic", txs[0], NUM_PRIVATE_BROADCAST_PER_TX - 1, NUM_INITIAL_CONNECTIONS + 1) + self.check_broadcasts("Basic", txs[0], NUM_PRIVATE_BROADCAST_PER_TX - 1, 0) self.log.info("Resending the same transaction via RPC again (it is not in the mempool yet)") ignoring_msg = f"Ignoring unnecessary request to schedule an already scheduled transaction: txid={txs[0]['txid']}, wtxid={txs[0]['wtxid']}" @@ -364,8 +402,32 @@ class P2PPrivateBroadcast(BitcoinTestFramework): wtxid_int = int(txs[0]["wtxid"], 16) inv = CInv(MSG_WTX, wtxid_int) + tx_returner = None # First outbound-full-relay, will be P2PDataStore. + other_peer = None # Any other outbound-full-relay, we use the second one. + + def set_tx_returner_and_other(): + nonlocal tx_returner + nonlocal other_peer + tx_returner = None + other_peer = None + with self.destinations_lock: + for dest in self.destinations: + if dest["conn_type"] == "outbound-full-relay" and dest["node"] is not None: + if tx_returner is None: + assert(type(dest["node"]) is P2PDataStore) + tx_returner = dest["node"] + else: + assert(type(dest["node"]) is P2PInterface) + other_peer = dest["node"] + return True + return False + + self.wait_until(set_tx_returner_and_other) + + tx_returner.wait_for_connect() + other_peer.wait_for_connect() + self.log.info("Sending INV and waiting for GETDATA from node") - tx_returner = self.destinations[0]["node"] # Will return the transaction back to the originator. tx_returner.tx_store[wtxid_int] = txs[0]["tx"] assert "getdata" not in tx_returner.last_message received_back_msg = f"Received our privately broadcast transaction (txid={txs[0]['txid']}) from the network" @@ -375,7 +437,7 @@ class P2PPrivateBroadcast(BitcoinTestFramework): self.wait_until(lambda: len(tx_originator.getrawmempool()) > 0) self.log.info("Waiting for normal broadcast to another peer") - self.destinations[1]["node"].wait_for_inv([inv]) + other_peer.wait_for_inv([inv]) self.log.info("Checking getprivatebroadcastinfo no longer reports the transaction after it is received back") pbinfo = tx_originator.getprivatebroadcastinfo()