From 8f9ba46cf45042f43c2f76fd02b455535c2443ee Mon Sep 17 00:00:00 2001
From: Justin Restivo
Date: Thu, 10 Aug 2023 12:22:10 -0400
Subject: [PATCH 1/7] feat: nuke sequencing consensus and consensus types

---
 consensus/src/da_member.rs                 |    5 +-
 consensus/src/leader.rs                    |  213 +--
 consensus/src/lib.rs                       |    8 +-
 consensus/src/next_leader.rs               |  144 +-
 consensus/src/replica.rs                   |  549 --------
 consensus/src/sequencing_leader.rs         |   13 +-
 consensus/src/sequencing_replica.rs        |    6 +-
 consensus/src/traits.rs                    |   47 +-
 examples/infra/mod.rs                      |  897 +-----------
 examples/infra/modDA.rs                    |   21 +-
 examples/web-server-da/multi-web-server.rs |    8 +-
 examples/web-server-da/web-server.rs       |    4 +-
 src/demos/sdemo.rs                         |    4 +-
 src/demos/vdemo.rs                         | 1214 ++++++++---------
 src/lib.rs                                 |  523 +------
 src/tasks/mod.rs                           |  175 +--
 src/traits/networking/libp2p_network.rs    |    6 +-
 src/traits/networking/memory_network.rs    |    4 +-
 src/traits/storage/memory_storage.rs       |    3 -
 src/types/handle.rs                        |    2 +-
 task-impls/src/consensus.rs                |   22 +-
 task-impls/src/da.rs                       |   17 +-
 task-impls/src/network.rs                  |   59 +-
 task-impls/src/view_sync.rs                |   19 +-
 testing/src/completion_task.rs             |   20 +-
 testing/src/node_types.rs                  |    2 -
 testing/src/overall_safety_task.rs         |   22 +-
 testing/src/spinning_task.rs               |   10 +-
 testing/src/test_builder.rs                |   17 +-
 testing/src/test_launcher.rs               |   34 +-
 testing/src/test_runner.rs                 |   93 +-
 testing/src/txn_task.rs                    |   12 +-
 testing/tests/consensus_task.rs            |   17 +-
 types/src/data.rs                          |   85 --
 types/src/message.rs                       |  148 +-
 .../consensus_type/sequencing_consensus.rs  |   16 -
 types/src/traits/election.rs               |   22 +-
 types/src/traits/node_implementation.rs    |  412 +----
 38 files changed, 920 insertions(+), 3953 deletions(-)

diff --git a/consensus/src/da_member.rs b/consensus/src/da_member.rs
index 59acb0068a..8627d22a4b 100644
--- a/consensus/src/da_member.rs
+++ b/consensus/src/da_member.rs
@@ -17,7 +17,6 @@ use hotshot_types::{
         ProcessedGeneralConsensusMessage, ProcessedSequencingMessage, SequencingMessage,
     },
     traits::{
-        consensus_type::sequencing_consensus::SequencingConsensus,
         election::{CommitteeExchangeType, ConsensusExchange},
         node_implementation::{
             CommitteeEx, CommitteeProposalType, CommitteeVote, NodeImplementation, NodeType,
@@ -34,7 +33,7 @@ use tracing::{error, info, instrument, warn};
 #[derive(Debug, Clone)]
 pub struct DAMember<
     A: SequencingConsensusApi<TYPES, SequencingLeaf<TYPES>, I>,
-    TYPES: NodeType<ConsensusType = SequencingConsensus>,
+    TYPES: NodeType,
     I: NodeImplementation<
         TYPES,
         Leaf = SequencingLeaf<TYPES>,
@@ -67,7 +66,7 @@ pub struct DAMember<
 
 impl<
         A: SequencingConsensusApi<TYPES, SequencingLeaf<TYPES>, I>,
-        TYPES: NodeType<ConsensusType = SequencingConsensus>,
+        TYPES: NodeType,
         I: NodeImplementation<
             TYPES,
             Leaf = SequencingLeaf<TYPES>,
diff --git a/consensus/src/leader.rs b/consensus/src/leader.rs
index f2d411656f..b7872eafe2 100644
--- a/consensus/src/leader.rs
+++ b/consensus/src/leader.rs
@@ -1,12 +1,13 @@
 //! Contains the [`ValidatingLeader`] struct used for the leader step in the hotstuff consensus algorithm. 
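The da_member.rs hunk above shows the change this patch repeats across the whole tree: the `ConsensusType` associated type disappears from the `NodeType` bound. A minimal toy sketch of the before and after shape follows; the traits here are simplified stand-ins assumed for illustration, not the real `hotshot_types` definitions.

// Toy sketch (not the real hotshot_types API): what dropping the
// `ConsensusType` bound means for downstream structs like `DAMember`.
trait ConsensusType {}
struct SequencingConsensus;
impl ConsensusType for SequencingConsensus {}

// Before this patch, node types pinned a consensus flavor at the type level:
trait OldNodeType {
    type Time;
    type Consensus: ConsensusType;
}

// After, the marker is gone and generic code bounds on `NodeType` alone:
trait NodeType {
    type Time;
}

// So a task struct goes from
//     struct DAMember<TYPES: OldNodeType<Consensus = SequencingConsensus>> { ... }
// to simply:
struct DAMember<TYPES: NodeType> {
    cur_view: TYPES::Time,
}

fn main() {}
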
-use crate::{CommitmentMap, Consensus, ValidatingConsensusApi}; +use crate::{CommitmentMap, Consensus}; use async_compatibility_layer::{ art::{async_sleep, async_timeout}, async_primitives::subscribable_rwlock::{ReadView, SubscribableRwLock}, }; use async_lock::RwLock; use commit::Committable; +use hotshot_types::message::Message; use hotshot_types::{ certificate::QuorumCertificate, data::{ValidatingLeaf, ValidatingProposal}, @@ -14,221 +15,15 @@ use hotshot_types::{ traits::{ consensus_type::validating_consensus::ValidatingConsensus, election::SignedCertificate, - node_implementation::{ - NodeImplementation, NodeType, QuorumProposalType, QuorumVoteType, ValidatingQuorumEx, - }, + node_implementation::{NodeImplementation, NodeType, QuorumProposalType, QuorumVoteType}, signature_key::SignatureKey, Block, State, }, }; -use hotshot_types::{message::Message, traits::node_implementation::ValidatingExchangesType}; use hotshot_types::{ - message::{Proposal, ValidatingMessage}, + message::Proposal, traits::election::{ConsensusExchange, QuorumExchangeType}, }; use std::marker::PhantomData; use std::{sync::Arc, time::Instant}; use tracing::{error, info, instrument, warn}; -/// This view's validating leader -#[derive(Debug, Clone)] -pub struct ValidatingLeader< - A: ValidatingConsensusApi, I>, - TYPES: NodeType, - I: NodeImplementation< - TYPES, - Leaf = ValidatingLeaf, - ConsensusMessage = ValidatingMessage, - >, -> where - I::Exchanges: ValidatingExchangesType>, -{ - /// id of node - pub id: u64, - /// Reference to consensus. Validating leader will require a read lock on this. - pub consensus: Arc>>>, - /// The `high_qc` per spec - pub high_qc: QuorumCertificate>, - /// The view number we're running on - pub cur_view: TYPES::Time, - /// Lock over the transactions list - pub transactions: Arc>>, - /// Limited access to the consensus protocol - pub api: A, - - /// the quorum exchange - pub exchange: Arc>, - - /// needed for type checking - pub _pd: PhantomData, -} - -impl< - A: ValidatingConsensusApi, I>, - TYPES: NodeType, - I: NodeImplementation< - TYPES, - Leaf = ValidatingLeaf, - ConsensusMessage = ValidatingMessage, - >, - > ValidatingLeader -where - I::Exchanges: ValidatingExchangesType>, - ValidatingQuorumEx: ConsensusExchange< - TYPES, - Message, - Proposal = ValidatingProposal>, - >, -{ - /// Run one view of the leader task - #[instrument(skip(self), fields(id = self.id, view = *self.cur_view), name = "Validating ValidatingLeader Task", level = "error")] - pub async fn run_view(self) -> QuorumCertificate> { - let pk = self.api.public_key(); - info!("Validating leader task started!"); - - let task_start_time = Instant::now(); - let parent_view_number = &self.high_qc.view_number(); - let consensus = self.consensus.read().await; - let mut reached_decided = false; - - let Some(parent_view) = consensus.state_map.get(parent_view_number) else { - warn!("Couldn't find high QC parent in state map."); - return self.high_qc; - }; - let Some(leaf) = parent_view.get_leaf_commitment() else { - warn!( - ?parent_view_number, - ?parent_view, - "Parent of high QC points to a view without a proposal" - ); - return self.high_qc; - }; - let Some(leaf) = consensus.saved_leaves.get(&leaf) else { - warn!("Failed to find high QC parent."); - return self.high_qc; - }; - if leaf.view_number == consensus.last_decided_view { - reached_decided = true; - } - let parent_leaf = leaf.clone(); - - let original_parent_hash = parent_leaf.commit(); - let starting_state = &parent_leaf.state; - - let mut previous_used_txns = 
parent_leaf.deltas.contained_transactions(); - - let mut next_parent_hash = original_parent_hash; - - if !reached_decided { - while let Some(next_parent_leaf) = consensus.saved_leaves.get(&next_parent_hash) { - if next_parent_leaf.view_number <= consensus.last_decided_view { - break; - } - let next_parent_txns = next_parent_leaf.deltas.contained_transactions(); - for next_parent_txn in next_parent_txns { - previous_used_txns.insert(next_parent_txn); - } - next_parent_hash = next_parent_leaf.parent_commitment; - } - // TODO do some sort of sanity check on the view number that it matches decided - } - - let passed_time = task_start_time - Instant::now(); - async_sleep(self.api.propose_min_round_time() - passed_time).await; - - let receiver = self.transactions.subscribe().await; - let mut block = ::StateType::next_block(Some(starting_state.clone())); - - // Wait until we have min_transactions for the block or we hit propose_max_round_time - while task_start_time.elapsed() < self.api.propose_max_round_time() { - let txns = self.transactions.cloned().await; - let unclaimed_txns: Vec<_> = txns - .iter() - .filter(|(txn_hash, _txn)| !previous_used_txns.contains(txn_hash)) - .collect(); - - let time_past = task_start_time.elapsed(); - if unclaimed_txns.len() < self.api.min_transactions() - && (time_past < self.api.propose_max_round_time()) - { - let duration = self.api.propose_max_round_time() - time_past; - let result = async_timeout(duration, receiver.recv()).await; - match result { - Err(_) => { - // Fall through below to updating new block - info!("propose_max_round_time passed, sending transactions we have so far"); - } - Ok(Err(e)) => { - // Something unprecedented is wrong, and `transactions` has been dropped - error!("Channel receiver error for SubscribableRwLock {:?}", e); - return self.high_qc; - } - Ok(Ok(_)) => continue, - } - } - - // Add unclaimed transactions to the new block - for (_txn_hash, txn) in &unclaimed_txns { - let new_block_check = block.add_transaction_raw(txn); - if let Ok(new_block) = new_block_check { - if starting_state.validate_block(&new_block, &self.cur_view) { - block = new_block; - continue; - } - } - } - break; - } - - consensus - .metrics - .proposal_wait_duration - .add_point(task_start_time.elapsed().as_secs_f64()); - - let proposal_build_start = Instant::now(); - - if let Ok(new_state) = starting_state.append(&block, &self.cur_view) { - let leaf = ValidatingLeaf { - view_number: self.cur_view, - height: parent_leaf.height + 1, - justify_qc: self.high_qc.clone(), - parent_commitment: original_parent_hash, - deltas: block, - state: new_state, - rejected: Vec::new(), - timestamp: time::OffsetDateTime::now_utc().unix_timestamp_nanos(), - proposer_id: pk.to_bytes(), - }; - let signature = self - .exchange - .sign_validating_or_commitment_proposal::(&leaf.commit()); - let data: ValidatingProposal> = leaf.into(); - let message = - ValidatingMessage::(GeneralConsensusMessage::Proposal(Proposal { - data, - signature, - })); - consensus - .metrics - .proposal_build_duration - .add_point(proposal_build_start.elapsed().as_secs_f64()); - info!("Sending out proposal {:?}", message); - - if let Err(e) = self - .api - .send_broadcast_message::, QuorumVoteType>( - message.clone(), - ) - .await - { - consensus.metrics.failed_to_send_messages.add(1); - warn!(?message, ?e, "Could not broadcast leader proposal"); - } else { - consensus.metrics.outgoing_broadcast_messages.add(1); - } - } else { - error!("Could not append state in high qc for proposal. 
Failed to send out proposal."); - } - - self.high_qc.clone() - } -} diff --git a/consensus/src/lib.rs b/consensus/src/lib.rs index f6f1e9294a..a8291d344c 100644 --- a/consensus/src/lib.rs +++ b/consensus/src/lib.rs @@ -12,9 +12,6 @@ #![allow(clippy::module_name_repetitions)] mod da_member; -mod leader; -mod next_leader; -mod replica; mod sequencing_leader; mod sequencing_replica; pub mod traits; @@ -23,13 +20,10 @@ pub mod utils; use async_compatibility_layer::async_primitives::subscribable_rwlock::SubscribableRwLock; pub use da_member::DAMember; pub use hotshot_types::traits::node_implementation::ViewQueue; -pub use leader::ValidatingLeader; -pub use next_leader::NextValidatingLeader; -pub use replica::Replica; pub use sequencing_leader::{ConsensusLeader, ConsensusNextLeader, DALeader}; pub use sequencing_replica::SequencingReplica; use std::collections::HashSet; -pub use traits::{ConsensusSharedApi, SequencingConsensusApi, ValidatingConsensusApi}; +pub use traits::{ConsensusSharedApi, SequencingConsensusApi}; pub use utils::{View, ViewInner}; use commit::{Commitment, Committable}; diff --git a/consensus/src/next_leader.rs b/consensus/src/next_leader.rs index 64f590b423..abd447a6ff 100644 --- a/consensus/src/next_leader.rs +++ b/consensus/src/next_leader.rs @@ -1,7 +1,6 @@ //! Contains the [`NextValidatingLeader`] struct used for the next leader step in the hotstuff consensus algorithm. use crate::ConsensusMetrics; -use crate::ValidatingConsensusApi; use async_compatibility_layer::channel::UnboundedReceiver; use async_lock::Mutex; use either::Either; @@ -9,14 +8,12 @@ use hotshot_types::data::ValidatingLeaf; use hotshot_types::message::Message; use hotshot_types::message::ProcessedGeneralConsensusMessage; use hotshot_types::traits::election::ConsensusExchange; -use hotshot_types::traits::node_implementation::{ - NodeImplementation, NodeType, ValidatingExchangesType, ValidatingQuorumEx, -}; +use hotshot_types::traits::node_implementation::{NodeImplementation, NodeType}; use hotshot_types::traits::signature_key::SignatureKey; use hotshot_types::vote::VoteAccumulator; use hotshot_types::{ certificate::QuorumCertificate, - message::{ConsensusMessageType, InternalTrigger, ValidatingMessage}, + message::{ConsensusMessageType, InternalTrigger}, traits::consensus_type::validating_consensus::ValidatingConsensus, vote::QuorumVote, }; @@ -27,140 +24,3 @@ use std::{ sync::Arc, }; use tracing::{info, instrument, warn}; - -/// The next view's validating leader -#[derive(custom_debug::Debug, Clone)] -pub struct NextValidatingLeader< - A: ValidatingConsensusApi, I>, - TYPES: NodeType, - I: NodeImplementation< - TYPES, - Leaf = ValidatingLeaf, - ConsensusMessage = ValidatingMessage, - >, -> where - I::Exchanges: ValidatingExchangesType>, -{ - /// id of node - pub id: u64, - /// generic_qc before starting this - pub generic_qc: QuorumCertificate>, - /// channel through which the leader collects votes - #[allow(clippy::type_complexity)] - pub vote_collection_chan: - Arc>>>, - /// The view number we're running on - pub cur_view: TYPES::Time, - /// Limited access to the consensus protocol - pub api: A, - - /// quorum exchange - pub exchange: Arc>, - /// Metrics for reporting stats - #[debug(skip)] - pub metrics: Arc, - - /// needed to type check - pub _pd: PhantomData, -} - -impl< - A: ValidatingConsensusApi, I>, - TYPES: NodeType, - I: NodeImplementation< - TYPES, - Leaf = ValidatingLeaf, - ConsensusMessage = ValidatingMessage, - >, - > NextValidatingLeader -where - I::Exchanges: ValidatingExchangesType>, - 
ValidatingQuorumEx: ConsensusExchange< - TYPES, - Message, - Certificate = QuorumCertificate>, - Commitment = ValidatingLeaf, - >, -{ - /// Run one view of the next leader task - /// # Panics - /// While we are unwrapping, this function can logically never panic - /// unless there is a bug in std - #[instrument(skip(self), fields(id = self.id, view = *self.cur_view), name = "Next Validating ValidatingLeader Task", level = "error")] - pub async fn run_view(self) -> QuorumCertificate> { - info!("Next validating leader task started!"); - - let vote_collection_start = Instant::now(); - - let mut qcs = HashSet::>>::new(); - qcs.insert(self.generic_qc.clone()); - - let mut accumlator = VoteAccumulator { - total_vote_outcomes: HashMap::new(), - yes_vote_outcomes: HashMap::new(), - no_vote_outcomes: HashMap::new(), - viewsync_precommit_vote_outcomes: HashMap::new(), - success_threshold: self.exchange.success_threshold(), - failure_threshold: self.exchange.failure_threshold(), - }; - - let lock = self.vote_collection_chan.lock().await; - while let Ok(msg) = lock.recv().await { - // If the message is for a different view number, skip it. - if Into::>::into(msg.clone()).view_number() != self.cur_view { - continue; - } - match msg { - ProcessedGeneralConsensusMessage::Vote(vote_message, sender) => { - match vote_message { - QuorumVote::Yes(vote) | QuorumVote::No(vote) => { - if vote.signature.0 - != ::to_bytes(&sender) - { - continue; - } - match self.exchange.accumulate_vote( - &vote.signature.0, - &vote.signature.1, - vote.leaf_commitment, - vote.vote_data, - vote.vote_token.clone(), - self.cur_view, - accumlator, - None, - ) { - Either::Left(acc) => { - accumlator = acc; - } - Either::Right(qc) => { - self.metrics - .vote_validate_duration - .add_point(vote_collection_start.elapsed().as_secs_f64()); - return qc; - } - } - } - QuorumVote::Timeout(vote) => { - qcs.insert(vote.justify_qc); - } - } - } - ProcessedGeneralConsensusMessage::InternalTrigger(trigger) => match trigger { - InternalTrigger::Timeout(_) => { - self.api.send_next_leader_timeout(self.cur_view).await; - break; - } - }, - ProcessedGeneralConsensusMessage::Proposal(_p, _sender) => { - warn!("The next leader has received an unexpected proposal!"); - } - ProcessedGeneralConsensusMessage::ViewSyncCertificate(_) => todo!(), - ProcessedGeneralConsensusMessage::ViewSyncVote(_) => todo!(), - } - } - - qcs.into_iter() - .max_by_key(hotshot_types::traits::election::SignedCertificate::view_number) - .unwrap() - } -} diff --git a/consensus/src/replica.rs b/consensus/src/replica.rs index fb8a1e9269..e69de29bb2 100644 --- a/consensus/src/replica.rs +++ b/consensus/src/replica.rs @@ -1,549 +0,0 @@ -//! Contains the [`Replica`] struct used for the replica step in the hotstuff consensus algorithm. 
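The `NextValidatingLeader::run_view` loop deleted above folds each incoming vote into a `VoteAccumulator`: `accumulate_vote` hands back `Either::Left(accumulator)` while the collected stake is below the success threshold and `Either::Right(certificate)` once it is crossed. Below is a simplified sketch of that accumulate-until-certificate pattern; `Accumulator` and `Certificate` are hypothetical stand-ins, with stake counting reduced to a bare u64, not the real `hotshot_types` signatures.

use either::Either;

/// Hypothetical stand-ins for `VoteAccumulator` and `QuorumCertificate`.
struct Accumulator {
    stake: u64,
    success_threshold: u64,
}
struct Certificate {
    view: u64,
}

/// Fold one vote in; return either the updated accumulator or a certificate.
fn accumulate(mut acc: Accumulator, vote_stake: u64, view: u64) -> Either<Accumulator, Certificate> {
    acc.stake += vote_stake;
    if acc.stake >= acc.success_threshold {
        Either::Right(Certificate { view })
    } else {
        Either::Left(acc)
    }
}

/// Drain a vote stream the way the deleted next-leader task drained its channel.
fn collect_votes(votes: impl IntoIterator<Item = u64>, view: u64) -> Option<Certificate> {
    let mut acc = Accumulator { stake: 0, success_threshold: 3 };
    for stake in votes {
        match accumulate(acc, stake, view) {
            Either::Left(partial) => acc = partial, // not enough stake yet; keep folding
            Either::Right(qc) => return Some(qc),   // threshold crossed: certificate formed
        }
    }
    None // channel closed / view ended without reaching quorum
}

fn main() {
    assert!(collect_votes([1, 1, 1], 7).is_some());
}
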
- -use crate::{ - utils::{Terminator, View, ViewInner}, - Consensus, ValidatingConsensusApi, -}; -use async_compatibility_layer::channel::UnboundedReceiver; -use async_lock::{Mutex, RwLock, RwLockUpgradableReadGuard, RwLockWriteGuard}; -use bincode::Options; -use commit::Committable; -use hotshot_types::traits::election::ConsensusExchange; -use hotshot_types::traits::election::QuorumExchangeType; -use hotshot_types::traits::node_implementation::{ - NodeImplementation, QuorumProposalType, ValidatingExchangesType, ValidatingQuorumEx, -}; -use hotshot_types::{ - certificate::QuorumCertificate, - data::{ValidatingLeaf, ValidatingProposal}, - message::{ - ConsensusMessageType, InternalTrigger, Message, ProcessedGeneralConsensusMessage, - ValidatingMessage, - }, - traits::{ - consensus_type::validating_consensus::ValidatingConsensus, node_implementation::NodeType, - signature_key::SignatureKey, Block, State, - }, - vote::QuorumVote, -}; -use hotshot_utils::bincode::bincode_opts; -use std::marker::PhantomData; -use std::ops::Bound::{Excluded, Included}; -use std::{collections::HashSet, sync::Arc}; -use tracing::{error, info, instrument, warn}; - -/// This view's replica -#[derive(Debug, Clone)] -pub struct Replica< - A: ValidatingConsensusApi, I>, - TYPES: NodeType, - I: NodeImplementation< - TYPES, - Leaf = ValidatingLeaf, - ConsensusMessage = ValidatingMessage, - >, -> where - I::Exchanges: ValidatingExchangesType>, -{ - /// id of node - pub id: u64, - /// Reference to consensus. Replica will require a write lock on this. - pub consensus: Arc>>>, - /// channel for accepting leader proposals and timeouts messages - #[allow(clippy::type_complexity)] - pub proposal_collection_chan: - Arc>>>, - /// view number this view is executing in - pub cur_view: TYPES::Time, - /// genericQC from the pseudocode - pub high_qc: QuorumCertificate>, - /// hotshot consensus api - pub api: A, - - /// quorum exchange - pub exchange: Arc>, - - /// neeeded to typecheck - pub _pd: PhantomData, -} - -impl< - A: ValidatingConsensusApi, I>, - TYPES: NodeType, - I: NodeImplementation< - TYPES, - Leaf = ValidatingLeaf, - ConsensusMessage = ValidatingMessage, - >, - > Replica -where - I::Exchanges: ValidatingExchangesType>, - ValidatingQuorumEx: ConsensusExchange< - TYPES, - Message, - Proposal = ValidatingProposal>, - Certificate = QuorumCertificate>, - Commitment = ValidatingLeaf, - >, -{ - /// portion of the replica task that spins until a valid QC can be signed or - /// timeout is hit. 
- #[instrument(skip_all, fields(id = self.id, view = *self.cur_view), name = "Replica Task", level = "error")] - #[allow(clippy::type_complexity)] - async fn find_valid_msg<'a>( - &self, - view_leader_key: TYPES::SignatureKey, - consensus: RwLockUpgradableReadGuard<'a, Consensus>>, - ) -> ( - RwLockUpgradableReadGuard<'a, Consensus>>, - Option>, - ) { - let lock = self.proposal_collection_chan.lock().await; - let mut invalid_qcs = 0; - let leaf = loop { - let msg = lock.recv().await; - info!("recv-ed message {:?}", msg.clone()); - if let Ok(msg) = msg { - // stale/newer view messages should never reach this specific task's receive channel - if Into::>::into(msg.clone()).view_number() != self.cur_view - { - continue; - } - match msg { - ProcessedGeneralConsensusMessage::Proposal(p, sender) => { - if view_leader_key != sender { - continue; - } - - let Some(parent) = consensus.saved_leaves.get(&p.data.parent_commitment) - else { - warn!("Proposal's parent missing from storage"); - continue; - }; - - let justify_qc = p.data.justify_qc; - - // go no further if the parent view number does not - // match the justify_qc. We can't accept this - if parent.view_number != justify_qc.view_number { - warn!( - "Inconsistency in recv-ed proposal. The parent's view number, {:?} did not match the justify_qc view number, {:?}", - parent.view_number, justify_qc.view_number - ); - continue; - } - - // check that the chain height is correct - if p.data.height != parent.height + 1 { - warn!( - "Incorrect height in recv-ed proposal. The parent's height, {}, did not follow from the proposal's height, {}", - parent.height, p.data.height - ); - continue; - } - - // check that the justify_qc is valid - if !self - .exchange - .is_valid_cert(&justify_qc, justify_qc.leaf_commitment) - { - invalid_qcs += 1; - warn!("Invalid justify_qc in proposal! Skipping proposal."); - continue; - } - - // check that we can indeed create the state - let leaf = if let Ok(state) = - parent.state.append(&p.data.deltas, &self.cur_view) - { - // check the commitment - if state.commit() != p.data.state_commitment { - warn!("Rejected proposal! 
After applying deltas to parent state, resulting commitment did not match proposal's"); - continue; - } - ValidatingLeaf::new( - state, - p.data.deltas, - p.data.parent_commitment, - justify_qc.clone(), - self.cur_view, - p.data.height, - Vec::new(), - time::OffsetDateTime::now_utc().unix_timestamp_nanos(), - p.data.proposer_id, - ) - } else { - warn!("State of proposal didn't match parent + deltas"); - continue; - }; - - if !view_leader_key.validate(&p.signature, leaf.commit().as_ref()) { - warn!(?p.signature, "Could not verify proposal."); - continue; - } - - let liveness_check = justify_qc.view_number > consensus.locked_view; - - // check if proposal extends from the locked leaf - let outcome = consensus.visit_leaf_ancestors( - parent.view_number, - Terminator::Inclusive(consensus.locked_view), - false, - |leaf| { - // if leaf view no == locked view no then we're done, report success by - // returning true - leaf.view_number != consensus.locked_view - }, - ); - - let safety_check = outcome.is_ok(); - - if let Err(e) = outcome { - self.api.send_view_error(self.cur_view, Arc::new(e)).await; - } - - // NOTE safenode check is here - // if !safenode, continue - // if !(safety_check || liveness_check) - // if !safety_check && !liveness_check - if !safety_check && !liveness_check { - warn!("Failed safety check and liveness check"); - continue; - } - - let leaf_commitment = leaf.commit(); - let vote_token = self.exchange.make_vote_token(self.cur_view); - - match vote_token { - Err(e) => { - error!( - "Failed to generate vote token for {:?} {:?}", - self.cur_view, e - ); - } - Ok(None) => { - info!("We were not chosen for committee on {:?}", self.cur_view); - } - Ok(Some(vote_token)) => { - info!("We were chosen for committee on {:?}", self.cur_view); - - // Generate and send vote - // TODO ED Will remove the below code once actual tests are in place - // let message = if self.id % 2 == 0 { - // self.exchange.create_no_message( - // leaf.justify_qc.commit(), - // leaf_commitment, - // self.cur_view, - // vote_token, - // ) - // } else if self.id % 5 == 0 { - // self.exchange.create_timeout_message( - // leaf.justify_qc.clone(), - // self.cur_view, - // vote_token, - // ) - // } else { - // self.exchange.create_yes_message( - // leaf.justify_qc.commit(), - // leaf_commitment, - // self.cur_view, - // vote_token, - // ) - // }; - - let message = self.exchange.create_yes_message( - leaf.justify_qc.commit(), - leaf_commitment, - self.cur_view, - vote_token, - ); - - let next_leader = self.exchange.get_leader(self.cur_view + 1); - - info!("Sending vote to next leader {:?}", message); - if self - .api - .send_direct_message::, QuorumVote>>(next_leader, ValidatingMessage(message)) - .await - .is_err() - { - consensus.metrics.failed_to_send_messages.add(1); - warn!("Failed to send vote to next leader"); - } else { - consensus.metrics.outgoing_direct_messages.add(1); - } - } - } - break leaf; - } - ProcessedGeneralConsensusMessage::InternalTrigger(trigger) => { - match trigger { - InternalTrigger::Timeout(_) => { - let next_leader = self.exchange.get_leader(self.cur_view + 1); - - consensus.metrics.number_of_timeouts.add(1); - - let vote_token = self.exchange.make_vote_token(self.cur_view); - - match vote_token { - Err(e) => { - error!( - "Failed to generate vote token for {:?} {:?}", - self.cur_view, e - ); - } - Ok(None) => { - info!( - "We were not chosen for committee on {:?}", - self.cur_view - ); - } - Ok(Some(vote_token)) => { - let timeout_msg = self.exchange.create_timeout_message( - 
self.high_qc.clone(), - self.cur_view, - vote_token, - ); - warn!( - "Timed out! Sending timeout to next leader {:?}", - timeout_msg - ); - - // send timedout message to the next leader - if let Err(e) = self - .api - .send_direct_message::, QuorumVote>>( - next_leader.clone(), - ValidatingMessage(timeout_msg), - ) - .await - { - consensus.metrics.failed_to_send_messages.add(1); - warn!( - ?next_leader, - ?e, - "Could not send time out message to next_leader" - ); - } else { - consensus.metrics.outgoing_direct_messages.add(1); - } - - // exits from entire function - self.api.send_replica_timeout(self.cur_view).await; - } - } - return (consensus, None); - } - } - } - ProcessedGeneralConsensusMessage::Vote(_, _) => { - // should only be for leader, never replica - warn!("Replica receieved a vote message. This is not what the replica expects. Skipping."); - continue; - } - ProcessedGeneralConsensusMessage::ViewSyncCertificate(_) => unimplemented!(), - ProcessedGeneralConsensusMessage::ViewSyncVote(_) => unimplemented!(), - } - } - // fall through logic if we did not receive successfully from channel - warn!("Replica did not receive successfully from channel. Terminating Replica."); - let mut consensus = RwLockUpgradableReadGuard::upgrade(consensus).await; - consensus.invalid_qc += invalid_qcs; - self.api.send_replica_timeout(self.cur_view).await; - return (RwLockWriteGuard::downgrade_to_upgradable(consensus), None); - }; - let mut consensus = RwLockUpgradableReadGuard::upgrade(consensus).await; - consensus.invalid_qc += invalid_qcs; - ( - RwLockWriteGuard::downgrade_to_upgradable(consensus), - Some(leaf), - ) - } - - /// run one view of replica - /// returns the `high_qc` - #[instrument(skip(self), fields(id = self.id, view = *self.cur_view), name = "Replica Task", level = "error")] - pub async fn run_view(self) -> QuorumCertificate> { - info!("Replica task started!"); - let consensus = self.consensus.upgradable_read().await; - let view_leader_key = self.exchange.get_leader(self.cur_view); - - let (consensus, maybe_leaf) = self.find_valid_msg(view_leader_key, consensus).await; - - let Some(leaf) = maybe_leaf else { - // we either timed out or for some reason - // could not accept a proposal - return self.high_qc; - }; - - let mut new_anchor_view = consensus.last_decided_view; - let mut new_locked_view = consensus.locked_view; - let mut last_view_number_visited = self.cur_view; - let mut new_commit_reached: bool = false; - let mut new_decide_reached = false; - let mut new_decide_qc = None; - let mut leaf_views = Vec::new(); - let mut included_txns = HashSet::new(); - let old_anchor_view = consensus.last_decided_view; - let parent_view = leaf.justify_qc.view_number; - let mut current_chain_length = 0usize; - if parent_view + 1 == self.cur_view { - current_chain_length += 1; - if let Err(e) = consensus.visit_leaf_ancestors( - parent_view, - Terminator::Exclusive(old_anchor_view), - true, - |leaf| { - if !new_decide_reached { - if last_view_number_visited == leaf.view_number + 1 { - last_view_number_visited = leaf.view_number; - current_chain_length += 1; - if current_chain_length == 2 { - new_locked_view = leaf.view_number; - new_commit_reached = true; - // The next leaf in the chain, if there is one, is decided, so this - // leaf's justify_qc would become the QC for the decided chain. - new_decide_qc = Some(leaf.justify_qc.clone()); - } else if current_chain_length == 3 { - new_anchor_view = leaf.view_number; - new_decide_reached = true; - } - } else { - // nothing more to do here... 
we don't have a new chain extension - return false; - } - } - // starting from the first iteration with a three chain, e.g. right after the else if case nested in the if case above - if new_decide_reached { - leaf_views.push(leaf.clone()); - let txns = leaf.deltas.contained_transactions(); - for txn in txns { - included_txns.insert(txn); - } - } - true - }, - ) { - self.api.send_view_error(self.cur_view, Arc::new(e)).await; - } - } - let high_qc = leaf.justify_qc.clone(); - - let included_txns_set: HashSet<_> = if new_decide_reached { - included_txns - } else { - HashSet::new() - }; - - // promote lock here - let mut consensus = RwLockUpgradableReadGuard::upgrade(consensus).await; - consensus.state_map.insert( - self.cur_view, - View { - view_inner: ViewInner::Leaf { - leaf: leaf.commit(), - }, - }, - ); - - consensus.metrics.number_of_views_since_last_commit.set( - consensus - .state_map - .range(( - Excluded(consensus.last_decided_view), - Included(self.cur_view), - )) - .count(), - ); - - consensus.saved_leaves.insert(leaf.commit(), leaf.clone()); - if new_commit_reached { - consensus.locked_view = new_locked_view; - } - #[allow(clippy::cast_precision_loss)] - if new_decide_reached { - let num_views_since_last_anchor = - (*self.cur_view - *consensus.last_decided_view) as f64; - let views_seen = consensus - .state_map - .range(( - Excluded(consensus.last_decided_view), - Included(self.cur_view), - )) - .count(); - // A count of all veiws we saw that aren't in the current chain (so won't be commited) - consensus - .metrics - .discarded_views_per_decide_event - .add_point((views_seen - current_chain_length) as f64); - // An empty view is one we didn't see a leaf for but we moved past that view number - consensus - .metrics - .empty_views_per_decide_event - .add_point(num_views_since_last_anchor - views_seen as f64); - consensus - .metrics - .number_of_views_per_decide_event - .add_point(num_views_since_last_anchor); - consensus - .metrics - .invalid_qc_views - .add_point(consensus.invalid_qc as f64); - - let mut included_txn_size = 0; - consensus - .transactions - .modify(|txns| { - *txns = txns - .drain() - .filter(|(txn_hash, txn)| { - if included_txns_set.contains(txn_hash) { - included_txn_size += - bincode_opts().serialized_size(txn).unwrap_or_default(); - false - } else { - true - } - }) - .collect(); - }) - .await; - consensus - .metrics - .outstanding_transactions - .update(-(included_txns_set.len() as i64)); - consensus - .metrics - .outstanding_transactions_memory_size - .update(-(i64::try_from(included_txn_size).unwrap_or(i64::MAX))); - - consensus - .metrics - .rejected_transactions - .add(leaf.rejected.len()); - - let decide_sent = self.api.send_decide( - consensus.last_decided_view, - leaf_views, - new_decide_qc.unwrap(), - ); - let old_anchor_view = consensus.last_decided_view; - consensus - .collect_garbage(old_anchor_view, new_anchor_view) - .await; - consensus.last_decided_view = new_anchor_view; - consensus.invalid_qc = 0; - - // We're only storing the last QC. We could store more but we're realistically only going to retrieve the last one. 
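The ancestor walk above encodes HotStuff's three-chain rule: counting back from the new leaf through consecutive views, a chain of two updates the locked view and a chain of three decides (anchors) a view. Here is a hedged sketch of just that counting logic over bare view numbers; the deleted code performs this walk over `ValidatingLeaf` ancestors via `visit_leaf_ancestors`, and the helper below is an illustrative reduction, not the real traversal.

/// Sketch of the commit rule from the deleted walk above. `ancestors` holds the
/// view numbers of the new leaf's ancestors, newest (the parent) first.
fn lock_and_decide(ancestors: &[u64], cur_view: u64) -> (Option<u64>, Option<u64>) {
    let mut locked = None;  // two-chain: becomes the new locked view
    let mut decided = None; // three-chain: becomes the new anchor (decided) view
    let mut last = cur_view;
    let mut chain = 1usize; // the new leaf itself starts the chain
    for &view in ancestors {
        if view + 1 != last {
            break; // a gap in view numbers ends the consecutive chain
        }
        last = view;
        chain += 1;
        if chain == 2 {
            locked = Some(view);
        } else if chain == 3 {
            decided = Some(view); // this view and everything older is decided
            break;
        }
    }
    (locked, decided)
}

fn main() {
    // Views 5 <- 6 <- 7 are consecutive: 6 becomes locked, 5 becomes decided.
    assert_eq!(lock_and_decide(&[6, 5, 4], 7), (Some(6), Some(5)));
}
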
- if let Err(e) = self.api.store_leaf(old_anchor_view, leaf).await { - error!("Could not insert new anchor into the storage API: {:?}", e); - } - - decide_sent.await; - } - high_qc - } -} diff --git a/consensus/src/sequencing_leader.rs b/consensus/src/sequencing_leader.rs index e85c7ccc43..77d0e271dd 100644 --- a/consensus/src/sequencing_leader.rs +++ b/consensus/src/sequencing_leader.rs @@ -30,7 +30,6 @@ use hotshot_types::{ ProcessedSequencingMessage, Proposal, SequencingMessage, }, traits::{ - consensus_type::sequencing_consensus::SequencingConsensus, election::SignedCertificate, node_implementation::{CommitteeEx, NodeType, SequencingQuorumEx}, signature_key::SignatureKey, @@ -47,7 +46,7 @@ use tracing::{error, info, instrument, warn}; #[derive(Debug, Clone)] pub struct DALeader< A: SequencingConsensusApi, I>, - TYPES: NodeType, + TYPES: NodeType, I: NodeImplementation< TYPES, Leaf = SequencingLeaf, @@ -81,7 +80,7 @@ pub struct DALeader< } impl< A: SequencingConsensusApi, I>, - TYPES: NodeType, + TYPES: NodeType, I: NodeImplementation< TYPES, Leaf = SequencingLeaf, @@ -315,7 +314,7 @@ where /// For now this step happens after the `DALeader` completes it's proposal and collects enough votes. pub struct ConsensusLeader< A: SequencingConsensusApi, I>, - TYPES: NodeType, + TYPES: NodeType, I: NodeImplementation< TYPES, Leaf = SequencingLeaf, @@ -349,7 +348,7 @@ pub struct ConsensusLeader< } impl< A: SequencingConsensusApi, I>, - TYPES: NodeType, + TYPES: NodeType, I: NodeImplementation< TYPES, Leaf = SequencingLeaf, @@ -414,7 +413,7 @@ where /// Implenting the next leader. Collect votes on the previous leaders proposal and return the QC pub struct ConsensusNextLeader< A: SequencingConsensusApi, I>, - TYPES: NodeType, + TYPES: NodeType, I: NodeImplementation< TYPES, Leaf = SequencingLeaf, @@ -446,7 +445,7 @@ pub struct ConsensusNextLeader< impl< A: SequencingConsensusApi, I>, - TYPES: NodeType, + TYPES: NodeType, I: NodeImplementation< TYPES, Leaf = SequencingLeaf, diff --git a/consensus/src/sequencing_replica.rs b/consensus/src/sequencing_replica.rs index 2d54b4ee0c..835dd31b7f 100644 --- a/consensus/src/sequencing_replica.rs +++ b/consensus/src/sequencing_replica.rs @@ -27,7 +27,7 @@ use hotshot_types::{ ProcessedGeneralConsensusMessage, ProcessedSequencingMessage, SequencingMessage, }, traits::{ - consensus_type::sequencing_consensus::SequencingConsensus, election::SignedCertificate, + election::SignedCertificate, node_implementation::NodeType, signature_key::SignatureKey, Block, }, }; @@ -41,7 +41,7 @@ use tracing::{error, info, instrument, warn}; #[derive(Debug, Clone)] pub struct SequencingReplica< A: SequencingConsensusApi, I>, - TYPES: NodeType, + TYPES: NodeType, I: NodeImplementation< TYPES, Leaf = SequencingLeaf, @@ -77,7 +77,7 @@ pub struct SequencingReplica< impl< A: SequencingConsensusApi, I>, - TYPES: NodeType, + TYPES: NodeType, I: NodeImplementation< TYPES, Leaf = SequencingLeaf, diff --git a/consensus/src/traits.rs b/consensus/src/traits.rs index 2fc2ab0f3f..b38f155bec 100644 --- a/consensus/src/traits.rs +++ b/consensus/src/traits.rs @@ -3,19 +3,16 @@ use async_trait::async_trait; use hotshot_types::certificate::QuorumCertificate; use hotshot_types::message::DataMessage; -use hotshot_types::message::{Message, SequencingMessage, ValidatingMessage}; +use hotshot_types::message::{Message, SequencingMessage}; use hotshot_types::traits::node_implementation::{ - NodeImplementation, NodeType, SequencingExchangesType, ValidatingExchangesType, + NodeImplementation, NodeType, 
SequencingExchangesType, }; use hotshot_types::traits::storage::StorageError; use hotshot_types::{ - data::{LeafType, ProposalType, ValidatingLeaf}, + data::{LeafType, ProposalType}, error::HotShotError, event::{Event, EventType}, traits::{ - consensus_type::{ - sequencing_consensus::SequencingConsensus, validating_consensus::ValidatingConsensus, - }, network::NetworkError, signature_key::SignatureKey, }, @@ -127,46 +124,10 @@ pub trait ConsensusSharedApi< } } -/// The API that [`HotStuff`] needs to talk to the system for validating consensus. -#[async_trait] -pub trait ValidatingConsensusApi< - TYPES: NodeType, - LEAF: LeafType, - I: NodeImplementation< - TYPES, - Leaf = ValidatingLeaf, - ConsensusMessage = ValidatingMessage, - >, ->: ConsensusSharedApi where - I::Exchanges: ValidatingExchangesType>, -{ - /// Send a direct message to the given recipient - async fn send_direct_message, VOTE: VoteType>( - &self, - recipient: TYPES::SignatureKey, - message: ValidatingMessage, - ) -> std::result::Result<(), NetworkError>; - - /// Send a broadcast message to the entire network. - async fn send_broadcast_message< - PROPOSAL: ProposalType, - VOTE: VoteType, - >( - &self, - message: ValidatingMessage, - ) -> std::result::Result<(), NetworkError>; - - /// Send a message with a transaction. - async fn send_transaction( - &self, - message: DataMessage, - ) -> std::result::Result<(), NetworkError>; -} - /// The API that [`HotStuff`] needs to talk to the system, for sequencing consensus. #[async_trait] pub trait SequencingConsensusApi< - TYPES: NodeType, + TYPES: NodeType, LEAF: LeafType, I: NodeImplementation>, >: ConsensusSharedApi where diff --git a/examples/infra/mod.rs b/examples/infra/mod.rs index e535987023..aa054fc67b 100644 --- a/examples/infra/mod.rs +++ b/examples/infra/mod.rs @@ -1,70 +1,23 @@ -use async_compatibility_layer::logging::{setup_backtrace, setup_logging}; -use async_lock::RwLock; -use async_trait::async_trait; use clap::Parser; -use futures::StreamExt; -use hotshot::{ - traits::{ - implementations::{ - Libp2pCommChannel, Libp2pNetwork, MemoryStorage, WebCommChannel, WebServerNetwork, - }, - NodeImplementation, Storage, - }, - types::{SignatureKey, SystemContextHandle}, - SystemContext, ViewRunner, -}; use hotshot_orchestrator::{ self, - client::{OrchestratorClient, ValidatorArgs}, - config::{NetworkConfig, NetworkConfigFile, WebServerConfig}, + config::{NetworkConfig, NetworkConfigFile}, }; -use hotshot_task::task::FilterEvent; -use hotshot_types::event::{Event, EventType}; -use hotshot_types::traits::election::ConsensusExchange; -use hotshot_types::traits::state::ConsensusTime; -use hotshot_types::vote::ViewSyncVote; -use hotshot_types::{ - data::{TestableLeaf, ValidatingLeaf, ValidatingProposal}, - message::ValidatingMessage, - traits::{ - consensus_type::validating_consensus::ValidatingConsensus, - election::{Membership, ViewSyncExchange}, - metrics::NoMetrics, - network::CommunicationChannel, - node_implementation::{ExchangesType, NodeType, ValidatingExchanges}, - state::{TestableBlock, TestableState}, - }, - vote::QuorumVote, - HotShotConfig, -}; -use hotshot_types::{message::Message, traits::election::QuorumExchange}; +use hotshot_types::traits::signature_key::SignatureKey; +use hotshot_types::traits::{node_implementation::NodeType}; use libp2p::{ identity::{ ed25519::{Keypair as EdKeypair, SecretKey}, Keypair, }, - multiaddr::{self, Protocol}, + multiaddr::{self}, Multiaddr, }; -use libp2p_identity::PeerId; -use libp2p_networking::network::{MeshParams, 
NetworkNodeConfigBuilder, NetworkNodeType}; -#[allow(deprecated)] -use nll::nll_todo::nll_todo; -use rand::SeedableRng; -use std::fmt::Debug; -use std::net::Ipv4Addr; +use std::{fmt::Debug, fs}; use std::{ - cmp, - collections::{BTreeSet, VecDeque}, - fs, mem, net::IpAddr, - num::NonZeroUsize, str::FromStr, - sync::Arc, - time::Instant, }; -#[allow(deprecated)] -use tracing::error; // ORCHESTRATOR @@ -110,390 +63,6 @@ pub fn load_config_from_file( config } -/// Runs the orchestrator -pub async fn run_orchestrator< - TYPES: NodeType, - MEMBERSHIP: Membership + Debug, - NETWORK: CommunicationChannel< - TYPES, - Message, - ValidatingProposal>, - QuorumVote>, - MEMBERSHIP, - > + Debug, - VIEWSYNCNETWORK: CommunicationChannel< - TYPES, - Message, - ValidatingProposal>, - ViewSyncVote, - MEMBERSHIP, - > + Debug, - NODE: NodeImplementation< - TYPES, - Leaf = ValidatingLeaf, - Exchanges = ValidatingExchanges< - TYPES, - Message, - QuorumExchange< - TYPES, - ValidatingLeaf, - ValidatingProposal>, - MEMBERSHIP, - NETWORK, - Message, - >, - ViewSyncExchange< - TYPES, - ValidatingProposal>, - MEMBERSHIP, - VIEWSYNCNETWORK, - Message, - >, - >, - Storage = MemoryStorage>, - ConsensusMessage = ValidatingMessage, - >, ->( - OrchestratorArgs { - host, - port, - config_file, - }: OrchestratorArgs, -) { - error!("Starting orchestrator",); - let run_config = load_config_from_file::(config_file); - let _result = hotshot_orchestrator::run_orchestrator::< - TYPES::SignatureKey, - TYPES::ElectionConfigType, - >(run_config, host, port) - .await; -} - -/// Defines the behavior of a "run" of the network with a given configuration -#[async_trait] -pub trait Run< - TYPES: NodeType, - MEMBERSHIP: Membership + Debug, - NETWORK: CommunicationChannel< - TYPES, - Message, - ValidatingProposal>, - QuorumVote>, - MEMBERSHIP, - > + Debug, - VIEWSYNCNETWORK: CommunicationChannel< - TYPES, - Message, - ValidatingProposal>, - ViewSyncVote, - MEMBERSHIP, - > + Debug, - NODE: NodeImplementation< - TYPES, - Leaf = ValidatingLeaf, - Exchanges = ValidatingExchanges< - TYPES, - Message, - QuorumExchange< - TYPES, - ValidatingLeaf, - ValidatingProposal>, - MEMBERSHIP, - NETWORK, - Message, - >, - ViewSyncExchange< - TYPES, - ValidatingProposal>, - MEMBERSHIP, - VIEWSYNCNETWORK, - Message, - >, - >, - Storage = MemoryStorage>, - ConsensusMessage = ValidatingMessage, - >, -> where - ::StateType: TestableState, - ::BlockType: TestableBlock, - ValidatingLeaf: TestableLeaf, - SystemContext: ViewRunner, - Self: Sync, -{ - /// Initializes networking, returns self - async fn initialize_networking( - config: NetworkConfig, - ) -> Self; - - /// Initializes the genesis state and HotShot instance; does not start HotShot consensus - /// # Panics if it cannot generate a genesis block, fails to initialize HotShot, or cannot - /// get the anchored view - async fn initialize_state_and_hotshot( - &self, - ) -> (TYPES::StateType, SystemContextHandle) { - let genesis_block = TYPES::BlockType::genesis(); - let initializer = - hotshot::HotShotInitializer::>::from_genesis( - genesis_block, - ) - .expect("Couldn't generate genesis block"); - - let config = self.get_config(); - - let (pk, sk) = - TYPES::SignatureKey::generated_from_seed_indexed(config.seed, config.node_index); - let ek = jf_primitives::aead::KeyPair::generate(&mut rand_chacha::ChaChaRng::from_seed( - [0u8; 32], - )); - let known_nodes = config.config.known_nodes.clone(); - - let network = self.get_network(); - - // Since we do not currently pass the election config type in the NetworkConfig, 
this will always be the default election config - let election_config = config.config.election_config.clone().unwrap_or_else(|| { - , - ValidatingProposal>, - MEMBERSHIP, - NETWORK, - Message, - > as ConsensusExchange>>::Membership::default_election_config( - config.config.total_nodes.get() as u64 - ) - }); - - let exchanges = NODE::Exchanges::create( - known_nodes.clone(), - (election_config.clone(), ()), - //Kaley todo: add view sync network - #[allow(deprecated)] - (network.clone(), nll_todo(), ()), - pk.clone(), - sk.clone(), - ek.clone(), - ); - let hotshot = SystemContext::init( - pk, - sk, - config.node_index, - config.config, - MemoryStorage::empty(), - exchanges, - initializer, - NoMetrics::boxed(), - ) - .await - .expect("Could not init hotshot"); - - let state = hotshot - .storage() - .get_anchored_view() - .await - .expect("Couldn't get HotShot's anchored view") - .state; - (state, hotshot) - } - - /// Starts HotShot consensus, returns when consensus has finished - async fn run_hotshot(&self, mut context: SystemContextHandle) { - let NetworkConfig { - padding, - rounds, - transactions_per_round, - node_index, - config: HotShotConfig { total_nodes, .. }, - .. - } = self.get_config(); - - let size = mem::size_of::(); - let adjusted_padding = if padding < size { 0 } else { padding - size }; - let mut txns: VecDeque = VecDeque::new(); - let state = context.get_state().await; - - // This assumes that no node will be a leader more than 5x the expected number of times they should be the leader - // FIXME is this a reasonable assumption when we start doing DA? - // TODO ED: In the future we should have each node generate transactions every round to simulate a more realistic network - let tx_to_gen = transactions_per_round * (cmp::max(rounds / total_nodes, 1) + 5); - { - let mut txn_rng = rand::thread_rng(); - for _ in 0..tx_to_gen { - let txn = - <::StateType as TestableState>::create_random_transaction( - Some(&state), - &mut txn_rng, - padding as u64, - ); - txns.push_back(txn); - } - } - error!("Generated {} transactions", tx_to_gen); - - error!("Adjusted padding size is {:?} bytes", adjusted_padding); - let mut round = 1; - let mut total_transactions = 0; - - let start = Instant::now(); - - error!("Starting hotshot!"); - context.hotshot.start_consensus().await; - let (mut event_stream, _streamid) = context.get_event_stream(FilterEvent::default()).await; - let mut anchor_view: TYPES::Time = ::genesis(); - let mut num_successful_commits = 0; - - let total_nodes_u64 = total_nodes.get() as u64; - - let mut should_submit_txns = node_index == (*anchor_view % total_nodes_u64); - - loop { - if should_submit_txns { - for _ in 0..transactions_per_round { - let _txn = txns.pop_front().unwrap(); - tracing::info!("Submitting txn on round {}", round); - // context.submit_transaction(txn).await.unwrap(); - total_transactions += 1; - } - should_submit_txns = false; - } - - match event_stream.next().await { - None => { - panic!("Error! Event stream completed before consensus ended."); - } - Some(Event { event, .. 
}) => { - match event { - EventType::Error { error } => { - error!("Error in consensus: {:?}", error); - // TODO what to do here - } - EventType::Decide { - leaf_chain, - qc: _, - block_size: _, - } => { - // this might be a obob - if let Some(leaf) = leaf_chain.get(0) { - let new_anchor = leaf.view_number; - if new_anchor >= anchor_view { - anchor_view = leaf.view_number; - } - if (*anchor_view % total_nodes_u64) == node_index { - should_submit_txns = true; - } - } - num_successful_commits += leaf_chain.len(); - if num_successful_commits >= rounds { - break; - } - // when we make progress, submit new events - } - EventType::ReplicaViewTimeout { view_number } => { - error!("Timed out as a replicas in view {:?}", view_number); - } - EventType::NextLeaderViewTimeout { view_number } => { - error!("Timed out as the next leader in view {:?}", view_number); - } - EventType::ViewFinished { view_number } => { - tracing::info!("view finished: {:?}", view_number); - } - _ => unimplemented!(), - } - } - } - - round += 1; - } - - // while round <= rounds { - // error!("Round {}:", round); - // - // let num_submitted = - // if node_index == ((round % total_nodes) as u64) { - // for _ in 0..transactions_per_round { - // let txn = txns.pop_front().unwrap(); - // tracing::info!("Submitting txn on round {}", round); - // hotshot.submit_transaction(txn).await.unwrap(); - // } - // transactions_per_round - // } else { - // 0 - // }; - // error!("Submitting {} transactions", num_submitted); - // - // // Start consensus - // let view_results = nll_todo(); - // - // match view_results { - // Ok((leaf_chain, _qc)) => { - // let blocks: Vec = leaf_chain - // .into_iter() - // .map(|leaf| leaf.get_deltas()) - // .collect(); - // - // for block in blocks { - // total_transactions += block.txn_count(); - // } - // } - // Err(e) => { - // timed_out_views += 1; - // error!("View: {:?}, failed with : {:?}", round, e); - // } - // } - // - // round += 1; - // } - // - let total_time_elapsed = start.elapsed(); - let total_size = total_transactions * (padding as u64); - // - // // This assumes all transactions that were submitted made it through consensus, and does not account for the genesis block - error!("All {rounds} rounds completed in {total_time_elapsed:?}. {total_size} total bytes submitted"); - } - - /// Returns the network for this run - fn get_network(&self) -> NETWORK; - - /// Returns view sync network for this run KALEY TODO - //fn get_view_sync_network(&self) -> VIEWSYNCNETWORK; - - /// Returns the config for this run - fn get_config(&self) -> NetworkConfig; -} - -type Proposal = ValidatingProposal>; - -// LIBP2P - -/// Represents a libp2p-based run -pub struct Libp2pRun< - TYPES: NodeType, - I: NodeImplementation, - MEMBERSHIP: Membership, -> { - _bootstrap_nodes: Vec<(PeerId, Multiaddr)>, - _node_type: NetworkNodeType, - _bound_addr: Multiaddr, - /// for libp2p layer - _identity: Keypair, - - network: Libp2pCommChannel< - TYPES, - I, - Proposal, - QuorumVote>, - MEMBERSHIP, - >, - // view_sync_network: Libp2pCommChannel< - // TYPES, - // I, - // Proposal, - // ViewSyncVote, - // MEMBERSHIP, - // >, - config: - NetworkConfig<::SignatureKey, ::ElectionConfigType>, -} - /// yeesh maybe we should just implement SignatureKey for this... 
pub fn libp2p_generate_indexed_identity(seed: [u8; 32], index: u64) -> Keypair { let mut hasher = blake3::Hasher::new(); @@ -521,459 +90,3 @@ pub fn parse_ip(s: &str) -> Result { let port = i.next().ok_or(multiaddr::Error::InvalidMultiaddr)?; Multiaddr::from_str(&format!("/ip4/{ip}/tcp/{port}")) } - -#[async_trait] -impl< - TYPES: NodeType, - MEMBERSHIP: Membership + Debug, - NODE: NodeImplementation< - TYPES, - Leaf = ValidatingLeaf, - Exchanges = ValidatingExchanges< - TYPES, - Message, - QuorumExchange< - TYPES, - ValidatingLeaf, - ValidatingProposal>, - MEMBERSHIP, - Libp2pCommChannel< - TYPES, - NODE, - ValidatingProposal>, - QuorumVote>, - MEMBERSHIP, - >, - Message, - >, - ViewSyncExchange< - TYPES, - ValidatingProposal>, - MEMBERSHIP, - Libp2pCommChannel< - TYPES, - NODE, - ValidatingProposal>, - ViewSyncVote, - MEMBERSHIP, - >, - Message, - >, - >, - Storage = MemoryStorage>, - ConsensusMessage = ValidatingMessage, - >, - > - Run< - TYPES, - MEMBERSHIP, - Libp2pCommChannel< - TYPES, - NODE, - ValidatingProposal>, - QuorumVote>, - MEMBERSHIP, - >, - Libp2pCommChannel< - TYPES, - NODE, - ValidatingProposal>, - ViewSyncVote, - MEMBERSHIP, - >, - NODE, - > for Libp2pRun -where - ::StateType: TestableState, - ::BlockType: TestableBlock, - ValidatingLeaf: TestableLeaf, - SystemContext: ViewRunner, - Self: Sync, -{ - async fn initialize_networking( - config: NetworkConfig, - ) -> Libp2pRun { - let (pubkey, _privkey) = - <::SignatureKey as SignatureKey>::generated_from_seed_indexed( - config.seed, - config.node_index, - ); - let mut config = config; - let libp2p_config = config - .libp2p_config - .take() - .expect("Configuration is not for a Libp2p network"); - let bs_len = libp2p_config.bootstrap_nodes.len(); - let bootstrap_nodes: Vec<(PeerId, Multiaddr)> = libp2p_config - .bootstrap_nodes - .iter() - .map(|(addr, pair)| { - let kp = Keypair::from_protobuf_encoding(pair).unwrap(); - let peer_id = PeerId::from_public_key(&kp.public()); - let mut multiaddr = Multiaddr::from(addr.ip()); - multiaddr.push(Protocol::Tcp(addr.port())); - (peer_id, multiaddr) - }) - .collect(); - let identity = libp2p_generate_indexed_identity(config.seed, config.node_index); - let node_type = if (config.node_index as usize) < bs_len { - NetworkNodeType::Bootstrap - } else { - NetworkNodeType::Regular - }; - let node_index = config.node_index; - let port_index = match libp2p_config.index_ports { - true => node_index, - false => 0, - }; - let bound_addr: Multiaddr = format!( - "/{}/{}/tcp/{}", - if libp2p_config.public_ip.is_ipv4() { - "ip4" - } else { - "ip6" - }, - libp2p_config.public_ip, - libp2p_config.base_port as u64 + port_index - ) - .parse() - .unwrap(); - - // generate network - let mut config_builder = NetworkNodeConfigBuilder::default(); - assert!(config.config.total_nodes.get() > 2); - let replicated_nodes = NonZeroUsize::new(config.config.total_nodes.get() - 2).unwrap(); - config_builder.replication_factor(replicated_nodes); - config_builder.identity(identity.clone()); - - config_builder.bound_addr(Some(bound_addr.clone())); - - let to_connect_addrs = bootstrap_nodes - .iter() - .map(|(peer_id, multiaddr)| (Some(*peer_id), multiaddr.clone())) - .collect(); - - config_builder.to_connect_addrs(to_connect_addrs); - - let mesh_params = - // NOTE I'm arbitrarily choosing these. 
- match node_type { - NetworkNodeType::Bootstrap => MeshParams { - mesh_n_high: libp2p_config.bootstrap_mesh_n_high, - mesh_n_low: libp2p_config.bootstrap_mesh_n_low, - mesh_outbound_min: libp2p_config.bootstrap_mesh_outbound_min, - mesh_n: libp2p_config.bootstrap_mesh_n, - }, - NetworkNodeType::Regular => MeshParams { - mesh_n_high: libp2p_config.mesh_n_high, - mesh_n_low: libp2p_config.mesh_n_low, - mesh_outbound_min: libp2p_config.mesh_outbound_min, - mesh_n: libp2p_config.mesh_n, - }, - NetworkNodeType::Conductor => unreachable!(), - }; - config_builder.mesh_params(Some(mesh_params)); - - let node_config = config_builder.build().unwrap(); - let network = Arc::new( - Libp2pNetwork::new( - NoMetrics::boxed(), - node_config, - pubkey.clone(), - Arc::new(RwLock::new( - bootstrap_nodes - .iter() - .map(|(peer_id, addr)| (Some(*peer_id), addr.clone())) - .collect(), - )), - bs_len, - config.node_index as usize, - // NOTE: this introduces an invariant that the keys are assigned using this indexed - // function - { - let mut keys = BTreeSet::new(); - for i in 0..config.config.total_nodes.get() { - let pk = - ::generated_from_seed_indexed( - config.seed, - i as u64, - ) - .0; - keys.insert(pk); - } - keys - }, - BTreeSet::new(), - ) - .await - .unwrap(), - ); - - let comm_channel = Libp2pCommChannel::< - TYPES, - NODE, - ValidatingProposal>, - QuorumVote>, - MEMBERSHIP, - >::new(network); - - comm_channel.wait_for_ready().await; - - Libp2pRun { - config, - - _bootstrap_nodes: bootstrap_nodes, - _node_type: node_type, - _identity: identity, - _bound_addr: bound_addr, - // _socket: stream, - network: comm_channel, - } - } - - fn get_config( - &self, - ) -> NetworkConfig<::SignatureKey, ::ElectionConfigType> - { - self.config.clone() - } - - fn get_network( - &self, - ) -> Libp2pCommChannel< - TYPES, - NODE, - ValidatingProposal>, - QuorumVote>, - MEMBERSHIP, - > { - self.network.clone() - } -} - -// WEB SERVER - -/// Alias for the [`WebCommChannel`] for validating consensus. 
-type ValidatingWebCommChannel = - WebCommChannel, QuorumVote>, MEMBERSHIP>; - -/// Represents a web server-based run -pub struct WebServerRun< - TYPES: NodeType, - I: NodeImplementation, - MEMBERSHIP: Membership, -> { - config: NetworkConfig, - network: ValidatingWebCommChannel, -} - -#[async_trait] -impl< - TYPES: NodeType, - MEMBERSHIP: Membership + Debug, - NODE: NodeImplementation< - TYPES, - Leaf = ValidatingLeaf, - Exchanges = ValidatingExchanges< - TYPES, - Message, - QuorumExchange< - TYPES, - ValidatingLeaf, - ValidatingProposal>, - MEMBERSHIP, - WebCommChannel< - TYPES, - NODE, - ValidatingProposal>, - QuorumVote>, - MEMBERSHIP, - >, - Message, - >, - ViewSyncExchange< - TYPES, - ValidatingProposal>, - MEMBERSHIP, - WebCommChannel< - TYPES, - NODE, - ValidatingProposal>, - ViewSyncVote, - MEMBERSHIP, - >, - Message, - >, - >, - Storage = MemoryStorage>, - ConsensusMessage = ValidatingMessage, - >, - > - Run< - TYPES, - MEMBERSHIP, - WebCommChannel< - TYPES, - NODE, - ValidatingProposal>, - QuorumVote>, - MEMBERSHIP, - >, - WebCommChannel< - TYPES, - NODE, - ValidatingProposal>, - ViewSyncVote, - MEMBERSHIP, - >, - NODE, - > for WebServerRun -where - ::StateType: TestableState, - ::BlockType: TestableBlock, - ValidatingLeaf: TestableLeaf, - SystemContext: ViewRunner, - Self: Sync, -{ - async fn initialize_networking( - config: NetworkConfig, - ) -> WebServerRun { - // Generate our own key - let (pub_key, _priv_key) = - <::SignatureKey as SignatureKey>::generated_from_seed_indexed( - config.seed, - config.node_index, - ); - - // Get the configuration for the web server - let WebServerConfig { - host, - port, - wait_between_polls, - }: WebServerConfig = config.clone().web_server_config.unwrap(); - - // Create the network - let network: WebCommChannel< - TYPES, - NODE, - Proposal, - QuorumVote>, - MEMBERSHIP, - > = WebCommChannel::new( - WebServerNetwork::create(&host.to_string(), port, wait_between_polls, pub_key, false) - .into(), - ); - WebServerRun { config, network } - } - - fn get_network( - &self, - ) -> WebCommChannel< - TYPES, - NODE, - Proposal, - QuorumVote>, - MEMBERSHIP, - > { - self.network.clone() - } - - fn get_config(&self) -> NetworkConfig { - self.config.clone() - } -} - -/// Main entry point for validators -pub async fn main_entry_point< - TYPES: NodeType, - MEMBERSHIP: Membership + Debug, - NETWORK: CommunicationChannel< - TYPES, - Message, - ValidatingProposal>, - QuorumVote>, - MEMBERSHIP, - > + Debug, - VIEWSYNCNETWORK: CommunicationChannel< - TYPES, - Message, - ValidatingProposal>, - ViewSyncVote, - MEMBERSHIP, - > + Debug, - NODE: NodeImplementation< - TYPES, - Leaf = ValidatingLeaf, - Exchanges = ValidatingExchanges< - TYPES, - Message, - QuorumExchange< - TYPES, - ValidatingLeaf, - ValidatingProposal>, - MEMBERSHIP, - NETWORK, - Message, - >, - ViewSyncExchange< - TYPES, - ValidatingProposal>, - MEMBERSHIP, - VIEWSYNCNETWORK, - Message, - >, - >, - Storage = MemoryStorage>, - ConsensusMessage = ValidatingMessage, - >, - RUN: Run, ->( - args: ValidatorArgs, -) where - ::StateType: TestableState, - ::BlockType: TestableBlock, - ValidatingLeaf: TestableLeaf, - SystemContext: ViewRunner, -{ - setup_logging(); - setup_backtrace(); - - error!("Starting validator"); - - let orchestrator_client: OrchestratorClient = - OrchestratorClient::connect_to_orchestrator(args.clone()).await; - - // Identify with the orchestrator - let public_ip = match args.public_ip { - Some(ip) => ip, - None => IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), - }; - error!( - "Identifying with 
orchestrator using IP address {}", - public_ip.to_string() - ); - let node_index: u16 = orchestrator_client - .identify_with_orchestrator(public_ip.to_string()) - .await; - error!("Finished identifying; our node index is {node_index}"); - error!("Getting config from orchestrator"); - - let mut run_config = orchestrator_client - .get_config_from_orchestrator::(node_index) - .await; - - run_config.node_index = node_index.into(); - run_config.libp2p_config.as_mut().unwrap().public_ip = args.public_ip.unwrap(); - - error!("Initializing networking"); - let run = RUN::initialize_networking(run_config.clone()).await; - let (_state, hotshot) = run.initialize_state_and_hotshot().await; - - error!("Waiting for start command from orchestrator"); - orchestrator_client - .wait_for_all_nodes_ready(run_config.clone().node_index) - .await; - - error!("All nodes are ready! Starting HotShot"); - run.run_hotshot(hotshot).await; -} diff --git a/examples/infra/modDA.rs b/examples/infra/modDA.rs index 1edb2ea496..0e1a001b6b 100644 --- a/examples/infra/modDA.rs +++ b/examples/infra/modDA.rs @@ -38,7 +38,6 @@ use hotshot_types::{ data::{DAProposal, QuorumProposal, SequencingLeaf, TestableLeaf}, message::SequencingMessage, traits::{ - consensus_type::sequencing_consensus::SequencingConsensus, election::Membership, metrics::NoMetrics, network::CommunicationChannel, @@ -82,7 +81,7 @@ use tracing::warn; /// Runs the orchestrator pub async fn run_orchestrator_da< - TYPES: NodeType, + TYPES: NodeType, MEMBERSHIP: Membership + Debug, DANETWORK: CommunicationChannel< TYPES, @@ -150,7 +149,7 @@ pub async fn run_orchestrator_da< /// Defines the behavior of a "run" of the network with a given configuration #[async_trait] pub trait RunDA< - TYPES: NodeType, + TYPES: NodeType