Skip to content

Commit

Permalink
Allow async implementation for BackupReader and BackupWriter
Browse files Browse the repository at this point in the history
  • Loading branch information
maan2003 committed Jan 19, 2024
1 parent 976c164 commit b3e9564
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 29 deletions.
26 changes: 20 additions & 6 deletions consensus/src/backup/loader.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{collections::HashSet, fmt, fmt::Debug, io::Read, marker::PhantomData};

use async_trait::async_trait;
use codec::{Decode, Error as CodecError};
use futures::channel::oneshot;
use log::{error, info, warn};
Expand Down Expand Up @@ -63,14 +64,28 @@ impl From<CodecError> for LoaderError {
}
}

pub struct BackupLoader<H: Hasher, D: Data, S: Signature, R: Read> {
#[async_trait]
pub trait BackupReader {
async fn read(&mut self) -> std::io::Result<Vec<u8>>;
}

#[async_trait]
impl<R: Read + Send> BackupReader for R {
async fn read(&mut self) -> std::io::Result<Vec<u8>> {
let mut buf = Vec::new();
self.read_to_end(&mut buf)?;
Ok(buf)
}
}

pub struct BackupLoader<H: Hasher, D: Data, S: Signature, R: BackupReader> {
backup: R,
index: NodeIndex,
session_id: SessionId,
_phantom: PhantomData<(H, D, S)>,
}

impl<H: Hasher, D: Data, S: Signature, R: Read> BackupLoader<H, D, S, R> {
impl<H: Hasher, D: Data, S: Signature, R: BackupReader> BackupLoader<H, D, S, R> {
pub fn new(backup: R, index: NodeIndex, session_id: SessionId) -> BackupLoader<H, D, S, R> {
BackupLoader {
backup,
Expand All @@ -80,9 +95,8 @@ impl<H: Hasher, D: Data, S: Signature, R: Read> BackupLoader<H, D, S, R> {
}
}

fn load(&mut self) -> Result<Vec<UncheckedSignedUnit<H, D, S>>, LoaderError> {
let mut buf = Vec::new();
self.backup.read_to_end(&mut buf)?;
async fn load(&mut self) -> Result<Vec<UncheckedSignedUnit<H, D, S>>, LoaderError> {
let buf = self.backup.read().await?;
let input = &mut &buf[..];
let mut result = Vec::new();
while !input.is_empty() {
Expand Down Expand Up @@ -163,7 +177,7 @@ impl<H: Hasher, D: Data, S: Signature, R: Read> BackupLoader<H, D, S, R> {
starting_round: oneshot::Sender<Option<Round>>,
next_round_collection: oneshot::Receiver<Round>,
) {
let units = match self.load() {
let units = match self.load().await {
Ok(items) => items,
Err(e) => {
error!(target: LOG_TARGET, "unable to load backup data: {}", e);
Expand Down
2 changes: 2 additions & 0 deletions consensus/src/backup/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
pub use loader::BackupLoader;
pub use loader::BackupReader;
pub use saver::BackupSaver;
pub use saver::BackupWriter;

mod loader;
mod saver;
29 changes: 22 additions & 7 deletions consensus/src/backup/saver.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,36 @@
use std::io::Write;

use crate::{units::UncheckedSignedUnit, Data, Hasher, Receiver, Sender, Signature, Terminator};
use async_trait::async_trait;
use codec::Encode;
use futures::{FutureExt, StreamExt};
use log::{debug, error};

const LOG_TARGET: &str = "AlephBFT-backup-saver";

#[async_trait]
pub trait BackupWriter {
async fn append(&mut self, data: &[u8]) -> std::io::Result<()>;
}

#[async_trait]
impl<W: Write + Send> BackupWriter for W {
async fn append(&mut self, data: &[u8]) -> std::io::Result<()> {
Write::write_all(self, data)?;
self.flush()
}
}

/// Component responsible for saving units into backup.
/// It waits for items to appear on its receivers, and writes them to backup.
/// It announces a successful write through an appropriate response sender.
pub struct BackupSaver<H: Hasher, D: Data, S: Signature, W: Write> {
pub struct BackupSaver<H: Hasher, D: Data, S: Signature, W: BackupWriter> {
units_from_runway: Receiver<UncheckedSignedUnit<H, D, S>>,
responses_for_runway: Sender<UncheckedSignedUnit<H, D, S>>,
backup: W,
}

impl<H: Hasher, D: Data, S: Signature, W: Write> BackupSaver<H, D, S, W> {
impl<H: Hasher, D: Data, S: Signature, W: BackupWriter> BackupSaver<H, D, S, W> {
pub fn new(
units_from_runway: Receiver<UncheckedSignedUnit<H, D, S>>,
responses_for_runway: Sender<UncheckedSignedUnit<H, D, S>>,
Expand All @@ -29,10 +43,11 @@ impl<H: Hasher, D: Data, S: Signature, W: Write> BackupSaver<H, D, S, W> {
}
}

pub fn save_item(&mut self, item: &UncheckedSignedUnit<H, D, S>) -> Result<(), std::io::Error> {
self.backup.write_all(&item.encode())?;
self.backup.flush()?;
Ok(())
pub async fn save_item(
&mut self,
item: &UncheckedSignedUnit<H, D, S>,
) -> Result<(), std::io::Error> {
self.backup.append(&item.encode()).await
}

pub async fn run(&mut self, mut terminator: Terminator) {
Expand All @@ -47,7 +62,7 @@ impl<H: Hasher, D: Data, S: Signature, W: Write> BackupSaver<H, D, S, W> {
break;
},
};
if let Err(e) = self.save_item(&item) {
if let Err(e) = self.save_item(&item).await {
error!(target: LOG_TARGET, "couldn't save item to backup: {:?}", e);
break;
}
Expand Down
1 change: 1 addition & 0 deletions consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub use aleph_bft_types::{
PartialMultisignature, PartiallyMultisigned, Recipient, Round, SessionId, Signable, Signature,
SignatureError, SignatureSet, Signed, SpawnHandle, TaskHandle, UncheckedSigned,
};
pub use backup::{BackupReader, BackupWriter};
pub use config::{
create_config, default_config, default_delay_config, exponential_slowdown, Config, DelayConfig,
};
Expand Down
26 changes: 19 additions & 7 deletions consensus/src/member.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ use crate::{
},
task_queue::TaskQueue,
units::{UncheckedSignedUnit, UnitCoord},
Config, Data, DataProvider, FinalizationHandler, Hasher, MultiKeychain, Network, NodeIndex,
Receiver, Recipient, Round, Sender, Signature, SpawnHandle, Terminator, UncheckedSigned,
BackupReader, BackupWriter, Config, Data, DataProvider, FinalizationHandler, Hasher,
MultiKeychain, Network, NodeIndex, Receiver, Recipient, Round, Sender, Signature, SpawnHandle,
Terminator, UncheckedSigned,
};
use aleph_bft_types::NodeMap;
use codec::{Decode, Encode};
Expand Down Expand Up @@ -108,16 +109,27 @@ enum TaskDetails<H: Hasher, D: Data, S: Signature> {
}

#[derive(Clone)]
pub struct LocalIO<D: Data, DP: DataProvider<D>, FH: FinalizationHandler<D>, US: Write, UL: Read> {
pub struct LocalIO<
D: Data,
DP: DataProvider<D>,
FH: FinalizationHandler<D>,
US: BackupWriter,
UL: BackupReader,
> {
data_provider: DP,
finalization_handler: FH,
unit_saver: US,
unit_loader: UL,
_phantom: PhantomData<D>,
}

impl<D: Data, DP: DataProvider<D>, FH: FinalizationHandler<D>, US: Write, UL: Read>
LocalIO<D, DP, FH, US, UL>
impl<
D: Data,
DP: DataProvider<D>,
FH: FinalizationHandler<D>,
US: BackupWriter,
UL: BackupReader,
> LocalIO<D, DP, FH, US, UL>
{
pub fn new(
data_provider: DP,
Expand Down Expand Up @@ -573,8 +585,8 @@ pub async fn run_session<
D: Data,
DP: DataProvider<D>,
FH: FinalizationHandler<D>,
US: Write + Send + Sync + 'static,
UL: Read + Send + Sync + 'static,
US: BackupWriter + Send + Sync + 'static,
UL: BackupReader + Send + Sync + 'static,
N: Network<NetworkData<H, D, MK::Signature, MK::PartialMultisignature>> + 'static,
SH: SpawnHandle,
MK: MultiKeychain,
Expand Down
18 changes: 9 additions & 9 deletions consensus/src/runway/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ use crate::{
ControlHash, PreUnit, SignedUnit, UncheckedSignedUnit, Unit, UnitCoord, UnitStore,
UnitStoreStatus, Validator,
},
Config, Data, DataProvider, FinalizationHandler, Hasher, Index, Keychain, MultiKeychain,
NodeCount, NodeIndex, NodeMap, Receiver, Round, Sender, Signature, Signed, SpawnHandle,
Terminator, UncheckedSigned,
BackupReader, BackupWriter, Config, Data, DataProvider, FinalizationHandler, Hasher, Index,
Keychain, MultiKeychain, NodeCount, NodeIndex, NodeMap, Receiver, Round, Sender, Signature,
Signed, SpawnHandle, Terminator, UncheckedSigned,
};
use aleph_bft_types::Recipient;
use futures::{
Expand Down Expand Up @@ -871,8 +871,8 @@ pub struct RunwayIO<
H: Hasher,
D: Data,
MK: MultiKeychain,
W: Write + Send + Sync + 'static,
R: Read + Send + Sync + 'static,
W: BackupWriter + Send + Sync + 'static,
R: BackupReader + Send + Sync + 'static,
DP: DataProvider<D>,
FH: FinalizationHandler<D>,
> {
Expand All @@ -887,8 +887,8 @@ impl<
H: Hasher,
D: Data,
MK: MultiKeychain,
W: Write + Send + Sync + 'static,
R: Read + Send + Sync + 'static,
W: BackupWriter + Send + Sync + 'static,
R: BackupReader + Send + Sync + 'static,
DP: DataProvider<D>,
FH: FinalizationHandler<D>,
> RunwayIO<H, D, MK, W, R, DP, FH>
Expand Down Expand Up @@ -919,8 +919,8 @@ pub(crate) async fn run<H, D, US, UL, MK, DP, FH, SH>(
) where
H: Hasher,
D: Data,
US: Write + Send + Sync + 'static,
UL: Read + Send + Sync + 'static,
US: BackupWriter + Send + Sync + 'static,
UL: BackupReader + Send + Sync + 'static,
DP: DataProvider<D>,
FH: FinalizationHandler<D>,
MK: MultiKeychain,
Expand Down

0 comments on commit b3e9564

Please sign in to comment.