Skip to content

Commit

Permalink
A0-3901: Handle events in the base protocol (#1708)
Browse files Browse the repository at this point in the history
# Description

Make the base protocol service handle network events. In particular, it
internally adds all the new peers to the other protocols, so it's not
necessary to produce events for any outside actors.

Also some minor improvements for things I found on the way.

## Type of change

- New feature (non-breaking change which adds functionality)

# Checklist:

- I have made corresponding changes to the existing documentation
- I have created new documentation
  • Loading branch information
timorleph authored May 3, 2024
1 parent 8a1778e commit 8201ca8
Show file tree
Hide file tree
Showing 4 changed files with 264 additions and 108 deletions.
250 changes: 164 additions & 86 deletions finality-aleph/src/base_protocol/handler.rs
Original file line number Diff line number Diff line change
@@ -1,45 +1,155 @@
use std::collections::{HashMap, HashSet};
use std::{
collections::{HashMap, HashSet},
fmt::{Display, Formatter, Result as FmtResult},
};

use parity_scale_codec::{DecodeAll, Error as CodecError};
use sc_network::{config::FullNetworkConfiguration, PeerId};
use sc_network::{config::NetworkConfiguration, service::traits::Direction, PeerId};
use sc_network_common::{role::Roles, sync::message::BlockAnnouncesHandshake};
use sp_runtime::traits::{Block, Header, Saturating};

use crate::{BlockHash, BlockNumber};

#[derive(Clone, Debug)]
pub enum DisconnectError {
PeerWasNotConnected,
}

/// The role of the connected node.
#[derive(Clone, Copy, Debug)]
pub enum Role {
/// A full node, the expected type.
Full,
/// A light node, we support these connecting to us, but don't provide any implementations.
Light,
}

impl From<Roles> for Role {
fn from(roles: Roles) -> Self {
match roles.is_full() {
true => Role::Full,
false => Role::Light,
}
}
}

#[derive(Clone, Debug)]
struct PeerInfo {
role: Role,
direction: Direction,
}

impl PeerInfo {
pub fn new(role: Role, direction: Direction) -> Self {
PeerInfo { role, direction }
}
}

/// Reasons to refuse connecting to a peer.
#[derive(Clone, Debug)]
pub enum ConnectError {
/// We weren't able to decode the handshake.
BadlyEncodedHandshake(CodecError),
BadHandshakeGenesis,
PeerAlreadyConnected,
TooManyFullInboundPeers,
TooManyFullOutboundPeers,
/// The peer is running on a different chain.
BadHandshakeGenesis(BlockHash),
/// The peer is already connected.
AlreadyConnected(PeerId),
/// There are too many full peers already connected in the given direction.
TooManyFullPeers(Direction),
/// There are too many light peers already connected.
TooManyLightPeers,
}

pub enum DisconnectError {
PeerWasNotConnected,
impl ConnectError {
fn too_many_peers(peer: PeerInfo) -> Self {
use ConnectError::*;
use Role::*;
match peer.role {
Full => TooManyFullPeers(peer.direction),
Light => TooManyLightPeers,
}
}
}

struct PeerInfo {
role: Roles,
is_inbound: bool,
impl Display for ConnectError {
fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
use ConnectError::*;
match self {
BadlyEncodedHandshake(e) => write!(f, "failed to decode handshake: {}", e),
BadHandshakeGenesis(genesis) => write!(f, "peer has different genesis {}", genesis),
AlreadyConnected(peer_id) => write!(f, "peer {} already connected", peer_id),
TooManyFullPeers(direction) => {
write!(f, "too many full nodes connected {:?}", direction)
}
TooManyLightPeers => write!(f, "too many light nodes connected"),
}
}
}

struct ConnectionLimits {
num_full_in_peers: usize,
num_full_out_peers: usize,
num_light_peers: usize,
max_full_in_peers: usize,
max_full_out_peers: usize,
max_light_peers: usize,
}

impl ConnectionLimits {
pub fn new(net_config: &NetworkConfiguration) -> Self {
// It is assumed that `default_peers_set.out_peers` only refers to full nodes, but
// `default_peers_set.in_peers` refers to both full and light nodes.
// Moreover, `default_peers_set_num_full` refers to the total of full nodes.
let max_full_out_peers = net_config.default_peers_set.out_peers as usize;
let max_full_in_peers =
(net_config.default_peers_set_num_full as usize).saturating_sub(max_full_out_peers);
let max_light_peers =
(net_config.default_peers_set.in_peers as usize).saturating_sub(max_full_in_peers);
ConnectionLimits {
num_full_in_peers: 0,
num_full_out_peers: 0,
num_light_peers: 0,
max_full_in_peers,
max_full_out_peers,
max_light_peers,
}
}

pub fn allowed(&self, peer: &PeerInfo) -> bool {
match (peer.role, peer.direction) {
(Role::Light, _) => self.num_light_peers < self.max_light_peers,
(Role::Full, Direction::Inbound) => self.num_full_in_peers < self.max_full_in_peers,
(Role::Full, Direction::Outbound) => self.num_full_out_peers < self.max_full_out_peers,
}
}

fn count_for(&mut self, peer: &PeerInfo) -> &mut usize {
match (peer.role, peer.direction) {
(Role::Light, _) => &mut self.num_light_peers,
(Role::Full, Direction::Inbound) => &mut self.num_full_in_peers,
(Role::Full, Direction::Outbound) => &mut self.num_full_out_peers,
}
}

pub fn add(&mut self, peer: &PeerInfo) {
self.count_for(peer).saturating_inc();
}

pub fn remove(&mut self, peer: &PeerInfo) {
self.count_for(peer).saturating_dec();
}
}

/// Handler for the base protocol.
/// Handler for the base protocol. Deals with accepting and counting connections.
pub struct Handler<B>
where
B: Block<Hash = BlockHash>,
B::Header: Header<Number = BlockNumber>,
{
reserved_nodes: HashSet<PeerId>,
peers: HashMap<PeerId, PeerInfo>,
// the below counters and bounds ignore the nodes which belong to `reserved_nodes`
num_full_in_peers: usize,
num_full_out_peers: usize,
num_light_peers: usize,
max_full_in_peers: usize,
max_full_out_peers: usize,
max_light_peers: usize,
// the limits ignore the nodes which belong to `reserved_nodes`
limits: ConnectionLimits,
genesis_hash: B::Hash,
}

Expand All @@ -49,122 +159,90 @@ where
B::Header: Header<Number = BlockNumber>,
{
/// Create a new handler.
pub fn new(genesis_hash: B::Hash, net_config: &FullNetworkConfiguration) -> Self {
pub fn new(genesis_hash: B::Hash, net_config: &NetworkConfiguration) -> Self {
let reserved_nodes = net_config
.network_config
.default_peers_set
.reserved_nodes
.iter()
.map(|reserved| reserved.peer_id)
.collect();

// It is assumed that `default_peers_set.out_peers` only refers to full nodes, but
// `default_peers_set.in_peers` refers to both full and light nodes.
// Moreover, `default_peers_set_num_full` refers to the total of full nodes.
let max_full_out_peers = net_config.network_config.default_peers_set.out_peers as usize;
let max_full_in_peers = (net_config.network_config.default_peers_set_num_full as usize)
.saturating_sub(max_full_out_peers);
let max_light_peers = (net_config.network_config.default_peers_set.in_peers as usize)
.saturating_sub(max_full_in_peers);
let limits = ConnectionLimits::new(net_config);

Handler {
reserved_nodes,
peers: HashMap::new(),
max_full_in_peers,
max_full_out_peers,
max_light_peers,
num_full_in_peers: 0,
num_full_out_peers: 0,
num_light_peers: 0,
limits,
genesis_hash,
}
}

fn is_reserved(&self, peer_id: &PeerId) -> bool {
self.reserved_nodes.contains(peer_id)
}

fn verify_connection(
&self,
peer_id: PeerId,
handshake: Vec<u8>,
is_inbound: bool,
) -> Result<Roles, ConnectError> {
direction: Direction,
) -> Result<PeerInfo, ConnectError> {
let handshake = BlockAnnouncesHandshake::<B>::decode_all(&mut &handshake[..])
.map_err(ConnectError::BadlyEncodedHandshake)?;
if handshake.genesis_hash != self.genesis_hash {
return Err(ConnectError::BadHandshakeGenesis);
return Err(ConnectError::BadHandshakeGenesis(handshake.genesis_hash));
}

if self.peers.contains_key(&peer_id) {
return Err(ConnectError::PeerAlreadyConnected);
return Err(ConnectError::AlreadyConnected(peer_id));
}

if self.reserved_nodes.contains(&peer_id) {
return Ok(handshake.roles);
}
let peer = PeerInfo::new(handshake.roles.into(), direction);

// Check slot constraints depending on the node's role and the connection's direction.
if is_inbound
&& handshake.roles.is_full()
&& self.num_full_in_peers >= self.max_full_in_peers
{
return Err(ConnectError::TooManyFullInboundPeers);
}
if !is_inbound
&& handshake.roles.is_full()
&& self.num_full_out_peers >= self.max_full_out_peers
{
return Err(ConnectError::TooManyFullOutboundPeers);
match self.is_reserved(&peer_id) || self.limits.allowed(&peer) {
true => Ok(peer),
false => Err(ConnectError::too_many_peers(peer)),
}
if handshake.roles.is_light() && self.num_light_peers >= self.max_light_peers {
return Err(ConnectError::TooManyLightPeers);
}

Ok(handshake.roles)
}

/// Accept or reject a peer.
pub fn on_peer_connect(
&mut self,
peer_id: PeerId,
handshake: Vec<u8>,
is_inbound: bool,
direction: Direction,
) -> Result<(), ConnectError> {
let role = self.verify_connection(peer_id, handshake, is_inbound)?;
let peer = self.verify_connection(peer_id, handshake, direction)?;

self.peers.insert(peer_id, PeerInfo { role, is_inbound });

if self.reserved_nodes.contains(&peer_id) {
return Ok(());
if !self.is_reserved(&peer_id) {
self.limits.add(&peer);
}

// Assign a slot for the node depending on their role and the connection's direction.
if is_inbound && role.is_full() {
self.num_full_in_peers += 1;
} else if !is_inbound && role.is_full() {
self.num_full_out_peers += 1;
} else if role.is_light() {
self.num_light_peers += 1;
}
self.peers.insert(peer_id, peer);

Ok(())
}

/// Clean up a disconnected peer.
pub fn on_peer_disconnect(&mut self, peer_id: PeerId) -> Result<(), DisconnectError> {
let info = self
let peer = self
.peers
.remove(&peer_id)
.ok_or(DisconnectError::PeerWasNotConnected)?;

if self.reserved_nodes.contains(&peer_id) {
return Ok(());
}

// Free the slot of the node depending on their role and the connection's direction.
if info.is_inbound && info.role.is_full() {
self.num_full_in_peers.saturating_dec();
} else if !info.is_inbound && info.role.is_full() {
self.num_full_out_peers.saturating_dec();
} else if info.role.is_light() {
self.num_light_peers.saturating_dec();
if !self.is_reserved(&peer_id) {
self.limits.remove(&peer)
}

Ok(())
}

/// Checks whether an inbound peer would be accepted.
pub fn verify_inbound_connection(
&mut self,
peer_id: PeerId,
handshake: Vec<u8>,
) -> Result<(), ConnectError> {
self.verify_connection(peer_id, handshake, Direction::Inbound)
.map(|_| ())
}
}
Loading

0 comments on commit 8201ca8

Please sign in to comment.