Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce message hooking #58

Merged
merged 6 commits into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 8 additions & 1 deletion cdn-broker/src/binaries/bad-broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@
use std::time::Duration;

use cdn_broker::{Broker, Config};
use cdn_proto::{crypto::signature::KeyPair, def::ProductionRunDef, error::Result};
use cdn_proto::{
crypto::signature::KeyPair,
def::{NoMessageHook, ProductionRunDef},
error::Result,
};
use clap::Parser;
use jf_signature::{bls_over_bn254::BLSOverBN254CurveSignatureScheme as BLS, SignatureScheme};
use rand::{rngs::StdRng, SeedableRng};
Expand Down Expand Up @@ -76,6 +80,9 @@ async fn main() -> Result<()> {
private_bind_endpoint: format!("0.0.0.0:{private_port}"),
private_advertise_endpoint: format!("local_ip:{private_port}"),
global_memory_pool_size: None,

user_message_hook: NoMessageHook,
broker_message_hook: NoMessageHook,
};

// Create new `Broker`
Expand Down
9 changes: 8 additions & 1 deletion cdn-broker/src/binaries/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@
//! The following is the main `Broker` binary, which just instantiates and runs
//! a `Broker` object.
use cdn_broker::{Broker, Config};
use cdn_proto::{crypto::signature::KeyPair, def::ProductionRunDef, error::Result};
use cdn_proto::{
crypto::signature::KeyPair,
def::{NoMessageHook, ProductionRunDef},
error::Result,
};
use clap::Parser;
use jf_signature::{bls_over_bn254::BLSOverBN254CurveSignatureScheme as BLS, SignatureScheme};
use rand::{rngs::StdRng, SeedableRng};
Expand Down Expand Up @@ -111,6 +115,9 @@ async fn main() -> Result<()> {
private_bind_endpoint: args.private_bind_endpoint,
private_advertise_endpoint: args.private_advertise_endpoint,
global_memory_pool_size: Some(args.global_memory_pool_size),

user_message_hook: NoMessageHook,
broker_message_hook: NoMessageHook,
};

