Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove slow operations from critical path #3788

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions crates/task-impls/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ pub enum HotShotEvent<TYPES: NodeType> {
/// 2. The proposal has been correctly signed by the leader of the current view
/// 3. The justify QC is valid
/// 4. The proposal passes either liveness or safety check.
QuorumProposalValidated(QuorumProposal<TYPES>, Leaf<TYPES>),
QuorumProposalValidated(Proposal<TYPES, QuorumProposal<TYPES>>, Leaf<TYPES>),
/// A quorum proposal is missing for a view that we need.
QuorumProposalRequestSend(
ProposalRequestPayload<TYPES>,
Expand Down Expand Up @@ -267,9 +267,14 @@ impl<TYPES: NodeType> HotShotEvent<TYPES> {
Some(v.view_number())
}
HotShotEvent::QuorumProposalRecv(proposal, _)
| HotShotEvent::QuorumProposalSend(proposal, _) => Some(proposal.data.view_number()),
| HotShotEvent::QuorumProposalSend(proposal, _)
| HotShotEvent::QuorumProposalValidated(proposal, _)
| HotShotEvent::QuorumProposalResponseSend(_, proposal)
| HotShotEvent::QuorumProposalResponseRecv(proposal)
| HotShotEvent::QuorumProposalPreliminarilyValidated(proposal) => {
Some(proposal.data.view_number())
}
HotShotEvent::QuorumVoteSend(vote) => Some(vote.view_number()),
HotShotEvent::QuorumProposalValidated(proposal, _) => Some(proposal.view_number()),
HotShotEvent::DaProposalRecv(proposal, _)
| HotShotEvent::DaProposalValidated(proposal, _)
| HotShotEvent::DaProposalSend(proposal, _) => Some(proposal.data.view_number()),
Expand Down Expand Up @@ -311,11 +316,6 @@ impl<TYPES: NodeType> HotShotEvent<TYPES> {
}
HotShotEvent::QuorumProposalRequestSend(req, _)
| HotShotEvent::QuorumProposalRequestRecv(req, _) => Some(req.view_number),
HotShotEvent::QuorumProposalResponseSend(_, proposal)
| HotShotEvent::QuorumProposalResponseRecv(proposal)
| HotShotEvent::QuorumProposalPreliminarilyValidated(proposal) => {
Some(proposal.data.view_number())
}
HotShotEvent::QuorumVoteDependenciesValidated(view_number)
| HotShotEvent::ViewChange(view_number)
| HotShotEvent::ViewSyncTimeout(view_number, _, _)
Expand Down Expand Up @@ -398,7 +398,7 @@ impl<TYPES: NodeType> Display for HotShotEvent<TYPES> {
HotShotEvent::QuorumProposalValidated(proposal, _) => write!(
f,
"QuorumProposalValidated(view_number={:?})",
proposal.view_number()
proposal.data.view_number()
),
HotShotEvent::DaProposalSend(proposal, _) => write!(
f,
Expand Down
12 changes: 1 addition & 11 deletions crates/task-impls/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ use hotshot_types::{
election::Membership,
node_implementation::{ConsensusTime, NodeImplementation, NodeType, Versions},
signature_key::SignatureKey,
storage::Storage,
BlockPayload, ValidatedState,
},
utils::{Terminator, View, ViewInner},
Expand Down Expand Up @@ -552,15 +551,6 @@ pub async fn validate_proposal_safety_and_liveness<
});
}

// Update our persistent storage of the proposal. If we cannot store the proposal, return
// an error so we don't vote
task_state
.storage
.write()
.await
.append_proposal(&proposal)
.await?;

// We accept the proposal, notify the application layer
broadcast_event(
Event {
Expand All @@ -577,7 +567,7 @@ pub async fn validate_proposal_safety_and_liveness<
// Notify other tasks
broadcast_event(
Arc::new(HotShotEvent::QuorumProposalValidated(
proposal.data.clone(),
proposal.clone(),
parent_leaf,
)),
&event_stream,
Expand Down
46 changes: 37 additions & 9 deletions crates/task-impls/src/quorum_proposal_recv/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use std::sync::Arc;

use anyhow::{bail, Context, Result};
use async_broadcast::{broadcast, Receiver, Sender};
use async_compatibility_layer::art::async_spawn;
use async_lock::RwLockUpgradableReadGuard;
use committable::Committable;
use hotshot_types::{
Expand All @@ -20,6 +21,7 @@ use hotshot_types::{
traits::{
election::Membership,
node_implementation::{NodeImplementation, NodeType},
signature_key::SignatureKey,
storage::Storage,
ValidatedState,
},
Expand Down Expand Up @@ -104,6 +106,35 @@ async fn validate_proposal_liveness<TYPES: NodeType, I: NodeImplementation<TYPES
Ok(())
}

/// Spawn a task which will fire a request to get a proposal, and store it.
#[allow(clippy::too_many_arguments)]
fn spawn_fetch_proposal<TYPES: NodeType, V: Versions>(
view: TYPES::View,
event_sender: Sender<Arc<HotShotEvent<TYPES>>>,
event_receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
membership: Arc<TYPES::Membership>,
consensus: OuterConsensus<TYPES>,
sender_public_key: TYPES::SignatureKey,
sender_private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
upgrade_lock: UpgradeLock<TYPES, V>,
) {
async_spawn(async move {
let lock = upgrade_lock;

let _ = fetch_proposal(
view,
event_sender,
event_receiver,
membership,
consensus,
sender_public_key,
sender_private_key,
&lock,
)
.await;
});
Comment on lines +111 to +135
Copy link
Collaborator

@rob-maron rob-maron Oct 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this get included with the [task] cancellation logic [on new views] somehow? Do we want it to be?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's probably ok to not cancel this because it is bounded anyway: it will exit as soon as it either gets the proposal or fails

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure

}

/// Handles the `QuorumProposalRecv` event by first validating the cert itself for the view, and then
/// updating the states, which runs when the proposal cannot be found in the internal state map.
///
Expand Down Expand Up @@ -155,17 +186,16 @@ pub(crate) async fn handle_quorum_proposal_recv<
.await;

// Get the parent leaf and state.
let mut parent_leaf = task_state
let parent_leaf = task_state
.consensus
.read()
.await
.saved_leaves()
.get(&justify_qc.data.leaf_commit)
.cloned();

parent_leaf = match parent_leaf {
Some(p) => Some(p),
None => fetch_proposal(
if parent_leaf.is_none() {
spawn_fetch_proposal(
justify_qc.view_number(),
event_sender.clone(),
event_receiver.clone(),
Expand All @@ -176,11 +206,9 @@ pub(crate) async fn handle_quorum_proposal_recv<
// incorrectly.
task_state.public_key.clone(),
task_state.private_key.clone(),
&task_state.upgrade_lock,
)
.await
.ok(),
};
task_state.upgrade_lock.clone(),
);
}
let consensus_read = task_state.consensus.read().await;

let parent = match parent_leaf {
Expand Down
21 changes: 15 additions & 6 deletions crates/task-impls/src/quorum_vote/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES> + 'static, V: Versions> Handl
match event.as_ref() {
#[allow(unused_assignments)]
HotShotEvent::QuorumProposalValidated(proposal, parent_leaf) => {
let proposal_payload_comm = proposal.block_header.payload_commitment();
let proposal_payload_comm = proposal.data.block_header.payload_commitment();
if let Some(comm) = payload_commitment {
if proposal_payload_comm != comm {
error!("Quorum proposal has inconsistent payload commitment with DAC or VID.");
Expand All @@ -290,11 +290,17 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES> + 'static, V: Versions> Handl
payload_commitment = Some(proposal_payload_comm);
}
let parent_commitment = parent_leaf.commit(&self.upgrade_lock).await;
let proposed_leaf = Leaf::from_quorum_proposal(proposal);
let proposed_leaf = Leaf::from_quorum_proposal(&proposal.data);
if proposed_leaf.parent_commitment() != parent_commitment {
warn!("Proposed leaf parent commitment does not match parent leaf payload commitment. Aborting vote.");
return;
}
// Update our persistent storage of the proposal. If we cannot store the proposal, log an
// error and return so we don't vote
if let Err(e) = self.storage.write().await.append_proposal(proposal).await {
error!("failed to store proposal, not voting. error = {e:#}");
return;
}
leaf = Some(proposed_leaf);
}
HotShotEvent::DaCertificateValidated(cert) => {
Expand Down Expand Up @@ -419,7 +425,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> QuorumVoteTaskS
let event_view = match dependency_type {
VoteDependency::QuorumProposal => {
if let HotShotEvent::QuorumProposalValidated(proposal, _) = event {
proposal.view_number
proposal.data.view_number
} else {
return false;
}
Expand Down Expand Up @@ -543,17 +549,20 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> QuorumVoteTaskS
let current_epoch = self.consensus.read().await.cur_epoch();
match event.as_ref() {
HotShotEvent::QuorumProposalValidated(proposal, _leaf) => {
trace!("Received Proposal for view {}", *proposal.view_number());
trace!(
"Received Proposal for view {}",
*proposal.data.view_number()
);

// Handle the event before creating the dependency task.
if let Err(e) =
handle_quorum_proposal_validated(proposal, &event_sender, self).await
handle_quorum_proposal_validated(&proposal.data, &event_sender, self).await
{
debug!("Failed to handle QuorumProposalValidated event; error = {e:#}");
}

self.create_dependency_task_if_new(
proposal.view_number,
proposal.data.view_number,
current_epoch,
event_receiver,
&event_sender,
Expand Down
2 changes: 1 addition & 1 deletion crates/task-impls/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>> TaskState for NetworkRequest
) -> Result<()> {
match event.as_ref() {
HotShotEvent::QuorumProposalValidated(proposal, _) => {
let prop_view = proposal.view_number();
let prop_view = proposal.data.view_number();
let current_epoch = self.state.read().await.cur_epoch();

// If we already have the VID shares for the next view, do nothing.
Expand Down
2 changes: 1 addition & 1 deletion crates/testing/src/byzantine/byzantine_behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES> + std::fmt::Debug, V: Version
];
}
HotShotEvent::QuorumProposalValidated(proposal, _) => {
self.validated_proposals.push(proposal.clone());
self.validated_proposals.push(proposal.data.clone());
}
_ => {}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ async fn test_quorum_proposal_recv_task() {
.await,
)),
exact(QuorumProposalValidated(
proposals[1].data.clone(),
proposals[1].clone(),
leaves[0].clone(),
)),
exact(ViewChange(ViewNumber::new(2))),
Expand Down
8 changes: 4 additions & 4 deletions crates/testing/tests/tests_1/quorum_vote_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ async fn test_quorum_vote_task_success() {
// Send the quorum proposal, DAC, VID share data, and validated state, in which case a dummy
// vote can be formed and the view number will be updated.
let inputs = vec![random![
QuorumProposalValidated(proposals[1].data.clone(), leaves[0].clone()),
QuorumProposalValidated(proposals[1].clone(), leaves[0].clone()),
DaCertificateRecv(dacs[1].clone()),
VidShareRecv(leaders[1], vids[1].0[0].clone()),
]];
Expand Down Expand Up @@ -150,11 +150,11 @@ async fn test_quorum_vote_task_miss_dependency() {
// Send two of quorum proposal, DAC, VID share data, in which case there's no vote.
let inputs = vec![
random![
QuorumProposalValidated(proposals[1].data.clone(), leaves[0].clone()),
QuorumProposalValidated(proposals[1].clone(), leaves[0].clone()),
VidShareRecv(leaders[1], vid_share(&vids[1].0, handle.public_key())),
],
random![
QuorumProposalValidated(proposals[2].data.clone(), leaves[1].clone()),
QuorumProposalValidated(proposals[2].clone(), leaves[1].clone()),
DaCertificateRecv(dacs[2].clone()),
],
random![
Expand Down Expand Up @@ -223,7 +223,7 @@ async fn test_quorum_vote_task_incorrect_dependency() {

// Send the correct quorum proposal and DAC, and incorrect VID share data.
let inputs = vec![random![
QuorumProposalValidated(proposals[1].data.clone(), leaves[0].clone()),
QuorumProposalValidated(proposals[1].clone(), leaves[0].clone()),
DaCertificateRecv(dacs[1].clone()),
VidShareRecv(leaders[0], vids[0].0[0].clone()),
]];
Expand Down
10 changes: 5 additions & 5 deletions crates/testing/tests/tests_1/upgrade_task_with_vote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,27 +108,27 @@ async fn test_upgrade_task_with_vote() {

let inputs = vec![
random![
QuorumProposalValidated(proposals[1].data.clone(), leaves[0].clone()),
QuorumProposalValidated(proposals[1].clone(), leaves[0].clone()),
DaCertificateRecv(dacs[1].clone()),
VidShareRecv(leaders[1], vids[1].0[0].clone()),
],
random![
QuorumProposalValidated(proposals[2].data.clone(), leaves[1].clone()),
QuorumProposalValidated(proposals[2].clone(), leaves[1].clone()),
DaCertificateRecv(dacs[2].clone()),
VidShareRecv(leaders[2], vids[2].0[0].clone()),
],
random![
QuorumProposalValidated(proposals[3].data.clone(), leaves[2].clone()),
QuorumProposalValidated(proposals[3].clone(), leaves[2].clone()),
DaCertificateRecv(dacs[3].clone()),
VidShareRecv(leaders[3], vids[3].0[0].clone()),
],
random![
QuorumProposalValidated(proposals[4].data.clone(), leaves[3].clone()),
QuorumProposalValidated(proposals[4].clone(), leaves[3].clone()),
DaCertificateRecv(dacs[4].clone()),
VidShareRecv(leaders[4], vids[4].0[0].clone()),
],
random![QuorumProposalValidated(
proposals[5].data.clone(),
proposals[5].clone(),
leaves[5].clone()
),],
];
Expand Down
2 changes: 1 addition & 1 deletion crates/testing/tests/tests_1/vote_dependency_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ async fn test_vote_dependency_handle() {
// the dependency handles do not (yet) work with the existing test suite.
let all_inputs = vec![
DaCertificateValidated(dacs[1].clone()),
QuorumProposalValidated(proposals[1].data.clone(), leaves[0].clone()),
QuorumProposalValidated(proposals[1].clone(), leaves[0].clone()),
VidShareValidated(vids[1].0[0].clone()),
]
.into_iter()
Expand Down
Loading