Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move topics to application layer #42

Merged
merged 3 commits into from
May 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ async fn main() {
private_key,
})
// Subscribe to the global consensus topic
.subscribed_topics(vec![Topic::Global])
.subscribed_topics(vec![TestTopic::Global as u8])
.build()
.unwrap();

Expand Down Expand Up @@ -70,7 +70,7 @@ async fn main() {

// Send a broadcast message to the global topic
client
.send_broadcast_message(vec![Topic::Global], b"hello broadcast".to_vec())
.send_broadcast_message(vec![TestTopic::Global as u8], b"hello broadcast".to_vec())
.await
.unwrap();

Expand All @@ -84,7 +84,7 @@ async fn main() {
assert!(
message
== Message::Broadcast(Broadcast {
topics: vec![Topic::Global],
topics: vec![TestTopic::Global as u8],
message: b"hello broadcast".to_vec()
})
);
Expand Down
14 changes: 9 additions & 5 deletions cdn-broker/benches/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,16 @@ use std::time::Duration;
use cdn_broker::reexports::tests::{TestDefinition, TestRun};
use cdn_broker::{assert_received, send_message_as};
use cdn_proto::connection::{protocols::Connection as _, Bytes};
use cdn_proto::message::{Broadcast, Message, Topic};
use cdn_proto::def::TestTopic;
use cdn_proto::message::{Broadcast, Message};
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use pprof::criterion::{Output, PProfProfiler};

/// The function under bench for broadcasting a message to two users.
async fn broadcast_user(run: &TestRun) {
// Allocate a rather large message
let message = Message::Broadcast(Broadcast {
topics: vec![Topic::Global],
topics: vec![TestTopic::Global as u8],
message: vec![0; 10000],
});

Expand All @@ -28,7 +29,7 @@ async fn broadcast_user(run: &TestRun) {
async fn broadcast_broker(run: &TestRun) {
// Allocate a rather large message
let message = Message::Broadcast(Broadcast {
topics: vec![Topic::Global],
topics: vec![TestTopic::Global as u8],
message: vec![0; 10000],
});

Expand All @@ -48,7 +49,7 @@ fn bench_broadcast_user(c: &mut Criterion) {
// Set up our broker under test
let run = benchmark_runtime.block_on(async move {
let run_definition = TestDefinition {
connected_users: vec![vec![Topic::Global], vec![Topic::Global]],
connected_users: vec![vec![TestTopic::Global as u8], vec![TestTopic::Global as u8]],
connected_brokers: vec![],
};

Expand All @@ -71,7 +72,10 @@ fn bench_broadcast_broker(c: &mut Criterion) {
let run = benchmark_runtime.block_on(async move {
let run_definition = TestDefinition {
connected_users: vec![vec![]],
connected_brokers: vec![(vec![], vec![Topic::Global]), (vec![], vec![Topic::Global])],
connected_brokers: vec![
(vec![], vec![TestTopic::Global as u8]),
(vec![], vec![TestTopic::Global as u8]),
],
};

run_definition.into_run().await
Expand Down
15 changes: 8 additions & 7 deletions cdn-broker/benches/direct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use std::time::Duration;
use cdn_broker::reexports::tests::{TestDefinition, TestRun};
use cdn_broker::{assert_received, send_message_as};
use cdn_proto::connection::{protocols::Connection as _, Bytes};
use cdn_proto::message::{Direct, Message, Topic};
use cdn_proto::def::TestTopic;
use cdn_proto::message::{Direct, Message};
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use pprof::criterion::{Output, PProfProfiler};

Expand Down Expand Up @@ -75,7 +76,7 @@ fn bench_direct_user_to_self(c: &mut Criterion) {
// Set up our broker under test
let run = benchmark_runtime.block_on(async move {
let run_definition = TestDefinition {
connected_users: vec![vec![Topic::Global]],
connected_users: vec![vec![TestTopic::Global as u8]],
connected_brokers: vec![],
};

Expand All @@ -98,7 +99,7 @@ fn bench_direct_user_to_user(c: &mut Criterion) {
// Set up our broker under test
let run = benchmark_runtime.block_on(async move {
let run_definition = TestDefinition {
connected_users: vec![vec![Topic::Global], vec![Topic::Global]],
connected_users: vec![vec![TestTopic::Global as u8], vec![TestTopic::Global as u8]],
connected_brokers: vec![],
};

Expand All @@ -121,8 +122,8 @@ fn bench_direct_user_to_broker(c: &mut Criterion) {
// Set up our broker under test
let run = benchmark_runtime.block_on(async move {
let run_definition = TestDefinition {
connected_users: vec![vec![Topic::Global], vec![Topic::Global]],
connected_brokers: vec![(vec![2], vec![Topic::Global])],
connected_users: vec![vec![TestTopic::Global as u8], vec![TestTopic::Global as u8]],
connected_brokers: vec![(vec![2], vec![TestTopic::Global as u8])],
};

run_definition.into_run().await
Expand All @@ -144,8 +145,8 @@ fn bench_direct_broker_to_user(c: &mut Criterion) {
// Set up our broker under test
let run = benchmark_runtime.block_on(async move {
let run_definition = TestDefinition {
connected_users: vec![vec![Topic::Global], vec![Topic::Global]],
connected_brokers: vec![(vec![2], vec![Topic::Global])],
connected_users: vec![vec![TestTopic::Global as u8], vec![TestTopic::Global as u8]],
connected_brokers: vec![(vec![2], vec![TestTopic::Global as u8])],
};

run_definition.into_run().await
Expand Down
2 changes: 1 addition & 1 deletion cdn-broker/src/connections/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ impl<Def: RunDef> Connections<Def> {
let removed = previous.difference(&now);

// Clone them
let differences = (added.cloned().collect(), removed.cloned().collect());
let differences = (added.copied().collect(), removed.copied().collect());

// Set the previous to the new one
*previous = now.clone();
Expand Down
31 changes: 16 additions & 15 deletions cdn-broker/src/tests/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ use std::time::Duration;

use cdn_proto::{
connection::{protocols::Connection, Bytes},
message::{Broadcast, Message, Topic},
def::TestTopic,
message::{Broadcast, Message},
};
use tokio::time::{sleep, timeout};

Expand All @@ -21,13 +22,13 @@ async fn test_broadcast_user() {
// This run definition: 3 brokers, 6 users
let run_definition = TestDefinition {
connected_users: vec![
vec![Topic::Global, Topic::DA],
vec![Topic::DA],
vec![Topic::Global],
vec![TestTopic::Global as u8, TestTopic::DA as u8],
vec![TestTopic::DA as u8],
vec![TestTopic::Global as u8],
],
connected_brokers: vec![
(vec![3], vec![Topic::DA]),
(vec![4], vec![Topic::Global, Topic::DA]),
(vec![3], vec![TestTopic::DA as u8]),
(vec![4], vec![TestTopic::Global as u8, TestTopic::DA as u8]),
(vec![5], vec![]),
],
};
Expand All @@ -40,7 +41,7 @@ async fn test_broadcast_user() {

// Create a broadcast message with the global topic
let message = Message::Broadcast(Broadcast {
topics: vec![Topic::Global],
topics: vec![TestTopic::Global as u8],
message: b"test broadcast global".to_vec(),
});

Expand All @@ -59,7 +60,7 @@ async fn test_broadcast_user() {

// Now we test the DA topic
let message = Message::Broadcast(Broadcast {
topics: vec![Topic::DA],
topics: vec![TestTopic::DA as u8],
message: b"test broadcast DA".to_vec(),
});

Expand Down Expand Up @@ -87,13 +88,13 @@ async fn test_broadcast_broker() {
// This run definition: 3 brokers, 6 users
let run_definition = TestDefinition {
connected_users: vec![
vec![Topic::Global, Topic::DA],
vec![Topic::DA],
vec![Topic::Global],
vec![TestTopic::Global as u8, TestTopic::DA as u8],
vec![TestTopic::DA as u8],
vec![TestTopic::Global as u8],
],
connected_brokers: vec![
(vec![3], vec![Topic::DA]),
(vec![4], vec![Topic::Global, Topic::DA]),
(vec![3], vec![TestTopic::DA as u8]),
(vec![4], vec![TestTopic::Global as u8, TestTopic::DA as u8]),
(vec![5], vec![]),
],
};
Expand All @@ -106,7 +107,7 @@ async fn test_broadcast_broker() {

// Create a broadcast message with the global topic
let message = Message::Broadcast(Broadcast {
topics: vec![Topic::Global],
topics: vec![TestTopic::Global as u8],
message: b"test broadcast global".to_vec(),
});

Expand All @@ -124,7 +125,7 @@ async fn test_broadcast_broker() {

// Now we test the DA topic
let message = Message::Broadcast(Broadcast {
topics: vec![Topic::DA],
topics: vec![TestTopic::DA as u8],
message: b"test broadcast DA.".to_vec(),
});

Expand Down
24 changes: 17 additions & 7 deletions cdn-broker/src/tests/direct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ use std::time::Duration;

use cdn_proto::{
connection::{protocols::Connection, Bytes},
message::{Direct, Message, Topic},
def::TestTopic,
message::{Direct, Message},
};
use tokio::time::{sleep, timeout};

Expand All @@ -21,9 +22,12 @@ use crate::{assert_received, send_message_as};
async fn test_direct_user_to_user() {
// This run definition: 3 brokers, 6 users
let run_definition = TestDefinition {
connected_users: vec![vec![Topic::Global], vec![Topic::Global, Topic::DA]],
connected_users: vec![
vec![TestTopic::Global as u8],
vec![TestTopic::Global as u8, TestTopic::DA as u8],
],
connected_brokers: vec![
(vec![2], vec![Topic::DA]),
(vec![2], vec![TestTopic::DA as u8]),
(vec![3], vec![]),
(vec![4], vec![]),
],
Expand Down Expand Up @@ -73,9 +77,12 @@ async fn test_direct_user_to_user() {
async fn test_direct_user_to_broker() {
// This run definition: 3 brokers, 6 users
let run_definition = TestDefinition {
connected_users: vec![vec![Topic::Global], vec![Topic::Global, Topic::DA]],
connected_users: vec![
vec![TestTopic::Global as u8],
vec![TestTopic::Global as u8, TestTopic::DA as u8],
],
connected_brokers: vec![
(vec![3], vec![Topic::DA]),
(vec![3], vec![TestTopic::DA as u8]),
(vec![2], vec![]),
(vec![4], vec![]),
],
Expand Down Expand Up @@ -109,9 +116,12 @@ async fn test_direct_user_to_broker() {
async fn test_direct_broker_to_user() {
// This run definition: 3 brokers, 6 users
let run_definition = TestDefinition {
connected_users: vec![vec![Topic::Global], vec![Topic::Global, Topic::DA]],
connected_users: vec![
vec![TestTopic::Global as u8],
vec![TestTopic::Global as u8, TestTopic::DA as u8],
],
connected_brokers: vec![
(vec![3], vec![Topic::DA]),
(vec![3], vec![TestTopic::DA as u8]),
(vec![2], vec![]),
(vec![4], vec![]),
],
Expand Down
7 changes: 5 additions & 2 deletions cdn-client/src/binaries/bad-connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@
use std::time::Duration;

use cdn_client::{Client, Config};
use cdn_proto::{crypto::signature::KeyPair, def::ProductionClientConnection, message::Topic};
use cdn_proto::{
crypto::signature::KeyPair,
def::{ProductionClientConnection, TestTopic},
};
use clap::Parser;
use jf_primitives::signatures::{
bls_over_bn254::BLSOverBN254CurveSignatureScheme as BLS, SignatureScheme,
Expand Down Expand Up @@ -52,7 +55,7 @@ async fn main() {
public_key,
private_key,
},
subscribed_topics: vec![Topic::Global],
subscribed_topics: vec![TestTopic::Global as u8],
use_local_authority: true,
};

Expand Down
9 changes: 6 additions & 3 deletions cdn-client/src/binaries/bad-sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
//! This is useful for testing the broker's ability to handle many messages.

use cdn_client::{Client, Config};
use cdn_proto::{crypto::signature::KeyPair, def::ProductionClientConnection, message::Topic};
use cdn_proto::{
crypto::signature::KeyPair,
def::{ProductionClientConnection, TestTopic},
};
use clap::Parser;
use jf_primitives::signatures::{
bls_over_bn254::BLSOverBN254CurveSignatureScheme as BLS, SignatureScheme,
Expand Down Expand Up @@ -48,7 +51,7 @@ async fn main() {
public_key,
private_key,
},
subscribed_topics: vec![Topic::Global],
subscribed_topics: vec![TestTopic::Global as u8],
use_local_authority: true,
};

Expand Down Expand Up @@ -77,7 +80,7 @@ async fn main() {

// Send a direct message to ourselves
if let Err(e) = client
.send_broadcast_message(vec![Topic::Global], message.clone())
.send_broadcast_message(vec![TestTopic::Global as u8], message.clone())
.await
{
println!("failed to send broadcast message: {e:?}");
Expand Down
10 changes: 5 additions & 5 deletions cdn-client/src/binaries/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use std::time::Duration;
use cdn_client::{Client, Config};
use cdn_proto::{
crypto::signature::{KeyPair, Serializable},
def::ProductionClientConnection,
message::{Broadcast, Direct, Message, Topic},
def::{ProductionClientConnection, TestTopic},
message::{Broadcast, Direct, Message},
};
use clap::Parser;
use jf_primitives::signatures::{
Expand Down Expand Up @@ -56,7 +56,7 @@ async fn main() {
public_key,
private_key,
},
subscribed_topics: vec![Topic::Global],
subscribed_topics: vec![TestTopic::Global as u8],
use_local_authority: true,
};

Expand Down Expand Up @@ -91,7 +91,7 @@ async fn main() {

// Send a broadcast message to the global topic
client
.send_broadcast_message(vec![Topic::Global], b"hello broadcast".to_vec())
.send_broadcast_message(vec![TestTopic::Global as u8], b"hello broadcast".to_vec())
.await
.expect("failed to send message");
info!("broadcasted \"hello broadcast\" to ourselves");
Expand All @@ -106,7 +106,7 @@ async fn main() {
assert!(
message
== Message::Broadcast(Broadcast {
topics: vec![Topic::Global],
topics: vec![TestTopic::Global as u8],
message: b"hello broadcast".to_vec()
})
);
Expand Down
1 change: 1 addition & 0 deletions cdn-proto/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ description = "Contains the common protocol definition and common code for the b

[build-dependencies]
rcgen.workspace = true
capnpc = "0.19"

[features]
metrics = ["dep:prometheus"]
Expand Down
Loading