A0-2814: port metrics from release-11 #1316

Merged · 4 commits · Aug 1, 2023
8 changes: 4 additions & 4 deletions finality-aleph/src/aggregation/mod.rs
@@ -1,6 +1,6 @@
//! Module to glue legacy and current version of the aggregator;

use std::{fmt::Debug, hash::Hash, marker::PhantomData, time::Instant};
use std::{marker::PhantomData, time::Instant};

use current_aleph_aggregator::NetworkError as CurrentNetworkError;
use legacy_aleph_aggregator::NetworkError as LegacyNetworkError;
@@ -9,7 +9,7 @@ use sp_runtime::traits::Block;
use crate::{
abft::SignatureSet,
crypto::Signature,
metrics::Checkpoint,
metrics::{Checkpoint, Key},
mpsc,
network::{
data::{Network, SendError},
@@ -175,13 +175,13 @@ impl<D: Data, N: Network<D>> NetworkWrapper<D, N> {
}
}

impl<H: Debug + Hash + Eq + Debug + Copy> legacy_aleph_aggregator::Metrics<H> for Metrics<H> {
impl<H: Key> legacy_aleph_aggregator::Metrics<H> for Metrics<H> {
fn report_aggregation_complete(&mut self, h: H) {
self.report_block(h, Instant::now(), Checkpoint::Aggregating);
}
}

impl<H: Debug + Hash + Eq + Debug + Copy> current_aleph_aggregator::Metrics<H> for Metrics<H> {
impl<H: Key> current_aleph_aggregator::Metrics<H> for Metrics<H> {
fn report_aggregation_complete(&mut self, h: H) {
self.report_block(h, Instant::now(), Checkpoint::Aggregating);
}
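The repeated `Debug + Hash + Eq + Copy` bound lists above are replaced by the blanket `Key` trait defined in `finality-aleph/src/metrics.rs` (next file). A minimal, self-contained sketch of the pattern, with a stand-in `Metrics` struct in place of the real one, for illustration:

```rust
use std::{collections::HashMap, fmt::Debug, hash::Hash, time::Instant};

// Blanket trait as introduced in the PR: any type with these bounds is a `Key`.
pub trait Key: Hash + Eq + Debug + Copy + Send + 'static {}
impl<T: Hash + Eq + Debug + Copy + Send + 'static> Key for T {}

// Stand-in for the real `Metrics<H>`, which wraps an `Option<Arc<Mutex<Inner<H>>>>`.
pub struct Metrics<H: Key> {
    starts: HashMap<H, Instant>,
}

// A single `Key` bound replaces the repeated bound list at every impl site.
impl<H: Key> Metrics<H> {
    pub fn report_block(&mut self, hash: H) {
        self.starts.insert(hash, Instant::now());
    }
}
```

With the blanket impl, any hash type that already satisfies the listed bounds implements `Key` automatically, so the bound list lives in one place.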
199 changes: 189 additions & 10 deletions finality-aleph/src/metrics.rs
@@ -9,23 +9,41 @@ use log::{trace, warn};
use lru::LruCache;
use parking_lot::Mutex;
use sc_service::Arc;
use substrate_prometheus_endpoint::{register, Gauge, PrometheusError, Registry, U64};
use substrate_prometheus_endpoint::{
exponential_buckets, prometheus::HistogramTimer, register, Counter, Gauge, Histogram,
HistogramOpts, Opts, PrometheusError, Registry, U64,
};

use crate::Protocol;

// How many entries (block hash + timestamp) we keep in memory per one checkpoint type.
// Each entry takes 32B (Hash) + 16B (Instant), so a limit of 5000 gives ~234kB (per checkpoint).
// Notice that some issues like finalization stall may lead to incomplete metrics
// (e.g. when the gap between checkpoints for a block grows over `MAX_BLOCKS_PER_CHECKPOINT`).
const MAX_BLOCKS_PER_CHECKPOINT: usize = 5000;

pub trait Key: Hash + Eq + Debug + Copy {}
impl<T: Hash + Eq + Debug + Copy> Key for T {}
pub trait Key: Hash + Eq + Debug + Copy + Send + 'static {}
impl<T: Hash + Eq + Debug + Copy + Send + 'static> Key for T {}

const LOG_TARGET: &str = "aleph-metrics";

struct Inner<H: Key> {
prev: HashMap<Checkpoint, Checkpoint>,
gauges: HashMap<Checkpoint, Gauge<U64>>,
starts: HashMap<Checkpoint, LruCache<H, Instant>>,
sync_broadcast_counter: Counter<U64>,
sync_send_request_counter: Counter<U64>,
sync_send_to_counter: Counter<U64>,
sync_handle_state_counter: Counter<U64>,
sync_handle_request_response_counter: Counter<U64>,
sync_handle_request_counter: Counter<U64>,
sync_handle_task_counter: Counter<U64>,
sync_handle_block_imported_counter: Counter<U64>,
sync_handle_block_finalized_counter: Counter<U64>,
sync_handle_state_response_counter: Counter<U64>,
sync_handle_justification_from_user_counter: Counter<U64>,
sync_handle_internal_request_counter: Counter<U64>,
network_send_times: HashMap<Protocol, Histogram>,
}

impl<H: Key> Inner<H> {
@@ -71,10 +89,23 @@ impl<H: Key> Inner<H> {
}
}
}

fn start_sending_in(&self, protocol: Protocol) -> HistogramTimer {
self.network_send_times[&protocol].start_timer()
}
}

fn protocol_name(protocol: Protocol) -> String {
use Protocol::*;
match protocol {
Authentication => "authentication",
BlockSync => "block_sync",
}
.to_string()
}

#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)]
pub(crate) enum Checkpoint {
pub enum Checkpoint {
Importing,
Imported,
Ordering,
@@ -117,30 +148,178 @@ impl<H: Key> Metrics<H> {
);
}

use Protocol::*;
let mut network_send_times = HashMap::new();
for key in [Authentication, BlockSync] {
network_send_times.insert(
key,
register(
Histogram::with_opts(HistogramOpts {
common_opts: Opts {
namespace: "gossip_network".to_string(),
subsystem: protocol_name(key),
name: "send_duration".to_string(),
help: "How long did it take for substrate to send a message."
.to_string(),
const_labels: Default::default(),
variable_labels: Default::default(),
},
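// Bucket upper bounds grow geometrically, 0.001 * 1.26^i for i in 0..30,
// i.e. from 1 ms up to roughly 0.001 * 1.26^29 ≈ 0.8 s.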
buckets: exponential_buckets(0.001, 1.26, 30)?,
})?,
registry,
)?,
);
}

let inner = Some(Arc::new(Mutex::new(Inner {
prev,
gauges,
starts: keys
.iter()
.map(|k| (*k, LruCache::new(MAX_BLOCKS_PER_CHECKPOINT)))
.collect(),
sync_broadcast_counter: register(
Counter::new("aleph_sync_broadcast", "no help")?,
registry,
)?,
sync_send_request_counter: register(
Counter::new("aleph_sync_send_request", "no help")?,
registry,
)?,
sync_send_to_counter: register(
Counter::new("aleph_sync_send_to", "no help")?,
registry,
)?,
sync_handle_state_counter: register(
Counter::new("aleph_sync_handle_state", "no help")?,
registry,
)?,
sync_handle_request_response_counter: register(
Counter::new("aleph_sync_handle_request_response", "no help")?,
registry,
)?,
sync_handle_request_counter: register(
Counter::new("aleph_sync_handle_request", "no help")?,
registry,
)?,
sync_handle_task_counter: register(
Counter::new("aleph_sync_handle_task", "no help")?,
registry,
)?,
sync_handle_block_imported_counter: register(
Counter::new("aleph_sync_handle_block_imported", "no help")?,
registry,
)?,
sync_handle_block_finalized_counter: register(
Counter::new("aleph_sync_handle_block_finalized", "no help")?,
registry,
)?,
sync_handle_justification_from_user_counter: register(
Counter::new("aleph_sync_handle_justification_from_user", "no help")?,
registry,
)?,
sync_handle_state_response_counter: register(
Counter::new("aleph_sync_handle_state_response", "no help")?,
registry,
)?,
sync_handle_internal_request_counter: register(
Counter::new("aleph_sync_handle_internal_request", "no help")?,
registry,
)?,
network_send_times,
})));

Ok(Metrics { inner })
}

pub(crate) fn report_block(
&self,
hash: H,
checkpoint_time: Instant,
checkpoint_type: Checkpoint,
) {
pub fn report_block(&self, hash: H, checkpoint_time: Instant, checkpoint_type: Checkpoint) {
if let Some(inner) = &self.inner {
inner
.lock()
.report_block(hash, checkpoint_time, checkpoint_type);
}
}

pub fn report_sync_broadcast(&self) {
if let Some(inner) = &self.inner {
inner.lock().sync_broadcast_counter.inc();
}
}

pub fn report_sync_send_request(&self) {
if let Some(inner) = &self.inner {
inner.lock().sync_send_request_counter.inc();
}
}

pub fn report_sync_send_to(&self) {
if let Some(inner) = &self.inner {
inner.lock().sync_send_to_counter.inc();
}
}

pub fn report_sync_handle_state(&self) {
if let Some(inner) = &self.inner {
inner.lock().sync_handle_state_counter.inc();
}
}

pub fn report_sync_handle_request_response(&self) {
if let Some(inner) = &self.inner {
inner.lock().sync_handle_request_response_counter.inc();
}
}

pub fn report_sync_handle_request(&self) {
if let Some(inner) = &self.inner {
inner.lock().sync_handle_request_counter.inc();
}
}

pub fn report_sync_handle_task(&self) {
if let Some(inner) = &self.inner {
inner.lock().sync_handle_task_counter.inc();
}
}

pub fn report_sync_handle_block_imported(&self) {
if let Some(inner) = &self.inner {
inner.lock().sync_handle_block_imported_counter.inc();
}
}

pub fn report_sync_handle_block_finalized(&self) {
if let Some(inner) = &self.inner {
inner.lock().sync_handle_block_finalized_counter.inc();
}
}

pub fn report_sync_handle_justification_from_user(&self) {
if let Some(inner) = &self.inner {
inner
.lock()
.sync_handle_justification_from_user_counter
.inc();
}
}

pub fn report_sync_handle_state_response(&self) {
if let Some(inner) = &self.inner {
inner.lock().sync_handle_state_response_counter.inc();
}
}

pub fn report_sync_handle_internal_request(&self) {
if let Some(inner) = &self.inner {
inner.lock().sync_handle_internal_request_counter.inc();
}
}

pub fn start_sending_in(&self, protocol: Protocol) -> Option<HistogramTimer> {
self.inner
.as_ref()
.map(|inner| inner.lock().start_sending_in(protocol))
}
}

#[cfg(test)]
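For orientation, a hedged sketch of how the reporting API added in this file might be called from elsewhere in the crate. The function name, the call site and the specific events reported are illustrative; only the `Metrics` methods themselves come from this diff, and the sketch assumes `Metrics`, `Checkpoint`, `Key` and `crate::Protocol` are importable as shown:

```rust
use std::time::Instant;

use crate::{
    metrics::{Checkpoint, Key, Metrics},
    Protocol,
};

fn on_block_imported<H: Key>(metrics: &Metrics<H>, hash: H) {
    // Per-checkpoint block timing (LRU-backed, capped at 5000 entries per checkpoint).
    metrics.report_block(hash, Instant::now(), Checkpoint::Imported);

    // One of the new sync-protocol event counters.
    metrics.report_sync_handle_block_imported();

    // Histogram timer for a network send; `None` when metrics are disabled
    // (the inner state is `None`).
    if let Some(timer) = metrics.start_sending_in(Protocol::BlockSync) {
        // ... perform the send ...
        drop(timer); // the elapsed time is observed when the timer is dropped
    }
}
```

Returning `Option<HistogramTimer>` from `start_sending_in` keeps the no-registry case (`inner: None`) free of any histogram bookkeeping at the call site.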