Skip to content

Commit

Permalink
Merge pull request #1541 from EspressoSystems/jr/nuke_consensus_type
Browse files Browse the repository at this point in the history
Nuke validating consensus
  • Loading branch information
DieracDelta authored Aug 15, 2023
2 parents f193182 + 5cc17f9 commit 398c863
Show file tree
Hide file tree
Showing 45 changed files with 1,713 additions and 4,541 deletions.
16 changes: 5 additions & 11 deletions consensus/src/da_member.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,13 @@ use hotshot_types::{
certificate::QuorumCertificate,
data::SequencingLeaf,
message::{
ConsensusMessageType, Message, ProcessedCommitteeConsensusMessage,
ProcessedGeneralConsensusMessage, ProcessedSequencingMessage, SequencingMessage,
ConsensusMessageType, ProcessedCommitteeConsensusMessage, ProcessedGeneralConsensusMessage,
ProcessedSequencingMessage, SequencingMessage,
},
traits::{
consensus_type::sequencing_consensus::SequencingConsensus,
election::{CommitteeExchangeType, ConsensusExchange},
node_implementation::{
CommitteeEx, CommitteeProposalType, CommitteeVote, NodeImplementation, NodeType,
SequencingExchangesType,
},
signature_key::SignatureKey,
},
Expand All @@ -34,15 +32,13 @@ use tracing::{error, info, instrument, warn};
#[derive(Debug, Clone)]
pub struct DAMember<
A: SequencingConsensusApi<TYPES, SequencingLeaf<TYPES>, I>,
TYPES: NodeType<ConsensusType = SequencingConsensus>,
TYPES: NodeType,
I: NodeImplementation<
TYPES,
Leaf = SequencingLeaf<TYPES>,
ConsensusMessage = SequencingMessage<TYPES, I>,
>,
> where
I::Exchanges: SequencingExchangesType<TYPES, Message<TYPES, I>>,
{
> {
/// ID of node.
pub id: u64,
/// Reference to consensus. DA committee member will require a write lock on this.
Expand All @@ -67,15 +63,13 @@ pub struct DAMember<

impl<
A: SequencingConsensusApi<TYPES, SequencingLeaf<TYPES>, I>,
TYPES: NodeType<ConsensusType = SequencingConsensus>,
TYPES: NodeType,
I: NodeImplementation<
TYPES,
Leaf = SequencingLeaf<TYPES>,
ConsensusMessage = SequencingMessage<TYPES, I>,
>,
> DAMember<A, TYPES, I>
where
I::Exchanges: SequencingExchangesType<TYPES, Message<TYPES, I>>,
{
/// DA committee member task that spins until a valid DA proposal can be signed or timeout is
/// hit.
Expand Down
213 changes: 4 additions & 209 deletions consensus/src/leader.rs
Original file line number Diff line number Diff line change
@@ -1,234 +1,29 @@
//! Contains the [`ValidatingLeader`] struct used for the leader step in the hotstuff consensus algorithm.

use crate::{CommitmentMap, Consensus, ValidatingConsensusApi};
use crate::{CommitmentMap, Consensus};
use async_compatibility_layer::{
art::{async_sleep, async_timeout},
async_primitives::subscribable_rwlock::{ReadView, SubscribableRwLock},
};
use async_lock::RwLock;
use commit::Committable;
use hotshot_types::message::Message;
use hotshot_types::{
certificate::QuorumCertificate,
data::{ValidatingLeaf, ValidatingProposal},
message::GeneralConsensusMessage,
traits::{
consensus_type::validating_consensus::ValidatingConsensus,
election::SignedCertificate,
node_implementation::{
NodeImplementation, NodeType, QuorumProposalType, QuorumVoteType, ValidatingQuorumEx,
},
node_implementation::{NodeImplementation, NodeType, QuorumProposalType, QuorumVoteType},
signature_key::SignatureKey,
Block, State,
},
};
use hotshot_types::{message::Message, traits::node_implementation::ValidatingExchangesType};
use hotshot_types::{
message::{Proposal, ValidatingMessage},
message::Proposal,
traits::election::{ConsensusExchange, QuorumExchangeType},
};
use std::marker::PhantomData;
use std::{sync::Arc, time::Instant};
use tracing::{error, info, instrument, warn};
/// This view's validating leader
#[derive(Debug, Clone)]
pub struct ValidatingLeader<
A: ValidatingConsensusApi<TYPES, ValidatingLeaf<TYPES>, I>,
TYPES: NodeType<ConsensusType = ValidatingConsensus>,
I: NodeImplementation<
TYPES,
Leaf = ValidatingLeaf<TYPES>,
ConsensusMessage = ValidatingMessage<TYPES, I>,
>,
> where
I::Exchanges: ValidatingExchangesType<TYPES, Message<TYPES, I>>,
{
/// id of node
pub id: u64,
/// Reference to consensus. Validating leader will require a read lock on this.
pub consensus: Arc<RwLock<Consensus<TYPES, ValidatingLeaf<TYPES>>>>,
/// The `high_qc` per spec
pub high_qc: QuorumCertificate<TYPES, ValidatingLeaf<TYPES>>,
/// The view number we're running on
pub cur_view: TYPES::Time,
/// Lock over the transactions list
pub transactions: Arc<SubscribableRwLock<CommitmentMap<TYPES::Transaction>>>,
/// Limited access to the consensus protocol
pub api: A,

/// the quorum exchange
pub exchange: Arc<ValidatingQuorumEx<TYPES, I>>,

/// needed for type checking
pub _pd: PhantomData<I>,
}

impl<
A: ValidatingConsensusApi<TYPES, ValidatingLeaf<TYPES>, I>,
TYPES: NodeType<ConsensusType = ValidatingConsensus>,
I: NodeImplementation<
TYPES,
Leaf = ValidatingLeaf<TYPES>,
ConsensusMessage = ValidatingMessage<TYPES, I>,
>,
> ValidatingLeader<A, TYPES, I>
where
I::Exchanges: ValidatingExchangesType<TYPES, Message<TYPES, I>>,
ValidatingQuorumEx<TYPES, I>: ConsensusExchange<
TYPES,
Message<TYPES, I>,
Proposal = ValidatingProposal<TYPES, ValidatingLeaf<TYPES>>,
>,
{
/// Run one view of the leader task
#[instrument(skip(self), fields(id = self.id, view = *self.cur_view), name = "Validating ValidatingLeader Task", level = "error")]
pub async fn run_view(self) -> QuorumCertificate<TYPES, ValidatingLeaf<TYPES>> {
let pk = self.api.public_key();
info!("Validating leader task started!");

let task_start_time = Instant::now();
let parent_view_number = &self.high_qc.view_number();
let consensus = self.consensus.read().await;
let mut reached_decided = false;

let Some(parent_view) = consensus.state_map.get(parent_view_number) else {
warn!("Couldn't find high QC parent in state map.");
return self.high_qc;
};
let Some(leaf) = parent_view.get_leaf_commitment() else {
warn!(
?parent_view_number,
?parent_view,
"Parent of high QC points to a view without a proposal"
);
return self.high_qc;
};
let Some(leaf) = consensus.saved_leaves.get(&leaf) else {
warn!("Failed to find high QC parent.");
return self.high_qc;
};
if leaf.view_number == consensus.last_decided_view {
reached_decided = true;
}
let parent_leaf = leaf.clone();

let original_parent_hash = parent_leaf.commit();
let starting_state = &parent_leaf.state;

let mut previous_used_txns = parent_leaf.deltas.contained_transactions();

let mut next_parent_hash = original_parent_hash;

if !reached_decided {
while let Some(next_parent_leaf) = consensus.saved_leaves.get(&next_parent_hash) {
if next_parent_leaf.view_number <= consensus.last_decided_view {
break;
}
let next_parent_txns = next_parent_leaf.deltas.contained_transactions();
for next_parent_txn in next_parent_txns {
previous_used_txns.insert(next_parent_txn);
}
next_parent_hash = next_parent_leaf.parent_commitment;
}
// TODO do some sort of sanity check on the view number that it matches decided
}

let passed_time = task_start_time - Instant::now();
async_sleep(self.api.propose_min_round_time() - passed_time).await;

let receiver = self.transactions.subscribe().await;
let mut block = <TYPES as NodeType>::StateType::next_block(Some(starting_state.clone()));

// Wait until we have min_transactions for the block or we hit propose_max_round_time
while task_start_time.elapsed() < self.api.propose_max_round_time() {
let txns = self.transactions.cloned().await;
let unclaimed_txns: Vec<_> = txns
.iter()
.filter(|(txn_hash, _txn)| !previous_used_txns.contains(txn_hash))
.collect();

let time_past = task_start_time.elapsed();
if unclaimed_txns.len() < self.api.min_transactions()
&& (time_past < self.api.propose_max_round_time())
{
let duration = self.api.propose_max_round_time() - time_past;
let result = async_timeout(duration, receiver.recv()).await;
match result {
Err(_) => {
// Fall through below to updating new block
info!("propose_max_round_time passed, sending transactions we have so far");
}
Ok(Err(e)) => {
// Something unprecedented is wrong, and `transactions` has been dropped
error!("Channel receiver error for SubscribableRwLock {:?}", e);
return self.high_qc;
}
Ok(Ok(_)) => continue,
}
}

// Add unclaimed transactions to the new block
for (_txn_hash, txn) in &unclaimed_txns {
let new_block_check = block.add_transaction_raw(txn);
if let Ok(new_block) = new_block_check {
if starting_state.validate_block(&new_block, &self.cur_view) {
block = new_block;
continue;
}
}
}
break;
}

consensus
.metrics
.proposal_wait_duration
.add_point(task_start_time.elapsed().as_secs_f64());

let proposal_build_start = Instant::now();

if let Ok(new_state) = starting_state.append(&block, &self.cur_view) {
let leaf = ValidatingLeaf {
view_number: self.cur_view,
height: parent_leaf.height + 1,
justify_qc: self.high_qc.clone(),
parent_commitment: original_parent_hash,
deltas: block,
state: new_state,
rejected: Vec::new(),
timestamp: time::OffsetDateTime::now_utc().unix_timestamp_nanos(),
proposer_id: pk.to_bytes(),
};
let signature = self
.exchange
.sign_validating_or_commitment_proposal::<I>(&leaf.commit());
let data: ValidatingProposal<TYPES, ValidatingLeaf<TYPES>> = leaf.into();
let message =
ValidatingMessage::<TYPES, I>(GeneralConsensusMessage::Proposal(Proposal {
data,
signature,
}));
consensus
.metrics
.proposal_build_duration
.add_point(proposal_build_start.elapsed().as_secs_f64());
info!("Sending out proposal {:?}", message);

if let Err(e) = self
.api
.send_broadcast_message::<QuorumProposalType<TYPES, I>, QuorumVoteType<TYPES, I>>(
message.clone(),
)
.await
{
consensus.metrics.failed_to_send_messages.add(1);
warn!(?message, ?e, "Could not broadcast leader proposal");
} else {
consensus.metrics.outgoing_broadcast_messages.add(1);
}
} else {
error!("Could not append state in high qc for proposal. Failed to send out proposal.");
}

self.high_qc.clone()
}
}
8 changes: 1 addition & 7 deletions consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,6 @@
#![allow(clippy::module_name_repetitions)]

mod da_member;
mod leader;
mod next_leader;
mod replica;
mod sequencing_leader;
mod sequencing_replica;
pub mod traits;
Expand All @@ -23,13 +20,10 @@ pub mod utils;
use async_compatibility_layer::async_primitives::subscribable_rwlock::SubscribableRwLock;
pub use da_member::DAMember;
pub use hotshot_types::traits::node_implementation::ViewQueue;
pub use leader::ValidatingLeader;
pub use next_leader::NextValidatingLeader;
pub use replica::Replica;
pub use sequencing_leader::{ConsensusLeader, ConsensusNextLeader, DALeader};
pub use sequencing_replica::SequencingReplica;
use std::collections::HashSet;
pub use traits::{ConsensusSharedApi, SequencingConsensusApi, ValidatingConsensusApi};
pub use traits::{ConsensusSharedApi, SequencingConsensusApi};
pub use utils::{View, ViewInner};

use commit::{Commitment, Committable};
Expand Down
Loading

0 comments on commit 398c863

Please sign in to comment.