Commit c6a8982

Merge branch 'main' into ss/anytrace

ss-es authored Oct 23, 2024
2 parents 4808210 + 53e28ea

Showing 11 changed files with 135 additions and 53 deletions.
8 changes: 6 additions & 2 deletions crates/hotshot/src/lib.rs
@@ -252,15 +252,19 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> SystemContext<T
         let anchored_leaf = initializer.inner;
         let instance_state = initializer.instance_state;

-        let (internal_tx, internal_rx) = internal_channel;
+        let (internal_tx, mut internal_rx) = internal_channel;
         let (mut external_tx, mut external_rx) = external_channel;

         let upgrade_lock =
             UpgradeLock::<TYPES, V>::from_certificate(&initializer.decided_upgrade_certificate);

-        // Allow overflow on the channel, otherwise sending to it may block.
+        // Allow overflow on the external channel, otherwise sending to it may block.
         external_rx.set_overflow(true);

+        // Allow overflow on the internal channel as well. We don't want to block consensus if we
+        // have a slow receiver.
+        internal_rx.set_overflow(true);
+
         // Get the validated state from the initializer or construct an incomplete one from the
         // block header.
         let validated_state = match initializer.validated_state {
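For context: an `async_broadcast` channel blocks the sender once it is full, unless overflow is enabled, in which case the oldest message is evicted and a lagging receiver is told how many messages it missed. A minimal standalone sketch of that behavior (assuming the async-broadcast and tokio crates; this is illustration, not HotShot code):

use async_broadcast::{broadcast, RecvError};

#[tokio::main]
async fn main() {
    // Capacity-1 channel; with overflow enabled, sending to a full channel
    // evicts the oldest message instead of blocking the sender.
    let (tx, mut rx) = broadcast(1);
    rx.set_overflow(true);

    tx.broadcast(1).await.unwrap();
    tx.broadcast(2).await.unwrap(); // would wait for free capacity without overflow

    // The lagging receiver first learns it missed one message,
    // then receives the newest one.
    assert!(matches!(rx.recv().await, Err(RecvError::Overflowed(1))));
    assert_eq!(rx.recv().await.unwrap(), 2);
}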
28 changes: 26 additions & 2 deletions crates/hotshot/src/tasks/mod.rs
@@ -8,9 +8,10 @@

 /// Provides trait to create task states from a `SystemContextHandle`
 pub mod task_state;
-use std::{fmt::Debug, sync::Arc};
+use std::{fmt::Debug, sync::Arc, time::Duration};

 use async_broadcast::{broadcast, RecvError};
+use async_compatibility_layer::art::async_sleep;
 use async_compatibility_layer::art::async_spawn;
 use async_lock::RwLock;
 use async_trait::async_trait;

@@ -94,6 +95,29 @@ pub fn add_response_task<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versi
     ));
 }

+/// Add a task which updates our queue length metric at a set interval
+pub fn add_queue_len_task<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
+    handle: &mut SystemContextHandle<TYPES, I, V>,
+) {
+    let consensus = handle.hotshot.consensus();
+    let rx = handle.internal_event_stream.1.clone();
+    let shutdown_signal = create_shutdown_event_monitor(handle).fuse();
+    let task_handle = async_spawn(async move {
+        futures::pin_mut!(shutdown_signal);
+        loop {
+            futures::select! {
+                () = shutdown_signal => {
+                    return;
+                },
+                () = async_sleep(Duration::from_millis(500)).fuse() => {
+                    consensus.read().await.metrics.internal_event_queue_len.set(rx.len());
+                }
+            }
+        }
+    });
+    handle.network_registry.register(task_handle);
+}
+
 /// Add the network task to handle messages and publish events.
 pub fn add_network_message_task<
     TYPES: NodeType,

@@ -228,7 +252,7 @@ pub async fn add_consensus_tasks<TYPES: NodeType, I: NodeImplementation<TYPES>,
         handle.add_task(QuorumProposalRecvTaskState::<TYPES, I, V>::create_from(handle).await);
         handle.add_task(ConsensusTaskState::<TYPES, I, V>::create_from(handle).await);
     }
-
+    add_queue_len_task(handle);
     #[cfg(feature = "rewind")]
     handle.add_task(RewindTaskState::<TYPES>::create_from(&handle).await);
 }
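The new task samples the internal event queue every 500 ms and publishes its length through the `internal_event_queue_len` gauge, exiting when the shutdown monitor fires. The same select-loop pattern in plain tokio, for illustration (HotShot itself goes through async_compatibility_layer and its shutdown-event monitor; the names below are hypothetical):

use std::time::Duration;
use tokio::{sync::watch, time};

// Sample a queue-length closure on a fixed interval until shutdown fires.
async fn queue_len_task(queue_len: impl Fn() -> usize, mut shutdown: watch::Receiver<bool>) {
    loop {
        tokio::select! {
            _ = shutdown.changed() => return,
            _ = time::sleep(Duration::from_millis(500)) => {
                // HotShot calls `Gauge::set` here instead of printing.
                println!("internal_event_queue_len = {}", queue_len());
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (shutdown_tx, shutdown_rx) = watch::channel(false);
    let task = tokio::spawn(queue_len_task(|| 42, shutdown_rx));
    time::sleep(Duration::from_millis(1200)).await;
    shutdown_tx.send(true).unwrap(); // signal shutdown
    task.await.unwrap();
}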
2 changes: 1 addition & 1 deletion crates/hotshot/src/tasks/task_state.rs
@@ -233,7 +233,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> CreateTaskState
             consensus: OuterConsensus::new(consensus),
             instance_state: handle.hotshot.instance_state(),
             latest_voted_view: handle.cur_view().await,
-            vote_dependencies: HashMap::new(),
+            vote_dependencies: BTreeMap::new(),
             network: Arc::clone(&handle.hotshot.network),
             quorum_membership: handle.hotshot.memberships.quorum_membership.clone().into(),
             da_membership: handle.hotshot.memberships.da_membership.clone().into(),
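The switch from HashMap to BTreeMap here (mirrored in quorum_vote/mod.rs below) is what enables the new view-based cleanup: an ordered map can be split at a view boundary in one call. A small sketch of the split_off / pop_last pattern the quorum-vote diff relies on:

use std::collections::BTreeMap;

fn main() {
    // Views mapped to in-progress dependency tasks (strings stand in for
    // JoinHandles in this sketch).
    let mut tasks: BTreeMap<u64, &str> = (1..=5).map(|view| (view, "task")).collect();

    // Keep everything at or above the cutoff view...
    let cutoff = 4;
    let current_tasks = tasks.split_off(&cutoff);
    // ...and cancel the stale tasks below it, newest first.
    while let Some((view, _task)) = tasks.pop_last() {
        println!("cancelling dependency task for view {view}");
    }
    tasks = current_tasks;

    assert_eq!(tasks.keys().copied().collect::<Vec<_>>(), [4, 5]);
}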
12 changes: 2 additions & 10 deletions crates/libp2p-networking/src/network/behaviours/dht/store.rs
@@ -37,16 +37,8 @@ impl<R: RecordStore, K: SignatureKey> RecordStore for ValidatedStore<R, K>
 where
     K: 'static,
 {
-    type ProvidedIter<'a>
-        = R::ProvidedIter<'a>
-    where
-        R: 'a,
-        K: 'a;
-    type RecordsIter<'a>
-        = R::RecordsIter<'a>
-    where
-        R: 'a,
-        K: 'a;
+    type ProvidedIter<'a> = R::ProvidedIter<'a> where R: 'a, K: 'a;
+    type RecordsIter<'a> = R::RecordsIter<'a> where R: 'a, K: 'a;

     // Delegate all `RecordStore` methods except `put` to the inner store
     delegate! {
14 changes: 14 additions & 0 deletions crates/libp2p-networking/src/network/node.rs
@@ -205,6 +205,20 @@ impl<K: SignatureKey + 'static> NetworkNode<K> {
             .mesh_n_low(config.gossip_config.mesh_n_low) // Lower limit of mesh peers
             .mesh_outbound_min(config.gossip_config.mesh_outbound_min) // Minimum number of outbound peers in mesh
             .max_transmit_size(config.gossip_config.max_transmit_size) // Maximum size of a message
+            .max_ihave_length(config.gossip_config.max_ihave_length) // Maximum number of messages to include in an IHAVE message
+            .max_ihave_messages(config.gossip_config.max_ihave_messages) // Maximum number of IHAVE messages to accept from a peer within a heartbeat
+            .published_message_ids_cache_time(
+                config.gossip_config.published_message_ids_cache_time,
+            ) // Cache duration for published message IDs
+            .iwant_followup_time(config.gossip_config.iwant_followup_time) // Time to wait for a message requested through IWANT following an IHAVE advertisement
+            .max_messages_per_rpc(config.gossip_config.max_messages_per_rpc) // The maximum number of messages we will process in a given RPC
+            .gossip_retransimission(config.gossip_config.gossip_retransmission) // Controls how many times we will allow a peer to request the same message id through IWANT gossip before we start ignoring them
+            .flood_publish(config.gossip_config.flood_publish) // If enabled, newly created messages will always be sent to all peers that are subscribed to the topic and have a good enough score
+            .duplicate_cache_time(config.gossip_config.duplicate_cache_time) // The time period that messages are stored in the cache
+            .fanout_ttl(config.gossip_config.fanout_ttl) // Time to live for fanout peers
+            .heartbeat_initial_delay(config.gossip_config.heartbeat_initial_delay) // Initial delay in each heartbeat
+            .gossip_factor(config.gossip_config.gossip_factor) // Affects how many peers we will emit gossip to at each heartbeat
+            .gossip_lazy(config.gossip_config.gossip_lazy) // Minimum number of peers to emit gossip to during a heartbeat
             .build()
             .map_err(|err| {
                 NetworkError::ConfigError(format!("error building gossipsub config: {err:?}"))
50 changes: 50 additions & 0 deletions crates/libp2p-networking/src/network/node/config.rs
@@ -59,6 +59,7 @@ pub struct NetworkNodeConfig<K: SignatureKey + 'static> {

 /// Configuration for Libp2p's Gossipsub
 #[derive(Clone, Debug)]
+#[allow(missing_docs)]
 pub struct GossipConfig {
     /// The heartbeat interval
     pub heartbeat_interval: Duration,

@@ -79,6 +80,42 @@ pub struct GossipConfig {

     /// The maximum gossip message size
     pub max_transmit_size: usize,
+
+    /// The maximum number of messages in an IHAVE message
+    pub max_ihave_length: usize,
+
+    /// Maximum number of IHAVE messages to accept from a peer within a heartbeat
+    pub max_ihave_messages: usize,
+
+    /// Cache duration for published message IDs
+    pub published_message_ids_cache_time: Duration,
+
+    /// Time to wait for a message requested through IWANT following an IHAVE advertisement
+    pub iwant_followup_time: Duration,
+
+    /// The maximum number of messages we will process in a given RPC
+    pub max_messages_per_rpc: Option<usize>,
+
+    /// Controls how many times we will allow a peer to request the same message id through IWANT gossip before we start ignoring them
+    pub gossip_retransmission: u32,
+
+    /// If enabled, newly created messages will always be sent to all peers that are subscribed to the topic and have a good enough score
+    pub flood_publish: bool,
+
+    /// The time period that messages are stored in the cache
+    pub duplicate_cache_time: Duration,
+
+    /// Time to live for fanout peers
+    pub fanout_ttl: Duration,
+
+    /// Initial delay in each heartbeat
+    pub heartbeat_initial_delay: Duration,
+
+    /// Affects how many peers we will emit gossip to at each heartbeat
+    pub gossip_factor: f64,
+
+    /// Minimum number of peers to emit gossip to during a heartbeat
+    pub gossip_lazy: usize,
 }

 impl Default for GossipConfig {

@@ -97,6 +134,19 @@ impl Default for GossipConfig {
             mesh_n_low: 6,        // The minimum number of peers in the mesh
             mesh_outbound_min: 2, // The minimum number of mesh peers that must be outbound

+            max_ihave_length: 5000,
+            max_ihave_messages: 10,
+            published_message_ids_cache_time: Duration::from_secs(60 * 20), // 20 minutes
+            iwant_followup_time: Duration::from_secs(3),
+            max_messages_per_rpc: None,
+            gossip_retransmission: 3,
+            flood_publish: true,
+            duplicate_cache_time: Duration::from_secs(60),
+            fanout_ttl: Duration::from_secs(60),
+            heartbeat_initial_delay: Duration::from_secs(5),
+            gossip_factor: 0.25,
+            gossip_lazy: 6,
+
             max_transmit_size: MAX_GOSSIP_MSG_SIZE, // The maximum gossip message size
         }
     }
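Because GossipConfig implements Default and the new fields ship with the defaults above, callers can override individual knobs with struct-update syntax. A hedged sketch reusing the field names from this diff (assumes GossipConfig is in scope from this config module):

use std::time::Duration;

fn main() {
    // Start from the defaults, then tune a few gossip parameters.
    let gossip_config = GossipConfig {
        flood_publish: false, // gossip to peers instead of eagerly flooding each publish
        duplicate_cache_time: Duration::from_secs(120),
        max_ihave_length: 1000,
        ..GossipConfig::default()
    };
    println!("{gossip_config:?}"); // GossipConfig derives Debug
}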
19 changes: 1 addition & 18 deletions crates/task-impls/src/helpers.rs
@@ -415,24 +415,7 @@ pub(crate) async fn parent_leaf_and_state<TYPES: NodeType, V: Versions>(
         .get(&leaf_commitment)
         .context("Failed to find high QC of parent")?;

-    let reached_decided = leaf.view_number() == consensus_reader.last_decided_view();
-    let parent_leaf = leaf.clone();
-    let original_parent_hash = parent_leaf.commit(upgrade_lock).await;
-    let mut next_parent_hash = original_parent_hash;
-
-    // Walk back until we find a decide
-    if !reached_decided {
-        debug!("We have not reached decide");
-        while let Some(next_parent_leaf) = consensus_reader.saved_leaves().get(&next_parent_hash) {
-            if next_parent_leaf.view_number() <= consensus_reader.last_decided_view() {
-                break;
-            }
-            next_parent_hash = next_parent_leaf.parent_commitment();
-        }
-        // TODO do some sort of sanity check on the view number that it matches decided
-    }
-
-    Ok((parent_leaf, Arc::clone(state)))
+    Ok((leaf.clone(), Arc::clone(state)))
 }

 /// Validate the state and safety and liveness of a proposal then emit
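Worth noting: the deleted walk-back was effectively dead code. The loop only advanced next_parent_hash, which nothing read after the loop, and the value actually returned was parent_leaf, itself just leaf.clone(); collapsing the body to Ok((leaf.clone(), Arc::clone(state))) leaves the function's result unchanged.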
33 changes: 25 additions & 8 deletions crates/task-impls/src/quorum_vote/mod.rs
@@ -4,10 +4,10 @@
 // You should have received a copy of the MIT License
 // along with the HotShot repository. If not, see <https://mit-license.org/>.

-use std::{collections::HashMap, sync::Arc};
+use std::{collections::BTreeMap, sync::Arc};

 use anyhow::{bail, ensure, Context, Result};
-use async_broadcast::{Receiver, Sender};
+use async_broadcast::{InactiveReceiver, Receiver, Sender};
 use async_lock::RwLock;
 #[cfg(async_executor_impl = "async-std")]
 use async_std::task::JoinHandle;

@@ -81,7 +81,7 @@ pub struct VoteDependencyHandle<TYPES: NodeType, I: NodeImplementation<TYPES>, V
     /// Event sender.
     pub sender: Sender<Arc<HotShotEvent<TYPES>>>,
     /// Event receiver.
-    pub receiver: Receiver<Arc<HotShotEvent<TYPES>>>,
+    pub receiver: InactiveReceiver<Arc<HotShotEvent<TYPES>>>,
     /// Lock for a decided upgrade
     pub upgrade_lock: UpgradeLock<TYPES, V>,
     /// The node's id

@@ -113,7 +113,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES> + 'static, V: Versions>
             None => fetch_proposal(
                 justify_qc.view_number(),
                 self.sender.clone(),
-                self.receiver.clone(),
+                self.receiver.activate_cloned(),
                 Arc::clone(&self.quorum_membership),
                 OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus)),
                 self.public_key.clone(),

@@ -259,7 +259,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES> + 'static, V: Versions> Handl
     {
         // Block on receiving the event from the event stream.
         EventDependency::new(
-            self.receiver.clone(),
+            self.receiver.activate_cloned(),
             Box::new(move |event| {
                 let event = event.as_ref();
                 if let HotShotEvent::ValidatedStateUpdated(view_number, _) = event {

@@ -379,7 +379,7 @@ pub struct QuorumVoteTaskState<TYPES: NodeType, I: NodeImplementation<TYPES>, V:
     pub latest_voted_view: TYPES::View,

     /// Table for the in-progress dependency tasks.
-    pub vote_dependencies: HashMap<TYPES::View, JoinHandle<()>>,
+    pub vote_dependencies: BTreeMap<TYPES::View, JoinHandle<()>>,

     /// The underlying network
     pub network: Arc<I::Network>,

@@ -499,7 +499,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> QuorumVoteTaskS
                 view_number,
                 epoch_number,
                 sender: event_sender.clone(),
-                receiver: event_receiver.clone(),
+                receiver: event_receiver.clone().deactivate(),
                 upgrade_lock: self.upgrade_lock.clone(),
                 id: self.id,
             },

@@ -678,6 +678,23 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> QuorumVoteTaskS
                     return;
                 }
             }
+            HotShotEvent::Timeout(view) => {
+                // cancel old tasks
+                let current_tasks = self.vote_dependencies.split_off(view);
+                while let Some((_, task)) = self.vote_dependencies.pop_last() {
+                    cancel_task(task).await;
+                }
+                self.vote_dependencies = current_tasks;
+            }
+            HotShotEvent::ViewChange(mut view) => {
+                view = TYPES::View::new(view.saturating_sub(1));
+                // cancel old tasks
+                let current_tasks = self.vote_dependencies.split_off(&view);
+                while let Some((_, task)) = self.vote_dependencies.pop_last() {
+                    cancel_task(task).await;
+                }
+                self.vote_dependencies = current_tasks;
+            }
             _ => {}
         }
     }

@@ -701,7 +718,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> TaskState
     }

     async fn cancel_subtasks(&mut self) {
-        for handle in self.vote_dependencies.drain().map(|(_view, handle)| handle) {
+        while let Some((_, handle)) = self.vote_dependencies.pop_last() {
             #[cfg(async_executor_impl = "async-std")]
             handle.cancel().await;
             #[cfg(async_executor_impl = "tokio")]
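The receiver changes above lean on async_broadcast's inactive receivers: deactivate() keeps the channel open without buffering messages for the handle, and activate_cloned() mints a live receiver only when the handle actually needs to listen, so idle VoteDependencyHandles no longer pile up unread events. A standalone sketch of those semantics (assuming the async-broadcast and tokio crates; my understanding is that messages sent while only inactive receivers exist are dropped rather than queued):

use async_broadcast::broadcast;

#[tokio::main]
async fn main() {
    let (tx, rx) = broadcast::<u32>(8);

    // Keep the channel open, but buffer nothing for this handle.
    let inactive = rx.deactivate();

    // No active receiver is listening, so this message is not queued.
    tx.broadcast(1).await.unwrap();

    // Activate a fresh receiver only while we actually need to wait;
    // it observes messages sent from this point on.
    let mut live = inactive.activate_cloned();
    tx.broadcast(2).await.unwrap();
    assert_eq!(live.recv().await.unwrap(), 2);
}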
2 changes: 1 addition & 1 deletion crates/testing/tests/tests_1/vote_dependency_handle.rs
@@ -99,7 +99,7 @@ async fn test_vote_dependency_handle() {
         view_number,
         epoch_number: EpochNumber::new(1),
         sender: event_sender.clone(),
-        receiver: event_receiver.clone(),
+        receiver: event_receiver.clone().deactivate(),
         upgrade_lock: handle.hotshot.upgrade_lock.clone(),
         id: handle.hotshot.id,
     };
4 changes: 4 additions & 0 deletions crates/types/src/consensus.rs
@@ -348,6 +348,8 @@ pub struct ConsensusMetricsValue {
     pub number_of_timeouts_as_leader: Box<dyn Counter>,
     /// The number of empty blocks that have been proposed
     pub number_of_empty_blocks_proposed: Box<dyn Counter>,
+    /// Number of events in the hotshot event queue
+    pub internal_event_queue_len: Box<dyn Gauge>,
 }

 impl ConsensusMetricsValue {

@@ -376,6 +378,8 @@ impl ConsensusMetricsValue {
                 .create_counter(String::from("number_of_timeouts_as_leader"), None),
             number_of_empty_blocks_proposed: metrics
                 .create_counter(String::from("number_of_empty_blocks_proposed"), None),
+            internal_event_queue_len: metrics
+                .create_gauge(String::from("internal_event_queue_len"), None),
         }
     }
 }
16 changes: 5 additions & 11 deletions crates/types/src/simple_vote.rs
@@ -11,7 +11,7 @@ use std::{fmt::Debug, hash::Hash, marker::PhantomData};
 use anyhow::Result;
 use committable::{Commitment, Committable};
 use serde::{de::DeserializeOwned, Deserialize, Serialize};
-use vbs::version::{StaticVersionType, Version};
+use vbs::version::Version;

 use crate::{
     data::Leaf,

@@ -240,16 +240,10 @@ impl<TYPES: NodeType, DATA: Voteable, V: Versions> Committable
     for VersionedVoteData<TYPES, DATA, V>
 {
     fn commit(&self) -> Commitment<Self> {
-        if self.version < V::Marketplace::VERSION {
-            let bytes: [u8; 32] = self.data.commit().into();
-
-            Commitment::<Self>::from_raw(bytes)
-        } else {
-            committable::RawCommitmentBuilder::new("Vote")
-                .var_size_bytes(self.data.commit().as_ref())
-                .u64(*self.view)
-                .finalize()
-        }
+        committable::RawCommitmentBuilder::new("Vote")
+            .var_size_bytes(self.data.commit().as_ref())
+            .u64(*self.view)
+            .finalize()
     }
 }
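With the version gate removed, vote commitments are always built through RawCommitmentBuilder, domain-tagged "Vote" and bound to the view number; the raw pre-Marketplace format, and with it the StaticVersionType import, is gone.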
