From ecf12e99eab90dae6c788979b40bff9796ce1d6a Mon Sep 17 00:00:00 2001 From: joschisan <122358257+joschisan@users.noreply.github.com> Date: Wed, 1 May 2024 14:16:13 +0200 Subject: [PATCH] feat: switch to BackupWriter and BackupReader --- consensus/src/backup/loader.rs | 15 +++++++-------- consensus/src/backup/saver.rs | 16 +++++++--------- consensus/src/member.rs | 15 +++++++++------ consensus/src/runway/mod.rs | 16 +++++++++------- examples/ordering/src/main.rs | 28 ++++++++++++++-------------- mock/src/dataio.rs | 28 +++++++++------------------- 6 files changed, 55 insertions(+), 63 deletions(-) diff --git a/consensus/src/backup/loader.rs b/consensus/src/backup/loader.rs index 97fefb7b..c263e963 100644 --- a/consensus/src/backup/loader.rs +++ b/consensus/src/backup/loader.rs @@ -2,11 +2,11 @@ use std::{ collections::HashSet, fmt::{self, Debug}, marker::PhantomData, - pin::Pin, }; +use aleph_bft_types::BackupReader; use codec::{Decode, Error as CodecError}; -use futures::{channel::oneshot, AsyncRead, AsyncReadExt}; +use futures::{channel::oneshot}; use log::{error, info, warn}; use crate::{ @@ -68,17 +68,17 @@ impl From for LoaderError { } } -pub struct BackupLoader { - backup: Pin>, +pub struct BackupLoader { + backup: R, index: NodeIndex, session_id: SessionId, _phantom: PhantomData<(H, D, S)>, } -impl BackupLoader { +impl BackupLoader { pub fn new(backup: R, index: NodeIndex, session_id: SessionId) -> BackupLoader { BackupLoader { - backup: Box::pin(backup), + backup, index, session_id, _phantom: PhantomData, @@ -86,8 +86,7 @@ impl BackupLoader { } async fn load(&mut self) -> Result>, LoaderError> { - let mut buf = Vec::new(); - self.backup.read_to_end(&mut buf).await?; + let buf = self.backup.read().await?; let input = &mut &buf[..]; let mut result = Vec::new(); while !input.is_empty() { diff --git a/consensus/src/backup/saver.rs b/consensus/src/backup/saver.rs index 252c0f9b..19cbcbe1 100644 --- a/consensus/src/backup/saver.rs +++ b/consensus/src/backup/saver.rs @@ -1,26 +1,25 @@ -use std::pin::Pin; - use crate::{ dag::DagUnit, units::{UncheckedSignedUnit, WrappedUnit}, Data, Hasher, MultiKeychain, Receiver, Sender, Terminator, }; use codec::Encode; -use futures::{AsyncWrite, AsyncWriteExt, FutureExt, StreamExt}; +use futures::{ FutureExt, StreamExt}; use log::{debug, error}; +use aleph_bft_types::BackupWriter; const LOG_TARGET: &str = "AlephBFT-backup-saver"; /// Component responsible for saving units into backup. /// It waits for items to appear on its receivers, and writes them to backup. /// It announces a successful write through an appropriate response sender. -pub struct BackupSaver { +pub struct BackupSaver { units_from_runway: Receiver>, responses_for_runway: Sender>, - backup: Pin>, + backup: W, } -impl BackupSaver { +impl BackupSaver { pub fn new( units_from_runway: Receiver>, responses_for_runway: Sender>, @@ -29,14 +28,13 @@ impl BackupSaver) -> Result<(), std::io::Error> { let unit: UncheckedSignedUnit<_, _, _> = unit.clone().unpack().into(); - self.backup.write_all(&unit.encode()).await?; - self.backup.flush().await + self.backup.append(&unit.encode()).await } pub async fn run(&mut self, mut terminator: Terminator) { diff --git a/consensus/src/member.rs b/consensus/src/member.rs index 73fe67fb..cf2ab5f3 100644 --- a/consensus/src/member.rs +++ b/consensus/src/member.rs @@ -13,12 +13,13 @@ use crate::{ }; use aleph_bft_types::NodeMap; use codec::{Decode, Encode}; -use futures::{channel::mpsc, pin_mut, AsyncRead, AsyncWrite, FutureExt, StreamExt}; +use futures::{channel::mpsc, pin_mut, FutureExt, StreamExt}; use futures_timer::Delay; use itertools::Itertools; use log::{debug, error, info, trace, warn}; use network::NetworkData; use rand::{prelude::SliceRandom, Rng}; +use aleph_bft_types::BackupWriter; use std::{ collections::HashSet, convert::TryInto, @@ -26,6 +27,8 @@ use std::{ marker::PhantomData, time::Duration, }; +use aleph_bft_types::BackupReader; + /// A message concerning units, either about new units or some requests for them. #[derive(Clone, Eq, PartialEq, Debug, Decode, Encode)] @@ -111,8 +114,8 @@ pub struct LocalIO< D: Data, DP: DataProvider, FH: FinalizationHandler, - US: AsyncWrite, - UL: AsyncRead, + US: BackupWriter, + UL: BackupReader, > { data_provider: DP, finalization_handler: FH, @@ -121,7 +124,7 @@ pub struct LocalIO< _phantom: PhantomData, } -impl, FH: FinalizationHandler, US: AsyncWrite, UL: AsyncRead> +impl, FH: FinalizationHandler, US: BackupWriter, UL: BackupReader> LocalIO { pub fn new( @@ -578,8 +581,8 @@ pub async fn run_session< D: Data, DP: DataProvider, FH: FinalizationHandler, - US: AsyncWrite + Send + Sync + 'static, - UL: AsyncRead + Send + Sync + 'static, + US: BackupWriter + Send + Sync + 'static, + UL: BackupReader + Send + Sync + 'static, N: Network> + 'static, SH: SpawnHandle, MK: MultiKeychain, diff --git a/consensus/src/runway/mod.rs b/consensus/src/runway/mod.rs index 7dcf0243..7e8d7b62 100644 --- a/consensus/src/runway/mod.rs +++ b/consensus/src/runway/mod.rs @@ -14,11 +14,13 @@ use crate::{ UncheckedSigned, }; use aleph_bft_types::Recipient; +use aleph_bft_types::BackupReader; use futures::{ channel::{mpsc, oneshot}, future::pending, - pin_mut, AsyncRead, AsyncWrite, Future, FutureExt, StreamExt, + pin_mut, Future, FutureExt, StreamExt, }; +use aleph_bft_types::BackupWriter; use futures_timer::Delay; use itertools::Itertools; use log::{debug, error, info, trace, warn}; @@ -667,8 +669,8 @@ pub struct RunwayIO< H: Hasher, D: Data, MK: MultiKeychain, - W: AsyncWrite + Send + Sync + 'static, - R: AsyncRead + Send + Sync + 'static, + W: BackupWriter + Send + Sync + 'static, + R: BackupReader + Send + Sync + 'static, DP: DataProvider, FH: FinalizationHandler, > { @@ -683,8 +685,8 @@ impl< H: Hasher, D: Data, MK: MultiKeychain, - W: AsyncWrite + Send + Sync + 'static, - R: AsyncRead + Send + Sync + 'static, + W: BackupWriter + Send + Sync + 'static, + R: BackupReader + Send + Sync + 'static, DP: DataProvider, FH: FinalizationHandler, > RunwayIO @@ -715,8 +717,8 @@ pub(crate) async fn run( ) where H: Hasher, D: Data, - US: AsyncWrite + Send + Sync + 'static, - UL: AsyncRead + Send + Sync + 'static, + US: BackupWriter + Send + Sync + 'static, + UL: BackupReader + Send + Sync + 'static, DP: DataProvider, FH: FinalizationHandler, MK: MultiKeychain, diff --git a/examples/ordering/src/main.rs b/examples/ordering/src/main.rs index 0cab2992..ae771abd 100644 --- a/examples/ordering/src/main.rs +++ b/examples/ordering/src/main.rs @@ -1,18 +1,20 @@ -use std::io::Write; mod dataio; mod network; +use std::io::Write; +use std::io; + use aleph_bft::{run_session, NodeIndex, Terminator}; use aleph_bft_mock::{Keychain, Spawner}; use clap::Parser; use dataio::{Data, DataProvider, FinalizationHandler}; -use futures::{channel::oneshot, io, StreamExt}; +use futures::{channel::oneshot, StreamExt}; use log::{debug, error, info}; use network::Network; use std::{collections::HashMap, path::Path, time::Duration}; use time::{macros::format_description, OffsetDateTime}; -use tokio::fs::{self, File}; -use tokio_util::compat::{Compat, TokioAsyncWriteCompatExt}; +use std::fs::{self, File}; + /// Example node producing linear order. #[derive(Parser, Debug)] @@ -43,23 +45,23 @@ struct Args { crash: bool, } -async fn create_backup( +fn create_backup( node_id: NodeIndex, -) -> Result<(Compat, io::Cursor>), io::Error> { +) -> Result<(File, io::Cursor>), io::Error> { let stash_path = Path::new("./aleph-bft-examples-ordering-backup"); - fs::create_dir_all(stash_path).await?; + fs::create_dir_all(stash_path)?; let file_path = stash_path.join(format!("{}.units", node_id.0)); let loader = if file_path.exists() { - io::Cursor::new(fs::read(&file_path).await?) + io::Cursor::new(fs::read(&file_path)?) } else { io::Cursor::new(Vec::new()) }; let saver = fs::OpenOptions::new() .create(true) .append(true) - .open(file_path) - .await?; - Ok((saver.compat_write(), loader)) + .open(file_path)?; + + Ok((saver, loader)) } fn finalized_counts(cf: &HashMap) -> Vec { @@ -110,9 +112,7 @@ async fn main() { let n_members = ports.len().into(); let data_provider = DataProvider::new(id, n_starting, n_data - n_starting, stalled); let (finalization_handler, mut finalized_rx) = FinalizationHandler::new(); - let (backup_saver, backup_loader) = create_backup(id) - .await - .expect("Error setting up unit saving"); + let (backup_saver, backup_loader) = create_backup(id).expect("Error setting up unit saving"); let local_io = aleph_bft::LocalIO::new( data_provider, finalization_handler, diff --git a/mock/src/dataio.rs b/mock/src/dataio.rs index c7ef0648..f16eac0a 100644 --- a/mock/src/dataio.rs +++ b/mock/src/dataio.rs @@ -1,15 +1,14 @@ use aleph_bft_types::{DataProvider as DataProviderT, FinalizationHandler as FinalizationHandlerT}; use async_trait::async_trait; use codec::{Decode, Encode}; -use futures::{channel::mpsc::unbounded, future::pending, AsyncWrite}; +use futures::{channel::mpsc::unbounded, future::pending}; use log::error; use parking_lot::Mutex; use std::{ - io::{self}, - pin::Pin, sync::Arc, - task::{self, Poll}, }; +use std::io::Write; + type Receiver = futures::channel::mpsc::UnboundedReceiver; type Sender = futures::channel::mpsc::UnboundedSender; @@ -101,22 +100,13 @@ impl Saver { } } -impl AsyncWrite for Saver { - fn poll_write( - self: Pin<&mut Self>, - _: &mut task::Context<'_>, - buf: &[u8], - ) -> Poll> { +impl Write for Saver { + fn write(&mut self, buf: &[u8]) -> Result { self.data.lock().extend_from_slice(buf); - Poll::Ready(Ok(buf.len())) - } - - fn poll_flush(self: Pin<&mut Self>, _: &mut task::Context<'_>) -> Poll> { - Poll::Ready(Ok(())) + Ok(buf.len()) } - - fn poll_close(self: Pin<&mut Self>, _: &mut task::Context<'_>) -> Poll> { - Poll::Ready(Ok(())) + fn flush(&mut self) -> Result<(), std::io::Error> { + Ok(()) } } @@ -126,4 +116,4 @@ impl From>>> for Saver { } } -pub type Loader = futures::io::Cursor>; +pub type Loader = std::io::Cursor>;