
Commit 065fbe6

Clippy and fmt fixes
elliedavidson committed Aug 7, 2023
1 parent 5491217 commit 065fbe6
Showing 4 changed files with 45 additions and 92 deletions.
91 changes: 23 additions & 68 deletions src/traits/networking/web_server_network.rs
@@ -19,8 +19,6 @@ use async_compatibility_layer::{
 use async_lock::RwLock;
 use async_trait::async_trait;
 use hotshot_task::{boxed_sync, BoxSyncFuture};
-use hotshot_task_impls::da;
-use hotshot_types::message;
 use hotshot_types::message::{Message, MessagePurpose};
 use hotshot_types::traits::network::ConsensusIntentEvent;
 use hotshot_types::traits::node_implementation::NodeImplementation;
@@ -55,7 +53,7 @@ use std::{
     time::Duration,
 };
 use surf_disco::error::ClientError;
-use tracing::{error, warn, debug};
+use tracing::{debug, error, warn};
 /// Represents the communication channel abstraction for the web server
 #[derive(Clone, Debug)]
 pub struct WebCommChannel<
@@ -149,6 +147,7 @@ struct Inner<M: NetworkMsg, KEY: SignatureKey, ELECTIONCONFIG: ElectionConfig, T
     /// Whether we are connecting to a DA server
     is_da: bool,
 
+    /// The last tx_index we saw from the web server
     tx_index: Arc<RwLock<u64>>,
 
     // TODO ED This should be TYPES::Time
@@ -163,12 +162,14 @@ struct Inner<M: NetworkMsg, KEY: SignatureKey, ELECTIONCONFIG: ElectionConfig, T
     view_sync_cert_task_map: Arc<RwLock<HashMap<u64, UnboundedSender<ConsensusIntentEvent>>>>,
     /// Task map for view sync votes.
     view_sync_vote_task_map: Arc<RwLock<HashMap<u64, UnboundedSender<ConsensusIntentEvent>>>>,
+    /// Task map for transactions
     txn_task_map: Arc<RwLock<HashMap<u64, UnboundedSender<ConsensusIntentEvent>>>>,
 }
 
 impl<M: NetworkMsg, KEY: SignatureKey, ELECTIONCONFIG: ElectionConfig, TYPES: NodeType>
     Inner<M, KEY, ELECTIONCONFIG, TYPES>
 {
+    #![allow(clippy::too_many_lines)]
     /// Pull a web server.
     async fn poll_web_server(
         &self,
@@ -228,7 +229,7 @@ impl<M: NetworkMsg, KEY: SignatureKey, ELECTIONCONFIG: ElectionConfig, TYPES: No
                 Ok(Some(deserialized_messages)) => {
                     match message_purpose {
                         MessagePurpose::Data => {
-                            panic!();
+                            error!("We should not receive transactions in this function");
                         }
                         MessagePurpose::Proposal => {
                             // warn!(
@@ -309,7 +310,9 @@ impl<M: NetworkMsg, KEY: SignatureKey, ELECTIONCONFIG: ElectionConfig, TYPES: No
                             }
                         }
 
-                        _ => todo!(),
+                        MessagePurpose::Internal => {
+                            error!("Received internal message in web server network")
+                        }
                     }
                 }
                 Ok(None) => {
@@ -331,11 +334,8 @@ impl<M: NetworkMsg, KEY: SignatureKey, ELECTIONCONFIG: ElectionConfig, TYPES: No
                     | ConsensusIntentEvent::CancelPollForDAC(event_view)
                     | ConsensusIntentEvent::CancelPollForViewSyncCertificate(event_view)
                     | ConsensusIntentEvent::CancelPollForViewSyncVotes(event_view) => {
-                        if view_number != event_view {
-                            panic!("Wrong event view number was sent to this task!");
-                        } else {
-                            // Shutdown this task
-                            error!("Shutting down polling task for view {}", event_view);
+                        if view_number == event_view {
+                            warn!("Shutting down polling task for view {}", event_view);
                             return Ok(());
                         }
                     }
@@ -345,11 +345,7 @@ impl<M: NetworkMsg, KEY: SignatureKey, ELECTIONCONFIG: ElectionConfig, TYPES: No
                         let mut lock = self.tx_index.write().await;
                         *lock = tx_index;
 
-                        warn!("Lock is {:?}", lock);
-                        if view_number != event_view {
-                            panic!("Wrong event view number was sent to this task!");
-                        } else {
-                            // Shutdown this task
+                        if view_number == event_view {
                             warn!("Shutting down polling task for view {}", event_view);
                             return Ok(());
                         }
@@ -359,12 +355,15 @@ impl<M: NetworkMsg, KEY: SignatureKey, ELECTIONCONFIG: ElectionConfig, TYPES: No
                     }
                 }
                 // Nothing on receiving channel
-                Err(_) => {}
+                Err(_) => {
+                    debug!("Nothing on receiving channel");
+                }
             }
         }
         Err(NetworkError::ShutDown)
     }
 
+    /// Fetches transactions from web server
     async fn get_txs_from_web_server(
         &self,
         endpoint: String,
Expand Down Expand Up @@ -1025,52 +1024,10 @@ impl<
}
ConsensusIntentEvent::PollForTransactions(view_number) => {
let mut task_map = self.inner.txn_task_map.write().await;
if !task_map.contains_key(&view_number) {
if let std::collections::hash_map::Entry::Vacant(e) = task_map.entry(view_number) {
// create new task
let (sender, receiver) = unbounded();
task_map.insert(view_number, sender);
async_spawn({
let inner_clone = self.inner.clone();
async move {
if let Err(e) = inner_clone
.poll_web_server(receiver, MessagePurpose::Data, view_number)
.await
{
error!(
"Background receive transaction polling encountered an error: {:?}",
e
);
}
}
});
} else {
error!("Somehow task already existed!")
}

// TODO ED Do we need to GC before returning? Or will view sync task handle that?
}
ConsensusIntentEvent::CancelPollForTransactions(view_number) => {
let mut task_map = self.inner.txn_task_map.write().await;

if let Some((view, sender)) = task_map.remove_entry(&(view_number)) {
// Send task cancel message to old task

// If task already exited we expect an error
let _res = sender
.send(ConsensusIntentEvent::CancelPollForTransactions(
view_number,
))
.await;
} else {
error!("Task map entry should have existed");
}
}
ConsensusIntentEvent::PollForTransactions(view_number) => {
let mut task_map = self.inner.txn_task_map.write().await;
if !task_map.contains_key(&view_number) {
// create new task
let (sender, receiver) = unbounded();
task_map.insert(view_number, sender);
e.insert(sender);
async_spawn({
let inner_clone = self.inner.clone();
async move {
@@ -1079,29 +1036,27 @@ impl<
                                 .await
                             {
                                 error!(
-                                        "Background receive transaction polling encountered an error: {:?}",
-                                        e
-                                    );
+                                    "Background receive transaction polling encountered an error: {:?}",
+                                    e
+                                );
                             }
                         }
                     });
                 } else {
-                    error!("Somehow task already existed!")
+                    error!("Somehow task already existed!");
                 }
 
                 // TODO ED Do we need to GC before returning? Or will view sync task handle that?
             }
             ConsensusIntentEvent::CancelPollForTransactions(view_number) => {
                 let mut task_map = self.inner.txn_task_map.write().await;
 
-                if let Some((view, sender)) = task_map.remove_entry(&(view_number)) {
+                if let Some((_view, sender)) = task_map.remove_entry(&(view_number)) {
                     // Send task cancel message to old task
 
                     // If task already exited we expect an error
                     let _res = sender
-                        .send(ConsensusIntentEvent::CancelPollForTransactions(
-                            view_number,
-                        ))
+                        .send(ConsensusIntentEvent::CancelPollForTransactions(view_number))
                         .await;
                 } else {
                     error!("Task map entry should have existed");
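
Note on this file's changes: the bulk of the deletions remove a duplicated
PollForTransactions/CancelPollForTransactions pair of match arms, the stray
panic!() and todo!() calls become tracing error!/warn!/debug! logs so a
malformed poll no longer aborts the node, and the surviving transaction arm
switches from a contains_key check followed by insert to the HashMap entry
API, the rewrite clippy's map_entry lint suggests, since it does one hash
lookup instead of two. A minimal standalone sketch of that pattern, using a
hypothetical String task in place of the real spawned polling task:

    use std::collections::{hash_map::Entry, HashMap};

    fn spawn_polling_task(task_map: &mut HashMap<u64, String>, view_number: u64) {
        // Before: `if !task_map.contains_key(&view_number) { task_map.insert(..) }`
        // does two lookups; clippy::map_entry suggests the entry API instead.
        if let Entry::Vacant(e) = task_map.entry(view_number) {
            e.insert(format!("polling task for view {view_number}"));
        } else {
            eprintln!("Somehow task already existed!");
        }
    }

    fn main() {
        let mut task_map = HashMap::new();
        spawn_polling_task(&mut task_map, 3);
        spawn_polling_task(&mut task_map, 3); // second call takes the occupied branch
    }

In the real code the Vacant handle plays the same role: the old
task_map.insert(view_number, sender) call becomes e.insert(sender).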
20 changes: 10 additions & 10 deletions task-impls/src/da.rs
@@ -435,18 +435,18 @@ where
             warn!("Polling for DA proposals for view {}", *self.cur_view + 1);
             self.committee_exchange
                 .network()
-                .inject_consensus_info(
-                    (ConsensusIntentEvent::PollForProposal(*self.cur_view + 1)),
-                )
+                .inject_consensus_info(ConsensusIntentEvent::PollForProposal(
+                    *self.cur_view + 1,
+                ))
                 .await;
         }
         if self.committee_exchange.is_leader(self.cur_view + 3) {
             warn!("Polling for transactions for view {}", *self.cur_view + 3);
             self.committee_exchange
                 .network()
-                .inject_consensus_info(
-                    (ConsensusIntentEvent::PollForTransactions(*self.cur_view + 3)),
-                )
+                .inject_consensus_info(ConsensusIntentEvent::PollForTransactions(
+                    *self.cur_view + 3,
+                ))
                 .await;
         }
 
@@ -501,9 +501,9 @@ where
 
         self.committee_exchange
             .network()
-            .inject_consensus_info(
-                (ConsensusIntentEvent::CancelPollForTransactions(*self.cur_view + 1)),
-            )
+            .inject_consensus_info(ConsensusIntentEvent::CancelPollForTransactions(
+                *self.cur_view + 1,
+            ))
             .await;
 
         for txn in txns {
@@ -572,7 +572,7 @@ where
         let task_start_time = Instant::now();
 
         // let parent_leaf = self.parent_leaf().await?;
-        let mut previous_used_txns = match parent_leaf.deltas {
+        let previous_used_txns = match parent_leaf.deltas {
            Either::Left(block) => block.contained_transactions(),
            Either::Right(_commitment) => HashSet::new(),
        };
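
Note on this file's changes: they are mechanical. rustfmt re-wraps the
inject_consensus_info calls, which also drops the redundant parentheses around
their single argument, and the unneeded mut comes off previous_used_txns. A
tiny sketch of the two compiler warnings involved (unused_parens and
unused_mut), with made-up names:

    fn inject(event: u64) -> u64 {
        event + 1
    }

    fn main() {
        // `inject((41))` warns: unnecessary parentheses around function argument.
        let a = inject(41);

        // `let mut b = a;` with no later mutation warns under `unused_mut`.
        let b = a;

        println!("{a} {b}");
    }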
10 changes: 4 additions & 6 deletions types/src/traits/network.rs
@@ -6,15 +6,13 @@
 use async_std::future::TimeoutError;
 use hotshot_task::BoxSyncFuture;
 use libp2p_networking::network::NetworkNodeHandleError;
-use nll::nll_todo;
 #[cfg(feature = "tokio-executor")]
 use tokio::time::error::Elapsed as TimeoutError;
 #[cfg(not(any(feature = "async-std-executor", feature = "tokio-executor")))]
 std::compile_error! {"Either feature \"async-std-executor\" or feature \"tokio-executor\" must be enabled for this crate."}
 use super::{election::Membership, node_implementation::NodeType, signature_key::SignatureKey};
 use crate::{data::ProposalType, message::MessagePurpose, vote::VoteType};
 use async_trait::async_trait;
-use nll::nll_todo::nll_todo;
 use serde::{Deserialize, Serialize};
 use snafu::Snafu;
 use std::{collections::BTreeSet, fmt::Debug, sync::Arc, time::Duration};
@@ -155,7 +153,7 @@ pub enum ConsensusIntentEvent {
     CancelPollForDAC(u64),
     /// Cancel polling for view sync certificate.
     CancelPollForViewSyncCertificate(u64),
-
+    /// Cancel polling for transactions
     CancelPollForTransactions(u64),
 }
 
@@ -173,9 +171,9 @@ impl ConsensusIntentEvent {
             | ConsensusIntentEvent::CancelPollForProposal(view_number)
             | ConsensusIntentEvent::CancelPollForDAC(view_number)
             | ConsensusIntentEvent::CancelPollForViewSyncCertificate(view_number)
-            | ConsensusIntentEvent::PollForViewSyncCertificate(view_number) => *view_number,
-            ConsensusIntentEvent::PollForTransactions(v) => *v,
-            ConsensusIntentEvent::CancelPollForTransactions(v) => *v,
+            | ConsensusIntentEvent::PollForViewSyncCertificate(view_number)
+            | ConsensusIntentEvent::PollForTransactions(view_number)
+            | ConsensusIntentEvent::CancelPollForTransactions(view_number) => *view_number,
         }
     }
 }
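
Note on this file's changes: the view_number accessor previously gave each
transaction variant its own arm with an identical `*v` body; the fix folds
both variants into the existing or-pattern arm. A reduced sketch of the same
shape, with a made-up three-variant enum:

    enum Intent {
        PollForVotes(u64),
        PollForTransactions(u64),
        CancelPollForTransactions(u64),
    }

    impl Intent {
        fn view_number(&self) -> u64 {
            match self {
                // One or-pattern arm instead of three identical bodies.
                Intent::PollForVotes(view_number)
                | Intent::PollForTransactions(view_number)
                | Intent::CancelPollForTransactions(view_number) => *view_number,
            }
        }
    }

    fn main() {
        assert_eq!(Intent::CancelPollForTransactions(7).view_number(), 7);
    }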
16 changes: 8 additions & 8 deletions web_server/src/lib.rs
@@ -127,6 +127,7 @@ pub trait WebServerDataSource<KEY> {
         index: u64,
     ) -> Result<Option<Vec<Vec<u8>>>, Error>;
 
+    #[allow(clippy::type_complexity)]
     fn get_transactions(&self, index: u64) -> Result<Option<(u64, Vec<Vec<u8>>)>, Error>;
     fn get_da_certificate(&self, index: u64) -> Result<Option<Vec<Vec<u8>>>, Error>;
     fn post_vote(&mut self, view_number: u64, vote: Vec<u8>) -> Result<(), Error>;
@@ -223,24 +224,23 @@ impl<KEY: SignatureKey> WebServerDataSource<KEY> for WebServerState<KEY> {
         }
     }
 
+    #[allow(clippy::type_complexity)]
     /// Return the transaction at the specified index (which will help with Nginx caching, but reduce performance otherwise)
     /// In the future we will return batches of transactions
     fn get_transactions(&self, index: u64) -> Result<Option<(u64, Vec<Vec<u8>>)>, Error> {
         let mut txns_to_return = vec![];
         let mut txn_vec_size = 0;
 
-        let mut new_index = index as usize;
         let lowest_in_memory_txs = if self.num_txns < MAX_TXNS.try_into().unwrap() {
             0
         } else {
-            (self.num_txns as usize - MAX_TXNS)
+            self.num_txns as usize - MAX_TXNS
         };
 
-        if (index as usize) < lowest_in_memory_txs {
-            new_index = lowest_in_memory_txs;
+        let new_index = if (index as usize) < lowest_in_memory_txs {
+            lowest_in_memory_txs
         } else {
-            new_index = index as usize;
-        }
+            index as usize
+        };
 
         for idx in new_index..=self.num_txns.try_into().unwrap() {
             if let Some(txn) = self.transactions.get(&(idx as u64)) {
@@ -253,7 +253,7 @@ impl<KEY: SignatureKey> WebServerDataSource<KEY> for WebServerState<KEY> {
 
         if !txns_to_return.is_empty() {
             error!("Returning this many txs {}", txns_to_return.len());
-            Ok(Some((index.try_into().unwrap(), txns_to_return)))
+            Ok(Some((index, txns_to_return)))
         } else {
             Err(ServerError {
                 // TODO ED: Why does NoContent status code cause errors?
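
Note on this file's changes: get_transactions used to declare
`let mut new_index` and reassign it in both branches of an if/else; the fix
binds it once, using the if/else as an expression, and the trailing
index.try_into().unwrap() is dropped because index is already the u64 the
return type expects. A minimal sketch of the expression-style binding, with
made-up values:

    fn clamp_to_window(index: usize, lowest_in_memory: usize) -> usize {
        // Bind once from the if/else expression instead of mutating in each branch.
        let new_index = if index < lowest_in_memory {
            lowest_in_memory
        } else {
            index
        };
        println!("resolved index: {new_index}");
        new_index
    }

    fn main() {
        assert_eq!(clamp_to_window(3, 10), 10); // below the in-memory window: clamp up
        assert_eq!(clamp_to_window(42, 10), 42); // inside the window: unchanged
    }

Binding once also lets the compiler verify that every branch produces a value,
instead of trusting each branch to remember the assignment.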
