diff --git a/.cargo/audit.toml b/.cargo/audit.toml
index b2cf2d010b..e333111d63 100644
--- a/.cargo/audit.toml
+++ b/.cargo/audit.toml
@@ -43,4 +43,11 @@ ignore = [
   # https://rustsec.org/advisories/RUSTSEC-2022-0008
   "RUSTSEC-2022-0008",
+  # ed25519-dalek 1.0.1
+  # Dependency of libp2p
+  # Referenced in this issue: https://github.com/libp2p/rust-libp2p/issues/4327
+  # https://rustsec.org/advisories/RUSTSEC-2022-0093
+  "RUSTSEC-2022-0093"
+
+
 ]
diff --git a/Cargo.lock b/Cargo.lock
index 35c5c56ee9..5172571abf 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -719,9 +719,9 @@ dependencies = [
 
 [[package]]
 name = "async-lock"
-version = "2.7.0"
+version = "2.8.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "fa24f727524730b077666307f2734b4a1a1c57acb79193127dcc8914d5242dd7"
+checksum = "287272293e9d8c41773cec55e365490fe034813a2f172f502d6ddcf75b2f582b"
 dependencies = [
  "event-listener",
 ]
@@ -843,9 +843,9 @@ checksum = "ecc7ab41815b3c653ccd2978ec3255c81349336702dfdf62ee6f7069b12a3aae"
 
 [[package]]
 name = "async-trait"
-version = "0.1.72"
+version = "0.1.73"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "cc6dde6e4ed435a4c1ee4e73592f5ba9da2151af10076cc04858746af9352d09"
+checksum = "bc00ceb34980c03614e35a3a4e218276a0a824e911d07651cd0d858a51e8c0f0"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -3592,9 +3592,9 @@ dependencies = [
 
 [[package]]
 name = "libp2p"
-version = "0.52.1"
+version = "0.52.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "38039ba2df4f3255842050845daef4a004cc1f26da03dbc645535088b51910ef"
+checksum = "ca4894076bfa3051e4f1725747308861af1e6641213640aeeb784f583e40e7d9"
 dependencies = [
  "bytes 1.4.0",
  "futures",
@@ -3753,9 +3753,9 @@ dependencies = [
 
 [[package]]
 name = "libp2p-gossipsub"
-version = "0.45.0"
+version = "0.45.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8e378da62e8c9251f6e885ed173a561663f29b251e745586cf6ae6150b295c37"
+checksum = "2d157562dba6017193e5285acf6b1054759e83540bfd79f75b69d6ce774c88da"
 dependencies = [
  "asynchronous-codec",
  "base64 0.21.2",
@@ -3829,9 +3829,9 @@ dependencies = [
 
 [[package]]
 name = "libp2p-kad"
-version = "0.44.3"
+version = "0.44.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4f2584b0c27f879a1cca4b753fd96874109e5a2f46bd6e30924096456c2ba9b2"
+checksum = "fc125f83d8f75322c79e4ade74677d299b34aa5c9d9b5251c03ec28c683cb765"
 dependencies = [
  "arrayvec",
  "asynchronous-codec",
@@ -3880,9 +3880,9 @@ dependencies = [
 
 [[package]]
 name = "libp2p-metrics"
-version = "0.13.0"
+version = "0.13.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3787ea81798dcc5bf1d8b40a8e8245cf894b168d04dd70aa48cb3ff2fff141d2"
+checksum = "239ba7d28f8d0b5d77760dc6619c05c7e88e74ec8fbbe97f856f20a56745e620"
 dependencies = [
  "instant",
  "libp2p-core",
@@ -4013,9 +4013,9 @@ dependencies = [
 
 [[package]]
 name = "libp2p-relay"
-version = "0.16.0"
+version = "0.16.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "47b6da29ec69430dd4f1f6d271dd62d6fc26f50f6c2a7ea97eeadc6ff23ff08a"
+checksum = "cdb07202cdf103486709fda5d9d10a0297a8ba01c212b1e19b7943c45c1bd7d6"
 dependencies = [
  "asynchronous-codec",
  "bytes 1.4.0",
@@ -4061,9 +4061,9 @@ dependencies = [
 
 [[package]]
 name = "libp2p-request-response"
-version = "0.25.0"
+version = "0.25.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "20bd837798cdcce4283d2675f08bcd3756a650d56eab4d4367e1b3f27eed6887"
+checksum = "49e2cb9befb57e55f53d9463a6ea9b1b8a09a48174ad7be149c9cbebaa5e8e9b"
 dependencies = [
  "async-trait",
  "futures",
@@ -4079,9 +4079,9 @@ dependencies = [
 
 [[package]]
 name = "libp2p-swarm"
-version = "0.43.2"
+version = "0.43.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "43106820057e0f65c77b01a3873593f66e676da4e40c70c3a809b239109f1d30"
+checksum = "28016944851bd73526d3c146aabf0fa9bbe27c558f080f9e5447da3a1772c01a"
 dependencies = [
  "async-std",
  "either",
@@ -4179,9 +4179,9 @@ dependencies = [
 
 [[package]]
 name = "libp2p-yamux"
-version = "0.44.0"
+version = "0.44.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c0a9b42ab6de15c6f076d8fb11dc5f48d899a10b55a2e16b12be9012a05287b0"
+checksum = "8eedcb62824c4300efb9cfd4e2a6edaf3ca097b9e68b36dabe45a44469fd6a85"
 dependencies = [
  "futures",
  "libp2p-core",
@@ -7145,11 +7145,10 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
 
 [[package]]
 name = "tokio"
-version = "1.29.1"
+version = "1.31.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "532826ff75199d5833b9d2c5fe410f29235e25704ee5f0ef599fb51c21f4a4da"
+checksum = "40de3a2ba249dcb097e01be5e67a5ff53cf250397715a071a81543e8a832a920"
 dependencies = [
- "autocfg",
  "backtrace",
  "bytes 1.4.0",
  "libc",
@@ -7158,7 +7157,7 @@
  "parking_lot",
  "pin-project-lite 0.2.11",
  "signal-hook-registry",
- "socket2 0.4.9",
+ "socket2 0.5.3",
  "tokio-macros",
  "tracing",
  "windows-sys",
@@ -8106,14 +8105,15 @@ dependencies = [
 
 [[package]]
 name = "yamux"
-version = "0.10.2"
+version = "0.12.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e5d9ba232399af1783a58d8eb26f6b5006fbefe2dc9ef36bd283324792d03ea5"
+checksum = "0329ef377816896f014435162bb3711ea7a07729c23d0960e6f8048b21b8fe91"
 dependencies = [
  "futures",
  "log",
  "nohash-hasher",
  "parking_lot",
+ "pin-project",
  "rand 0.8.5",
  "static_assertions",
 ]
diff --git a/Cargo.toml b/Cargo.toml
index e4f2cb849d..c3af0f135b 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -144,9 +144,9 @@ ark-ed-on-bls12-381 = "0.4.0"
 ark-serialize = { version = "0.3.0", features = ["derive"] }
 ark-std = { version = "0.4.0" }
 async-compatibility-layer = { git = "https://github.com/EspressoSystems/async-compatibility-layer.git", tag = "1.3.0", default-features = false, features = [ "logging-utils" ] }
-async-lock = "2.7"
+async-lock = "2.8"
 async-std = { version = "1.12", optional = true }
-async-trait = "0.1.71"
+async-trait = "0.1.73"
 bimap = "0.6.3"
 bincode = "1.3.3"
 blake3 = { version = "1.4.1", optional = true, features = ["traits-preview"] }
@@ -173,7 +173,7 @@ jf-primitives = { git = "https://github.com/EspressoSystems/jellyfish", branch =
 libp2p-swarm-derive = { version = "=0.33.0" }
 libp2p-networking = { path = "./libp2p-networking", version = "0.1.0", default-features = false }
 libp2p-identity = "0.2.0"
-libp2p = { version = "0.52.0", default-features = false, features = [
+libp2p = { version = "0.52.2", default-features = false, features = [
   "macros",
   "autonat",
   "deflate",
diff --git a/consensus/Cargo.toml b/consensus/Cargo.toml
index 2f97cbbfbe..b359110e66 100644
--- a/consensus/Cargo.toml
+++ b/consensus/Cargo.toml
@@ -28,9 +28,9 @@ channel-async-std = [
 
 [dependencies]
 async-compatibility-layer = { git = "https://github.com/EspressoSystems/async-compatibility-layer.git", tag = "1.3.0", default-features = false, features = [ "logging-utils" ] }
-async-lock = "2.7"
+async-lock = "2.8"
 async-std = { version = "1.12", optional = true }
-async-trait = "0.1.71"
+async-trait = "0.1.73"
 # TODO ed: Delete this dependency after https://github.com/EspressoSystems/HotShot/issues/614 is finished
 bincode = "1.3.3"
 commit = { git = "https://github.com/EspressoSystems/commit", tag = "0.2.2" }
diff --git a/consensus/src/da_member.rs b/consensus/src/da_member.rs
index 59acb0068a..2b1bf1ce34 100644
--- a/consensus/src/da_member.rs
+++ b/consensus/src/da_member.rs
@@ -13,15 +13,13 @@ use hotshot_types::{
     certificate::QuorumCertificate,
     data::SequencingLeaf,
     message::{
-        ConsensusMessageType, Message, ProcessedCommitteeConsensusMessage,
-        ProcessedGeneralConsensusMessage, ProcessedSequencingMessage, SequencingMessage,
+        ConsensusMessageType, ProcessedCommitteeConsensusMessage, ProcessedGeneralConsensusMessage,
+        ProcessedSequencingMessage, SequencingMessage,
     },
     traits::{
-        consensus_type::sequencing_consensus::SequencingConsensus,
         election::{CommitteeExchangeType, ConsensusExchange},
         node_implementation::{
             CommitteeEx, CommitteeProposalType, CommitteeVote, NodeImplementation, NodeType,
-            SequencingExchangesType,
         },
         signature_key::SignatureKey,
     },
@@ -34,15 +32,13 @@ use tracing::{error, info, instrument, warn};
 #[derive(Debug, Clone)]
 pub struct DAMember<
     A: SequencingConsensusApi<TYPES, SequencingLeaf<TYPES>, I>,
-    TYPES: NodeType<ConsensusType = SequencingConsensus>,
+    TYPES: NodeType,
     I: NodeImplementation<
         TYPES,
         Leaf = SequencingLeaf<TYPES>,
         ConsensusMessage = SequencingMessage<TYPES, I>,
     >,
-> where
-    I::Exchanges: SequencingExchangesType<TYPES, Message<TYPES, I>>,
-{
+> {
     /// ID of node.
     pub id: u64,
     /// Reference to consensus. DA committee member will require a write lock on this.
@@ -67,15 +63,13 @@
 
 impl<
     A: SequencingConsensusApi<TYPES, SequencingLeaf<TYPES>, I>,
-    TYPES: NodeType<ConsensusType = SequencingConsensus>,
+    TYPES: NodeType,
     I: NodeImplementation<
         TYPES,
         Leaf = SequencingLeaf<TYPES>,
         ConsensusMessage = SequencingMessage<TYPES, I>,
     >,
     > DAMember<A, TYPES, I>
-where
-    I::Exchanges: SequencingExchangesType<TYPES, Message<TYPES, I>>,
 {
     /// DA committee member task that spins until a valid DA proposal can be signed or timeout is
     /// hit.
diff --git a/consensus/src/leader.rs b/consensus/src/leader.rs
index f2d411656f..b7872eafe2 100644
--- a/consensus/src/leader.rs
+++ b/consensus/src/leader.rs
@@ -1,12 +1,13 @@
 //! Contains the [`ValidatingLeader`] struct used for the leader step in the hotstuff consensus algorithm.
-use crate::{CommitmentMap, Consensus, ValidatingConsensusApi}; +use crate::{CommitmentMap, Consensus}; use async_compatibility_layer::{ art::{async_sleep, async_timeout}, async_primitives::subscribable_rwlock::{ReadView, SubscribableRwLock}, }; use async_lock::RwLock; use commit::Committable; +use hotshot_types::message::Message; use hotshot_types::{ certificate::QuorumCertificate, data::{ValidatingLeaf, ValidatingProposal}, @@ -14,221 +15,15 @@ use hotshot_types::{ traits::{ consensus_type::validating_consensus::ValidatingConsensus, election::SignedCertificate, - node_implementation::{ - NodeImplementation, NodeType, QuorumProposalType, QuorumVoteType, ValidatingQuorumEx, - }, + node_implementation::{NodeImplementation, NodeType, QuorumProposalType, QuorumVoteType}, signature_key::SignatureKey, Block, State, }, }; -use hotshot_types::{message::Message, traits::node_implementation::ValidatingExchangesType}; use hotshot_types::{ - message::{Proposal, ValidatingMessage}, + message::Proposal, traits::election::{ConsensusExchange, QuorumExchangeType}, }; use std::marker::PhantomData; use std::{sync::Arc, time::Instant}; use tracing::{error, info, instrument, warn}; -/// This view's validating leader -#[derive(Debug, Clone)] -pub struct ValidatingLeader< - A: ValidatingConsensusApi, I>, - TYPES: NodeType, - I: NodeImplementation< - TYPES, - Leaf = ValidatingLeaf, - ConsensusMessage = ValidatingMessage, - >, -> where - I::Exchanges: ValidatingExchangesType>, -{ - /// id of node - pub id: u64, - /// Reference to consensus. Validating leader will require a read lock on this. - pub consensus: Arc>>>, - /// The `high_qc` per spec - pub high_qc: QuorumCertificate>, - /// The view number we're running on - pub cur_view: TYPES::Time, - /// Lock over the transactions list - pub transactions: Arc>>, - /// Limited access to the consensus protocol - pub api: A, - - /// the quorum exchange - pub exchange: Arc>, - - /// needed for type checking - pub _pd: PhantomData, -} - -impl< - A: ValidatingConsensusApi, I>, - TYPES: NodeType, - I: NodeImplementation< - TYPES, - Leaf = ValidatingLeaf, - ConsensusMessage = ValidatingMessage, - >, - > ValidatingLeader -where - I::Exchanges: ValidatingExchangesType>, - ValidatingQuorumEx: ConsensusExchange< - TYPES, - Message, - Proposal = ValidatingProposal>, - >, -{ - /// Run one view of the leader task - #[instrument(skip(self), fields(id = self.id, view = *self.cur_view), name = "Validating ValidatingLeader Task", level = "error")] - pub async fn run_view(self) -> QuorumCertificate> { - let pk = self.api.public_key(); - info!("Validating leader task started!"); - - let task_start_time = Instant::now(); - let parent_view_number = &self.high_qc.view_number(); - let consensus = self.consensus.read().await; - let mut reached_decided = false; - - let Some(parent_view) = consensus.state_map.get(parent_view_number) else { - warn!("Couldn't find high QC parent in state map."); - return self.high_qc; - }; - let Some(leaf) = parent_view.get_leaf_commitment() else { - warn!( - ?parent_view_number, - ?parent_view, - "Parent of high QC points to a view without a proposal" - ); - return self.high_qc; - }; - let Some(leaf) = consensus.saved_leaves.get(&leaf) else { - warn!("Failed to find high QC parent."); - return self.high_qc; - }; - if leaf.view_number == consensus.last_decided_view { - reached_decided = true; - } - let parent_leaf = leaf.clone(); - - let original_parent_hash = parent_leaf.commit(); - let starting_state = &parent_leaf.state; - - let mut previous_used_txns = 
parent_leaf.deltas.contained_transactions(); - - let mut next_parent_hash = original_parent_hash; - - if !reached_decided { - while let Some(next_parent_leaf) = consensus.saved_leaves.get(&next_parent_hash) { - if next_parent_leaf.view_number <= consensus.last_decided_view { - break; - } - let next_parent_txns = next_parent_leaf.deltas.contained_transactions(); - for next_parent_txn in next_parent_txns { - previous_used_txns.insert(next_parent_txn); - } - next_parent_hash = next_parent_leaf.parent_commitment; - } - // TODO do some sort of sanity check on the view number that it matches decided - } - - let passed_time = task_start_time - Instant::now(); - async_sleep(self.api.propose_min_round_time() - passed_time).await; - - let receiver = self.transactions.subscribe().await; - let mut block = ::StateType::next_block(Some(starting_state.clone())); - - // Wait until we have min_transactions for the block or we hit propose_max_round_time - while task_start_time.elapsed() < self.api.propose_max_round_time() { - let txns = self.transactions.cloned().await; - let unclaimed_txns: Vec<_> = txns - .iter() - .filter(|(txn_hash, _txn)| !previous_used_txns.contains(txn_hash)) - .collect(); - - let time_past = task_start_time.elapsed(); - if unclaimed_txns.len() < self.api.min_transactions() - && (time_past < self.api.propose_max_round_time()) - { - let duration = self.api.propose_max_round_time() - time_past; - let result = async_timeout(duration, receiver.recv()).await; - match result { - Err(_) => { - // Fall through below to updating new block - info!("propose_max_round_time passed, sending transactions we have so far"); - } - Ok(Err(e)) => { - // Something unprecedented is wrong, and `transactions` has been dropped - error!("Channel receiver error for SubscribableRwLock {:?}", e); - return self.high_qc; - } - Ok(Ok(_)) => continue, - } - } - - // Add unclaimed transactions to the new block - for (_txn_hash, txn) in &unclaimed_txns { - let new_block_check = block.add_transaction_raw(txn); - if let Ok(new_block) = new_block_check { - if starting_state.validate_block(&new_block, &self.cur_view) { - block = new_block; - continue; - } - } - } - break; - } - - consensus - .metrics - .proposal_wait_duration - .add_point(task_start_time.elapsed().as_secs_f64()); - - let proposal_build_start = Instant::now(); - - if let Ok(new_state) = starting_state.append(&block, &self.cur_view) { - let leaf = ValidatingLeaf { - view_number: self.cur_view, - height: parent_leaf.height + 1, - justify_qc: self.high_qc.clone(), - parent_commitment: original_parent_hash, - deltas: block, - state: new_state, - rejected: Vec::new(), - timestamp: time::OffsetDateTime::now_utc().unix_timestamp_nanos(), - proposer_id: pk.to_bytes(), - }; - let signature = self - .exchange - .sign_validating_or_commitment_proposal::(&leaf.commit()); - let data: ValidatingProposal> = leaf.into(); - let message = - ValidatingMessage::(GeneralConsensusMessage::Proposal(Proposal { - data, - signature, - })); - consensus - .metrics - .proposal_build_duration - .add_point(proposal_build_start.elapsed().as_secs_f64()); - info!("Sending out proposal {:?}", message); - - if let Err(e) = self - .api - .send_broadcast_message::, QuorumVoteType>( - message.clone(), - ) - .await - { - consensus.metrics.failed_to_send_messages.add(1); - warn!(?message, ?e, "Could not broadcast leader proposal"); - } else { - consensus.metrics.outgoing_broadcast_messages.add(1); - } - } else { - error!("Could not append state in high qc for proposal. 
Failed to send out proposal."); - } - - self.high_qc.clone() - } -} diff --git a/consensus/src/lib.rs b/consensus/src/lib.rs index f6f1e9294a..a8291d344c 100644 --- a/consensus/src/lib.rs +++ b/consensus/src/lib.rs @@ -12,9 +12,6 @@ #![allow(clippy::module_name_repetitions)] mod da_member; -mod leader; -mod next_leader; -mod replica; mod sequencing_leader; mod sequencing_replica; pub mod traits; @@ -23,13 +20,10 @@ pub mod utils; use async_compatibility_layer::async_primitives::subscribable_rwlock::SubscribableRwLock; pub use da_member::DAMember; pub use hotshot_types::traits::node_implementation::ViewQueue; -pub use leader::ValidatingLeader; -pub use next_leader::NextValidatingLeader; -pub use replica::Replica; pub use sequencing_leader::{ConsensusLeader, ConsensusNextLeader, DALeader}; pub use sequencing_replica::SequencingReplica; use std::collections::HashSet; -pub use traits::{ConsensusSharedApi, SequencingConsensusApi, ValidatingConsensusApi}; +pub use traits::{ConsensusSharedApi, SequencingConsensusApi}; pub use utils::{View, ViewInner}; use commit::{Commitment, Committable}; diff --git a/consensus/src/next_leader.rs b/consensus/src/next_leader.rs index 64f590b423..abd447a6ff 100644 --- a/consensus/src/next_leader.rs +++ b/consensus/src/next_leader.rs @@ -1,7 +1,6 @@ //! Contains the [`NextValidatingLeader`] struct used for the next leader step in the hotstuff consensus algorithm. use crate::ConsensusMetrics; -use crate::ValidatingConsensusApi; use async_compatibility_layer::channel::UnboundedReceiver; use async_lock::Mutex; use either::Either; @@ -9,14 +8,12 @@ use hotshot_types::data::ValidatingLeaf; use hotshot_types::message::Message; use hotshot_types::message::ProcessedGeneralConsensusMessage; use hotshot_types::traits::election::ConsensusExchange; -use hotshot_types::traits::node_implementation::{ - NodeImplementation, NodeType, ValidatingExchangesType, ValidatingQuorumEx, -}; +use hotshot_types::traits::node_implementation::{NodeImplementation, NodeType}; use hotshot_types::traits::signature_key::SignatureKey; use hotshot_types::vote::VoteAccumulator; use hotshot_types::{ certificate::QuorumCertificate, - message::{ConsensusMessageType, InternalTrigger, ValidatingMessage}, + message::{ConsensusMessageType, InternalTrigger}, traits::consensus_type::validating_consensus::ValidatingConsensus, vote::QuorumVote, }; @@ -27,140 +24,3 @@ use std::{ sync::Arc, }; use tracing::{info, instrument, warn}; - -/// The next view's validating leader -#[derive(custom_debug::Debug, Clone)] -pub struct NextValidatingLeader< - A: ValidatingConsensusApi, I>, - TYPES: NodeType, - I: NodeImplementation< - TYPES, - Leaf = ValidatingLeaf, - ConsensusMessage = ValidatingMessage, - >, -> where - I::Exchanges: ValidatingExchangesType>, -{ - /// id of node - pub id: u64, - /// generic_qc before starting this - pub generic_qc: QuorumCertificate>, - /// channel through which the leader collects votes - #[allow(clippy::type_complexity)] - pub vote_collection_chan: - Arc>>>, - /// The view number we're running on - pub cur_view: TYPES::Time, - /// Limited access to the consensus protocol - pub api: A, - - /// quorum exchange - pub exchange: Arc>, - /// Metrics for reporting stats - #[debug(skip)] - pub metrics: Arc, - - /// needed to type check - pub _pd: PhantomData, -} - -impl< - A: ValidatingConsensusApi, I>, - TYPES: NodeType, - I: NodeImplementation< - TYPES, - Leaf = ValidatingLeaf, - ConsensusMessage = ValidatingMessage, - >, - > NextValidatingLeader -where - I::Exchanges: ValidatingExchangesType>, - 
ValidatingQuorumEx: ConsensusExchange< - TYPES, - Message, - Certificate = QuorumCertificate>, - Commitment = ValidatingLeaf, - >, -{ - /// Run one view of the next leader task - /// # Panics - /// While we are unwrapping, this function can logically never panic - /// unless there is a bug in std - #[instrument(skip(self), fields(id = self.id, view = *self.cur_view), name = "Next Validating ValidatingLeader Task", level = "error")] - pub async fn run_view(self) -> QuorumCertificate> { - info!("Next validating leader task started!"); - - let vote_collection_start = Instant::now(); - - let mut qcs = HashSet::>>::new(); - qcs.insert(self.generic_qc.clone()); - - let mut accumlator = VoteAccumulator { - total_vote_outcomes: HashMap::new(), - yes_vote_outcomes: HashMap::new(), - no_vote_outcomes: HashMap::new(), - viewsync_precommit_vote_outcomes: HashMap::new(), - success_threshold: self.exchange.success_threshold(), - failure_threshold: self.exchange.failure_threshold(), - }; - - let lock = self.vote_collection_chan.lock().await; - while let Ok(msg) = lock.recv().await { - // If the message is for a different view number, skip it. - if Into::>::into(msg.clone()).view_number() != self.cur_view { - continue; - } - match msg { - ProcessedGeneralConsensusMessage::Vote(vote_message, sender) => { - match vote_message { - QuorumVote::Yes(vote) | QuorumVote::No(vote) => { - if vote.signature.0 - != ::to_bytes(&sender) - { - continue; - } - match self.exchange.accumulate_vote( - &vote.signature.0, - &vote.signature.1, - vote.leaf_commitment, - vote.vote_data, - vote.vote_token.clone(), - self.cur_view, - accumlator, - None, - ) { - Either::Left(acc) => { - accumlator = acc; - } - Either::Right(qc) => { - self.metrics - .vote_validate_duration - .add_point(vote_collection_start.elapsed().as_secs_f64()); - return qc; - } - } - } - QuorumVote::Timeout(vote) => { - qcs.insert(vote.justify_qc); - } - } - } - ProcessedGeneralConsensusMessage::InternalTrigger(trigger) => match trigger { - InternalTrigger::Timeout(_) => { - self.api.send_next_leader_timeout(self.cur_view).await; - break; - } - }, - ProcessedGeneralConsensusMessage::Proposal(_p, _sender) => { - warn!("The next leader has received an unexpected proposal!"); - } - ProcessedGeneralConsensusMessage::ViewSyncCertificate(_) => todo!(), - ProcessedGeneralConsensusMessage::ViewSyncVote(_) => todo!(), - } - } - - qcs.into_iter() - .max_by_key(hotshot_types::traits::election::SignedCertificate::view_number) - .unwrap() - } -} diff --git a/consensus/src/replica.rs b/consensus/src/replica.rs index fb8a1e9269..e69de29bb2 100644 --- a/consensus/src/replica.rs +++ b/consensus/src/replica.rs @@ -1,549 +0,0 @@ -//! Contains the [`Replica`] struct used for the replica step in the hotstuff consensus algorithm. 
- -use crate::{ - utils::{Terminator, View, ViewInner}, - Consensus, ValidatingConsensusApi, -}; -use async_compatibility_layer::channel::UnboundedReceiver; -use async_lock::{Mutex, RwLock, RwLockUpgradableReadGuard, RwLockWriteGuard}; -use bincode::Options; -use commit::Committable; -use hotshot_types::traits::election::ConsensusExchange; -use hotshot_types::traits::election::QuorumExchangeType; -use hotshot_types::traits::node_implementation::{ - NodeImplementation, QuorumProposalType, ValidatingExchangesType, ValidatingQuorumEx, -}; -use hotshot_types::{ - certificate::QuorumCertificate, - data::{ValidatingLeaf, ValidatingProposal}, - message::{ - ConsensusMessageType, InternalTrigger, Message, ProcessedGeneralConsensusMessage, - ValidatingMessage, - }, - traits::{ - consensus_type::validating_consensus::ValidatingConsensus, node_implementation::NodeType, - signature_key::SignatureKey, Block, State, - }, - vote::QuorumVote, -}; -use hotshot_utils::bincode::bincode_opts; -use std::marker::PhantomData; -use std::ops::Bound::{Excluded, Included}; -use std::{collections::HashSet, sync::Arc}; -use tracing::{error, info, instrument, warn}; - -/// This view's replica -#[derive(Debug, Clone)] -pub struct Replica< - A: ValidatingConsensusApi, I>, - TYPES: NodeType, - I: NodeImplementation< - TYPES, - Leaf = ValidatingLeaf, - ConsensusMessage = ValidatingMessage, - >, -> where - I::Exchanges: ValidatingExchangesType>, -{ - /// id of node - pub id: u64, - /// Reference to consensus. Replica will require a write lock on this. - pub consensus: Arc>>>, - /// channel for accepting leader proposals and timeouts messages - #[allow(clippy::type_complexity)] - pub proposal_collection_chan: - Arc>>>, - /// view number this view is executing in - pub cur_view: TYPES::Time, - /// genericQC from the pseudocode - pub high_qc: QuorumCertificate>, - /// hotshot consensus api - pub api: A, - - /// quorum exchange - pub exchange: Arc>, - - /// neeeded to typecheck - pub _pd: PhantomData, -} - -impl< - A: ValidatingConsensusApi, I>, - TYPES: NodeType, - I: NodeImplementation< - TYPES, - Leaf = ValidatingLeaf, - ConsensusMessage = ValidatingMessage, - >, - > Replica -where - I::Exchanges: ValidatingExchangesType>, - ValidatingQuorumEx: ConsensusExchange< - TYPES, - Message, - Proposal = ValidatingProposal>, - Certificate = QuorumCertificate>, - Commitment = ValidatingLeaf, - >, -{ - /// portion of the replica task that spins until a valid QC can be signed or - /// timeout is hit. 
- #[instrument(skip_all, fields(id = self.id, view = *self.cur_view), name = "Replica Task", level = "error")] - #[allow(clippy::type_complexity)] - async fn find_valid_msg<'a>( - &self, - view_leader_key: TYPES::SignatureKey, - consensus: RwLockUpgradableReadGuard<'a, Consensus>>, - ) -> ( - RwLockUpgradableReadGuard<'a, Consensus>>, - Option>, - ) { - let lock = self.proposal_collection_chan.lock().await; - let mut invalid_qcs = 0; - let leaf = loop { - let msg = lock.recv().await; - info!("recv-ed message {:?}", msg.clone()); - if let Ok(msg) = msg { - // stale/newer view messages should never reach this specific task's receive channel - if Into::>::into(msg.clone()).view_number() != self.cur_view - { - continue; - } - match msg { - ProcessedGeneralConsensusMessage::Proposal(p, sender) => { - if view_leader_key != sender { - continue; - } - - let Some(parent) = consensus.saved_leaves.get(&p.data.parent_commitment) - else { - warn!("Proposal's parent missing from storage"); - continue; - }; - - let justify_qc = p.data.justify_qc; - - // go no further if the parent view number does not - // match the justify_qc. We can't accept this - if parent.view_number != justify_qc.view_number { - warn!( - "Inconsistency in recv-ed proposal. The parent's view number, {:?} did not match the justify_qc view number, {:?}", - parent.view_number, justify_qc.view_number - ); - continue; - } - - // check that the chain height is correct - if p.data.height != parent.height + 1 { - warn!( - "Incorrect height in recv-ed proposal. The parent's height, {}, did not follow from the proposal's height, {}", - parent.height, p.data.height - ); - continue; - } - - // check that the justify_qc is valid - if !self - .exchange - .is_valid_cert(&justify_qc, justify_qc.leaf_commitment) - { - invalid_qcs += 1; - warn!("Invalid justify_qc in proposal! Skipping proposal."); - continue; - } - - // check that we can indeed create the state - let leaf = if let Ok(state) = - parent.state.append(&p.data.deltas, &self.cur_view) - { - // check the commitment - if state.commit() != p.data.state_commitment { - warn!("Rejected proposal! 
After applying deltas to parent state, resulting commitment did not match proposal's"); - continue; - } - ValidatingLeaf::new( - state, - p.data.deltas, - p.data.parent_commitment, - justify_qc.clone(), - self.cur_view, - p.data.height, - Vec::new(), - time::OffsetDateTime::now_utc().unix_timestamp_nanos(), - p.data.proposer_id, - ) - } else { - warn!("State of proposal didn't match parent + deltas"); - continue; - }; - - if !view_leader_key.validate(&p.signature, leaf.commit().as_ref()) { - warn!(?p.signature, "Could not verify proposal."); - continue; - } - - let liveness_check = justify_qc.view_number > consensus.locked_view; - - // check if proposal extends from the locked leaf - let outcome = consensus.visit_leaf_ancestors( - parent.view_number, - Terminator::Inclusive(consensus.locked_view), - false, - |leaf| { - // if leaf view no == locked view no then we're done, report success by - // returning true - leaf.view_number != consensus.locked_view - }, - ); - - let safety_check = outcome.is_ok(); - - if let Err(e) = outcome { - self.api.send_view_error(self.cur_view, Arc::new(e)).await; - } - - // NOTE safenode check is here - // if !safenode, continue - // if !(safety_check || liveness_check) - // if !safety_check && !liveness_check - if !safety_check && !liveness_check { - warn!("Failed safety check and liveness check"); - continue; - } - - let leaf_commitment = leaf.commit(); - let vote_token = self.exchange.make_vote_token(self.cur_view); - - match vote_token { - Err(e) => { - error!( - "Failed to generate vote token for {:?} {:?}", - self.cur_view, e - ); - } - Ok(None) => { - info!("We were not chosen for committee on {:?}", self.cur_view); - } - Ok(Some(vote_token)) => { - info!("We were chosen for committee on {:?}", self.cur_view); - - // Generate and send vote - // TODO ED Will remove the below code once actual tests are in place - // let message = if self.id % 2 == 0 { - // self.exchange.create_no_message( - // leaf.justify_qc.commit(), - // leaf_commitment, - // self.cur_view, - // vote_token, - // ) - // } else if self.id % 5 == 0 { - // self.exchange.create_timeout_message( - // leaf.justify_qc.clone(), - // self.cur_view, - // vote_token, - // ) - // } else { - // self.exchange.create_yes_message( - // leaf.justify_qc.commit(), - // leaf_commitment, - // self.cur_view, - // vote_token, - // ) - // }; - - let message = self.exchange.create_yes_message( - leaf.justify_qc.commit(), - leaf_commitment, - self.cur_view, - vote_token, - ); - - let next_leader = self.exchange.get_leader(self.cur_view + 1); - - info!("Sending vote to next leader {:?}", message); - if self - .api - .send_direct_message::, QuorumVote>>(next_leader, ValidatingMessage(message)) - .await - .is_err() - { - consensus.metrics.failed_to_send_messages.add(1); - warn!("Failed to send vote to next leader"); - } else { - consensus.metrics.outgoing_direct_messages.add(1); - } - } - } - break leaf; - } - ProcessedGeneralConsensusMessage::InternalTrigger(trigger) => { - match trigger { - InternalTrigger::Timeout(_) => { - let next_leader = self.exchange.get_leader(self.cur_view + 1); - - consensus.metrics.number_of_timeouts.add(1); - - let vote_token = self.exchange.make_vote_token(self.cur_view); - - match vote_token { - Err(e) => { - error!( - "Failed to generate vote token for {:?} {:?}", - self.cur_view, e - ); - } - Ok(None) => { - info!( - "We were not chosen for committee on {:?}", - self.cur_view - ); - } - Ok(Some(vote_token)) => { - let timeout_msg = self.exchange.create_timeout_message( - 
self.high_qc.clone(), - self.cur_view, - vote_token, - ); - warn!( - "Timed out! Sending timeout to next leader {:?}", - timeout_msg - ); - - // send timedout message to the next leader - if let Err(e) = self - .api - .send_direct_message::, QuorumVote>>( - next_leader.clone(), - ValidatingMessage(timeout_msg), - ) - .await - { - consensus.metrics.failed_to_send_messages.add(1); - warn!( - ?next_leader, - ?e, - "Could not send time out message to next_leader" - ); - } else { - consensus.metrics.outgoing_direct_messages.add(1); - } - - // exits from entire function - self.api.send_replica_timeout(self.cur_view).await; - } - } - return (consensus, None); - } - } - } - ProcessedGeneralConsensusMessage::Vote(_, _) => { - // should only be for leader, never replica - warn!("Replica receieved a vote message. This is not what the replica expects. Skipping."); - continue; - } - ProcessedGeneralConsensusMessage::ViewSyncCertificate(_) => unimplemented!(), - ProcessedGeneralConsensusMessage::ViewSyncVote(_) => unimplemented!(), - } - } - // fall through logic if we did not receive successfully from channel - warn!("Replica did not receive successfully from channel. Terminating Replica."); - let mut consensus = RwLockUpgradableReadGuard::upgrade(consensus).await; - consensus.invalid_qc += invalid_qcs; - self.api.send_replica_timeout(self.cur_view).await; - return (RwLockWriteGuard::downgrade_to_upgradable(consensus), None); - }; - let mut consensus = RwLockUpgradableReadGuard::upgrade(consensus).await; - consensus.invalid_qc += invalid_qcs; - ( - RwLockWriteGuard::downgrade_to_upgradable(consensus), - Some(leaf), - ) - } - - /// run one view of replica - /// returns the `high_qc` - #[instrument(skip(self), fields(id = self.id, view = *self.cur_view), name = "Replica Task", level = "error")] - pub async fn run_view(self) -> QuorumCertificate> { - info!("Replica task started!"); - let consensus = self.consensus.upgradable_read().await; - let view_leader_key = self.exchange.get_leader(self.cur_view); - - let (consensus, maybe_leaf) = self.find_valid_msg(view_leader_key, consensus).await; - - let Some(leaf) = maybe_leaf else { - // we either timed out or for some reason - // could not accept a proposal - return self.high_qc; - }; - - let mut new_anchor_view = consensus.last_decided_view; - let mut new_locked_view = consensus.locked_view; - let mut last_view_number_visited = self.cur_view; - let mut new_commit_reached: bool = false; - let mut new_decide_reached = false; - let mut new_decide_qc = None; - let mut leaf_views = Vec::new(); - let mut included_txns = HashSet::new(); - let old_anchor_view = consensus.last_decided_view; - let parent_view = leaf.justify_qc.view_number; - let mut current_chain_length = 0usize; - if parent_view + 1 == self.cur_view { - current_chain_length += 1; - if let Err(e) = consensus.visit_leaf_ancestors( - parent_view, - Terminator::Exclusive(old_anchor_view), - true, - |leaf| { - if !new_decide_reached { - if last_view_number_visited == leaf.view_number + 1 { - last_view_number_visited = leaf.view_number; - current_chain_length += 1; - if current_chain_length == 2 { - new_locked_view = leaf.view_number; - new_commit_reached = true; - // The next leaf in the chain, if there is one, is decided, so this - // leaf's justify_qc would become the QC for the decided chain. - new_decide_qc = Some(leaf.justify_qc.clone()); - } else if current_chain_length == 3 { - new_anchor_view = leaf.view_number; - new_decide_reached = true; - } - } else { - // nothing more to do here... 
we don't have a new chain extension - return false; - } - } - // starting from the first iteration with a three chain, e.g. right after the else if case nested in the if case above - if new_decide_reached { - leaf_views.push(leaf.clone()); - let txns = leaf.deltas.contained_transactions(); - for txn in txns { - included_txns.insert(txn); - } - } - true - }, - ) { - self.api.send_view_error(self.cur_view, Arc::new(e)).await; - } - } - let high_qc = leaf.justify_qc.clone(); - - let included_txns_set: HashSet<_> = if new_decide_reached { - included_txns - } else { - HashSet::new() - }; - - // promote lock here - let mut consensus = RwLockUpgradableReadGuard::upgrade(consensus).await; - consensus.state_map.insert( - self.cur_view, - View { - view_inner: ViewInner::Leaf { - leaf: leaf.commit(), - }, - }, - ); - - consensus.metrics.number_of_views_since_last_commit.set( - consensus - .state_map - .range(( - Excluded(consensus.last_decided_view), - Included(self.cur_view), - )) - .count(), - ); - - consensus.saved_leaves.insert(leaf.commit(), leaf.clone()); - if new_commit_reached { - consensus.locked_view = new_locked_view; - } - #[allow(clippy::cast_precision_loss)] - if new_decide_reached { - let num_views_since_last_anchor = - (*self.cur_view - *consensus.last_decided_view) as f64; - let views_seen = consensus - .state_map - .range(( - Excluded(consensus.last_decided_view), - Included(self.cur_view), - )) - .count(); - // A count of all veiws we saw that aren't in the current chain (so won't be commited) - consensus - .metrics - .discarded_views_per_decide_event - .add_point((views_seen - current_chain_length) as f64); - // An empty view is one we didn't see a leaf for but we moved past that view number - consensus - .metrics - .empty_views_per_decide_event - .add_point(num_views_since_last_anchor - views_seen as f64); - consensus - .metrics - .number_of_views_per_decide_event - .add_point(num_views_since_last_anchor); - consensus - .metrics - .invalid_qc_views - .add_point(consensus.invalid_qc as f64); - - let mut included_txn_size = 0; - consensus - .transactions - .modify(|txns| { - *txns = txns - .drain() - .filter(|(txn_hash, txn)| { - if included_txns_set.contains(txn_hash) { - included_txn_size += - bincode_opts().serialized_size(txn).unwrap_or_default(); - false - } else { - true - } - }) - .collect(); - }) - .await; - consensus - .metrics - .outstanding_transactions - .update(-(included_txns_set.len() as i64)); - consensus - .metrics - .outstanding_transactions_memory_size - .update(-(i64::try_from(included_txn_size).unwrap_or(i64::MAX))); - - consensus - .metrics - .rejected_transactions - .add(leaf.rejected.len()); - - let decide_sent = self.api.send_decide( - consensus.last_decided_view, - leaf_views, - new_decide_qc.unwrap(), - ); - let old_anchor_view = consensus.last_decided_view; - consensus - .collect_garbage(old_anchor_view, new_anchor_view) - .await; - consensus.last_decided_view = new_anchor_view; - consensus.invalid_qc = 0; - - // We're only storing the last QC. We could store more but we're realistically only going to retrieve the last one. 
-            if let Err(e) = self.api.store_leaf(old_anchor_view, leaf).await {
-                error!("Could not insert new anchor into the storage API: {:?}", e);
-            }
-
-            decide_sent.await;
-        }
-        high_qc
-    }
-}
diff --git a/consensus/src/sequencing_leader.rs b/consensus/src/sequencing_leader.rs
index e85c7ccc43..3f80214fe1 100644
--- a/consensus/src/sequencing_leader.rs
+++ b/consensus/src/sequencing_leader.rs
@@ -18,7 +18,7 @@
 use hotshot_types::traits::election::CommitteeExchangeType;
 use hotshot_types::traits::election::ConsensusExchange;
 use hotshot_types::traits::election::QuorumExchangeType;
 use hotshot_types::traits::node_implementation::{
-    NodeImplementation, QuorumProposalType, QuorumVoteType, SequencingExchangesType,
+    NodeImplementation, QuorumProposalType, QuorumVoteType,
 };
 use hotshot_types::traits::state::State;
@@ -30,7 +30,6 @@ use hotshot_types::{
         ProcessedSequencingMessage, Proposal, SequencingMessage,
     },
     traits::{
-        consensus_type::sequencing_consensus::SequencingConsensus,
         election::SignedCertificate,
         node_implementation::{CommitteeEx, NodeType, SequencingQuorumEx},
         signature_key::SignatureKey,
@@ -47,15 +46,13 @@ use tracing::{error, info, instrument, warn};
 #[derive(Debug, Clone)]
 pub struct DALeader<
     A: SequencingConsensusApi<TYPES, SequencingLeaf<TYPES>, I>,
-    TYPES: NodeType<ConsensusType = SequencingConsensus>,
+    TYPES: NodeType,
     I: NodeImplementation<
         TYPES,
         Leaf = SequencingLeaf<TYPES>,
         ConsensusMessage = SequencingMessage<TYPES, I>,
     >,
-> where
-    I::Exchanges: SequencingExchangesType<TYPES, Message<TYPES, I>>,
-{
+> {
     /// id of node
     pub id: u64,
     /// Reference to consensus. Leader will require a read lock on this.
@@ -81,7 +78,7 @@ pub struct DALeader<
 }
 impl<
     A: SequencingConsensusApi<TYPES, SequencingLeaf<TYPES>, I>,
-    TYPES: NodeType<ConsensusType = SequencingConsensus>,
+    TYPES: NodeType,
     I: NodeImplementation<
         TYPES,
         Leaf = SequencingLeaf<TYPES>,
@@ -89,7 +86,6 @@ impl<
     >,
     > DALeader<A, TYPES, I>
 where
-    I::Exchanges: SequencingExchangesType<TYPES, Message<TYPES, I>>,
     CommitteeEx<TYPES, I>: ConsensusExchange<
         TYPES,
         Message<TYPES, I>,
@@ -315,15 +311,13 @@
 /// For now this step happens after the `DALeader` completes it's proposal and collects enough votes.
 pub struct ConsensusLeader<
     A: SequencingConsensusApi<TYPES, SequencingLeaf<TYPES>, I>,
-    TYPES: NodeType<ConsensusType = SequencingConsensus>,
+    TYPES: NodeType,
     I: NodeImplementation<
         TYPES,
         Leaf = SequencingLeaf<TYPES>,
         ConsensusMessage = SequencingMessage<TYPES, I>,
     >,
-> where
-    I::Exchanges: SequencingExchangesType<TYPES, Message<TYPES, I>>,
-{
+> {
     /// id of node
     pub id: u64,
     /// Reference to consensus. Leader will require a read lock on this.
@@ -349,7 +343,7 @@ pub struct ConsensusLeader<
 }
 impl<
     A: SequencingConsensusApi<TYPES, SequencingLeaf<TYPES>, I>,
-    TYPES: NodeType<ConsensusType = SequencingConsensus>,
+    TYPES: NodeType,
     I: NodeImplementation<
         TYPES,
         Leaf = SequencingLeaf<TYPES>,
@@ -357,7 +351,6 @@
     >,
     > ConsensusLeader<A, TYPES, I>
 where
-    I::Exchanges: SequencingExchangesType<TYPES, Message<TYPES, I>>,
     SequencingQuorumEx<TYPES, I>: ConsensusExchange<
         TYPES,
         Message<TYPES, I>,
@@ -414,15 +407,13 @@
 /// Implenting the next leader. Collect votes on the previous leaders proposal and return the QC
 pub struct ConsensusNextLeader<
     A: SequencingConsensusApi<TYPES, SequencingLeaf<TYPES>, I>,
-    TYPES: NodeType<ConsensusType = SequencingConsensus>,
+    TYPES: NodeType,
     I: NodeImplementation<
         TYPES,
         Leaf = SequencingLeaf<TYPES>,
         ConsensusMessage = SequencingMessage<TYPES, I>,
     >,
-> where
-    I::Exchanges: SequencingExchangesType<TYPES, Message<TYPES, I>>,
-{
+> {
     /// id of node
     pub id: u64,
     /// Reference to consensus. Leader will require a read lock on this.
@@ -446,7 +437,7 @@ pub struct ConsensusNextLeader<
 
 impl<
     A: SequencingConsensusApi<TYPES, SequencingLeaf<TYPES>, I>,
-    TYPES: NodeType<ConsensusType = SequencingConsensus>,
+    TYPES: NodeType,
     I: NodeImplementation<
         TYPES,
         Leaf = SequencingLeaf<TYPES>,
@@ -454,7 +445,6 @@
     >,
     > ConsensusNextLeader<A, TYPES, I>
 where
-    I::Exchanges: SequencingExchangesType<TYPES, Message<TYPES, I>>,
     SequencingQuorumEx<TYPES, I>: ConsensusExchange<
         TYPES,
         Message<TYPES, I>,
diff --git a/consensus/src/sequencing_replica.rs b/consensus/src/sequencing_replica.rs
index 2d54b4ee0c..dc062fbb23 100644
--- a/consensus/src/sequencing_replica.rs
+++ b/consensus/src/sequencing_replica.rs
@@ -15,8 +15,7 @@ use hotshot_types::message::Message;
 use hotshot_types::traits::election::ConsensusExchange;
 use hotshot_types::traits::election::QuorumExchangeType;
 use hotshot_types::traits::node_implementation::{
-    CommitteeEx, NodeImplementation, QuorumProposalType, QuorumVoteType, SequencingExchangesType,
-    SequencingQuorumEx,
+    CommitteeEx, NodeImplementation, QuorumProposalType, QuorumVoteType, SequencingQuorumEx,
 };
 use hotshot_types::traits::state::ConsensusTime;
@@ -27,8 +26,8 @@ use hotshot_types::{
         ProcessedGeneralConsensusMessage, ProcessedSequencingMessage, SequencingMessage,
     },
     traits::{
-        consensus_type::sequencing_consensus::SequencingConsensus, election::SignedCertificate,
-        node_implementation::NodeType, signature_key::SignatureKey, Block,
+        election::SignedCertificate, node_implementation::NodeType, signature_key::SignatureKey,
+        Block,
     },
 };
 use hotshot_utils::bincode::bincode_opts;
@@ -41,15 +40,13 @@ use tracing::{error, info, instrument, warn};
 #[derive(Debug, Clone)]
 pub struct SequencingReplica<
     A: SequencingConsensusApi<TYPES, SequencingLeaf<TYPES>, I>,
-    TYPES: NodeType<ConsensusType = SequencingConsensus>,
+    TYPES: NodeType,
     I: NodeImplementation<
         TYPES,
         Leaf = SequencingLeaf<TYPES>,
         ConsensusMessage = SequencingMessage<TYPES, I>,
     >,
-> where
-    I::Exchanges: SequencingExchangesType<TYPES, Message<TYPES, I>>,
-{
+> {
     /// ID of node.
     pub id: u64,
     /// Reference to consensus. The replica will require a write lock on this.
@@ -77,7 +74,7 @@ pub struct SequencingReplica<
 }
 impl<
     A: SequencingConsensusApi<TYPES, SequencingLeaf<TYPES>, I>,
-    TYPES: NodeType<ConsensusType = SequencingConsensus>,
+    TYPES: NodeType,
     I: NodeImplementation<
         TYPES,
         Leaf = SequencingLeaf<TYPES>,
@@ -85,7 +82,6 @@ impl<
     >,
     > SequencingReplica<A, TYPES, I>
 where
-    I::Exchanges: SequencingExchangesType<TYPES, Message<TYPES, I>>,
     SequencingQuorumEx<TYPES, I>: ConsensusExchange<
         TYPES,
         Message<TYPES, I>,
diff --git a/consensus/src/traits.rs b/consensus/src/traits.rs
index 2fc2ab0f3f..42feb0ae57 100644
--- a/consensus/src/traits.rs
+++ b/consensus/src/traits.rs
@@ -3,22 +3,14 @@
 use async_trait::async_trait;
 use hotshot_types::certificate::QuorumCertificate;
 use hotshot_types::message::DataMessage;
-use hotshot_types::message::{Message, SequencingMessage, ValidatingMessage};
-use hotshot_types::traits::node_implementation::{
-    NodeImplementation, NodeType, SequencingExchangesType, ValidatingExchangesType,
-};
+use hotshot_types::message::SequencingMessage;
+use hotshot_types::traits::node_implementation::{NodeImplementation, NodeType};
 use hotshot_types::traits::storage::StorageError;
 use hotshot_types::{
-    data::{LeafType, ProposalType, ValidatingLeaf},
+    data::{LeafType, ProposalType},
     error::HotShotError,
     event::{Event, EventType},
-    traits::{
-        consensus_type::{
-            sequencing_consensus::SequencingConsensus, validating_consensus::ValidatingConsensus,
-        },
-        network::NetworkError,
-        signature_key::SignatureKey,
-    },
+    traits::{network::NetworkError, signature_key::SignatureKey},
     vote::VoteType,
 };
@@ -127,50 +119,13 @@
     }
 }
 
-/// The API that [`HotStuff`] needs to talk to the system for validating consensus.
-#[async_trait] -pub trait ValidatingConsensusApi< - TYPES: NodeType, - LEAF: LeafType, - I: NodeImplementation< - TYPES, - Leaf = ValidatingLeaf, - ConsensusMessage = ValidatingMessage, - >, ->: ConsensusSharedApi where - I::Exchanges: ValidatingExchangesType>, -{ - /// Send a direct message to the given recipient - async fn send_direct_message, VOTE: VoteType>( - &self, - recipient: TYPES::SignatureKey, - message: ValidatingMessage, - ) -> std::result::Result<(), NetworkError>; - - /// Send a broadcast message to the entire network. - async fn send_broadcast_message< - PROPOSAL: ProposalType, - VOTE: VoteType, - >( - &self, - message: ValidatingMessage, - ) -> std::result::Result<(), NetworkError>; - - /// Send a message with a transaction. - async fn send_transaction( - &self, - message: DataMessage, - ) -> std::result::Result<(), NetworkError>; -} - /// The API that [`HotStuff`] needs to talk to the system, for sequencing consensus. #[async_trait] pub trait SequencingConsensusApi< - TYPES: NodeType, + TYPES: NodeType, LEAF: LeafType, I: NodeImplementation>, ->: ConsensusSharedApi where - I::Exchanges: SequencingExchangesType>, +>: ConsensusSharedApi { /// Send a direct message to the given recipient async fn send_direct_message, VOTE: VoteType>( diff --git a/examples/infra/mod.rs b/examples/infra/mod.rs index e535987023..20973a8df8 100644 --- a/examples/infra/mod.rs +++ b/examples/infra/mod.rs @@ -1,70 +1,20 @@ -use async_compatibility_layer::logging::{setup_backtrace, setup_logging}; -use async_lock::RwLock; -use async_trait::async_trait; use clap::Parser; -use futures::StreamExt; -use hotshot::{ - traits::{ - implementations::{ - Libp2pCommChannel, Libp2pNetwork, MemoryStorage, WebCommChannel, WebServerNetwork, - }, - NodeImplementation, Storage, - }, - types::{SignatureKey, SystemContextHandle}, - SystemContext, ViewRunner, -}; use hotshot_orchestrator::{ self, - client::{OrchestratorClient, ValidatorArgs}, - config::{NetworkConfig, NetworkConfigFile, WebServerConfig}, + config::{NetworkConfig, NetworkConfigFile}, }; -use hotshot_task::task::FilterEvent; -use hotshot_types::event::{Event, EventType}; -use hotshot_types::traits::election::ConsensusExchange; -use hotshot_types::traits::state::ConsensusTime; -use hotshot_types::vote::ViewSyncVote; -use hotshot_types::{ - data::{TestableLeaf, ValidatingLeaf, ValidatingProposal}, - message::ValidatingMessage, - traits::{ - consensus_type::validating_consensus::ValidatingConsensus, - election::{Membership, ViewSyncExchange}, - metrics::NoMetrics, - network::CommunicationChannel, - node_implementation::{ExchangesType, NodeType, ValidatingExchanges}, - state::{TestableBlock, TestableState}, - }, - vote::QuorumVote, - HotShotConfig, -}; -use hotshot_types::{message::Message, traits::election::QuorumExchange}; +use hotshot_types::traits::node_implementation::NodeType; +use hotshot_types::traits::signature_key::SignatureKey; use libp2p::{ identity::{ ed25519::{Keypair as EdKeypair, SecretKey}, Keypair, }, - multiaddr::{self, Protocol}, + multiaddr::{self}, Multiaddr, }; -use libp2p_identity::PeerId; -use libp2p_networking::network::{MeshParams, NetworkNodeConfigBuilder, NetworkNodeType}; -#[allow(deprecated)] -use nll::nll_todo::nll_todo; -use rand::SeedableRng; -use std::fmt::Debug; -use std::net::Ipv4Addr; -use std::{ - cmp, - collections::{BTreeSet, VecDeque}, - fs, mem, - net::IpAddr, - num::NonZeroUsize, - str::FromStr, - sync::Arc, - time::Instant, -}; -#[allow(deprecated)] -use tracing::error; +use std::{fmt::Debug, fs}; +use 
std::{net::IpAddr, str::FromStr}; // ORCHESTRATOR @@ -110,390 +60,6 @@ pub fn load_config_from_file( config } -/// Runs the orchestrator -pub async fn run_orchestrator< - TYPES: NodeType, - MEMBERSHIP: Membership + Debug, - NETWORK: CommunicationChannel< - TYPES, - Message, - ValidatingProposal>, - QuorumVote>, - MEMBERSHIP, - > + Debug, - VIEWSYNCNETWORK: CommunicationChannel< - TYPES, - Message, - ValidatingProposal>, - ViewSyncVote, - MEMBERSHIP, - > + Debug, - NODE: NodeImplementation< - TYPES, - Leaf = ValidatingLeaf, - Exchanges = ValidatingExchanges< - TYPES, - Message, - QuorumExchange< - TYPES, - ValidatingLeaf, - ValidatingProposal>, - MEMBERSHIP, - NETWORK, - Message, - >, - ViewSyncExchange< - TYPES, - ValidatingProposal>, - MEMBERSHIP, - VIEWSYNCNETWORK, - Message, - >, - >, - Storage = MemoryStorage>, - ConsensusMessage = ValidatingMessage, - >, ->( - OrchestratorArgs { - host, - port, - config_file, - }: OrchestratorArgs, -) { - error!("Starting orchestrator",); - let run_config = load_config_from_file::(config_file); - let _result = hotshot_orchestrator::run_orchestrator::< - TYPES::SignatureKey, - TYPES::ElectionConfigType, - >(run_config, host, port) - .await; -} - -/// Defines the behavior of a "run" of the network with a given configuration -#[async_trait] -pub trait Run< - TYPES: NodeType, - MEMBERSHIP: Membership + Debug, - NETWORK: CommunicationChannel< - TYPES, - Message, - ValidatingProposal>, - QuorumVote>, - MEMBERSHIP, - > + Debug, - VIEWSYNCNETWORK: CommunicationChannel< - TYPES, - Message, - ValidatingProposal>, - ViewSyncVote, - MEMBERSHIP, - > + Debug, - NODE: NodeImplementation< - TYPES, - Leaf = ValidatingLeaf, - Exchanges = ValidatingExchanges< - TYPES, - Message, - QuorumExchange< - TYPES, - ValidatingLeaf, - ValidatingProposal>, - MEMBERSHIP, - NETWORK, - Message, - >, - ViewSyncExchange< - TYPES, - ValidatingProposal>, - MEMBERSHIP, - VIEWSYNCNETWORK, - Message, - >, - >, - Storage = MemoryStorage>, - ConsensusMessage = ValidatingMessage, - >, -> where - ::StateType: TestableState, - ::BlockType: TestableBlock, - ValidatingLeaf: TestableLeaf, - SystemContext: ViewRunner, - Self: Sync, -{ - /// Initializes networking, returns self - async fn initialize_networking( - config: NetworkConfig, - ) -> Self; - - /// Initializes the genesis state and HotShot instance; does not start HotShot consensus - /// # Panics if it cannot generate a genesis block, fails to initialize HotShot, or cannot - /// get the anchored view - async fn initialize_state_and_hotshot( - &self, - ) -> (TYPES::StateType, SystemContextHandle) { - let genesis_block = TYPES::BlockType::genesis(); - let initializer = - hotshot::HotShotInitializer::>::from_genesis( - genesis_block, - ) - .expect("Couldn't generate genesis block"); - - let config = self.get_config(); - - let (pk, sk) = - TYPES::SignatureKey::generated_from_seed_indexed(config.seed, config.node_index); - let ek = jf_primitives::aead::KeyPair::generate(&mut rand_chacha::ChaChaRng::from_seed( - [0u8; 32], - )); - let known_nodes = config.config.known_nodes.clone(); - - let network = self.get_network(); - - // Since we do not currently pass the election config type in the NetworkConfig, this will always be the default election config - let election_config = config.config.election_config.clone().unwrap_or_else(|| { - , - ValidatingProposal>, - MEMBERSHIP, - NETWORK, - Message, - > as ConsensusExchange>>::Membership::default_election_config( - config.config.total_nodes.get() as u64 - ) - }); - - let exchanges = 
NODE::Exchanges::create( - known_nodes.clone(), - (election_config.clone(), ()), - //Kaley todo: add view sync network - #[allow(deprecated)] - (network.clone(), nll_todo(), ()), - pk.clone(), - sk.clone(), - ek.clone(), - ); - let hotshot = SystemContext::init( - pk, - sk, - config.node_index, - config.config, - MemoryStorage::empty(), - exchanges, - initializer, - NoMetrics::boxed(), - ) - .await - .expect("Could not init hotshot"); - - let state = hotshot - .storage() - .get_anchored_view() - .await - .expect("Couldn't get HotShot's anchored view") - .state; - (state, hotshot) - } - - /// Starts HotShot consensus, returns when consensus has finished - async fn run_hotshot(&self, mut context: SystemContextHandle) { - let NetworkConfig { - padding, - rounds, - transactions_per_round, - node_index, - config: HotShotConfig { total_nodes, .. }, - .. - } = self.get_config(); - - let size = mem::size_of::(); - let adjusted_padding = if padding < size { 0 } else { padding - size }; - let mut txns: VecDeque = VecDeque::new(); - let state = context.get_state().await; - - // This assumes that no node will be a leader more than 5x the expected number of times they should be the leader - // FIXME is this a reasonable assumption when we start doing DA? - // TODO ED: In the future we should have each node generate transactions every round to simulate a more realistic network - let tx_to_gen = transactions_per_round * (cmp::max(rounds / total_nodes, 1) + 5); - { - let mut txn_rng = rand::thread_rng(); - for _ in 0..tx_to_gen { - let txn = - <::StateType as TestableState>::create_random_transaction( - Some(&state), - &mut txn_rng, - padding as u64, - ); - txns.push_back(txn); - } - } - error!("Generated {} transactions", tx_to_gen); - - error!("Adjusted padding size is {:?} bytes", adjusted_padding); - let mut round = 1; - let mut total_transactions = 0; - - let start = Instant::now(); - - error!("Starting hotshot!"); - context.hotshot.start_consensus().await; - let (mut event_stream, _streamid) = context.get_event_stream(FilterEvent::default()).await; - let mut anchor_view: TYPES::Time = ::genesis(); - let mut num_successful_commits = 0; - - let total_nodes_u64 = total_nodes.get() as u64; - - let mut should_submit_txns = node_index == (*anchor_view % total_nodes_u64); - - loop { - if should_submit_txns { - for _ in 0..transactions_per_round { - let _txn = txns.pop_front().unwrap(); - tracing::info!("Submitting txn on round {}", round); - // context.submit_transaction(txn).await.unwrap(); - total_transactions += 1; - } - should_submit_txns = false; - } - - match event_stream.next().await { - None => { - panic!("Error! Event stream completed before consensus ended."); - } - Some(Event { event, .. 
}) => { - match event { - EventType::Error { error } => { - error!("Error in consensus: {:?}", error); - // TODO what to do here - } - EventType::Decide { - leaf_chain, - qc: _, - block_size: _, - } => { - // this might be a obob - if let Some(leaf) = leaf_chain.get(0) { - let new_anchor = leaf.view_number; - if new_anchor >= anchor_view { - anchor_view = leaf.view_number; - } - if (*anchor_view % total_nodes_u64) == node_index { - should_submit_txns = true; - } - } - num_successful_commits += leaf_chain.len(); - if num_successful_commits >= rounds { - break; - } - // when we make progress, submit new events - } - EventType::ReplicaViewTimeout { view_number } => { - error!("Timed out as a replicas in view {:?}", view_number); - } - EventType::NextLeaderViewTimeout { view_number } => { - error!("Timed out as the next leader in view {:?}", view_number); - } - EventType::ViewFinished { view_number } => { - tracing::info!("view finished: {:?}", view_number); - } - _ => unimplemented!(), - } - } - } - - round += 1; - } - - // while round <= rounds { - // error!("Round {}:", round); - // - // let num_submitted = - // if node_index == ((round % total_nodes) as u64) { - // for _ in 0..transactions_per_round { - // let txn = txns.pop_front().unwrap(); - // tracing::info!("Submitting txn on round {}", round); - // hotshot.submit_transaction(txn).await.unwrap(); - // } - // transactions_per_round - // } else { - // 0 - // }; - // error!("Submitting {} transactions", num_submitted); - // - // // Start consensus - // let view_results = nll_todo(); - // - // match view_results { - // Ok((leaf_chain, _qc)) => { - // let blocks: Vec = leaf_chain - // .into_iter() - // .map(|leaf| leaf.get_deltas()) - // .collect(); - // - // for block in blocks { - // total_transactions += block.txn_count(); - // } - // } - // Err(e) => { - // timed_out_views += 1; - // error!("View: {:?}, failed with : {:?}", round, e); - // } - // } - // - // round += 1; - // } - // - let total_time_elapsed = start.elapsed(); - let total_size = total_transactions * (padding as u64); - // - // // This assumes all transactions that were submitted made it through consensus, and does not account for the genesis block - error!("All {rounds} rounds completed in {total_time_elapsed:?}. {total_size} total bytes submitted"); - } - - /// Returns the network for this run - fn get_network(&self) -> NETWORK; - - /// Returns view sync network for this run KALEY TODO - //fn get_view_sync_network(&self) -> VIEWSYNCNETWORK; - - /// Returns the config for this run - fn get_config(&self) -> NetworkConfig; -} - -type Proposal = ValidatingProposal>; - -// LIBP2P - -/// Represents a libp2p-based run -pub struct Libp2pRun< - TYPES: NodeType, - I: NodeImplementation, - MEMBERSHIP: Membership, -> { - _bootstrap_nodes: Vec<(PeerId, Multiaddr)>, - _node_type: NetworkNodeType, - _bound_addr: Multiaddr, - /// for libp2p layer - _identity: Keypair, - - network: Libp2pCommChannel< - TYPES, - I, - Proposal, - QuorumVote>, - MEMBERSHIP, - >, - // view_sync_network: Libp2pCommChannel< - // TYPES, - // I, - // Proposal, - // ViewSyncVote, - // MEMBERSHIP, - // >, - config: - NetworkConfig<::SignatureKey, ::ElectionConfigType>, -} - /// yeesh maybe we should just implement SignatureKey for this... 
-type Proposal<TYPES> = ValidatingProposal<TYPES, ValidatingLeaf<TYPES>>;
-
-// LIBP2P
-
-/// Represents a libp2p-based run
-pub struct Libp2pRun<
-    TYPES: NodeType,
-    I: NodeImplementation<TYPES>,
-    MEMBERSHIP: Membership<TYPES>,
-> {
-    _bootstrap_nodes: Vec<(PeerId, Multiaddr)>,
-    _node_type: NetworkNodeType,
-    _bound_addr: Multiaddr,
-    /// for libp2p layer
-    _identity: Keypair,
-
-    network: Libp2pCommChannel<
-        TYPES,
-        I,
-        Proposal<TYPES>,
-        QuorumVote<TYPES, ValidatingLeaf<TYPES>>,
-        MEMBERSHIP,
-    >,
-    // view_sync_network: Libp2pCommChannel<
-    //     TYPES,
-    //     I,
-    //     Proposal<TYPES>,
-    //     ViewSyncVote<TYPES>,
-    //     MEMBERSHIP,
-    // >,
-    config:
-        NetworkConfig<<TYPES as NodeType>::SignatureKey, <TYPES as NodeType>::ElectionConfigType>,
-}
-
 /// yeesh maybe we should just implement SignatureKey for this...
 pub fn libp2p_generate_indexed_identity(seed: [u8; 32], index: u64) -> Keypair {
     let mut hasher = blake3::Hasher::new();
@@ -521,459 +87,3 @@ pub fn parse_ip(s: &str) -> Result<Multiaddr, multiaddr::Error> {
     let port = i.next().ok_or(multiaddr::Error::InvalidMultiaddr)?;
     Multiaddr::from_str(&format!("/ip4/{ip}/tcp/{port}"))
 }
-
-#[async_trait]
-impl<
-        TYPES: NodeType,
-        MEMBERSHIP: Membership<TYPES> + Debug,
-        NODE: NodeImplementation<
-            TYPES,
-            Leaf = ValidatingLeaf<TYPES>,
-            Exchanges = ValidatingExchanges<
-                TYPES,
-                Message<TYPES, NODE>,
-                QuorumExchange<
-                    TYPES,
-                    ValidatingLeaf<TYPES>,
-                    ValidatingProposal<TYPES, ValidatingLeaf<TYPES>>,
-                    MEMBERSHIP,
-                    Libp2pCommChannel<
-                        TYPES,
-                        NODE,
-                        ValidatingProposal<TYPES, ValidatingLeaf<TYPES>>,
-                        QuorumVote<TYPES, ValidatingLeaf<TYPES>>,
-                        MEMBERSHIP,
-                    >,
-                    Message<TYPES, NODE>,
-                >,
-                ViewSyncExchange<
-                    TYPES,
-                    ValidatingProposal<TYPES, ValidatingLeaf<TYPES>>,
-                    MEMBERSHIP,
-                    Libp2pCommChannel<
-                        TYPES,
-                        NODE,
-                        ValidatingProposal<TYPES, ValidatingLeaf<TYPES>>,
-                        ViewSyncVote<TYPES>,
-                        MEMBERSHIP,
-                    >,
-                    Message<TYPES, NODE>,
-                >,
-            >,
-            Storage = MemoryStorage<TYPES, ValidatingLeaf<TYPES>>,
-            ConsensusMessage = ValidatingMessage<TYPES, NODE>,
-        >,
-    >
-    Run<
-        TYPES,
-        MEMBERSHIP,
-        Libp2pCommChannel<
-            TYPES,
-            NODE,
-            ValidatingProposal<TYPES, ValidatingLeaf<TYPES>>,
-            QuorumVote<TYPES, ValidatingLeaf<TYPES>>,
-            MEMBERSHIP,
-        >,
-        Libp2pCommChannel<
-            TYPES,
-            NODE,
-            ValidatingProposal<TYPES, ValidatingLeaf<TYPES>>,
-            ViewSyncVote<TYPES>,
-            MEMBERSHIP,
-        >,
-        NODE,
-    > for Libp2pRun<TYPES, NODE, MEMBERSHIP>
-where
-    <TYPES as NodeType>::StateType: TestableState,
-    <TYPES as NodeType>::BlockType: TestableBlock,
-    ValidatingLeaf<TYPES>: TestableLeaf,
-    SystemContext<TYPES, NODE>: ViewRunner<TYPES, NODE>,
-    Self: Sync,
-{
-    async fn initialize_networking(
-        config: NetworkConfig<TYPES::SignatureKey, TYPES::ElectionConfigType>,
-    ) -> Libp2pRun<TYPES, NODE, MEMBERSHIP> {
-        let (pubkey, _privkey) =
-            <<TYPES as NodeType>::SignatureKey as SignatureKey>::generated_from_seed_indexed(
-                config.seed,
-                config.node_index,
-            );
-        let mut config = config;
-        let libp2p_config = config
-            .libp2p_config
-            .take()
-            .expect("Configuration is not for a Libp2p network");
-        let bs_len = libp2p_config.bootstrap_nodes.len();
-        let bootstrap_nodes: Vec<(PeerId, Multiaddr)> = libp2p_config
-            .bootstrap_nodes
-            .iter()
-            .map(|(addr, pair)| {
-                let kp = Keypair::from_protobuf_encoding(pair).unwrap();
-                let peer_id = PeerId::from_public_key(&kp.public());
-                let mut multiaddr = Multiaddr::from(addr.ip());
-                multiaddr.push(Protocol::Tcp(addr.port()));
-                (peer_id, multiaddr)
-            })
-            .collect();
-        let identity = libp2p_generate_indexed_identity(config.seed, config.node_index);
-        let node_type = if (config.node_index as usize) < bs_len {
-            NetworkNodeType::Bootstrap
-        } else {
-            NetworkNodeType::Regular
-        };
-        let node_index = config.node_index;
-        let port_index = match libp2p_config.index_ports {
-            true => node_index,
-            false => 0,
-        };
-        let bound_addr: Multiaddr = format!(
-            "/{}/{}/tcp/{}",
-            if libp2p_config.public_ip.is_ipv4() {
-                "ip4"
-            } else {
-                "ip6"
-            },
-            libp2p_config.public_ip,
-            libp2p_config.base_port as u64 + port_index
-        )
-        .parse()
-        .unwrap();
-
-        // generate network
-        let mut config_builder = NetworkNodeConfigBuilder::default();
-        assert!(config.config.total_nodes.get() > 2);
-        let replicated_nodes = NonZeroUsize::new(config.config.total_nodes.get() - 2).unwrap();
-        config_builder.replication_factor(replicated_nodes);
-        config_builder.identity(identity.clone());
-
-        config_builder.bound_addr(Some(bound_addr.clone()));
-
-        let to_connect_addrs = bootstrap_nodes
-            .iter()
-            .map(|(peer_id, multiaddr)| (Some(*peer_id), multiaddr.clone()))
-            .collect();
-
-        config_builder.to_connect_addrs(to_connect_addrs);
-
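The body of libp2p_generate_indexed_identity is elided by the hunk above; only the blake3 hasher setup is visible. One plausible shape, hashing the run-wide seed together with the node index and using the digest as an ed25519 secret so every node derives a distinct but reproducible identity, is sketched below. The function name and exact derivation are assumptions, not the crate's confirmed implementation:

use blake3::Hasher;
use libp2p_identity::Keypair;

/// Sketch only: one way to derive a deterministic per-node keypair from a
/// shared seed plus a node index. The real body is elided by the diff.
fn indexed_identity_sketch(seed: [u8; 32], index: u64) -> Keypair {
    let mut hasher = Hasher::new();
    hasher.update(&seed);
    hasher.update(&index.to_le_bytes());
    let digest = *hasher.finalize().as_bytes();
    // Any 32-byte string is a valid ed25519 secret, so this cannot fail.
    Keypair::ed25519_from_bytes(digest).expect("32 bytes is a valid ed25519 secret")
}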
-        let mesh_params =
-            // NOTE I'm arbitrarily choosing these.
-            match node_type {
-                NetworkNodeType::Bootstrap => MeshParams {
-                    mesh_n_high: libp2p_config.bootstrap_mesh_n_high,
-                    mesh_n_low: libp2p_config.bootstrap_mesh_n_low,
-                    mesh_outbound_min: libp2p_config.bootstrap_mesh_outbound_min,
-                    mesh_n: libp2p_config.bootstrap_mesh_n,
-                },
-                NetworkNodeType::Regular => MeshParams {
-                    mesh_n_high: libp2p_config.mesh_n_high,
-                    mesh_n_low: libp2p_config.mesh_n_low,
-                    mesh_outbound_min: libp2p_config.mesh_outbound_min,
-                    mesh_n: libp2p_config.mesh_n,
-                },
-                NetworkNodeType::Conductor => unreachable!(),
-            };
-        config_builder.mesh_params(Some(mesh_params));
-
-        let node_config = config_builder.build().unwrap();
-        let network = Arc::new(
-            Libp2pNetwork::new(
-                NoMetrics::boxed(),
-                node_config,
-                pubkey.clone(),
-                Arc::new(RwLock::new(
-                    bootstrap_nodes
-                        .iter()
-                        .map(|(peer_id, addr)| (Some(*peer_id), addr.clone()))
-                        .collect(),
-                )),
-                bs_len,
-                config.node_index as usize,
-                // NOTE: this introduces an invariant that the keys are assigned using this indexed
-                // function
-                {
-                    let mut keys = BTreeSet::new();
-                    for i in 0..config.config.total_nodes.get() {
-                        let pk =
-                            <TYPES::SignatureKey as SignatureKey>::generated_from_seed_indexed(
-                                config.seed,
-                                i as u64,
-                            )
-                            .0;
-                        keys.insert(pk);
-                    }
-                    keys
-                },
-                BTreeSet::new(),
-            )
-            .await
-            .unwrap(),
-        );
-
-        let comm_channel = Libp2pCommChannel::<
-            TYPES,
-            NODE,
-            ValidatingProposal<TYPES, ValidatingLeaf<TYPES>>,
-            QuorumVote<TYPES, ValidatingLeaf<TYPES>>,
-            MEMBERSHIP,
-        >::new(network);
-
-        comm_channel.wait_for_ready().await;
-
-        Libp2pRun {
-            config,
-
-            _bootstrap_nodes: bootstrap_nodes,
-            _node_type: node_type,
-            _identity: identity,
-            _bound_addr: bound_addr,
-            // _socket: stream,
-            network: comm_channel,
-        }
-    }
-
-    fn get_config(
-        &self,
-    ) -> NetworkConfig<<TYPES as NodeType>::SignatureKey, <TYPES as NodeType>::ElectionConfigType>
-    {
-        self.config.clone()
-    }
-
-    fn get_network(
-        &self,
-    ) -> Libp2pCommChannel<
-        TYPES,
-        NODE,
-        ValidatingProposal<TYPES, ValidatingLeaf<TYPES>>,
-        QuorumVote<TYPES, ValidatingLeaf<TYPES>>,
-        MEMBERSHIP,
-    > {
-        self.network.clone()
-    }
-}
-
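Node type above is decided purely by index: the first bs_len entries act as bootstrap nodes and get their own gossipsub mesh parameters. A compact sketch of that selection follows; the concrete numbers are illustrative placeholders, since the removed code reads every value from libp2p_config:

/// Illustrative only: the removed code reads these from `libp2p_config`.
struct MeshParams {
    mesh_n_high: usize,
    mesh_n_low: usize,
    mesh_outbound_min: usize,
    mesh_n: usize,
}

enum NetworkNodeType {
    Bootstrap,
    Regular,
}

/// The first `bs_len` indices act as bootstrap nodes; the assumption here is
/// that they keep a denser mesh so late joiners can find peers quickly.
fn mesh_params_for(node_index: usize, bs_len: usize) -> MeshParams {
    let node_type = if node_index < bs_len {
        NetworkNodeType::Bootstrap
    } else {
        NetworkNodeType::Regular
    };
    match node_type {
        NetworkNodeType::Bootstrap => MeshParams {
            mesh_n_high: 50,
            mesh_n_low: 10,
            mesh_outbound_min: 5,
            mesh_n: 15,
        },
        NetworkNodeType::Regular => MeshParams {
            mesh_n_high: 15,
            mesh_n_low: 8,
            mesh_outbound_min: 4,
            mesh_n: 12,
        },
    }
}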
-// WEB SERVER
-
-/// Alias for the [`WebCommChannel`] for validating consensus.
-type ValidatingWebCommChannel<TYPES, I, MEMBERSHIP> =
-    WebCommChannel<TYPES, I, Proposal<TYPES>, QuorumVote<TYPES, ValidatingLeaf<TYPES>>, MEMBERSHIP>;
-
-/// Represents a web server-based run
-pub struct WebServerRun<
-    TYPES: NodeType,
-    I: NodeImplementation<TYPES>,
-    MEMBERSHIP: Membership<TYPES>,
-> {
-    config: NetworkConfig<TYPES::SignatureKey, TYPES::ElectionConfigType>,
-    network: ValidatingWebCommChannel<TYPES, I, MEMBERSHIP>,
-}
-
-#[async_trait]
-impl<
-        TYPES: NodeType,
-        MEMBERSHIP: Membership<TYPES> + Debug,
-        NODE: NodeImplementation<
-            TYPES,
-            Leaf = ValidatingLeaf<TYPES>,
-            Exchanges = ValidatingExchanges<
-                TYPES,
-                Message<TYPES, NODE>,
-                QuorumExchange<
-                    TYPES,
-                    ValidatingLeaf<TYPES>,
-                    ValidatingProposal<TYPES, ValidatingLeaf<TYPES>>,
-                    MEMBERSHIP,
-                    WebCommChannel<
-                        TYPES,
-                        NODE,
-                        ValidatingProposal<TYPES, ValidatingLeaf<TYPES>>,
-                        QuorumVote<TYPES, ValidatingLeaf<TYPES>>,
-                        MEMBERSHIP,
-                    >,
-                    Message<TYPES, NODE>,
-                >,
-                ViewSyncExchange<
-                    TYPES,
-                    ValidatingProposal<TYPES, ValidatingLeaf<TYPES>>,
-                    MEMBERSHIP,
-                    WebCommChannel<
-                        TYPES,
-                        NODE,
-                        ValidatingProposal<TYPES, ValidatingLeaf<TYPES>>,
-                        ViewSyncVote<TYPES>,
-                        MEMBERSHIP,
-                    >,
-                    Message<TYPES, NODE>,
-                >,
-            >,
-            Storage = MemoryStorage<TYPES, ValidatingLeaf<TYPES>>,
-            ConsensusMessage = ValidatingMessage<TYPES, NODE>,
-        >,
-    >
-    Run<
-        TYPES,
-        MEMBERSHIP,
-        WebCommChannel<
-            TYPES,
-            NODE,
-            ValidatingProposal<TYPES, ValidatingLeaf<TYPES>>,
-            QuorumVote<TYPES, ValidatingLeaf<TYPES>>,
-            MEMBERSHIP,
-        >,
-        WebCommChannel<
-            TYPES,
-            NODE,
-            ValidatingProposal<TYPES, ValidatingLeaf<TYPES>>,
-            ViewSyncVote<TYPES>,
-            MEMBERSHIP,
-        >,
-        NODE,
-    > for WebServerRun<TYPES, NODE, MEMBERSHIP>
-where
-    <TYPES as NodeType>::StateType: TestableState,
-    <TYPES as NodeType>::BlockType: TestableBlock,
-    ValidatingLeaf<TYPES>: TestableLeaf,
-    SystemContext<TYPES, NODE>: ViewRunner<TYPES, NODE>,
-    Self: Sync,
-{
-    async fn initialize_networking(
-        config: NetworkConfig<TYPES::SignatureKey, TYPES::ElectionConfigType>,
-    ) -> WebServerRun<TYPES, NODE, MEMBERSHIP> {
-        // Generate our own key
-        let (pub_key, _priv_key) =
-            <<TYPES as NodeType>::SignatureKey as SignatureKey>::generated_from_seed_indexed(
-                config.seed,
-                config.node_index,
-            );
-
-        // Get the configuration for the web server
-        let WebServerConfig {
-            host,
-            port,
-            wait_between_polls,
-        }: WebServerConfig = config.clone().web_server_config.unwrap();
-
-        // Create the network
-        let network: WebCommChannel<
-            TYPES,
-            NODE,
-            Proposal<TYPES>,
-            QuorumVote<TYPES, ValidatingLeaf<TYPES>>,
-            MEMBERSHIP,
-        > = WebCommChannel::new(
-            WebServerNetwork::create(&host.to_string(), port, wait_between_polls, pub_key, false)
-                .into(),
-        );
-        WebServerRun { config, network }
-    }
-
-    fn get_network(
-        &self,
-    ) -> WebCommChannel<
-        TYPES,
-        NODE,
-        Proposal<TYPES>,
-        QuorumVote<TYPES, ValidatingLeaf<TYPES>>,
-        MEMBERSHIP,
-    > {
-        self.network.clone()
-    }
-
-    fn get_config(&self) -> NetworkConfig<TYPES::SignatureKey, TYPES::ElectionConfigType> {
-        self.config.clone()
-    }
-}
-
-/// Main entry point for validators
-pub async fn main_entry_point<
-    TYPES: NodeType,
-    MEMBERSHIP: Membership<TYPES> + Debug,
-    NETWORK: CommunicationChannel<
-            TYPES,
-            Message<TYPES, NODE>,
-            ValidatingProposal<TYPES, ValidatingLeaf<TYPES>>,
-            QuorumVote<TYPES, ValidatingLeaf<TYPES>>,
-            MEMBERSHIP,
-        > + Debug,
-    VIEWSYNCNETWORK: CommunicationChannel<
-            TYPES,
-            Message<TYPES, NODE>,
-            ValidatingProposal<TYPES, ValidatingLeaf<TYPES>>,
-            ViewSyncVote<TYPES>,
-            MEMBERSHIP,
-        > + Debug,
-    NODE: NodeImplementation<
-        TYPES,
-        Leaf = ValidatingLeaf<TYPES>,
-        Exchanges = ValidatingExchanges<
-            TYPES,
-            Message<TYPES, NODE>,
-            QuorumExchange<
-                TYPES,
-                ValidatingLeaf<TYPES>,
-                ValidatingProposal<TYPES, ValidatingLeaf<TYPES>>,
-                MEMBERSHIP,
-                NETWORK,
-                Message<TYPES, NODE>,
-            >,
-            ViewSyncExchange<
-                TYPES,
-                ValidatingProposal<TYPES, ValidatingLeaf<TYPES>>,
-                MEMBERSHIP,
-                VIEWSYNCNETWORK,
-                Message<TYPES, NODE>,
-            >,
-        >,
-        Storage = MemoryStorage<TYPES, ValidatingLeaf<TYPES>>,
-        ConsensusMessage = ValidatingMessage<TYPES, NODE>,
-    >,
-    RUN: Run<TYPES, MEMBERSHIP, NETWORK, VIEWSYNCNETWORK, NODE>,
->(
-    args: ValidatorArgs,
-) where
-    <TYPES as NodeType>::StateType: TestableState,
-    <TYPES as NodeType>::BlockType: TestableBlock,
-    ValidatingLeaf<TYPES>: TestableLeaf,
-    SystemContext<TYPES, NODE>: ViewRunner<TYPES, NODE>,
-{
-    setup_logging();
-    setup_backtrace();
-
-    error!("Starting validator");
-
-    let orchestrator_client: OrchestratorClient =
-        OrchestratorClient::connect_to_orchestrator(args.clone()).await;
-
-    // Identify with the orchestrator
-    let public_ip = match args.public_ip {
-        Some(ip) => ip,
-        None => IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
-    };
-    error!(
-        "Identifying with orchestrator using IP address {}",
-        public_ip.to_string()
-    );
-    let node_index: u16 = orchestrator_client
-        .identify_with_orchestrator(public_ip.to_string())
-        .await;
-    error!("Finished identifying; our node index is {node_index}");
-    error!("Getting config from orchestrator");
-
-    let mut run_config = orchestrator_client
-        .get_config_from_orchestrator::<TYPES>(node_index)
-        .await;
-
-    run_config.node_index = node_index.into();
-    run_config.libp2p_config.as_mut().unwrap().public_ip = args.public_ip.unwrap();
-
-    error!("Initializing networking");
-    let run = RUN::initialize_networking(run_config.clone()).await;
-    let (_state, hotshot) = run.initialize_state_and_hotshot().await;
-
-    error!("Waiting for start command from orchestrator");
-    orchestrator_client
-        .wait_for_all_nodes_ready(run_config.clone().node_index)
-        .await;
-
-    error!("All nodes are ready! Starting HotShot");
-    run.run_hotshot(hotshot).await;
-}
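The removed main_entry_point runs a fixed startup sequence against the orchestrator: identify, fetch config, initialize networking, wait for the whole fleet, then run consensus. A compressed restatement of that lifecycle, with a hypothetical Client stub standing in for OrchestratorClient; the method names mirror the removed code, the bodies are stubs:

/// Hypothetical stand-in for `OrchestratorClient`.
struct Client;
struct RunConfig;

impl Client {
    async fn identify_with_orchestrator(&self, _ip: String) -> u16 { 0 }
    async fn get_config_from_orchestrator(&self, _index: u16) -> RunConfig { RunConfig }
    async fn wait_for_all_nodes_ready(&self, _index: u64) {}
}

async fn validator_lifecycle(client: Client, public_ip: String) {
    // 1. Register with the orchestrator and learn this node's index.
    let node_index = client.identify_with_orchestrator(public_ip).await;
    // 2. Fetch the run configuration assigned to that index.
    let _config = client.get_config_from_orchestrator(node_index).await;
    // 3. Bring up networking and consensus state (RUN::initialize_networking
    //    and initialize_state_and_hotshot above).
    // 4. Block until the orchestrator reports that every node has checked in.
    client.wait_for_all_nodes_ready(u64::from(node_index)).await;
    // 5. Drive consensus to completion (run_hotshot above).
}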
diff --git a/examples/infra/modDA.rs b/examples/infra/modDA.rs
index 1edb2ea496..2f023ab5ce 100644
--- a/examples/infra/modDA.rs
+++ b/examples/infra/modDA.rs
@@ -38,7 +38,6 @@ use hotshot_types::{
     data::{DAProposal, QuorumProposal, SequencingLeaf, TestableLeaf},
     message::SequencingMessage,
     traits::{
-        consensus_type::sequencing_consensus::SequencingConsensus,
         election::Membership,
         metrics::NoMetrics,
         network::CommunicationChannel,
@@ -82,7 +81,7 @@ use tracing::warn;

 /// Runs the orchestrator
 pub async fn run_orchestrator_da<
-    TYPES: NodeType<ConsensusType = SequencingConsensus>,
+    TYPES: NodeType,
     MEMBERSHIP: Membership<TYPES> + Debug,
     DANETWORK: CommunicationChannel<
         TYPES,
@@ -150,7 +149,7 @@ pub async fn run_orchestrator_da<
 /// Defines the behavior of a "run" of the network with a given configuration
 #[async_trait]
 pub trait RunDA<
-    TYPES: NodeType<ConsensusType = SequencingConsensus>,
+    TYPES: NodeType