diff --git a/local-tests/chainrunner/chain.py b/local-tests/chainrunner/chain.py index 36cd94de86..8329643554 100644 --- a/local-tests/chainrunner/chain.py +++ b/local-tests/chainrunner/chain.py @@ -169,14 +169,28 @@ def fork(self, forkoff_path, ws_endpoint, snapshot_file=None): chainspec = join(self.path, 'chainspec.json') snapshot = snapshot_file if snapshot_file else join(self.path, 'snapshot.json') cmd = [check_file(forkoff_path), '--ws-rpc-endpoint', ws_endpoint, - '--initial-spec-path', chainspec, - '--snapshot-path', snapshot, - '--combined-spec-path', forked] + '--initial-spec-path', chainspec, + '--snapshot-path', snapshot, + '--combined-spec-path', forked] if snapshot_file: cmd.append('--use-snapshot-file') subprocess.run(cmd, check=True) self.set_chainspec(forked) + def get_highest_imported(self, nodes=None): + """Return the maximum height such that each of the selected nodes (all nodes if None) + imported a block of such height.""" + nodes = range(len(self.nodes)) if nodes is None else nodes + nodes = [self.nodes[i] for i in nodes] + return min([n.highest_block()[0] for n in nodes], default=-1) + + def get_highest_finalized(self, nodes=None): + """Return the maximum height such that each of the selected nodes (all nodes if None) + finalized a block of such height.""" + nodes = range(len(self.nodes)) if nodes is None else nodes + nodes = [self.nodes[i] for i in nodes] + return min([n.highest_block()[1] for n in nodes], default=-1) + def wait_for_finalization(self, old_finalized, nodes=None, timeout=600, finalized_delta=3, catchup=True, catchup_delta=10): """Wait for finalization to catch up with the newest blocks. Requires providing the number of the last finalized block, which will be used as a reference against recently finalized blocks. @@ -185,12 +199,13 @@ def wait_for_finalization(self, old_finalized, nodes=None, timeout=600, finalize If `catchup` is True, wait until finalization catches up with the newly produced blocks (within `catchup_delta` blocks). 'timeout' (in seconds) is a global timeout for the whole method to execute. Raise TimeoutError if finalization fails to recover within the given timeout.""" - nodes = [self.nodes[i] for i in nodes] if nodes else self.nodes + nodes = range(len(self.nodes)) if nodes is None else nodes deadline = time.time() + timeout - while any((n.highest_block()[1] <= old_finalized + finalized_delta) for n in nodes): + while self.get_highest_finalized(nodes) <= old_finalized + finalized_delta: time.sleep(5) if time.time() > deadline: raise TimeoutError(f'Block finalization stalled after {timeout} seconds') + nodes = [self.nodes[i] for i in nodes] if catchup: def lags(node): r, f = node.highest_block() @@ -201,10 +216,19 @@ def lags(node): print(f'Finalization restored, but failed to catch up with recent blocks within {timeout} seconds') break + def wait_for_imported_at_height(self, height, nodes=None, timeout=600): + """Wait until all the selected `nodes` (all nodes if None) imported a block at height `height`""" + nodes = range(len(self.nodes)) if nodes is None else nodes + deadline = time.time() + timeout + while self.get_highest_imported(nodes) < height: + time.sleep(1) + if time.time() > deadline: + raise TimeoutError(f'Block production stalled after {timeout} seconds') + def wait_for_authorities(self, nodes=None, timeout=600): """Wait for the selected `nodes` (all validator nodes if None) to connect to all known authorities. If not successful within the given `timeout` (in seconds), raise TimeoutError.""" - nodes = [self.nodes[i] for i in nodes] if nodes else self.validator_nodes + nodes = [self.nodes[i] for i in nodes] if nodes is not None else self.validator_nodes deadline = time.time() + timeout while not all(n.check_authorities() for n in nodes): time.sleep(5) diff --git a/local-tests/test_force_reorg.py b/local-tests/test_force_reorg.py new file mode 100644 index 0000000000..df6140803b --- /dev/null +++ b/local-tests/test_force_reorg.py @@ -0,0 +1,66 @@ +#!/bin/env python +import os +from os.path import abspath, join +from time import ctime +from chainrunner import Chain, Seq, generate_keys + + +def printt(s): print(ctime() + ' | ' + s) + + +# Path to working directory, where chainspec, logs and nodes' dbs are written: +workdir = abspath(os.getenv('WORKDIR', '/tmp/workdir')) +# Path to the aleph-node binary (important DON'T use short-session feature): +binary = abspath(os.getenv('ALEPH_NODE_BINARY', join(workdir, 'aleph-node'))) + +phrases = [f'//{i}' for i in range(8)] +keys = generate_keys(binary, phrases) +chain = Chain(workdir) +printt('Bootstrapping the chain with binary') + +chain.bootstrap(binary, + keys.values(), + sudo_account_id=keys[phrases[0]], + chain_type='local') +chain.set_flags('no-mdns', + port=Seq(30334), + validator_port=Seq(30343), + ws_port=Seq(9944), + rpc_port=Seq(9933), + unit_creation_delay=200, + execution='Native') + +addresses = [n.address() for n in chain] +validator_addresses = [n.validator_address() for n in chain] +chain.set_flags(bootnodes=addresses[1]) +chain.set_flags_validator(public_addr=addresses, public_validator_addresses=validator_addresses) + +BLOCKS_PER_STAGE = 50 +chain.set_flags_validator('validator') +chain.set_flags('max_nonfinalized_blocks', max_nonfinalized_blocks=BLOCKS_PER_STAGE) + +printt('Starting the chain') +chain.start('aleph') +part1 = [0, 2, 4, 6] # Node ids partitioned into two halves +part2 = [1, 3, 5, 7] + +chain.wait_for_finalization(BLOCKS_PER_STAGE, catchup=True, catchup_delta=5) # run normally for some time + +printt('Stopping nodes: ' + ' '.join([str(n) for n in part2])) +chain.stop(nodes=part2) +f1 = chain.get_highest_finalized(nodes=part1) +chain.wait_for_imported_at_height(f1 + BLOCKS_PER_STAGE, nodes=part1) + +printt('Stopping nodes: ' + ' '.join([str(n) for n in part1])) +chain.stop(nodes=part1) + +f2 = chain.get_highest_finalized(nodes=part2) # highest finalized before stop +printt('Starting nodes: ' + ' '.join([str(n) for n in part2])) +chain.start('aleph-recovered', nodes=part2) +chain.wait_for_imported_at_height(f2 + BLOCKS_PER_STAGE, nodes=part2) + +printt('Starting nodes: ' + ' '.join([str(n) for n in part1])) +chain.start('aleph-recovered', nodes=part1) +chain.wait_for_finalization(0, catchup=True, catchup_delta=5) # wait for finalization catchup + +print('Ok')