Skip to content

Commit

Permalink
Added a verbose share() method to SharedRateLimiter (and to other…
Browse files Browse the repository at this point in the history
… types in its hierarchy). It is more readable this way - previous version was using `Clone`, which could be confusing regarding if bandwidth was shared or each node uses separate rate.
  • Loading branch information
fixxxedpoint committed Nov 14, 2024
1 parent 0f26764 commit 1fbf91b
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 47 deletions.
27 changes: 19 additions & 8 deletions clique/src/rate_limiting.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use rate_limiter::{RateLimitedAsyncRead, SharingRateLimiter};
use rate_limiter::{RateLimitedAsyncRead, SharedRateLimiter};

use crate::{ConnectionInfo, Data, Dialer, Listener, PeerAddressInfo, Splittable, Splitted};

Expand All @@ -12,14 +12,25 @@ where
}

/// Implementation of the [Dialer] trait governing all returned [Dialer::Connection] instances by a rate-limiting wrapper.
#[derive(Clone)]
pub struct RateLimitingDialer<D> {
dialer: D,
rate_limiter: SharingRateLimiter,
rate_limiter: SharedRateLimiter,
}

impl<D> Clone for RateLimitingDialer<D>
where
D: Clone,
{
fn clone(&self) -> Self {
Self {
dialer: self.dialer.clone(),
rate_limiter: self.rate_limiter.share(),
}
}
}

