From 8201ca8053027ee28e69beede73307e88a0acdd1 Mon Sep 17 00:00:00 2001 From: timorleph <145755355+timorleph@users.noreply.github.com> Date: Fri, 3 May 2024 10:08:48 +0200 Subject: [PATCH] A0-3901: Handle events in the base protocol (#1708) # Description Make the base protocol service handle network events. In particular, it internally adds all the new peers to the other protocols, so it's not necessary to produce events for any outside actors. Also some minor improvements for things I found on the way. ## Type of change - New feature (non-breaking change which adds functionality) # Checklist: - I have made corresponding changes to the existing documentation - I have created new documentation --- finality-aleph/src/base_protocol/handler.rs | 250 +++++++++++++------- finality-aleph/src/base_protocol/service.rs | 109 +++++++-- finality-aleph/src/lib.rs | 4 +- finality-aleph/src/network/substrate.rs | 9 +- 4 files changed, 264 insertions(+), 108 deletions(-) diff --git a/finality-aleph/src/base_protocol/handler.rs b/finality-aleph/src/base_protocol/handler.rs index ffd034ce36..057cb2dc06 100644 --- a/finality-aleph/src/base_protocol/handler.rs +++ b/finality-aleph/src/base_protocol/handler.rs @@ -1,31 +1,146 @@ -use std::collections::{HashMap, HashSet}; +use std::{ + collections::{HashMap, HashSet}, + fmt::{Display, Formatter, Result as FmtResult}, +}; use parity_scale_codec::{DecodeAll, Error as CodecError}; -use sc_network::{config::FullNetworkConfiguration, PeerId}; +use sc_network::{config::NetworkConfiguration, service::traits::Direction, PeerId}; use sc_network_common::{role::Roles, sync::message::BlockAnnouncesHandshake}; use sp_runtime::traits::{Block, Header, Saturating}; use crate::{BlockHash, BlockNumber}; +#[derive(Clone, Debug)] +pub enum DisconnectError { + PeerWasNotConnected, +} + +/// The role of the connected node. +#[derive(Clone, Copy, Debug)] +pub enum Role { + /// A full node, the expected type. + Full, + /// A light node, we support these connecting to us, but don't provide any implementations. + Light, +} + +impl From for Role { + fn from(roles: Roles) -> Self { + match roles.is_full() { + true => Role::Full, + false => Role::Light, + } + } +} + +#[derive(Clone, Debug)] +struct PeerInfo { + role: Role, + direction: Direction, +} + +impl PeerInfo { + pub fn new(role: Role, direction: Direction) -> Self { + PeerInfo { role, direction } + } +} + +/// Reasons to refuse connecting to a peer. +#[derive(Clone, Debug)] pub enum ConnectError { + /// We weren't able to decode the handshake. BadlyEncodedHandshake(CodecError), - BadHandshakeGenesis, - PeerAlreadyConnected, - TooManyFullInboundPeers, - TooManyFullOutboundPeers, + /// The peer is running on a different chain. + BadHandshakeGenesis(BlockHash), + /// The peer is already connected. + AlreadyConnected(PeerId), + /// There are too many full peers already connected in the given direction. + TooManyFullPeers(Direction), + /// There are too many light peers already connected. TooManyLightPeers, } -pub enum DisconnectError { - PeerWasNotConnected, +impl ConnectError { + fn too_many_peers(peer: PeerInfo) -> Self { + use ConnectError::*; + use Role::*; + match peer.role { + Full => TooManyFullPeers(peer.direction), + Light => TooManyLightPeers, + } + } } -struct PeerInfo { - role: Roles, - is_inbound: bool, +impl Display for ConnectError { + fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { + use ConnectError::*; + match self { + BadlyEncodedHandshake(e) => write!(f, "failed to decode handshake: {}", e), + BadHandshakeGenesis(genesis) => write!(f, "peer has different genesis {}", genesis), + AlreadyConnected(peer_id) => write!(f, "peer {} already connected", peer_id), + TooManyFullPeers(direction) => { + write!(f, "too many full nodes connected {:?}", direction) + } + TooManyLightPeers => write!(f, "too many light nodes connected"), + } + } +} + +struct ConnectionLimits { + num_full_in_peers: usize, + num_full_out_peers: usize, + num_light_peers: usize, + max_full_in_peers: usize, + max_full_out_peers: usize, + max_light_peers: usize, +} + +impl ConnectionLimits { + pub fn new(net_config: &NetworkConfiguration) -> Self { + // It is assumed that `default_peers_set.out_peers` only refers to full nodes, but + // `default_peers_set.in_peers` refers to both full and light nodes. + // Moreover, `default_peers_set_num_full` refers to the total of full nodes. + let max_full_out_peers = net_config.default_peers_set.out_peers as usize; + let max_full_in_peers = + (net_config.default_peers_set_num_full as usize).saturating_sub(max_full_out_peers); + let max_light_peers = + (net_config.default_peers_set.in_peers as usize).saturating_sub(max_full_in_peers); + ConnectionLimits { + num_full_in_peers: 0, + num_full_out_peers: 0, + num_light_peers: 0, + max_full_in_peers, + max_full_out_peers, + max_light_peers, + } + } + + pub fn allowed(&self, peer: &PeerInfo) -> bool { + match (peer.role, peer.direction) { + (Role::Light, _) => self.num_light_peers < self.max_light_peers, + (Role::Full, Direction::Inbound) => self.num_full_in_peers < self.max_full_in_peers, + (Role::Full, Direction::Outbound) => self.num_full_out_peers < self.max_full_out_peers, + } + } + + fn count_for(&mut self, peer: &PeerInfo) -> &mut usize { + match (peer.role, peer.direction) { + (Role::Light, _) => &mut self.num_light_peers, + (Role::Full, Direction::Inbound) => &mut self.num_full_in_peers, + (Role::Full, Direction::Outbound) => &mut self.num_full_out_peers, + } + } + + pub fn add(&mut self, peer: &PeerInfo) { + self.count_for(peer).saturating_inc(); + } + + pub fn remove(&mut self, peer: &PeerInfo) { + self.count_for(peer).saturating_dec(); + } } -/// Handler for the base protocol. +/// Handler for the base protocol. Deals with accepting and counting connections. pub struct Handler where B: Block, @@ -33,13 +148,8 @@ where { reserved_nodes: HashSet, peers: HashMap, - // the below counters and bounds ignore the nodes which belong to `reserved_nodes` - num_full_in_peers: usize, - num_full_out_peers: usize, - num_light_peers: usize, - max_full_in_peers: usize, - max_full_out_peers: usize, - max_light_peers: usize, + // the limits ignore the nodes which belong to `reserved_nodes` + limits: ConnectionLimits, genesis_hash: B::Hash, } @@ -49,122 +159,90 @@ where B::Header: Header, { /// Create a new handler. - pub fn new(genesis_hash: B::Hash, net_config: &FullNetworkConfiguration) -> Self { + pub fn new(genesis_hash: B::Hash, net_config: &NetworkConfiguration) -> Self { let reserved_nodes = net_config - .network_config .default_peers_set .reserved_nodes .iter() .map(|reserved| reserved.peer_id) .collect(); - - // It is assumed that `default_peers_set.out_peers` only refers to full nodes, but - // `default_peers_set.in_peers` refers to both full and light nodes. - // Moreover, `default_peers_set_num_full` refers to the total of full nodes. - let max_full_out_peers = net_config.network_config.default_peers_set.out_peers as usize; - let max_full_in_peers = (net_config.network_config.default_peers_set_num_full as usize) - .saturating_sub(max_full_out_peers); - let max_light_peers = (net_config.network_config.default_peers_set.in_peers as usize) - .saturating_sub(max_full_in_peers); + let limits = ConnectionLimits::new(net_config); Handler { reserved_nodes, peers: HashMap::new(), - max_full_in_peers, - max_full_out_peers, - max_light_peers, - num_full_in_peers: 0, - num_full_out_peers: 0, - num_light_peers: 0, + limits, genesis_hash, } } + fn is_reserved(&self, peer_id: &PeerId) -> bool { + self.reserved_nodes.contains(peer_id) + } + fn verify_connection( &self, peer_id: PeerId, handshake: Vec, - is_inbound: bool, - ) -> Result { + direction: Direction, + ) -> Result { let handshake = BlockAnnouncesHandshake::::decode_all(&mut &handshake[..]) .map_err(ConnectError::BadlyEncodedHandshake)?; if handshake.genesis_hash != self.genesis_hash { - return Err(ConnectError::BadHandshakeGenesis); + return Err(ConnectError::BadHandshakeGenesis(handshake.genesis_hash)); } if self.peers.contains_key(&peer_id) { - return Err(ConnectError::PeerAlreadyConnected); + return Err(ConnectError::AlreadyConnected(peer_id)); } - if self.reserved_nodes.contains(&peer_id) { - return Ok(handshake.roles); - } + let peer = PeerInfo::new(handshake.roles.into(), direction); - // Check slot constraints depending on the node's role and the connection's direction. - if is_inbound - && handshake.roles.is_full() - && self.num_full_in_peers >= self.max_full_in_peers - { - return Err(ConnectError::TooManyFullInboundPeers); - } - if !is_inbound - && handshake.roles.is_full() - && self.num_full_out_peers >= self.max_full_out_peers - { - return Err(ConnectError::TooManyFullOutboundPeers); + match self.is_reserved(&peer_id) || self.limits.allowed(&peer) { + true => Ok(peer), + false => Err(ConnectError::too_many_peers(peer)), } - if handshake.roles.is_light() && self.num_light_peers >= self.max_light_peers { - return Err(ConnectError::TooManyLightPeers); - } - - Ok(handshake.roles) } + /// Accept or reject a peer. pub fn on_peer_connect( &mut self, peer_id: PeerId, handshake: Vec, - is_inbound: bool, + direction: Direction, ) -> Result<(), ConnectError> { - let role = self.verify_connection(peer_id, handshake, is_inbound)?; + let peer = self.verify_connection(peer_id, handshake, direction)?; - self.peers.insert(peer_id, PeerInfo { role, is_inbound }); - - if self.reserved_nodes.contains(&peer_id) { - return Ok(()); + if !self.is_reserved(&peer_id) { + self.limits.add(&peer); } - // Assign a slot for the node depending on their role and the connection's direction. - if is_inbound && role.is_full() { - self.num_full_in_peers += 1; - } else if !is_inbound && role.is_full() { - self.num_full_out_peers += 1; - } else if role.is_light() { - self.num_light_peers += 1; - } + self.peers.insert(peer_id, peer); Ok(()) } + /// Clean up a disconnected peer. pub fn on_peer_disconnect(&mut self, peer_id: PeerId) -> Result<(), DisconnectError> { - let info = self + let peer = self .peers .remove(&peer_id) .ok_or(DisconnectError::PeerWasNotConnected)?; - if self.reserved_nodes.contains(&peer_id) { - return Ok(()); - } - - // Free the slot of the node depending on their role and the connection's direction. - if info.is_inbound && info.role.is_full() { - self.num_full_in_peers.saturating_dec(); - } else if !info.is_inbound && info.role.is_full() { - self.num_full_out_peers.saturating_dec(); - } else if info.role.is_light() { - self.num_light_peers.saturating_dec(); + if !self.is_reserved(&peer_id) { + self.limits.remove(&peer) } Ok(()) } + + /// Checks whether an inbound peer would be accepted. + pub fn verify_inbound_connection( + &mut self, + peer_id: PeerId, + handshake: Vec, + ) -> Result<(), ConnectError> { + self.verify_connection(peer_id, handshake, Direction::Inbound) + .map(|_| ()) + } } diff --git a/finality-aleph/src/base_protocol/service.rs b/finality-aleph/src/base_protocol/service.rs index 3aee32a42a..ae06f71efa 100644 --- a/finality-aleph/src/base_protocol/service.rs +++ b/finality-aleph/src/base_protocol/service.rs @@ -1,5 +1,6 @@ use std::{ fmt::{Display, Formatter, Result as FmtResult}, + iter, sync::{ atomic::{AtomicBool, AtomicUsize}, Arc, @@ -7,10 +8,15 @@ use std::{ }; use futures::stream::StreamExt; -use log::{debug, trace, warn}; -use sc_network::config::FullNetworkConfiguration; -use sc_network_sync::{service::syncing_service::ToServiceCommand, SyncEvent, SyncingService}; -use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; +use log::{debug, error, trace, warn}; +use sc_network::{ + config::{NetworkConfiguration, NotificationService}, + multiaddr::Protocol as MultiaddressProtocol, + service::traits::{NotificationEvent as SubstrateEvent, ValidationResult}, + Multiaddr, NetworkPeers, NetworkService, ProtocolName, +}; +use sc_network_sync::{service::syncing_service::ToServiceCommand, SyncingService}; +use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver}; use sp_runtime::traits::{Block, Header}; use crate::{ @@ -21,26 +27,31 @@ use crate::{ #[derive(Debug)] pub enum Error { NoIncomingCommands, + NoNetworkEvents, } impl Display for Error { fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { use Error::*; match self { - NoIncomingCommands => write!(f, "Channel with commands from user closed."), + NoIncomingCommands => write!(f, "channel with commands from user closed"), + NoNetworkEvents => write!(f, "channel with events from network closed"), } } } /// A service that needs to be run to have the base protocol of the network work. +/// It also responds to some external requests, but mostly by mocking them. pub struct Service where B: Block, B::Header: Header, { handler: Handler, + protocol_names: Vec, + network: Arc>, commands_from_user: TracingUnboundedReceiver>, - events_for_users: Vec>, + events_from_network: Box, } impl Service @@ -55,15 +66,20 @@ where pub fn new( major_sync: Arc, genesis_hash: B::Hash, - net_config: &FullNetworkConfiguration, + net_config: &NetworkConfiguration, + protocol_names: Vec, + network: Arc>, + events_from_network: Box, ) -> (Self, SyncingService) { let (commands_for_service, commands_from_user) = tracing_unbounded("mpsc_base_protocol", 100_000); ( Service { handler: Handler::new(genesis_hash, net_config), + protocol_names, + network, commands_from_user, - events_for_users: Vec::new(), + events_from_network, }, SyncingService::new( commands_for_service, @@ -77,8 +93,12 @@ where fn handle_command(&mut self, command: ToServiceCommand) { use ToServiceCommand::*; match command { - EventStream(events_for_user) => self.events_for_users.push(events_for_user), + EventStream(_) => { + warn!(target: LOG_TARGET, "We don't support sending downstream events to users, yet someone requested them.") + } PeersInfo(_) => { + // This unfortunately will happen when someone calls the appropriate RPC. + // Almost no one uses it though, and supporting it would be too much of a pain. debug!( target: LOG_TARGET, "Failed to send response to peers info request - call unsupported." @@ -102,16 +122,75 @@ where } } + fn handle_network_event(&mut self, event: SubstrateEvent) { + use SubstrateEvent::*; + match event { + ValidateInboundSubstream { + peer, + handshake, + result_tx, + } => { + let result = match self.handler.verify_inbound_connection(peer, handshake) { + Ok(()) => ValidationResult::Accept, + Err(e) => { + debug!(target: LOG_TARGET, "Rejecting incoming substream: {}.", e); + ValidationResult::Reject + } + }; + if result_tx.send(result).is_err() { + debug!( + target: LOG_TARGET, + "Failed to send response to inbound substream validation request." + ); + } + } + NotificationStreamOpened { + peer, + handshake, + direction, + negotiated_fallback: _, + } => match self.handler.on_peer_connect(peer, handshake, direction) { + Ok(()) => { + let multiaddress: Multiaddr = + iter::once(MultiaddressProtocol::P2p(peer.into())).collect(); + trace!(target: LOG_TARGET, "Connect event from address {:?}.", multiaddress); + for name in &self.protocol_names { + if let Err(e) = self.network.add_peers_to_reserved_set( + name.clone(), + iter::once(multiaddress.clone()).collect(), + ) { + error!(target: LOG_TARGET, "Adding peer to the {} reserved set failed: {}.", name, e); + } + } + } + Err(e) => debug!(target:LOG_TARGET, "Failed to accept connection: {}.", e), + }, + NotificationStreamClosed { peer } => { + trace!(target: LOG_TARGET, "Disconnect event for peer {:?}", peer); + let addresses: Vec<_> = iter::once(peer).collect(); + for name in &self.protocol_names { + if let Err(e) = self + .network + .remove_peers_from_reserved_set(name.clone(), addresses.clone()) + { + warn!(target: LOG_TARGET, "Removing peer from the {} reserved set failed: {}", name, e) + } + } + } + NotificationReceived { peer, .. } => { + debug!(target: LOG_TARGET, "Received unexpected message in the base protocol from {}.", peer) + } + } + } + /// Run the service managing the base protocol. pub async fn run(mut self) -> Result<(), Error> { use Error::*; loop { - let command = self - .commands_from_user - .next() - .await - .ok_or(NoIncomingCommands)?; - self.handle_command(command); + tokio::select! { + command = self.commands_from_user.next() => self.handle_command(command.ok_or(NoIncomingCommands)?), + event = self.events_from_network.next_event() => self.handle_network_event(event.ok_or(NoNetworkEvents)?), + } } } } diff --git a/finality-aleph/src/lib.rs b/finality-aleph/src/lib.rs index 9bfd039636..fec523575a 100644 --- a/finality-aleph/src/lib.rs +++ b/finality-aleph/src/lib.rs @@ -15,7 +15,7 @@ use futures::{ }; use parity_scale_codec::{Decode, Encode, Output}; use primitives as aleph_primitives; -use primitives::{AuthorityId, Block as AlephBlock, BlockHash, BlockNumber, Hash as AlephHash}; +use primitives::{AuthorityId, Block as AlephBlock, BlockHash, BlockNumber}; use sc_client_api::{ Backend, BlockBackend, BlockchainEvents, Finalizer, LockImportRun, StorageProvider, }; @@ -258,7 +258,7 @@ pub struct RateLimiterConfig { pub struct AlephConfig { pub authentication_network: ProtocolNetwork, pub block_sync_network: ProtocolNetwork, - pub sync_network_service: SyncNetworkService, + pub sync_network_service: SyncNetworkService, pub client: Arc, pub chain_status: SubstrateChainStatus, pub import_queue_handle: BlockImporter, diff --git a/finality-aleph/src/network/substrate.rs b/finality-aleph/src/network/substrate.rs index 26e7c26fa5..9f6cf58781 100644 --- a/finality-aleph/src/network/substrate.rs +++ b/finality-aleph/src/network/substrate.rs @@ -16,7 +16,6 @@ use sc_network::{ service::traits::{NotificationEvent as SubstrateEvent, ValidationResult}, Multiaddr, NetworkPeers, NetworkService, ProtocolName, }; -use sc_network_common::ExHashT; use sc_network_sync::{SyncEvent, SyncEventStream, SyncingService}; use sp_runtime::traits::Block; use tokio::time; @@ -42,15 +41,15 @@ impl Display for SyncNetworkServiceError { } /// Service responsible for handling network events emitted by the base sync protocol. -pub struct SyncNetworkService { +pub struct SyncNetworkService { sync_stream: Fuse + Send>>>, - network: Arc>, + network: Arc>, protocol_names: Vec, } -impl SyncNetworkService { +impl SyncNetworkService { pub fn new( - network: Arc>, + network: Arc>, sync_network: Arc>, protocol_names: Vec, ) -> Self {