From ba7d54ed57d1bb562194a25f6fb4cf4f52638e52 Mon Sep 17 00:00:00 2001 From: KostekIV Date: Mon, 31 Jul 2023 13:25:17 +0200 Subject: [PATCH 1/3] port metrics from release-11 --- finality-aleph/src/aggregation/mod.rs | 8 +- finality-aleph/src/metrics.rs | 163 +++++++++++++++++-- finality-aleph/src/network/gossip/service.rs | 42 +++-- finality-aleph/src/nodes.rs | 2 + finality-aleph/src/sync/service.rs | 27 ++- finality-aleph/src/testing/network.rs | 9 +- 6 files changed, 220 insertions(+), 31 deletions(-) diff --git a/finality-aleph/src/aggregation/mod.rs b/finality-aleph/src/aggregation/mod.rs index 25b1d1ce98..5458dc9f62 100644 --- a/finality-aleph/src/aggregation/mod.rs +++ b/finality-aleph/src/aggregation/mod.rs @@ -1,6 +1,6 @@ //! Module to glue legacy and current version of the aggregator; -use std::{fmt::Debug, hash::Hash, marker::PhantomData, time::Instant}; +use std::{marker::PhantomData, time::Instant}; use current_aleph_aggregator::NetworkError as CurrentNetworkError; use legacy_aleph_aggregator::NetworkError as LegacyNetworkError; @@ -9,7 +9,7 @@ use sp_runtime::traits::Block; use crate::{ abft::SignatureSet, crypto::Signature, - metrics::Checkpoint, + metrics::{Checkpoint, Key}, mpsc, network::{ data::{Network, SendError}, @@ -175,13 +175,13 @@ impl> NetworkWrapper { } } -impl legacy_aleph_aggregator::Metrics for Metrics { +impl legacy_aleph_aggregator::Metrics for Metrics { fn report_aggregation_complete(&mut self, h: H) { self.report_block(h, Instant::now(), Checkpoint::Aggregating); } } -impl current_aleph_aggregator::Metrics for Metrics { +impl current_aleph_aggregator::Metrics for Metrics { fn report_aggregation_complete(&mut self, h: H) { self.report_block(h, Instant::now(), Checkpoint::Aggregating); } diff --git a/finality-aleph/src/metrics.rs b/finality-aleph/src/metrics.rs index c866fc3cc8..80cae3953a 100644 --- a/finality-aleph/src/metrics.rs +++ b/finality-aleph/src/metrics.rs @@ -9,7 +9,12 @@ use log::{trace, warn}; use lru::LruCache; use parking_lot::Mutex; use sc_service::Arc; -use substrate_prometheus_endpoint::{register, Gauge, PrometheusError, Registry, U64}; +use substrate_prometheus_endpoint::{ + exponential_buckets, prometheus::HistogramTimer, register, Counter, Gauge, Histogram, + HistogramOpts, Opts, PrometheusError, Registry, U64, +}; + +use crate::Protocol; // How many entries (block hash + timestamp) we keep in memory per one checkpoint type. // Each entry takes 32B (Hash) + 16B (Instant), so a limit of 5000 gives ~234kB (per checkpoint). @@ -17,8 +22,8 @@ use substrate_prometheus_endpoint::{register, Gauge, PrometheusError, Registry, // (e.g. when the gap between checkpoints for a block grows over `MAX_BLOCKS_PER_CHECKPOINT`). const MAX_BLOCKS_PER_CHECKPOINT: usize = 5000; -pub trait Key: Hash + Eq + Debug + Copy {} -impl Key for T {} +pub trait Key: Hash + Eq + Debug + Copy + Send + 'static {} +impl Key for T {} const LOG_TARGET: &str = "aleph-metrics"; @@ -26,6 +31,16 @@ struct Inner { prev: HashMap, gauges: HashMap>, starts: HashMap>, + sync_broadcast_counter: Counter, + sync_send_request_counter: Counter, + sync_send_to_counter: Counter, + sync_handle_state_counter: Counter, + sync_handle_request_response_counter: Counter, + sync_handle_request_counter: Counter, + sync_handle_task_counter: Counter, + sync_handle_block_imported_counter: Counter, + sync_handle_block_finalized_counter: Counter, + network_send_times: HashMap, } impl Inner { @@ -71,10 +86,23 @@ impl Inner { } } } + + fn start_sending_in(&self, protocol: Protocol) -> HistogramTimer { + self.network_send_times[&protocol].start_timer() + } +} + +fn protocol_name(protocol: Protocol) -> String { + use Protocol::*; + match protocol { + Authentication => "authentication", + BlockSync => "block_sync", + } + .to_string() } #[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)] -pub(crate) enum Checkpoint { +pub enum Checkpoint { Importing, Imported, Ordering, @@ -117,6 +145,29 @@ impl Metrics { ); } + use Protocol::*; + let mut network_send_times = HashMap::new(); + for key in [Authentication, BlockSync] { + network_send_times.insert( + key, + register( + Histogram::with_opts(HistogramOpts { + common_opts: Opts { + namespace: "gossip_network".to_string(), + subsystem: protocol_name(key), + name: "send_duration".to_string(), + help: "How long did it take for substrate to send a message." + .to_string(), + const_labels: Default::default(), + variable_labels: Default::default(), + }, + buckets: exponential_buckets(0.001, 1.26, 30)?, + })?, + registry, + )?, + ); + } + let inner = Some(Arc::new(Mutex::new(Inner { prev, gauges, @@ -124,23 +175,115 @@ impl Metrics { .iter() .map(|k| (*k, LruCache::new(MAX_BLOCKS_PER_CHECKPOINT))) .collect(), + sync_broadcast_counter: register( + Counter::new("aleph_sync_broadcast", "no help")?, + registry, + )?, + sync_send_request_counter: register( + Counter::new("aleph_sync_send_request", "no help")?, + registry, + )?, + sync_send_to_counter: register( + Counter::new("aleph_sync_send_to", "no help")?, + registry, + )?, + sync_handle_state_counter: register( + Counter::new("aleph_sync_handle_state", "no help")?, + registry, + )?, + sync_handle_request_response_counter: register( + Counter::new("aleph_sync_handle_request_response", "no help")?, + registry, + )?, + sync_handle_request_counter: register( + Counter::new("aleph_sync_handle_request", "no help")?, + registry, + )?, + sync_handle_task_counter: register( + Counter::new("aleph_sync_handle_task", "no help")?, + registry, + )?, + sync_handle_block_imported_counter: register( + Counter::new("aleph_sync_handle_block_imported", "no help")?, + registry, + )?, + sync_handle_block_finalized_counter: register( + Counter::new("aleph_sync_handle_block_finalized", "no help")?, + registry, + )?, + network_send_times, }))); Ok(Metrics { inner }) } - pub(crate) fn report_block( - &self, - hash: H, - checkpoint_time: Instant, - checkpoint_type: Checkpoint, - ) { + pub fn report_block(&self, hash: H, checkpoint_time: Instant, checkpoint_type: Checkpoint) { if let Some(inner) = &self.inner { inner .lock() .report_block(hash, checkpoint_time, checkpoint_type); } } + + pub fn report_sync_broadcast(&self) { + if let Some(inner) = &self.inner { + inner.lock().sync_broadcast_counter.inc(); + } + } + + pub fn report_sync_send_request(&self) { + if let Some(inner) = &self.inner { + inner.lock().sync_send_request_counter.inc(); + } + } + + pub fn report_sync_send_to(&self) { + if let Some(inner) = &self.inner { + inner.lock().sync_send_to_counter.inc(); + } + } + + pub fn report_sync_handle_state(&self) { + if let Some(inner) = &self.inner { + inner.lock().sync_handle_state_counter.inc(); + } + } + + pub fn report_sync_handle_request_response(&self) { + if let Some(inner) = &self.inner { + inner.lock().sync_handle_request_response_counter.inc(); + } + } + + pub fn report_sync_handle_request(&self) { + if let Some(inner) = &self.inner { + inner.lock().sync_handle_request_counter.inc(); + } + } + + pub fn report_sync_handle_task(&self) { + if let Some(inner) = &self.inner { + inner.lock().sync_handle_task_counter.inc(); + } + } + + pub fn report_sync_handle_block_imported(&self) { + if let Some(inner) = &self.inner { + inner.lock().sync_handle_block_imported_counter.inc(); + } + } + + pub fn report_sync_handle_block_finalized(&self) { + if let Some(inner) = &self.inner { + inner.lock().sync_handle_block_finalized_counter.inc(); + } + } + + pub fn start_sending_in(&self, protocol: Protocol) -> Option { + self.inner + .as_ref() + .map(|inner| inner.lock().start_sending_in(protocol)) + } } #[cfg(test)] diff --git a/finality-aleph/src/network/gossip/service.rs b/finality-aleph/src/network/gossip/service.rs index b544c37787..28e0e43a25 100644 --- a/finality-aleph/src/network/gossip/service.rs +++ b/finality-aleph/src/network/gossip/service.rs @@ -14,11 +14,12 @@ use tokio::time; const MAX_QUEUE_SIZE: usize = 1_000; use crate::{ + metrics::Key, network::{ gossip::{Event, EventStream, Network, NetworkSender, Protocol, RawNetwork}, Data, }, - SpawnHandle, STATUS_REPORT_INTERVAL, + Metrics, SpawnHandle, STATUS_REPORT_INTERVAL, }; enum Command { @@ -33,7 +34,7 @@ enum Command { /// 1. Messages are forwarded to the user. /// 2. Various forms of (dis)connecting, keeping track of all currently connected nodes. /// 3. Outgoing messages, sending them out, using 1.2. to broadcast. -pub struct Service { +pub struct Service { network: N, messages_from_authentication_user: mpsc::UnboundedReceiver>, messages_from_block_sync_user: mpsc::UnboundedReceiver>, @@ -44,6 +45,7 @@ pub struct Service { block_sync_connected_peers: HashSet, block_sync_peer_senders: HashMap>, spawn_handle: SpawnHandle, + metrics: Metrics, } struct ServiceInterface { @@ -109,12 +111,13 @@ enum SendError { SendingFailed, } -impl Service { +impl Service { pub fn new( network: N, spawn_handle: SpawnHandle, + metrics: Metrics, ) -> ( - Service, + Self, impl Network, impl Network, ) { @@ -132,6 +135,7 @@ impl Service { messages_for_authentication_user, messages_for_block_sync_user, spawn_handle, + metrics, authentication_connected_peers: HashSet::new(), authentication_peer_senders: HashMap::new(), block_sync_connected_peers: HashSet::new(), @@ -163,6 +167,7 @@ impl Service { protocol: Protocol, ) -> impl Future + Send + 'static { let network = self.network.clone(); + let metrics = self.metrics.clone(); async move { let mut sender = None; loop { @@ -178,10 +183,14 @@ impl Service { } } }; + let maybe_timer = metrics.start_sending_in(protocol); if let Err(e) = s.send(data.encode()).await { debug!(target: "aleph-network", "Failed sending data to peer. Dropping sender and message: {}", e); sender = None; } + if let Some(timer) = maybe_timer { + timer.observe_duration(); + } } else { debug!(target: "aleph-network", "Sender was dropped for peer {:?}. Peer sender exiting.", peer_id); return; @@ -447,13 +456,17 @@ mod tests { use tokio::runtime::Handle; use super::{Error, SendError, Service}; - use crate::network::{ - gossip::{ - mock::{MockEvent, MockRawNetwork, MockSenderError}, - Network, + use crate::{ + metrics::Metrics, + network::{ + gossip::{ + mock::{MockEvent, MockRawNetwork, MockSenderError}, + Network, + }, + mock::MockData, + Protocol, }, - mock::MockData, - Protocol, + testing::mocks::THash, }; const PROTOCOL: Protocol = Protocol::Authentication; @@ -461,7 +474,7 @@ mod tests { pub struct TestData { pub network: MockRawNetwork, gossip_network: Box>, - pub service: Service, + pub service: Service, // `TaskManager` can't be dropped for `SpawnTaskHandle` to work _task_manager: TaskManager, // If we drop the sync network, the underlying network service dies, stopping the whole @@ -478,8 +491,11 @@ mod tests { // Prepare service let network = MockRawNetwork::new(event_stream_oneshot_tx); - let (service, gossip_network, other_network) = - Service::new(network.clone(), task_manager.spawn_handle().into()); + let (service, gossip_network, other_network) = Service::new( + network.clone(), + task_manager.spawn_handle().into(), + Metrics::::noop(), + ); let gossip_network = Box::new(gossip_network); let other_network = Box::new(other_network); diff --git a/finality-aleph/src/nodes.rs b/finality-aleph/src/nodes.rs index 27aadc16e3..ce0e4b47a6 100644 --- a/finality-aleph/src/nodes.rs +++ b/finality-aleph/src/nodes.rs @@ -109,6 +109,7 @@ where let (gossip_network_service, authentication_network, block_sync_network) = GossipService::new( SubstrateNetwork::new(network.clone(), sync_network.clone(), protocol_naming), spawn_handle.clone(), + metrics.clone(), ); let gossip_network_task = async move { gossip_network_service.run().await }; @@ -153,6 +154,7 @@ where database_io, session_info.clone(), justification_rx, + metrics.clone(), ) { Ok(x) => x, Err(e) => panic!("Failed to initialize Sync service: {e}"), diff --git a/finality-aleph/src/sync/service.rs b/finality-aleph/src/sync/service.rs index bf461956a4..f0e2279b8c 100644 --- a/finality-aleph/src/sync/service.rs +++ b/finality-aleph/src/sync/service.rs @@ -6,6 +6,7 @@ use log::{debug, error, trace, warn}; pub use crate::sync::handler::DatabaseIO; use crate::{ + metrics::Key, network::GossipNetwork, session::SessionBoundaryInfo, sync::{ @@ -19,13 +20,14 @@ use crate::{ ChainStatusNotifier, Finalizer, Justification, JustificationSubmissions, RequestBlocks, Verifier, LOG_TARGET, }, + Metrics, }; const BROADCAST_COOLDOWN: Duration = Duration::from_millis(600); const BROADCAST_PERIOD: Duration = Duration::from_secs(5); /// A service synchronizing the knowledge about the chain between the nodes. -pub struct Service +pub struct Service where B: Block, J: Justification
, @@ -35,6 +37,7 @@ where V: Verifier, F: Finalizer, BI: BlockImport, + H: Key, { network: VersionWrapper, handler: Handler, @@ -45,6 +48,7 @@ where additional_justifications_from_user: mpsc::UnboundedReceiver, block_requests_from_user: mpsc::UnboundedReceiver>, _phantom: PhantomData, + metrics: Metrics, } impl JustificationSubmissions for mpsc::UnboundedSender { @@ -63,7 +67,7 @@ impl RequestBlocks for mpsc::UnboundedSender { } } -impl Service +impl Service where B: Block, J: Justification
, @@ -73,6 +77,7 @@ where V: Verifier, F: Finalizer, BI: BlockImport, + H: Key, { /// Create a new service using the provided network for communication. /// Also returns an interface for submitting additional justifications, @@ -84,6 +89,7 @@ where database_io: DatabaseIO, session_info: SessionBoundaryInfo, additional_justifications_from_user: mpsc::UnboundedReceiver, + metrics: Metrics, ) -> Result< ( Self, @@ -108,6 +114,7 @@ where justifications_from_user, additional_justifications_from_user, block_requests_from_user, + metrics, _phantom: PhantomData, }, justifications_for_sync, @@ -145,6 +152,7 @@ where } }; trace!(target: LOG_TARGET, "Broadcasting state: {:?}", state); + self.metrics.report_sync_broadcast(); let data = NetworkData::StateBroadcast(state); if let Err(e) = self.network.broadcast(data) { warn!(target: LOG_TARGET, "Error sending broadcast: {}.", e); @@ -164,6 +172,7 @@ where }; let (request, peers) = pre_request.with_state(state); trace!(target: LOG_TARGET, "Sending a request: {:?}", request); + self.metrics.report_sync_send_request(); let data = NetworkData::Request(request); if let Err(e) = self.network.send_to_random(data, peers) { warn!(target: LOG_TARGET, "Error sending request: {}.", e); @@ -171,6 +180,13 @@ where } fn send_to(&mut self, data: NetworkData, peer: N::PeerId) { + trace!( + target: LOG_TARGET, + "Sending data {:?} to peer {:?}", + data, + peer + ); + self.metrics.report_sync_send_to(); if let Err(e) = self.network.send_to(data, peer) { warn!(target: LOG_TARGET, "Error sending response: {}.", e); } @@ -184,6 +200,7 @@ where state, peer ); + self.metrics.report_sync_handle_state(); match self.handler.handle_state(state, peer.clone()) { Ok(action) => match action { Response(data) => self.send_to(data, peer), @@ -265,6 +282,7 @@ where peer, response_items, ); + self.metrics.report_sync_handle_request_response(); let (maybe_id, maybe_error) = self .handler .handle_request_response(response_items, peer.clone()); @@ -292,6 +310,8 @@ where request, peer ); + self.metrics.report_sync_handle_request(); + match self.handler.handle_request(request) { Ok(Action::Response(response_items)) => { let mut limiter = MsgLimiter::new(&response_items); @@ -328,6 +348,7 @@ where fn handle_task(&mut self, task: RequestTask>) { trace!(target: LOG_TARGET, "Handling task {}.", task); + self.metrics.report_sync_handle_task(); if let TaskAction::Request(pre_request, (task, delay)) = task.process(self.handler.interest_provider()) { @@ -341,6 +362,7 @@ where match event { BlockImported(header) => { trace!(target: LOG_TARGET, "Handling a new imported block."); + self.metrics.report_sync_handle_block_imported(); if let Err(e) = self.handler.block_imported(header) { error!( target: LOG_TARGET, @@ -350,6 +372,7 @@ where } BlockFinalized(_) => { trace!(target: LOG_TARGET, "Handling a new finalized block."); + self.metrics.report_sync_handle_block_finalized(); if self.broadcast_ticker.try_tick() { self.broadcast(); } diff --git a/finality-aleph/src/testing/network.rs b/finality-aleph/src/testing/network.rs index 3b2491bd23..ac83830cd6 100644 --- a/finality-aleph/src/testing/network.rs +++ b/finality-aleph/src/testing/network.rs @@ -27,7 +27,8 @@ use crate::{ }, GossipError, GossipNetwork, GossipService, MockEvent, MockRawNetwork, Protocol, }, - MillisecsPerBlock, NodeIndex, Recipient, SessionId, SessionPeriod, + testing::mocks::THash, + Metrics, MillisecsPerBlock, NodeIndex, Recipient, SessionId, SessionPeriod, }; const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10); @@ -101,7 +102,11 @@ async fn prepare_one_session_test_data() -> TestData { let validator_network = MockCliqueNetwork::new(); let (gossip_service, gossip_network, sync_network) = - GossipService::<_, _, MockData>::new(network.clone(), task_manager.spawn_handle().into()); + GossipService::<_, _, MockData, THash>::new( + network.clone(), + task_manager.spawn_handle().into(), + Metrics::noop(), + ); let (connection_manager_service, session_manager) = ConnectionManager::new( authorities[0].address(), From bf00f50d102f8d91cbf0ba584f8987bb1c510d9a Mon Sep 17 00:00:00 2001 From: KostekIV Date: Mon, 31 Jul 2023 18:45:45 +0200 Subject: [PATCH 2/3] More metrics --- finality-aleph/src/metrics.rs | 36 ++++++++++++++++++++++++++++++ finality-aleph/src/sync/service.rs | 3 +++ 2 files changed, 39 insertions(+) diff --git a/finality-aleph/src/metrics.rs b/finality-aleph/src/metrics.rs index 80cae3953a..804a6e9b04 100644 --- a/finality-aleph/src/metrics.rs +++ b/finality-aleph/src/metrics.rs @@ -40,6 +40,9 @@ struct Inner { sync_handle_task_counter: Counter, sync_handle_block_imported_counter: Counter, sync_handle_block_finalized_counter: Counter, + sync_handle_state_response_counter: Counter, + sync_handle_justification_from_user_counter: Counter, + sync_handle_internal_request_counter: Counter, network_send_times: HashMap, } @@ -211,6 +214,18 @@ impl Metrics { Counter::new("aleph_sync_handle_block_finalized", "no help")?, registry, )?, + sync_handle_justification_from_user_counter: register( + Counter::new("aleph_sync_handle_justification_from_user", "no help")?, + registry, + )?, + sync_handle_state_response_counter: register( + Counter::new("aleph_sync_handle_state_response", "no help")?, + registry, + )?, + sync_handle_internal_request_counter: register( + Counter::new("aleph_sync_handle_internal_request", "no help")?, + registry, + )?, network_send_times, }))); @@ -279,6 +294,27 @@ impl Metrics { } } + pub fn report_sync_handle_justification_from_user(&self) { + if let Some(inner) = &self.inner { + inner + .lock() + .sync_handle_justification_from_user_counter + .inc(); + } + } + + pub fn report_sync_handle_state_response(&self) { + if let Some(inner) = &self.inner { + inner.lock().sync_handle_state_response_counter.inc(); + } + } + + pub fn report_sync_handle_internal_request(&self) { + if let Some(inner) = &self.inner { + inner.lock().sync_handle_internal_request_counter.inc(); + } + } + pub fn start_sending_in(&self, protocol: Protocol) -> Option { self.inner .as_ref() diff --git a/finality-aleph/src/sync/service.rs b/finality-aleph/src/sync/service.rs index f0e2279b8c..a3d178fcd3 100644 --- a/finality-aleph/src/sync/service.rs +++ b/finality-aleph/src/sync/service.rs @@ -233,6 +233,7 @@ where maybe_justification, peer ); + self.metrics.report_sync_handle_state_response(); let (maybe_id, maybe_error) = self.handler .handle_state_response(justification, maybe_justification, peer.clone()); @@ -259,6 +260,7 @@ where "Handling a justification {:?} from user.", justification, ); + self.metrics.report_sync_handle_justification_from_user(); match self.handler.handle_justification_from_user(justification) { Ok(Some(id)) => self.request_highest_justified(id), Ok(None) => (), @@ -386,6 +388,7 @@ where "Handling an internal request for block {:?}.", id, ); + self.metrics.report_sync_handle_internal_request(); match self.handler.handle_internal_request(&id) { Ok(true) => { self.request_block(id); From b90de3357a6555da62037a4644a7332c8aa8f38f Mon Sep 17 00:00:00 2001 From: KostekIV Date: Tue, 1 Aug 2023 12:42:40 +0200 Subject: [PATCH 3/3] review --- finality-aleph/src/sync/service.rs | 65 ++++++++++++++++++------------ 1 file changed, 39 insertions(+), 26 deletions(-) diff --git a/finality-aleph/src/sync/service.rs b/finality-aleph/src/sync/service.rs index a3d178fcd3..9e2d4bd289 100644 --- a/finality-aleph/src/sync/service.rs +++ b/finality-aleph/src/sync/service.rs @@ -152,10 +152,11 @@ where } }; trace!(target: LOG_TARGET, "Broadcasting state: {:?}", state); - self.metrics.report_sync_broadcast(); + let data = NetworkData::StateBroadcast(state); - if let Err(e) = self.network.broadcast(data) { - warn!(target: LOG_TARGET, "Error sending broadcast: {}.", e); + match self.network.broadcast(data) { + Ok(()) => self.metrics.report_sync_broadcast(), + Err(e) => warn!(target: LOG_TARGET, "Error sending broadcast: {}.", e), } } @@ -172,10 +173,11 @@ where }; let (request, peers) = pre_request.with_state(state); trace!(target: LOG_TARGET, "Sending a request: {:?}", request); - self.metrics.report_sync_send_request(); let data = NetworkData::Request(request); - if let Err(e) = self.network.send_to_random(data, peers) { - warn!(target: LOG_TARGET, "Error sending request: {}.", e); + + match self.network.send_to_random(data, peers) { + Ok(()) => self.metrics.report_sync_send_request(), + Err(e) => warn!(target: LOG_TARGET, "Error sending request: {}.", e), } } @@ -186,9 +188,9 @@ where data, peer ); - self.metrics.report_sync_send_to(); - if let Err(e) = self.network.send_to(data, peer) { - warn!(target: LOG_TARGET, "Error sending response: {}.", e); + match self.network.send_to(data, peer) { + Ok(()) => self.metrics.report_sync_send_to(), + Err(e) => warn!(target: LOG_TARGET, "Error sending response: {}.", e), } } @@ -200,13 +202,15 @@ where state, peer ); - self.metrics.report_sync_handle_state(); match self.handler.handle_state(state, peer.clone()) { - Ok(action) => match action { - Response(data) => self.send_to(data, peer), - HighestJustified(block_id) => self.request_highest_justified(block_id), - Noop => (), - }, + Ok(action) => { + self.metrics.report_sync_handle_state(); + match action { + Response(data) => self.send_to(data, peer), + HighestJustified(block_id) => self.request_highest_justified(block_id), + Noop => (), + } + } Err(e) => match e { HandlerError::Verifier(e) => debug!( target: LOG_TARGET, @@ -260,10 +264,13 @@ where "Handling a justification {:?} from user.", justification, ); - self.metrics.report_sync_handle_justification_from_user(); match self.handler.handle_justification_from_user(justification) { - Ok(Some(id)) => self.request_highest_justified(id), - Ok(None) => (), + Ok(maybe_id) => { + self.metrics.report_sync_handle_justification_from_user(); + if let Some(id) = maybe_id { + self.request_highest_justified(id) + } + } Err(e) => match e { HandlerError::Verifier(e) => debug!( target: LOG_TARGET, @@ -319,7 +326,9 @@ where let mut limiter = MsgLimiter::new(&response_items); loop { match limiter.next_largest_msg() { - Ok(None) => break, + Ok(None) => { + break self.metrics.report_sync_handle_request(); + } Ok(Some(chunk)) => { self.send_to(NetworkData::RequestResponse(chunk.to_vec()), peer.clone()) } @@ -333,7 +342,10 @@ where } } } - Ok(Action::RequestBlock(id)) => self.request_block(id), + Ok(Action::RequestBlock(id)) => { + self.metrics.report_sync_handle_request(); + self.request_block(id) + } Err(e) => match e { HandlerError::Verifier(e) => debug!( target: LOG_TARGET, @@ -350,13 +362,13 @@ where fn handle_task(&mut self, task: RequestTask>) { trace!(target: LOG_TARGET, "Handling task {}.", task); - self.metrics.report_sync_handle_task(); if let TaskAction::Request(pre_request, (task, delay)) = task.process(self.handler.interest_provider()) { self.send_request(pre_request); self.tasks.schedule_in(task, delay); } + self.metrics.report_sync_handle_task(); } fn handle_chain_event(&mut self, event: ChainStatusNotification) { @@ -364,12 +376,12 @@ where match event { BlockImported(header) => { trace!(target: LOG_TARGET, "Handling a new imported block."); - self.metrics.report_sync_handle_block_imported(); - if let Err(e) = self.handler.block_imported(header) { - error!( + match self.handler.block_imported(header) { + Ok(()) => self.metrics.report_sync_handle_block_imported(), + Err(e) => error!( target: LOG_TARGET, "Error marking block as imported: {}.", e - ); + ), } } BlockFinalized(_) => { @@ -388,12 +400,13 @@ where "Handling an internal request for block {:?}.", id, ); - self.metrics.report_sync_handle_internal_request(); match self.handler.handle_internal_request(&id) { Ok(true) => { + self.metrics.report_sync_handle_internal_request(); self.request_block(id); } Ok(_) => { + self.metrics.report_sync_handle_internal_request(); debug!(target: LOG_TARGET, "Already requested block {:?}.", id); } Err(e) => match e {