Merge #11712: [tests] Split NodeConn from NodeConnCB

e9dfa9bcc [tests] Move version message sending from NodeConn to NodeConnCB (John Newbery)
dad596fc3 [tests] Make NodeConnCB a subclass of NodeConn (John Newbery)
e30d40438 [tests] Move only: move NodeConnCB below NodeConn (John Newbery)
4d5059856 [tests] Tidy up mininode (John Newbery)
f2ae6f32a [tests] Remove mininode periodic (half-hour) ping messages (John Newbery)
ec59523c5 [tests] Remove rpc property from TestNode in p2p-segwit.py. (John Newbery)

Pull request description:

  This is the final step in #11518, except for possibly renaming - for motivation, please see that PR.

  If this is merged, then migrating the test framework from asyncore to asyncio should be easier (I say should because I haven't dug too deeply into what would be required).

  Requesting review from @ryanofsky , since he always has good feedback on these refactor PRs, and I'd appreciate his take on this refactor. Note particularly that I've reverted the change suggested here: https://github.com/bitcoin/bitcoin/pull/11182#discussion_r148859555 . The idea, as always, is to present a simple interface to the test writer.

Tree-SHA512: 94dd467a13ec799b101108cf47d4dccb6f6240b601e375e3d785313333bbb389c26072a50759aca663bbf3d6c8b867b99e36ae8800ab8ea115e0496c151926ce
This commit is contained in:
MarcoFalke
2017-11-29 16:29:12 -05:00
14 changed files with 507 additions and 489 deletions

View File

@@ -43,7 +43,6 @@ class TestNode(NodeConnCB):
def __init__(self, block_store, tx_store):
super().__init__()
self.conn = None
self.bestblockhash = None
self.block_store = block_store
self.block_request_map = {}
@@ -58,26 +57,23 @@ class TestNode(NodeConnCB):
self.lastInv = []
self.closed = False
def on_close(self, conn):
def on_close(self):
self.closed = True
def add_connection(self, conn):
self.conn = conn
def on_headers(self, conn, message):
def on_headers(self, message):
if len(message.headers) > 0:
best_header = message.headers[-1]
best_header.calc_sha256()
self.bestblockhash = best_header.sha256
def on_getheaders(self, conn, message):
def on_getheaders(self, message):
response = self.block_store.headers_for(message.locator, message.hashstop)
if response is not None:
conn.send_message(response)
self.send_message(response)
def on_getdata(self, conn, message):
[conn.send_message(r) for r in self.block_store.get_blocks(message.inv)]
[conn.send_message(r) for r in self.tx_store.get_transactions(message.inv)]
def on_getdata(self, message):
[self.send_message(r) for r in self.block_store.get_blocks(message.inv)]
[self.send_message(r) for r in self.tx_store.get_transactions(message.inv)]
for i in message.inv:
if i.type == 1 or i.type == 1 | (1 << 30): # MSG_TX or MSG_WITNESS_TX
@@ -85,16 +81,16 @@ class TestNode(NodeConnCB):
elif i.type == 2 or i.type == 2 | (1 << 30): # MSG_BLOCK or MSG_WITNESS_BLOCK
self.block_request_map[i.hash] = True
def on_inv(self, conn, message):
def on_inv(self, message):
self.lastInv = [x.hash for x in message.inv]
def on_pong(self, conn, message):
def on_pong(self, message):
try:
del self.pingMap[message.nonce]
except KeyError:
raise AssertionError("Got pong for unknown ping [%s]" % repr(message))
def on_reject(self, conn, message):
def on_reject(self, message):
if message.message == b'tx':
self.tx_reject_map[message.data] = RejectResult(message.code, message.reason)
if message.message == b'block':
@@ -102,30 +98,30 @@ class TestNode(NodeConnCB):
def send_inv(self, obj):
mtype = 2 if isinstance(obj, CBlock) else 1
self.conn.send_message(msg_inv([CInv(mtype, obj.sha256)]))
self.send_message(msg_inv([CInv(mtype, obj.sha256)]))
def send_getheaders(self):
# We ask for headers from their last tip.
m = msg_getheaders()
m.locator = self.block_store.get_locator(self.bestblockhash)
self.conn.send_message(m)
self.send_message(m)
def send_header(self, header):
m = msg_headers()
m.headers.append(header)
self.conn.send_message(m)
self.send_message(m)
# This assumes BIP31
def send_ping(self, nonce):
self.pingMap[nonce] = True
self.conn.send_message(msg_ping(nonce))
self.send_message(msg_ping(nonce))
def received_ping_response(self, nonce):
return nonce not in self.pingMap
def send_mempool(self):
self.lastInv = []
self.conn.send_message(msg_mempool())
self.send_message(msg_mempool())
# TestInstance:
#
@@ -166,8 +162,7 @@ class TestManager():
def __init__(self, testgen, datadir):
self.test_generator = testgen
self.connections = []
self.test_nodes = []
self.p2p_connections= []
self.block_store = BlockStore(datadir)
self.tx_store = TxStore(datadir)
self.ping_counter = 1
@@ -175,28 +170,24 @@ class TestManager():
def add_all_connections(self, nodes):
for i in range(len(nodes)):
# Create a p2p connection to each node
test_node = TestNode(self.block_store, self.tx_store)
self.test_nodes.append(test_node)
self.connections.append(NodeConn('127.0.0.1', p2p_port(i), test_node))
# Make sure the TestNode (callback class) has a reference to its
# associated NodeConn
test_node.add_connection(self.connections[-1])
node = TestNode(self.block_store, self.tx_store)
node.peer_connect('127.0.0.1', p2p_port(i))
self.p2p_connections.append(node)
def clear_all_connections(self):
self.connections = []
self.test_nodes = []
self.p2p_connections = []
def wait_for_disconnections(self):
def disconnected():
return all(node.closed for node in self.test_nodes)
return all(node.closed for node in self.p2p_connections)
wait_until(disconnected, timeout=10, lock=mininode_lock)
def wait_for_verack(self):
return all(node.wait_for_verack() for node in self.test_nodes)
return all(node.wait_for_verack() for node in self.p2p_connections)
def wait_for_pings(self, counter):
def received_pongs():
return all(node.received_ping_response(counter) for node in self.test_nodes)
return all(node.received_ping_response(counter) for node in self.p2p_connections)
wait_until(received_pongs, lock=mininode_lock)
# sync_blocks: Wait for all connections to request the blockhash given
@@ -206,17 +197,17 @@ class TestManager():
def blocks_requested():
return all(
blockhash in node.block_request_map and node.block_request_map[blockhash]
for node in self.test_nodes
for node in self.p2p_connections
)
# --> error if not requested
wait_until(blocks_requested, attempts=20*num_blocks, lock=mininode_lock)
# Send getheaders message
[ c.cb.send_getheaders() for c in self.connections ]
[ c.send_getheaders() for c in self.p2p_connections ]
# Send ping and wait for response -- synchronization hack
[ c.cb.send_ping(self.ping_counter) for c in self.connections ]
[ c.send_ping(self.ping_counter) for c in self.p2p_connections ]
self.wait_for_pings(self.ping_counter)
self.ping_counter += 1
@@ -226,42 +217,42 @@ class TestManager():
def transaction_requested():
return all(
txhash in node.tx_request_map and node.tx_request_map[txhash]
for node in self.test_nodes
for node in self.p2p_connections
)
# --> error if not requested
wait_until(transaction_requested, attempts=20*num_events, lock=mininode_lock)
# Get the mempool
[ c.cb.send_mempool() for c in self.connections ]
[ c.send_mempool() for c in self.p2p_connections ]
# Send ping and wait for response -- synchronization hack
[ c.cb.send_ping(self.ping_counter) for c in self.connections ]
[ c.send_ping(self.ping_counter) for c in self.p2p_connections ]
self.wait_for_pings(self.ping_counter)
self.ping_counter += 1
# Sort inv responses from each node
with mininode_lock:
[ c.cb.lastInv.sort() for c in self.connections ]
[ c.lastInv.sort() for c in self.p2p_connections ]
# Verify that the tip of each connection all agree with each other, and
# with the expected outcome (if given)
def check_results(self, blockhash, outcome):
with mininode_lock:
for c in self.connections:
for c in self.p2p_connections:
if outcome is None:
if c.cb.bestblockhash != self.connections[0].cb.bestblockhash:
if c.bestblockhash != self.p2p_connections[0].bestblockhash:
return False
elif isinstance(outcome, RejectResult): # Check that block was rejected w/ code
if c.cb.bestblockhash == blockhash:
if c.bestblockhash == blockhash:
return False
if blockhash not in c.cb.block_reject_map:
if blockhash not in c.block_reject_map:
logger.error('Block not in reject map: %064x' % (blockhash))
return False
if not outcome.match(c.cb.block_reject_map[blockhash]):
logger.error('Block rejected with %s instead of expected %s: %064x' % (c.cb.block_reject_map[blockhash], outcome, blockhash))
if not outcome.match(c.block_reject_map[blockhash]):
logger.error('Block rejected with %s instead of expected %s: %064x' % (c.block_reject_map[blockhash], outcome, blockhash))
return False
elif ((c.cb.bestblockhash == blockhash) != outcome):
elif ((c.bestblockhash == blockhash) != outcome):
return False
return True
@@ -273,21 +264,21 @@ class TestManager():
# a particular tx's existence in the mempool is the same across all nodes.
def check_mempool(self, txhash, outcome):
with mininode_lock:
for c in self.connections:
for c in self.p2p_connections:
if outcome is None:
# Make sure the mempools agree with each other
if c.cb.lastInv != self.connections[0].cb.lastInv:
if c.lastInv != self.p2p_connections[0].lastInv:
return False
elif isinstance(outcome, RejectResult): # Check that tx was rejected w/ code
if txhash in c.cb.lastInv:
if txhash in c.lastInv:
return False
if txhash not in c.cb.tx_reject_map:
if txhash not in c.tx_reject_map:
logger.error('Tx not in reject map: %064x' % (txhash))
return False
if not outcome.match(c.cb.tx_reject_map[txhash]):
logger.error('Tx rejected with %s instead of expected %s: %064x' % (c.cb.tx_reject_map[txhash], outcome, txhash))
if not outcome.match(c.tx_reject_map[txhash]):
logger.error('Tx rejected with %s instead of expected %s: %064x' % (c.tx_reject_map[txhash], outcome, txhash))
return False
elif ((txhash in c.cb.lastInv) != outcome):
elif ((txhash in c.lastInv) != outcome):
return False
return True
@@ -332,25 +323,25 @@ class TestManager():
first_block_with_hash = False
with mininode_lock:
self.block_store.add_block(block)
for c in self.connections:
if first_block_with_hash and block.sha256 in c.cb.block_request_map and c.cb.block_request_map[block.sha256] == True:
for c in self.p2p_connections:
if first_block_with_hash and block.sha256 in c.block_request_map and c.block_request_map[block.sha256] == True:
# There was a previous request for this block hash
# Most likely, we delivered a header for this block
# but never had the block to respond to the getdata
c.send_message(msg_block(block))
else:
c.cb.block_request_map[block.sha256] = False
c.block_request_map[block.sha256] = False
# Either send inv's to each node and sync, or add
# to invqueue for later inv'ing.
if (test_instance.sync_every_block):
# if we expect success, send inv and sync every block
# if we expect failure, just push the block and see what happens.
if outcome == True:
[ c.cb.send_inv(block) for c in self.connections ]
[ c.send_inv(block) for c in self.p2p_connections ]
self.sync_blocks(block.sha256, 1)
else:
[ c.send_message(msg_block(block)) for c in self.connections ]
[ c.cb.send_ping(self.ping_counter) for c in self.connections ]
[ c.send_message(msg_block(block)) for c in self.p2p_connections ]
[ c.send_ping(self.ping_counter) for c in self.p2p_connections ]
self.wait_for_pings(self.ping_counter)
self.ping_counter += 1
if (not self.check_results(tip, outcome)):
@@ -360,7 +351,7 @@ class TestManager():
elif isinstance(b_or_t, CBlockHeader):
block_header = b_or_t
self.block_store.add_header(block_header)
[ c.cb.send_header(block_header) for c in self.connections ]
[ c.send_header(block_header) for c in self.p2p_connections ]
else: # Tx test runner
assert(isinstance(b_or_t, CTransaction))
@@ -369,11 +360,11 @@ class TestManager():
# Add to shared tx store and clear map entry
with mininode_lock:
self.tx_store.add_transaction(tx)
for c in self.connections:
c.cb.tx_request_map[tx.sha256] = False
for c in self.p2p_connections:
c.tx_request_map[tx.sha256] = False
# Again, either inv to all nodes or save for later
if (test_instance.sync_every_tx):
[ c.cb.send_inv(tx) for c in self.connections ]
[ c.send_inv(tx) for c in self.p2p_connections ]
self.sync_transaction(tx.sha256, 1)
if (not self.check_mempool(tx.sha256, outcome)):
raise AssertionError("Test failed at test %d" % test_number)
@@ -381,26 +372,26 @@ class TestManager():
invqueue.append(CInv(1, tx.sha256))
# Ensure we're not overflowing the inv queue
if len(invqueue) == MAX_INV_SZ:
[ c.send_message(msg_inv(invqueue)) for c in self.connections ]
[ c.send_message(msg_inv(invqueue)) for c in self.p2p_connections ]
invqueue = []
# Do final sync if we weren't syncing on every block or every tx.
if (not test_instance.sync_every_block and block is not None):
if len(invqueue) > 0:
[ c.send_message(msg_inv(invqueue)) for c in self.connections ]
[ c.send_message(msg_inv(invqueue)) for c in self.p2p_connections ]
invqueue = []
self.sync_blocks(block.sha256, len(test_instance.blocks_and_transactions))
if (not self.check_results(tip, block_outcome)):
raise AssertionError("Block test failed at test %d" % test_number)
if (not test_instance.sync_every_tx and tx is not None):
if len(invqueue) > 0:
[ c.send_message(msg_inv(invqueue)) for c in self.connections ]
[ c.send_message(msg_inv(invqueue)) for c in self.p2p_connections ]
invqueue = []
self.sync_transaction(tx.sha256, len(test_instance.blocks_and_transactions))
if (not self.check_mempool(tx.sha256, tx_outcome)):
raise AssertionError("Mempool test failed at test %d" % test_number)
[ c.disconnect_node() for c in self.connections ]
[ c.disconnect_node() for c in self.p2p_connections ]
self.wait_for_disconnections()
self.block_store.close()
self.tx_store.close()

View File

@@ -20,10 +20,10 @@ import logging
import socket
import struct
import sys
import time
from threading import RLock, Thread
from test_framework.messages import *
from test_framework.util import wait_until
logger = logging.getLogger("TestFramework.mininode")
@@ -57,15 +57,206 @@ MAGIC_BYTES = {
"regtest": b"\xfa\xbf\xb5\xda", # regtest
}
class NodeConnCB():
"""Callback and helper functions for P2P connection to a bitcoind node.
class NodeConn(asyncore.dispatcher):
"""A low-level connection object to a node's P2P interface.
This class is responsible for:
- opening and closing the TCP connection to the node
- reading bytes from and writing bytes to the socket
- deserializing and serializing the P2P message header
- logging messages as they are sent and received
This class contains no logic for handing the P2P message payloads. It must be
sub-classed and the on_message() callback overridden.
TODO: rename this class P2PConnection."""
def __init__(self):
super().__init__(map=mininode_socket_map)
def peer_connect(self, dstaddr, dstport, net="regtest"):
self.dstaddr = dstaddr
self.dstport = dstport
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
self.sendbuf = b""
self.recvbuf = b""
self.state = "connecting"
self.network = net
self.disconnect = False
logger.info('Connecting to Bitcoin Node: %s:%d' % (self.dstaddr, self.dstport))
try:
self.connect((dstaddr, dstport))
except:
self.handle_close()
def peer_disconnect(self):
# Connection could have already been closed by other end.
if self.state == "connected":
self.disconnect_node()
# Connection and disconnection methods
def handle_connect(self):
"""asyncore callback when a connection is opened."""
if self.state != "connected":
logger.debug("Connected & Listening: %s:%d" % (self.dstaddr, self.dstport))
self.state = "connected"
self.on_open()
def handle_close(self):
"""asyncore callback when a connection is closed."""
logger.debug("Closing connection to: %s:%d" % (self.dstaddr, self.dstport))
self.state = "closed"
self.recvbuf = b""
self.sendbuf = b""
try:
self.close()
except:
pass
self.on_close()
def disconnect_node(self):
"""Disconnect the p2p connection.
Called by the test logic thread. Causes the p2p connection
to be disconnected on the next iteration of the asyncore loop."""
self.disconnect = True
# Socket read methods
def handle_read(self):
"""asyncore callback when data is read from the socket."""
t = self.recv(8192)
if len(t) > 0:
self.recvbuf += t
self._on_data()
def _on_data(self):
"""Try to read P2P messages from the recv buffer.
This method reads data from the buffer in a loop. It deserializes,
parses and verifies the P2P header, then passes the P2P payload to
the on_message callback for processing."""
try:
while True:
if len(self.recvbuf) < 4:
return
if self.recvbuf[:4] != MAGIC_BYTES[self.network]:
raise ValueError("got garbage %s" % repr(self.recvbuf))
if len(self.recvbuf) < 4 + 12 + 4 + 4:
return
command = self.recvbuf[4:4+12].split(b"\x00", 1)[0]
msglen = struct.unpack("<i", self.recvbuf[4+12:4+12+4])[0]
checksum = self.recvbuf[4+12+4:4+12+4+4]
if len(self.recvbuf) < 4 + 12 + 4 + 4 + msglen:
return
msg = self.recvbuf[4+12+4+4:4+12+4+4+msglen]
th = sha256(msg)
h = sha256(th)
if checksum != h[:4]:
raise ValueError("got bad checksum " + repr(self.recvbuf))
self.recvbuf = self.recvbuf[4+12+4+4+msglen:]
if command not in MESSAGEMAP:
raise ValueError("Received unknown command from %s:%d: '%s' %s" % (self.dstaddr, self.dstport, command, repr(msg)))
f = BytesIO(msg)
t = MESSAGEMAP[command]()
t.deserialize(f)
self._log_message("receive", t)
self.on_message(t)
except Exception as e:
logger.exception('Error reading message:', repr(e))
raise
def on_message(self, message):
"""Callback for processing a P2P payload. Must be overridden by derived class."""
raise NotImplementedError
# Socket write methods
def writable(self):
"""asyncore method to determine whether the handle_write() callback should be called on the next loop."""
with mininode_lock:
pre_connection = self.state == "connecting"
length = len(self.sendbuf)
return (length > 0 or pre_connection)
def handle_write(self):
"""asyncore callback when data should be written to the socket."""
with mininode_lock:
# asyncore does not expose socket connection, only the first read/write
# event, thus we must check connection manually here to know when we
# actually connect
if self.state == "connecting":
self.handle_connect()
if not self.writable():
return
try:
sent = self.send(self.sendbuf)
except:
self.handle_close()
return
self.sendbuf = self.sendbuf[sent:]
def send_message(self, message, pushbuf=False):
"""Send a P2P message over the socket.
This method takes a P2P payload, builds the P2P header and adds
the message to the send buffer to be sent over the socket."""
if self.state != "connected" and not pushbuf:
raise IOError('Not connected, no pushbuf')
self._log_message("send", message)
command = message.command
data = message.serialize()
tmsg = MAGIC_BYTES[self.network]
tmsg += command
tmsg += b"\x00" * (12 - len(command))
tmsg += struct.pack("<I", len(data))
th = sha256(data)
h = sha256(th)
tmsg += h[:4]
tmsg += data
with mininode_lock:
if (len(self.sendbuf) == 0 and not pushbuf):
try:
sent = self.send(tmsg)
self.sendbuf = tmsg[sent:]
except BlockingIOError:
self.sendbuf = tmsg
else:
self.sendbuf += tmsg
# Class utility methods
def _log_message(self, direction, msg):
"""Logs a message being sent or received over the connection."""
if direction == "send":
log_message = "Send message to "
elif direction == "receive":
log_message = "Received message from "
log_message += "%s:%d: %s" % (self.dstaddr, self.dstport, repr(msg)[:500])
if len(log_message) > 500:
log_message += "... (msg truncated)"
logger.debug(log_message)
class NodeConnCB(NodeConn):
"""A high-level P2P interface class for communicating with a Bitcoin node.
This class provides high-level callbacks for processing P2P message
payloads, as well as convenience methods for interacting with the
node over P2P.
Individual testcases should subclass this and override the on_* methods
if they want to alter message handling behaviour."""
if they want to alter message handling behaviour.
TODO: rename this class P2PInterface"""
def __init__(self):
# Track whether we have a P2P connection open to the node
self.connected = False
self.connection = None
super().__init__()
# Track number of messages of each type received and the most recent
# message of each type
@@ -75,9 +266,25 @@ class NodeConnCB():
# A count of the number of ping messages we've sent to the node
self.ping_counter = 1
# The network services received from the peer
self.nServices = 0
def peer_connect(self, *args, services=NODE_NETWORK|NODE_WITNESS, send_version=True, **kwargs):
super().peer_connect(*args, **kwargs)
if send_version:
# Send a version msg
vt = msg_version()
vt.nServices = services
vt.addrTo.ip = self.dstaddr
vt.addrTo.port = self.dstport
vt.addrFrom.ip = "0.0.0.0"
vt.addrFrom.port = 0
self.send_message(vt, True)
# Message receiving methods
def deliver(self, conn, message):
def on_message(self, message):
"""Receive message and dispatch message to appropriate callback.
We keep a count of how many of each message type has been received
@@ -87,66 +294,61 @@ class NodeConnCB():
command = message.command.decode('ascii')
self.message_count[command] += 1
self.last_message[command] = message
getattr(self, 'on_' + command)(conn, message)
getattr(self, 'on_' + command)(message)
except:
print("ERROR delivering %s (%s)" % (repr(message),
sys.exc_info()[0]))
print("ERROR delivering %s (%s)" % (repr(message), sys.exc_info()[0]))
raise
# Callback methods. Can be overridden by subclasses in individual test
# cases to provide custom message handling behaviour.
def on_open(self, conn):
self.connected = True
def on_open(self):
pass
def on_close(self, conn):
self.connected = False
self.connection = None
def on_close(self):
pass
def on_addr(self, conn, message): pass
def on_block(self, conn, message): pass
def on_blocktxn(self, conn, message): pass
def on_cmpctblock(self, conn, message): pass
def on_feefilter(self, conn, message): pass
def on_getaddr(self, conn, message): pass
def on_getblocks(self, conn, message): pass
def on_getblocktxn(self, conn, message): pass
def on_getdata(self, conn, message): pass
def on_getheaders(self, conn, message): pass
def on_headers(self, conn, message): pass
def on_mempool(self, conn): pass
def on_pong(self, conn, message): pass
def on_reject(self, conn, message): pass
def on_sendcmpct(self, conn, message): pass
def on_sendheaders(self, conn, message): pass
def on_tx(self, conn, message): pass
def on_addr(self, message): pass
def on_block(self, message): pass
def on_blocktxn(self, message): pass
def on_cmpctblock(self, message): pass
def on_feefilter(self, message): pass
def on_getaddr(self, message): pass
def on_getblocks(self, message): pass
def on_getblocktxn(self, message): pass
def on_getdata(self, message): pass
def on_getheaders(self, message): pass
def on_headers(self, message): pass
def on_mempool(self, message): pass
def on_pong(self, message): pass
def on_reject(self, message): pass
def on_sendcmpct(self, message): pass
def on_sendheaders(self, message): pass
def on_tx(self, message): pass
def on_inv(self, conn, message):
def on_inv(self, message):
want = msg_getdata()
for i in message.inv:
if i.type != 0:
want.inv.append(i)
if len(want.inv):
conn.send_message(want)
self.send_message(want)
def on_ping(self, conn, message):
conn.send_message(msg_pong(message.nonce))
def on_ping(self, message):
self.send_message(msg_pong(message.nonce))
def on_verack(self, conn, message):
def on_verack(self, message):
self.verack_received = True
def on_version(self, conn, message):
def on_version(self, message):
assert message.nVersion >= MIN_VERSION_SUPPORTED, "Version {} received. Test framework only supports versions greater than {}".format(message.nVersion, MIN_VERSION_SUPPORTED)
conn.send_message(msg_verack())
conn.nServices = message.nServices
self.send_message(msg_verack())
self.nServices = message.nServices
# Connection helper methods
def add_connection(self, conn):
self.connection = conn
def wait_for_disconnect(self, timeout=60):
test_function = lambda: not self.connected
test_function = lambda: self.state != "connected"
wait_until(test_function, timeout=timeout, lock=mininode_lock)
# Message receiving helper methods
@@ -178,12 +380,6 @@ class NodeConnCB():
# Message sending helper functions
def send_message(self, message):
if self.connection:
self.connection.send_message(message)
else:
logger.error("Cannot send message. No connection to node!")
def send_and_ping(self, message):
self.send_message(message)
self.sync_with_ping()
@@ -195,178 +391,6 @@ class NodeConnCB():
wait_until(test_function, timeout=timeout, lock=mininode_lock)
self.ping_counter += 1
class NodeConn(asyncore.dispatcher):
"""The actual NodeConn class
This class provides an interface for a p2p connection to a specified node."""
def __init__(self, dstaddr, dstport, callback, net="regtest", services=NODE_NETWORK|NODE_WITNESS, send_version=True):
asyncore.dispatcher.__init__(self, map=mininode_socket_map)
self.dstaddr = dstaddr
self.dstport = dstport
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
self.sendbuf = b""
self.recvbuf = b""
self.last_sent = 0
self.state = "connecting"
self.network = net
self.cb = callback
self.disconnect = False
self.nServices = 0
if send_version:
# stuff version msg into sendbuf
vt = msg_version()
vt.nServices = services
vt.addrTo.ip = self.dstaddr
vt.addrTo.port = self.dstport
vt.addrFrom.ip = "0.0.0.0"
vt.addrFrom.port = 0
self.send_message(vt, True)
logger.info('Connecting to Bitcoin Node: %s:%d' % (self.dstaddr, self.dstport))
try:
self.connect((dstaddr, dstport))
except:
self.handle_close()
# Connection and disconnection methods
def handle_connect(self):
if self.state != "connected":
logger.debug("Connected & Listening: %s:%d" % (self.dstaddr, self.dstport))
self.state = "connected"
self.cb.on_open(self)
def handle_close(self):
logger.debug("Closing connection to: %s:%d" % (self.dstaddr, self.dstport))
self.state = "closed"
self.recvbuf = b""
self.sendbuf = b""
try:
self.close()
except:
pass
self.cb.on_close(self)
def disconnect_node(self):
""" Disconnect the p2p connection.
Called by the test logic thread. Causes the p2p connection
to be disconnected on the next iteration of the asyncore loop."""
self.disconnect = True
# Socket read methods
def readable(self):
return True
def handle_read(self):
t = self.recv(8192)
if len(t) > 0:
self.recvbuf += t
self.got_data()
def got_data(self):
try:
while True:
if len(self.recvbuf) < 4:
return
if self.recvbuf[:4] != MAGIC_BYTES[self.network]:
raise ValueError("got garbage %s" % repr(self.recvbuf))
if len(self.recvbuf) < 4 + 12 + 4 + 4:
return
command = self.recvbuf[4:4+12].split(b"\x00", 1)[0]
msglen = struct.unpack("<i", self.recvbuf[4+12:4+12+4])[0]
checksum = self.recvbuf[4+12+4:4+12+4+4]
if len(self.recvbuf) < 4 + 12 + 4 + 4 + msglen:
return
msg = self.recvbuf[4+12+4+4:4+12+4+4+msglen]
th = sha256(msg)
h = sha256(th)
if checksum != h[:4]:
raise ValueError("got bad checksum " + repr(self.recvbuf))
self.recvbuf = self.recvbuf[4+12+4+4+msglen:]
if command not in MESSAGEMAP:
raise ValueError("Received unknown command from %s:%d: '%s' %s" % (self.dstaddr, self.dstport, command, repr(msg)))
f = BytesIO(msg)
t = MESSAGEMAP[command]()
t.deserialize(f)
self.got_message(t)
except Exception as e:
logger.exception('Error reading message:', repr(e))
raise
def got_message(self, message):
if self.last_sent + 30 * 60 < time.time():
self.send_message(MESSAGEMAP[b'ping']())
self._log_message("receive", message)
self.cb.deliver(self, message)
# Socket write methods
def writable(self):
with mininode_lock:
pre_connection = self.state == "connecting"
length = len(self.sendbuf)
return (length > 0 or pre_connection)
def handle_write(self):
with mininode_lock:
# asyncore does not expose socket connection, only the first read/write
# event, thus we must check connection manually here to know when we
# actually connect
if self.state == "connecting":
self.handle_connect()
if not self.writable():
return
try:
sent = self.send(self.sendbuf)
except:
self.handle_close()
return
self.sendbuf = self.sendbuf[sent:]
def send_message(self, message, pushbuf=False):
if self.state != "connected" and not pushbuf:
raise IOError('Not connected, no pushbuf')
self._log_message("send", message)
command = message.command
data = message.serialize()
tmsg = MAGIC_BYTES[self.network]
tmsg += command
tmsg += b"\x00" * (12 - len(command))
tmsg += struct.pack("<I", len(data))
th = sha256(data)
h = sha256(th)
tmsg += h[:4]
tmsg += data
with mininode_lock:
if (len(self.sendbuf) == 0 and not pushbuf):
try:
sent = self.send(tmsg)
self.sendbuf = tmsg[sent:]
except BlockingIOError:
self.sendbuf = tmsg
else:
self.sendbuf += tmsg
self.last_sent = time.time()
# Class utility methods
def _log_message(self, direction, msg):
if direction == "send":
log_message = "Send message to "
elif direction == "receive":
log_message = "Received message from "
log_message += "%s:%d: %s" % (self.dstaddr, self.dstport, repr(msg)[:500])
if len(log_message) > 500:
log_message += "... (msg truncated)"
logger.debug(log_message)
# Keep our own socket map for asyncore, so that we can track disconnects
# ourselves (to workaround an issue with closing an asyncore socket when

View File

@@ -14,7 +14,6 @@ import subprocess
import time
from .authproxy import JSONRPCException
from .mininode import NodeConn
from .util import (
assert_equal,
get_rpc_proxy,
@@ -158,7 +157,7 @@ class TestNode():
self.encryptwallet(passphrase)
self.wait_until_stopped()
def add_p2p_connection(self, p2p_conn, **kwargs):
def add_p2p_connection(self, p2p_conn, *args, **kwargs):
"""Add a p2p connection to the node.
This method adds the p2p connection to the self.p2ps list and also
@@ -167,9 +166,9 @@ class TestNode():
kwargs['dstport'] = p2p_port(self.index)
if 'dstaddr' not in kwargs:
kwargs['dstaddr'] = '127.0.0.1'
p2p_conn.peer_connect(*args, **kwargs)
self.p2ps.append(p2p_conn)
kwargs.update({'callback': p2p_conn})
p2p_conn.add_connection(NodeConn(**kwargs))
return p2p_conn
@@ -185,10 +184,8 @@ class TestNode():
def disconnect_p2ps(self):
"""Close all p2p connections to the node."""
for p in self.p2ps:
# Connection could have already been closed by other end.
if p.connection is not None:
p.connection.disconnect_node()
self.p2ps = []
p.peer_disconnect()
del self.p2ps[:]
class TestNodeCLI():