Skip to content

Commit

Permalink
Handle own blocks in sync
Browse files Browse the repository at this point in the history
  • Loading branch information
timorleph committed Oct 6, 2023
1 parent a683c0b commit f590a10
Show file tree
Hide file tree
Showing 8 changed files with 154 additions and 29 deletions.
21 changes: 9 additions & 12 deletions bin/node/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ use aleph_runtime::{self, opaque::Block, RuntimeApi};
use finality_aleph::{
run_validator_node, AlephBlockImport, AlephConfig, BlockImporter, Justification,
JustificationTranslator, MillisecsPerBlock, Protocol, ProtocolNaming, RateLimiterConfig,
SessionPeriod, SubstrateChainStatus, SyncOracle, TimingBlockMetrics, TracingBlockImport,
RedirectingBlockImport, SessionPeriod, SubstrateChainStatus, SyncOracle, TimingBlockMetrics,
TracingBlockImport,
};
use futures::channel::mpsc;
use log::warn;
Expand Down Expand Up @@ -88,7 +89,6 @@ pub fn new_partial(
sc_consensus::DefaultImportQueue<Block, FullClient>,
sc_transaction_pool::FullPool<Block, FullClient>,
(
TracingBlockImport<Arc<FullClient>>,
mpsc::UnboundedSender<Justification>,
mpsc::UnboundedReceiver<Justification>,
Option<Telemetry>,
Expand Down Expand Up @@ -151,7 +151,7 @@ pub fn new_partial(
.map_err(|e| ServiceError::Other(format!("failed to set up chain status: {e}")))?,
);
let aleph_block_import = AlephBlockImport::new(
tracing_block_import.clone(),
tracing_block_import,
justification_tx.clone(),
justification_translator,
);
Expand Down Expand Up @@ -190,13 +190,7 @@ pub fn new_partial(
keystore_container,
select_chain,
transaction_pool,
other: (
tracing_block_import,
justification_tx,
justification_rx,
telemetry,
metrics,
),
other: (justification_tx, justification_rx, telemetry, metrics),
})
}

Expand Down Expand Up @@ -310,9 +304,11 @@ pub fn new_authority(
keystore_container,
select_chain,
transaction_pool,
other: (block_import, justification_tx, justification_rx, mut telemetry, metrics),
other: (justification_tx, justification_rx, mut telemetry, metrics),
} = new_partial(&config)?;

let (block_import, block_rx) = RedirectingBlockImport::new(client.clone());

let backup_path = backup_path(&aleph_config, config.base_path.path());

let finalized = client.info().finalized_hash;
Expand Down Expand Up @@ -378,7 +374,7 @@ pub fn new_authority(
backoff_authoring_blocks,
keystore: keystore_container.keystore(),
sync_oracle: sync_oracle.clone(),
justification_sync_link: sync_network.clone(),
justification_sync_link: (),
block_proposal_slot_portion: SlotProportion::new(2f32 / 3f32),
max_block_proposal_slot_portion: None,
telemetry: telemetry.as_ref().map(|x| x.handle()),
Expand Down Expand Up @@ -413,6 +409,7 @@ pub fn new_authority(
spawn_handle: task_manager.spawn_handle().into(),
keystore: keystore_container.keystore(),
justification_rx,
block_rx,
metrics,
registry: prometheus_registry,
unit_creation_delay: aleph_config.unit_creation_delay(),
Expand Down
90 changes: 88 additions & 2 deletions finality-aleph/src/import.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
use std::{fmt::Debug, time::Instant};
use std::{
error::Error,
fmt::{Debug, Display, Error as FmtError, Formatter},
time::Instant,
};

use futures::channel::mpsc::{TrySendError, UnboundedSender};
use futures::channel::mpsc::{self, TrySendError, UnboundedReceiver, UnboundedSender};
use log::{debug, warn};
use sc_consensus::{
BlockCheckParams, BlockImport, BlockImportParams, ImportResult, JustificationImport,
Expand Down Expand Up @@ -217,3 +221,85 @@ where
})
}
}

/// A wrapper around a block import that actually sends all the blocks elsewhere through a channel.
/// Very barebones, e.g. does not work with justifications, but sufficient for passing to Aura.
#[derive(Clone)]
pub struct RedirectingBlockImport<I>
where
I: BlockImport<Block> + Clone + Send,
{
inner: I,
blocks_tx: UnboundedSender<Block>,
}

impl<I> RedirectingBlockImport<I>
where
I: BlockImport<Block> + Clone + Send,
{
pub fn new(inner: I) -> (Self, UnboundedReceiver<Block>) {
let (blocks_tx, blocks_rx) = mpsc::unbounded();
(Self { inner, blocks_tx }, blocks_rx)
}
}

