Merge #21008: test: fix zmq test flakiness, improve speed

ef21fb7313005a8a2d4f03fb4056f1f66c1b04f0 zmq test: speedup test by whitelisting peers (immediate tx relay) (Sebastian Falbesoner)
5c6546362dce8b468268578e345c37ed515a1855 zmq test: fix flakiness by using more robust sync method (Sebastian Falbesoner)
8666033630eeaf851ec69e018bb53eb23093f4b9 zmq test: accept arbitrary sequence start number in ZMQSubscriber (Sebastian Falbesoner)
6014d6e1b5a0dda6e20c2721f0bdb7e6a63ece81 zmq test: dedup message reception handling in ZMQSubscriber (Sebastian Falbesoner)

Pull request description:

  Fixes #20934 by using the "sync up" method described in https://github.com/bitcoin/bitcoin/issues/20538#issuecomment-738791868.

  After improving robustness with this approach (commits 1-3), it turned out that there were still some failures, but those were unrelated to zmq: out of 500 runs, `sync_mempool()` or `sync_blocks()` timed out 3 times, which can happen because the trickle relay time has no upper bound -- hence in rare cases, it takes longer than 60s. This is fixed by enabling immediate tx relay on node1 (commit 4), which as a nice side-effect also gives us roughly a 2x speedup for the test.

  For further details, also see the explanations in the commit messages.

  There is no guarantee that the test is no longer flaky, but it would help if potential reviewers ran the following script locally and reported how many runs failed (feel free to do fewer than 1000 runs, as this takes quite a long time if run with `--valgrind`):
  ```
  #!/bin/sh
  OUTPUT_FILE=./zmq_results
  echo ===== repeated zmq test ===== > $OUTPUT_FILE

  for i in `seq 1000`; do
      echo ------------------------
      echo ----- test run $i -----
      echo ------------------------
      echo --- $i --- >> $OUTPUT_FILE
      ./test/functional/interface_zmq.py --valgrind
      if [ $? -ne 0 ]; then
          echo "FAILED. /o\\" >> $OUTPUT_FILE
      else
          echo "PASSED. \\o/" >> $OUTPUT_FILE
      fi
  done

  echo Failed test runs:
  grep FAILED $OUTPUT_FILE | wc -l
  ```

ACKs for top commit:
  jonatack:
    Light ACK ef21fb7313005a8a2d4f03fb4056f1f66c1b04f0 with the caveat that I was unable to make the test fail with valgrind both here and on master, so I can't vouch that it actually fixes the CI flakiness. The test does run ~2x faster with this.

Tree-SHA512: 7a1e7592fbbd98e69e1e1294486b91253e589c72b3c6bbb7f587028ec07cca59b7d984e4ebf256c4bc3e8a529ec77d31842f3dd874038aea0b684abfea50306a
This commit is contained in:
MarcoFalke 2021-02-16 18:56:14 +01:00
commit 3c9d9d21e1
No known key found for this signature in database
GPG Key ID: D2EA4850E7528B25

View File

