Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add clippy to task-impls dir #1575

Merged
merged 12 commits into from
Aug 24, 2023
9 changes: 0 additions & 9 deletions task-impls/src/consensus.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,3 @@
#![warn(
clippy::all,
clippy::pedantic,
missing_docs,
clippy::missing_docs_in_private_items,
clippy::panic
)]
#![allow(clippy::module_name_repetitions)]

use crate::events::SequencingHotShotEvent;
use async_compatibility_layer::art::{async_sleep, async_spawn};
use async_compatibility_layer::async_primitives::subscribable_rwlock::ReadView;
Expand Down
20 changes: 17 additions & 3 deletions task-impls/src/da.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,10 @@ use std::time::Instant;
use tracing::{debug, error, instrument, warn};

#[derive(Snafu, Debug)]
/// Error type for consensus tasks
pub struct ConsensusTaskError {}

/// Tracks state of a DA task
pub struct DATaskState<
TYPES: NodeType,
I: NodeImplementation<
Expand All @@ -68,7 +70,9 @@ pub struct DATaskState<
Commitment = TYPES::BlockType,
>,
{
/// The state's api
pub api: A,
/// Global registry task for the state
pub registry: GlobalRegistry,

/// View number this view is executing in.
Expand All @@ -87,9 +91,11 @@ pub struct DATaskState<
/// Global events stream to publish events
pub event_stream: ChannelStream<SequencingHotShotEvent<TYPES, I>>,

/// This state's ID
pub id: u64,
}

/// Struct to maintain DA Vote Collection task state
pub struct DAVoteCollectionTaskState<
TYPES: NodeType,
I: NodeImplementation<TYPES, Leaf = SequencingLeaf<TYPES>>,
Expand All @@ -103,11 +109,15 @@ pub struct DAVoteCollectionTaskState<
{
/// the committee exchange
pub committee_exchange: Arc<CommitteeEx<TYPES, I>>,
/// the vote accumulator
pub accumulator:
Either<VoteAccumulator<TYPES::VoteTokenType, TYPES::BlockType>, DACertificate<TYPES>>,
// TODO ED Make this just "view" since it is only for this task
/// the current view
pub cur_view: ViewNumber,
/// event stream for channel events
pub event_stream: ChannelStream<SequencingHotShotEvent<TYPES, I>>,
/// the id of this task state
pub id: u64,
}

