qa: Avoid start/stop of the network thread mid-test

This commit is contained in:
MarcoFalke
2018-06-18 17:28:37 -04:00
parent fa1eac9cdb
commit fa87da2f17
26 changed files with 95 additions and 220 deletions

View File

@ -13,11 +13,10 @@ P2PConnection: A low-level connection object to a node's P2P interface
P2PInterface: A high-level interface object for communicating to a node over P2P
P2PDataStore: A p2p interface class that keeps a store of transactions and blocks
and can respond correctly to getdata and getheaders messages"""
import asyncore
import asyncio
from collections import defaultdict
from io import BytesIO
import logging
import socket
import struct
import sys
import threading
@ -57,7 +56,8 @@ MAGIC_BYTES = {
"regtest": b"\xfa\xbf\xb5\xda", # regtest
}
class P2PConnection(asyncore.dispatcher):
class P2PConnection(asyncio.Protocol):
"""A low-level connection object to a node's P2P interface.
This class is responsible for:
@ -71,68 +71,59 @@ class P2PConnection(asyncore.dispatcher):
sub-classed and the on_message() callback overridden."""
def __init__(self):
# All P2PConnections must be created before starting the NetworkThread.
# assert that the network thread is not running.
assert not network_thread_running()
super().__init__(map=mininode_socket_map)
self._conn_open = False
# The underlying transport of the connection.
# Should only call methods on this from the NetworkThread, c.f. call_soon_threadsafe
self._transport = None
@property
def is_connected(self):
return self._conn_open
return self._transport is not None
def peer_connect(self, dstaddr, dstport, net="regtest"):
assert not self.is_connected
self.dstaddr = dstaddr
self.dstport = dstport
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
self.sendbuf = b""
# The initial message to send after the connection was made:
self.on_connection_send_msg = None
self.recvbuf = b""
self._asyncore_pre_connection = True
self.network = net
self.disconnect = False
logger.debug('Connecting to Bitcoin Node: %s:%d' % (self.dstaddr, self.dstport))
try:
self.connect((dstaddr, dstport))
except:
self.handle_close()
loop = NetworkThread.network_event_loop
conn_gen_unsafe = loop.create_connection(lambda: self, host=self.dstaddr, port=self.dstport)
conn_gen = lambda: loop.call_soon_threadsafe(loop.create_task, conn_gen_unsafe)
return conn_gen
def peer_disconnect(self):
# Connection could have already been closed by other end.
if self.is_connected:
self.disconnect = True # Signal asyncore to disconnect
NetworkThread.network_event_loop.call_soon_threadsafe(lambda: self._transport and self._transport.abort())
# Connection and disconnection methods
def handle_connect(self):
"""asyncore callback when a connection is opened."""
if not self.is_connected:
logger.debug("Connected & Listening: %s:%d" % (self.dstaddr, self.dstport))
self._conn_open = True
self._asyncore_pre_connection = False
self.on_open()
def connection_made(self, transport):
"""asyncio callback when a connection is opened."""
assert not self._transport
logger.debug("Connected & Listening: %s:%d" % (self.dstaddr, self.dstport))
self._transport = transport
if self.on_connection_send_msg:
self.send_message(self.on_connection_send_msg)
self.on_connection_send_msg = None # Never used again
self.on_open()
def handle_close(self):
"""asyncore callback when a connection is closed."""
logger.debug("Closing connection to: %s:%d" % (self.dstaddr, self.dstport))
self._conn_open = False
def connection_lost(self, exc):
"""asyncio callback when a connection is closed."""
if exc:
logger.warning("Connection lost to {}:{} due to {}".format(self.dstaddr, self.dstport, exc))
else:
logger.debug("Closed connection to: %s:%d" % (self.dstaddr, self.dstport))
self._transport = None
self.recvbuf = b""
self.sendbuf = b""
try:
self.close()
except:
pass
self.on_close()
# Socket read methods
def handle_read(self):
"""asyncore callback when data is read from the socket."""
t = self.recv(8192)
def data_received(self, t):
"""asyncio callback when data is read from the socket."""
if len(t) > 0:
self.recvbuf += t
self._on_data()
@ -179,30 +170,6 @@ class P2PConnection(asyncore.dispatcher):
# Socket write methods
def writable(self):
"""asyncore method to determine whether the handle_write() callback should be called on the next loop."""
with mininode_lock:
length = len(self.sendbuf)
return length > 0 or self._asyncore_pre_connection
def handle_write(self):
"""asyncore callback when data should be written to the socket."""
with mininode_lock:
# asyncore does not expose socket connection, only the first read/write
# event, thus we must check connection manually here to know when we
# actually connect
if self._asyncore_pre_connection:
self.handle_connect()
if not self.writable():
return
try:
sent = self.send(self.sendbuf)
except:
self.handle_close()
return
self.sendbuf = self.sendbuf[sent:]
def send_message(self, message):
"""Send a P2P message over the socket.
@ -212,15 +179,7 @@ class P2PConnection(asyncore.dispatcher):
raise IOError('Not connected')
self._log_message("send", message)
tmsg = self._build_message(message)
with mininode_lock:
if len(self.sendbuf) == 0:
try:
sent = self.send(tmsg)
self.sendbuf = tmsg[sent:]
except BlockingIOError:
self.sendbuf = tmsg
else:
self.sendbuf += tmsg
NetworkThread.network_event_loop.call_soon_threadsafe(lambda: self._transport and self._transport.write(tmsg))
# Class utility methods
@ -274,7 +233,7 @@ class P2PInterface(P2PConnection):
self.nServices = 0
def peer_connect(self, *args, services=NODE_NETWORK|NODE_WITNESS, send_version=True, **kwargs):
super().peer_connect(*args, **kwargs)
create_conn = super().peer_connect(*args, **kwargs)
if send_version:
# Send a version msg
@ -284,7 +243,9 @@ class P2PInterface(P2PConnection):
vt.addrTo.port = self.dstport
vt.addrFrom.ip = "0.0.0.0"
vt.addrFrom.port = 0
self.sendbuf = self._build_message(vt) # Will be sent right after handle_connect
self.on_connection_send_msg = vt # Will be sent soon after connection_made
return create_conn
# Message receiving methods
@ -408,56 +369,35 @@ class P2PInterface(P2PConnection):
self.ping_counter += 1
# Keep our own socket map for asyncore, so that we can track disconnects
# ourselves (to work around an issue with closing an asyncore socket when
# using select)
mininode_socket_map = dict()
# One lock for synchronizing all data access between the networking thread (see
# One lock for synchronizing all data access between the network event loop (see
# NetworkThread below) and the thread running the test logic. For simplicity,
# P2PConnection acquires this lock whenever delivering a message to a P2PInterface,
# and whenever adding anything to the send buffer (in send_message()). This
# lock should be acquired in the thread running the test logic to synchronize
# P2PConnection acquires this lock whenever delivering a message to a P2PInterface.
# This lock should be acquired in the thread running the test logic to synchronize
# access to any data shared with the P2PInterface or P2PConnection.
mininode_lock = threading.RLock()
class NetworkThread(threading.Thread):
network_event_loop = None
def __init__(self):
super().__init__(name="NetworkThread")
# There is only one event loop and no more than one thread must be created
assert not self.network_event_loop
NetworkThread.network_event_loop = asyncio.new_event_loop()
def run(self):
while mininode_socket_map:
# We check for whether to disconnect outside of the asyncore
# loop to work around the behavior of asyncore when using
# select
disconnected = []
for fd, obj in mininode_socket_map.items():
if obj.disconnect:
disconnected.append(obj)
[obj.handle_close() for obj in disconnected]
asyncore.loop(0.1, use_poll=True, map=mininode_socket_map, count=1)
logger.debug("Network thread closing")
"""Start the network thread."""
self.network_event_loop.run_forever()
def network_thread_start():
"""Start the network thread."""
# Only one network thread may run at a time
assert not network_thread_running()
def close(self, timeout=10):
"""Close the connections and network event loop."""
self.network_event_loop.call_soon_threadsafe(self.network_event_loop.stop)
wait_until(lambda: not self.network_event_loop.is_running(), timeout=timeout)
self.network_event_loop.close()
self.join(timeout)
NetworkThread().start()
def network_thread_running():
"""Return whether the network thread is running."""
return any([thread.name == "NetworkThread" for thread in threading.enumerate()])
def network_thread_join(timeout=10):
"""Wait timeout seconds for the network thread to terminate.
Throw if the network thread doesn't terminate in timeout seconds."""
network_threads = [thread for thread in threading.enumerate() if thread.name == "NetworkThread"]
assert len(network_threads) <= 1
for thread in network_threads:
thread.join(timeout)
assert not thread.is_alive()
class P2PDataStore(P2PInterface):
"""A P2P data store class.

View File

@ -18,6 +18,7 @@ import time
from .authproxy import JSONRPCException
from . import coverage
from .test_node import TestNode
from .mininode import NetworkThread
from .util import (
MAX_NODES,
PortSeed,
@ -83,6 +84,7 @@ class BitcoinTestFramework(metaclass=BitcoinTestMetaClass):
"""Sets test framework defaults. Do not override this method. Instead, override the set_test_params() method"""
self.setup_clean_chain = False
self.nodes = []
self.network_thread = None
self.mocktime = 0
self.supports_cli = False
self.bind_to_localhost_only = True
@ -144,6 +146,10 @@ class BitcoinTestFramework(metaclass=BitcoinTestMetaClass):
self.options.tmpdir = tempfile.mkdtemp(prefix="test")
self._start_logging()
self.log.debug('Setting up network thread')
self.network_thread = NetworkThread()
self.network_thread.start()
success = TestStatus.FAILED
try:
@ -171,6 +177,8 @@ class BitcoinTestFramework(metaclass=BitcoinTestMetaClass):
print("Testcase failed. Attaching python debugger. Enter ? for help")
pdb.set_trace()
self.log.debug('Closing down network thread')
self.network_thread.close()
if not self.options.noshutdown:
self.log.info("Stopping nodes")
if self.nodes:

View File

@ -289,7 +289,7 @@ class TestNode():
if 'dstaddr' not in kwargs:
kwargs['dstaddr'] = '127.0.0.1'
p2p_conn.peer_connect(*args, **kwargs)
p2p_conn.peer_connect(*args, **kwargs)()
self.p2ps.append(p2p_conn)
return p2p_conn
@ -343,10 +343,10 @@ class TestNodeCLI():
def batch(self, requests):
results = []
for request in requests:
try:
results.append(dict(result=request()))
except JSONRPCException as e:
results.append(dict(error=e))
try:
results.append(dict(result=request()))
except JSONRPCException as e:
results.append(dict(error=e))
return results
def send_cli(self, command=None, *args, **kwargs):