Skip to content

Commit

Permalink
add latency calculation
Browse files Browse the repository at this point in the history
  • Loading branch information
rob-maron committed Jun 10, 2024
1 parent 2d7902d commit ff68ff2
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 4 deletions.
6 changes: 5 additions & 1 deletion cdn-proto/src/connection/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Feature-gated connection specific metrics

use lazy_static::lazy_static;
use prometheus::{register_gauge, Gauge};
use prometheus::{register_gauge, register_histogram, Gauge, Histogram};

lazy_static! {
// The total number of bytes sent
Expand All @@ -11,4 +11,8 @@ lazy_static! {
// The total number of bytes received
pub static ref BYTES_RECV: Gauge =
register_gauge!("total_bytes_recv", "the total number of bytes received").unwrap();

// Per-message latency
pub static ref LATENCY: Histogram =
register_histogram!("message_latency", "message delivery latency").unwrap();
}
16 changes: 13 additions & 3 deletions cdn-proto/src/connection/middleware/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@
//! receive a message, we await on allocating it. When we are done sending it out to everyone,
//! we drop the `Parc`, allowing for re-allocation.

use std::{ops::Deref, sync::Arc};
use std::{ops::Deref, sync::Arc, time::Instant};

use anyhow::Result;
use derivative::Derivative;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};

use crate::connection::metrics;

/// A global memory arena that tracks but does not allocate memory.
/// Allows for asynchronous capping of memory usage.
#[derive(Clone)]
Expand All @@ -28,7 +30,15 @@ impl MemoryPool {
/// An acquired permit that allows for allocation of a memory region
/// of a particular size.
#[allow(dead_code)]
pub struct AllocationPermit(OwnedSemaphorePermit);
pub struct AllocationPermit(OwnedSemaphorePermit, Instant);

/// When dropped, log the time of allocation to deallocation
/// as latency.
impl Drop for AllocationPermit {
fn drop(&mut self) {
metrics::LATENCY.observe(self.1.elapsed().as_secs_f64());
}
}

impl MemoryPool {
/// Asynchronously allocate `n` bytes from the global pool, waiting
Expand All @@ -39,7 +49,7 @@ impl MemoryPool {
pub async fn alloc(&self, n: u32) -> Result<AllocationPermit> {
// Acquire many permits to the underlying semaphore
let permit = self.0.clone().acquire_many_owned(n).await?;
Ok(AllocationPermit(permit))
Ok(AllocationPermit(permit, Instant::now()))
}
}

Expand Down

0 comments on commit ff68ff2

Please sign in to comment.