Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

A0-3032: Handle own blocks in sync #1426

Merged
merged 3 commits into from
Oct 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 9 additions & 12 deletions bin/node/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ use aleph_runtime::{self, opaque::Block, RuntimeApi};
use finality_aleph::{
run_validator_node, AlephBlockImport, AlephConfig, BlockImporter, Justification,
JustificationTranslator, MillisecsPerBlock, Protocol, ProtocolNaming, RateLimiterConfig,
SessionPeriod, SubstrateChainStatus, SyncOracle, TimingBlockMetrics, TracingBlockImport,
RedirectingBlockImport, SessionPeriod, SubstrateChainStatus, SyncOracle, TimingBlockMetrics,
TracingBlockImport,
};
use futures::channel::mpsc;
use log::warn;
Expand Down Expand Up @@ -88,7 +89,6 @@ pub fn new_partial(
sc_consensus::DefaultImportQueue<Block, FullClient>,
sc_transaction_pool::FullPool<Block, FullClient>,
(
TracingBlockImport<Arc<FullClient>>,
mpsc::UnboundedSender<Justification>,
mpsc::UnboundedReceiver<Justification>,
Option<Telemetry>,
Expand Down Expand Up @@ -151,7 +151,7 @@ pub fn new_partial(
.map_err(|e| ServiceError::Other(format!("failed to set up chain status: {e}")))?,
);
let aleph_block_import = AlephBlockImport::new(
tracing_block_import.clone(),
tracing_block_import,
justification_tx.clone(),
justification_translator,
);
Expand Down Expand Up @@ -190,13 +190,7 @@ pub fn new_partial(
keystore_container,
select_chain,
transaction_pool,
other: (
tracing_block_import,
justification_tx,
justification_rx,
telemetry,
metrics,
),
other: (justification_tx, justification_rx, telemetry, metrics),
})
}

Expand Down Expand Up @@ -313,9 +307,11 @@ pub fn new_authority(
keystore_container,
select_chain,
transaction_pool,
other: (block_import, justification_tx, justification_rx, mut telemetry, metrics),
other: (justification_tx, justification_rx, mut telemetry, metrics),
} = new_partial(&config)?;

let (block_import, block_rx) = RedirectingBlockImport::new(client.clone());

let backup_path = backup_path(&aleph_config, config.base_path.path());

let finalized = client.info().finalized_hash;
Expand Down Expand Up @@ -380,7 +376,7 @@ pub fn new_authority(
backoff_authoring_blocks,
keystore: keystore_container.keystore(),
sync_oracle: sync_oracle.clone(),
justification_sync_link: sync_network.clone(),
justification_sync_link: (),
block_proposal_slot_portion: SlotProportion::new(2f32 / 3f32),
max_block_proposal_slot_portion: None,
telemetry: telemetry.as_ref().map(|x| x.handle()),
Expand Down Expand Up @@ -415,6 +411,7 @@ pub fn new_authority(
spawn_handle: task_manager.spawn_handle().into(),
keystore: keystore_container.keystore(),
justification_rx,
block_rx,
metrics,
registry: prometheus_registry,
unit_creation_delay: aleph_config.unit_creation_delay(),
Expand Down
90 changes: 88 additions & 2 deletions finality-aleph/src/import.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
use std::{fmt::Debug, time::Instant};
use std::{
error::Error,
fmt::{Debug, Display, Error as FmtError, Formatter},
time::Instant,
};

use futures::channel::mpsc::{TrySendError, UnboundedSender};
use futures::channel::mpsc::{self, TrySendError, UnboundedReceiver, UnboundedSender};
use log::{debug, warn};
use sc_consensus::{
BlockCheckParams, BlockImport, BlockImportParams, ImportResult, JustificationImport,
Expand Down Expand Up @@ -217,3 +221,85 @@ where
})
}
}

/// A wrapper around a block import that actually sends all the blocks elsewhere through a channel.
/// Very barebones, e.g. does not work with justifications, but sufficient for passing to Aura.
#[derive(Clone)]
pub struct RedirectingBlockImport<I>
where
I: BlockImport<Block> + Clone + Send,
{
inner: I,
blocks_tx: UnboundedSender<Block>,
}

impl<I> RedirectingBlockImport<I>
where
I: BlockImport<Block> + Clone + Send,
{
pub fn new(inner: I) -> (Self, UnboundedReceiver<Block>) {
let (blocks_tx, blocks_rx) = mpsc::unbounded();
(Self { inner, blocks_tx }, blocks_rx)
}
}

/// What can go wrong when redirecting a block import.
#[derive(Debug)]
pub enum RedirectingImportError<E> {
Inner(E),
MissingBody,
ChannelClosed,
}