Expand Down Expand Up @@ -216,6 +226,7 @@ where
Commitment = TYPES::BlockType,
>,
{
/// main task event handler
#[instrument(skip_all, fields(id = self.id, view = *self.cur_view), name = "DA Main Task", level = "error")]

pub async fn handle_event(
Expand Down Expand Up @@ -617,10 +628,10 @@ where
let txns: Vec<TYPES::Transaction> = all_txns
.iter()
.filter_map(|(txn_hash, txn)| {
if !previous_used_txns.contains(txn_hash) {
Some(txn.clone())
} else {
if previous_used_txns.contains(txn_hash) {
None
} else {
Some(txn.clone())
}
})
.collect();
Expand All @@ -641,6 +652,7 @@ where
}
}

/// task state implementation for DA Task
impl<
TYPES: NodeType,
I: NodeImplementation<
Expand All @@ -660,13 +672,15 @@ where
{
}

/// Type alias for DA Vote Collection Types
pub type DAVoteCollectionTypes<TYPES, I> = HSTWithEvent<
ConsensusTaskError,
SequencingHotShotEvent<TYPES, I>,
ChannelStream<SequencingHotShotEvent<TYPES, I>>,
DAVoteCollectionTaskState<TYPES, I>,
>;

/// Type alias for DA Task Types
pub type DATaskTypes<TYPES, I, A> = HSTWithEvent<
ConsensusTaskError,
SequencingHotShotEvent<TYPES, I>,
Expand Down
3 changes: 2 additions & 1 deletion task-impls/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use hotshot_types::vote::{DAVote, QuorumVote};

use crate::view_sync::ViewSyncPhase;

/// All of the possible events that can be passed between Sequecning `HotShot` tasks
#[derive(Eq, Hash, PartialEq, Debug, Clone)]
pub enum SequencingHotShotEvent<TYPES: NodeType, I: NodeImplementation<TYPES>> {
/// Shutdown the task
Expand Down Expand Up @@ -59,6 +60,6 @@ pub enum SequencingHotShotEvent<TYPES: NodeType, I: NodeImplementation<TYPES>> {
TransactionsRecv(Vec<TYPES::Transaction>),
/// Send transactions to the network
TransactionSend(TYPES::Transaction, TYPES::SignatureKey),
// Event to send DA block data from DA leader to next quorum leader (which should always be the same node); internal event only
/// Event to send DA block data from DA leader to next quorum leader (which should always be the same node); internal event only
SendDABlockData(TYPES::BlockType),
}
37 changes: 21 additions & 16 deletions task-impls/src/harness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,35 +14,33 @@ use snafu::Snafu;
use std::collections::HashMap;
use std::future::Future;
use std::sync::Arc;

/// The state for the test harness task. Keeps track of which events and how many we expect to get
pub struct TestHarnessState<TYPES: NodeType, I: NodeImplementation<TYPES>> {
/// The expected events we get from the test. Maps an event to the number of times we expect to see it
expected_output: HashMap<SequencingHotShotEvent<TYPES, I>, usize>,
}

pub struct EventBundle<TYPES: NodeType, I: NodeImplementation<TYPES>>(
Vec<SequencingHotShotEvent<TYPES, I>>,
);

pub enum EventInputOutput<TYPES: NodeType, I: NodeImplementation<TYPES>> {
Input(EventBundle<TYPES, I>),
Output(EventBundle<TYPES, I>),
}

pub struct EventSequence<TYPES: NodeType, I: NodeImplementation<TYPES>>(
Vec<EventInputOutput<TYPES, I>>,
);

impl<TYPES: NodeType, I: NodeImplementation<TYPES>> TS for TestHarnessState<TYPES, I> {}

/// Error emitted if the test harness task fails
#[derive(Snafu, Debug)]
pub struct TestHarnessTaskError {}

/// Type alias for the Test Harness Task
pub type TestHarnessTaskTypes<TYPES, I> = HSTWithEvent<
TestHarnessTaskError,
SequencingHotShotEvent<TYPES, I>,
ChannelStream<SequencingHotShotEvent<TYPES, I>>,
TestHarnessState<TYPES, I>,
>;

/// Runs a test by building the task using `build_fn` and then passing it the `input` events
/// and testing the make sure all of the `expected_output` events are seen
///
/// # Panics
/// Panics if any state the test expects is not set. Panicing causes a test failure
#[allow(clippy::implicit_hasher)]
pub async fn run_harness<TYPES, I, Fut>(
input: Vec<SequencingHotShotEvent<TYPES, I>>,
expected_output: HashMap<SequencingHotShotEvent<TYPES, I>, usize>,
Expand Down Expand Up @@ -83,16 +81,23 @@ pub async fn run_harness<TYPES, I, Fut>(
let _ = runner.await;
}

/// Handles an event for the Test Harness Task. If the event is expected, remove it from
/// the `expected_output` in state. If unexpected fail test.
///
/// # Panics
/// Will panic to fail the test when it receives and unexpected event
#[allow(clippy::needless_pass_by_value)]
pub fn handle_event<TYPES: NodeType, I: NodeImplementation<TYPES>>(
event: SequencingHotShotEvent<TYPES, I>,
mut state: TestHarnessState<TYPES, I>,
) -> (
std::option::Option<HotShotTaskCompleted>,
TestHarnessState<TYPES, I>,
) {
if !state.expected_output.contains_key(&event) {
panic!("Got and unexpected event: {:?}", event);
}
assert!(
state.expected_output.contains_key(&event),
"Got and unexpected event: {event:?}",
);
let num_expected = state.expected_output.get_mut(&event).unwrap();
if *num_expected == 1 {
state.expected_output.remove(&event);
Expand Down
14 changes: 14 additions & 0 deletions task-impls/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
//! The consensus layer for hotshot. This currently implements sequencing
//! consensus in an event driven way

#![warn(
clippy::all,
clippy::pedantic,
rust_2018_idioms,
missing_docs,
clippy::missing_docs_in_private_items,
clippy::panic
)]
#![allow(clippy::module_name_repetitions)]

/// the task which implements the main parts of consensus
pub mod consensus;

Expand All @@ -12,5 +25,6 @@ pub mod network;

/// Defines the types to run unit tests for a task.
pub mod harness;

/// The task which implements view synchronization
pub mod view_sync;
24 changes: 22 additions & 2 deletions task-impls/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,18 @@ use snafu::Snafu;
use std::{marker::PhantomData, sync::Arc};
use tracing::error;

/// the type of network task
#[derive(Clone, Copy, Debug)]
pub enum NetworkTaskKind {
/// quorum: the normal "everyone" committee
Quorum,
/// da committee
Committee,
/// view sync
ViewSync,
}

/// the network message task state
pub struct NetworkMessageTaskState<
TYPES: NodeType,
I: NodeImplementation<
Expand All @@ -37,6 +42,7 @@ pub struct NetworkMessageTaskState<
ConsensusMessage = SequencingMessage<TYPES, I>,
>,
> {
/// event stream (used for publishing)
pub event_stream: ChannelStream<SequencingHotShotEvent<TYPES, I>>,
}

Expand Down Expand Up @@ -82,7 +88,7 @@ impl<
GeneralConsensusMessage::ViewSyncCertificate(view_sync_message) => {
SequencingHotShotEvent::ViewSyncCertificateRecv(view_sync_message)
}
_ => {
GeneralConsensusMessage::InternalTrigger(_) => {
error!("Got unexpected message type in network task!");
return;
}
Expand All @@ -108,7 +114,7 @@ impl<
}
MessageKind::Data(message) => match message {
hotshot_types::message::DataMessage::SubmitTransaction(transaction, _) => {
transactions.push(transaction)
transactions.push(transaction);
}
},
MessageKind::_Unreachable(_) => unimplemented!(),
Expand All @@ -122,6 +128,7 @@ impl<
}
}

/// network event task state
pub struct NetworkEventTaskState<
TYPES: NodeType,
I: NodeImplementation<
Expand All @@ -134,9 +141,13 @@ pub struct NetworkEventTaskState<
MEMBERSHIP: Membership<TYPES>,
COMMCHANNEL: CommunicationChannel<TYPES, Message<TYPES, I>, PROPOSAL, VOTE, MEMBERSHIP>,
> {
/// comm channel
pub channel: COMMCHANNEL,
/// event stream
pub event_stream: ChannelStream<SequencingHotShotEvent<TYPES, I>>,
/// view number
pub view: ViewNumber,
/// phantom data
pub phantom: PhantomData<(PROPOSAL, VOTE, MEMBERSHIP)>,
// TODO ED Need to add exchange so we can get the recipient key and our own key?
}
Expand Down Expand Up @@ -172,6 +183,8 @@ impl<
/// Handle the given event.
///
/// Returns the completion status.
/// # Panics
/// Panic sif a direct message event is received with no recipient
pub async fn handle_event(
&mut self,
event: SequencingHotShotEvent<TYPES, I>,
Expand Down Expand Up @@ -277,6 +290,7 @@ impl<
None
}

/// network filter
pub fn filter(task_kind: NetworkTaskKind) -> FilterEvent<SequencingHotShotEvent<TYPES, I>> {
match task_kind {
NetworkTaskKind::Quorum => FilterEvent(Arc::new(Self::quorum_filter)),
Expand All @@ -285,6 +299,7 @@ impl<
}
}

/// quorum filter
fn quorum_filter(event: &SequencingHotShotEvent<TYPES, I>) -> bool {
matches!(
event,
Expand All @@ -296,6 +311,7 @@ impl<
)
}

/// committee filter
fn committee_filter(event: &SequencingHotShotEvent<TYPES, I>) -> bool {
matches!(
event,
Expand All @@ -306,6 +322,7 @@ impl<
)
}

/// view sync filter
fn view_sync_filter(event: &SequencingHotShotEvent<TYPES, I>) -> bool {
matches!(
event,
Expand All @@ -317,9 +334,11 @@ impl<
}
}

/// network error (no errors right now, only stub)
#[derive(Snafu, Debug)]
pub struct NetworkTaskError {}

/// networking message task types
pub type NetworkMessageTaskTypes<TYPES, I> = HSTWithMessage<
NetworkTaskError,
Either<Messages<TYPES, I>, Messages<TYPES, I>>,
Expand All @@ -328,6 +347,7 @@ pub type NetworkMessageTaskTypes<TYPES, I> = HSTWithMessage<
NetworkMessageTaskState<TYPES, I>,
>;

/// network event task types
pub type NetworkEventTaskTypes<TYPES, I, PROPOSAL, VOTE, MEMBERSHIP, COMMCHANNEL> = HSTWithEvent<
NetworkTaskError,
SequencingHotShotEvent<TYPES, I>,
Expand Down
Loading