Skip to content

Commit

Permalink
Merge branch 'main' into L1-59
Browse files Browse the repository at this point in the history
  • Loading branch information
Marcin-Radecki committed May 17, 2024
2 parents a72a233 + 6bd8d35 commit 78291e1
Show file tree
Hide file tree
Showing 20 changed files with 132 additions and 110 deletions.
12 changes: 6 additions & 6 deletions .github/scripts/run_consensus.sh
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ function run_containers() {
for index in $(seq 0 "${authorities_count}"); do
containers+=("Node${index}")
done
docker-compose $(get_compose_file_list "${docker_compose_file}" "${override_file}") up -d "${containers[@]}"
docker compose $(get_compose_file_list "${docker_compose_file}" "${override_file}") up -d "${containers[@]}"
}

function archive_logs() {
Expand All @@ -133,7 +133,7 @@ function archive_logs() {
pushd $(mktemp -d) > /dev/null
for index in $(seq 0 "${node_count}"); do
echo "Archiving "Node${index}" logs..."
docker-compose ${compose_file_list} logs --no-color --no-log-prefix "Node${index}" > "Node${index}.log"
docker compose ${compose_file_list} logs --no-color --no-log-prefix "Node${index}" > "Node${index}.log"
done
tar -czf "${tarball_output}" Node*
popd > /dev/null
Expand Down Expand Up @@ -163,12 +163,12 @@ else
exit 1
fi

if docker inspect ${NODE_IMAGE} > /dev/null; then
echo "aleph-node image tag ${NODE_IMAGE} found locally"
if docker inspect ${CHAIN_BOOTSTRAPPER_IMAGE} > /dev/null; then
echo "chain-bootstrapper image tag ${CHAIN_BOOTSTRAPPER_IMAGE} found locally"
else
echo "${NODE_IMAGE} not found locally."
echo "${CHAIN_BOOTSTRAPPER_IMAGE} not found locally."
echo "Build image first with:"
echo "docker build -t ${NODE_IMAGE} -f docker/Dockerfile ."
echo "docker build -t ${CHAIN_BOOTSTRAPPER_IMAGE} -f ./bin/chain-bootstrapper/Dockerfile ."
exit 1
fi

Expand Down
18 changes: 18 additions & 0 deletions .github/workflows/on-pull-request-commit.yml
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,21 @@ jobs:
test-case: finalization::finalization
# yamllint disable-line rule:line-length
aleph-e2e-client-image: ${{ needs.build-aleph-e2e-client-image.outputs.aleph-e2e-client-image }}

run-e2e-token-transfer-test:
name: Run e2e token transfer test
needs:
- build-test-node
- build-aleph-e2e-client-image
- build-chain-bootstrapper
runs-on: ubuntu-20.04
steps:
- name: Checkout source code
uses: actions/checkout@v4

- name: Run e2e test
uses: ./.github/actions/run-e2e-test
with:
test-case: token_transfer
# yamllint disable-line rule:line-length
aleph-e2e-client-image: ${{ needs.build-aleph-e2e-client-image.outputs.aleph-e2e-client-image }}
16 changes: 6 additions & 10 deletions aggregator/src/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,16 +171,12 @@ impl<
warn!(target: "aleph-aggregator", "failed broadcasting a message from rmc: {:?}", e);
}
}
message_from_network = self.network.next() => match message_from_network {
Some(message) => {
trace!(target: "aleph-aggregator", "Received message for rmc: {:?}", message);
if let Some(multisigned) = self.rmc_service.process_message(message) {
self.multisigned_events.push_back(multisigned);
}
},
None => {
// In case the network is down we can terminate (?).
return Err(IOError::NetworkChannelClosed);
message_from_network = self.network.next() => {
// In case the network is down we can terminate (?).
let message = message_from_network.ok_or(IOError::NetworkChannelClosed)?;
trace!(target: "aleph-aggregator", "Received message for rmc: {:?}", message);
if let Some(multisigned) = self.rmc_service.process_message(message) {
self.multisigned_events.push_back(multisigned);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion clique/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ mod testing;

pub use crypto::{PublicKey, SecretKey};
pub use rate_limiting::{RateLimitingDialer, RateLimitingListener};
pub use service::{Service, SpawnHandleT};
pub use service::{Service, SpawnHandleExt, SpawnHandleT};

const LOG_TARGET: &str = "network-clique";
/// A basic alias for properties we expect basic data to satisfy.
Expand Down
24 changes: 23 additions & 1 deletion clique/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use futures::{
mpsc::{self, UnboundedSender},
oneshot,
},
Future, StreamExt,
Future, StreamExt, TryFutureExt,
};
use log::{info, trace, warn};
use substrate_prometheus_endpoint::Registry;
Expand Down Expand Up @@ -83,7 +83,9 @@ impl<PK: PublicKey, D: Data, A: Data> Network<PK, A, D> for ServiceInterface<PK,
pub trait SpawnHandleT {
/// Run task
fn spawn(&self, name: &'static str, task: impl Future<Output = ()> + Send + 'static);
}

pub trait SpawnHandleExt: SpawnHandleT {
/// Run an essential task
fn spawn_essential(
&self,
Expand All @@ -92,6 +94,26 @@ pub trait SpawnHandleT {
) -> Pin<Box<dyn Future<Output = Result<(), ()>> + Send>>;
}

impl<SH: SpawnHandleT> SpawnHandleExt for SH {
fn spawn_essential(
&self,
name: &'static str,
task: impl Future<Output = ()> + Send + 'static,
) -> Pin<Box<dyn Future<Output = Result<(), ()>> + Send>> {
let (tx, rx) = oneshot::channel();
self.spawn(name, async move {
task.await;
let _ = tx.send(());
});
Box::pin(rx.map_err(move |_| {
warn!(
target: LOG_TARGET,
"Task '{name}' exited early."
)
}))
}
}

#[derive(Debug)]
pub enum Error {
Commands,
Expand Down
8 changes: 0 additions & 8 deletions clique/src/testing/clique_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,6 @@ impl SpawnHandleT for Spawner {
fn spawn(&self, name: &'static str, task: impl futures::Future<Output = ()> + Send + 'static) {
SpawnHandle::spawn(self, name, task)
}

fn spawn_essential(
&self,
name: &'static str,
task: impl futures::Future<Output = ()> + Send + 'static,
) -> std::pin::Pin<Box<dyn futures::Future<Output = Result<(), ()>> + Send>> {
SpawnHandle::spawn_essential(self, name, task)
}
}

pub const LOG_TARGET: &str = "network-clique-test";
Expand Down
2 changes: 1 addition & 1 deletion e2e-tests/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion finality-aleph/src/abft/current/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::time::Duration;

use current_aleph_bft::{create_config, default_delay_config, Config, LocalIO, Terminator};
use log::debug;
use network_clique::SpawnHandleT;
use network_clique::SpawnHandleExt;

mod network;
mod traits;
Expand Down
2 changes: 1 addition & 1 deletion finality-aleph/src/abft/legacy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{pin::Pin, time::Duration};

use legacy_aleph_bft::{create_config, default_delay_config, Config, LocalIO, Terminator};
use log::debug;
use network_clique::SpawnHandleT;
use network_clique::SpawnHandleExt;

mod network;
mod traits;
Expand Down
23 changes: 5 additions & 18 deletions finality-aleph/src/abft/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

use std::{cmp::Ordering, fmt::Debug, hash::Hash as StdHash, marker::PhantomData, pin::Pin};

use futures::{channel::oneshot, Future, TryFutureExt};
use network_clique::SpawnHandleT;
use futures::{channel::oneshot, Future};
use network_clique::{SpawnHandleExt, SpawnHandleT};
use parity_scale_codec::{Codec, Decode, Encode};
use sc_service::SpawnTaskHandle;
use sp_runtime::traits::Hash as SpHash;
Expand Down Expand Up @@ -88,7 +88,7 @@ impl SpawnHandle {
let result = task.await;
let _ = tx.send(result);
};
let result = <Self as SpawnHandleT>::spawn_essential(self, name, wrapped_task);
let result = <Self as SpawnHandleExt>::spawn_essential(self, name, wrapped_task);
let wrapped_result = async move {
let main_result = result.await;
if main_result.is_err() {
Expand All @@ -111,19 +111,6 @@ impl SpawnHandleT for SpawnHandle {
fn spawn(&self, name: &'static str, task: impl Future<Output = ()> + Send + 'static) {
self.0.spawn(name, None, task)
}

fn spawn_essential(
&self,
name: &'static str,
task: impl Future<Output = ()> + Send + 'static,
) -> Pin<Box<dyn Future<Output = Result<(), ()>> + Send>> {
let (tx, rx) = oneshot::channel();
self.spawn(name, async move {
task.await;
let _ = tx.send(());
});
Box::pin(rx.map_err(|_| ()))
}
}

impl current_aleph_bft::SpawnHandle for SpawnHandle {
Expand All @@ -136,7 +123,7 @@ impl current_aleph_bft::SpawnHandle for SpawnHandle {
name: &'static str,
task: impl Future<Output = ()> + Send + 'static,
) -> current_aleph_bft::TaskHandle {
SpawnHandleT::spawn_essential(self, name, task)
SpawnHandleExt::spawn_essential(self, name, task)
}
}

Expand All @@ -150,6 +137,6 @@ impl legacy_aleph_bft::SpawnHandle for SpawnHandle {
name: &'static str,
task: impl Future<Output = ()> + Send + 'static,
) -> legacy_aleph_bft::TaskHandle {
SpawnHandleT::spawn_essential(self, name, task)
SpawnHandleExt::spawn_essential(self, name, task)
}
}
8 changes: 6 additions & 2 deletions finality-aleph/src/network/data/component.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{fmt::Display, marker::PhantomData};

use futures::{channel::mpsc, StreamExt};
use futures::{channel::mpsc, stream::FusedStream, StreamExt};
use log::warn;

use crate::{
Expand Down Expand Up @@ -135,7 +135,11 @@ impl<D: Data> Sender<D> for mpsc::UnboundedSender<(D, Recipient)> {
#[async_trait::async_trait]
impl<D: Data> Receiver<D> for mpsc::UnboundedReceiver<D> {
async fn next(&mut self) -> Option<D> {
StreamExt::next(self).await
if self.is_terminated() {
None
} else {
StreamExt::next(self).await
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion finality-aleph/src/network/data/split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ async fn forward_or_wait<
RightData: Data,
R: Receiver<Split<LeftData, RightData>>,
>(
receiver: &Arc<Mutex<R>>,
receiver: &Mutex<R>,
left_sender: &mpsc::UnboundedSender<LeftData>,
right_sender: &mpsc::UnboundedSender<RightData>,
name: &str,
Expand Down
19 changes: 9 additions & 10 deletions finality-aleph/src/network/substrate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,16 +149,15 @@ impl<D: Data> GossipNetwork<D> for ProtocolNetwork {
tokio::select! {
maybe_event = self.service.next_event() => {
let event = maybe_event.ok_or(Self::Error::NetworkStreamTerminated)?;
if let Some((message, peer_id)) = self.handle_network_event(event) {
match D::decode_all(&mut &message[..]) {
Ok(message) => return Ok((message, peer_id)),
Err(e) => {
warn!(
target: LOG_TARGET,
"Error decoding message: {}", e
)
},
}
let Some((message, peer_id)) = self.handle_network_event(event) else { continue };
match D::decode_all(&mut &message[..]) {
Ok(message) => return Ok((message, peer_id)),
Err(e) => {
warn!(
target: LOG_TARGET,
"Error decoding message: {}", e
)
},
}
},
_ = status_ticker.tick() => {
Expand Down
2 changes: 1 addition & 1 deletion finality-aleph/src/party/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{collections::HashSet, marker::PhantomData, sync::Arc};
use async_trait::async_trait;
use futures::channel::oneshot;
use log::{debug, info, trace, warn};
use network_clique::SpawnHandleT;
use network_clique::SpawnHandleExt;
use pallet_aleph_runtime_api::AlephSessionApi;
use sc_keystore::{Keystore, LocalKeystore};
use sp_application_crypto::RuntimeAppPublic;
Expand Down
2 changes: 1 addition & 1 deletion finality-aleph/src/party/manager/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{boxed::Box, pin::Pin};

use futures::channel::oneshot;
use log::{debug, warn};
use network_clique::SpawnHandleT;
use network_clique::SpawnHandleExt;

use crate::{Future, SpawnHandle};

Expand Down
Loading

0 comments on commit 78291e1

Please sign in to comment.