Skip to content

Commit

Permalink
Tiny preparations for dismemberment
Browse files Browse the repository at this point in the history
  • Loading branch information
timorleph committed Nov 6, 2024
1 parent d843e8f commit 7ec8944
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 129 deletions.
7 changes: 3 additions & 4 deletions consensus/src/member.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{
handle_task_termination,
member::Task::{CoordRequest, ParentsRequest, RequestNewest, UnitBroadcast},
network,
network::{Hub as NetworkHub, NetworkData},
runway::{
self, NetworkIO, NewestUnitResponse, Request, Response, RunwayIO, RunwayNotificationIn,
RunwayNotificationOut,
Expand All @@ -17,7 +17,6 @@ use futures::{channel::mpsc, pin_mut, AsyncRead, AsyncWrite, FutureExt, StreamEx
use futures_timer::Delay;
use itertools::Itertools;
use log::{debug, error, info, trace, warn};
use network::NetworkData;
use rand::{prelude::SliceRandom, Rng};
use std::{
collections::HashSet,
Expand Down Expand Up @@ -659,14 +658,14 @@ pub async fn run_session<

let network_handle = spawn_handle
.spawn_essential("member/network", async move {
network::run(
NetworkHub::new(
network,
unit_messages_from_units,
unit_messages_for_units,
alert_messages_from_alerter,
alert_messages_for_alerter,
network_terminator,
)
.run(network_terminator)
.await
})
.fuse();
Expand Down
105 changes: 105 additions & 0 deletions consensus/src/network/hub.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
use crate::{
alerts::AlertMessage,
member::UnitMessage,
network::{NetworkData, NetworkDataInner},
Data, Hasher, Network, PartialMultisignature, Receiver, Recipient, Sender, Signature,
Terminator,
};
use futures::{FutureExt, StreamExt};
use log::{debug, error, warn};

pub struct Hub<
H: Hasher,
D: Data,
S: Signature,
MS: PartialMultisignature,
N: Network<NetworkData<H, D, S, MS>>,
> {
network: N,
units_to_send: Receiver<(UnitMessage<H, D, S>, Recipient)>,
units_received: Sender<UnitMessage<H, D, S>>,
alerts_to_send: Receiver<(AlertMessage<H, D, S, MS>, Recipient)>,
alerts_received: Sender<AlertMessage<H, D, S, MS>>,
}

impl<
H: Hasher,
D: Data,
S: Signature,
MS: PartialMultisignature,
N: Network<NetworkData<H, D, S, MS>>,
> Hub<H, D, S, MS, N>
{
pub fn new(
network: N,
units_to_send: Receiver<(UnitMessage<H, D, S>, Recipient)>,
units_received: Sender<UnitMessage<H, D, S>>,
alerts_to_send: Receiver<(AlertMessage<H, D, S, MS>, Recipient)>,
alerts_received: Sender<AlertMessage<H, D, S, MS>>,
) -> Self {
Hub {
network,
units_to_send,
units_received,
alerts_to_send,
alerts_received,
}
}

fn send(&self, data: NetworkData<H, D, S, MS>, recipient: Recipient) {
self.network.send(data, recipient);
}

fn handle_incoming(&self, network_data: NetworkData<H, D, S, MS>) {
let NetworkData(network_data) = network_data;
use NetworkDataInner::*;
match network_data {
Units(unit_message) => {
if let Err(e) = self.units_received.unbounded_send(unit_message) {
warn!(target: "AlephBFT-network-hub", "Error when sending units to consensus {:?}", e);
}
}

Alert(alert_message) => {
if let Err(e) = self.alerts_received.unbounded_send(alert_message) {
warn!(target: "AlephBFT-network-hub", "Error when sending alerts to consensus {:?}", e);
}
}
}
}

pub async fn run(mut self, mut terminator: Terminator) {
loop {
use NetworkDataInner::*;
futures::select! {
unit_message = self.units_to_send.next() => match unit_message {
Some((unit_message, recipient)) => self.send(NetworkData(Units(unit_message)), recipient),
None => {
error!(target: "AlephBFT-network-hub", "Outgoing units stream closed.");
break;
}
},
alert_message = self.alerts_to_send.next() => match alert_message {
Some((alert_message, recipient)) => self.send(NetworkData(Alert(alert_message)), recipient),
None => {
error!(target: "AlephBFT-network-hub", "Outgoing alerts stream closed.");
break;
}
},
incoming_message = self.network.next_event().fuse() => match incoming_message {
Some(incoming_message) => self.handle_incoming(incoming_message),
None => {
error!(target: "AlephBFT-network-hub", "Network stopped working.");
break;
}
},
_ = terminator.get_exit().fuse() => {
terminator.terminate_sync().await;
break;
}
}
}

debug!(target: "AlephBFT-network-hub", "Network ended.");
}
}
130 changes: 5 additions & 125 deletions consensus/src/network.rs → consensus/src/network/mod.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use crate::{
alerts::AlertMessage, member::UnitMessage, Data, Hasher, Network, PartialMultisignature,
Receiver, Recipient, Sender, Signature, Terminator,
alerts::AlertMessage, member::UnitMessage, Data, Hasher, PartialMultisignature, Signature,
};
use codec::{Decode, Encode};
use futures::{FutureExt, StreamExt};
use log::{debug, error, warn};
use std::fmt::Debug;

mod hub;

pub use hub::Hub;

#[derive(Clone, Eq, PartialEq, Debug, Decode, Encode)]
pub(crate) enum NetworkDataInner<H: Hasher, D: Data, S: Signature, MS: PartialMultisignature> {
Units(UnitMessage<H, D, S>),
Expand Down Expand Up @@ -38,127 +39,6 @@ impl<H: Hasher, D: Data, S: Signature, MS: PartialMultisignature> NetworkData<H,
}
}

struct NetworkHub<
H: Hasher,
D: Data,
S: Signature,
MS: PartialMultisignature,
N: Network<NetworkData<H, D, S, MS>>,
> {
network: N,
units_to_send: Receiver<(UnitMessage<H, D, S>, Recipient)>,
units_received: Sender<UnitMessage<H, D, S>>,
alerts_to_send: Receiver<(AlertMessage<H, D, S, MS>, Recipient)>,
alerts_received: Sender<AlertMessage<H, D, S, MS>>,
}

impl<
H: Hasher,
D: Data,
S: Signature,
MS: PartialMultisignature,
N: Network<NetworkData<H, D, S, MS>>,
> NetworkHub<H, D, S, MS, N>
{
fn new(
network: N,
units_to_send: Receiver<(UnitMessage<H, D, S>, Recipient)>,
units_received: Sender<UnitMessage<H, D, S>>,
alerts_to_send: Receiver<(AlertMessage<H, D, S, MS>, Recipient)>,
alerts_received: Sender<AlertMessage<H, D, S, MS>>,
) -> Self {
NetworkHub {
network,
units_to_send,
units_received,
alerts_to_send,
alerts_received,
}
}

fn send(&self, data: NetworkData<H, D, S, MS>, recipient: Recipient) {
self.network.send(data, recipient);
}

fn handle_incoming(&self, network_data: NetworkData<H, D, S, MS>) {
let NetworkData(network_data) = network_data;
use NetworkDataInner::*;
match network_data {
Units(unit_message) => {
if let Err(e) = self.units_received.unbounded_send(unit_message) {
warn!(target: "AlephBFT-network-hub", "Error when sending units to consensus {:?}", e);
}
}

Alert(alert_message) => {
if let Err(e) = self.alerts_received.unbounded_send(alert_message) {
warn!(target: "AlephBFT-network-hub", "Error when sending alerts to consensus {:?}", e);
}
}
}
}

async fn run(mut self, mut terminator: Terminator) {
loop {
use NetworkDataInner::*;
futures::select! {
unit_message = self.units_to_send.next() => match unit_message {
Some((unit_message, recipient)) => self.send(NetworkData(Units(unit_message)), recipient),
None => {
error!(target: "AlephBFT-network-hub", "Outgoing units stream closed.");
break;
}
},
alert_message = self.alerts_to_send.next() => match alert_message {
Some((alert_message, recipient)) => self.send(NetworkData(Alert(alert_message)), recipient),
None => {
error!(target: "AlephBFT-network-hub", "Outgoing alerts stream closed.");
break;
}
},
incoming_message = self.network.next_event().fuse() => match incoming_message {
Some(incoming_message) => self.handle_incoming(incoming_message),
None => {
error!(target: "AlephBFT-network-hub", "Network stopped working.");
break;
}
},
_ = terminator.get_exit().fuse() => {
terminator.terminate_sync().await;
break;
}
}
}

debug!(target: "AlephBFT-network-hub", "Network ended.");
}
}

pub(crate) async fn run<
H: Hasher,
D: Data,
S: Signature,
MS: PartialMultisignature,
N: Network<NetworkData<H, D, S, MS>>,
>(
network: N,
units_to_send: Receiver<(UnitMessage<H, D, S>, Recipient)>,
units_received: Sender<UnitMessage<H, D, S>>,
alerts_to_send: Receiver<(AlertMessage<H, D, S, MS>, Recipient)>,
alerts_received: Sender<AlertMessage<H, D, S, MS>>,
terminator: Terminator,
) {
NetworkHub::new(
network,
units_to_send,
units_received,
alerts_to_send,
alerts_received,
)
.run(terminator)
.await
}

#[cfg(test)]
mod tests {
use crate::{
Expand Down

0 comments on commit 7ec8944

Please sign in to comment.