// Create new `Broker`
Expand Down
19 changes: 18 additions & 1 deletion cdn-broker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use cdn_proto::{
bail,
connection::{limiter::Limiter, protocols::Protocol as _},
crypto::tls::{generate_cert_from_ca, load_ca},
def::{Listener, Protocol, RunDef, Scheme},
def::{Listener, MessageHook, Protocol, RunDef, Scheme},
discovery::{BrokerIdentifier, DiscoveryClient},
error::{Error, Result},
util::AbortOnDropHandle,
Expand Down Expand Up @@ -74,6 +74,12 @@ pub struct Config<R: RunDef> {
/// tries to allocate more than this amount until some memory is freed.
/// Default is 1GB.
pub global_memory_pool_size: Option<usize>,

/// The hook we use when receiving incoming messages from users
pub user_message_hook: MessageHook<R::User>,

/// The hook we use when receiving incoming messages from brokers
pub broker_message_hook: MessageHook<R::Broker>,
}

/// The broker `Inner` that we use to share common data between broker tasks.
Expand All @@ -93,6 +99,12 @@ struct Inner<R: RunDef> {

/// The shared limiter that we use for all connections.
limiter: Limiter,

/// The hook we use when receiving incoming messages from users
user_message_hook: MessageHook<R::User>,

/// The hook we use when receiving incoming messages from brokers
broker_message_hook: MessageHook<R::Broker>,
}

/// The main `Broker` struct. We instantiate this when we want to run a broker.
Expand Down Expand Up @@ -136,6 +148,9 @@ impl<R: RunDef> Broker<R> {
ca_key_path,

global_memory_pool_size,

user_message_hook,
broker_message_hook,
} = config;

// Get the local IP address so we can replace in
Expand Down Expand Up @@ -233,6 +248,8 @@ impl<R: RunDef> Broker<R> {
keypair,
connections: Arc::from(RwLock::from(Connections::new(identity))),
limiter,
user_message_hook,
broker_message_hook,
}),
metrics_bind_endpoint,
user_listener,
Expand Down
7 changes: 7 additions & 0 deletions cdn-broker/src/reexports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ pub mod discovery {

pub mod def {
pub use cdn_proto::def::{ConnectionDef, RunDef, Topic};
pub mod hook {
pub use cdn_proto::def::{HookResult, MessageHook, MessageHookDef, NoMessageHook};
}
}

pub mod crypto {
Expand All @@ -32,6 +35,10 @@ pub mod error {
pub use cdn_proto::error::{Error, Result};
}

pub mod message {
pub use cdn_proto::message::{Broadcast, Direct, Message};
}

/// This is not guarded by `![cfg(test)]` because we use the same functions
/// when doing benchmarks.
pub mod tests {
Expand Down
16 changes: 14 additions & 2 deletions cdn-broker/src/tasks/broker/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::{sync::Arc, time::Duration};
use cdn_proto::{
authenticate_with_broker, bail,
connection::{auth::broker::BrokerAuth, protocols::Connection, Bytes, UserPublicKey},
def::RunDef,
def::{HookResult, MessageHookDef, RunDef},
discovery::BrokerIdentifier,
error::{Error, Result},
message::{Message, Topic},
Expand Down Expand Up @@ -123,12 +123,24 @@ impl<Def: RunDef> Inner<Def> {
broker_identifier: &BrokerIdentifier,
connection: Connection,
) -> Result<()> {
// Clone the hook
let mut local_message_hook = self.broker_message_hook.clone();

loop {
// Receive a message from the broker
let raw_message = connection.recv_message_raw().await?;

// Attempt to deserialize the message
let message = Message::deserialize(&raw_message)?;
let mut message = Message::deserialize(&raw_message)?;

// Call the hook for the broker and handle the result
match local_message_hook.on_message_received(&mut message) {
Ok(HookResult::SkipMessage) => continue,
Ok(HookResult::ProcessMessage) => (),
Err(err) => {
Err(Error::Connection(format!("hook failed: {err}")))?;
}
}

match message {
// If we receive a direct message from a broker, we want to send it to the user with that key
Expand Down
16 changes: 14 additions & 2 deletions cdn-broker/src/tasks/user/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::sync::Arc;
use std::time::Duration;

use cdn_proto::connection::{protocols::Connection, UserPublicKey};
use cdn_proto::def::{RunDef, Topic as _};
use cdn_proto::def::{HookResult, MessageHookDef, RunDef, Topic as _};
use cdn_proto::error::{Error, Result};
use cdn_proto::util::mnemonic;
use cdn_proto::{connection::auth::broker::BrokerAuth, message::Message};
Expand Down Expand Up @@ -97,12 +97,24 @@ impl<Def: RunDef> Inner<Def> {
public_key: &UserPublicKey,
connection: Connection,
) -> Result<()> {
// Clone the hook
let mut local_message_hook = self.user_message_hook.clone();

loop {
// Receive a message from the user
let raw_message = connection.recv_message_raw().await?;

// Attempt to deserialize the message
let message = Message::deserialize(&raw_message)?;
let mut message = Message::deserialize(&raw_message)?;

// Call the hook for the user and handle the result
match local_message_hook.on_message_received(&mut message) {
Ok(HookResult::SkipMessage) => continue,
Ok(HookResult::ProcessMessage) => (),
Err(err) => {
Err(Error::Connection(format!("hook failed: {err}")))?;
}
}

match message {
// If we get a direct message from a user, send it to both users and brokers.
Expand Down
4 changes: 3 additions & 1 deletion cdn-broker/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use cdn_proto::{
signature::KeyPair,
tls::{generate_cert_from_ca, LOCAL_CA_CERT, LOCAL_CA_KEY},
},
def::TestingRunDef,
def::{NoMessageHook, TestingRunDef},
discovery::BrokerIdentifier,
message::{Message, Topic},
};
Expand Down Expand Up @@ -240,6 +240,8 @@ async fn new_broker_under_test<B: Protocol, U: Protocol>() -> Broker<TestingRunD
global_memory_pool_size: None,
ca_cert_path: None,
ca_key_path: None,
user_message_hook: NoMessageHook,
broker_message_hook: NoMessageHook,
};

// Create and return the broker
Expand Down
2 changes: 1 addition & 1 deletion cdn-proto/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ harness = false


[dependencies]
redis = { version = "0.26", default-features = false, features = [
redis = { version = "0.25", default-features = false, features = [
"connection-manager",
"tokio-comp",
] }
Expand Down
2 changes: 1 addition & 1 deletion cdn-proto/src/connection/auth/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ impl<R: RunDef> BrokerAuth<R> {
let public_key_bytes = bail!(
keypair.public_key.serialize(),
Serialize,
"failed to serialize publi key"
"failed to serialize public key"
);

// We authenticate to the marshal with a key
Expand Down
8 changes: 4 additions & 4 deletions cdn-proto/src/connection/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,17 @@ use prometheus::{register_gauge, register_histogram, Gauge, Histogram};
lazy_static! {
// The total number of bytes sent
pub static ref BYTES_SENT: Option<Gauge> =
register_gauge!("total_bytes_sent", "the total number of bytes sent").map_err(|e| eprintln!("Could not register metric: {:?}", e)).ok();
register_gauge!("total_bytes_sent", "the total number of bytes sent").map_err(|e| eprintln!("Could not register metric: {e:?}")).ok();

// The total number of bytes received
pub static ref BYTES_RECV: Option<Gauge> =
register_gauge!("total_bytes_recv", "the total number of bytes received").map_err(|e| eprintln!("Could not register metric: {:?}", e)).ok();
register_gauge!("total_bytes_recv", "the total number of bytes received").map_err(|e| eprintln!("Could not register metric: {e:?}")).ok();

// Per-message latency
pub static ref LATENCY: Option<Histogram> =
register_histogram!("latency", "message delivery latency").map_err(|e| eprintln!("Could not register metric: {:?}", e)).ok();
register_histogram!("latency", "message delivery latency").map_err(|e| eprintln!("Could not register metric: {e:?}")).ok();

// The per-message latency over the last 30 seconds
pub static ref RUNNING_LATENCY: Option<Gauge> =
register_gauge!("running_latency", "average tail latency over the last 30s").map_err(|e| eprintln!("Could not register metric: {:?}", e)).ok();
register_gauge!("running_latency", "average tail latency over the last 30s").map_err(|e| eprintln!("Could not register metric: {e:?}")).ok();
}
36 changes: 35 additions & 1 deletion cdn-proto/src/def.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ use crate::crypto::signature::SignatureScheme;
use crate::discovery::embedded::Embedded;
use crate::discovery::{redis::Redis, DiscoveryClient};
use crate::error::{Error, Result};
use crate::message::Message;
use anyhow::Result as AnyhowResult;

/// An implementation of `Topic` for testing purposes.
#[repr(u8)]
Expand Down Expand Up @@ -55,12 +57,39 @@ pub trait RunDef: 'static {
type Topic: Topic;
}

/// This trait defines the connection configuration for a single CDN component.
/// This trait defines the connection configuration for a single CDN component
pub trait ConnectionDef: 'static {
type Scheme: SignatureScheme;
type Protocol: ProtocolType;
type MessageHook: MessageHookDef;
}

/// The result of a message hooking operation
pub enum HookResult {
/// Skip processing the message
SkipMessage,

/// Process the message
ProcessMessage,
}

/// This trait defines a hook that we use to perform additional actions on receiving a message
pub trait MessageHookDef: Send + Sync + 'static + Clone {
/// The hook that is called when a message is received. If an error is returned, the connection
/// will be closed.
///
/// # Errors
/// Is supposed to return an error if the other end should be disconnected.
fn on_message_received(&mut self, _message: &mut Message) -> AnyhowResult<HookResult> {
Ok(HookResult::ProcessMessage)
}
}

/// The no-op hook
#[derive(Clone)]
pub struct NoMessageHook;
impl MessageHookDef for NoMessageHook {}

/// The production run configuration.
/// Uses the real network protocols and Redis for discovery.
pub struct ProductionRunDef;
Expand All @@ -77,6 +106,7 @@ pub struct ProductionBrokerConnection;
impl ConnectionDef for ProductionBrokerConnection {
type Scheme = BLS;
type Protocol = Tcp;
type MessageHook = NoMessageHook;
}

/// The production user connection configuration.
Expand All @@ -85,6 +115,7 @@ pub struct ProductionUserConnection;
impl ConnectionDef for ProductionUserConnection {
type Scheme = BLS;
type Protocol = Quic;
type MessageHook = NoMessageHook;
}

/// The production client connection configuration.
Expand All @@ -95,6 +126,7 @@ pub struct ProductionClientConnection;
impl ConnectionDef for ProductionClientConnection {
type Scheme = Scheme<<ProductionRunDef as RunDef>::User>;
type Protocol = Protocol<<ProductionRunDef as RunDef>::User>;
type MessageHook = NoMessageHook;
}

/// The testing run configuration.
Expand All @@ -117,11 +149,13 @@ pub struct TestingConnection<P: ProtocolType> {
impl<P: ProtocolType> ConnectionDef for TestingConnection<P> {
type Scheme = BLS;
type Protocol = P;
type MessageHook = NoMessageHook;
}

// Type aliases to automatically disambiguate usage
pub type Scheme<A> = <A as ConnectionDef>::Scheme;
pub type PublicKey<A> = <Scheme<A> as SignatureScheme>::PublicKey;
pub type MessageHook<A> = <A as ConnectionDef>::MessageHook;

// Type aliases to automatically disambiguate usage
pub type Protocol<A> = <A as ConnectionDef>::Protocol;
Expand Down
4 changes: 3 additions & 1 deletion tests/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use cdn_marshal::{Config as MarshalConfig, Marshal};
use cdn_proto::{
connection::protocols::memory::Memory,
crypto::signature::{KeyPair, Serializable, SignatureScheme},
def::{TestingConnection, TestingRunDef},
def::{NoMessageHook, TestingConnection, TestingRunDef},
discovery::{embedded::Embedded, BrokerIdentifier, DiscoveryClient},
message::Topic,
};
Expand Down Expand Up @@ -78,6 +78,8 @@ async fn new_broker(key: u64, public_ep: &str, private_ep: &str, discovery_ep: &
public_advertise_endpoint: public_ep.to_string(),
public_bind_endpoint: public_ep.to_string(),
global_memory_pool_size: None,
user_message_hook: NoMessageHook,
broker_message_hook: NoMessageHook,
};

// Create broker
Expand Down