Skip to content

Commit

Permalink
Merge pull request #40 from EspressoSystems/hg/fix_blocking_send
Browse files Browse the repository at this point in the history
fix blocking send
  • Loading branch information
move47 authored Apr 16, 2024
2 parents 1690a86 + 8a43d9f commit 4bc82b0
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 15 deletions.
7 changes: 3 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,19 @@
name = "hotshot-events-service"
version = "0.1.3"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
async-broadcast = "0.7"
async-compatibility-layer = { version = "1.1", default-features = false, features = [
"logging-utils",
] }
async-broadcast = "0.7"
async-std = { version = "1", features = ["attributes"] }
async-lock = "2.8"
async-std = { version = "1", features = ["attributes"] }
async-trait = "0.1"
clap = { version = "4.4", features = ["derive", "env"] }
derive_more = "0.99"
derivative = "2.2"
derive_more = "0.99"
either = { version = "1.10", features = ["serde"] }
futures = "0.3"
hotshot-types = { git = "https://github.com/EspressoSystems/HotShot.git", tag = "0.5.36" }
Expand Down
2 changes: 1 addition & 1 deletion src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use tide_disco::api::{Api, ApiError};
use toml::{map::Entry, Value};
use vbs::version::StaticVersionType;

pub(crate) fn load_api<State: 'static, Error : 'static, Ver: StaticVersionType + 'static>(
pub(crate) fn load_api<State: 'static, Error: 'static, Ver: StaticVersionType + 'static>(
path: Option<impl AsRef<Path>>,
default: &str,
extensions: impl IntoIterator<Item = Value>,
Expand Down
15 changes: 9 additions & 6 deletions src/events_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,21 +166,21 @@ impl<Types: NodeType> EventsSource<Types> for EventsStreamer<Types> {

async fn get_event_stream(&self) -> Self::EventStream {
let recv_channel = self.inactive_to_subscribe_clone_recv.activate_cloned();
let starup_event_initialized = false;
let startup_event_initialized = false;
let startup_event = self.get_startup_event().clone();
stream::unfold(
(recv_channel, startup_event, starup_event_initialized),
|(mut recv_channel, startup_event, mut starup_event_initialized)| async move {
let event_res = if starup_event_initialized {
(recv_channel, startup_event, startup_event_initialized),
|(mut recv_channel, startup_event, mut startup_event_initialized)| async move {
let event_res = if startup_event_initialized {
recv_channel.recv().await.ok()
} else {
starup_event_initialized = true;
startup_event_initialized = true;
Some(Arc::new(startup_event.clone()))
};
event_res.map(|event| {
(
event,
(recv_channel, startup_event, starup_event_initialized),
(recv_channel, startup_event, startup_event_initialized),
)
})
},
Expand All @@ -195,7 +195,10 @@ impl<Types: NodeType> EventsStreamer<Types> {
) -> Self {
let (mut subscriber_send_channel, to_subscribe_clone_recv) =
broadcast::<Arc<BuilderEvent<Types>>>(RETAINED_EVENTS_COUNT);
// set the overflow to true to drop older messages from the channel
subscriber_send_channel.set_overflow(true);
// set the await active to false to not block the sender
subscriber_send_channel.set_await_active(false);
let inactive_to_subscribe_clone_recv = to_subscribe_clone_recv.deactivate();
EventsStreamer {
subscriber_send_channel,
Expand Down
58 changes: 54 additions & 4 deletions src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,59 @@ mod tests {
}
}

#[async_std::test]
async fn test_no_active_receiver() {
tracing::info!("Starting test_no_active_receiver");
setup_logging();
setup_backtrace();
let port = portpicker::pick_unused_port().expect("Could not find an open port");
let api_url = Url::parse(format!("http://localhost:{port}").as_str()).unwrap();

let known_nodes_with_stake = vec![];
let non_staked_node_count = 0;
let events_streamer = Arc::new(RwLock::new(EventsStreamer::new(
known_nodes_with_stake,
non_staked_node_count,
)));

// Start the web server.
let mut app = App::<_, Error>::with_state(events_streamer.clone());

let hotshot_events_api =
define_api::<Arc<RwLock<EventsStreamer<TestTypes>>>, TestTypes, Version01>(
&Options::default(),
)
.expect("Failed to define hotshot eventsAPI");

app.register_module("hotshot_events", hotshot_events_api)
.expect("Failed to register hotshot events API");

async_spawn(app.serve(api_url, STATIC_VER_0_1));
let total_count = 5;
let send_handle = async_spawn(async move {
let mut send_count = 0;
loop {
let tx_event = generate_event(send_count);
tracing::debug!("Before writing to events_source");
events_streamer
.write()
.await
.handle_event(tx_event.clone())
.await;
send_count += 1;
tracing::debug!("After writing to events_source");
if send_count >= total_count {
break;
}
}
});

send_handle.await;
}

#[async_std::test]
async fn test_event_stream() {
tracing::info!("Starting hotshot test_event_stream");
tracing::info!("Starting test_event_stream");
setup_logging();
setup_backtrace();

Expand Down Expand Up @@ -107,11 +157,11 @@ mod tests {
tracing::info!("Received event in Client 1: {:?}", event);
receive_count += 1;
if receive_count > total_count {
tracing::info!("Clien1 Received all sent events, exiting loop");
tracing::info!("Client1 Received all sent events, exiting loop");
break;
}
}
// Offest 1 is due to the startup event info
// Offset 1 is due to the startup event info
assert_eq!(receive_count, total_count + 1);
});

Expand All @@ -127,7 +177,7 @@ mod tests {
break;
}
}
// Offest 1 is due to the startup event info
// Offset 1 is due to the startup event info
assert_eq!(receive_count, total_count + 1);
});

Expand Down

0 comments on commit 4bc82b0

Please sign in to comment.