impl<E: Display> Display for RedirectingImportError<E> {
fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> {
use RedirectingImportError::*;
match self {
Inner(e) => write!(f, "{}", e),
MissingBody => write!(
f,
"redirecting block import does not support importing blocks without a body"
),
ChannelClosed => write!(f, "channel closed, cannot redirect import"),
}
}
}

impl<E: Display + Debug> Error for RedirectingImportError<E> {}

#[async_trait::async_trait]
impl<I> BlockImport<Block> for RedirectingBlockImport<I>
where
I: BlockImport<Block> + Clone + Send,
{
type Error = RedirectingImportError<I::Error>;
type Transaction = I::Transaction;

async fn check_block(
&mut self,
woocash2 marked this conversation as resolved.
Show resolved Hide resolved
block: BlockCheckParams<Block>,
) -> Result<ImportResult, Self::Error> {
self.inner
.check_block(block)
.await
.map_err(RedirectingImportError::Inner)
}

async fn import_block(
&mut self,
block: BlockImportParams<Block, Self::Transaction>,
) -> Result<ImportResult, Self::Error> {
let header = block.post_header();
let BlockImportParams { body, .. } = block;

let extrinsics = body.ok_or(RedirectingImportError::MissingBody)?;

self.blocks_tx
.unbounded_send(Block { header, extrinsics })
.map_err(|_| RedirectingImportError::ChannelClosed)?;

// We claim it was successfully imported and no further action is necessary.
// This is likely inaccurate, but again, should be enough for Aura.
Ok(ImportResult::Imported(Default::default()))
woocash2 marked this conversation as resolved.
Show resolved Hide resolved
}
}
3 changes: 2 additions & 1 deletion finality-aleph/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ mod sync_oracle;
pub mod testing;

