Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

middleware -> limiter #56

Merged
merged 2 commits into from
Aug 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions cdn-broker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use std::{
mod metrics;
use cdn_proto::{
bail,
connection::{middleware::Middleware, protocols::Protocol as _},
connection::{limiter::Limiter, protocols::Protocol as _},
crypto::tls::{generate_cert_from_ca, load_ca},
def::{Listener, Protocol, RunDef, Scheme},
discovery::{BrokerIdentifier, DiscoveryClient},
Expand Down Expand Up @@ -60,6 +60,7 @@ pub struct Config<R: RunDef> {
/// The discovery endpoint. We use this to maintain consistency between brokers and marshals.
pub discovery_endpoint: String,

/// The underlying (public) verification key, used to authenticate with other brokers.
pub keypair: KeyPair<Scheme<R::Broker>>,

/// An optional TLS CA cert path. If not specified, will use the local one.
Expand Down Expand Up @@ -90,8 +91,8 @@ struct Inner<R: RunDef> {
/// state or send messages.
connections: Arc<RwLock<Connections>>,

/// The shared middleware that we use for all connections.
middleware: Middleware,
/// The shared limiter that we use for all connections.
limiter: Limiter,
}

/// The main `Broker` struct. We instantiate this when we want to run a broker.
Expand Down Expand Up @@ -221,8 +222,8 @@ impl<R: RunDef> Broker<R> {
})
.transpose()?;

// Create the globally shared middleware
let middleware = Middleware::new(global_memory_pool_size, None);
// Create the globally shared limiter
let limiter = Limiter::new(global_memory_pool_size, None);

// Create and return `Self` as wrapping an `Inner` (with things that we need to share)
Ok(Self {
Expand All @@ -231,7 +232,7 @@ impl<R: RunDef> Broker<R> {
identity: identity.clone(),
keypair,
connections: Arc::from(RwLock::from(Connections::new(identity))),
middleware,
limiter,
}),
metrics_bind_endpoint,
user_listener,
Expand Down
2 changes: 1 addition & 1 deletion cdn-broker/src/tasks/broker/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ impl<Def: RunDef> Inner<Def> {
let connection =
// Our TCP protocol is unsecured, so the cert we use does not matter.
// Time out is at protocol level
match Protocol::<Def::Broker>::connect(&to_connect_endpoint, true, inner.middleware.clone()).await
match Protocol::<Def::Broker>::connect(&to_connect_endpoint, true, inner.limiter.clone()).await
{
Ok(connection) => connection,
Err(err) => {
Expand Down
4 changes: 1 addition & 3 deletions cdn-broker/src/tasks/broker/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,7 @@ impl<Def: RunDef> Inner<Def> {
let inner = self.clone();
spawn(async move {
// Finalize the connection
let Ok(connection) = unfinalized_connection
.finalize(inner.middleware.clone())
.await
let Ok(connection) = unfinalized_connection.finalize(inner.limiter.clone()).await
else {
return;
};
Expand Down
4 changes: 1 addition & 3 deletions cdn-broker/src/tasks/user/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,7 @@ impl<Def: RunDef> Inner<Def> {
let inner = self.clone();
spawn(async move {
// Finalize the connection
let Ok(connection) = unfinalized_connection
.finalize(inner.middleware.clone())
.await
let Ok(connection) = unfinalized_connection.finalize(inner.limiter.clone()).await
else {
return;
};
Expand Down
6 changes: 3 additions & 3 deletions cdn-broker/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use std::sync::Arc;

use cdn_proto::{
connection::{
middleware::Middleware,
limiter::Limiter,
protocols::{Connection, Listener, Protocol, UnfinalizedConnection},
UserPublicKey,
},
Expand Down Expand Up @@ -190,14 +190,14 @@ async fn gen_connection_pairs<P: Protocol>(num: usize) -> Vec<(Connection, Conne
// Spawn a task to connect the user to the broker
let bind_endpoint_ = bind_endpoint.clone();
let unfinalized_outgoing_connection =
spawn(async move { P::connect(&bind_endpoint_, true, Middleware::none()).await });
spawn(async move { P::connect(&bind_endpoint_, true, Limiter::none()).await });

// Accept the connection from the user
let incoming_connection = listener
.accept()
.await
.expect("failed to accept connection")
.finalize(Middleware::none())
.finalize(Limiter::none())
.await
.expect("failed to finalize connection");

Expand Down
11 changes: 5 additions & 6 deletions cdn-client/src/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::{collections::HashSet, sync::Arc, time::Duration};
use cdn_proto::{
connection::{
auth::user::UserAuth,
middleware::Middleware,
limiter::Limiter,
protocols::{Connection, Protocol as _},
},
crypto::signature::KeyPair,
Expand Down Expand Up @@ -80,13 +80,12 @@ impl<C: ConnectionDef> Inner<C> {
/// - If the connection failed
/// - If authentication failed
async fn connect(self: &Arc<Self>) -> Result<Connection> {
// Create the middleware we will use for all connections
let middleware = Middleware::new(None, Some(1));
// Create the limiter we will use for all connections
let limiter = Limiter::new(None, Some(1));

// Make the connection to the marshal
let connection = bail!(
Protocol::<C>::connect(&self.endpoint, self.use_local_authority, middleware.clone())
.await,
Protocol::<C>::connect(&self.endpoint, self.use_local_authority, limiter.clone()).await,
Connection,
"failed to connect to endpoint"
);
Expand All @@ -100,7 +99,7 @@ impl<C: ConnectionDef> Inner<C> {

// Make the connection to the broker
let connection = bail!(
Protocol::<C>::connect(&broker_endpoint, self.use_local_authority, middleware).await,
Protocol::<C>::connect(&broker_endpoint, self.use_local_authority, limiter).await,
Connection,
"failed to connect to broker"
);
Expand Down
16 changes: 8 additions & 8 deletions cdn-marshal/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ mod handlers;
use cdn_proto::{
bail,
connection::{
middleware::Middleware,
limiter::Limiter,
protocols::{Listener as _, Protocol as _, UnfinalizedConnection},
},
crypto::tls::{generate_cert_from_ca, load_ca},
Expand Down Expand Up @@ -73,8 +73,8 @@ pub struct Marshal<R: RunDef> {
/// metrics are not exposed.
metrics_bind_endpoint: Option<SocketAddr>,

// The middleware to use for the connection
middleware: Middleware,
// The limiter to use for the connection
limiter: Limiter,
}

impl<R: RunDef> Marshal<R> {
Expand Down Expand Up @@ -131,15 +131,15 @@ impl<R: RunDef> Marshal<R> {
})
.transpose()?;

// Create the middleware
let middleware = Middleware::new(global_memory_pool_size, None);
// Create the limiter
let limiter = Limiter::new(global_memory_pool_size, None);

// Create `Self` from the `Listener`
Ok(Self {
listener: Arc::from(listener),
metrics_bind_endpoint,
discovery_client,
middleware,
limiter,
})
}

Expand All @@ -165,10 +165,10 @@ impl<R: RunDef> Marshal<R> {

// Create a task to handle the connection
let discovery_client = self.discovery_client.clone();
let middleware = self.middleware.clone();
let limiter = self.limiter.clone();
spawn(async move {
// Finalize the connection
let Ok(connection) = unfinalized_connection.finalize(middleware).await else {
let Ok(connection) = unfinalized_connection.finalize(limiter).await else {
return;
};

Expand Down
6 changes: 3 additions & 3 deletions cdn-proto/benches/protocols.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

use cdn_proto::{
connection::{
middleware::Middleware,
limiter::Limiter,
protocols::{quic::Quic, tcp::Tcp, Connection, Listener, Protocol, UnfinalizedConnection},
Bytes,
},
Expand Down Expand Up @@ -70,13 +70,13 @@ fn set_up_bench<Proto: Protocol>(message_size: usize) -> (Runtime, Connection, C

// Finalize the connection
unfinalized_connection
.finalize(Middleware::none())
.finalize(Limiter::none())
.await
.expect("failed to finalize connection")
});

// Attempt to connect
let conn1 = Proto::connect(&format!("127.0.0.1:{port}"), true, Middleware::none())
let conn1 = Proto::connect(&format!("127.0.0.1:{port}"), true, Limiter::none())
.await
.expect("failed to connect to listener");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,18 @@ use self::pool::AllocationPermit;

pub mod pool;

/// Shared middleware for all connections.
/// Shared limiter for all connections.
#[derive(Clone)]
pub struct Middleware {
pub struct Limiter {
/// The global memory pool to check with before allocating.
global_memory_pool: Option<MemoryPool>,

/// Per connection, the size of the channel buffer.
connection_message_pool_size: Option<usize>,
}

impl Middleware {
/// Create a new middleware with a global memory pool of `global_memory_pool_size` bytes
impl Limiter {
/// Create a new limiter with a global memory pool of `global_memory_pool_size` bytes
/// and a connection message pool size of `connection_message_pool_size` bytes.
///
/// If the global memory pool is not set, it will not be used.
Expand All @@ -37,11 +37,11 @@ impl Middleware {
}
}

/// Create a new middleware with no global memory pool and no connection message pool size.
/// Create a new limiter with no global memory pool and no connection message pool size.
/// This means an unbounded channel will be used for connections and no global memory pool
/// will be checked.
pub const fn none() -> Self {
// Create a new middleware with no global memory pool and no connection message pool size.
// Create a new limiter with no global memory pool and no connection message pool size.
Self {
global_memory_pool: None,
connection_message_pool_size: None,
Expand Down
4 changes: 2 additions & 2 deletions cdn-proto/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@
use std::sync::Arc;

pub mod auth;
pub mod middleware;
pub mod limiter;
pub mod protocols;

use self::middleware::pool::Allocation;
use self::limiter::pool::Allocation;

/// Some type aliases to help with readability
pub type Bytes = Allocation<Vec<u8>>;
Expand Down
13 changes: 6 additions & 7 deletions cdn-proto/src/connection/protocols/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use tokio::{
use super::{Connection, Listener, Protocol, SoftClose, UnfinalizedConnection};
use crate::{
bail,
connection::middleware::Middleware,
connection::limiter::Limiter,
error::{Error, Result},
};

Expand All @@ -47,7 +47,7 @@ impl Protocol for Memory {
async fn connect(
remote_endpoint: &str,
_use_local_authority: bool,
middleware: Middleware,
limiter: Limiter,
) -> Result<Connection> {
// If the peer is not listening, return an error
// Get or initialize the channels as a static value
Expand Down Expand Up @@ -78,7 +78,7 @@ impl Protocol for Memory {
);

// Convert the streams into a `Connection`
let connection = Connection::from_streams(send_to_them, receive_from_them, middleware);
let connection = Connection::from_streams(send_to_them, receive_from_them, limiter);

// Return our connection
Ok(connection)
Expand Down Expand Up @@ -121,10 +121,9 @@ pub struct UnfinalizedMemoryConnection {
#[async_trait]
impl UnfinalizedConnection for UnfinalizedMemoryConnection {
/// Prepares the `MemoryConnection` for usage by `Arc()ing` things.
async fn finalize(self, middleware: Middleware) -> Result<Connection> {
async fn finalize(self, limiter: Limiter) -> Result<Connection> {
// Convert the streams into a `Connection`
let connection =
Connection::from_streams(self.send_stream, self.receive_stream, middleware);
let connection = Connection::from_streams(self.send_stream, self.receive_stream, limiter);

// Return our connection
Ok(connection)
Expand Down Expand Up @@ -196,7 +195,7 @@ impl Memory {
let (sender, receiver) = duplex(8192);

// Convert the streams into a `Connection`
Connection::from_streams(sender, receiver, Middleware::none())
Connection::from_streams(sender, receiver, Limiter::none())
}
}

Expand Down
Loading