diff --git a/src/traits/networking/web_server_network.rs b/src/traits/networking/web_server_network.rs index 4e34831e05..245d3bdcc2 100644 --- a/src/traits/networking/web_server_network.rs +++ b/src/traits/networking/web_server_network.rs @@ -19,8 +19,6 @@ use async_compatibility_layer::{ use async_lock::RwLock; use async_trait::async_trait; use hotshot_task::{boxed_sync, BoxSyncFuture}; -use hotshot_task_impls::da; -use hotshot_types::message; use hotshot_types::message::{Message, MessagePurpose}; use hotshot_types::traits::network::ConsensusIntentEvent; use hotshot_types::traits::node_implementation::NodeImplementation; @@ -55,7 +53,7 @@ use std::{ time::Duration, }; use surf_disco::error::ClientError; -use tracing::{error, warn, debug}; +use tracing::{debug, error, warn}; /// Represents the communication channel abstraction for the web server #[derive(Clone, Debug)] pub struct WebCommChannel< @@ -149,6 +147,7 @@ struct Inner>, // TODO ED This should be TYPES::Time @@ -163,12 +162,14 @@ struct Inner>>>, /// Task map for view sync votes. view_sync_vote_task_map: Arc>>>, + /// Task map for transactions txn_task_map: Arc>>>, } impl Inner { + #![allow(clippy::too_many_lines)] /// Pull a web server. async fn poll_web_server( &self, @@ -228,7 +229,7 @@ impl { match message_purpose { MessagePurpose::Data => { - panic!(); + error!("We should not receive transactions in this function"); } MessagePurpose::Proposal => { // warn!( @@ -309,7 +310,9 @@ impl todo!(), + MessagePurpose::Internal => { + error!("Received internal message in web server network") + } } } Ok(None) => { @@ -331,11 +334,8 @@ impl { - if view_number != event_view { - panic!("Wrong event view number was sent to this task!"); - } else { - // Shutdown this task - error!("Shutting down polling task for view {}", event_view); + if view_number == event_view { + warn!("Shutting down polling task for view {}", event_view); return Ok(()); } } @@ -345,11 +345,7 @@ impl {} + Err(_) => { + debug!("Nothing on receiving channel"); + } } } Err(NetworkError::ShutDown) } + /// Fetches transactions from web server async fn get_txs_from_web_server( &self, endpoint: String, @@ -1025,52 +1024,10 @@ impl< } ConsensusIntentEvent::PollForTransactions(view_number) => { let mut task_map = self.inner.txn_task_map.write().await; - if !task_map.contains_key(&view_number) { + if let std::collections::hash_map::Entry::Vacant(e) = task_map.entry(view_number) { // create new task let (sender, receiver) = unbounded(); - task_map.insert(view_number, sender); - async_spawn({ - let inner_clone = self.inner.clone(); - async move { - if let Err(e) = inner_clone - .poll_web_server(receiver, MessagePurpose::Data, view_number) - .await - { - error!( - "Background receive transaction polling encountered an error: {:?}", - e - ); - } - } - }); - } else { - error!("Somehow task already existed!") - } - - // TODO ED Do we need to GC before returning? Or will view sync task handle that? - } - ConsensusIntentEvent::CancelPollForTransactions(view_number) => { - let mut task_map = self.inner.txn_task_map.write().await; - - if let Some((view, sender)) = task_map.remove_entry(&(view_number)) { - // Send task cancel message to old task - - // If task already exited we expect an error - let _res = sender - .send(ConsensusIntentEvent::CancelPollForTransactions( - view_number, - )) - .await; - } else { - error!("Task map entry should have existed"); - } - } - ConsensusIntentEvent::PollForTransactions(view_number) => { - let mut task_map = self.inner.txn_task_map.write().await; - if !task_map.contains_key(&view_number) { - // create new task - let (sender, receiver) = unbounded(); - task_map.insert(view_number, sender); + e.insert(sender); async_spawn({ let inner_clone = self.inner.clone(); async move { @@ -1079,14 +1036,14 @@ impl< .await { error!( - "Background receive transaction polling encountered an error: {:?}", - e - ); + "Background receive transaction polling encountered an error: {:?}", + e + ); } } }); } else { - error!("Somehow task already existed!") + error!("Somehow task already existed!"); } // TODO ED Do we need to GC before returning? Or will view sync task handle that? @@ -1094,14 +1051,12 @@ impl< ConsensusIntentEvent::CancelPollForTransactions(view_number) => { let mut task_map = self.inner.txn_task_map.write().await; - if let Some((view, sender)) = task_map.remove_entry(&(view_number)) { + if let Some((_view, sender)) = task_map.remove_entry(&(view_number)) { // Send task cancel message to old task // If task already exited we expect an error let _res = sender - .send(ConsensusIntentEvent::CancelPollForTransactions( - view_number, - )) + .send(ConsensusIntentEvent::CancelPollForTransactions(view_number)) .await; } else { error!("Task map entry should have existed"); diff --git a/task-impls/src/da.rs b/task-impls/src/da.rs index 6eda23f97d..9c89211403 100644 --- a/task-impls/src/da.rs +++ b/task-impls/src/da.rs @@ -435,18 +435,18 @@ where warn!("Polling for DA proposals for view {}", *self.cur_view + 1); self.committee_exchange .network() - .inject_consensus_info( - (ConsensusIntentEvent::PollForProposal(*self.cur_view + 1)), - ) + .inject_consensus_info(ConsensusIntentEvent::PollForProposal( + *self.cur_view + 1, + )) .await; } if self.committee_exchange.is_leader(self.cur_view + 3) { warn!("Polling for transactions for view {}", *self.cur_view + 3); self.committee_exchange .network() - .inject_consensus_info( - (ConsensusIntentEvent::PollForTransactions(*self.cur_view + 3)), - ) + .inject_consensus_info(ConsensusIntentEvent::PollForTransactions( + *self.cur_view + 3, + )) .await; } @@ -501,9 +501,9 @@ where self.committee_exchange .network() - .inject_consensus_info( - (ConsensusIntentEvent::CancelPollForTransactions(*self.cur_view + 1)), - ) + .inject_consensus_info(ConsensusIntentEvent::CancelPollForTransactions( + *self.cur_view + 1, + )) .await; for txn in txns { @@ -572,7 +572,7 @@ where let task_start_time = Instant::now(); // let parent_leaf = self.parent_leaf().await?; - let mut previous_used_txns = match parent_leaf.deltas { + let previous_used_txns = match parent_leaf.deltas { Either::Left(block) => block.contained_transactions(), Either::Right(_commitment) => HashSet::new(), }; diff --git a/types/src/traits/network.rs b/types/src/traits/network.rs index cc1fbe4687..2a1b17a643 100644 --- a/types/src/traits/network.rs +++ b/types/src/traits/network.rs @@ -6,7 +6,6 @@ use async_std::future::TimeoutError; use hotshot_task::BoxSyncFuture; use libp2p_networking::network::NetworkNodeHandleError; -use nll::nll_todo; #[cfg(feature = "tokio-executor")] use tokio::time::error::Elapsed as TimeoutError; #[cfg(not(any(feature = "async-std-executor", feature = "tokio-executor")))] @@ -14,7 +13,6 @@ std::compile_error! {"Either feature \"async-std-executor\" or feature \"tokio-e use super::{election::Membership, node_implementation::NodeType, signature_key::SignatureKey}; use crate::{data::ProposalType, message::MessagePurpose, vote::VoteType}; use async_trait::async_trait; -use nll::nll_todo::nll_todo; use serde::{Deserialize, Serialize}; use snafu::Snafu; use std::{collections::BTreeSet, fmt::Debug, sync::Arc, time::Duration}; @@ -155,7 +153,7 @@ pub enum ConsensusIntentEvent { CancelPollForDAC(u64), /// Cancel polling for view sync certificate. CancelPollForViewSyncCertificate(u64), - + /// Cancel polling for transactions CancelPollForTransactions(u64), } @@ -173,9 +171,9 @@ impl ConsensusIntentEvent { | ConsensusIntentEvent::CancelPollForProposal(view_number) | ConsensusIntentEvent::CancelPollForDAC(view_number) | ConsensusIntentEvent::CancelPollForViewSyncCertificate(view_number) - | ConsensusIntentEvent::PollForViewSyncCertificate(view_number) => *view_number, - ConsensusIntentEvent::PollForTransactions(v) => *v, - ConsensusIntentEvent::CancelPollForTransactions(v) => *v, + | ConsensusIntentEvent::PollForViewSyncCertificate(view_number) + | ConsensusIntentEvent::PollForTransactions(view_number) + | ConsensusIntentEvent::CancelPollForTransactions(view_number) => *view_number, } } } diff --git a/web_server/src/lib.rs b/web_server/src/lib.rs index 63698c552a..14dbec80dc 100644 --- a/web_server/src/lib.rs +++ b/web_server/src/lib.rs @@ -127,6 +127,7 @@ pub trait WebServerDataSource { index: u64, ) -> Result>>, Error>; + #[allow(clippy::type_complexity)] fn get_transactions(&self, index: u64) -> Result>)>, Error>; fn get_da_certificate(&self, index: u64) -> Result>>, Error>; fn post_vote(&mut self, view_number: u64, vote: Vec) -> Result<(), Error>; @@ -223,24 +224,23 @@ impl WebServerDataSource for WebServerState { } } + #[allow(clippy::type_complexity)] /// Return the transaction at the specified index (which will help with Nginx caching, but reduce performance otherwise) /// In the future we will return batches of transactions fn get_transactions(&self, index: u64) -> Result>)>, Error> { let mut txns_to_return = vec![]; - let mut txn_vec_size = 0; - let mut new_index = index as usize; let lowest_in_memory_txs = if self.num_txns < MAX_TXNS.try_into().unwrap() { 0 } else { - (self.num_txns as usize - MAX_TXNS) + self.num_txns as usize - MAX_TXNS }; - if (index as usize) < lowest_in_memory_txs { - new_index = lowest_in_memory_txs; + let new_index = if (index as usize) < lowest_in_memory_txs { + lowest_in_memory_txs } else { - new_index = index as usize; - } + index as usize + }; for idx in new_index..=self.num_txns.try_into().unwrap() { if let Some(txn) = self.transactions.get(&(idx as u64)) { @@ -253,7 +253,7 @@ impl WebServerDataSource for WebServerState { if !txns_to_return.is_empty() { error!("Returning this many txs {}", txns_to_return.len()); - Ok(Some((index.try_into().unwrap(), txns_to_return))) + Ok(Some((index, txns_to_return))) } else { Err(ServerError { // TODO ED: Why does NoContent status code cause errors?