feat: create BackupWriter and BackupReader traits for async unit backup
joschisan committed May 1, 2024
1 parent 416d27c commit 083a0a0
Showing 8 changed files with 91 additions and 63 deletions.
15 changes: 7 additions & 8 deletions consensus/src/backup/loader.rs
@@ -2,11 +2,11 @@ use std::{
     collections::HashSet,
     fmt::{self, Debug},
     marker::PhantomData,
-    pin::Pin,
 };
+use aleph_bft_types::BackupReader;
 
 use codec::{Decode, Error as CodecError};
-use futures::{channel::oneshot, AsyncRead, AsyncReadExt};
+use futures::{channel::oneshot};
 use log::{error, info, warn};
 
 use crate::{
@@ -68,26 +68,25 @@ impl From<CodecError> for LoaderError {
     }
 }
 
-pub struct BackupLoader<H: Hasher, D: Data, S: Signature, R: AsyncRead> {
-    backup: Pin<Box<R>>,
+pub struct BackupLoader<H: Hasher, D: Data, S: Signature, R: BackupReader> {
+    backup: R,
     index: NodeIndex,
     session_id: SessionId,
     _phantom: PhantomData<(H, D, S)>,
 }
 
-impl<H: Hasher, D: Data, S: Signature, R: AsyncRead> BackupLoader<H, D, S, R> {
+impl<H: Hasher, D: Data, S: Signature, R: BackupReader> BackupLoader<H, D, S, R> {
     pub fn new(backup: R, index: NodeIndex, session_id: SessionId) -> BackupLoader<H, D, S, R> {
         BackupLoader {
-            backup: Box::pin(backup),
+            backup,
             index,
             session_id,
             _phantom: PhantomData,
         }
     }
 
     async fn load(&mut self) -> Result<Vec<UncheckedSignedUnit<H, D, S>>, LoaderError> {
-        let mut buf = Vec::new();
-        self.backup.read_to_end(&mut buf).await?;
+        let buf = self.backup.read().await?;
         let input = &mut &buf[..];
         let mut result = Vec::new();
         while !input.is_empty() {
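Note on the new load path: load() now pulls the entire backup into memory with a single BackupReader::read call, then decodes units until the buffer is exhausted. A minimal standalone sketch of that decode loop, with u32 standing in for UncheckedSignedUnit<H, D, S>:

    use codec::{Decode, Error as CodecError};

    // Decode back-to-back SCALE-encoded items from one contiguous backup buffer,
    // mirroring the `while !input.is_empty()` loop in BackupLoader::load.
    fn decode_all(buf: &[u8]) -> Result<Vec<u32>, CodecError> {
        let input = &mut &buf[..];
        let mut result = Vec::new();
        while !input.is_empty() {
            result.push(u32::decode(input)?);
        }
        Ok(result)
    }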
16 changes: 7 additions & 9 deletions consensus/src/backup/saver.rs
@@ -1,26 +1,25 @@
-use std::pin::Pin;
-
 use crate::{
     dag::DagUnit,
     units::{UncheckedSignedUnit, WrappedUnit},
     Data, Hasher, MultiKeychain, Receiver, Sender, Terminator,
 };
 use codec::Encode;
-use futures::{AsyncWrite, AsyncWriteExt, FutureExt, StreamExt};
+use futures::{FutureExt, StreamExt};
 use log::{debug, error};
+use aleph_bft_types::BackupWriter;
 
 const LOG_TARGET: &str = "AlephBFT-backup-saver";
 
 /// Component responsible for saving units into backup.
 /// It waits for items to appear on its receivers, and writes them to backup.
 /// It announces a successful write through an appropriate response sender.
-pub struct BackupSaver<H: Hasher, D: Data, MK: MultiKeychain, W: AsyncWrite> {
+pub struct BackupSaver<H: Hasher, D: Data, MK: MultiKeychain, W: BackupWriter> {
     units_from_runway: Receiver<DagUnit<H, D, MK>>,
     responses_for_runway: Sender<DagUnit<H, D, MK>>,
-    backup: Pin<Box<W>>,
+    backup: W,
 }
 
-impl<H: Hasher, D: Data, MK: MultiKeychain, W: AsyncWrite> BackupSaver<H, D, MK, W> {
+impl<H: Hasher, D: Data, MK: MultiKeychain, W: BackupWriter> BackupSaver<H, D, MK, W> {
     pub fn new(
         units_from_runway: Receiver<DagUnit<H, D, MK>>,
         responses_for_runway: Sender<DagUnit<H, D, MK>>,
@@ -29,14 +28,13 @@ impl<H: Hasher, D: Data, MK: MultiKeychain, W: AsyncWrite> BackupSaver<H, D, MK,
         BackupSaver {
             units_from_runway,
             responses_for_runway,
-            backup: Box::pin(backup),
+            backup,
         }
     }
 
     pub async fn save_unit(&mut self, unit: &DagUnit<H, D, MK>) -> Result<(), std::io::Error> {
         let unit: UncheckedSignedUnit<_, _, _> = unit.clone().unpack().into();
-        self.backup.write_all(&unit.encode()).await?;
-        self.backup.flush().await
+        self.backup.append(&unit.encode()).await
     }
 
     pub async fn run(&mut self, mut terminator: Terminator) {
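Note on the new save path: save_unit no longer calls write_all followed by flush; durability is now the implementor's job, since BackupWriter::append is expected to have persisted the bytes by the time it resolves, and BackupSaver announces success to the runway right after. A sketch of the resulting contract, generic over anything Encode:

    use aleph_bft_types::BackupWriter;
    use codec::Encode;

    // Hand one fully encoded item to the writer in a single append call; the
    // writer must not return Ok before the data is durable.
    async fn persist<T: Encode, W: BackupWriter>(item: &T, backup: &mut W) -> std::io::Result<()> {
        backup.append(&item.encode()).await
    }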
15 changes: 9 additions & 6 deletions consensus/src/member.rs
@@ -13,19 +13,22 @@ use crate::{
 };
 use aleph_bft_types::NodeMap;
 use codec::{Decode, Encode};
-use futures::{channel::mpsc, pin_mut, AsyncRead, AsyncWrite, FutureExt, StreamExt};
+use futures::{channel::mpsc, pin_mut, FutureExt, StreamExt};
 use futures_timer::Delay;
 use itertools::Itertools;
 use log::{debug, error, info, trace, warn};
 use network::NetworkData;
 use rand::{prelude::SliceRandom, Rng};
+use aleph_bft_types::BackupWriter;
 use std::{
     collections::HashSet,
     convert::TryInto,
     fmt::{self, Debug},
     marker::PhantomData,
     time::Duration,
 };
+use aleph_bft_types::BackupReader;
+
 
 /// A message concerning units, either about new units or some requests for them.
 #[derive(Clone, Eq, PartialEq, Debug, Decode, Encode)]
@@ -111,8 +114,8 @@ pub struct LocalIO<
     D: Data,
     DP: DataProvider<D>,
     FH: FinalizationHandler<D>,
-    US: AsyncWrite,
-    UL: AsyncRead,
+    US: BackupWriter,
+    UL: BackupReader,
 > {
     data_provider: DP,
     finalization_handler: FH,
@@ -121,7 +124,7 @@ pub struct LocalIO<
     _phantom: PhantomData<D>,
 }
 
-impl<D: Data, DP: DataProvider<D>, FH: FinalizationHandler<D>, US: AsyncWrite, UL: AsyncRead>
+impl<D: Data, DP: DataProvider<D>, FH: FinalizationHandler<D>, US: BackupWriter, UL: BackupReader>
     LocalIO<D, DP, FH, US, UL>
 {
     pub fn new(
@@ -578,8 +581,8 @@ pub async fn run_session<
     D: Data,
     DP: DataProvider<D>,
     FH: FinalizationHandler<D>,
-    US: AsyncWrite + Send + Sync + 'static,
-    UL: AsyncRead + Send + Sync + 'static,
+    US: BackupWriter + Send + Sync + 'static,
+    UL: BackupReader + Send + Sync + 'static,
     N: Network<NetworkData<H, D, MK::Signature, MK::PartialMultisignature>> + 'static,
     SH: SpawnHandle,
     MK: MultiKeychain,
16 changes: 9 additions & 7 deletions consensus/src/runway/mod.rs
@@ -14,11 +14,13 @@ use crate::{
     UncheckedSigned,
 };
 use aleph_bft_types::Recipient;
+use aleph_bft_types::BackupReader;
 use futures::{
     channel::{mpsc, oneshot},
     future::pending,
-    pin_mut, AsyncRead, AsyncWrite, Future, FutureExt, StreamExt,
+    pin_mut, Future, FutureExt, StreamExt,
 };
+use aleph_bft_types::BackupWriter;
 use futures_timer::Delay;
 use itertools::Itertools;
 use log::{debug, error, info, trace, warn};
@@ -667,8 +669,8 @@ pub struct RunwayIO<
     H: Hasher,
     D: Data,
     MK: MultiKeychain,
-    W: AsyncWrite + Send + Sync + 'static,
-    R: AsyncRead + Send + Sync + 'static,
+    W: BackupWriter + Send + Sync + 'static,
+    R: BackupReader + Send + Sync + 'static,
     DP: DataProvider<D>,
     FH: FinalizationHandler<D>,
 > {
@@ -683,8 +685,8 @@ impl<
     H: Hasher,
     D: Data,
     MK: MultiKeychain,
-    W: AsyncWrite + Send + Sync + 'static,
-    R: AsyncRead + Send + Sync + 'static,
+    W: BackupWriter + Send + Sync + 'static,
+    R: BackupReader + Send + Sync + 'static,
     DP: DataProvider<D>,
     FH: FinalizationHandler<D>,
 > RunwayIO<H, D, MK, W, R, DP, FH>
@@ -715,8 +717,8 @@ pub(crate) async fn run<H, D, US, UL, MK, DP, FH, SH>(
 ) where
     H: Hasher,
     D: Data,
-    US: AsyncWrite + Send + Sync + 'static,
-    UL: AsyncRead + Send + Sync + 'static,
+    US: BackupWriter + Send + Sync + 'static,
+    UL: BackupReader + Send + Sync + 'static,
     DP: DataProvider<D>,
     FH: FinalizationHandler<D>,
     MK: MultiKeychain,
28 changes: 14 additions & 14 deletions examples/ordering/src/main.rs
@@ -1,18 +1,20 @@
 mod dataio;
 mod network;
 
+use std::io::Write;
+use std::io;
 use aleph_bft::{run_session, NodeIndex, Terminator};
 use aleph_bft_mock::{Keychain, Spawner};
 use clap::Parser;
 use dataio::{Data, DataProvider, FinalizationHandler};
-use futures::{channel::oneshot, io, StreamExt};
+use futures::{channel::oneshot, StreamExt};
 use log::{debug, error, info};
 use network::Network;
 use std::{collections::HashMap, path::Path, time::Duration};
 use time::{macros::format_description, OffsetDateTime};
-use tokio::fs::{self, File};
-use tokio_util::compat::{Compat, TokioAsyncWriteCompatExt};
+use std::fs::{self, File};
+
 
 /// Example node producing linear order.
 #[derive(Parser, Debug)]
@@ -43,23 +45,23 @@ struct Args {
     crash: bool,
 }
 
-async fn create_backup(
+fn create_backup(
     node_id: NodeIndex,
-) -> Result<(Compat<File>, io::Cursor<Vec<u8>>), io::Error> {
+) -> Result<(File, io::Cursor<Vec<u8>>), io::Error> {
     let stash_path = Path::new("./aleph-bft-examples-ordering-backup");
-    fs::create_dir_all(stash_path).await?;
+    fs::create_dir_all(stash_path)?;
     let file_path = stash_path.join(format!("{}.units", node_id.0));
     let loader = if file_path.exists() {
-        io::Cursor::new(fs::read(&file_path).await?)
+        io::Cursor::new(fs::read(&file_path)?)
     } else {
         io::Cursor::new(Vec::new())
     };
     let saver = fs::OpenOptions::new()
         .create(true)
         .append(true)
-        .open(file_path)
-        .await?;
-    Ok((saver.compat_write(), loader))
+        .open(file_path)?;
+
+    Ok((saver, loader))
 }
 
 fn finalized_counts(cf: &HashMap<NodeIndex, u32>) -> Vec<u32> {
@@ -110,9 +112,7 @@ async fn main() {
     let n_members = ports.len().into();
     let data_provider = DataProvider::new(id, n_starting, n_data - n_starting, stalled);
     let (finalization_handler, mut finalized_rx) = FinalizationHandler::new();
-    let (backup_saver, backup_loader) = create_backup(id)
-        .await
-        .expect("Error setting up unit saving");
+    let (backup_saver, backup_loader) = create_backup(id).expect("Error setting up unit saving");
     let local_io = aleph_bft::LocalIO::new(
         data_provider,
         finalization_handler,
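Note on the example: create_backup can now return plain std types because of the blanket impls added in types/src/backup.rs, where any Write + Send is a BackupWriter and any Read + Send is a BackupReader. A hypothetical compile-time check (not part of the commit) makes the claim concrete:

    use aleph_bft_types::{BackupReader, BackupWriter};
    use std::{fs::File, io::Cursor};

    // Fails to compile if File or Cursor<Vec<u8>> ever stop satisfying the bounds.
    fn assert_backup_io<W: BackupWriter, R: BackupReader>(_: &W, _: &R) {}

    fn check(saver: &File, loader: &Cursor<Vec<u8>>) {
        assert_backup_io(saver, loader);
    }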
28 changes: 9 additions & 19 deletions mock/src/dataio.rs
@@ -1,15 +1,14 @@
 use aleph_bft_types::{DataProvider as DataProviderT, FinalizationHandler as FinalizationHandlerT};
 use async_trait::async_trait;
 use codec::{Decode, Encode};
-use futures::{channel::mpsc::unbounded, future::pending, AsyncWrite};
+use futures::{channel::mpsc::unbounded, future::pending};
 use log::error;
 use parking_lot::Mutex;
 use std::{
-    io::{self},
-    pin::Pin,
     sync::Arc,
-    task::{self, Poll},
 };
+use std::io::Write;
+
 
 type Receiver<T> = futures::channel::mpsc::UnboundedReceiver<T>;
 type Sender<T> = futures::channel::mpsc::UnboundedSender<T>;
@@ -101,22 +100,13 @@ impl Saver {
     }
 }
 
-impl AsyncWrite for Saver {
-    fn poll_write(
-        self: Pin<&mut Self>,
-        _: &mut task::Context<'_>,
-        buf: &[u8],
-    ) -> Poll<io::Result<usize>> {
+impl Write for Saver {
+    fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
         self.data.lock().extend_from_slice(buf);
-        Poll::Ready(Ok(buf.len()))
-    }
-
-    fn poll_flush(self: Pin<&mut Self>, _: &mut task::Context<'_>) -> Poll<io::Result<()>> {
-        Poll::Ready(Ok(()))
+        Ok(buf.len())
     }
 
-    fn poll_close(self: Pin<&mut Self>, _: &mut task::Context<'_>) -> Poll<io::Result<()>> {
-        Poll::Ready(Ok(()))
+    fn flush(&mut self) -> Result<(), std::io::Error> {
+        Ok(())
     }
 }
 
@@ -126,4 +116,4 @@ impl From<Arc<Mutex<Vec<u8>>>> for Saver {
     }
 }
 
-pub type Loader = futures::io::Cursor<Vec<u8>>;
+pub type Loader = std::io::Cursor<Vec<u8>>;
34 changes: 34 additions & 0 deletions types/src/backup.rs
@@ -0,0 +1,34 @@
+use async_trait::async_trait;
+use std::io::{Read, Write};
+
+
+#[async_trait]
+/// Write backups to persistent storage.
+pub trait BackupWriter {
+    /// Append new data to the backup.
+    async fn append(&mut self, data: &[u8]) -> std::io::Result<()>;
+}
+
+#[async_trait]
+impl<W: Write + Send> BackupWriter for W {
+    async fn append(&mut self, data: &[u8]) -> std::io::Result<()> {
+        self.write_all(data)?;
+        self.flush()
+    }
+}
+
+
+#[async_trait]
+pub trait BackupReader {
+    /// Read the entire backup.
+    async fn read(&mut self) -> std::io::Result<Vec<u8>>;
+}
+
+#[async_trait]
+impl<R: Read + Send> BackupReader for R {
+    async fn read(&mut self) -> std::io::Result<Vec<u8>> {
+        let mut buf = Vec::new();
+        self.read_to_end(&mut buf)?;
+        Ok(buf)
+    }
+}
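The payoff of the new traits is that a backend no longer has to implement AsyncRead/AsyncWrite or deal with Pin: any blocking Read/Write works out of the box via the blanket impls, and genuinely async storage can implement the traits directly. A hedged sketch of a custom async implementor, assuming a tokio dependency that this crate does not actually take on:

    use aleph_bft_types::BackupWriter;
    use async_trait::async_trait;
    use tokio::{fs::File, io::AsyncWriteExt};

    // Hypothetical tokio-backed writer: appends and fsyncs before returning,
    // so a unit is only acknowledged once it is durable on disk.
    struct TokioBackup(File);

    #[async_trait]
    impl BackupWriter for TokioBackup {
        async fn append(&mut self, data: &[u8]) -> std::io::Result<()> {
            self.0.write_all(data).await?;
            self.0.sync_all().await
        }
    }

This does not overlap with the blanket impl, because tokio's File does not implement std::io::Write.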
2 changes: 2 additions & 0 deletions types/src/lib.rs
@@ -3,6 +3,7 @@
 mod dataio;
 mod network;
 mod tasks;
+mod backup;
 
 pub use aleph_bft_crypto::{
     IncompleteMultisignatureError, Index, Indexed, Keychain, MultiKeychain, Multisigned, NodeCount,
@@ -12,6 +13,7 @@ pub use aleph_bft_crypto::{
 pub use dataio::{DataProvider, FinalizationHandler};
 pub use network::{Network, Recipient};
 pub use tasks::{SpawnHandle, TaskHandle};
+pub use backup::{BackupReader, BackupWriter};
 
 use codec::Codec;
 use std::{fmt::Debug, hash::Hash as StdHash};
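With the re-export in place, downstream crates can name both traits in one line:

    use aleph_bft_types::{BackupReader, BackupWriter};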
