Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

A0-3364: cache network details of validators #1449

2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions bin/node/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,8 @@ pub fn new_authority(
.unwrap_or(usize::MAX),
};

let validator_address_cache = None;
timorleph marked this conversation as resolved.
Show resolved Hide resolved

let aleph_config = AlephConfig {
network,
sync_network,
Expand All @@ -424,6 +426,7 @@ pub fn new_authority(
protocol_naming,
rate_limiter_config,
sync_oracle,
validator_address_cache,
};

task_manager.spawn_essential_handle().spawn_blocking(
Expand Down
2 changes: 1 addition & 1 deletion clique/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "network-clique"
version = "0.5.0"
version = "0.6.0"
license = "Apache 2.0"
authors.workspace = true
edition.workspace = true
Expand Down
3 changes: 3 additions & 0 deletions clique/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ pub trait AddressingInformation: Debug + Hash + Codec + Clone + Eq + Send + Sync

/// Verify the information.
fn verify(&self) -> bool;

// Address, *only* for debugging purposes.
fn address(&self) -> String;
}

/// Abstraction for requesting own network addressing information.
Expand Down
4 changes: 4 additions & 0 deletions clique/src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,10 @@ impl AddressingInformation for MockAddressingInformation {
fn verify(&self) -> bool {
self.valid
}

fn address(&self) -> String {
self.address.clone()
}
}

impl NetworkIdentity for MockAddressingInformation {
Expand Down
83 changes: 83 additions & 0 deletions finality-aleph/src/idx_to_account.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
use std::{marker::PhantomData, sync::Arc};

use primitives::{AccountId, AlephSessionApi, AuraId, BlockHash, BlockNumber};
use sc_client_api::Backend;
use sp_consensus_aura::AuraApi;
use sp_runtime::traits::{Block, Header};

use crate::{
abft::NodeIndex,
session::{SessionBoundaryInfo, SessionId},
session_map::{AuthorityProvider, AuthorityProviderImpl},
ClientForAleph,
};

pub trait ValidatorIndexToAccountIdConverter {
fn account(&self, session: SessionId, validator_index: NodeIndex) -> Option<AccountId>;
}

pub struct ValidatorIndexToAccountIdConverterImpl<C, B, BE>
where
C: ClientForAleph<B, BE> + Send + Sync + 'static,
C::Api: crate::aleph_primitives::AlephSessionApi<B> + AuraApi<B, AuraId>,
B: Block<Hash = BlockHash>,
BE: Backend<B> + 'static,
{
client: Arc<C>,
session_boundary_info: SessionBoundaryInfo,
authority_provider: AuthorityProviderImpl<C, B, BE>,
_phantom: PhantomData<(B, BE)>,
ggawryal marked this conversation as resolved.
Show resolved Hide resolved
}

impl<C, B, BE> ValidatorIndexToAccountIdConverterImpl<C, B, BE>
where
C: ClientForAleph<B, BE> + Send + Sync + 'static,
C::Api: crate::aleph_primitives::AlephSessionApi<B> + AuraApi<B, AuraId>,
B: Block<Hash = BlockHash>,
B::Header: Header<Number = BlockNumber>,
BE: Backend<B> + 'static,
{
pub fn new(client: Arc<C>, session_boundary_info: SessionBoundaryInfo) -> Self {
Self {
client: client.clone(),
session_boundary_info,
authority_provider: AuthorityProviderImpl::new(client),
_phantom: PhantomData,
}
}
}

impl<C, B, BE> ValidatorIndexToAccountIdConverter
for ValidatorIndexToAccountIdConverterImpl<C, B, BE>
where
C: ClientForAleph<B, BE> + Send + Sync + 'static,
C::Api: crate::aleph_primitives::AlephSessionApi<B> + AuraApi<B, AuraId>,
B: Block<Hash = BlockHash>,
B::Header: Header<Number = BlockNumber>,
BE: Backend<B> + 'static,
{
fn account(&self, session: SessionId, validator_index: NodeIndex) -> Option<AccountId> {
let block_number = self
.session_boundary_info
.boundaries_for_session(session)
.first_block();
let block_hash = self.client.block_hash(block_number).ok()??;

let authority_data = self.authority_provider.authority_data(block_number)?;
let aleph_key = authority_data.authorities()[validator_index.0].clone();
self.client
.runtime_api()
.key_owner(block_hash, aleph_key)
.ok()?
}
}

#[cfg(test)]
pub struct MockConverter;

#[cfg(test)]
impl ValidatorIndexToAccountIdConverter for MockConverter {
fn account(&self, _: SessionId, _: NodeIndex) -> Option<AccountId> {
None
}
}
4 changes: 3 additions & 1 deletion finality-aleph/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ mod compatibility;
mod crypto;
mod data_io;
mod finalization;
mod idx_to_account;
mod import;
mod justification;
mod metrics;
Expand All @@ -61,7 +62,7 @@ pub use crate::{
import::{AlephBlockImport, RedirectingBlockImport, TracingBlockImport},
justification::AlephJustification,
metrics::TimingBlockMetrics,
network::{Protocol, ProtocolNaming},
network::{address_cache::ValidatorAddressCache, Protocol, ProtocolNaming},
nodes::run_validator_node,
session::SessionPeriod,
sync::{
Expand Down Expand Up @@ -288,4 +289,5 @@ pub struct AlephConfig<C, SC> {
pub protocol_naming: ProtocolNaming,
pub rate_limiter_config: RateLimiterConfig,
pub sync_oracle: SyncOracle,
pub validator_address_cache: Option<ValidatorAddressCache>,
}
107 changes: 107 additions & 0 deletions finality-aleph/src/network/address_cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
use std::{fmt::Debug, num::NonZeroUsize, sync::Arc};

use lru::LruCache;
use parking_lot::Mutex;
use primitives::AccountId;

use crate::{
abft::NodeIndex, idx_to_account::ValidatorIndexToAccountIdConverter, session::SessionId,
};

/// Network details for a given validator in a given session.
#[derive(Debug, Clone)]
pub struct ValidatorAddressingInfo {
/// Session to which given information applies.
pub session: SessionId,
/// Network level address of the validator, i.e. IP address (for validator network)
pub network_level_address: String,
/// PeerId of the validator used in validator (clique) network
pub validator_network_peer_id: String,
kostekIV marked this conversation as resolved.
Show resolved Hide resolved
}

/// Stores most recent information about validator addresses.
#[derive(Debug, Clone)]
pub struct ValidatorAddressCache {
data: Arc<Mutex<LruCache<AccountId, ValidatorAddressingInfo>>>,
}

const VALIDATOR_ADDRESS_CACHE_SIZE: usize = 1000;

impl ValidatorAddressCache {
pub fn new() -> Self {
Self {
data: Arc::new(Mutex::new(LruCache::new(
NonZeroUsize::try_from(VALIDATOR_ADDRESS_CACHE_SIZE)
.expect("the cache size is a non-zero constant"),
))),
}
}

pub fn insert(&self, validator_stash: AccountId, info: ValidatorAddressingInfo) {
self.data.lock().put(validator_stash, info);
}
}

impl Default for ValidatorAddressCache {
fn default() -> Self {
Self::new()
}
}

pub trait ValidatorAddressCacheUpdater {
timorleph marked this conversation as resolved.
Show resolved Hide resolved
/// In session `session_info.session`, validator `NodeIndex` was using addresses specified in
/// `session_info`. A session and validator_index identify the validator uniquely.
fn update(&self, validator_index: NodeIndex, session_info: ValidatorAddressingInfo);
}

enum ValidatorAddressCacheUpdaterImpl<C: ValidatorIndexToAccountIdConverter> {
Noop,
BackendBased {
validator_address_cache: ValidatorAddressCache,
key_owner_info_provider: C,
},
}

/// Construct a struct that can be used to update `validator_address_cache`, if it is `Some`.
/// If passed None, the returned struct will be a no-op.
pub fn validator_address_cache_updater<C: ValidatorIndexToAccountIdConverter>(
validator_address_cache: Option<ValidatorAddressCache>,
key_owner_info_provider: C,
) -> impl ValidatorAddressCacheUpdater {
match validator_address_cache {
Some(validator_address_cache) => ValidatorAddressCacheUpdaterImpl::BackendBased {
validator_address_cache,
key_owner_info_provider,
},
None => ValidatorAddressCacheUpdaterImpl::Noop,
}
}

impl<C: ValidatorIndexToAccountIdConverter> ValidatorAddressCacheUpdater
for ValidatorAddressCacheUpdaterImpl<C>
{
fn update(&self, validator_index: NodeIndex, info: ValidatorAddressingInfo) {
if let ValidatorAddressCacheUpdaterImpl::BackendBased {
validator_address_cache,
key_owner_info_provider,
} = self
{
if let Some(validator) = key_owner_info_provider.account(info.session, validator_index)
{
validator_address_cache.insert(validator, info)
}
}
}
}

#[cfg(test)]
pub mod test {
use crate::{
idx_to_account::MockConverter,
network::address_cache::{validator_address_cache_updater, ValidatorAddressCacheUpdater},
};

pub fn noop_updater() -> impl ValidatorAddressCacheUpdater {
validator_address_cache_updater(None, MockConverter)
timorleph marked this conversation as resolved.
Show resolved Hide resolved
}
}
1 change: 1 addition & 0 deletions finality-aleph/src/network/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use parity_scale_codec::Codec;

pub mod address_cache;
pub mod data;
mod gossip;
#[cfg(test)]
Expand Down
51 changes: 40 additions & 11 deletions finality-aleph/src/network/session/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::{
abft::Recipient,
crypto::{AuthorityPen, AuthorityVerifier},
network::{
address_cache::{ValidatorAddressCacheUpdater, ValidatorAddressingInfo},
session::{
data::DataInSession, Authentication, Connections, Discovery, DiscoveryMessage,
SessionHandler, SessionHandlerError,
Expand Down Expand Up @@ -78,10 +79,11 @@ impl<A: AddressingInformation> ManagerActions<A> {
/// 1. In-session messages are forwarded to the user.
/// 2. Authentication messages forwarded to session handlers.
/// 4. Running periodic maintenance, mostly related to node discovery.
pub struct Manager<NI: NetworkIdentity, D: Data> {
pub struct Manager<NI: NetworkIdentity, D: Data, VCU: ValidatorAddressCacheUpdater> {
network_identity: NI,
connections: Connections<NI::PeerId>,
sessions: HashMap<SessionId, Session<D, NI::AddressingInformation>>,
validator_address_cache_updater: VCU,
discovery_cooldown: Duration,
}

Expand All @@ -92,13 +94,18 @@ pub enum SendError {
NoSession,
}

impl<NI: NetworkIdentity, D: Data> Manager<NI, D> {
impl<NI: NetworkIdentity, D: Data, VCU: ValidatorAddressCacheUpdater> Manager<NI, D, VCU> {
/// Create a new connection manager.
pub fn new(network_identity: NI, discovery_cooldown: Duration) -> Self {
pub fn new(
network_identity: NI,
validator_address_cache_updater: VCU,
discovery_cooldown: Duration,
) -> Self {
Manager {
network_identity,
connections: Connections::new(),
sessions: HashMap::new(),
validator_address_cache_updater,
discovery_cooldown,
}
}
Expand Down Expand Up @@ -205,6 +212,15 @@ impl<NI: NetworkIdentity, D: Data> Manager<NI, D> {
node_id,
pen,
} = pre_session;
self.validator_address_cache_updater.update(
node_id,
ValidatorAddressingInfo {
session: session_id,
network_level_address: address.address(),
validator_network_peer_id: address.peer_id().to_string(),
},
);

let peers_to_stay = session
.handler
.update(Some((node_id, pen)), verifier, address)?
Expand Down Expand Up @@ -308,20 +324,29 @@ impl<NI: NetworkIdentity, D: Data> Manager<NI, D> {
message: DiscoveryMessage<NI::AddressingInformation>,
) -> ManagerActions<NI::AddressingInformation> {
let session_id = message.session_id();
let creator = message.0.creator();
match self.sessions.get_mut(&session_id) {
Some(Session {
handler, discovery, ..
}) => {
let (maybe_address, maybe_message) =
discovery.handle_authentication(message, handler);
let maybe_command = match (maybe_address, handler.is_validator()) {
(Some(address), true) => {
let mut maybe_command = None;
if let Some(address) = maybe_address {
kostekIV marked this conversation as resolved.
Show resolved Hide resolved
self.validator_address_cache_updater.update(
creator,
ValidatorAddressingInfo {
session: session_id,
network_level_address: address.address(),
validator_network_peer_id: address.peer_id().to_string(),
},
);
if handler.is_validator() {
debug!(target: "aleph-network", "Adding addresses for session {:?} to reserved: {:?}", session_id, address);
self.connections.add_peers(session_id, [address.peer_id()]);
Some(ConnectionCommand::AddReserved([address].into()))
maybe_command = Some(ConnectionCommand::AddReserved([address].into()));
}
_ => None,
};
}
ManagerActions {
maybe_command,
maybe_message,
Expand Down Expand Up @@ -439,15 +464,19 @@ mod tests {
SendError,
};
use crate::{
network::{mock::crypto_basics, session::data::DataInSession},
network::{
address_cache::{test::noop_updater, ValidatorAddressCacheUpdater},
mock::crypto_basics,
session::data::DataInSession,
},
Recipient, SessionId,
};

const NUM_NODES: usize = 7;
const DISCOVERY_PERIOD: Duration = Duration::from_secs(60);

fn build() -> Manager<MockAddressingInformation, i32> {
Manager::new(random_address(), DISCOVERY_PERIOD)
fn build() -> Manager<MockAddressingInformation, i32, impl ValidatorAddressCacheUpdater> {
Manager::new(random_address(), noop_updater(), DISCOVERY_PERIOD)
}

#[test]
Expand Down
Loading
Loading