A0-4318: rate-limiter for all #1857

Open
wants to merge 44 commits into
base: main
9e967ae
revamped TokenBucket for rate-limiter:
fixxxedpoint Nov 13, 2024
f896b4c
rate_limiter:
fixxxedpoint Nov 13, 2024
ab74c45
RateLimitedAsyncRead and Write moved to rate_limiter crate
fixxxedpoint Nov 13, 2024
0b17e45
custom implementation of Transport with rate-limiting (only read) for…
fixxxedpoint Nov 13, 2024
03142e2
integrated rate-limiting for substrate [`sc_client::network::Network…
fixxxedpoint Nov 13, 2024
2eb7b72
increased limits for the rate-limiter, both for alephbft and substrate
fixxxedpoint Nov 13, 2024
fc806fc
- s/substrate_bit_rate/substrate_network_bit_rate
fixxxedpoint Nov 13, 2024
0a41d22
renamed local variable in `build/transport.rs` - it no longer referen…
fixxxedpoint Nov 13, 2024
eb6dd52
removed unnecessary pub-use to [`TokenBucket`] in rate_limiter/lib.rs
fixxxedpoint Nov 13, 2024
b5937b8
removed unnecessary `pub` in `pub mod transport;` in [`finality-aleph/…
fixxxedpoint Nov 13, 2024
d0b0ce5
removed newline in [`finality-aleph/src/network/build/mod.rs`]
fixxxedpoint Nov 13, 2024
d9fdce1
added `impl Default for DefaultTimeProvider` in [`token_bucket.rs`]
fixxxedpoint Nov 13, 2024
6ce1663
removed unnecessary `.into()` conversion of [`SleepingRateLimiter`] in…
fixxxedpoint Nov 13, 2024
019f9a9
- `build_transport` moved to [`finality-aleph/src/network/build/trans…
fixxxedpoint Nov 13, 2024
0525c17
Default macro for DefaultTimeProvider - review changes
fixxxedpoint Nov 13, 2024
2dd96c2
using `parking_lot::Mutex` in rate_limiter instead of `std::sync::Mutex`
fixxxedpoint Nov 13, 2024
bbe8e8a
simplified TokenBucket algorithm: using only `requested` field instea…
fixxxedpoint Nov 13, 2024
e69894b
TokenBucket per connection instead of a shared one
fixxxedpoint Nov 13, 2024
3fe9404
using new API of `substrate::NetworkWorker::new_with_custom_transport`
fixxxedpoint Nov 13, 2024
7e680da
TokenBucket return None when rate_limit is 0 instead of some arbitrar…
fixxxedpoint Nov 13, 2024
563de02
default rate-limit for the `sync` network is now 1 MiB per connection…
fixxxedpoint Nov 13, 2024
b049419
fmt for rate_limiter.rs
fixxxedpoint Nov 13, 2024
f88d6ac
fixed unit-tests for token-bucket after recent changes of its api
fixxxedpoint Nov 13, 2024
4fa4dc0
rust-fmt after changes in rate-limiter
fixxxedpoint Nov 13, 2024
33eb851
added type alias for `Box<dyn sc_network::config::NotificationService…
fixxxedpoint Nov 13, 2024
f791005
s/substrate_network_bit_rate_per_connection/bit_rate_per_connection i…
fixxxedpoint Nov 13, 2024
e9cf1de
better readability for TokenBucket.rs
fixxxedpoint Nov 13, 2024
a2f1eff
better readability in TokenBucket.rs ++
fixxxedpoint Nov 13, 2024
6fdecaa
rust-fmt in `bin/node/src/service.rs`
fixxxedpoint Nov 13, 2024
a379641
using `enum Deadline {Never, Instant}` instead of `Option<Option<Inst…
fixxxedpoint Nov 13, 2024
dbeeca2
rust-fmt in tests for TokenBucket
fixxxedpoint Nov 13, 2024
f0dc89c
- simpler version of the TokenBucket (rate-limiter)
fixxxedpoint Nov 13, 2024
374a45f
added bunch of unit-tests for TokenBucket and SharedTokenBucket
fixxxedpoint Nov 13, 2024
ba07522
- integration of the new SharedTokenBucket with both Substrate-based …
fixxxedpoint Nov 13, 2024
ba90a89
added ALEPHBFT_NETWORK_BIT_RATE SUBSTRATE_NETWORK_BIT_RATE params to …
fixxxedpoint Nov 13, 2024
6c7404f
slightly refactored token_bucket.rs
fixxxedpoint Nov 13, 2024
1f5b787
fixed and updated unit tests in token_bucket.rs
fixxxedpoint Nov 13, 2024
b11b80c
rust-fmt for token_bucket and related
fixxxedpoint Nov 13, 2024
fc27ed5
added more docs in token_bucket.rs
fixxxedpoint Nov 13, 2024
fa131da
more docs for token_bucket.rs
fixxxedpoint Nov 13, 2024
c8db058
bumped version of rust-toolchain in nix/versions.nix
fixxxedpoint Nov 13, 2024
e2e0bb3
improved docs in token_bucket.rs
fixxxedpoint Nov 13, 2024
0f26764
removed no longer necessary types in rate-limiter crate - helpers rel…
fixxxedpoint Nov 13, 2024
1fbf91b
Added a verbose `share()` method to `SharedRateLimiter` (and to other…
fixxxedpoint Nov 13, 2024
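Reviewer note: many of the commits above rework `TokenBucket`, but its source is not part of the diff shown on this page. As orientation only, here is a minimal sketch of a token-bucket limiter that reports how long a caller should wait. Everything in it is an assumption made for illustration: the name `SketchTokenBucket`, the `rate_limit` signature, passing the current time in explicitly, and rejecting a zero rate. It is not the PR's implementation.

```rust
use std::time::{Duration, Instant};

/// Hypothetical token bucket, NOT the `TokenBucket` from this PR.
/// Up to one second's worth of tokens may be used as a burst.
struct SketchTokenBucket {
    rate_per_second: u64,
    /// Tokens requested so far that elapsed time has not yet paid off.
    requested: u64,
    last_update: Instant,
}

impl SketchTokenBucket {
    fn new(rate_per_second: u64, now: Instant) -> Self {
        assert!(rate_per_second > 0, "this sketch does not model a zero rate");
        SketchTokenBucket { rate_per_second, requested: 0, last_update: now }
    }

    /// Accounts for `requested` tokens at time `now`.
    /// Returns `None` if no waiting is needed, otherwise the suggested delay.
    fn rate_limit(&mut self, requested: u64, now: Instant) -> Option<Duration> {
        // Pay off outstanding tokens with the time elapsed since the last call.
        // Integer division rounds down, so the sketch may under-replenish slightly.
        let elapsed = now.saturating_duration_since(self.last_update);
        let replenished = elapsed.as_nanos() * u128::from(self.rate_per_second) / 1_000_000_000;
        let replenished = replenished.min(u128::from(u64::MAX)) as u64;
        self.requested = self.requested.saturating_sub(replenished);
        self.last_update = now;

        // Book the new request and check whether it exceeds the burst capacity.
        self.requested = self.requested.saturating_add(requested);
        if self.requested <= self.rate_per_second {
            return None;
        }
        let excess = self.requested - self.rate_per_second;
        let nanos = u128::from(excess) * 1_000_000_000 / u128::from(self.rate_per_second);
        Some(Duration::from_nanos(nanos.min(u128::from(u64::MAX)) as u64))
    }
}

fn main() {
    let start = Instant::now();
    let mut bucket = SketchTokenBucket::new(1000, start);
    // A full burst fits, the next request at the same instant has to wait.
    assert_eq!(bucket.rate_limit(1000, start), None);
    assert_eq!(bucket.rate_limit(500, start), Some(Duration::from_millis(500)));
    // Two seconds later the outstanding tokens are fully paid off.
    assert_eq!(bucket.rate_limit(1000, start + Duration::from_secs(2)), None);
}
```

Passing `now` as a parameter keeps the sketch deterministic in tests. The commits mention a `DefaultTimeProvider`, which suggests the PR injects time in some way, but the details are not visible here.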
716 changes: 354 additions & 362 deletions Cargo.lock

Large diffs are not rendered by default.

18 changes: 13 additions & 5 deletions bin/node/src/aleph_cli.rs
Original file line number Diff line number Diff line change
@@ -47,9 +47,13 @@ pub struct AlephCli {
#[clap(long, default_value_t = false)]
enable_pruning: bool,

/// Maximum bit-rate per node in bytes per second of the alephbft validator network.
#[clap(long, default_value_t = 64 * 1024)]
alephbft_bit_rate_per_connection: u64,
/// Maximum bit-rate in bits per second of the alephbft validator network.
#[clap(long, default_value_t = 768 * 1024)]
alephbft_network_bit_rate: u64,

/// Maximum bit-rate in bits per second of the substrate network.
#[clap(long, default_value_t = 5*1024*1024)]
substrate_network_bit_rate: u64,

/// Don't spend some extra time to collect more debugging data (e.g. validator network details).
/// By default collecting is enabled, as the impact on performance is negligible, if any.
@@ -93,8 +97,12 @@ impl AlephCli {
self.enable_pruning
}

pub fn alephbft_bit_rate_per_connection(&self) -> u64 {
self.alephbft_bit_rate_per_connection
pub fn alephbft_network_bit_rate(&self) -> u64 {
self.alephbft_network_bit_rate
}

pub fn substrate_network_bit_rate(&self) -> u64 {
self.substrate_network_bit_rate
}

pub fn no_collection_of_extra_debugging_data(&self) -> bool {
15 changes: 8 additions & 7 deletions bin/node/src/service.rs
@@ -234,10 +234,8 @@ fn get_proposer_factory(

fn get_rate_limit_config(aleph_config: &AlephCli) -> RateLimiterConfig {
RateLimiterConfig {
alephbft_bit_rate_per_connection: aleph_config
.alephbft_bit_rate_per_connection()
.try_into()
.unwrap_or(usize::MAX),
alephbft_network_bit_rate: aleph_config.alephbft_network_bit_rate(),
substrate_network_bit_rate: aleph_config.substrate_network_bit_rate(),
}
}

@@ -296,6 +294,11 @@ pub fn new_authority(
)?;

let import_queue_handle = BlockImporter::new(service_components.import_queue.service());
let rate_limiter_config = get_rate_limit_config(&aleph_config);
let network_config = finality_aleph::SubstrateNetworkConfig {
substrate_network_bit_rate: rate_limiter_config.substrate_network_bit_rate,
network_config: config.network.clone(),
};

let BuildNetworkOutput {
network,
@@ -305,7 +308,7 @@ pub fn new_authority(
tx_handler_controller,
system_rpc_tx,
} = build_network(
&config.network,
network_config,
config.protocol_id(),
service_components.client.clone(),
major_sync,
@@ -370,8 +373,6 @@ pub fn new_authority(
.spawn_essential_handle()
.spawn_blocking("aura", None, aura);

let rate_limiter_config = get_rate_limit_config(&aleph_config);

let AlephRuntimeVars {
millisecs_per_block,
session_period,
63 changes: 28 additions & 35 deletions clique/src/rate_limiting.rs
@@ -1,46 +1,36 @@
use rate_limiter::{RateLimiter, SleepingRateLimiter};
use tokio::io::AsyncRead;
use rate_limiter::{RateLimitedAsyncRead, SharedRateLimiter};

use crate::{ConnectionInfo, Data, Dialer, Listener, PeerAddressInfo, Splittable, Splitted};

pub struct RateLimitedAsyncRead<Read> {
rate_limiter: RateLimiter,
read: Read,
}

impl<Read> RateLimitedAsyncRead<Read> {
pub fn new(read: Read, rate_limiter: RateLimiter) -> Self {
Self { rate_limiter, read }
}
}

impl<Read: AsyncRead + Unpin> AsyncRead for RateLimitedAsyncRead<Read> {
fn poll_read(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> std::task::Poll<std::io::Result<()>> {
let this = self.get_mut();
let read = std::pin::Pin::new(&mut this.read);
this.rate_limiter.rate_limit(read, cx, buf)
}
}

impl<Read: ConnectionInfo> ConnectionInfo for RateLimitedAsyncRead<Read> {
impl<Read> ConnectionInfo for RateLimitedAsyncRead<Read>
where
Read: ConnectionInfo,
{
fn peer_address_info(&self) -> PeerAddressInfo {
self.read.peer_address_info()
self.inner().peer_address_info()
}
}

/// Implementation of the [Dialer] trait governing all returned [Dialer::Connection] instances by a rate-limiting wrapper.
#[derive(Clone)]
pub struct RateLimitingDialer<D> {
dialer: D,
rate_limiter: SleepingRateLimiter,
rate_limiter: SharedRateLimiter,
}

impl<D> Clone for RateLimitingDialer<D>
where
D: Clone,
{
fn clone(&self) -> Self {
Self {
dialer: self.dialer.clone(),
rate_limiter: self.rate_limiter.share(),
}
}
}

impl<D> RateLimitingDialer<D> {
pub fn new(dialer: D, rate_limiter: SleepingRateLimiter) -> Self {
pub fn new(dialer: D, rate_limiter: SharedRateLimiter) -> Self {
Self {
dialer,
rate_limiter,
@@ -66,7 +56,7 @@ where
let connection = self.dialer.connect(address).await?;
let (sender, receiver) = connection.split();
Ok(Splitted(
RateLimitedAsyncRead::new(receiver, RateLimiter::new(self.rate_limiter.clone())),
RateLimitedAsyncRead::new(receiver, self.rate_limiter.share()),
sender,
))
}
@@ -75,11 +65,11 @@ where
/// Implementation of the [Listener] trait governing all returned [Listener::Connection] instances by a rate-limiting wrapper.
pub struct RateLimitingListener<L> {
listener: L,
rate_limiter: SleepingRateLimiter,
rate_limiter: SharedRateLimiter,
}

impl<L> RateLimitingListener<L> {
pub fn new(listener: L, rate_limiter: SleepingRateLimiter) -> Self {
pub fn new(listener: L, rate_limiter: SharedRateLimiter) -> Self {
Self {
listener,
rate_limiter,
@@ -88,7 +78,10 @@ impl<L> RateLimitingListener<L> {
}

#[async_trait::async_trait]
impl<L: Listener + Send> Listener for RateLimitingListener<L> {
impl<L> Listener for RateLimitingListener<L>
where
L: Listener + Send,
{
type Connection = Splitted<
RateLimitedAsyncRead<<L::Connection as Splittable>::Receiver>,
<L::Connection as Splittable>::Sender,
@@ -99,7 +92,7 @@ impl<L: Listener + Send> Listener for RateLimitingListener<L> {
let connection = self.listener.accept().await?;
let (sender, receiver) = connection.split();
Ok(Splitted(
RateLimitedAsyncRead::new(receiver, RateLimiter::new(self.rate_limiter.clone())),
RateLimitedAsyncRead::new(receiver, self.rate_limiter.share()),
sender,
))
}
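Reviewer note: in this file `clone()` on the limiter is replaced by an explicit `share()` call. The sketch below illustrates one reason such a method can be useful, under the assumption (not confirmed by the diff shown here) that `share()` returns another handle to the same underlying budget instead of an independent copy. `SketchSharedLimiter` and `try_consume` are hypothetical names, and `std::sync::Mutex` is used only to keep the example dependency-free.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in, NOT the `SharedRateLimiter` from the `rate_limiter` crate.
struct SketchSharedLimiter {
    remaining: Arc<Mutex<u64>>,
}

impl SketchSharedLimiter {
    fn new(budget: u64) -> Self {
        SketchSharedLimiter { remaining: Arc::new(Mutex::new(budget)) }
    }

    /// Returns another handle to the SAME budget.
    /// A dedicated method makes the aliasing visible at the call site,
    /// whereas `clone()` could be mistaken for an independent copy.
    fn share(&self) -> Self {
        SketchSharedLimiter { remaining: Arc::clone(&self.remaining) }
    }

    /// Takes `amount` from the shared budget and returns whether that succeeded.
    fn try_consume(&self, amount: u64) -> bool {
        let mut remaining = self.remaining.lock().expect("mutex poisoned");
        if *remaining >= amount {
            *remaining -= amount;
            true
        } else {
            false
        }
    }
}

fn main() {
    let listener_side = SketchSharedLimiter::new(100);
    let connection_side = listener_side.share();

    // Consumption through one handle is visible through the other.
    assert!(connection_side.try_consume(60));
    assert!(!listener_side.try_consume(60));
    assert!(listener_side.try_consume(40));
}
```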
10 changes: 10 additions & 0 deletions docker/docker_entrypoint.sh
@@ -35,6 +35,8 @@ MAX_RUNTIME_INSTANCES=${MAX_RUNTIME_INSTANCES:-8}
BACKUP_PATH=${BACKUP_PATH:-${BASE_PATH}/backup-stash}
DATABASE_ENGINE=${DATABASE_ENGINE:-}
PRUNING_ENABLED=${PRUNING_ENABLED:-false}
ALEPHBFT_NETWORK_BIT_RATE=${ALEPHBFT_NETWORK_BIT_RATE:-}
SUBSTRATE_NETWORK_BIT_RATE=${SUBSTRATE_NETWORK_BIT_RATE:-}

if [[ "true" == "$PURGE_BEFORE_START" ]]; then
echo "Purging chain (${CHAIN}) at path ${BASE_PATH}"
@@ -141,4 +143,12 @@ if [[ -n "${MAX_SUBSCRIPTIONS_PER_CONNECTION:-}" ]]; then
ARGS+=(--rpc-max-subscriptions-per-connection ${MAX_SUBSCRIPTIONS_PER_CONNECTION})
fi

if [[ -n "${ALEPHBFT_NETWORK_BIT_RATE}" ]]; then
ARGS+=(--alephbft-network-bit-rate ${ALEPHBFT_NETWORK_BIT_RATE})
fi

if [[ -n "${SUBSTRATE_NETWORK_BIT_RATE}" ]]; then
ARGS+=(--substrate-network-bit-rate ${SUBSTRATE_NETWORK_BIT_RATE})
fi

echo "${CUSTOM_ARGS}" | xargs aleph-node "${ARGS[@]}"
1 change: 1 addition & 0 deletions finality-aleph/Cargo.toml
@@ -44,6 +44,7 @@ serde = { workspace = true }
static_assertions = { workspace = true }
tiny-bip39 = { workspace = true }
tokio = { workspace = true, features = ["sync", "macros", "time", "rt-multi-thread"] }
libp2p = { workspace = true }

substrate-prometheus-endpoint = { workspace = true }

9 changes: 6 additions & 3 deletions finality-aleph/src/lib.rs
@@ -67,7 +67,8 @@ pub use crate::{
justification::AlephJustification,
network::{
address_cache::{ValidatorAddressCache, ValidatorAddressingInfo},
build_network, BuildNetworkOutput, ProtocolNetwork, SubstratePeerId,
build_network, BuildNetworkOutput, ProtocolNetwork, SubstrateNetworkConfig,
SubstratePeerId,
},
nodes::run_validator_node,
session::SessionPeriod,
@@ -250,8 +251,10 @@ type Hasher = abft::HashWrapper<BlakeTwo256>;

#[derive(Clone)]
pub struct RateLimiterConfig {
/// Maximum bit-rate per node in bytes per second of the alephbft validator network.
pub alephbft_bit_rate_per_connection: usize,
/// Maximum bit-rate in bits per second of the alephbft validator network.
pub alephbft_network_bit_rate: u64,
/// Maximum bit-rate in bits per second of the substrate network (shared by sync, gossip, etc.).
pub substrate_network_bit_rate: u64,
}

pub struct AlephConfig<C, T> {
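Reviewer note: the two bit-rate fields are filled either from the CLI defaults (`768 * 1024` and `5*1024*1024`) or from the values passed by `docker_entrypoint.sh`, which only adds the flags when the environment variables are non-empty. The sketch below shows that fallback logic in isolation. `SketchRateLimiterConfig`, `from_overrides` and `parse_or` are hypothetical helpers introduced for illustration; the real node relies on clap for parsing.

```rust
use std::num::ParseIntError;

/// Hypothetical mirror of `RateLimiterConfig`, used only for this illustration.
#[derive(Clone, Debug, PartialEq)]
struct SketchRateLimiterConfig {
    alephbft_network_bit_rate: u64,
    substrate_network_bit_rate: u64,
}

/// Parses `value` if it is present and non-empty, otherwise falls back to `default`.
fn parse_or(value: Option<&str>, default: u64) -> Result<u64, ParseIntError> {
    match value {
        Some(text) if !text.is_empty() => text.parse::<u64>(),
        _ => Ok(default),
    }
}

impl SketchRateLimiterConfig {
    // Defaults copied from the `default_value_t` attributes in the diff.
    const DEFAULT_ALEPHBFT: u64 = 768 * 1024;
    const DEFAULT_SUBSTRATE: u64 = 5 * 1024 * 1024;

    /// Builds the config from optional textual overrides (e.g. environment variables).
    fn from_overrides(
        alephbft: Option<&str>,
        substrate: Option<&str>,
    ) -> Result<Self, ParseIntError> {
        Ok(SketchRateLimiterConfig {
            alephbft_network_bit_rate: parse_or(alephbft, Self::DEFAULT_ALEPHBFT)?,
            substrate_network_bit_rate: parse_or(substrate, Self::DEFAULT_SUBSTRATE)?,
        })
    }
}

fn main() {
    // No overrides: both defaults apply.
    let defaults = SketchRateLimiterConfig::from_overrides(None, None).unwrap();
    assert_eq!(defaults.alephbft_network_bit_rate, 786_432);
    assert_eq!(defaults.substrate_network_bit_rate, 5_242_880);

    // An empty string behaves like an unset variable, as in the entrypoint script.
    let custom = SketchRateLimiterConfig::from_overrides(Some("1024"), Some("")).unwrap();
    assert_eq!(custom.alephbft_network_bit_rate, 1024);
    assert_eq!(custom.substrate_network_bit_rate, 5_242_880);

    // Malformed input surfaces as an error instead of silently using the default.
    assert!(SketchRateLimiterConfig::from_overrides(Some("fast"), None).is_err());
}
```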
15 changes: 13 additions & 2 deletions finality-aleph/src/network/build/base.rs
@@ -1,5 +1,6 @@
use std::sync::Arc;

use libp2p::{core::StreamMuxer, PeerId, Transport};
use sc_client_api::Backend;
use sc_network::{
config::{
@@ -8,6 +9,7 @@ use sc_network::{
},
error::Error as NetworkError,
peer_store::PeerStore,
transport::NetworkConfig,
NetworkService, NetworkWorker,
};
use sc_network_light::light_client_requests::handler::LightClientRequestHandler;
@@ -69,8 +71,9 @@ type BaseNetworkOutput<B> = (
);

/// Create a base network with all the protocols already included. Also spawn (almost) all the necessary services.
pub fn network<B, BE, C>(
pub fn network<B, BE, C, T, SM>(
network_config: &NetworkConfiguration,
transport_builder: impl FnOnce(NetworkConfig) -> T,
protocol_id: ProtocolId,
client: Arc<C>,
spawn_handle: &SpawnTaskHandle,
@@ -82,6 +85,13 @@ where
B::Header: Header<Number = BlockNumber>,
BE: Backend<B>,
C: ClientForAleph<B, BE>,
T: Transport<Output = (PeerId, SM)> + Send + Unpin + 'static,
T::Dial: Send,
T::ListenerUpgrade: Send,
T::Error: Send + Sync,
SM: StreamMuxer + Unpin + Send + 'static,
SM::Substream: Unpin + Send,
SM::Error: Send + Sync,
{
let mut full_network_config = FullNetworkConfiguration::new(network_config);
let genesis_hash = client
@@ -135,7 +145,8 @@ where
block_announce_config: base_protocol_config,
};

let network_service = NetworkWorker::new(network_params)?;
let network_service =
NetworkWorker::new_with_custom_transport(network_params, transport_builder)?;
let network = network_service.service().clone();
spawn_handle.spawn_blocking("network-worker", SPAWN_CATEGORY, network_service.run());
Ok((network, networks, transactions_prototype))
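Reviewer note: `network` now takes a `transport_builder: impl FnOnce(NetworkConfig) -> T` and forwards it to `NetworkWorker::new_with_custom_transport`. The sketch below shows the same shape with stand-in types, so the flow of ownership is easier to see: the caller captures its rate-limit setting in a closure, and the callee invokes that closure exactly once with the config it owns. `SketchConfig`, `SketchTransport`, `LimitedTransport` and `build_worker` are hypothetical and unrelated to the libp2p or `sc_network` types.

```rust
/// Hypothetical config handed to the transport builder by the callee.
struct SketchConfig {
    name: String,
}

/// Hypothetical stand-in for the `Transport` bound used in the PR.
trait SketchTransport {
    fn describe(&self) -> String;
}

struct LimitedTransport {
    name: String,
    bit_rate: u64,
}

impl SketchTransport for LimitedTransport {
    fn describe(&self) -> String {
        format!("{} limited to {} bit/s", self.name, self.bit_rate)
    }
}

/// The callee owns the config and calls the builder exactly once,
/// which is why `FnOnce` is a sufficient bound.
fn build_worker<T: SketchTransport>(
    config: SketchConfig,
    transport_builder: impl FnOnce(SketchConfig) -> T,
) -> String {
    let transport = transport_builder(config);
    transport.describe()
}

fn main() {
    let bit_rate: u64 = 5 * 1024 * 1024;
    // The closure captures the rate-limit setting by value and consumes the config.
    let builder = move |config: SketchConfig| LimitedTransport {
        name: config.name,
        bit_rate,
    };
    let description = build_worker(SketchConfig { name: "substrate".to_string() }, builder);
    assert_eq!(description, "substrate limited to 5242880 bit/s");
}
```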
21 changes: 18 additions & 3 deletions finality-aleph/src/network/build/mod.rs
@@ -1,6 +1,7 @@
use std::sync::{atomic::AtomicBool, Arc};

use log::error;
use rate_limiter::SharedRateLimiter;
use sc_client_api::Backend;
use sc_network::{
config::{NetworkConfiguration, ProtocolId},
@@ -28,6 +29,7 @@ mod base;
mod own_protocols;
mod rpc;
mod transactions;
mod transport;

use base::network as base_network;
use own_protocols::Networks;
@@ -47,10 +49,17 @@ pub struct NetworkOutput<TP: TransactionPool + 'static> {
pub system_rpc_tx: TracingUnboundedSender<RpcRequest<TP::Block>>,
}

pub struct SubstrateNetworkConfig {
/// Maximum bit-rate in bits per second of the substrate network (shared by sync, gossip, etc.).
pub substrate_network_bit_rate: u64,
/// Configuration of the network service.
pub network_config: NetworkConfiguration,
}

/// Start everything necessary to run the inter-node network and return the interfaces for it.
/// This includes everything in the base network, the base protocol service, and services for handling transactions and RPCs.
pub fn network<TP, BE, C>(
network_config: &NetworkConfiguration,
network_config: SubstrateNetworkConfig,
protocol_id: ProtocolId,
client: Arc<C>,
major_sync: Arc<AtomicBool>,
@@ -72,6 +81,11 @@ where
.expect("Genesis block exists.");
let (base_protocol_config, events_from_network) =
setup_base_protocol::<TP::Block>(genesis_hash);

let network_rate_limit = network_config.substrate_network_bit_rate;
let rate_limiter = SharedRateLimiter::new(network_rate_limit.into());
let transport_builder = |config| transport::build_transport(rate_limiter, config);

let (
network,
Networks {
@@ -80,7 +94,8 @@ where
},
transaction_prototype,
) = base_network(
network_config,
&network_config.network_config,
transport_builder,
protocol_id,
client.clone(),
spawn_handle,
@@ -91,7 +106,7 @@ where
let (base_service, syncing_service) = BaseProtocolService::new(
major_sync,
genesis_hash,
network_config,
&network_config.network_config,
protocol_names,
network.clone(),
events_from_network,