From 7ec8944a03bc411a45ddb8d4229f1d2b9b8eecbf Mon Sep 17 00:00:00 2001 From: timorleph Date: Wed, 6 Nov 2024 16:22:46 +0100 Subject: [PATCH] Tiny preparations for dismemberment --- consensus/src/member.rs | 7 +- consensus/src/network/hub.rs | 105 +++++++++++++++ consensus/src/{network.rs => network/mod.rs} | 130 +------------------ 3 files changed, 113 insertions(+), 129 deletions(-) create mode 100644 consensus/src/network/hub.rs rename consensus/src/{network.rs => network/mod.rs} (68%) diff --git a/consensus/src/member.rs b/consensus/src/member.rs index 706c88ec..a7f6ebdf 100644 --- a/consensus/src/member.rs +++ b/consensus/src/member.rs @@ -1,7 +1,7 @@ use crate::{ handle_task_termination, member::Task::{CoordRequest, ParentsRequest, RequestNewest, UnitBroadcast}, - network, + network::{Hub as NetworkHub, NetworkData}, runway::{ self, NetworkIO, NewestUnitResponse, Request, Response, RunwayIO, RunwayNotificationIn, RunwayNotificationOut, @@ -17,7 +17,6 @@ use futures::{channel::mpsc, pin_mut, AsyncRead, AsyncWrite, FutureExt, StreamEx use futures_timer::Delay; use itertools::Itertools; use log::{debug, error, info, trace, warn}; -use network::NetworkData; use rand::{prelude::SliceRandom, Rng}; use std::{ collections::HashSet, @@ -659,14 +658,14 @@ pub async fn run_session< let network_handle = spawn_handle .spawn_essential("member/network", async move { - network::run( + NetworkHub::new( network, unit_messages_from_units, unit_messages_for_units, alert_messages_from_alerter, alert_messages_for_alerter, - network_terminator, ) + .run(network_terminator) .await }) .fuse(); diff --git a/consensus/src/network/hub.rs b/consensus/src/network/hub.rs new file mode 100644 index 00000000..764489d5 --- /dev/null +++ b/consensus/src/network/hub.rs @@ -0,0 +1,105 @@ +use crate::{ + alerts::AlertMessage, + member::UnitMessage, + network::{NetworkData, NetworkDataInner}, + Data, Hasher, Network, PartialMultisignature, Receiver, Recipient, Sender, Signature, + Terminator, +}; +use futures::{FutureExt, StreamExt}; +use log::{debug, error, warn}; + +pub struct Hub< + H: Hasher, + D: Data, + S: Signature, + MS: PartialMultisignature, + N: Network>, +> { + network: N, + units_to_send: Receiver<(UnitMessage, Recipient)>, + units_received: Sender>, + alerts_to_send: Receiver<(AlertMessage, Recipient)>, + alerts_received: Sender>, +} + +impl< + H: Hasher, + D: Data, + S: Signature, + MS: PartialMultisignature, + N: Network>, + > Hub +{ + pub fn new( + network: N, + units_to_send: Receiver<(UnitMessage, Recipient)>, + units_received: Sender>, + alerts_to_send: Receiver<(AlertMessage, Recipient)>, + alerts_received: Sender>, + ) -> Self { + Hub { + network, + units_to_send, + units_received, + alerts_to_send, + alerts_received, + } + } + + fn send(&self, data: NetworkData, recipient: Recipient) { + self.network.send(data, recipient); + } + + fn handle_incoming(&self, network_data: NetworkData) { + let NetworkData(network_data) = network_data; + use NetworkDataInner::*; + match network_data { + Units(unit_message) => { + if let Err(e) = self.units_received.unbounded_send(unit_message) { + warn!(target: "AlephBFT-network-hub", "Error when sending units to consensus {:?}", e); + } + } + + Alert(alert_message) => { + if let Err(e) = self.alerts_received.unbounded_send(alert_message) { + warn!(target: "AlephBFT-network-hub", "Error when sending alerts to consensus {:?}", e); + } + } + } + } + + pub async fn run(mut self, mut terminator: Terminator) { + loop { + use NetworkDataInner::*; + futures::select! { + unit_message = self.units_to_send.next() => match unit_message { + Some((unit_message, recipient)) => self.send(NetworkData(Units(unit_message)), recipient), + None => { + error!(target: "AlephBFT-network-hub", "Outgoing units stream closed."); + break; + } + }, + alert_message = self.alerts_to_send.next() => match alert_message { + Some((alert_message, recipient)) => self.send(NetworkData(Alert(alert_message)), recipient), + None => { + error!(target: "AlephBFT-network-hub", "Outgoing alerts stream closed."); + break; + } + }, + incoming_message = self.network.next_event().fuse() => match incoming_message { + Some(incoming_message) => self.handle_incoming(incoming_message), + None => { + error!(target: "AlephBFT-network-hub", "Network stopped working."); + break; + } + }, + _ = terminator.get_exit().fuse() => { + terminator.terminate_sync().await; + break; + } + } + } + + debug!(target: "AlephBFT-network-hub", "Network ended."); + } +} diff --git a/consensus/src/network.rs b/consensus/src/network/mod.rs similarity index 68% rename from consensus/src/network.rs rename to consensus/src/network/mod.rs index 8091e37a..1093e8f4 100644 --- a/consensus/src/network.rs +++ b/consensus/src/network/mod.rs @@ -1,12 +1,13 @@ use crate::{ - alerts::AlertMessage, member::UnitMessage, Data, Hasher, Network, PartialMultisignature, - Receiver, Recipient, Sender, Signature, Terminator, + alerts::AlertMessage, member::UnitMessage, Data, Hasher, PartialMultisignature, Signature, }; use codec::{Decode, Encode}; -use futures::{FutureExt, StreamExt}; -use log::{debug, error, warn}; use std::fmt::Debug; +mod hub; + +pub use hub::Hub; + #[derive(Clone, Eq, PartialEq, Debug, Decode, Encode)] pub(crate) enum NetworkDataInner { Units(UnitMessage), @@ -38,127 +39,6 @@ impl NetworkData>, -> { - network: N, - units_to_send: Receiver<(UnitMessage, Recipient)>, - units_received: Sender>, - alerts_to_send: Receiver<(AlertMessage, Recipient)>, - alerts_received: Sender>, -} - -impl< - H: Hasher, - D: Data, - S: Signature, - MS: PartialMultisignature, - N: Network>, - > NetworkHub -{ - fn new( - network: N, - units_to_send: Receiver<(UnitMessage, Recipient)>, - units_received: Sender>, - alerts_to_send: Receiver<(AlertMessage, Recipient)>, - alerts_received: Sender>, - ) -> Self { - NetworkHub { - network, - units_to_send, - units_received, - alerts_to_send, - alerts_received, - } - } - - fn send(&self, data: NetworkData, recipient: Recipient) { - self.network.send(data, recipient); - } - - fn handle_incoming(&self, network_data: NetworkData) { - let NetworkData(network_data) = network_data; - use NetworkDataInner::*; - match network_data { - Units(unit_message) => { - if let Err(e) = self.units_received.unbounded_send(unit_message) { - warn!(target: "AlephBFT-network-hub", "Error when sending units to consensus {:?}", e); - } - } - - Alert(alert_message) => { - if let Err(e) = self.alerts_received.unbounded_send(alert_message) { - warn!(target: "AlephBFT-network-hub", "Error when sending alerts to consensus {:?}", e); - } - } - } - } - - async fn run(mut self, mut terminator: Terminator) { - loop { - use NetworkDataInner::*; - futures::select! { - unit_message = self.units_to_send.next() => match unit_message { - Some((unit_message, recipient)) => self.send(NetworkData(Units(unit_message)), recipient), - None => { - error!(target: "AlephBFT-network-hub", "Outgoing units stream closed."); - break; - } - }, - alert_message = self.alerts_to_send.next() => match alert_message { - Some((alert_message, recipient)) => self.send(NetworkData(Alert(alert_message)), recipient), - None => { - error!(target: "AlephBFT-network-hub", "Outgoing alerts stream closed."); - break; - } - }, - incoming_message = self.network.next_event().fuse() => match incoming_message { - Some(incoming_message) => self.handle_incoming(incoming_message), - None => { - error!(target: "AlephBFT-network-hub", "Network stopped working."); - break; - } - }, - _ = terminator.get_exit().fuse() => { - terminator.terminate_sync().await; - break; - } - } - } - - debug!(target: "AlephBFT-network-hub", "Network ended."); - } -} - -pub(crate) async fn run< - H: Hasher, - D: Data, - S: Signature, - MS: PartialMultisignature, - N: Network>, ->( - network: N, - units_to_send: Receiver<(UnitMessage, Recipient)>, - units_received: Sender>, - alerts_to_send: Receiver<(AlertMessage, Recipient)>, - alerts_received: Sender>, - terminator: Terminator, -) { - NetworkHub::new( - network, - units_to_send, - units_received, - alerts_to_send, - alerts_received, - ) - .run(terminator) - .await -} - #[cfg(test)] mod tests { use crate::{