Skip to content

Commit

Permalink
A0-2753: Local test for block sync in presence of reorgs (#1301)
Browse files Browse the repository at this point in the history
# Description
A test which ensures that block sync works in presence of forks.
The test implements the following scenario:
1. split validators into two halves: `X, Y` and let them produce blocks
for a while
2. kill `X`, let `Y` produce blocks for a while
3. kill `Y`, recover `X` and let it produce blocks for a while
4. recover `Y`, both `X` and `Y` should discover a fork and one of them
should perform a reorg

## Type of change

local test
  • Loading branch information
woocash2 authored Jul 25, 2023
1 parent 8669332 commit 833c54c
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 6 deletions.
36 changes: 30 additions & 6 deletions local-tests/chainrunner/chain.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,14 +169,28 @@ def fork(self, forkoff_path, ws_endpoint, snapshot_file=None):
chainspec = join(self.path, 'chainspec.json')
snapshot = snapshot_file if snapshot_file else join(self.path, 'snapshot.json')
cmd = [check_file(forkoff_path), '--ws-rpc-endpoint', ws_endpoint,
'--initial-spec-path', chainspec,
'--snapshot-path', snapshot,
'--combined-spec-path', forked]
'--initial-spec-path', chainspec,
'--snapshot-path', snapshot,
'--combined-spec-path', forked]
if snapshot_file:
cmd.append('--use-snapshot-file')
subprocess.run(cmd, check=True)
self.set_chainspec(forked)

def get_highest_imported(self, nodes=None):
"""Return the maximum height such that each of the selected nodes (all nodes if None)
imported a block of such height."""
nodes = range(len(self.nodes)) if nodes is None else nodes
nodes = [self.nodes[i] for i in nodes]
return min([n.highest_block()[0] for n in nodes], default=-1)

def get_highest_finalized(self, nodes=None):
"""Return the maximum height such that each of the selected nodes (all nodes if None)
finalized a block of such height."""
nodes = range(len(self.nodes)) if nodes is None else nodes
nodes = [self.nodes[i] for i in nodes]
return min([n.highest_block()[1] for n in nodes], default=-1)

def wait_for_finalization(self, old_finalized, nodes=None, timeout=600, finalized_delta=3, catchup=True, catchup_delta=10):
"""Wait for finalization to catch up with the newest blocks. Requires providing the number
of the last finalized block, which will be used as a reference against recently finalized blocks.
Expand All @@ -185,12 +199,13 @@ def wait_for_finalization(self, old_finalized, nodes=None, timeout=600, finalize
If `catchup` is True, wait until finalization catches up with the newly produced blocks
(within `catchup_delta` blocks). 'timeout' (in seconds) is a global timeout for the whole method
to execute. Raise TimeoutError if finalization fails to recover within the given timeout."""
nodes = [self.nodes[i] for i in nodes] if nodes else self.nodes
nodes = range(len(self.nodes)) if nodes is None else nodes
deadline = time.time() + timeout
while any((n.highest_block()[1] <= old_finalized + finalized_delta) for n in nodes):
while self.get_highest_finalized(nodes) <= old_finalized + finalized_delta:
time.sleep(5)
if time.time() > deadline:
raise TimeoutError(f'Block finalization stalled after {timeout} seconds')
nodes = [self.nodes[i] for i in nodes]
if catchup:
def lags(node):
r, f = node.highest_block()
Expand All @@ -201,10 +216,19 @@ def lags(node):
print(f'Finalization restored, but failed to catch up with recent blocks within {timeout} seconds')
break

def wait_for_imported_at_height(self, height, nodes=None, timeout=600):
"""Wait until all the selected `nodes` (all nodes if None) imported a block at height `height`"""
nodes = range(len(self.nodes)) if nodes is None else nodes
deadline = time.time() + timeout
while self.get_highest_imported(nodes) < height:
time.sleep(1)
if time.time() > deadline:
raise TimeoutError(f'Block production stalled after {timeout} seconds')

def wait_for_authorities(self, nodes=None, timeout=600):
"""Wait for the selected `nodes` (all validator nodes if None) to connect to all known authorities.
If not successful within the given `timeout` (in seconds), raise TimeoutError."""
nodes = [self.nodes[i] for i in nodes] if nodes else self.validator_nodes
nodes = [self.nodes[i] for i in nodes] if nodes is not None else self.validator_nodes
deadline = time.time() + timeout
while not all(n.check_authorities() for n in nodes):
time.sleep(5)
Expand Down
66 changes: 66 additions & 0 deletions local-tests/test_force_reorg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
#!/bin/env python
import os
from os.path import abspath, join
from time import ctime
from chainrunner import Chain, Seq, generate_keys


def printt(s): print(ctime() + ' | ' + s)


# Path to working directory, where chainspec, logs and nodes' dbs are written:
workdir = abspath(os.getenv('WORKDIR', '/tmp/workdir'))
# Path to the aleph-node binary (important DON'T use short-session feature):
binary = abspath(os.getenv('ALEPH_NODE_BINARY', join(workdir, 'aleph-node')))

phrases = [f'//{i}' for i in range(8)]
keys = generate_keys(binary, phrases)
chain = Chain(workdir)
printt('Bootstrapping the chain with binary')

chain.bootstrap(binary,
keys.values(),
sudo_account_id=keys[phrases[0]],
chain_type='local')
chain.set_flags('no-mdns',
port=Seq(30334),
validator_port=Seq(30343),
ws_port=Seq(9944),
rpc_port=Seq(9933),
unit_creation_delay=200,
execution='Native')

addresses = [n.address() for n in chain]
validator_addresses = [n.validator_address() for n in chain]
chain.set_flags(bootnodes=addresses[1])
chain.set_flags_validator(public_addr=addresses, public_validator_addresses=validator_addresses)

BLOCKS_PER_STAGE = 50
chain.set_flags_validator('validator')
chain.set_flags('max_nonfinalized_blocks', max_nonfinalized_blocks=BLOCKS_PER_STAGE)

printt('Starting the chain')
chain.start('aleph')
part1 = [0, 2, 4, 6] # Node ids partitioned into two halves
part2 = [1, 3, 5, 7]

chain.wait_for_finalization(BLOCKS_PER_STAGE, catchup=True, catchup_delta=5) # run normally for some time

printt('Stopping nodes: ' + ' '.join([str(n) for n in part2]))
chain.stop(nodes=part2)
f1 = chain.get_highest_finalized(nodes=part1)
chain.wait_for_imported_at_height(f1 + BLOCKS_PER_STAGE, nodes=part1)

printt('Stopping nodes: ' + ' '.join([str(n) for n in part1]))
chain.stop(nodes=part1)

f2 = chain.get_highest_finalized(nodes=part2) # highest finalized before stop
printt('Starting nodes: ' + ' '.join([str(n) for n in part2]))
chain.start('aleph-recovered', nodes=part2)
chain.wait_for_imported_at_height(f2 + BLOCKS_PER_STAGE, nodes=part2)

printt('Starting nodes: ' + ' '.join([str(n) for n in part1]))
chain.start('aleph-recovered', nodes=part1)
chain.wait_for_finalization(0, catchup=True, catchup_delta=5) # wait for finalization catchup

print('Ok')

0 comments on commit 833c54c

Please sign in to comment.