@ -27,28 +27,31 @@ def hash256_reversed(byte_str):
class ZMQSubscriber:
def __init__(self, socket, topic):
self.sequence = 0
self.sequence = None # no sequence number received yet
self.socket = socket
self.topic = topic
self.socket.setsockopt(zmq.SUBSCRIBE, self.topic)
def receive(self):
# Receive message from publisher and verify that topic and sequence match
def _receive_from_publisher_and_check(self):
topic, body, seq = self.socket.recv_multipart()
# Topic should match the subscriber topic.
assert_equal(topic, self.topic)
# Sequence should be incremental.
assert_equal(struct.unpack('<I', seq)[-1], self.sequence)
received_seq = struct.unpack('<I', seq)[-1]
if self.sequence is None:
self.sequence = received_seq
else:
assert_equal(received_seq, self.sequence)
self.sequence += 1
return body
def receive(self):
return self._receive_from_publisher_and_check()
def receive_sequence(self):
topic, body, seq = self.socket.recv_multipart()
# Topic should match the subscriber topic.
assert_equal(topic, self.topic)
# Sequence should be incremental.
assert_equal(struct.unpack('<I', seq)[-1], self.sequence)
self.sequence += 1
body = self._receive_from_publisher_and_check()
hash = body[:32].hex()
label = chr(body[32])
mempool_sequence = None if len(body) != 32+1+8 else struct.unpack("<Q", body[32+1:])[0]
@ -64,6 +67,9 @@ class ZMQTest (BitcoinTestFramework):
self.num_nodes = 2
if self.is_wallet_compiled():
self.requires_wallet = True
# This test isn't testing txn relay/timing, so set whitelist on the
# peers for instant txn relay. This speeds up the test run time 2-3x.
self.extra_args = [["-whitelist=noban@127.0.0.1"]] * self.num_nodes
def skip_test_if_missing_module(self):
self.skip_if_no_py3_zmq()
@ -84,23 +90,46 @@ class ZMQTest (BitcoinTestFramework):
# Restart node with the specified zmq notifications enabled, subscribe to
# all of them and return the corresponding ZMQSubscriber objects.
def setup_zmq_test(self, services, recv_timeout=60, connect_nodes=False):
def setup_zmq_test(self, services, *, recv_timeout=60, sync_blocks=True):
subscribers = []
for topic, address in services:
socket = self.ctx.socket(zmq.SUB)
socket.set(zmq.RCVTIMEO, recv_timeout*1000)
subscribers.append(ZMQSubscriber(socket, topic.encode()))
self.restart_node(0, ["-zmqpub%s=%s" % (topic, address) for topic, address in services])
if connect_nodes:
self.connect_nodes(0, 1)
self.restart_node(0, ["-zmqpub%s=%s" % (topic, address) for topic, address in services] +
self.extra_args[0])
for i, sub in enumerate(subscribers):
sub.socket.connect(services[i][1])
# Relax so that the subscribers are ready before publishing zmq messages
sleep(0.2)
# Ensure that all zmq publisher notification interfaces are ready by
# running the following "sync up" procedure:
# 1. Generate a block on the node
# 2. Try to receive a notification on all subscribers
# 3. If all subscribers get a message within the timeout (1 second),
# we are done, otherwise repeat starting from step 1
for sub in subscribers:
sub.socket.set(zmq.RCVTIMEO, 1000)
while True:
self.nodes[0].generate(1)
recv_failed = False
for sub in subscribers:
try:
sub.receive()
except zmq.error.Again:
self.log.debug("Didn't receive sync-up notification, trying again.")
recv_failed = True
if not recv_failed:
self.log.debug("ZMQ sync-up completed, all subscribers are ready.")
break
# set subscriber's desired timeout for the test
for sub in subscribers:
sub.socket.set(zmq.RCVTIMEO, recv_timeout*1000)
self.connect_nodes(0, 1)
if sync_blocks:
self.sync_blocks()
return subscribers
@ -110,9 +139,7 @@ class ZMQTest (BitcoinTestFramework):
self.restart_node(0, ["-zmqpubrawtx=foo", "-zmqpubhashtx=bar"])
address = 'tcp://127.0.0.1:28332'
subs = self.setup_zmq_test(
[(topic, address) for topic in ["hashblock", "hashtx", "rawblock", "rawtx"]],
connect_nodes=True)
subs = self.setup_zmq_test([(topic, address) for topic in ["hashblock", "hashtx", "rawblock", "rawtx"]])
hashblock = subs[0]
hashtx = subs[1]
@ -189,6 +216,7 @@ class ZMQTest (BitcoinTestFramework):
hashblock, hashtx = self.setup_zmq_test(
[(topic, address) for topic in ["hashblock", "hashtx"]],
recv_timeout=2) # 2 second timeout to check end of notifications
self.disconnect_nodes(0, 1)
# Generate 1 block in nodes[0] with 1 mempool tx and receive all notifications
payment_txid = self.nodes[0].sendtoaddress(self.nodes[0].getnewaddress(), 1.0)
@ -237,6 +265,7 @@ class ZMQTest (BitcoinTestFramework):
"""
self.log.info("Testing 'sequence' publisher")
[seq] = self.setup_zmq_test([("sequence", "tcp://127.0.0.1:28333")])
self.disconnect_nodes(0, 1)
# Mempool sequence number starts at 1
seq_num = 1
@ -387,7 +416,7 @@ class ZMQTest (BitcoinTestFramework):
return
self.log.info("Testing 'mempool sync' usage of sequence notifier")
[seq] = self.setup_zmq_test([("sequence", "tcp://127.0.0.1:28333")], connect_nodes=True)
[seq] = self.setup_zmq_test([("sequence", "tcp://127.0.0.1:28333")])
# In-memory counter, should always start at 1
next_mempool_seq = self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"]
@ -487,10 +516,13 @@ class ZMQTest (BitcoinTestFramework):
def test_multiple_interfaces(self):
# Set up two subscribers with different addresses
# (note that after the reorg test, syncing would fail due to different
# chain lengths on node0 and node1; for this test we only need node0, so
# we can disable syncing blocks on the setup)
subscribers = self.setup_zmq_test([
("hashblock", "tcp://127.0.0.1:28334"),
("hashblock", "tcp://127.0.0.1:28335"),
])
], sync_blocks=False)
# Generate 1 block in nodes[0] and receive all notifications
self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)