From 7fd599c75563f963dd8b7a81220ca32b9f7f174b Mon Sep 17 00:00:00 2001 From: timorl Date: Tue, 29 Aug 2023 09:10:19 +0200 Subject: [PATCH] A0-3060: Separate network metrics (#1371) # Description Move the network metrics out of the global metrics, the last ones that are there and shouldn't be. ## Type of change # Checklist: Co-authored-by: timorl --- finality-aleph/src/metrics.rs | 51 +------- finality-aleph/src/network/gossip/metrics.rs | 71 ++++++++++ finality-aleph/src/network/gossip/mod.rs | 1 + finality-aleph/src/network/gossip/service.rs | 131 +++++++++++++------ finality-aleph/src/nodes.rs | 2 +- finality-aleph/src/testing/network.rs | 14 +- 6 files changed, 168 insertions(+), 102 deletions(-) create mode 100644 finality-aleph/src/network/gossip/metrics.rs diff --git a/finality-aleph/src/metrics.rs b/finality-aleph/src/metrics.rs index 0d5758f45e..3aa811abfc 100644 --- a/finality-aleph/src/metrics.rs +++ b/finality-aleph/src/metrics.rs @@ -10,12 +10,7 @@ use log::{trace, warn}; use lru::LruCache; use parking_lot::Mutex; use sc_service::Arc; -use substrate_prometheus_endpoint::{ - exponential_buckets, prometheus::HistogramTimer, register, Gauge, Histogram, HistogramOpts, - Opts, PrometheusError, Registry, U64, -}; - -use crate::Protocol; +use substrate_prometheus_endpoint::{register, Gauge, PrometheusError, Registry, U64}; // How many entries (block hash + timestamp) we keep in memory per one checkpoint type. // Each entry takes 32B (Hash) + 16B (Instant), so a limit of 5000 gives ~234kB (per checkpoint). @@ -32,7 +27,6 @@ struct Inner { prev: HashMap, gauges: HashMap>, starts: HashMap>, - network_send_times: HashMap, } impl Inner { @@ -60,29 +54,6 @@ impl Inner { ); } - use Protocol::*; - let mut network_send_times = HashMap::new(); - for key in [Authentication, BlockSync] { - network_send_times.insert( - key, - register( - Histogram::with_opts(HistogramOpts { - common_opts: Opts { - namespace: "gossip_network".to_string(), - subsystem: protocol_name(key), - name: "send_duration".to_string(), - help: "How long did it take for substrate to send a message." - .to_string(), - const_labels: Default::default(), - variable_labels: Default::default(), - }, - buckets: exponential_buckets(0.001, 1.26, 30)?, - })?, - registry, - )?, - ); - } - Ok(Self { prev, gauges, @@ -95,7 +66,6 @@ impl Inner { ) }) .collect(), - network_send_times, }) } @@ -141,19 +111,6 @@ impl Inner { } } } - - fn start_sending_in(&self, protocol: Protocol) -> HistogramTimer { - self.network_send_times[&protocol].start_timer() - } -} - -fn protocol_name(protocol: Protocol) -> String { - use Protocol::*; - match protocol { - Authentication => "authentication", - BlockSync => "block_sync", - } - .to_string() } #[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)] @@ -189,12 +146,6 @@ impl Metrics { .report_block(hash, checkpoint_time, checkpoint_type); } } - - pub fn start_sending_in(&self, protocol: Protocol) -> Option { - self.inner - .as_ref() - .map(|inner| inner.lock().start_sending_in(protocol)) - } } #[cfg(test)] diff --git a/finality-aleph/src/network/gossip/metrics.rs b/finality-aleph/src/network/gossip/metrics.rs new file mode 100644 index 0000000000..a3cdf95730 --- /dev/null +++ b/finality-aleph/src/network/gossip/metrics.rs @@ -0,0 +1,71 @@ +use std::collections::HashMap; + +use substrate_prometheus_endpoint::{ + exponential_buckets, prometheus::HistogramTimer, register, Histogram, HistogramOpts, Opts, + PrometheusError, Registry, +}; + +use crate::Protocol; + +fn protocol_name(protocol: Protocol) -> String { + use Protocol::*; + match protocol { + Authentication => "authentication", + BlockSync => "block_sync", + } + .to_string() +} + +#[derive(Clone)] +pub enum Metrics { + Prometheus { + send_times: HashMap, + }, + Noop, +} + +impl Metrics { + pub fn new(registry: Option) -> Result { + use Protocol::*; + let registry = match registry { + Some(registry) => registry, + None => return Ok(Metrics::Noop), + }; + + let mut send_times = HashMap::new(); + for protocol in [Authentication, BlockSync] { + send_times.insert( + protocol, + register( + Histogram::with_opts(HistogramOpts { + common_opts: Opts { + namespace: "gossip_network".to_string(), + subsystem: protocol_name(protocol), + name: "send_duration".to_string(), + help: "How long did it take for substrate to send a message." + .to_string(), + const_labels: Default::default(), + variable_labels: Default::default(), + }, + buckets: exponential_buckets(0.001, 1.26, 30)?, + })?, + ®istry, + )?, + ); + } + Ok(Metrics::Prometheus { send_times }) + } + + pub fn noop() -> Self { + Metrics::Noop + } + + pub fn start_sending_in(&self, protocol: Protocol) -> Option { + match self { + Metrics::Prometheus { send_times } => send_times + .get(&protocol) + .map(|histogram| histogram.start_timer()), + Metrics::Noop => None, + } + } +} diff --git a/finality-aleph/src/network/gossip/mod.rs b/finality-aleph/src/network/gossip/mod.rs index 02ca07e67c..55afc8fe47 100644 --- a/finality-aleph/src/network/gossip/mod.rs +++ b/finality-aleph/src/network/gossip/mod.rs @@ -9,6 +9,7 @@ use bytes::Bytes; use crate::network::Data; +mod metrics; #[cfg(test)] pub mod mock; mod service; diff --git a/finality-aleph/src/network/gossip/service.rs b/finality-aleph/src/network/gossip/service.rs index 28e0e43a25..5a1415be21 100644 --- a/finality-aleph/src/network/gossip/service.rs +++ b/finality-aleph/src/network/gossip/service.rs @@ -9,19 +9,23 @@ use futures::{channel::mpsc, StreamExt}; use log::{debug, error, info, trace, warn}; use network_clique::SpawnHandleT; use rand::{seq::IteratorRandom, thread_rng}; +use substrate_prometheus_endpoint::Registry; use tokio::time; const MAX_QUEUE_SIZE: usize = 1_000; use crate::{ - metrics::Key, network::{ - gossip::{Event, EventStream, Network, NetworkSender, Protocol, RawNetwork}, + gossip::{ + metrics::Metrics, Event, EventStream, Network, NetworkSender, Protocol, RawNetwork, + }, Data, }, - Metrics, SpawnHandle, STATUS_REPORT_INTERVAL, + SpawnHandle, STATUS_REPORT_INTERVAL, }; +const LOG_TARGET: &str = "aleph-network"; + enum Command { Send(D, P), SendToRandom(D, HashSet

), @@ -34,7 +38,7 @@ enum Command { /// 1. Messages are forwarded to the user. /// 2. Various forms of (dis)connecting, keeping track of all currently connected nodes. /// 3. Outgoing messages, sending them out, using 1.2. to broadcast. -pub struct Service { +pub struct Service { network: N, messages_from_authentication_user: mpsc::UnboundedReceiver>, messages_from_block_sync_user: mpsc::UnboundedReceiver>, @@ -45,7 +49,7 @@ pub struct Service { block_sync_connected_peers: HashSet, block_sync_peer_senders: HashMap>, spawn_handle: SpawnHandle, - metrics: Metrics, + metrics: Metrics, } struct ServiceInterface { @@ -111,11 +115,11 @@ enum SendError { SendingFailed, } -impl Service { +impl Service { pub fn new( network: N, spawn_handle: SpawnHandle, - metrics: Metrics, + metrics_registry: Option, ) -> ( Self, impl Network, @@ -127,6 +131,13 @@ impl Service { let (messages_for_authentication_service, messages_from_authentication_user) = mpsc::unbounded(); let (messages_for_block_sync_service, messages_from_block_sync_user) = mpsc::unbounded(); + let metrics = match Metrics::new(metrics_registry) { + Ok(metrics) => metrics, + Err(e) => { + warn!(target: LOG_TARGET, "Failed to create metrics: {}.", e); + Metrics::noop() + } + }; ( Service { network, @@ -178,21 +189,30 @@ impl Service { match network.sender(peer_id.clone(), protocol) { Ok(s) => sender.insert(s), Err(e) => { - debug!(target: "aleph-network", "Failed creating sender. Dropping message: {}", e); + debug!( + target: LOG_TARGET, + "Failed creating sender. Dropping message: {}", e + ); continue; } } }; let maybe_timer = metrics.start_sending_in(protocol); if let Err(e) = s.send(data.encode()).await { - debug!(target: "aleph-network", "Failed sending data to peer. Dropping sender and message: {}", e); + debug!( + target: LOG_TARGET, + "Failed sending data to peer. Dropping sender and message: {}", e + ); sender = None; } if let Some(timer) = maybe_timer { timer.observe_duration(); } } else { - debug!(target: "aleph-network", "Sender was dropped for peer {:?}. Peer sender exiting.", peer_id); + debug!( + target: LOG_TARGET, + "Sender was dropped for peer {:?}. Peer sender exiting.", peer_id + ); return; } } @@ -206,7 +226,7 @@ impl Service { Err(e) => { if e.is_full() { warn!( - target: "aleph-network", + target: LOG_TARGET, "Failed sending data through authentication notification channel to peer because peer_sender receiver is full: {:?}", peer ); @@ -214,7 +234,7 @@ impl Service { // Receiver can also be dropped when thread cannot send to peer. In case receiver is dropped this entry will be removed by Event::NotificationStreamClosed // No need to remove the entry here if e.is_disconnected() { - trace!(target: "aleph-network", "Failed sending data to peer because peer_sender receiver is dropped: {:?}", peer); + trace!(target: LOG_TARGET, "Failed sending data to peer because peer_sender receiver is dropped: {:?}", peer); } Err(SendError::SendingFailed) } @@ -234,13 +254,13 @@ impl Service { // No need to remove the entry here if e.is_full() { warn!( - target: "aleph-network", + target: LOG_TARGET, "Failed sending data in block sync notification channel to peer because peer_sender receiver is full: {:?}", peer ); } if e.is_disconnected() { - trace!(target: "aleph-network", "Failed sending data to peer because peer_sender receiver is dropped: {:?}", peer); + trace!(target: LOG_TARGET, "Failed sending data to peer because peer_sender receiver is dropped: {:?}", peer); } Err(SendError::SendingFailed) } @@ -253,13 +273,23 @@ impl Service { fn send_authentication_data(&mut self, data: AD, peer_id: N::PeerId) { if let Err(e) = self.send_to_authentication_peer(data, peer_id.clone()) { - trace!(target: "aleph-network", "Failed to send to peer{:?}, {:?}", peer_id, e); + trace!( + target: LOG_TARGET, + "Failed to send to peer{:?}, {:?}", + peer_id, + e + ); } } fn send_block_sync_data(&mut self, data: BSD, peer_id: N::PeerId) { if let Err(e) = self.send_to_block_sync_peer(data, peer_id.clone()) { - trace!(target: "aleph-network", "Failed to send to peer{:?}, {:?}", peer_id, e); + trace!( + target: LOG_TARGET, + "Failed to send to peer{:?}, {:?}", + peer_id, + e + ); } } @@ -289,7 +319,10 @@ impl Service { let peer_id = match self.random_peer(&peer_ids, Protocol::Authentication) { Some(peer_id) => peer_id.clone(), None => { - trace!(target: "aleph-network", "Failed to send authentication message to random peer, no peers are available."); + trace!( + target: LOG_TARGET, + "Failed to send authentication message to random peer, no peers are available." + ); return; } }; @@ -300,7 +333,10 @@ impl Service { let peer_id = match self.random_peer(&peer_ids, Protocol::BlockSync) { Some(peer_id) => peer_id.clone(), None => { - trace!(target: "aleph-network", "Failed to send block sync message to random peer, no peers are available."); + trace!( + target: LOG_TARGET, + "Failed to send block sync message to random peer, no peers are available." + ); return; } }; @@ -325,7 +361,12 @@ impl Service { use Event::*; match event { StreamOpened(peer, protocol) => { - trace!(target: "aleph-network", "StreamOpened event for peer {:?} and the protocol {:?}.", peer, protocol); + trace!( + target: LOG_TARGET, + "StreamOpened event for peer {:?} and the protocol {:?}.", + peer, + protocol + ); match protocol { Protocol::Authentication => { let (tx, rx) = mpsc::channel(MAX_QUEUE_SIZE); @@ -348,7 +389,12 @@ impl Service { }; } StreamClosed(peer, protocol) => { - trace!(target: "aleph-network", "StreamClosed event for peer {:?} and protocol {:?}", peer, protocol); + trace!( + target: LOG_TARGET, + "StreamClosed event for peer {:?} and protocol {:?}", + peer, + protocol + ); match protocol { Protocol::Authentication => { self.authentication_connected_peers.remove(&peer); @@ -369,7 +415,10 @@ impl Service { .unbounded_send((data, peer_id.clone())) .map_err(|_| ())?, Err(e) => { - warn!(target: "aleph-network", "Error decoding authentication protocol message: {}", e) + warn!( + target: LOG_TARGET, + "Error decoding authentication protocol message: {}", e + ) } }, Protocol::BlockSync => match BSD::decode(&mut &data[..]) { @@ -378,7 +427,10 @@ impl Service { .unbounded_send((data, peer_id.clone())) .map_err(|_| ())?, Err(e) => { - warn!(target: "aleph-network", "Error decoding block sync protocol message: {}", e) + warn!( + target: LOG_TARGET, + "Error decoding block sync protocol message: {}", e + ) } }, }; @@ -400,7 +452,7 @@ impl Service { self.block_sync_connected_peers.len() )); - info!(target: "aleph-network", "{}", status); + info!(target: LOG_TARGET, "{}", status); } pub async fn run(mut self) { @@ -411,11 +463,11 @@ impl Service { tokio::select! { maybe_event = events_from_network.next_event() => match maybe_event { Some(event) => if self.handle_network_event(event).is_err() { - error!(target: "aleph-network", "Cannot forward messages to user."); + error!(target: LOG_TARGET, "Cannot forward messages to user."); return; }, None => { - error!(target: "aleph-network", "Network event stream ended."); + error!(target: LOG_TARGET, "Network event stream ended."); return; } }, @@ -424,7 +476,7 @@ impl Service { Some(Command::SendToRandom(message, peer_ids)) => self.send_to_random_authentication(message, peer_ids), Some(Command::Send(message, peer_id)) => self.send_authentication_data(message, peer_id), None => { - error!(target: "aleph-network", "Authentication user message stream ended."); + error!(target: LOG_TARGET, "Authentication user message stream ended."); return; } }, @@ -433,7 +485,7 @@ impl Service { Some(Command::SendToRandom(message, peer_ids)) => self.send_to_random_block_sync(message, peer_ids), Some(Command::Send(message, peer_id)) => self.send_block_sync_data(message, peer_id), None => { - error!(target: "aleph-network", "Block sync user message stream ended."); + error!(target: LOG_TARGET, "Block sync user message stream ended."); return; } }, @@ -456,17 +508,13 @@ mod tests { use tokio::runtime::Handle; use super::{Error, SendError, Service}; - use crate::{ - metrics::Metrics, - network::{ - gossip::{ - mock::{MockEvent, MockRawNetwork, MockSenderError}, - Network, - }, - mock::MockData, - Protocol, + use crate::network::{ + gossip::{ + mock::{MockEvent, MockRawNetwork, MockSenderError}, + Network, }, - testing::mocks::THash, + mock::MockData, + Protocol, }; const PROTOCOL: Protocol = Protocol::Authentication; @@ -474,7 +522,7 @@ mod tests { pub struct TestData { pub network: MockRawNetwork, gossip_network: Box>, - pub service: Service, + pub service: Service, // `TaskManager` can't be dropped for `SpawnTaskHandle` to work _task_manager: TaskManager, // If we drop the sync network, the underlying network service dies, stopping the whole @@ -491,11 +539,8 @@ mod tests { // Prepare service let network = MockRawNetwork::new(event_stream_oneshot_tx); - let (service, gossip_network, other_network) = Service::new( - network.clone(), - task_manager.spawn_handle().into(), - Metrics::::noop(), - ); + let (service, gossip_network, other_network) = + Service::new(network.clone(), task_manager.spawn_handle().into(), None); let gossip_network = Box::new(gossip_network); let other_network = Box::new(other_network); diff --git a/finality-aleph/src/nodes.rs b/finality-aleph/src/nodes.rs index 08fb82f959..b7c568a347 100644 --- a/finality-aleph/src/nodes.rs +++ b/finality-aleph/src/nodes.rs @@ -111,7 +111,7 @@ where let (gossip_network_service, authentication_network, block_sync_network) = GossipService::new( SubstrateNetwork::new(network.clone(), sync_network.clone(), protocol_naming), spawn_handle.clone(), - metrics.clone(), + registry.clone(), ); let gossip_network_task = async move { gossip_network_service.run().await }; diff --git a/finality-aleph/src/testing/network.rs b/finality-aleph/src/testing/network.rs index ac83830cd6..168986a2a3 100644 --- a/finality-aleph/src/testing/network.rs +++ b/finality-aleph/src/testing/network.rs @@ -27,8 +27,7 @@ use crate::{ }, GossipError, GossipNetwork, GossipService, MockEvent, MockRawNetwork, Protocol, }, - testing::mocks::THash, - Metrics, MillisecsPerBlock, NodeIndex, Recipient, SessionId, SessionPeriod, + MillisecsPerBlock, NodeIndex, Recipient, SessionId, SessionPeriod, }; const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10); @@ -101,12 +100,11 @@ async fn prepare_one_session_test_data() -> TestData { let network = MockRawNetwork::new(event_stream_tx); let validator_network = MockCliqueNetwork::new(); - let (gossip_service, gossip_network, sync_network) = - GossipService::<_, _, MockData, THash>::new( - network.clone(), - task_manager.spawn_handle().into(), - Metrics::noop(), - ); + let (gossip_service, gossip_network, sync_network) = GossipService::<_, _, MockData>::new( + network.clone(), + task_manager.spawn_handle().into(), + None, + ); let (connection_manager_service, session_manager) = ConnectionManager::new( authorities[0].address(),