pub use crate::{
import::{AlephBlockImport, TracingBlockImport},
import::{AlephBlockImport, RedirectingBlockImport, TracingBlockImport},
justification::AlephJustification,
metrics::TimingBlockMetrics,
network::{Protocol, ProtocolNaming},
Expand Down Expand Up @@ -276,6 +276,7 @@ pub struct AlephConfig<C, SC> {
pub spawn_handle: SpawnHandle,
pub keystore: Arc<dyn Keystore>,
pub justification_rx: mpsc::UnboundedReceiver<Justification>,
pub block_rx: mpsc::UnboundedReceiver<AlephBlock>,
pub metrics: TimingBlockMetrics,
pub registry: Option<Registry>,
pub session_period: SessionPeriod,
Expand Down
12 changes: 6 additions & 6 deletions finality-aleph/src/nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ use crate::{
session::SessionBoundaryInfo,
session_map::{AuthorityProviderImpl, FinalityNotifierImpl, SessionMapUpdater},
sync::{
ChainStatus, FinalizationStatus, Justification, JustificationTranslator,
Service as SyncService, SubstrateChainStatusNotifier, SubstrateFinalizationInfo,
VerifierCache, IO as SyncIO,
ChainStatus, DatabaseIO as SyncDatabaseIO, FinalizationStatus, Justification,
JustificationTranslator, Service as SyncService, SubstrateChainStatusNotifier,
SubstrateFinalizationInfo, VerifierCache, IO as SyncIO,
},
AlephConfig,
};
Expand Down Expand Up @@ -65,6 +65,7 @@ where
session_period,
millisecs_per_block,
justification_rx,
block_rx,
backup_saving_path,
external_addresses,
validator_port,
Expand Down Expand Up @@ -149,13 +150,12 @@ where
let finalizer = AlephFinalizer::new(client.clone(), metrics.clone());
import_queue_handle.attach_metrics(metrics.clone());
let sync_io = SyncIO::new(
chain_status.clone(),
finalizer,
import_queue_handle,
SyncDatabaseIO::new(chain_status.clone(), finalizer, import_queue_handle),
block_sync_network,
chain_events,
sync_oracle.clone(),
justification_rx,
block_rx,
);
let (sync_service, justifications_for_sync, request_block) =
match SyncService::new(verifier, session_info.clone(), sync_io, registry.clone()) {
Expand Down
75 changes: 69 additions & 6 deletions finality-aleph/src/sync/handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::{
};

mod request_handler;
pub use request_handler::{Action, RequestHandlerError};
pub use request_handler::{block_to_response, Action, RequestHandlerError};

use crate::sync::data::{ResponseItem, ResponseItems};

Expand Down Expand Up @@ -583,12 +583,12 @@ where
if self.forest.skippable(&h.id()) {
continue;
}
if let Err(e) = self.forest.update_header(&h, Some(peer.clone()), false) {
return (new_highest, Some(Error::Forest(e)));
}
if !self.forest.importable(&h.id()) {
return (new_highest, Some(Error::HeaderNotRequired));
}
if let Err(e) = self.forest.update_header(&h, Some(peer.clone()), true) {
return (new_highest, Some(Error::Forest(e)));
}
}
ResponseItem::Block(b) => {
if self.forest.skippable(&b.header().id()) {
Expand Down Expand Up @@ -722,6 +722,12 @@ where
pub fn extension_request(&self) -> ExtensionRequest<I> {
self.forest.extension_request()
}

/// Handle a block freshly created by this node. Imports it and returns a form of it that can be broadcast.
pub fn handle_own_block(&mut self, block: B) -> Vec<ResponseItem<B, J>> {
self.block_importer.import_block(block.clone());
block_to_response(block)
}
}

#[cfg(test)]
Expand Down Expand Up @@ -1329,7 +1335,10 @@ mod tests {
);
let (new_info, maybe_error) = handler.handle_request_response(response, 12);
assert!(!new_info, "should not create new highest justified");
assert!(maybe_error.is_none(), "should work");
match maybe_error {
None => panic!("should fail when it reaches the top finalized"),
Some(_) => (),
}

// check that the fork is pruned
assert_eq!(
Expand Down Expand Up @@ -1420,7 +1429,10 @@ mod tests {
);
let (new_info, maybe_error) = handler.handle_request_response(response, 12);
assert!(!new_info, "should not create new highest justified");
assert!(maybe_error.is_none(), "should work");
match maybe_error {
None => panic!("should fail when it reaches the top finalized"),
Some(_) => (),
}

// check that the fork is pruned
assert_eq!(
Expand Down Expand Up @@ -2206,6 +2218,57 @@ mod tests {
assert!(!handler.handle_internal_request(&headers[1].id()).unwrap());
}

#[test]
fn broadcasts_own_block() {
let (mut handler, backend, _keep, _genesis) = setup();
let block = MockBlock::new(
backend
.top_finalized()
.expect("mock backend works")
.header()
.random_branch()
.next()
.expect("branch creation succeeds"),
true,
);

let result = handler.handle_own_block(block.clone());
match result.get(0).expect("the header is there") {
ResponseItem::Header(header) => assert_eq!(header, block.header()),
other => panic!("expected header item, got {:?}", other),
}
match result.get(1).expect("the block is there") {
ResponseItem::Block(block_item) => assert_eq!(block_item.header(), block.header()),
other => panic!("expected block item, got {:?}", other),
}
}

#[tokio::test]
async fn accepts_broadcast_block() {
let (mut handler, backend, mut notifier, _genesis) = setup();
let block = MockBlock::new(
backend
.top_finalized()
.expect("mock backend works")
.header()
.random_branch()
.next()
.expect("branch creation succeeds"),
true,
);

let broadcast = handler.handle_own_block(block.clone());
match handler.handle_request_response(broadcast, rand::random()) {
(true, _) => panic!("block unexpectedly changed top finalized"),
(false, Some(e)) => panic!("error handling block broadcast: {}", e),
(false, None) => (),
}
assert_eq!(
notifier.next().await.expect("should receive notification"),
BlockImported(block.header().clone())
);
}

//TODO(A0-2984): remove this after legacy sync is excised
#[tokio::test]
async fn works_with_overzealous_imports() {
Expand Down
14 changes: 14 additions & 0 deletions finality-aleph/src/sync/handler/request_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,12 @@ where
}
}

fn single_block(block: B) -> Self {
let mut result = Self::empty();
result.add_block_and_header(block);
result
}

fn from_just(justification: J) -> Self {
Self {
just: Some(justification),
Expand Down Expand Up @@ -436,3 +442,11 @@ where
Ok(Action::new(response_items))
}
}

/// Create a pseudo-response from a single block that assumes the recipent has the parent block.
/// USeful for broadcasting self-created blocks.
pub fn block_to_response<B: Block, J: Justification<Header = B::Header>>(
block: B,
) -> Vec<ResponseItem<B, J>> {
PreChunk::single_block(block).into_chunk()
}
5 changes: 3 additions & 2 deletions finality-aleph/src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ mod task_queue;
mod tasks;
mod ticker;

pub use handler::DatabaseIO;
pub use service::{Service, IO};
pub use substrate::{
Justification as SubstrateJustification, JustificationTranslator, SessionVerifier,
Expand Down Expand Up @@ -95,7 +96,7 @@ pub trait Finalizer<J: Justification> {
}

/// A notification about the chain status changing.
#[derive(Clone, Debug)]
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ChainStatusNotification<H: Header> {
/// A block has been imported.
BlockImported(H),
Expand All @@ -107,7 +108,7 @@ pub enum ChainStatusNotification<H: Header> {
/// We assume that this will return all the events, otherwise we will end up with a broken state.
#[async_trait::async_trait]
pub trait ChainStatusNotifier<H: Header> {
type Error: Display;
type Error: Debug + Display;

/// Returns a chain status notification when it is available.
/// This method's implementation must be cancellation safe.
Expand Down
Loading
Loading