diff --git a/consensus/src/backup/loader.rs b/consensus/src/backup/loader.rs index c7b960b6..2f912bc1 100644 --- a/consensus/src/backup/loader.rs +++ b/consensus/src/backup/loader.rs @@ -1,5 +1,6 @@ use std::{collections::HashSet, fmt, fmt::Debug, io::Read, marker::PhantomData}; +use async_trait::async_trait; use codec::{Decode, Error as CodecError}; use futures::channel::oneshot; use log::{error, info, warn}; @@ -63,14 +64,28 @@ impl From for LoaderError { } } -pub struct BackupLoader { +#[async_trait] +pub trait BackupReader { + async fn read(&mut self) -> std::io::Result>; +} + +#[async_trait] +impl BackupReader for R { + async fn read(&mut self) -> std::io::Result> { + let mut buf = Vec::new(); + self.read_to_end(&mut buf)?; + Ok(buf) + } +} + +pub struct BackupLoader { backup: R, index: NodeIndex, session_id: SessionId, _phantom: PhantomData<(H, D, S)>, } -impl BackupLoader { +impl BackupLoader { pub fn new(backup: R, index: NodeIndex, session_id: SessionId) -> BackupLoader { BackupLoader { backup, @@ -80,9 +95,8 @@ impl BackupLoader { } } - fn load(&mut self) -> Result>, LoaderError> { - let mut buf = Vec::new(); - self.backup.read_to_end(&mut buf)?; + async fn load(&mut self) -> Result>, LoaderError> { + let buf = self.backup.read().await?; let input = &mut &buf[..]; let mut result = Vec::new(); while !input.is_empty() { @@ -163,7 +177,7 @@ impl BackupLoader { starting_round: oneshot::Sender>, next_round_collection: oneshot::Receiver, ) { - let units = match self.load() { + let units = match self.load().await { Ok(items) => items, Err(e) => { error!(target: LOG_TARGET, "unable to load backup data: {}", e); diff --git a/consensus/src/backup/mod.rs b/consensus/src/backup/mod.rs index d1807330..42a865fd 100644 --- a/consensus/src/backup/mod.rs +++ b/consensus/src/backup/mod.rs @@ -1,5 +1,7 @@ pub use loader::BackupLoader; +pub use loader::BackupReader; pub use saver::BackupSaver; +pub use saver::BackupWriter; mod loader; mod saver; diff --git a/consensus/src/backup/saver.rs b/consensus/src/backup/saver.rs index c4657f74..124d1c1c 100644 --- a/consensus/src/backup/saver.rs +++ b/consensus/src/backup/saver.rs @@ -1,22 +1,36 @@ use std::io::Write; use crate::{units::UncheckedSignedUnit, Data, Hasher, Receiver, Sender, Signature, Terminator}; +use async_trait::async_trait; use codec::Encode; use futures::{FutureExt, StreamExt}; use log::{debug, error}; const LOG_TARGET: &str = "AlephBFT-backup-saver"; +#[async_trait] +pub trait BackupWriter { + async fn append(&mut self, data: &[u8]) -> std::io::Result<()>; +} + +#[async_trait] +impl BackupWriter for W { + async fn append(&mut self, data: &[u8]) -> std::io::Result<()> { + Write::write_all(self, data)?; + self.flush() + } +} + /// Component responsible for saving units into backup. /// It waits for items to appear on its receivers, and writes them to backup. /// It announces a successful write through an appropriate response sender. -pub struct BackupSaver { +pub struct BackupSaver { units_from_runway: Receiver>, responses_for_runway: Sender>, backup: W, } -impl BackupSaver { +impl BackupSaver { pub fn new( units_from_runway: Receiver>, responses_for_runway: Sender>, @@ -29,10 +43,11 @@ impl BackupSaver { } } - pub fn save_item(&mut self, item: &UncheckedSignedUnit) -> Result<(), std::io::Error> { - self.backup.write_all(&item.encode())?; - self.backup.flush()?; - Ok(()) + pub async fn save_item( + &mut self, + item: &UncheckedSignedUnit, + ) -> Result<(), std::io::Error> { + self.backup.append(&item.encode()).await } pub async fn run(&mut self, mut terminator: Terminator) { @@ -47,7 +62,7 @@ impl BackupSaver { break; }, }; - if let Err(e) = self.save_item(&item) { + if let Err(e) = self.save_item(&item).await { error!(target: LOG_TARGET, "couldn't save item to backup: {:?}", e); break; } diff --git a/consensus/src/lib.rs b/consensus/src/lib.rs index 8958d441..3e76062d 100644 --- a/consensus/src/lib.rs +++ b/consensus/src/lib.rs @@ -25,6 +25,7 @@ pub use aleph_bft_types::{ PartialMultisignature, PartiallyMultisigned, Recipient, Round, SessionId, Signable, Signature, SignatureError, SignatureSet, Signed, SpawnHandle, TaskHandle, UncheckedSigned, }; +pub use backup::{BackupReader, BackupWriter}; pub use config::{ create_config, default_config, default_delay_config, exponential_slowdown, Config, DelayConfig, }; diff --git a/consensus/src/member.rs b/consensus/src/member.rs index 196356a8..b516df71 100644 --- a/consensus/src/member.rs +++ b/consensus/src/member.rs @@ -8,8 +8,9 @@ use crate::{ }, task_queue::TaskQueue, units::{UncheckedSignedUnit, UnitCoord}, - Config, Data, DataProvider, FinalizationHandler, Hasher, MultiKeychain, Network, NodeIndex, - Receiver, Recipient, Round, Sender, Signature, SpawnHandle, Terminator, UncheckedSigned, + BackupReader, BackupWriter, Config, Data, DataProvider, FinalizationHandler, Hasher, + MultiKeychain, Network, NodeIndex, Receiver, Recipient, Round, Sender, Signature, SpawnHandle, + Terminator, UncheckedSigned, }; use aleph_bft_types::NodeMap; use codec::{Decode, Encode}; @@ -108,7 +109,13 @@ enum TaskDetails { } #[derive(Clone)] -pub struct LocalIO, FH: FinalizationHandler, US: Write, UL: Read> { +pub struct LocalIO< + D: Data, + DP: DataProvider, + FH: FinalizationHandler, + US: BackupWriter, + UL: BackupReader, +> { data_provider: DP, finalization_handler: FH, unit_saver: US, @@ -116,8 +123,13 @@ pub struct LocalIO, FH: FinalizationHandler, US: _phantom: PhantomData, } -impl, FH: FinalizationHandler, US: Write, UL: Read> - LocalIO +impl< + D: Data, + DP: DataProvider, + FH: FinalizationHandler, + US: BackupWriter, + UL: BackupReader, + > LocalIO { pub fn new( data_provider: DP, @@ -573,8 +585,8 @@ pub async fn run_session< D: Data, DP: DataProvider, FH: FinalizationHandler, - US: Write + Send + Sync + 'static, - UL: Read + Send + Sync + 'static, + US: BackupWriter + Send + Sync + 'static, + UL: BackupReader + Send + Sync + 'static, N: Network> + 'static, SH: SpawnHandle, MK: MultiKeychain, diff --git a/consensus/src/runway/mod.rs b/consensus/src/runway/mod.rs index 19eed02d..2a0cc911 100644 --- a/consensus/src/runway/mod.rs +++ b/consensus/src/runway/mod.rs @@ -6,9 +6,9 @@ use crate::{ ControlHash, PreUnit, SignedUnit, UncheckedSignedUnit, Unit, UnitCoord, UnitStore, UnitStoreStatus, Validator, }, - Config, Data, DataProvider, FinalizationHandler, Hasher, Index, Keychain, MultiKeychain, - NodeCount, NodeIndex, NodeMap, Receiver, Round, Sender, Signature, Signed, SpawnHandle, - Terminator, UncheckedSigned, + BackupReader, BackupWriter, Config, Data, DataProvider, FinalizationHandler, Hasher, Index, + Keychain, MultiKeychain, NodeCount, NodeIndex, NodeMap, Receiver, Round, Sender, Signature, + Signed, SpawnHandle, Terminator, UncheckedSigned, }; use aleph_bft_types::Recipient; use futures::{ @@ -871,8 +871,8 @@ pub struct RunwayIO< H: Hasher, D: Data, MK: MultiKeychain, - W: Write + Send + Sync + 'static, - R: Read + Send + Sync + 'static, + W: BackupWriter + Send + Sync + 'static, + R: BackupReader + Send + Sync + 'static, DP: DataProvider, FH: FinalizationHandler, > { @@ -887,8 +887,8 @@ impl< H: Hasher, D: Data, MK: MultiKeychain, - W: Write + Send + Sync + 'static, - R: Read + Send + Sync + 'static, + W: BackupWriter + Send + Sync + 'static, + R: BackupReader + Send + Sync + 'static, DP: DataProvider, FH: FinalizationHandler, > RunwayIO @@ -919,8 +919,8 @@ pub(crate) async fn run( ) where H: Hasher, D: Data, - US: Write + Send + Sync + 'static, - UL: Read + Send + Sync + 'static, + US: BackupWriter + Send + Sync + 'static, + UL: BackupReader + Send + Sync + 'static, DP: DataProvider, FH: FinalizationHandler, MK: MultiKeychain,