impl<D> RateLimitingDialer<D> {
pub fn new(dialer: D, rate_limiter: SharingRateLimiter) -> Self {
pub fn new(dialer: D, rate_limiter: SharedRateLimiter) -> Self {
Self {
dialer,
rate_limiter,
Expand All @@ -45,7 +56,7 @@ where
let connection = self.dialer.connect(address).await?;
let (sender, receiver) = connection.split();
Ok(Splitted(
RateLimitedAsyncRead::new(receiver, self.rate_limiter.clone()),
RateLimitedAsyncRead::new(receiver, self.rate_limiter.share()),
sender,
))
}
Expand All @@ -54,11 +65,11 @@ where
/// Implementation of the [Listener] trait governing all returned [Listener::Connection] instances by a rate-limiting wrapper.
pub struct RateLimitingListener<L> {
listener: L,
rate_limiter: SharingRateLimiter,
rate_limiter: SharedRateLimiter,
}

impl<L> RateLimitingListener<L> {
pub fn new(listener: L, rate_limiter: SharingRateLimiter) -> Self {
pub fn new(listener: L, rate_limiter: SharedRateLimiter) -> Self {
Self {
listener,
rate_limiter,
Expand All @@ -81,7 +92,7 @@ where
let connection = self.listener.accept().await?;
let (sender, receiver) = connection.split();
Ok(Splitted(
RateLimitedAsyncRead::new(receiver, self.rate_limiter.clone()),
RateLimitedAsyncRead::new(receiver, self.rate_limiter.share()),
sender,
))
}
Expand Down
4 changes: 2 additions & 2 deletions finality-aleph/src/network/build/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::sync::{atomic::AtomicBool, Arc};

use log::error;
use rate_limiter::SharingRateLimiter;
use rate_limiter::SharedRateLimiter;
use sc_client_api::Backend;
use sc_network::{
config::{NetworkConfiguration, ProtocolId},
Expand Down Expand Up @@ -83,7 +83,7 @@ where
setup_base_protocol::<TP::Block>(genesis_hash);

let network_rate_limit = network_config.substrate_network_bit_rate;
let rate_limiter = SharingRateLimiter::new(network_rate_limit.into());
let rate_limiter = SharedRateLimiter::new(network_rate_limit.into());
let transport_builder = |config| transport::build_transport(rate_limiter, config);

let (
Expand Down
27 changes: 20 additions & 7 deletions finality-aleph/src/network/build/transport.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use libp2p::{core::muxing::StreamMuxer, PeerId, Transport};
use rate_limiter::{FuturesRateLimitedAsyncReadWrite, SharingRateLimiter};
use rate_limiter::{FuturesRateLimitedAsyncReadWrite, SharedRateLimiter};

struct RateLimitedStreamMuxer<SM> {
rate_limiter: SharingRateLimiter,
rate_limiter: SharedRateLimiter,
stream_muxer: SM,
}

impl<SM> RateLimitedStreamMuxer<SM> {
pub fn new(stream_muxer: SM, rate_limiter: SharingRateLimiter) -> Self {
pub fn new(stream_muxer: SM, rate_limiter: SharedRateLimiter) -> Self {
Self {
rate_limiter,
stream_muxer,
Expand Down Expand Up @@ -36,7 +36,7 @@ where
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<Self::Substream, Self::Error>> {
let rate_limiter = self.rate_limiter.clone();
let rate_limiter = self.rate_limiter.share();
self.inner().poll_inbound(cx).map(|result| {
result.map(|substream| FuturesRateLimitedAsyncReadWrite::new(substream, rate_limiter))
})
Expand All @@ -46,7 +46,7 @@ where
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<Self::Substream, Self::Error>> {
let rate_limiter = self.rate_limiter.clone();
let rate_limiter = self.rate_limiter.share();
self.inner().poll_outbound(cx).map(|result| {
result.map(|substream| FuturesRateLimitedAsyncReadWrite::new(substream, rate_limiter))
})
Expand All @@ -68,7 +68,7 @@ where
}

pub fn build_transport(
rate_limiter: SharingRateLimiter,
rate_limiter: SharedRateLimiter,
config: sc_network::transport::NetworkConfig,
) -> impl Transport<
Output = (
Expand All @@ -79,6 +79,19 @@ pub fn build_transport(
ListenerUpgrade = impl Send,
Error = impl Send,
> + Send {
struct ClonableSharedRateLimiter(SharedRateLimiter);
impl ClonableSharedRateLimiter {
fn share(&self) -> SharedRateLimiter {
self.0.share()
}
}
impl Clone for ClonableSharedRateLimiter {
fn clone(&self) -> Self {
Self(self.share())
}
}
let rate_limiter = ClonableSharedRateLimiter(rate_limiter);

sc_network::transport::build_transport(
config.keypair,
config.memory_only,
Expand All @@ -88,7 +101,7 @@ pub fn build_transport(
.map(move |(peer_id, stream_muxer), _| {
(
peer_id,
RateLimitedStreamMuxer::new(stream_muxer, rate_limiter),
RateLimitedStreamMuxer::new(stream_muxer, rate_limiter.share()),
)
})
}
6 changes: 3 additions & 3 deletions finality-aleph/src/nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use log::{debug, error};
use network_clique::{RateLimitingDialer, RateLimitingListener, Service, SpawnHandleT};
use pallet_aleph_runtime_api::AlephSessionApi;
use primitives::TransactionHash;
use rate_limiter::SharingRateLimiter;
use rate_limiter::SharedRateLimiter;
use sc_client_api::Backend;
use sc_keystore::{Keystore, LocalKeystore};
use sc_transaction_pool_api::TransactionPool;
Expand Down Expand Up @@ -109,8 +109,8 @@ where
.expect("we should have working networking");

let alephbft_rate_limiter =
SharingRateLimiter::new(rate_limiter_config.alephbft_network_bit_rate.into());
let dialer = RateLimitingDialer::new(dialer, alephbft_rate_limiter.clone());
SharedRateLimiter::new(rate_limiter_config.alephbft_network_bit_rate.into());
let dialer = RateLimitingDialer::new(dialer, alephbft_rate_limiter.share());
let listener = RateLimitingListener::new(listener, alephbft_rate_limiter);

let (validator_network_service, validator_network) = Service::new(
Expand Down
2 changes: 1 addition & 1 deletion rate-limiter/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use futures::{future::BoxFuture, ready, FutureExt};
use rate_limiter::RateLimiterFacade;
use tokio::io::AsyncRead;

pub use crate::{rate_limiter::SharingRateLimiter, token_bucket::SharedTokenBucket};
pub use crate::{rate_limiter::SharedRateLimiter, token_bucket::SharedTokenBucket};

const LOG_TARGET: &str = "rate-limiter";

Expand Down
12 changes: 10 additions & 2 deletions rate-limiter/src/rate_limiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use futures::future::pending;

use crate::{token_bucket::SharedTokenBucket, RatePerSecond};

pub type SharingRateLimiter = RateLimiterFacade;
pub type SharedRateLimiter = RateLimiterFacade;

#[derive(PartialEq, Eq, Debug, Copy, Clone)]
pub enum Deadline {
Expand All @@ -21,7 +21,6 @@ impl From<Deadline> for Option<Instant> {
}
}

#[derive(Clone)]
pub enum RateLimiterFacade {
NoTraffic,
RateLimiter(SharedTokenBucket),
Expand All @@ -45,4 +44,13 @@ impl RateLimiterFacade {
),
}
}

pub fn share(&self) -> Self {
match self {
RateLimiterFacade::NoTraffic => RateLimiterFacade::NoTraffic,
RateLimiterFacade::RateLimiter(shared_token_bucket) => {
RateLimiterFacade::RateLimiter(shared_token_bucket.share())
}
}
}
}
55 changes: 31 additions & 24 deletions rate-limiter/src/token_bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,16 +188,6 @@ pub struct SharedBandwidthManager {
already_requested: Option<NonZeroRatePerSecond>,
}

impl Clone for SharedBandwidthManager {
fn clone(&self) -> Self {
Self {
max_rate: self.max_rate,
peers_count: self.peers_count.clone(),
already_requested: None,
}
}
}

impl SharedBandwidthManager {
/// Constructs a new instance of [SharedBandwidthManager] configured with a given rate that will be shared between all
/// calling consumers (clones of this instance).
Expand All @@ -209,6 +199,14 @@ impl SharedBandwidthManager {
}
}

pub fn share(&self) -> Self {
Self {
max_rate: self.max_rate,
peers_count: self.peers_count.clone(),
already_requested: None,
}
}

fn calculate_bandwidth(&mut self, active_children: Option<u64>) -> NonZeroRatePerSecond {
let active_children =
active_children.unwrap_or_else(|| self.peers_count.load(Ordering::Acquire));
Expand Down Expand Up @@ -263,7 +261,10 @@ struct AsyncTokenBucket<TP = TokioTimeProvider, SU = TokioSleepUntil> {
sleep_until: SU,
}

impl<TP, SU> AsyncTokenBucket<TP, SU> {
impl<TP, SU> AsyncTokenBucket<TP, SU>
where
TP: TimeProvider,
{
/// Constructs an instance of [AsyncTokenBucket] using given [TokenBucket]
/// and implementation of the [SleepUntil] trait.
pub fn new(token_bucket: TokenBucket<TP>, sleep_until: SU) -> Self {
Expand All @@ -273,12 +274,7 @@ impl<TP, SU> AsyncTokenBucket<TP, SU> {
sleep_until,
}
}
}

impl<TP, SU> AsyncTokenBucket<TP, SU>
where
TP: TimeProvider,
{
/// Accounts `requested` units. A next call to [AsyncTokenBucket::wait] will
/// account these units while calculating necessary delay.
pub fn rate_limit(&mut self, requested: u64) {
Expand Down Expand Up @@ -317,7 +313,6 @@ where
/// 1/n) ≈ bandwidth * (ln n + O(1))`. This can happen when each instance of [TokenBucket] tries to spend slightly more data
/// than its initially acquired bandwidth, but small enough so none of them other instances receives a notification about
/// ongoing bandwidth change.
#[derive(Clone)]
pub struct SharedTokenBucket<TP = TokioTimeProvider, SU = TokioSleepUntil> {
shared_bandwidth: SharedBandwidthManager,
rate_limiter: AsyncTokenBucket<TP, SU>,
Expand All @@ -344,6 +339,18 @@ impl<TP, SU> SharedTokenBucket<TP, SU> {
}
}

pub fn share(&self) -> Self
where
TP: Clone,
SU: Clone,
{
Self {
shared_bandwidth: self.shared_bandwidth.share(),
rate_limiter: self.rate_limiter.clone(),
need_to_notify_parent: false,
}
}

fn request_bandwidth(&mut self) -> NonZeroRatePerSecond {
self.need_to_notify_parent = true;
self.shared_bandwidth.request_bandwidth()
Expand Down Expand Up @@ -428,8 +435,8 @@ mod tests {
async fn basic_checks_of_shared_bandwidth_manager() {
let rate = 10.try_into().expect("10 > 0 qed");
let mut bandwidth_share = SharedBandwidthManager::new(rate);
let mut cloned_bandwidth_share = bandwidth_share.clone();
let mut another_cloned_bandwidth_share = cloned_bandwidth_share.clone();
let mut cloned_bandwidth_share = bandwidth_share.share();
let mut another_cloned_bandwidth_share = cloned_bandwidth_share.share();

// only one consumer, so it should get whole bandwidth
assert_eq!(bandwidth_share.request_bandwidth(), rate);
Expand Down Expand Up @@ -844,7 +851,7 @@ mod tests {

let mut rate_limiter =
SharedTokenBucket::<_, _>::from((limit_per_second, time_provider, sleep_until));
let mut rate_limiter_cloned = rate_limiter.clone();
let mut rate_limiter_cloned = rate_limiter.share();

let total_data_sent = thread::scope(|s| {
let first_handle = s.spawn(|| {
Expand Down Expand Up @@ -946,7 +953,7 @@ mod tests {
SharedTracingSleepUntil::new(),
));

let rate_limiter_cloned = rate_limiter.clone();
let rate_limiter_cloned = rate_limiter.share();

let (rate_limiter, deadline) = RateLimiter::rate_limit(rate_limiter, 5).await;
assert_eq!(deadline, Some(now + Duration::from_millis(500)));
Expand Down Expand Up @@ -976,7 +983,7 @@ mod tests {

*time_to_return.write() = now + Duration::from_secs(1);

let rate_limiter_cloned = rate_limiter.clone();
let rate_limiter_cloned = rate_limiter.share();

let (rate_limiter, deadline) = RateLimiter::rate_limit(rate_limiter, 1).await;
assert_eq!(deadline, None);
Expand Down Expand Up @@ -1117,8 +1124,8 @@ mod tests {

let mut rate_limiters: Vec<_> = repeat(())
.scan((0usize, rate_limiter), |(id, rate_limiter), _| {
let new_rate_limiter = rate_limiter.clone();
let new_state = rate_limiter.clone();
let new_rate_limiter = rate_limiter.share();
let new_state = rate_limiter.share();
let limiter_id = *id;
*rate_limiter = new_state;
*id += 1;
Expand Down

0 comments on commit 1fbf91b

Please sign in to comment.