Skip to content
This repository has been archived by the owner on Feb 8, 2024. It is now read-only.

Commit

Permalink
feat: clean up partition limiter state on schedule (#72)
Browse files Browse the repository at this point in the history
* feat: clean up partition limiter state on schedule

* add randomness
  • Loading branch information
Ellie Huxtable authored Dec 13, 2023
1 parent 3c7479a commit 454b96d
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 0 deletions.
18 changes: 18 additions & 0 deletions capture/src/partition_limits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use std::sync::Arc;

use governor::{clock, state::keyed::DefaultKeyedStateStore, Quota, RateLimiter};
use metrics::gauge;
use rand::Rng;

// See: https://docs.rs/governor/latest/governor/_guide/index.html#usage-in-multiple-threads
#[derive(Clone)]
Expand Down Expand Up @@ -49,6 +50,23 @@ impl PartitionLimiter {
gauge!("partition_limits_key_count", self.limiter.len() as f64);
}
}

/// Clean up the rate limiter state, once per minute. Ensure we don't use more memory than
/// necessary.
pub async fn clean_state(&self) {
// Give a small amount of randomness to the interval to ensure we don't have all replicas
// locking at the same time. The lock isn't going to be held for long, but this will reduce
// impact regardless.
let interval_secs = rand::thread_rng().gen_range(60..70);

let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(interval_secs));
loop {
interval.tick().await;

self.limiter.retain_recent();
self.limiter.shrink_to_fit();
}
}
}

#[cfg(test)]
Expand Down
11 changes: 11 additions & 0 deletions capture/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,17 @@ where
partition.report_metrics().await;
});
}

{
// Ensure that the rate limiter state does not grow unbounded

let partition = partition.clone();

tokio::spawn(async move {
partition.clean_state().await;
});
}

let sink = sink::KafkaSink::new(config.kafka, sink_liveness, partition)
.expect("failed to start Kafka sink");

Expand Down

0 comments on commit 454b96d

Please sign in to comment.