From a0a038fafa3fe4540adf98ebf832ca9adf199c0d Mon Sep 17 00:00:00 2001 From: Rob Date: Mon, 6 May 2024 11:23:17 -0400 Subject: [PATCH 1/3] move topics to application layer --- Cargo.lock | 10 + README.md | 6 +- cdn-broker/benches/broadcast.rs | 14 +- cdn-broker/benches/direct.rs | 15 +- cdn-broker/src/connections/mod.rs | 2 +- cdn-broker/src/tests/broadcast.rs | 31 +-- cdn-broker/src/tests/direct.rs | 24 ++- cdn-client/src/binaries/bad-connector.rs | 7 +- cdn-client/src/binaries/bad-sender.rs | 9 +- cdn-client/src/binaries/client.rs | 10 +- cdn-proto/Cargo.toml | 1 + cdn-proto/schema/messages.capnp | 23 +-- cdn-proto/schema/messages_capnp.rs | 232 +++++++++-------------- cdn-proto/src/def.rs | 9 + cdn-proto/src/message.rs | 124 ++---------- tests/src/tests/basic_connect.rs | 7 +- tests/src/tests/double_connect.rs | 10 +- tests/src/tests/whitelist.rs | 10 +- 18 files changed, 216 insertions(+), 328 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5301238..5246481 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -782,6 +782,15 @@ dependencies = [ "embedded-io", ] +[[package]] +name = "capnpc" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c75ba30e0f08582d53c2f3710cf4bb65ff562614b1ba86906d7391adffe189ec" +dependencies = [ + "capnp", +] + [[package]] name = "cast" version = "0.3.0" @@ -854,6 +863,7 @@ dependencies = [ "ark-serialize", "async-trait", "capnp", + "capnpc", "criterion", "derivative", "jf-primitives", diff --git a/README.md b/README.md index c97e8fb..890e68b 100644 --- a/README.md +++ b/README.md @@ -37,7 +37,7 @@ async fn main() { private_key, }) // Subscribe to the global consensus topic - .subscribed_topics(vec![Topic::Global]) + .subscribed_topics(vec![TestTopic::Global as u8]) .build() .unwrap(); @@ -70,7 +70,7 @@ async fn main() { // Send a broadcast message to the global topic client - .send_broadcast_message(vec![Topic::Global], b"hello broadcast".to_vec()) + .send_broadcast_message(vec![TestTopic::Global as u8], b"hello broadcast".to_vec()) .await .unwrap(); @@ -84,7 +84,7 @@ async fn main() { assert!( message == Message::Broadcast(Broadcast { - topics: vec![Topic::Global], + topics: vec![TestTopic::Global as u8], message: b"hello broadcast".to_vec() }) ); diff --git a/cdn-broker/benches/broadcast.rs b/cdn-broker/benches/broadcast.rs index d1bb007..4688303 100644 --- a/cdn-broker/benches/broadcast.rs +++ b/cdn-broker/benches/broadcast.rs @@ -6,7 +6,8 @@ use std::time::Duration; use cdn_broker::reexports::tests::{TestDefinition, TestRun}; use cdn_broker::{assert_received, send_message_as}; use cdn_proto::connection::{protocols::Connection as _, Bytes}; -use cdn_proto::message::{Broadcast, Message, Topic}; +use cdn_proto::def::TestTopic; +use cdn_proto::message::{Broadcast, Message}; use criterion::{black_box, criterion_group, criterion_main, Criterion}; use pprof::criterion::{Output, PProfProfiler}; @@ -14,7 +15,7 @@ use pprof::criterion::{Output, PProfProfiler}; async fn broadcast_user(run: &TestRun) { // Allocate a rather large message let message = Message::Broadcast(Broadcast { - topics: vec![Topic::Global], + topics: vec![TestTopic::Global as u8], message: vec![0; 10000], }); @@ -28,7 +29,7 @@ async fn broadcast_user(run: &TestRun) { async fn broadcast_broker(run: &TestRun) { // Allocate a rather large message let message = Message::Broadcast(Broadcast { - topics: vec![Topic::Global], + topics: vec![TestTopic::Global as u8], message: vec![0; 10000], }); @@ -48,7 +49,7 @@ fn bench_broadcast_user(c: &mut Criterion) { // Set up our broker under test let run = benchmark_runtime.block_on(async move { let run_definition = TestDefinition { - connected_users: vec![vec![Topic::Global], vec![Topic::Global]], + connected_users: vec![vec![TestTopic::Global as u8], vec![TestTopic::Global as u8]], connected_brokers: vec![], }; @@ -71,7 +72,10 @@ fn bench_broadcast_broker(c: &mut Criterion) { let run = benchmark_runtime.block_on(async move { let run_definition = TestDefinition { connected_users: vec![vec![]], - connected_brokers: vec![(vec![], vec![Topic::Global]), (vec![], vec![Topic::Global])], + connected_brokers: vec![ + (vec![], vec![TestTopic::Global as u8]), + (vec![], vec![TestTopic::Global as u8]), + ], }; run_definition.into_run().await diff --git a/cdn-broker/benches/direct.rs b/cdn-broker/benches/direct.rs index 648ffcd..d94edbe 100644 --- a/cdn-broker/benches/direct.rs +++ b/cdn-broker/benches/direct.rs @@ -6,7 +6,8 @@ use std::time::Duration; use cdn_broker::reexports::tests::{TestDefinition, TestRun}; use cdn_broker::{assert_received, send_message_as}; use cdn_proto::connection::{protocols::Connection as _, Bytes}; -use cdn_proto::message::{Direct, Message, Topic}; +use cdn_proto::def::TestTopic; +use cdn_proto::message::{Direct, Message}; use criterion::{black_box, criterion_group, criterion_main, Criterion}; use pprof::criterion::{Output, PProfProfiler}; @@ -75,7 +76,7 @@ fn bench_direct_user_to_self(c: &mut Criterion) { // Set up our broker under test let run = benchmark_runtime.block_on(async move { let run_definition = TestDefinition { - connected_users: vec![vec![Topic::Global]], + connected_users: vec![vec![TestTopic::Global as u8]], connected_brokers: vec![], }; @@ -98,7 +99,7 @@ fn bench_direct_user_to_user(c: &mut Criterion) { // Set up our broker under test let run = benchmark_runtime.block_on(async move { let run_definition = TestDefinition { - connected_users: vec![vec![Topic::Global], vec![Topic::Global]], + connected_users: vec![vec![TestTopic::Global as u8], vec![TestTopic::Global as u8]], connected_brokers: vec![], }; @@ -121,8 +122,8 @@ fn bench_direct_user_to_broker(c: &mut Criterion) { // Set up our broker under test let run = benchmark_runtime.block_on(async move { let run_definition = TestDefinition { - connected_users: vec![vec![Topic::Global], vec![Topic::Global]], - connected_brokers: vec![(vec![2], vec![Topic::Global])], + connected_users: vec![vec![TestTopic::Global as u8], vec![TestTopic::Global as u8]], + connected_brokers: vec![(vec![2], vec![TestTopic::Global as u8])], }; run_definition.into_run().await @@ -144,8 +145,8 @@ fn bench_direct_broker_to_user(c: &mut Criterion) { // Set up our broker under test let run = benchmark_runtime.block_on(async move { let run_definition = TestDefinition { - connected_users: vec![vec![Topic::Global], vec![Topic::Global]], - connected_brokers: vec![(vec![2], vec![Topic::Global])], + connected_users: vec![vec![TestTopic::Global as u8], vec![TestTopic::Global as u8]], + connected_brokers: vec![(vec![2], vec![TestTopic::Global as u8])], }; run_definition.into_run().await diff --git a/cdn-broker/src/connections/mod.rs b/cdn-broker/src/connections/mod.rs index db6b3f1..b6049e3 100644 --- a/cdn-broker/src/connections/mod.rs +++ b/cdn-broker/src/connections/mod.rs @@ -203,7 +203,7 @@ impl Connections { let removed = previous.difference(&now); // Clone them - let differences = (added.cloned().collect(), removed.cloned().collect()); + let differences = (added.copied().collect(), removed.copied().collect()); // Set the previous to the new one *previous = now.clone(); diff --git a/cdn-broker/src/tests/broadcast.rs b/cdn-broker/src/tests/broadcast.rs index ce35d93..c2d867b 100644 --- a/cdn-broker/src/tests/broadcast.rs +++ b/cdn-broker/src/tests/broadcast.rs @@ -5,7 +5,8 @@ use std::time::Duration; use cdn_proto::{ connection::{protocols::Connection, Bytes}, - message::{Broadcast, Message, Topic}, + def::TestTopic, + message::{Broadcast, Message}, }; use tokio::time::{sleep, timeout}; @@ -21,13 +22,13 @@ async fn test_broadcast_user() { // This run definition: 3 brokers, 6 users let run_definition = TestDefinition { connected_users: vec![ - vec![Topic::Global, Topic::DA], - vec![Topic::DA], - vec![Topic::Global], + vec![TestTopic::Global as u8, TestTopic::DA as u8], + vec![TestTopic::DA as u8], + vec![TestTopic::Global as u8], ], connected_brokers: vec![ - (vec![3], vec![Topic::DA]), - (vec![4], vec![Topic::Global, Topic::DA]), + (vec![3], vec![TestTopic::DA as u8]), + (vec![4], vec![TestTopic::Global as u8, TestTopic::DA as u8]), (vec![5], vec![]), ], }; @@ -40,7 +41,7 @@ async fn test_broadcast_user() { // Create a broadcast message with the global topic let message = Message::Broadcast(Broadcast { - topics: vec![Topic::Global], + topics: vec![TestTopic::Global as u8], message: b"test broadcast global".to_vec(), }); @@ -59,7 +60,7 @@ async fn test_broadcast_user() { // Now we test the DA topic let message = Message::Broadcast(Broadcast { - topics: vec![Topic::DA], + topics: vec![TestTopic::DA as u8], message: b"test broadcast DA".to_vec(), }); @@ -87,13 +88,13 @@ async fn test_broadcast_broker() { // This run definition: 3 brokers, 6 users let run_definition = TestDefinition { connected_users: vec![ - vec![Topic::Global, Topic::DA], - vec![Topic::DA], - vec![Topic::Global], + vec![TestTopic::Global as u8, TestTopic::DA as u8], + vec![TestTopic::DA as u8], + vec![TestTopic::Global as u8], ], connected_brokers: vec![ - (vec![3], vec![Topic::DA]), - (vec![4], vec![Topic::Global, Topic::DA]), + (vec![3], vec![TestTopic::DA as u8]), + (vec![4], vec![TestTopic::Global as u8, TestTopic::DA as u8]), (vec![5], vec![]), ], }; @@ -106,7 +107,7 @@ async fn test_broadcast_broker() { // Create a broadcast message with the global topic let message = Message::Broadcast(Broadcast { - topics: vec![Topic::Global], + topics: vec![TestTopic::Global as u8], message: b"test broadcast global".to_vec(), }); @@ -124,7 +125,7 @@ async fn test_broadcast_broker() { // Now we test the DA topic let message = Message::Broadcast(Broadcast { - topics: vec![Topic::DA], + topics: vec![TestTopic::DA as u8], message: b"test broadcast DA.".to_vec(), }); diff --git a/cdn-broker/src/tests/direct.rs b/cdn-broker/src/tests/direct.rs index 2a96db6..de2f166 100644 --- a/cdn-broker/src/tests/direct.rs +++ b/cdn-broker/src/tests/direct.rs @@ -5,7 +5,8 @@ use std::time::Duration; use cdn_proto::{ connection::{protocols::Connection, Bytes}, - message::{Direct, Message, Topic}, + def::TestTopic, + message::{Direct, Message}, }; use tokio::time::{sleep, timeout}; @@ -21,9 +22,12 @@ use crate::{assert_received, send_message_as}; async fn test_direct_user_to_user() { // This run definition: 3 brokers, 6 users let run_definition = TestDefinition { - connected_users: vec![vec![Topic::Global], vec![Topic::Global, Topic::DA]], + connected_users: vec![ + vec![TestTopic::Global as u8], + vec![TestTopic::Global as u8, TestTopic::DA as u8], + ], connected_brokers: vec![ - (vec![2], vec![Topic::DA]), + (vec![2], vec![TestTopic::DA as u8]), (vec![3], vec![]), (vec![4], vec![]), ], @@ -73,9 +77,12 @@ async fn test_direct_user_to_user() { async fn test_direct_user_to_broker() { // This run definition: 3 brokers, 6 users let run_definition = TestDefinition { - connected_users: vec![vec![Topic::Global], vec![Topic::Global, Topic::DA]], + connected_users: vec![ + vec![TestTopic::Global as u8], + vec![TestTopic::Global as u8, TestTopic::DA as u8], + ], connected_brokers: vec![ - (vec![3], vec![Topic::DA]), + (vec![3], vec![TestTopic::DA as u8]), (vec![2], vec![]), (vec![4], vec![]), ], @@ -109,9 +116,12 @@ async fn test_direct_user_to_broker() { async fn test_direct_broker_to_user() { // This run definition: 3 brokers, 6 users let run_definition = TestDefinition { - connected_users: vec![vec![Topic::Global], vec![Topic::Global, Topic::DA]], + connected_users: vec![ + vec![TestTopic::Global as u8], + vec![TestTopic::Global as u8, TestTopic::DA as u8], + ], connected_brokers: vec![ - (vec![3], vec![Topic::DA]), + (vec![3], vec![TestTopic::DA as u8]), (vec![2], vec![]), (vec![4], vec![]), ], diff --git a/cdn-client/src/binaries/bad-connector.rs b/cdn-client/src/binaries/bad-connector.rs index 64cf64c..fc9351a 100644 --- a/cdn-client/src/binaries/bad-connector.rs +++ b/cdn-client/src/binaries/bad-connector.rs @@ -4,7 +4,10 @@ use std::time::Duration; use cdn_client::{Client, Config}; -use cdn_proto::{crypto::signature::KeyPair, def::ProductionClientConnection, message::Topic}; +use cdn_proto::{ + crypto::signature::KeyPair, + def::{ProductionClientConnection, TestTopic}, +}; use clap::Parser; use jf_primitives::signatures::{ bls_over_bn254::BLSOverBN254CurveSignatureScheme as BLS, SignatureScheme, @@ -52,7 +55,7 @@ async fn main() { public_key, private_key, }, - subscribed_topics: vec![Topic::Global], + subscribed_topics: vec![TestTopic::Global as u8], use_local_authority: true, }; diff --git a/cdn-client/src/binaries/bad-sender.rs b/cdn-client/src/binaries/bad-sender.rs index baac8c3..52c569c 100644 --- a/cdn-client/src/binaries/bad-sender.rs +++ b/cdn-client/src/binaries/bad-sender.rs @@ -2,7 +2,10 @@ //! This is useful for testing the broker's ability to handle many messages. use cdn_client::{Client, Config}; -use cdn_proto::{crypto::signature::KeyPair, def::ProductionClientConnection, message::Topic}; +use cdn_proto::{ + crypto::signature::KeyPair, + def::{ProductionClientConnection, TestTopic}, +}; use clap::Parser; use jf_primitives::signatures::{ bls_over_bn254::BLSOverBN254CurveSignatureScheme as BLS, SignatureScheme, @@ -48,7 +51,7 @@ async fn main() { public_key, private_key, }, - subscribed_topics: vec![Topic::Global], + subscribed_topics: vec![TestTopic::Global as u8], use_local_authority: true, }; @@ -77,7 +80,7 @@ async fn main() { // Send a direct message to ourselves if let Err(e) = client - .send_broadcast_message(vec![Topic::Global], message.clone()) + .send_broadcast_message(vec![TestTopic::Global as u8], message.clone()) .await { println!("failed to send broadcast message: {e:?}"); diff --git a/cdn-client/src/binaries/client.rs b/cdn-client/src/binaries/client.rs index a3a74d1..3c090d6 100644 --- a/cdn-client/src/binaries/client.rs +++ b/cdn-client/src/binaries/client.rs @@ -7,8 +7,8 @@ use std::time::Duration; use cdn_client::{Client, Config}; use cdn_proto::{ crypto::signature::{KeyPair, Serializable}, - def::ProductionClientConnection, - message::{Broadcast, Direct, Message, Topic}, + def::{ProductionClientConnection, TestTopic}, + message::{Broadcast, Direct, Message}, }; use clap::Parser; use jf_primitives::signatures::{ @@ -56,7 +56,7 @@ async fn main() { public_key, private_key, }, - subscribed_topics: vec![Topic::Global], + subscribed_topics: vec![TestTopic::Global as u8], use_local_authority: true, }; @@ -91,7 +91,7 @@ async fn main() { // Send a broadcast message to the global topic client - .send_broadcast_message(vec![Topic::Global], b"hello broadcast".to_vec()) + .send_broadcast_message(vec![TestTopic::Global as u8], b"hello broadcast".to_vec()) .await .expect("failed to send message"); info!("broadcasted \"hello broadcast\" to ourselves"); @@ -106,7 +106,7 @@ async fn main() { assert!( message == Message::Broadcast(Broadcast { - topics: vec![Topic::Global], + topics: vec![TestTopic::Global as u8], message: b"hello broadcast".to_vec() }) ); diff --git a/cdn-proto/Cargo.toml b/cdn-proto/Cargo.toml index bffdfd2..0170c17 100644 --- a/cdn-proto/Cargo.toml +++ b/cdn-proto/Cargo.toml @@ -6,6 +6,7 @@ description = "Contains the common protocol definition and common code for the b [build-dependencies] rcgen.workspace = true +capnpc = "0.19" [features] metrics = ["dep:prometheus"] diff --git a/cdn-proto/schema/messages.capnp b/cdn-proto/schema/messages.capnp index 19c234a..3fa2c16 100644 --- a/cdn-proto/schema/messages.capnp +++ b/cdn-proto/schema/messages.capnp @@ -17,32 +17,15 @@ struct Message { broadcast @4 :Broadcast; # A message denoting we'd like to subscribe to some topics - subscribe @5 :List(Topic); + subscribe @5 :List(UInt8); # A message denoting we'd like to unsubscribe from some topics - unsubscribe @6 :List(Topic); + unsubscribe @6 :List(UInt8); # A message containing a map which we use to converge on user connection state userSync @7: Data; } } -# An enum for users to specify topics for subscription and unsubscription. -# Also used on the sending side, where messages can be marked with -# a topic and propagated to the interested users. -enum Topic { - # The global consensus topic. All conseneus participants should be subscribed - # to this. - global @0; - - # The DA-specfic topic. Only participants in the DA committee should want to - # be subscribed to this. - da @1; - - # The topic with transactions. Only soon-to-be-leaders should want to be - # subscribed to this. - transactions @2; -} - # This message is used to authenticate the client to a marshal or a broker # to a broker. It contains a way of proving identity of the sender. struct AuthenticateWithKey { @@ -85,7 +68,7 @@ struct Direct { # vector of topics to denote interest. struct Broadcast { # The topics to sent the message to - topics @0: List(Topic); + topics @0: List(UInt8); # The actual message data message @1: Data; } \ No newline at end of file diff --git a/cdn-proto/schema/messages_capnp.rs b/cdn-proto/schema/messages_capnp.rs index c1d9290..93a4500 100644 --- a/cdn-proto/schema/messages_capnp.rs +++ b/cdn-proto/schema/messages_capnp.rs @@ -1,6 +1,6 @@ // @generated by the capnpc-rust plugin to the Cap'n Proto schema compiler. // DO NOT EDIT. -// source: messages.capnp +// source: cdn-proto/schema/messages.capnp pub mod message { @@ -282,12 +282,12 @@ pub mod message { !self.builder.is_pointer_field_null(0) } #[inline] - pub fn set_subscribe(&mut self, value: impl ::capnp::traits::SetterInput<::capnp::enum_list::Owned>) -> ::capnp::Result<()> { + pub fn set_subscribe(&mut self, value: impl ::capnp::traits::SetterInput<::capnp::primitive_list::Owned>) -> ::capnp::Result<()> { self.builder.set_data_field::(0, 5); ::capnp::traits::SetterInput::set_pointer_builder(self.builder.reborrow().get_pointer_field(0), value, false) } #[inline] - pub fn init_subscribe(self, size: u32) -> ::capnp::enum_list::Builder<'a,crate::messages_capnp::Topic> { + pub fn init_subscribe(self, size: u32) -> ::capnp::primitive_list::Builder<'a,u8> { self.builder.set_data_field::(0, 5); ::capnp::traits::FromPointerBuilder::init_pointer(self.builder.get_pointer_field(0), size) } @@ -297,12 +297,12 @@ pub mod message { !self.builder.is_pointer_field_null(0) } #[inline] - pub fn set_unsubscribe(&mut self, value: impl ::capnp::traits::SetterInput<::capnp::enum_list::Owned>) -> ::capnp::Result<()> { + pub fn set_unsubscribe(&mut self, value: impl ::capnp::traits::SetterInput<::capnp::primitive_list::Owned>) -> ::capnp::Result<()> { self.builder.set_data_field::(0, 6); ::capnp::traits::SetterInput::set_pointer_builder(self.builder.reborrow().get_pointer_field(0), value, false) } #[inline] - pub fn init_unsubscribe(self, size: u32) -> ::capnp::enum_list::Builder<'a,crate::messages_capnp::Topic> { + pub fn init_unsubscribe(self, size: u32) -> ::capnp::primitive_list::Builder<'a,u8> { self.builder.set_data_field::(0, 6); ::capnp::traits::FromPointerBuilder::init_pointer(self.builder.get_pointer_field(0), size) } @@ -383,22 +383,24 @@ pub mod message { impl Pipeline { } mod _private { - pub static ENCODED_NODE: [::capnp::Word; 155] = [ + pub static ENCODED_NODE: [::capnp::Word; 157] = [ ::capnp::word(0, 0, 0, 0, 5, 0, 6, 0), ::capnp::word(207, 214, 176, 234, 122, 81, 72, 204), - ::capnp::word(15, 0, 0, 0, 1, 0, 1, 0), + ::capnp::word(32, 0, 0, 0, 1, 0, 1, 0), ::capnp::word(47, 245, 10, 45, 6, 155, 224, 194), ::capnp::word(1, 0, 7, 0, 0, 0, 8, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(21, 0, 0, 0, 186, 0, 0, 0), - ::capnp::word(29, 0, 0, 0, 7, 0, 0, 0), + ::capnp::word(21, 0, 0, 0, 66, 1, 0, 0), + ::capnp::word(37, 0, 0, 0, 7, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(25, 0, 0, 0, 199, 1, 0, 0), + ::capnp::word(33, 0, 0, 0, 199, 1, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(109, 101, 115, 115, 97, 103, 101, 115), - ::capnp::word(46, 99, 97, 112, 110, 112, 58, 77), - ::capnp::word(101, 115, 115, 97, 103, 101, 0, 0), + ::capnp::word(99, 100, 110, 45, 112, 114, 111, 116), + ::capnp::word(111, 47, 115, 99, 104, 101, 109, 97), + ::capnp::word(47, 109, 101, 115, 115, 97, 103, 101), + ::capnp::word(115, 46, 99, 97, 112, 110, 112, 58), + ::capnp::word(77, 101, 115, 115, 97, 103, 101, 0), ::capnp::word(0, 0, 0, 0, 1, 0, 1, 0), ::capnp::word(32, 0, 0, 0, 3, 0, 4, 0), ::capnp::word(0, 0, 255, 255, 0, 0, 0, 0), @@ -510,8 +512,8 @@ pub mod message { ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 3, 0, 1, 0), - ::capnp::word(15, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(62, 140, 150, 52, 165, 220, 58, 222), + ::capnp::word(6, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), ::capnp::word(14, 0, 0, 0, 0, 0, 0, 0), @@ -523,8 +525,8 @@ pub mod message { ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 3, 0, 1, 0), - ::capnp::word(15, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(62, 140, 150, 52, 165, 220, 58, 222), + ::capnp::word(6, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), ::capnp::word(14, 0, 0, 0, 0, 0, 0, 0), @@ -547,8 +549,8 @@ pub mod message { 2 => ::introspect(), 3 => ::introspect(), 4 => ::introspect(), - 5 => <::capnp::enum_list::Owned as ::capnp::introspect::Introspect>::introspect(), - 6 => <::capnp::enum_list::Owned as ::capnp::introspect::Introspect>::introspect(), + 5 => <::capnp::primitive_list::Owned as ::capnp::introspect::Introspect>::introspect(), + 6 => <::capnp::primitive_list::Owned as ::capnp::introspect::Introspect>::introspect(), 7 => <::capnp::data::Owned as ::capnp::introspect::Introspect>::introspect(), _ => panic!("invalid field index {}", index), } @@ -577,78 +579,8 @@ pub mod message { Unsubscribe(A6), UserSync(A7), } - pub type WhichReader<'a,> = Which<::capnp::Result>,::capnp::Result>,::capnp::Result>,::capnp::Result>,::capnp::Result>,::capnp::Result<::capnp::enum_list::Reader<'a,crate::messages_capnp::Topic>>,::capnp::Result<::capnp::enum_list::Reader<'a,crate::messages_capnp::Topic>>,::capnp::Result<::capnp::data::Reader<'a>>>; - pub type WhichBuilder<'a,> = Which<::capnp::Result>,::capnp::Result>,::capnp::Result>,::capnp::Result>,::capnp::Result>,::capnp::Result<::capnp::enum_list::Builder<'a,crate::messages_capnp::Topic>>,::capnp::Result<::capnp::enum_list::Builder<'a,crate::messages_capnp::Topic>>,::capnp::Result<::capnp::data::Builder<'a>>>; -} - -#[repr(u16)] -#[derive(Clone, Copy, Debug, PartialEq, Eq)] -pub enum Topic { - Global = 0, - Da = 1, - Transactions = 2, -} - -impl ::capnp::introspect::Introspect for Topic { - fn introspect() -> ::capnp::introspect::Type { ::capnp::introspect::TypeVariant::Enum(::capnp::introspect::RawEnumSchema { encoded_node: &topic::ENCODED_NODE, annotation_types: topic::get_annotation_types }).into() } -} -impl <'a> ::core::convert::From for ::capnp::dynamic_value::Reader<'a> { - fn from(e: Topic) -> Self { ::capnp::dynamic_value::Enum::new(e.into(), ::capnp::introspect::RawEnumSchema { encoded_node: &topic::ENCODED_NODE, annotation_types: topic::get_annotation_types }.into()).into() } -} -impl ::core::convert::TryFrom for Topic { - type Error = ::capnp::NotInSchema; - fn try_from(value: u16) -> ::core::result::Result>::Error> { - match value { - 0 => ::core::result::Result::Ok(Self::Global), - 1 => ::core::result::Result::Ok(Self::Da), - 2 => ::core::result::Result::Ok(Self::Transactions), - n => ::core::result::Result::Err(::capnp::NotInSchema(n)), - } - } -} -impl From for u16 { - #[inline] - fn from(x: Topic) -> u16 { x as u16 } -} -impl ::capnp::traits::HasTypeId for Topic { - const TYPE_ID: u64 = 0xde3a_dca5_3496_8c3eu64; -} -mod topic { -pub static ENCODED_NODE: [::capnp::Word; 30] = [ - ::capnp::word(0, 0, 0, 0, 5, 0, 6, 0), - ::capnp::word(62, 140, 150, 52, 165, 220, 58, 222), - ::capnp::word(15, 0, 0, 0, 2, 0, 0, 0), - ::capnp::word(47, 245, 10, 45, 6, 155, 224, 194), - ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(21, 0, 0, 0, 170, 0, 0, 0), - ::capnp::word(29, 0, 0, 0, 7, 0, 0, 0), - ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(25, 0, 0, 0, 79, 0, 0, 0), - ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(109, 101, 115, 115, 97, 103, 101, 115), - ::capnp::word(46, 99, 97, 112, 110, 112, 58, 84), - ::capnp::word(111, 112, 105, 99, 0, 0, 0, 0), - ::capnp::word(0, 0, 0, 0, 1, 0, 1, 0), - ::capnp::word(12, 0, 0, 0, 1, 0, 2, 0), - ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(29, 0, 0, 0, 58, 0, 0, 0), - ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(1, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(21, 0, 0, 0, 26, 0, 0, 0), - ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(2, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(13, 0, 0, 0, 106, 0, 0, 0), - ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(103, 108, 111, 98, 97, 108, 0, 0), - ::capnp::word(100, 97, 0, 0, 0, 0, 0, 0), - ::capnp::word(116, 114, 97, 110, 115, 97, 99, 116), - ::capnp::word(105, 111, 110, 115, 0, 0, 0, 0), -]; -pub fn get_annotation_types(child_index: Option, index: u32) -> ::capnp::introspect::Type { - panic!("invalid annotation indices ({:?}, {}) ", child_index, index) -} + pub type WhichReader<'a,> = Which<::capnp::Result>,::capnp::Result>,::capnp::Result>,::capnp::Result>,::capnp::Result>,::capnp::Result<::capnp::primitive_list::Reader<'a,u8>>,::capnp::Result<::capnp::primitive_list::Reader<'a,u8>>,::capnp::Result<::capnp::data::Reader<'a>>>; + pub type WhichBuilder<'a,> = Which<::capnp::Result>,::capnp::Result>,::capnp::Result>,::capnp::Result>,::capnp::Result>,::capnp::Result<::capnp::primitive_list::Builder<'a,u8>>,::capnp::Result<::capnp::primitive_list::Builder<'a,u8>>,::capnp::Result<::capnp::data::Builder<'a>>>; } pub mod authenticate_with_key { @@ -837,24 +769,26 @@ pub mod authenticate_with_key { impl Pipeline { } mod _private { - pub static ENCODED_NODE: [::capnp::Word; 67] = [ + pub static ENCODED_NODE: [::capnp::Word; 69] = [ ::capnp::word(0, 0, 0, 0, 5, 0, 6, 0), ::capnp::word(11, 157, 74, 113, 66, 194, 12, 157), - ::capnp::word(15, 0, 0, 0, 1, 0, 1, 0), + ::capnp::word(32, 0, 0, 0, 1, 0, 1, 0), ::capnp::word(47, 245, 10, 45, 6, 155, 224, 194), ::capnp::word(2, 0, 7, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(21, 0, 0, 0, 26, 1, 0, 0), - ::capnp::word(37, 0, 0, 0, 7, 0, 0, 0), + ::capnp::word(21, 0, 0, 0, 162, 1, 0, 0), + ::capnp::word(45, 0, 0, 0, 7, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(33, 0, 0, 0, 175, 0, 0, 0), + ::capnp::word(41, 0, 0, 0, 175, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(109, 101, 115, 115, 97, 103, 101, 115), - ::capnp::word(46, 99, 97, 112, 110, 112, 58, 65), - ::capnp::word(117, 116, 104, 101, 110, 116, 105, 99), - ::capnp::word(97, 116, 101, 87, 105, 116, 104, 75), - ::capnp::word(101, 121, 0, 0, 0, 0, 0, 0), + ::capnp::word(99, 100, 110, 45, 112, 114, 111, 116), + ::capnp::word(111, 47, 115, 99, 104, 101, 109, 97), + ::capnp::word(47, 109, 101, 115, 115, 97, 103, 101), + ::capnp::word(115, 46, 99, 97, 112, 110, 112, 58), + ::capnp::word(65, 117, 116, 104, 101, 110, 116, 105), + ::capnp::word(99, 97, 116, 101, 87, 105, 116, 104), + ::capnp::word(75, 101, 121, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 1, 0, 1, 0), ::capnp::word(12, 0, 0, 0, 3, 0, 4, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), @@ -1068,24 +1002,26 @@ pub mod authenticate_with_permit { impl Pipeline { } mod _private { - pub static ENCODED_NODE: [::capnp::Word; 34] = [ + pub static ENCODED_NODE: [::capnp::Word; 36] = [ ::capnp::word(0, 0, 0, 0, 5, 0, 6, 0), ::capnp::word(200, 102, 23, 3, 90, 37, 178, 230), - ::capnp::word(15, 0, 0, 0, 1, 0, 1, 0), + ::capnp::word(32, 0, 0, 0, 1, 0, 1, 0), ::capnp::word(47, 245, 10, 45, 6, 155, 224, 194), ::capnp::word(0, 0, 7, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(21, 0, 0, 0, 50, 1, 0, 0), - ::capnp::word(37, 0, 0, 0, 7, 0, 0, 0), + ::capnp::word(21, 0, 0, 0, 186, 1, 0, 0), + ::capnp::word(45, 0, 0, 0, 7, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(33, 0, 0, 0, 63, 0, 0, 0), + ::capnp::word(41, 0, 0, 0, 63, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(109, 101, 115, 115, 97, 103, 101, 115), - ::capnp::word(46, 99, 97, 112, 110, 112, 58, 65), - ::capnp::word(117, 116, 104, 101, 110, 116, 105, 99), - ::capnp::word(97, 116, 101, 87, 105, 116, 104, 80), - ::capnp::word(101, 114, 109, 105, 116, 0, 0, 0), + ::capnp::word(99, 100, 110, 45, 112, 114, 111, 116), + ::capnp::word(111, 47, 115, 99, 104, 101, 109, 97), + ::capnp::word(47, 109, 101, 115, 115, 97, 103, 101), + ::capnp::word(115, 46, 99, 97, 112, 110, 112, 58), + ::capnp::word(65, 117, 116, 104, 101, 110, 116, 105), + ::capnp::word(99, 97, 116, 101, 87, 105, 116, 104), + ::capnp::word(80, 101, 114, 109, 105, 116, 0, 0), ::capnp::word(0, 0, 0, 0, 1, 0, 1, 0), ::capnp::word(4, 0, 0, 0, 3, 0, 4, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), @@ -1288,24 +1224,26 @@ pub mod authenticate_response { impl Pipeline { } mod _private { - pub static ENCODED_NODE: [::capnp::Word; 49] = [ + pub static ENCODED_NODE: [::capnp::Word; 51] = [ ::capnp::word(0, 0, 0, 0, 5, 0, 6, 0), ::capnp::word(190, 25, 35, 89, 245, 47, 214, 169), - ::capnp::word(15, 0, 0, 0, 1, 0, 1, 0), + ::capnp::word(32, 0, 0, 0, 1, 0, 1, 0), ::capnp::word(47, 245, 10, 45, 6, 155, 224, 194), ::capnp::word(1, 0, 7, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(21, 0, 0, 0, 34, 1, 0, 0), - ::capnp::word(37, 0, 0, 0, 7, 0, 0, 0), + ::capnp::word(21, 0, 0, 0, 170, 1, 0, 0), + ::capnp::word(45, 0, 0, 0, 7, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(33, 0, 0, 0, 119, 0, 0, 0), + ::capnp::word(41, 0, 0, 0, 119, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(109, 101, 115, 115, 97, 103, 101, 115), - ::capnp::word(46, 99, 97, 112, 110, 112, 58, 65), - ::capnp::word(117, 116, 104, 101, 110, 116, 105, 99), - ::capnp::word(97, 116, 101, 82, 101, 115, 112, 111), - ::capnp::word(110, 115, 101, 0, 0, 0, 0, 0), + ::capnp::word(99, 100, 110, 45, 112, 114, 111, 116), + ::capnp::word(111, 47, 115, 99, 104, 101, 109, 97), + ::capnp::word(47, 109, 101, 115, 115, 97, 103, 101), + ::capnp::word(115, 46, 99, 97, 112, 110, 112, 58), + ::capnp::word(65, 117, 116, 104, 101, 110, 116, 105), + ::capnp::word(99, 97, 116, 101, 82, 101, 115, 112), + ::capnp::word(111, 110, 115, 101, 0, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 1, 0, 1, 0), ::capnp::word(8, 0, 0, 0, 3, 0, 4, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), @@ -1536,22 +1474,24 @@ pub mod direct { impl Pipeline { } mod _private { - pub static ENCODED_NODE: [::capnp::Word; 48] = [ + pub static ENCODED_NODE: [::capnp::Word; 50] = [ ::capnp::word(0, 0, 0, 0, 5, 0, 6, 0), ::capnp::word(203, 205, 16, 235, 198, 157, 125, 219), - ::capnp::word(15, 0, 0, 0, 1, 0, 0, 0), + ::capnp::word(32, 0, 0, 0, 1, 0, 0, 0), ::capnp::word(47, 245, 10, 45, 6, 155, 224, 194), ::capnp::word(2, 0, 7, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(21, 0, 0, 0, 178, 0, 0, 0), - ::capnp::word(29, 0, 0, 0, 7, 0, 0, 0), + ::capnp::word(21, 0, 0, 0, 58, 1, 0, 0), + ::capnp::word(37, 0, 0, 0, 7, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(25, 0, 0, 0, 119, 0, 0, 0), + ::capnp::word(33, 0, 0, 0, 119, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(109, 101, 115, 115, 97, 103, 101, 115), - ::capnp::word(46, 99, 97, 112, 110, 112, 58, 68), - ::capnp::word(105, 114, 101, 99, 116, 0, 0, 0), + ::capnp::word(99, 100, 110, 45, 112, 114, 111, 116), + ::capnp::word(111, 47, 115, 99, 104, 101, 109, 97), + ::capnp::word(47, 109, 101, 115, 115, 97, 103, 101), + ::capnp::word(115, 46, 99, 97, 112, 110, 112, 58), + ::capnp::word(68, 105, 114, 101, 99, 116, 0, 0), ::capnp::word(0, 0, 0, 0, 1, 0, 1, 0), ::capnp::word(8, 0, 0, 0, 3, 0, 4, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), @@ -1671,7 +1611,7 @@ pub mod broadcast { self.reader.total_size() } #[inline] - pub fn get_topics(self) -> ::capnp::Result<::capnp::enum_list::Reader<'a,crate::messages_capnp::Topic>> { + pub fn get_topics(self) -> ::capnp::Result<::capnp::primitive_list::Reader<'a,u8>> { ::capnp::traits::FromPointerReader::get_from_pointer(&self.reader.get_pointer_field(0), ::core::option::Option::None) } #[inline] @@ -1741,15 +1681,15 @@ pub mod broadcast { self.builder.as_reader().total_size() } #[inline] - pub fn get_topics(self) -> ::capnp::Result<::capnp::enum_list::Builder<'a,crate::messages_capnp::Topic>> { + pub fn get_topics(self) -> ::capnp::Result<::capnp::primitive_list::Builder<'a,u8>> { ::capnp::traits::FromPointerBuilder::get_from_pointer(self.builder.get_pointer_field(0), ::core::option::Option::None) } #[inline] - pub fn set_topics(&mut self, value: impl ::capnp::traits::SetterInput<::capnp::enum_list::Owned>) -> ::capnp::Result<()> { + pub fn set_topics(&mut self, value: impl ::capnp::traits::SetterInput<::capnp::primitive_list::Owned>) -> ::capnp::Result<()> { ::capnp::traits::SetterInput::set_pointer_builder(self.builder.reborrow().get_pointer_field(0), value, false) } #[inline] - pub fn init_topics(self, size: u32) -> ::capnp::enum_list::Builder<'a,crate::messages_capnp::Topic> { + pub fn init_topics(self, size: u32) -> ::capnp::primitive_list::Builder<'a,u8> { ::capnp::traits::FromPointerBuilder::init_pointer(self.builder.get_pointer_field(0), size) } #[inline] @@ -1783,23 +1723,25 @@ pub mod broadcast { impl Pipeline { } mod _private { - pub static ENCODED_NODE: [::capnp::Word; 52] = [ + pub static ENCODED_NODE: [::capnp::Word; 54] = [ ::capnp::word(0, 0, 0, 0, 5, 0, 6, 0), ::capnp::word(212, 57, 13, 134, 255, 209, 90, 159), - ::capnp::word(15, 0, 0, 0, 1, 0, 0, 0), + ::capnp::word(32, 0, 0, 0, 1, 0, 0, 0), ::capnp::word(47, 245, 10, 45, 6, 155, 224, 194), ::capnp::word(2, 0, 7, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(21, 0, 0, 0, 202, 0, 0, 0), - ::capnp::word(33, 0, 0, 0, 7, 0, 0, 0), + ::capnp::word(21, 0, 0, 0, 82, 1, 0, 0), + ::capnp::word(41, 0, 0, 0, 7, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(29, 0, 0, 0, 119, 0, 0, 0), + ::capnp::word(37, 0, 0, 0, 119, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(109, 101, 115, 115, 97, 103, 101, 115), - ::capnp::word(46, 99, 97, 112, 110, 112, 58, 66), - ::capnp::word(114, 111, 97, 100, 99, 97, 115, 116), - ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(99, 100, 110, 45, 112, 114, 111, 116), + ::capnp::word(111, 47, 115, 99, 104, 101, 109, 97), + ::capnp::word(47, 109, 101, 115, 115, 97, 103, 101), + ::capnp::word(115, 46, 99, 97, 112, 110, 112, 58), + ::capnp::word(66, 114, 111, 97, 100, 99, 97, 115), + ::capnp::word(116, 0, 0, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 1, 0, 1, 0), ::capnp::word(8, 0, 0, 0, 3, 0, 4, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), @@ -1821,8 +1763,8 @@ pub mod broadcast { ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 3, 0, 1, 0), - ::capnp::word(15, 0, 0, 0, 0, 0, 0, 0), - ::capnp::word(62, 140, 150, 52, 165, 220, 58, 222), + ::capnp::word(6, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), ::capnp::word(14, 0, 0, 0, 0, 0, 0, 0), @@ -1839,7 +1781,7 @@ pub mod broadcast { ]; pub fn get_field_types(index: u16) -> ::capnp::introspect::Type { match index { - 0 => <::capnp::enum_list::Owned as ::capnp::introspect::Introspect>::introspect(), + 0 => <::capnp::primitive_list::Owned as ::capnp::introspect::Introspect>::introspect(), 1 => <::capnp::data::Owned as ::capnp::introspect::Introspect>::introspect(), _ => panic!("invalid field index {}", index), } diff --git a/cdn-proto/src/def.rs b/cdn-proto/src/def.rs index 687f66f..705cf2c 100644 --- a/cdn-proto/src/def.rs +++ b/cdn-proto/src/def.rs @@ -11,8 +11,15 @@ use crate::crypto::signature::SignatureScheme; use crate::discovery::embedded::Embedded; use crate::discovery::{redis::Redis, DiscoveryClient}; +#[repr(u8)] +pub enum TestTopic { + Global = 0, + DA = 1, +} + /// This trait defines the run configuration for all CDN components. pub trait RunDef: 'static { + const SUPPORTED_TOPICS: &'static [u8]; type Broker: ConnectionDef; type User: ConnectionDef; type DiscoveryClientType: DiscoveryClient; @@ -29,6 +36,7 @@ pub trait ConnectionDef: 'static { /// Uses the real network protocols and Redis for discovery. pub struct ProductionRunDef; impl RunDef for ProductionRunDef { + const SUPPORTED_TOPICS: &'static [u8] = &[TestTopic::Global as u8, TestTopic::DA as u8]; type Broker = ProductionBrokerConnection; type User = ProductionUserConnection; type DiscoveryClientType = Redis; @@ -67,6 +75,7 @@ impl ConnectionDef for ProductionClientConnection { /// Uses in-memory protocols and an embedded discovery client. pub struct TestingRunDef; impl RunDef for TestingRunDef { + const SUPPORTED_TOPICS: &'static [u8] = &[TestTopic::Global as u8, TestTopic::DA as u8]; type Broker = TestingConnection; type User = TestingConnection; type DiscoveryClientType = Embedded; diff --git a/cdn-proto/src/message.rs b/cdn-proto/src/message.rs index 28a5b22..1bcf1cb 100644 --- a/cdn-proto/src/message.rs +++ b/cdn-proto/src/message.rs @@ -2,15 +2,13 @@ //! messages sent to/from a broker or user. //! TODO: clean up. Maybe use Cap'n'Proto messages directly. -use std::{fmt::Display, result::Result as StdResult}; - use capnp::{ message::ReaderOptions, serialize::{self, write_message_segments_to_words}, }; use crate::{ - bail, bail_option, + bail, error::{Error, Result}, messages_capnp::{ self, authenticate_response, authenticate_with_key, authenticate_with_permit, broadcast, @@ -18,6 +16,9 @@ use crate::{ }, }; +/// A type alias for a `Topic` to disambiguate it from `Vec` +pub type Topic = u8; + /// This is a helper macro for serializing `CapnProto` values. macro_rules! serialize { // Rule to serialize a `Topic`. @@ -32,33 +33,11 @@ macro_rules! checked_to_u32 { }; } -macro_rules! try_get { - ($message: expr, $i: expr) => { - bail!( - bail_option!( - $message.try_get($i), - Deserialize, - "failed to deserialize message" - ), - Deserialize, - "not in schema" - ) - }; -} - /// This is a helper macro for deserializing `CapnProto` values. macro_rules! deserialize { // Rule to deserialize a `Topic`. We need to unwrap quite a few times. - ($func_name:expr, Topic) => { - bail!( - $func_name, - Deserialize, - format!("failed to deserialize topic") - ) - .into_iter() - .filter_map(|topic| topic.ok()) - .map(|topic| topic.into()) - .collect() + ($func_name:expr, Vec) => { + $func_name.into_iter().map(|topic| topic.into()).collect() }; ($message: expr, List) => {{ @@ -166,8 +145,7 @@ impl Message { let mut message: broadcast::Builder = root.init_broadcast(); // Serialize topics - let serialized_topics: Vec = - serialize!(to_serialize.topics.clone(), Topic); + let serialized_topics: Vec = serialize!(to_serialize.topics.clone(), Topic); // Set each field bail!( @@ -192,7 +170,7 @@ impl Message { let mut message = root.init_subscribe(checked_to_u32!(to_serialize.len())); for (i, topic) in to_serialize.iter().enumerate() { - message.set(checked_to_u32!(i), topic.clone().into()); + message.set(checked_to_u32!(i), *topic); } } @@ -201,7 +179,7 @@ impl Message { let mut message = root.init_unsubscribe(checked_to_u32!(to_serialize.len())); for (i, topic) in to_serialize.iter().enumerate() { - message.set(checked_to_u32!(i), topic.clone().into()); + message.set(checked_to_u32!(i), *topic); } } @@ -280,8 +258,14 @@ impl Message { let message = bail!(maybe_message, Deserialize, "failed to deserialize message"); + let topics = bail!( + message.get_topics(), + Deserialize, + "failed to deserialize topics" + ); + Self::Broadcast(Broadcast { - topics: deserialize!(message.get_topics(), Topic), + topics: deserialize!(topics, Vec), message: deserialize!(message.get_message(), Vec), }) } @@ -289,13 +273,13 @@ impl Message { let message = bail!(maybe_message, Deserialize, "failed to deserialize message"); - Self::Subscribe(deserialize!(message, List)) + Self::Subscribe(deserialize!(message, Vec)) } messages_capnp::message::Unsubscribe(maybe_message) => { let message = bail!(maybe_message, Deserialize, "failed to deserialize message"); - Self::Unsubscribe(deserialize!(message, List)) + Self::Unsubscribe(deserialize!(message, Vec)) } messages_capnp::message::UserSync(maybe_message) => { @@ -309,72 +293,6 @@ impl Message { } } -#[derive(PartialEq, Clone, Hash, Eq, Debug)] -/// An enum for users to specify topics for subscription and unsubscription. -/// Also used on the sending side, where messages can be marked with -/// a topic and propagated to the interested users. -pub enum Topic { - /// The global consensus topic. All conseneus participants should be subscribed - /// to this. - Global, - /// The DA-specfic topic. Only participants in the DA committee should want to - /// be subscribed to this. - DA, - /// The topic with transactions. Only soon-to-be-leaders should want to be subscribed to this. - Transactions, -} - -/// We need this because it allows conversions to and from the Cap'n' Proto version -/// of a `Topic` -impl From for Topic { - fn from(value: messages_capnp::Topic) -> Self { - match value { - messages_capnp::Topic::Global => Self::Global, - messages_capnp::Topic::Da => Self::DA, - messages_capnp::Topic::Transactions => Self::Transactions, - } - } -} - -/// We need this because it allows conversions to and from the Cap'n' Proto version -/// of a `Topic` -impl From for messages_capnp::Topic { - fn from(value: Topic) -> Self { - match value { - Topic::Global => Self::Global, - Topic::DA => Self::Da, - Topic::Transactions => Self::Transactions, - } - } -} - -/// We need this to allows conversions to and from a `String` -impl TryInto for String { - type Error = Error; - - fn try_into(self) -> StdResult { - match self.as_str() { - "DA" => Ok(Topic::DA), - "Global" => Ok(Topic::Global), - "Transactions" => Ok(Topic::Transactions), - _ => Err(Error::Parse( - "failed to parse topic: did not exist".to_string(), - )), - } - } -} - -/// We need this to convert a `Topic` to a `String` -impl Display for Topic { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Self::Global => write!(f, "Global"), - Self::DA => write!(f, "DA"), - Self::Transactions => write!(f, "Transactions"), - } - } -} - /// This message is used to authenticate the client to a marshal or a broker /// to a broker. It contains a way of proving identity of the sender. #[derive(Eq, PartialEq, Debug, Clone)] @@ -504,15 +422,15 @@ mod tests { // `Broadcast` message assert_serialize_deserialize!(Message::Broadcast(Broadcast { - topics: vec![Topic::DA, Topic::Global], + topics: vec![0, 1], message: vec![0, 1, 2], })); // `Subscribe` message - assert_serialize_deserialize!(Message::Subscribe(vec![Topic::DA, Topic::Global],)); + assert_serialize_deserialize!(Message::Subscribe(vec![0, 1])); // `Unsubscribe` message - assert_serialize_deserialize!(Message::Unsubscribe(vec![Topic::DA, Topic::Global],)); + assert_serialize_deserialize!(Message::Unsubscribe(vec![0, 1])); // `UserSync` message assert_serialize_deserialize!(Message::UserSync(vec![0u8, 1u8])); diff --git a/tests/src/tests/basic_connect.rs b/tests/src/tests/basic_connect.rs index f2f5e55..f71174f 100644 --- a/tests/src/tests/basic_connect.rs +++ b/tests/src/tests/basic_connect.rs @@ -1,4 +1,7 @@ -use cdn_proto::message::{Direct, Message}; +use cdn_proto::{ + def::TestTopic, + message::{Direct, Message}, +}; use crate::tests::*; @@ -15,7 +18,7 @@ async fn test_end_to_end_connection() { new_marshal("8082", &discovery_endpoint).await; // Create and get the handle to a new client - let client = new_client(0, vec![Topic::Global], "8082"); + let client = new_client(0, vec![TestTopic::Global as u8], "8082"); let client_public_key = keypair_from_seed(0).1; // Send a message to ourself diff --git a/tests/src/tests/double_connect.rs b/tests/src/tests/double_connect.rs index 6488796..7221f3a 100644 --- a/tests/src/tests/double_connect.rs +++ b/tests/src/tests/double_connect.rs @@ -1,6 +1,6 @@ use std::time::Duration; -use cdn_proto::discovery::BrokerIdentifier; +use cdn_proto::{def::TestTopic, discovery::BrokerIdentifier}; use tokio::time::{sleep, timeout}; use super::*; @@ -19,8 +19,8 @@ async fn test_double_connect_same_broker() { new_marshal("8088", &discovery_endpoint).await; // Create 2 clients with the same keypair - let client1 = new_client(1, vec![Topic::Global], "8088"); - let client2 = new_client(1, vec![Topic::Global], "8088"); + let client1 = new_client(1, vec![TestTopic::Global as u8], "8088"); + let client2 = new_client(1, vec![TestTopic::Global as u8], "8088"); // Assert both clients are connected let Ok(()) = timeout(Duration::from_secs(1), client1.ensure_initialized()).await else { @@ -65,8 +65,8 @@ async fn test_double_connect_different_broker() { new_marshal("8094", &discovery_endpoint).await; // Create 2 clients with the same keypair - let client1 = new_client(1, vec![Topic::Global], "8094"); - let client2 = new_client(1, vec![Topic::Global], "8094"); + let client1 = new_client(1, vec![TestTopic::Global as u8], "8094"); + let client2 = new_client(1, vec![TestTopic::Global as u8], "8094"); // Wait a little sleep(Duration::from_millis(50)).await; diff --git a/tests/src/tests/whitelist.rs b/tests/src/tests/whitelist.rs index 20534d2..f400390 100644 --- a/tests/src/tests/whitelist.rs +++ b/tests/src/tests/whitelist.rs @@ -1,6 +1,6 @@ use std::{sync::Arc, time::Duration}; -use cdn_proto::connection::UserPublicKey; +use cdn_proto::{connection::UserPublicKey, def::TestTopic}; use tokio::time::timeout; use crate::tests::*; @@ -19,11 +19,11 @@ async fn test_whitelist() { // Create a client with keypair 1 let client1_public_key: UserPublicKey = Arc::from(serialized_public_key_from_seed(1)); - let client1 = new_client(1, vec![Topic::Global], "8085"); + let client1 = new_client(1, vec![TestTopic::Global as u8], "8085"); // Create a client with keypair 2 let client2_public_key: UserPublicKey = Arc::from(serialized_public_key_from_seed(2)); - let client2 = new_client(2, vec![Topic::Global], "8085"); + let client2 = new_client(2, vec![TestTopic::Global as u8], "8085"); // Assert both clients can connect let Ok(()) = timeout(Duration::from_secs(1), client1.ensure_initialized()).await else { @@ -54,8 +54,8 @@ async fn test_whitelist() { .is_ok_and(|x| !x)); // Recreate clients - let client1 = new_client(1, vec![Topic::Global], "8085"); - let client2 = new_client(2, vec![Topic::Global], "8085"); + let client1 = new_client(1, vec![TestTopic::Global as u8], "8085"); + let client2 = new_client(2, vec![TestTopic::Global as u8], "8085"); // Assert we can connect as client1 let Ok(()) = timeout(Duration::from_secs(1), client1.ensure_initialized()).await else { From 442d1da916a4f35882dd2dffdacd28808ba20dff Mon Sep 17 00:00:00 2001 From: Rob Date: Mon, 6 May 2024 14:13:30 -0400 Subject: [PATCH 2/3] add comment for test topic --- cdn-proto/src/def.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/cdn-proto/src/def.rs b/cdn-proto/src/def.rs index 705cf2c..2f42997 100644 --- a/cdn-proto/src/def.rs +++ b/cdn-proto/src/def.rs @@ -11,6 +11,7 @@ use crate::crypto::signature::SignatureScheme; use crate::discovery::embedded::Embedded; use crate::discovery::{redis::Redis, DiscoveryClient}; +/// The test topics for the CDN. #[repr(u8)] pub enum TestTopic { Global = 0, @@ -19,7 +20,6 @@ pub enum TestTopic { /// This trait defines the run configuration for all CDN components. pub trait RunDef: 'static { - const SUPPORTED_TOPICS: &'static [u8]; type Broker: ConnectionDef; type User: ConnectionDef; type DiscoveryClientType: DiscoveryClient; @@ -36,7 +36,6 @@ pub trait ConnectionDef: 'static { /// Uses the real network protocols and Redis for discovery. pub struct ProductionRunDef; impl RunDef for ProductionRunDef { - const SUPPORTED_TOPICS: &'static [u8] = &[TestTopic::Global as u8, TestTopic::DA as u8]; type Broker = ProductionBrokerConnection; type User = ProductionUserConnection; type DiscoveryClientType = Redis; @@ -75,7 +74,6 @@ impl ConnectionDef for ProductionClientConnection { /// Uses in-memory protocols and an embedded discovery client. pub struct TestingRunDef; impl RunDef for TestingRunDef { - const SUPPORTED_TOPICS: &'static [u8] = &[TestTopic::Global as u8, TestTopic::DA as u8]; type Broker = TestingConnection; type User = TestingConnection; type DiscoveryClientType = Embedded; From 6d8c530a8e44b9d2d66e2026bbb53166003e4b95 Mon Sep 17 00:00:00 2001 From: Rob Date: Mon, 6 May 2024 14:50:30 -0400 Subject: [PATCH 3/3] add to serialization tests --- cdn-proto/src/message.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cdn-proto/src/message.rs b/cdn-proto/src/message.rs index 1bcf1cb..997a85c 100644 --- a/cdn-proto/src/message.rs +++ b/cdn-proto/src/message.rs @@ -422,15 +422,15 @@ mod tests { // `Broadcast` message assert_serialize_deserialize!(Message::Broadcast(Broadcast { - topics: vec![0, 1], + topics: vec![0, 1, 99], message: vec![0, 1, 2], })); // `Subscribe` message - assert_serialize_deserialize!(Message::Subscribe(vec![0, 1])); + assert_serialize_deserialize!(Message::Subscribe(vec![0, 1, 99])); // `Unsubscribe` message - assert_serialize_deserialize!(Message::Unsubscribe(vec![0, 1])); + assert_serialize_deserialize!(Message::Unsubscribe(vec![0, 1, 99])); // `UserSync` message assert_serialize_deserialize!(Message::UserSync(vec![0u8, 1u8]));