diff --git a/Cargo.lock b/Cargo.lock index ee4b849dfa..71e45290cb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5383,7 +5383,7 @@ dependencies = [ [[package]] name = "network-clique" -version = "0.5.0" +version = "0.6.0" dependencies = [ "aleph-bft-mock", "aleph-bft-types 0.8.1", diff --git a/bin/node/src/service.rs b/bin/node/src/service.rs index 9debdc6c00..502e113b63 100644 --- a/bin/node/src/service.rs +++ b/bin/node/src/service.rs @@ -402,6 +402,8 @@ pub fn new_authority( .unwrap_or(usize::MAX), }; + let validator_address_cache = None; + let aleph_config = AlephConfig { network, sync_network, @@ -424,6 +426,7 @@ pub fn new_authority( protocol_naming, rate_limiter_config, sync_oracle, + validator_address_cache, }; task_manager.spawn_essential_handle().spawn_blocking( diff --git a/clique/Cargo.toml b/clique/Cargo.toml index c455afdc7a..95cc4b187b 100644 --- a/clique/Cargo.toml +++ b/clique/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "network-clique" -version = "0.5.0" +version = "0.6.0" license = "Apache 2.0" authors.workspace = true edition.workspace = true diff --git a/clique/src/lib.rs b/clique/src/lib.rs index 9e8940044d..8df5b538b9 100644 --- a/clique/src/lib.rs +++ b/clique/src/lib.rs @@ -58,6 +58,9 @@ pub trait AddressingInformation: Debug + Hash + Codec + Clone + Eq + Send + Sync /// Verify the information. fn verify(&self) -> bool; + + // Address, *only* for debugging purposes. + fn address(&self) -> String; } /// Abstraction for requesting own network addressing information. diff --git a/clique/src/mock.rs b/clique/src/mock.rs index b04a803981..b6e735a4e5 100644 --- a/clique/src/mock.rs +++ b/clique/src/mock.rs @@ -293,6 +293,10 @@ impl AddressingInformation for MockAddressingInformation { fn verify(&self) -> bool { self.valid } + + fn address(&self) -> String { + self.address.clone() + } } impl NetworkIdentity for MockAddressingInformation { diff --git a/finality-aleph/src/idx_to_account.rs b/finality-aleph/src/idx_to_account.rs new file mode 100644 index 0000000000..88ae13a1fd --- /dev/null +++ b/finality-aleph/src/idx_to_account.rs @@ -0,0 +1,81 @@ +use std::sync::Arc; + +use primitives::{AccountId, AlephSessionApi, AuraId, BlockHash, BlockNumber}; +use sc_client_api::Backend; +use sp_consensus_aura::AuraApi; +use sp_runtime::traits::{Block, Header}; + +use crate::{ + abft::NodeIndex, + session::{SessionBoundaryInfo, SessionId}, + session_map::{AuthorityProvider, AuthorityProviderImpl}, + ClientForAleph, +}; + +pub trait ValidatorIndexToAccountIdConverter { + fn account(&self, session: SessionId, validator_index: NodeIndex) -> Option; +} + +pub struct ValidatorIndexToAccountIdConverterImpl +where + C: ClientForAleph + Send + Sync + 'static, + C::Api: crate::aleph_primitives::AlephSessionApi + AuraApi, + B: Block, + BE: Backend + 'static, +{ + client: Arc, + session_boundary_info: SessionBoundaryInfo, + authority_provider: AuthorityProviderImpl, +} + +impl ValidatorIndexToAccountIdConverterImpl +where + C: ClientForAleph + Send + Sync + 'static, + C::Api: crate::aleph_primitives::AlephSessionApi + AuraApi, + B: Block, + B::Header: Header, + BE: Backend + 'static, +{ + pub fn new(client: Arc, session_boundary_info: SessionBoundaryInfo) -> Self { + Self { + client: client.clone(), + session_boundary_info, + authority_provider: AuthorityProviderImpl::new(client), + } + } +} + +impl ValidatorIndexToAccountIdConverter + for ValidatorIndexToAccountIdConverterImpl +where + C: ClientForAleph + Send + Sync + 'static, + C::Api: crate::aleph_primitives::AlephSessionApi + AuraApi, + B: Block, + B::Header: Header, + BE: Backend + 'static, +{ + fn account(&self, session: SessionId, validator_index: NodeIndex) -> Option { + let block_number = self + .session_boundary_info + .boundaries_for_session(session) + .first_block(); + let block_hash = self.client.block_hash(block_number).ok()??; + + let authority_data = self.authority_provider.authority_data(block_number)?; + let aleph_key = authority_data.authorities()[validator_index.0].clone(); + self.client + .runtime_api() + .key_owner(block_hash, aleph_key) + .ok()? + } +} + +#[cfg(test)] +pub struct MockConverter; + +#[cfg(test)] +impl ValidatorIndexToAccountIdConverter for MockConverter { + fn account(&self, _: SessionId, _: NodeIndex) -> Option { + None + } +} diff --git a/finality-aleph/src/lib.rs b/finality-aleph/src/lib.rs index 7d51208c13..058d006413 100644 --- a/finality-aleph/src/lib.rs +++ b/finality-aleph/src/lib.rs @@ -44,6 +44,7 @@ mod compatibility; mod crypto; mod data_io; mod finalization; +mod idx_to_account; mod import; mod justification; mod metrics; @@ -61,7 +62,7 @@ pub use crate::{ import::{AlephBlockImport, RedirectingBlockImport, TracingBlockImport}, justification::AlephJustification, metrics::TimingBlockMetrics, - network::{Protocol, ProtocolNaming}, + network::{address_cache::ValidatorAddressCache, Protocol, ProtocolNaming}, nodes::run_validator_node, session::SessionPeriod, sync::{ @@ -288,4 +289,5 @@ pub struct AlephConfig { pub protocol_naming: ProtocolNaming, pub rate_limiter_config: RateLimiterConfig, pub sync_oracle: SyncOracle, + pub validator_address_cache: Option, } diff --git a/finality-aleph/src/network/address_cache.rs b/finality-aleph/src/network/address_cache.rs new file mode 100644 index 0000000000..469a08ea7b --- /dev/null +++ b/finality-aleph/src/network/address_cache.rs @@ -0,0 +1,107 @@ +use std::{fmt::Debug, num::NonZeroUsize, sync::Arc}; + +use lru::LruCache; +use parking_lot::Mutex; +use primitives::AccountId; + +use crate::{ + abft::NodeIndex, idx_to_account::ValidatorIndexToAccountIdConverter, session::SessionId, +}; + +/// Network details for a given validator in a given session. +#[derive(Debug, Clone)] +pub struct ValidatorAddressingInfo { + /// Session to which given information applies. + pub session: SessionId, + /// Network level address of the validator, i.e. IP address (for validator network) + pub network_level_address: String, + /// PeerId of the validator used in validator (clique) network + pub validator_network_peer_id: String, +} + +/// Stores most recent information about validator addresses. +#[derive(Debug, Clone)] +pub struct ValidatorAddressCache { + data: Arc>>, +} + +const VALIDATOR_ADDRESS_CACHE_SIZE: usize = 1000; + +impl ValidatorAddressCache { + pub fn new() -> Self { + Self { + data: Arc::new(Mutex::new(LruCache::new( + NonZeroUsize::try_from(VALIDATOR_ADDRESS_CACHE_SIZE) + .expect("the cache size is a non-zero constant"), + ))), + } + } + + pub fn insert(&self, validator_stash: AccountId, info: ValidatorAddressingInfo) { + self.data.lock().put(validator_stash, info); + } +} + +impl Default for ValidatorAddressCache { + fn default() -> Self { + Self::new() + } +} + +pub trait ValidatorAddressCacheUpdater { + /// In session `session_info.session`, validator `NodeIndex` was using addresses specified in + /// `session_info`. A session and validator_index identify the validator uniquely. + fn update(&self, validator_index: NodeIndex, session_info: ValidatorAddressingInfo); +} + +enum ValidatorAddressCacheUpdaterImpl { + Noop, + BackendBased { + validator_address_cache: ValidatorAddressCache, + key_owner_info_provider: C, + }, +} + +/// Construct a struct that can be used to update `validator_address_cache`, if it is `Some`. +/// If passed None, the returned struct will be a no-op. +pub fn validator_address_cache_updater( + validator_address_cache: Option, + key_owner_info_provider: C, +) -> impl ValidatorAddressCacheUpdater { + match validator_address_cache { + Some(validator_address_cache) => ValidatorAddressCacheUpdaterImpl::BackendBased { + validator_address_cache, + key_owner_info_provider, + }, + None => ValidatorAddressCacheUpdaterImpl::Noop, + } +} + +impl ValidatorAddressCacheUpdater + for ValidatorAddressCacheUpdaterImpl +{ + fn update(&self, validator_index: NodeIndex, info: ValidatorAddressingInfo) { + if let ValidatorAddressCacheUpdaterImpl::BackendBased { + validator_address_cache, + key_owner_info_provider, + } = self + { + if let Some(validator) = key_owner_info_provider.account(info.session, validator_index) + { + validator_address_cache.insert(validator, info) + } + } + } +} + +#[cfg(test)] +pub mod test { + use crate::{ + idx_to_account::MockConverter, + network::address_cache::{ValidatorAddressCacheUpdater, ValidatorAddressCacheUpdaterImpl}, + }; + + pub fn noop_updater() -> impl ValidatorAddressCacheUpdater { + ValidatorAddressCacheUpdaterImpl::::Noop + } +} diff --git a/finality-aleph/src/network/mod.rs b/finality-aleph/src/network/mod.rs index 531502c8c0..a01893bde8 100644 --- a/finality-aleph/src/network/mod.rs +++ b/finality-aleph/src/network/mod.rs @@ -1,5 +1,6 @@ use parity_scale_codec::Codec; +pub mod address_cache; pub mod data; mod gossip; #[cfg(test)] diff --git a/finality-aleph/src/network/session/manager.rs b/finality-aleph/src/network/session/manager.rs index 6aabd0b7da..32d8d25dea 100644 --- a/finality-aleph/src/network/session/manager.rs +++ b/finality-aleph/src/network/session/manager.rs @@ -11,6 +11,7 @@ use crate::{ abft::Recipient, crypto::{AuthorityPen, AuthorityVerifier}, network::{ + address_cache::{ValidatorAddressCacheUpdater, ValidatorAddressingInfo}, session::{ data::DataInSession, Authentication, Connections, Discovery, DiscoveryMessage, SessionHandler, SessionHandlerError, @@ -78,10 +79,11 @@ impl ManagerActions { /// 1. In-session messages are forwarded to the user. /// 2. Authentication messages forwarded to session handlers. /// 4. Running periodic maintenance, mostly related to node discovery. -pub struct Manager { +pub struct Manager { network_identity: NI, connections: Connections, sessions: HashMap>, + validator_address_cache_updater: VCU, discovery_cooldown: Duration, } @@ -92,13 +94,18 @@ pub enum SendError { NoSession, } -impl Manager { +impl Manager { /// Create a new connection manager. - pub fn new(network_identity: NI, discovery_cooldown: Duration) -> Self { + pub fn new( + network_identity: NI, + validator_address_cache_updater: VCU, + discovery_cooldown: Duration, + ) -> Self { Manager { network_identity, connections: Connections::new(), sessions: HashMap::new(), + validator_address_cache_updater, discovery_cooldown, } } @@ -205,6 +212,15 @@ impl Manager { node_id, pen, } = pre_session; + self.validator_address_cache_updater.update( + node_id, + ValidatorAddressingInfo { + session: session_id, + network_level_address: address.address(), + validator_network_peer_id: address.peer_id().to_string(), + }, + ); + let peers_to_stay = session .handler .update(Some((node_id, pen)), verifier, address)? @@ -308,20 +324,29 @@ impl Manager { message: DiscoveryMessage, ) -> ManagerActions { let session_id = message.session_id(); + let creator = message.0.creator(); match self.sessions.get_mut(&session_id) { Some(Session { handler, discovery, .. }) => { let (maybe_address, maybe_message) = discovery.handle_authentication(message, handler); - let maybe_command = match (maybe_address, handler.is_validator()) { - (Some(address), true) => { + let mut maybe_command = None; + if let Some(address) = maybe_address { + self.validator_address_cache_updater.update( + creator, + ValidatorAddressingInfo { + session: session_id, + network_level_address: address.address(), + validator_network_peer_id: address.peer_id().to_string(), + }, + ); + if handler.is_validator() { debug!(target: "aleph-network", "Adding addresses for session {:?} to reserved: {:?}", session_id, address); self.connections.add_peers(session_id, [address.peer_id()]); - Some(ConnectionCommand::AddReserved([address].into())) + maybe_command = Some(ConnectionCommand::AddReserved([address].into())); } - _ => None, - }; + } ManagerActions { maybe_command, maybe_message, @@ -439,15 +464,19 @@ mod tests { SendError, }; use crate::{ - network::{mock::crypto_basics, session::data::DataInSession}, + network::{ + address_cache::{test::noop_updater, ValidatorAddressCacheUpdater}, + mock::crypto_basics, + session::data::DataInSession, + }, Recipient, SessionId, }; const NUM_NODES: usize = 7; const DISCOVERY_PERIOD: Duration = Duration::from_secs(60); - fn build() -> Manager { - Manager::new(random_address(), DISCOVERY_PERIOD) + fn build() -> Manager { + Manager::new(random_address(), noop_updater(), DISCOVERY_PERIOD) } #[test] diff --git a/finality-aleph/src/network/session/service.rs b/finality-aleph/src/network/session/service.rs index cd5bc1a5ea..f44598955d 100644 --- a/finality-aleph/src/network/session/service.rs +++ b/finality-aleph/src/network/session/service.rs @@ -16,6 +16,7 @@ use crate::{ abft::Recipient, crypto::{AuthorityPen, AuthorityVerifier}, network::{ + address_cache::ValidatorAddressCacheUpdater, session::{ data::DataInSession, manager::{ @@ -176,10 +177,11 @@ pub struct Service< NI: NetworkIdentity, CN: CliqueNetwork>, GN: GossipNetwork>, + VCU: ValidatorAddressCacheUpdater, > where NI::PeerId: PublicKey, { - manager: Manager, + manager: Manager, commands_from_user: mpsc::UnboundedReceiver>, messages_from_user: mpsc::UnboundedReceiver<(D, SessionId, Recipient)>, validator_network: CN, @@ -214,7 +216,8 @@ impl< NI: NetworkIdentity, CN: CliqueNetwork>, GN: GossipNetwork>, - > Service + VCU: ValidatorAddressCacheUpdater, + > Service where NI::PeerId: PublicKey, { @@ -222,9 +225,10 @@ where network_identity: NI, validator_network: CN, gossip_network: GN, + validator_address_cache_updater: VCU, config: Config, ) -> ( - Service, + Service, impl SessionManager, ) { let Config { @@ -232,7 +236,11 @@ where maintenance_period, initial_delay, } = config; - let manager = Manager::new(network_identity, discovery_cooldown); + let manager = Manager::new( + network_identity, + validator_address_cache_updater, + discovery_cooldown, + ); let (commands_for_service, commands_from_user) = mpsc::unbounded(); let (messages_for_service, messages_from_user) = mpsc::unbounded(); ( diff --git a/finality-aleph/src/network/tcp.rs b/finality-aleph/src/network/tcp.rs index b863cc11c1..8d480bb133 100644 --- a/finality-aleph/src/network/tcp.rs +++ b/finality-aleph/src/network/tcp.rs @@ -106,6 +106,10 @@ impl AddressingInformation for SignedTcpAddressingInformation { self.peer_id() .verify(&self.addressing_information.encode(), &self.signature) } + + fn address(&self) -> String { + self.addressing_information.primary_address.clone() + } } impl NetworkIdentity for SignedTcpAddressingInformation { diff --git a/finality-aleph/src/nodes.rs b/finality-aleph/src/nodes.rs index 3b71a88017..3957edc27a 100644 --- a/finality-aleph/src/nodes.rs +++ b/finality-aleph/src/nodes.rs @@ -14,7 +14,9 @@ use crate::{ aleph_primitives::{AlephSessionApi, AuraId, Block}, crypto::AuthorityPen, finalization::AlephFinalizer, + idx_to_account::ValidatorIndexToAccountIdConverterImpl, network::{ + address_cache::validator_address_cache_updater, session::{ConnectionManager, ConnectionManagerConfig}, tcp::{new_tcp_network, KEY_TYPE}, GossipService, SubstrateNetwork, @@ -75,6 +77,7 @@ where protocol_naming, rate_limiter_config, sync_oracle, + validator_address_cache, } = aleph_config; // We generate the phrase manually to only save the key in RAM, we don't want to have these @@ -167,10 +170,16 @@ where }; let sync_task = async move { sync_service.run().await }; + let validator_address_cache_updater = validator_address_cache_updater( + validator_address_cache, + ValidatorIndexToAccountIdConverterImpl::new(client.clone(), session_info.clone()), + ); + let (connection_manager_service, connection_manager) = ConnectionManager::new( network_identity, validator_network, authentication_network, + validator_address_cache_updater, ConnectionManagerConfig::with_session_period(&session_period, &millisecs_per_block), ); diff --git a/finality-aleph/src/testing/network.rs b/finality-aleph/src/testing/network.rs index 168986a2a3..263b2d06e7 100644 --- a/finality-aleph/src/testing/network.rs +++ b/finality-aleph/src/testing/network.rs @@ -19,6 +19,7 @@ use tokio::{runtime::Handle, task::JoinHandle, time::timeout}; use crate::{ crypto::{AuthorityPen, AuthorityVerifier}, network::{ + address_cache::test::noop_updater, data::Network, mock::{crypto_basics, MockData}, session::{ @@ -110,6 +111,7 @@ async fn prepare_one_session_test_data() -> TestData { authorities[0].address(), validator_network.clone(), gossip_network, + noop_updater(), ConnectionManagerConfig::with_session_period(&SESSION_PERIOD, &MILLISECS_PER_BLOCK), ); let session_manager = Box::new(session_manager);