Skip to content

Commit

Permalink
A0-3058: Factor out clique metrics (#1362)
Browse files Browse the repository at this point in the history
# Description

First part of the metrics cleanup; for now it only covers the clique network.

## Type of change

# Checklist:

---------

Co-authored-by: timorl <[email protected]>
  • Loading branch information
timorl and timorleph authored Aug 22, 2023
1 parent 978a3e7 commit b1a94a7
Show file tree
Hide file tree
Showing 15 changed files with 188 additions and 153 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions bin/node/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,7 @@ pub fn new_authority(
keystore: keystore_container.keystore(),
justification_rx,
metrics,
registry: prometheus_registry,
unit_creation_delay: aleph_config.unit_creation_delay(),
backup_saving_path: backup_path,
external_addresses: aleph_config.external_addresses(),
Expand Down
1 change: 1 addition & 0 deletions clique/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ log = { workspace = true }
lru = { workspace = true }
rand = { workspace = true }
serde = { workspace = true }
substrate-prometheus-endpoint = { workspace = true }
tiny-bip39 = { workspace = true }
tokio = { workspace = true, features = [
"sync",
Expand Down
5 changes: 5 additions & 0 deletions clique/src/incoming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use futures::channel::{mpsc, oneshot};
use log::{debug, info};

use crate::{
metrics::Metrics,
protocols::{protocol, ProtocolError, ProtocolNegotiationError, ResultForService},
Data, PublicKey, SecretKey, Splittable, LOG_TARGET,
};
Expand Down Expand Up @@ -41,6 +42,7 @@ async fn manage_incoming<SK: SecretKey, D: Data, S: Splittable>(
result_for_parent: mpsc::UnboundedSender<ResultForService<SK::PublicKey, D>>,
data_for_user: mpsc::UnboundedSender<D>,
authorization_requests_sender: mpsc::UnboundedSender<(SK::PublicKey, oneshot::Sender<bool>)>,
metrics: Metrics,
) -> Result<(), IncomingError<SK::PublicKey>> {
debug!(
target: LOG_TARGET,
Expand All @@ -55,6 +57,7 @@ async fn manage_incoming<SK: SecretKey, D: Data, S: Splittable>(
result_for_parent,
data_for_user,
authorization_requests_sender,
metrics,
)
.await?)
}
Expand All @@ -70,6 +73,7 @@ pub async fn incoming<SK: SecretKey, D: Data, S: Splittable>(
result_for_parent: mpsc::UnboundedSender<ResultForService<SK::PublicKey, D>>,
data_for_user: mpsc::UnboundedSender<D>,
authorization_requests_sender: mpsc::UnboundedSender<(SK::PublicKey, oneshot::Sender<bool>)>,
metrics: Metrics,
) {
let addr = stream.peer_address_info();
if let Err(e) = manage_incoming(
Expand All @@ -78,6 +82,7 @@ pub async fn incoming<SK: SecretKey, D: Data, S: Splittable>(
result_for_parent,
data_for_user,
authorization_requests_sender,
metrics,
)
.await
{
Expand Down
38 changes: 30 additions & 8 deletions clique/src/manager/direction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,18 @@ use std::{
ops::BitXor,
};

use crate::{Data, PublicKey};
use crate::{
metrics::{Event, Metrics},
Data, PublicKey,
};

/// Data about peers we know and whether we should connect to them or they to us. For the former
/// case also keeps the peers' addresses.
pub struct DirectedPeers<PK: PublicKey, A: Data> {
/// Our own peer id; compared with a remote id to decide which side should call.
own_id: PK,
/// Peers we should call, together with the address stored for each of them.
outgoing: HashMap<PK, A>,
/// Peers that should call us; their addresses are discarded, so only the ids are kept.
incoming: HashSet<PK>,
/// Receives an event whenever a peer is newly added to or removed from the sets above.
metrics: Metrics,
}

/// Whether we should call the remote or the other way around. We xor the peer ids and based on the
Expand All @@ -29,11 +33,12 @@ fn should_we_call(own_id: &[u8], remote_id: &[u8]) -> bool {

impl<PK: PublicKey, A: Data> DirectedPeers<PK, A> {
/// Create a new set of peers directed using our own peer id.
pub fn new(own_id: PK) -> Self {
pub fn new(own_id: PK, metrics: Metrics) -> Self {
DirectedPeers {
own_id,
outgoing: HashMap::new(),
incoming: HashSet::new(),
metrics,
}
}

Expand All @@ -43,12 +48,21 @@ impl<PK: PublicKey, A: Data> DirectedPeers<PK, A> {
/// exactly when the peer is one with which we should attempt connections AND it was added for
/// the first time.
pub fn add_peer(&mut self, peer_id: PK, address: A) -> bool {
use Event::*;
match should_we_call(self.own_id.as_ref(), peer_id.as_ref()) {
true => self.outgoing.insert(peer_id, address).is_none(),
true => match self.outgoing.insert(peer_id, address).is_none() {
true => {
self.metrics.report_event(NewOutgoing);
true
}
false => false,
},
false => {
// We discard the address here, as we will never want to call this peer anyway,
// so we don't need it.
self.incoming.insert(peer_id);
if self.incoming.insert(peer_id) {
self.metrics.report_event(NewIncoming);
}
false
}
}
Expand Down Expand Up @@ -77,21 +91,29 @@ impl<PK: PublicKey, A: Data> DirectedPeers<PK, A> {
/// Remove a peer from the list of peers that we want to stay connected with, whether the
/// connection was supposed to be incoming or outgoing.
pub fn remove_peer(&mut self, peer_id: &PK) {
self.incoming.remove(peer_id);
self.outgoing.remove(peer_id);
use Event::*;
if self.incoming.remove(peer_id) {
self.metrics.report_event(DelIncoming);
}
if self.outgoing.remove(peer_id).is_some() {
self.metrics.report_event(DelOutgoing);
}
}
}

#[cfg(test)]
mod tests {
use super::DirectedPeers;
use crate::mock::{key, MockPublicKey};
use crate::{
metrics::Metrics,
mock::{key, MockPublicKey},
};

type Address = String;

fn container_with_id() -> (DirectedPeers<MockPublicKey, Address>, MockPublicKey) {
let (id, _) = key();
let container = DirectedPeers::new(id.clone());
let container = DirectedPeers::new(id.clone(), Metrics::noop());
(container, id)
}

Expand Down
26 changes: 10 additions & 16 deletions clique/src/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,11 @@ use std::{

use futures::channel::mpsc;

use crate::{Data, PeerId, PublicKey};
use crate::{metrics::Metrics, Data, PeerId, PublicKey};

mod direction;
use direction::DirectedPeers;

use crate::metrics::NetworkCliqueMetrics;

/// Error during sending data through the Manager
#[derive(Debug, PartialEq, Eq)]
pub enum SendError {
Expand Down Expand Up @@ -76,13 +74,6 @@ impl<PK: PublicKey + PeerId> ManagerStatus<PK> {
}
}

pub fn update_metrics<M: NetworkCliqueMetrics>(&self, metrics: &M) {
metrics.set_incoming_connections(self.incoming_peers.len() as u64);
metrics.set_missing_incoming_connections(self.missing_incoming.len() as u64);
metrics.set_outgoing_connections(self.outgoing_peers.len() as u64);
metrics.set_missing_outgoing_connections(self.missing_outgoing.len() as u64);
}

fn wanted_incoming(&self) -> usize {
self.incoming_peers.len() + self.missing_incoming.len()
}
Expand Down Expand Up @@ -172,9 +163,9 @@ pub struct Manager<PK: PublicKey + PeerId, A: Data, D: Data> {

impl<PK: PublicKey + PeerId, A: Data, D: Data> Manager<PK, A, D> {
/// Create a new Manager with empty list of peers.
pub fn new(own_id: PK) -> Self {
pub fn new(own_id: PK, metrics: Metrics) -> Self {
Manager {
wanted: DirectedPeers::new(own_id),
wanted: DirectedPeers::new(own_id, metrics),
have: HashMap::new(),
}
}
Expand Down Expand Up @@ -249,15 +240,18 @@ mod tests {
use futures::{channel::mpsc, StreamExt};

use super::{AddResult::*, Manager, SendError};
use crate::mock::{key, MockPublicKey};
use crate::{
metrics::Metrics,
mock::{key, MockPublicKey},
};

type Data = String;
type Address = String;

#[test]
fn add_remove() {
let (own_id, _) = key();
let mut manager = Manager::<MockPublicKey, Address, Data>::new(own_id);
let mut manager = Manager::<MockPublicKey, Address, Data>::new(own_id, Metrics::noop());
let (peer_id, _) = key();
let (peer_id_b, _) = key();
let address = String::from("43.43.43.43:43000");
Expand Down Expand Up @@ -286,10 +280,10 @@ mod tests {
async fn send_receive() {
let (mut connecting_id, _) = key();
let mut connecting_manager =
Manager::<MockPublicKey, Address, Data>::new(connecting_id.clone());
Manager::<MockPublicKey, Address, Data>::new(connecting_id.clone(), Metrics::noop());
let (mut listening_id, _) = key();
let mut listening_manager =
Manager::<MockPublicKey, Address, Data>::new(listening_id.clone());
Manager::<MockPublicKey, Address, Data>::new(listening_id.clone(), Metrics::noop());
let data = String::from("DATA");
let address = String::from("43.43.43.43:43000");
let (tx, _rx) = mpsc::unbounded();
Expand Down
118 changes: 88 additions & 30 deletions clique/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,41 +1,99 @@
pub trait NetworkCliqueMetrics {
fn set_incoming_connections(&self, present: u64);
fn set_missing_incoming_connections(&self, missing: u64);
fn set_outgoing_connections(&self, present: u64);
fn set_missing_outgoing_connections(&self, missing: u64);
use substrate_prometheus_endpoint::{register, Gauge, PrometheusError, Registry, U64};

/// Metrics of the clique network: either a set of Prometheus gauges counting connections, or a
/// placeholder for which reporting an event does nothing.
#[derive(Clone)]
pub enum Metrics {
/// Gauges updated by `report_event`.
Prometheus {
/// Present incoming connections.
incoming_connections: Gauge<U64>,
/// Difference between expected and present incoming connections.
missing_incoming_connections: Gauge<U64>,
/// Present outgoing connections.
outgoing_connections: Gauge<U64>,
/// Difference between expected and present outgoing connections.
missing_outgoing_connections: Gauge<U64>,
},
/// No gauges are kept; every reported event is ignored.
Noop,
}

impl<M: NetworkCliqueMetrics> NetworkCliqueMetrics for Option<M> {
fn set_incoming_connections(&self, present: u64) {
if let Some(m) = self {
m.set_incoming_connections(present)
}
}
/// Events that change the clique network connection gauges when passed to
/// `Metrics::report_event`.
pub enum Event {
/// A peer we should call was added for the first time: one more outgoing connection is missing.
NewOutgoing,
/// A peer that should call us was added for the first time: one more incoming connection is
/// missing.
NewIncoming,
/// A peer we should call was removed: one fewer outgoing connection is missing.
DelOutgoing,
/// A peer that should call us was removed: one fewer incoming connection is missing.
DelIncoming,
/// Moves one outgoing connection from missing to present.
/// NOTE(review): presumably reported once an outgoing connection is established; the
/// reporting site is not visible here -- confirm.
ConnectedOutgoing,
/// Moves one incoming connection from missing to present.
/// NOTE(review): presumably reported once an incoming connection is established -- confirm.
ConnectedIncoming,
/// Moves one outgoing connection from present back to missing.
/// NOTE(review): presumably reported when an outgoing connection is lost -- confirm.
DisconnectedOutgoing,
/// Moves one incoming connection from present back to missing.
/// NOTE(review): presumably reported when an incoming connection is lost -- confirm.
DisconnectedIncoming,
}

fn set_missing_incoming_connections(&self, missing: u64) {
if let Some(m) = self {
m.set_missing_incoming_connections(missing)
impl Metrics {
pub fn new(registry: Option<Registry>) -> Result<Self, PrometheusError> {
match registry {
Some(registry) => Ok(Metrics::Prometheus {
incoming_connections: register(
Gauge::new(
"clique_network_incoming_connections",
"present incoming connections",
)?,
&registry,
)?,
missing_incoming_connections: register(
Gauge::new(
"clique_network_missing_incoming_connections",
"difference between expected and present incoming connections",
)?,
&registry,
)?,
outgoing_connections: register(
Gauge::new(
"clique_network_outgoing_connections",
"present outgoing connections",
)?,
&registry,
)?,
missing_outgoing_connections: register(
Gauge::new(
"clique_network_missing_outgoing_connections",
"difference between expected and present outgoing connections",
)?,
&registry,
)?,
}),
None => Ok(Metrics::Noop),
}
}

fn set_outgoing_connections(&self, present: u64) {
if let Some(m) = self {
m.set_outgoing_connections(present)
}
pub fn noop() -> Self {
Metrics::Noop
}

fn set_missing_outgoing_connections(&self, missing: u64) {
if let Some(m) = self {
m.set_missing_outgoing_connections(missing)
pub fn report_event(&self, event: Event) {
use Event::*;
if let Metrics::Prometheus {
incoming_connections,
outgoing_connections,
missing_incoming_connections,
missing_outgoing_connections,
} = self
{
match event {
NewIncoming => missing_incoming_connections.inc(),
NewOutgoing => missing_outgoing_connections.inc(),
DelIncoming => missing_incoming_connections.dec(),
DelOutgoing => missing_outgoing_connections.dec(),
ConnectedIncoming => {
incoming_connections.inc();
missing_incoming_connections.dec();
}
ConnectedOutgoing => {
outgoing_connections.inc();
missing_outgoing_connections.dec();
}
DisconnectedIncoming => {
incoming_connections.dec();
missing_incoming_connections.inc();
}
DisconnectedOutgoing => {
outgoing_connections.dec();
missing_outgoing_connections.inc();
}
}
}
}
}

pub struct NoopMetrics;

impl NetworkCliqueMetrics for NoopMetrics {
fn set_incoming_connections(&self, _: u64) {}
fn set_missing_incoming_connections(&self, _: u64) {}
fn set_outgoing_connections(&self, _: u64) {}
fn set_missing_outgoing_connections(&self, _: u64) {}
}
Loading

0 comments on commit b1a94a7

Please sign in to comment.