/// What can go wrong when redirecting a block import.
#[derive(Debug)]
pub enum RedirectingImportError<E> {
Inner(E),
MissingBody,
ChannelClosed,
}

impl<E: Display> Display for RedirectingImportError<E> {
fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> {
use RedirectingImportError::*;
match self {
Inner(e) => write!(f, "{}", e),
MissingBody => write!(
f,
"redirecting block import does not support importing blocks without a body"
),
ChannelClosed => write!(f, "channel closed, cannot redirect import"),
}
}
}

impl<E: Display + Debug> Error for RedirectingImportError<E> {}

#[async_trait::async_trait]
impl<I> BlockImport<Block> for RedirectingBlockImport<I>
where
I: BlockImport<Block> + Clone + Send,
{
type Error = RedirectingImportError<I::Error>;
type Transaction = I::Transaction;

async fn check_block(
&mut self,
block: BlockCheckParams<Block>,
) -> Result<ImportResult, Self::Error> {
self.inner
.check_block(block)
.await
.map_err(RedirectingImportError::Inner)
}

async fn import_block(
&mut self,
block: BlockImportParams<Block, Self::Transaction>,
) -> Result<ImportResult, Self::Error> {
let header = block.post_header();
let BlockImportParams { body, .. } = block;

let extrinsics = body.ok_or(RedirectingImportError::MissingBody)?;

self.blocks_tx
.unbounded_send(Block { header, extrinsics })
.map_err(|_| RedirectingImportError::ChannelClosed)?;

// We claim it was successfully imported and no further action is necessary.
// This is likely inaccurate, but again, should be enough for Aura.
Ok(ImportResult::Imported(Default::default()))
}
}
3 changes: 2 additions & 1 deletion finality-aleph/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ mod sync_oracle;
pub mod testing;

pub use crate::{
import::{AlephBlockImport, TracingBlockImport},
import::{AlephBlockImport, RedirectingBlockImport, TracingBlockImport},
justification::AlephJustification,
metrics::TimingBlockMetrics,
network::{Protocol, ProtocolNaming},
Expand Down Expand Up @@ -276,6 +276,7 @@ pub struct AlephConfig<C, SC> {
pub spawn_handle: SpawnHandle,
pub keystore: Arc<dyn Keystore>,
pub justification_rx: mpsc::UnboundedReceiver<Justification>,
pub block_rx: mpsc::UnboundedReceiver<AlephBlock>,
pub metrics: TimingBlockMetrics,
pub registry: Option<Registry>,
pub session_period: SessionPeriod,
Expand Down
12 changes: 6 additions & 6 deletions finality-aleph/src/nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ use crate::{
session::SessionBoundaryInfo,
session_map::{AuthorityProviderImpl, FinalityNotifierImpl, SessionMapUpdater},
sync::{
ChainStatus, FinalizationStatus, Justification, JustificationTranslator,
OldSyncCompatibleRequestBlocks, Service as SyncService, SubstrateChainStatusNotifier,
SubstrateFinalizationInfo, VerifierCache, IO as SyncIO,
ChainStatus, DatabaseIO as SyncDatabaseIO, FinalizationStatus, Justification,
JustificationTranslator, OldSyncCompatibleRequestBlocks, Service as SyncService,
SubstrateChainStatusNotifier, SubstrateFinalizationInfo, VerifierCache, IO as SyncIO,
},
AlephConfig,
};
Expand Down Expand Up @@ -65,6 +65,7 @@ where
session_period,
millisecs_per_block,
justification_rx,
block_rx,
backup_saving_path,
external_addresses,
validator_port,
Expand Down Expand Up @@ -151,13 +152,12 @@ where
let finalizer = AlephFinalizer::new(client.clone(), metrics.clone());
import_queue_handle.attach_metrics(metrics.clone());
let sync_io = SyncIO::new(
chain_status.clone(),
finalizer,
import_queue_handle,
SyncDatabaseIO::new(chain_status.clone(), finalizer, import_queue_handle),
block_sync_network,
chain_events,
sync_oracle,
justification_rx,
block_rx,
);
let (sync_service, justifications_for_sync, request_block) =
match SyncService::new(verifier, session_info.clone(), sync_io, registry.clone()) {
Expand Down
8 changes: 7 additions & 1 deletion finality-aleph/src/sync/handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::{
};

mod request_handler;
pub use request_handler::{Action, RequestHandlerError};
pub use request_handler::{block_to_response, Action, RequestHandlerError};

use crate::sync::data::{ResponseItem, ResponseItems};

Expand Down Expand Up @@ -722,6 +722,12 @@ where
pub fn extension_request(&self) -> ExtensionRequest<I> {
self.forest.extension_request()
}

/// Handle a block freshly created by this node. Imports it and returns a form of it that can be broadcast.
pub fn handle_own_block(&mut self, block: B) -> Vec<ResponseItem<B, J>> {
self.block_importer.import_block(block.clone());
block_to_response(block)
}
}

#[cfg(test)]
Expand Down
14 changes: 14 additions & 0 deletions finality-aleph/src/sync/handler/request_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,12 @@ where
}
}

