From 91831f0335f9a2476a4abc57c3cc8d95d1b177c4 Mon Sep 17 00:00:00 2001 From: Marcin Date: Fri, 17 May 2024 10:52:05 +0000 Subject: [PATCH 1/2] A0-4306: Migrate to docker compose from docker-compose (#1733) # Description * Migrate to docker compose from docker-compose, as the former works both on ubuntu 20 and 22. * Add a token transfer test to the PR pipeline. It is important to see if the chain's basic functionality works. It is currently expected to fail, as it does on `main`. * fix some logging in `run_conensus.sh` ## Type of change Please delete options that are not relevant. - New feature (non-breaking change which adds functionality) - bug fix --- .github/scripts/run_consensus.sh | 12 ++++++------ .github/workflows/on-pull-request-commit.yml | 18 ++++++++++++++++++ e2e-tests/Cargo.lock | 2 +- scripts/run_local_e2e_pipeline.sh | 7 ++++--- scripts/synthetic-network/README.md | 2 +- .../run_script_for_synthetic-network.sh | 2 +- 6 files changed, 31 insertions(+), 12 deletions(-) diff --git a/.github/scripts/run_consensus.sh b/.github/scripts/run_consensus.sh index eb52abba26..b53f65d7c5 100755 --- a/.github/scripts/run_consensus.sh +++ b/.github/scripts/run_consensus.sh @@ -118,7 +118,7 @@ function run_containers() { for index in $(seq 0 "${authorities_count}"); do containers+=("Node${index}") done - docker-compose $(get_compose_file_list "${docker_compose_file}" "${override_file}") up -d "${containers[@]}" + docker compose $(get_compose_file_list "${docker_compose_file}" "${override_file}") up -d "${containers[@]}" } function archive_logs() { @@ -133,7 +133,7 @@ function archive_logs() { pushd $(mktemp -d) > /dev/null for index in $(seq 0 "${node_count}"); do echo "Archiving "Node${index}" logs..." - docker-compose ${compose_file_list} logs --no-color --no-log-prefix "Node${index}" > "Node${index}.log" + docker compose ${compose_file_list} logs --no-color --no-log-prefix "Node${index}" > "Node${index}.log" done tar -czf "${tarball_output}" Node* popd > /dev/null @@ -163,12 +163,12 @@ else exit 1 fi -if docker inspect ${NODE_IMAGE} > /dev/null; then - echo "aleph-node image tag ${NODE_IMAGE} found locally" +if docker inspect ${CHAIN_BOOTSTRAPPER_IMAGE} > /dev/null; then + echo "chain-bootstrapper image tag ${CHAIN_BOOTSTRAPPER_IMAGE} found locally" else - echo "${NODE_IMAGE} not found locally." + echo "${CHAIN_BOOTSTRAPPER_IMAGE} not found locally." echo "Build image first with:" - echo "docker build -t ${NODE_IMAGE} -f docker/Dockerfile ." + echo "docker build -t ${CHAIN_BOOTSTRAPPER_IMAGE} -f ./bin/chain-bootstrapper/Dockerfile ." exit 1 fi diff --git a/.github/workflows/on-pull-request-commit.yml b/.github/workflows/on-pull-request-commit.yml index d4e56ca0c6..1775c57335 100644 --- a/.github/workflows/on-pull-request-commit.yml +++ b/.github/workflows/on-pull-request-commit.yml @@ -75,3 +75,21 @@ jobs: test-case: finalization::finalization # yamllint disable-line rule:line-length aleph-e2e-client-image: ${{ needs.build-aleph-e2e-client-image.outputs.aleph-e2e-client-image }} + + run-e2e-token-transfer-test: + name: Run e2e token transfer test + needs: + - build-test-node + - build-aleph-e2e-client-image + - build-chain-bootstrapper + runs-on: ubuntu-20.04 + steps: + - name: Checkout source code + uses: actions/checkout@v4 + + - name: Run e2e test + uses: ./.github/actions/run-e2e-test + with: + test-case: token_transfer + # yamllint disable-line rule:line-length + aleph-e2e-client-image: ${{ needs.build-aleph-e2e-client-image.outputs.aleph-e2e-client-image }} diff --git a/e2e-tests/Cargo.lock b/e2e-tests/Cargo.lock index c83deaebb8..1efc5fe605 100644 --- a/e2e-tests/Cargo.lock +++ b/e2e-tests/Cargo.lock @@ -139,7 +139,7 @@ dependencies = [ [[package]] name = "aleph_client" -version = "3.14.0" +version = "3.15.0" dependencies = [ "anyhow", "async-trait", diff --git a/scripts/run_local_e2e_pipeline.sh b/scripts/run_local_e2e_pipeline.sh index 57e9c10430..cdfab4ed97 100755 --- a/scripts/run_local_e2e_pipeline.sh +++ b/scripts/run_local_e2e_pipeline.sh @@ -2,11 +2,12 @@ set -e -# build release binary -cargo build --release -p aleph-node --features "short_session enable_treasury_proposals" -# build docker image +cargo build --release -p aleph-node docker build --tag aleph-node:latest -f ./docker/Dockerfile . +cargo build --release -p chain-bootstrapper --features short_session +docker build --tag chain-bootstrapper:latest -f ./bin/chain-bootstrapper/Dockerfile . + # run the chain and the tests in two separate tmux windows tmux new-session -d -s aleph0 './.github/scripts/run_consensus.sh'; tmux new-window -t "aleph0:1"; diff --git a/scripts/synthetic-network/README.md b/scripts/synthetic-network/README.md index f4dccce018..1a7043a7c9 100644 --- a/scripts/synthetic-network/README.md +++ b/scripts/synthetic-network/README.md @@ -10,7 +10,7 @@ page, i.e. :3001 is Node1, ...). Main file in this folder is `run_consensus_synthetic-network.sh`. It builds a docker-image containing `aleph-node` and some arbitrary set of networking and debugging tools. It also consists of files required to spawn an instance of the -synthetic-network. Its requirements are: docker, docker-compose, git, `aleph-node:latest` docker-image. +synthetic-network. Its requirements are: docker, docker compose, git, `aleph-node:latest` docker-image. `set_defaults_synthetic-network.sh` allows you to reset settings of the synthetic-network to some sane defaults. You might need to use it when you set too restrictive values for some of its parameters, i.e. rate limit that make you unable to further diff --git a/scripts/synthetic-network/run_script_for_synthetic-network.sh b/scripts/synthetic-network/run_script_for_synthetic-network.sh index 7a8e88b487..03284ffa8e 100755 --- a/scripts/synthetic-network/run_script_for_synthetic-network.sh +++ b/scripts/synthetic-network/run_script_for_synthetic-network.sh @@ -10,7 +10,7 @@ Usage: $0 This script allows you to run a custom .js script using the synthetic-network network simulation tool. IMPORTANT: first you need to call 'scripts/run_consensus_synthetic-network.sh' and let it run in background. - It spawns docker-compose configured with synthetic-network. + It spawns docker compose configured with synthetic-network. It requires node.js to run. --script-path scripts/vendor/synthetic-network/frontend/udp_rate_sine_demo.js path to a synthetic-network scrypt. Default is a demo scripts/vendor/synthetic-network/frontend/udp_rate_sine_demo.js From 6bd8d352ce40ee5b637359d3e01563da48c38c4a Mon Sep 17 00:00:00 2001 From: fixxxedpoint Date: Fri, 17 May 2024 13:28:52 +0200 Subject: [PATCH 2/2] A0-2963: select-loop imprv (#1728) # Description Few small improvements around async/select-loops. ## Type of change Please delete options that are not relevant. - Bug fix (non-breaking change which fixes an issue) --- aggregator/src/aggregator.rs | 16 ++-- clique/src/lib.rs | 2 +- clique/src/service.rs | 24 +++++- clique/src/testing/clique_network.rs | 8 -- finality-aleph/src/abft/current/mod.rs | 2 +- finality-aleph/src/abft/legacy/mod.rs | 2 +- finality-aleph/src/abft/traits.rs | 23 ++---- finality-aleph/src/network/data/component.rs | 8 +- finality-aleph/src/network/data/split.rs | 2 +- finality-aleph/src/network/substrate.rs | 19 ++--- finality-aleph/src/party/manager/mod.rs | 2 +- finality-aleph/src/party/manager/task.rs | 2 +- finality-aleph/src/party/mod.rs | 87 ++++++++++---------- finality-aleph/src/sync/service.rs | 2 + 14 files changed, 101 insertions(+), 98 deletions(-) diff --git a/aggregator/src/aggregator.rs b/aggregator/src/aggregator.rs index 03b8aef33b..8848fe6ec0 100644 --- a/aggregator/src/aggregator.rs +++ b/aggregator/src/aggregator.rs @@ -171,16 +171,12 @@ impl< warn!(target: "aleph-aggregator", "failed broadcasting a message from rmc: {:?}", e); } } - message_from_network = self.network.next() => match message_from_network { - Some(message) => { - trace!(target: "aleph-aggregator", "Received message for rmc: {:?}", message); - if let Some(multisigned) = self.rmc_service.process_message(message) { - self.multisigned_events.push_back(multisigned); - } - }, - None => { - // In case the network is down we can terminate (?). - return Err(IOError::NetworkChannelClosed); + message_from_network = self.network.next() => { + // In case the network is down we can terminate (?). + let message = message_from_network.ok_or(IOError::NetworkChannelClosed)?; + trace!(target: "aleph-aggregator", "Received message for rmc: {:?}", message); + if let Some(multisigned) = self.rmc_service.process_message(message) { + self.multisigned_events.push_back(multisigned); } } } diff --git a/clique/src/lib.rs b/clique/src/lib.rs index 8df5b538b9..c91ccad459 100644 --- a/clique/src/lib.rs +++ b/clique/src/lib.rs @@ -24,7 +24,7 @@ mod testing; pub use crypto::{PublicKey, SecretKey}; pub use rate_limiting::{RateLimitingDialer, RateLimitingListener}; -pub use service::{Service, SpawnHandleT}; +pub use service::{Service, SpawnHandleExt, SpawnHandleT}; const LOG_TARGET: &str = "network-clique"; /// A basic alias for properties we expect basic data to satisfy. diff --git a/clique/src/service.rs b/clique/src/service.rs index 908608e009..852b2c3bd4 100644 --- a/clique/src/service.rs +++ b/clique/src/service.rs @@ -9,7 +9,7 @@ use futures::{ mpsc::{self, UnboundedSender}, oneshot, }, - Future, StreamExt, + Future, StreamExt, TryFutureExt, }; use log::{info, trace, warn}; use substrate_prometheus_endpoint::Registry; @@ -83,7 +83,9 @@ impl Network for ServiceInterface + Send + 'static); +} +pub trait SpawnHandleExt: SpawnHandleT { /// Run an essential task fn spawn_essential( &self, @@ -92,6 +94,26 @@ pub trait SpawnHandleT { ) -> Pin> + Send>>; } +impl SpawnHandleExt for SH { + fn spawn_essential( + &self, + name: &'static str, + task: impl Future + Send + 'static, + ) -> Pin> + Send>> { + let (tx, rx) = oneshot::channel(); + self.spawn(name, async move { + task.await; + let _ = tx.send(()); + }); + Box::pin(rx.map_err(move |_| { + warn!( + target: LOG_TARGET, + "Task '{name}' exited early." + ) + })) + } +} + #[derive(Debug)] pub enum Error { Commands, diff --git a/clique/src/testing/clique_network.rs b/clique/src/testing/clique_network.rs index f4be043a4f..c4f9d926f3 100644 --- a/clique/src/testing/clique_network.rs +++ b/clique/src/testing/clique_network.rs @@ -26,14 +26,6 @@ impl SpawnHandleT for Spawner { fn spawn(&self, name: &'static str, task: impl futures::Future + Send + 'static) { SpawnHandle::spawn(self, name, task) } - - fn spawn_essential( - &self, - name: &'static str, - task: impl futures::Future + Send + 'static, - ) -> std::pin::Pin> + Send>> { - SpawnHandle::spawn_essential(self, name, task) - } } pub const LOG_TARGET: &str = "network-clique-test"; diff --git a/finality-aleph/src/abft/current/mod.rs b/finality-aleph/src/abft/current/mod.rs index adea4d9379..36eac05c3e 100644 --- a/finality-aleph/src/abft/current/mod.rs +++ b/finality-aleph/src/abft/current/mod.rs @@ -2,7 +2,7 @@ use std::time::Duration; use current_aleph_bft::{create_config, default_delay_config, Config, LocalIO, Terminator}; use log::debug; -use network_clique::SpawnHandleT; +use network_clique::SpawnHandleExt; mod network; mod traits; diff --git a/finality-aleph/src/abft/legacy/mod.rs b/finality-aleph/src/abft/legacy/mod.rs index 98bb765e93..5a92c269fc 100644 --- a/finality-aleph/src/abft/legacy/mod.rs +++ b/finality-aleph/src/abft/legacy/mod.rs @@ -2,7 +2,7 @@ use std::{pin::Pin, time::Duration}; use legacy_aleph_bft::{create_config, default_delay_config, Config, LocalIO, Terminator}; use log::debug; -use network_clique::SpawnHandleT; +use network_clique::SpawnHandleExt; mod network; mod traits; diff --git a/finality-aleph/src/abft/traits.rs b/finality-aleph/src/abft/traits.rs index 526920f6cc..b6b742a919 100644 --- a/finality-aleph/src/abft/traits.rs +++ b/finality-aleph/src/abft/traits.rs @@ -2,8 +2,8 @@ use std::{cmp::Ordering, fmt::Debug, hash::Hash as StdHash, marker::PhantomData, pin::Pin}; -use futures::{channel::oneshot, Future, TryFutureExt}; -use network_clique::SpawnHandleT; +use futures::{channel::oneshot, Future}; +use network_clique::{SpawnHandleExt, SpawnHandleT}; use parity_scale_codec::{Codec, Decode, Encode}; use sc_service::SpawnTaskHandle; use sp_runtime::traits::Hash as SpHash; @@ -88,7 +88,7 @@ impl SpawnHandle { let result = task.await; let _ = tx.send(result); }; - let result = ::spawn_essential(self, name, wrapped_task); + let result = ::spawn_essential(self, name, wrapped_task); let wrapped_result = async move { let main_result = result.await; if main_result.is_err() { @@ -111,19 +111,6 @@ impl SpawnHandleT for SpawnHandle { fn spawn(&self, name: &'static str, task: impl Future + Send + 'static) { self.0.spawn(name, None, task) } - - fn spawn_essential( - &self, - name: &'static str, - task: impl Future + Send + 'static, - ) -> Pin> + Send>> { - let (tx, rx) = oneshot::channel(); - self.spawn(name, async move { - task.await; - let _ = tx.send(()); - }); - Box::pin(rx.map_err(|_| ())) - } } impl current_aleph_bft::SpawnHandle for SpawnHandle { @@ -136,7 +123,7 @@ impl current_aleph_bft::SpawnHandle for SpawnHandle { name: &'static str, task: impl Future + Send + 'static, ) -> current_aleph_bft::TaskHandle { - SpawnHandleT::spawn_essential(self, name, task) + SpawnHandleExt::spawn_essential(self, name, task) } } @@ -150,6 +137,6 @@ impl legacy_aleph_bft::SpawnHandle for SpawnHandle { name: &'static str, task: impl Future + Send + 'static, ) -> legacy_aleph_bft::TaskHandle { - SpawnHandleT::spawn_essential(self, name, task) + SpawnHandleExt::spawn_essential(self, name, task) } } diff --git a/finality-aleph/src/network/data/component.rs b/finality-aleph/src/network/data/component.rs index f9cfd6458c..22a90c4ebb 100644 --- a/finality-aleph/src/network/data/component.rs +++ b/finality-aleph/src/network/data/component.rs @@ -1,6 +1,6 @@ use std::{fmt::Display, marker::PhantomData}; -use futures::{channel::mpsc, StreamExt}; +use futures::{channel::mpsc, stream::FusedStream, StreamExt}; use log::warn; use crate::{ @@ -135,7 +135,11 @@ impl Sender for mpsc::UnboundedSender<(D, Recipient)> { #[async_trait::async_trait] impl Receiver for mpsc::UnboundedReceiver { async fn next(&mut self) -> Option { - StreamExt::next(self).await + if self.is_terminated() { + None + } else { + StreamExt::next(self).await + } } } diff --git a/finality-aleph/src/network/data/split.rs b/finality-aleph/src/network/data/split.rs index 7d1a4d50c5..05725187ee 100644 --- a/finality-aleph/src/network/data/split.rs +++ b/finality-aleph/src/network/data/split.rs @@ -142,7 +142,7 @@ async fn forward_or_wait< RightData: Data, R: Receiver>, >( - receiver: &Arc>, + receiver: &Mutex, left_sender: &mpsc::UnboundedSender, right_sender: &mpsc::UnboundedSender, name: &str, diff --git a/finality-aleph/src/network/substrate.rs b/finality-aleph/src/network/substrate.rs index db7aabe8c6..ac3ab1db42 100644 --- a/finality-aleph/src/network/substrate.rs +++ b/finality-aleph/src/network/substrate.rs @@ -149,16 +149,15 @@ impl GossipNetwork for ProtocolNetwork { tokio::select! { maybe_event = self.service.next_event() => { let event = maybe_event.ok_or(Self::Error::NetworkStreamTerminated)?; - if let Some((message, peer_id)) = self.handle_network_event(event) { - match D::decode_all(&mut &message[..]) { - Ok(message) => return Ok((message, peer_id)), - Err(e) => { - warn!( - target: LOG_TARGET, - "Error decoding message: {}", e - ) - }, - } + let Some((message, peer_id)) = self.handle_network_event(event) else { continue }; + match D::decode_all(&mut &message[..]) { + Ok(message) => return Ok((message, peer_id)), + Err(e) => { + warn!( + target: LOG_TARGET, + "Error decoding message: {}", e + ) + }, } }, _ = status_ticker.tick() => { diff --git a/finality-aleph/src/party/manager/mod.rs b/finality-aleph/src/party/manager/mod.rs index a38d706252..29cb5b0674 100644 --- a/finality-aleph/src/party/manager/mod.rs +++ b/finality-aleph/src/party/manager/mod.rs @@ -3,7 +3,7 @@ use std::{collections::HashSet, marker::PhantomData, sync::Arc}; use async_trait::async_trait; use futures::channel::oneshot; use log::{debug, info, trace, warn}; -use network_clique::SpawnHandleT; +use network_clique::SpawnHandleExt; use sc_keystore::{Keystore, LocalKeystore}; use sp_application_crypto::RuntimeAppPublic; use sp_runtime::traits::{Block as BlockT, Header as HeaderT}; diff --git a/finality-aleph/src/party/manager/task.rs b/finality-aleph/src/party/manager/task.rs index f41c02fa4e..a7ad40802f 100644 --- a/finality-aleph/src/party/manager/task.rs +++ b/finality-aleph/src/party/manager/task.rs @@ -2,7 +2,7 @@ use std::{boxed::Box, pin::Pin}; use futures::channel::oneshot; use log::{debug, warn}; -use network_clique::SpawnHandleT; +use network_clique::SpawnHandleExt; use crate::{Future, SpawnHandle}; diff --git a/finality-aleph/src/party/mod.rs b/finality-aleph/src/party/mod.rs index b6f8cbcdcc..2bf012f9e6 100644 --- a/finality-aleph/src/party/mod.rs +++ b/finality-aleph/src/party/mod.rs @@ -1,7 +1,9 @@ use std::{default::Default, path::PathBuf, time::Duration}; +use futures::FutureExt; use futures_timer::Delay; use log::{debug, error, info, trace, warn}; +use primitives::AuthorityId; use tokio::{task::spawn_blocking, time::sleep}; use crate::{ @@ -71,6 +73,32 @@ where } } + fn try_start_next_session( + &self, + next_session_id: SessionId, + next_session_authorities: &[AuthorityId], + ) { + match self.session_manager.node_idx(next_session_authorities) { + Some(next_session_node_id) => { + if let Err(e) = self.session_manager.early_start_validator_session( + next_session_id, + next_session_node_id, + next_session_authorities, + ) { + warn!(target: "aleph-party", "Failed to early start validator session{:?}: {}", next_session_id, e); + } + } + None => { + if let Err(e) = self + .session_manager + .start_nonvalidator_session(next_session_id, next_session_authorities) + { + warn!(target: "aleph-party", "Failed to early start nonvalidator session{:?}: {}", next_session_id, e); + } + } + } + } + async fn run_session(&mut self, session_id: SessionId) { let last_block = self.session_info.last_block_of_session(session_id); if session_id.0.checked_sub(1).is_some() { @@ -153,11 +181,12 @@ where }; let mut check_session_status = Delay::new(SESSION_STATUS_CHECK_PERIOD); let next_session_id = SessionId(session_id.0 + 1); - let mut start_next_session_network = Some( - self.session_authorities - .subscribe_to_insertion(next_session_id) - .await, - ); + let mut start_next_session_network = self + .session_authorities + .subscribe_to_insertion(next_session_id) + .await + .fuse(); + loop { tokio::select! { _ = &mut check_session_status => { @@ -166,47 +195,19 @@ where debug!(target: "aleph-party", "Terminating session {:?}", session_id); break; } - check_session_status = Delay::new(SESSION_STATUS_CHECK_PERIOD); + check_session_status.reset(SESSION_STATUS_CHECK_PERIOD); }, - Some(next_session_authority_data) = async { - match &mut start_next_session_network { - Some(notification) => { - match notification.await { - Err(e) => { - warn!(target: "aleph-party", "Error with subscription {:?}", e); - start_next_session_network = Some(self.session_authorities.subscribe_to_insertion(next_session_id).await); - None - }, - Ok(next_session_authority_data) => { - Some(next_session_authority_data) - } - } + next_session_authority_data = &mut start_next_session_network => { + let next_session_authority_data = match next_session_authority_data { + Ok(data) => data, + Err(e) => { + warn!(target: "aleph-party", "Error with subscription {:?}", e); + start_next_session_network = self.session_authorities.subscribe_to_insertion(next_session_id).await.fuse(); + continue; }, - None => None, - } - } => { + }; let next_session_authorities = next_session_authority_data.authorities(); - match self.session_manager.node_idx(next_session_authorities) { - Some(next_session_node_id) => if let Err(e) = self - .session_manager - .early_start_validator_session( - next_session_id, - next_session_node_id, - next_session_authorities, - ) - { - warn!(target: "aleph-party", "Failed to early start validator session{:?}: {}", next_session_id, e); - } - None => { - if let Err(e) = self - .session_manager - .start_nonvalidator_session(next_session_id, next_session_authorities) - { - warn!(target: "aleph-party", "Failed to early start nonvalidator session{:?}: {}", next_session_id, e); - } - } - } - start_next_session_network = None; + self.try_start_next_session(next_session_id, next_session_authorities); }, Some(_) = async { match maybe_authority_task.as_mut() { diff --git a/finality-aleph/src/sync/service.rs b/finality-aleph/src/sync/service.rs index c316bc9b6d..d50adbbe53 100644 --- a/finality-aleph/src/sync/service.rs +++ b/finality-aleph/src/sync/service.rs @@ -736,6 +736,7 @@ where debug!(target: LOG_TARGET, "Received new justification from user: {:?}.", justification); self.handle_justification_from_user(justification); }, + maybe_header = self.block_requests_from_user.next() => { let header = maybe_header.ok_or(Error::BlockRequestChannelClosed)?; debug!(target: LOG_TARGET, "Received new internal block request from user: {:?}.", header); @@ -747,6 +748,7 @@ where debug!(target: LOG_TARGET, "Received new own block: {:?}.", block.header().id()); self.handle_own_block(block); }, + _ = status_ticker.tick() => { info!(target: LOG_TARGET, "{}", self.handler.status()); },