fn single_block(block: B) -> Self {
let mut result = Self::empty();
result.add_block_and_header(block);
result
}

fn from_just(justification: J) -> Self {
Self {
just: Some(justification),
Expand Down Expand Up @@ -436,3 +442,11 @@ where
Ok(Action::new(response_items))
}
}

/// Create a pseudo-response from a single block that assumes the recipent has the parent block.
/// USeful for broadcasting self-created blocks.
pub fn block_to_response<B: Block, J: Justification<Header = B::Header>>(
block: B,
) -> Vec<ResponseItem<B, J>> {
PreChunk::single_block(block).into_chunk()
}
1 change: 1 addition & 0 deletions finality-aleph/src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ mod tasks;
mod ticker;

pub use compatibility::OldSyncCompatibleRequestBlocks;
pub use handler::DatabaseIO;
pub use service::{Service, IO};
pub use substrate::{
Justification as SubstrateJustification, JustificationTranslator, SessionVerifier,
Expand Down
34 changes: 27 additions & 7 deletions finality-aleph/src/sync/service.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use core::marker::PhantomData;
use std::{collections::HashSet, time::Duration};

use futures::{channel::mpsc, StreamExt};
Expand Down Expand Up @@ -45,6 +44,7 @@ where
chain_events: CE,
sync_oracle: SyncOracle,
additional_justifications_from_user: mpsc::UnboundedReceiver<J::Unverified>,
blocks_from_creator: mpsc::UnboundedReceiver<B>,
database_io: DatabaseIO<B, J, CS, F, BI>,
}

Expand All @@ -59,20 +59,19 @@ where
BI: BlockImport<B>,
{
pub fn new(
chain_status: CS,
finalizer: F,
block_importer: BI,
database_io: DatabaseIO<B, J, CS, F, BI>,
network: N,
chain_events: CE,
sync_oracle: SyncOracle,
additional_justifications_from_user: mpsc::UnboundedReceiver<J::Unverified>,
blocks_from_creator: mpsc::UnboundedReceiver<B>,
) -> Self {
let database_io = DatabaseIO::new(chain_status, finalizer, block_importer);
IO {
network,
chain_events,
sync_oracle,
additional_justifications_from_user,
blocks_from_creator,
database_io,
}
}
Expand All @@ -99,7 +98,7 @@ where
justifications_from_user: mpsc::UnboundedReceiver<J::Unverified>,
additional_justifications_from_user: mpsc::UnboundedReceiver<J::Unverified>,
block_requests_from_user: mpsc::UnboundedReceiver<BlockId>,
_phantom: PhantomData<B>,
blocks_from_creator: mpsc::UnboundedReceiver<B>,
metrics: Metrics,
}

Expand Down Expand Up @@ -151,6 +150,7 @@ where
chain_events,
sync_oracle,
additional_justifications_from_user,
blocks_from_creator,
database_io,
} = io;
let network = VersionWrapper::new(network);
Expand Down Expand Up @@ -178,9 +178,9 @@ where
chain_events,
justifications_from_user,
additional_justifications_from_user,
blocks_from_creator,
block_requests_from_user,
metrics,
_phantom: PhantomData,
},
justifications_for_sync,
block_requests_for_sync,
Expand Down Expand Up @@ -596,6 +596,19 @@ where
}
}

fn handle_own_block(&mut self, block: B) {
let broadcast = self.handler.handle_own_block(block);
if let Err(e) = self
.network
.broadcast(NetworkData::RequestResponse(broadcast))
{
warn!(
target: LOG_TARGET,
"Error broadcasting newly created block: {}.", e
)
}
}

/// Stay synchronized.
pub async fn run(mut self) {
loop {
Expand Down Expand Up @@ -632,6 +645,13 @@ where
},
None => warn!(target: LOG_TARGET, "Channel with internal block request from user closed."),
},
maybe_own_block = self.blocks_from_creator.next() => match maybe_own_block {
Some(block) => {
debug!(target: LOG_TARGET, "Received new onw block: {:?}.", block.header().id());
self.handle_own_block(block)
},
None => warn!(target: LOG_TARGET, "Channel with own blocks closed."),
},
}
}
}
Expand Down

0 comments on commit f590a10

Please sign in to comment.