remove builder event types #97

Merged
merged 6 commits on Jul 17, 2024

Changes from all commits
54 changes: 10 additions & 44 deletions .github/workflows/build.yml
@@ -15,10 +15,10 @@ jobs:
build:
runs-on: ubuntu-latest
env:
RUSTFLAGS: "--cfg async_executor_impl=\"async-std\" --cfg async_channel_impl=\"async-std\""
RUST_LOG: info
RUSTFLAGS: '--cfg async_executor_impl="async-std" --cfg async_channel_impl="async-std"'
RUST_LOG: info
steps:
- uses: styfle/cancel-workflow-action@0.12.0
- uses: styfle/cancel-workflow-action@0.12.1
name: Cancel Outdated Builds
with:
all_but_latest: true
@@ -27,59 +27,25 @@ jobs:
- uses: actions/checkout@v4
name: Checkout Repository

- name: Install Protoc
uses: arduino/setup-protoc@v2

- name: Install capnproto
uses: ./.github/actions/install-capnp

- uses: dtolnay/rust-toolchain@stable

- name: Configure Git
run: |
git config --global url."https://ancient123:${{ secrets.ORG_GITHUB_PAT }}@github.com".insteadOf git://github.com
git config --global url."https://ancient123:${{ secrets.ORG_GITHUB_PAT }}@github.com".insteadOf ssh://git@github.com

- uses: Swatinem/rust-cache@v2
name: Enable Rust Caching

- name: Build
run: |
cargo build --release

- name: Format Check
run: cargo fmt -- --check

# Run Clippy on all targets. The lint workflow doesn't run Clippy on tests, because the tests
# don't compile with all combinations of features.
- uses: actions-rs/clippy-check@v1
name: Clippy
with:
token: ${{ github.token }}
args: --workspace --all-features --all-targets -- -D warnings

# TODO: This should not be the basis of a build failure. Move to a different job
# - name: Audit
# run: cargo audit --ignore RUSTSEC-2023-0018 --ignore RUSTSEC-2023-0052 --ignore RUSTSEC-2023-0065

- name: Build
# Build in release without `testing` feature, this should work without `hotshot_example` config.
run: |
cargo build --workspace --release
- name: Clippy
run: cargo clippy --workspace --all-features --all-targets -- -D warnings

- name: Test
# Build test binary with `testing` feature, which requires `hotshot_example` config
run: |
export RUSTFLAGS="$RUSTFLAGS --cfg hotshot_example"
cargo test --workspace --release --all-features --no-run
cargo test --workspace --release --all-features --verbose -- --test-threads 2
timeout-minutes: 60

- name: Generate Documentation
run: |
cargo doc --no-deps --lib --release
echo '<meta http-equiv="refresh" content="0; url=hotshot_query_service">' > target/doc/index.html

- name: Deploy Documentation
uses: peaceiris/actions-gh-pages@v3
if: ${{ github.ref == 'refs/heads/main' }}
with:
github_token: ${{ secrets.GITHUB_TOKEN }}
publish_dir: ./target/doc
cname: tide-disco.docs.espressosys.com
cargo doc --no-deps --lib --release --all-features
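
The test step above exports `RUSTFLAGS="$RUSTFLAGS --cfg hotshot_example"` because the `testing` feature only builds under that cfg. As a rough illustration of the mechanism (the module name below is made up, not from this repository), code gated this way compiles in the test step but is skipped by the plain release build:

```rust
// Items behind `#[cfg(hotshot_example)]` are only compiled when RUSTFLAGS
// includes `--cfg hotshot_example`, as in the test step above; the earlier
// release build omits the flag and skips this module entirely.
#[cfg(hotshot_example)]
pub mod example_support {
    // hypothetical test-only helpers that require the `hotshot_example` cfg
}
```
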
4 changes: 3 additions & 1 deletion src/events.rs
@@ -97,7 +97,9 @@ where
async move {
tracing::info!("client subscribed to events");
state
.read(|state| async move { Ok(state.subscribe_events().await.map(Ok)) }.boxed())
.read(|state| {
async move { Ok(state.get_event_stream(None).await.map(Ok)) }.boxed()
})
.await
}
.try_flatten_stream()
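
With the `BuilderEvent` wrapper gone, the endpoint above streams hotshot `Event`s directly via `get_event_stream(None)`. A minimal consumer sketch, assuming a concrete `Types: NodeType` and a stream of `Arc<Event<Types>>` (the `consume` function itself is hypothetical):

```rust
use std::sync::Arc;

use futures::{Stream, StreamExt};
use hotshot_types::{
    event::{Event, EventType},
    traits::node_implementation::NodeType,
};

// Hypothetical consumer: matches on the raw hotshot `EventType`, since events
// are no longer wrapped in a builder-specific type.
async fn consume<Types: NodeType>(mut events: impl Stream<Item = Arc<Event<Types>>> + Unpin) {
    while let Some(event) = events.next().await {
        match &event.event {
            EventType::Transactions { transactions } => {
                tracing::info!("received {} transactions", transactions.len());
            }
            EventType::Decide { .. } => {
                tracing::info!("decide at view {:?}", event.view_number);
            }
            _ => {}
        }
    }
}
```
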
254 changes: 93 additions & 161 deletions src/events_source.rs
@@ -1,118 +1,22 @@
use async_broadcast::{broadcast, InactiveReceiver, Sender as BroadcastSender};
use async_std::future;
use async_trait::async_trait;
use futures::future::BoxFuture;
use futures::stream::{self, BoxStream, Stream, StreamExt};
use hotshot_types::{
data::{DaProposal, QuorumProposal},
error::HotShotError,
event::{error_adaptor, Event, EventType},
message::Proposal,
traits::node_implementation::{ConsensusTime, NodeType},
PeerConfig,
};
use serde::{Deserialize, Serialize};
use futures::stream::{BoxStream, Stream, StreamExt};
use hotshot_types::event::EventType;
use hotshot_types::{event::Event, traits::node_implementation::NodeType, PeerConfig};
use std::marker::PhantomData;
use std::sync::Arc;
use tide_disco::method::ReadState;
const RETAINED_EVENTS_COUNT: usize = 4096;

/// A builder event
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(bound(deserialize = "Types: NodeType"))]
pub struct BuilderEvent<Types: NodeType> {
/// The view number that this event originates from
pub view_number: Types::Time,

/// The underlying event
pub event: BuilderEventType<Types>,
}

// impl From event to builder event
impl<Types: NodeType> From<Event<Types>> for BuilderEvent<Types> {
fn from(event: Event<Types>) -> Self {
BuilderEvent {
view_number: event.view_number,
event: match event.event {
EventType::Error { error } => BuilderEventType::HotshotError { error },
EventType::Transactions { transactions } => {
BuilderEventType::HotshotTransactions { transactions }
}
EventType::Decide {
leaf_chain,
block_size,
..
} => {
let latest_decide_view_num = leaf_chain[0].leaf.view_number();
BuilderEventType::HotshotDecide {
latest_decide_view_num,
block_size,
}
}
EventType::DaProposal { proposal, sender } => {
BuilderEventType::HotshotDaProposal { proposal, sender }
}
EventType::QuorumProposal { proposal, sender } => {
BuilderEventType::HotshotQuorumProposal { proposal, sender }
}
_ => BuilderEventType::Unknown,
},
}
}
}

#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(bound(deserialize = "Types: NodeType"))]
pub enum BuilderEventType<Types: NodeType> {
// Information required by the builder to create a membership to get view leader
StartupInfo {
known_node_with_stake: Vec<PeerConfig<Types::SignatureKey>>,
non_staked_node_count: usize,
},
/// Hotshot error
HotshotError {
/// The underlying error
#[serde(with = "error_adaptor")]
error: Arc<HotShotError<Types>>,
},
/// Hotshot public mempool transactions
HotshotTransactions {
/// The list of hotshot transactions
transactions: Vec<Types::Transaction>,
},
// Decide event with the chain of decided leaves
HotshotDecide {
/// The chain of decided leaves with its corresponding state and VID info.
latest_decide_view_num: Types::Time,
/// Optional information of the number of transactions in the block
block_size: Option<u64>,
},
/// DA proposal was received from the network
HotshotDaProposal {
/// Contents of the proposal
proposal: Proposal<Types, DaProposal<Types>>,
/// Public key of the leader submitting the proposal
sender: Types::SignatureKey,
},
/// Quorum proposal was received from the network
HotshotQuorumProposal {
/// Contents of the proposal
proposal: Proposal<Types, QuorumProposal<Types>>,
/// Public key of the leader submitting the proposal
sender: Types::SignatureKey,
},
Unknown,
}

#[async_trait]
pub trait EventsSource<Types>
where
Types: NodeType,
{
type EventStream: Stream<Item = Arc<BuilderEvent<Types>>> + Unpin + Send + 'static;
async fn get_event_stream(&self) -> Self::EventStream;

async fn subscribe_events(&self) -> BoxStream<'static, Arc<BuilderEvent<Types>>> {
self.get_event_stream().await.boxed()
}
type EventStream: Stream<Item = Arc<Event<Types>>> + Unpin + Send + 'static;
async fn get_event_stream(&self, filter: Option<EventFilterSet<Types>>) -> Self::EventStream;
}

#[async_trait]
@@ -126,78 +30,115 @@ where
#[derive(Debug)]
pub struct EventsStreamer<Types: NodeType> {
// required for api subscription
inactive_to_subscribe_clone_recv: InactiveReceiver<Arc<BuilderEvent<Types>>>,
subscriber_send_channel: BroadcastSender<Arc<BuilderEvent<Types>>>,
inactive_to_subscribe_clone_recv: InactiveReceiver<Arc<Event<Types>>>,
subscriber_send_channel: BroadcastSender<Arc<Event<Types>>>,

// required for sending startup info
known_nodes_with_stake: Vec<PeerConfig<Types::SignatureKey>>,
non_staked_node_count: usize,
}

impl<Types: NodeType> EventsStreamer<Types> {
pub fn known_node_with_stake(&self) -> Vec<PeerConfig<Types::SignatureKey>> {
self.known_nodes_with_stake.clone()
}

pub fn non_staked_node_count(&self) -> usize {
self.non_staked_node_count
}
}

#[async_trait]
impl<Types: NodeType> EventConsumer<Types> for EventsStreamer<Types> {
async fn handle_event(&mut self, event: Event<Types>) {
let filter = match event {
Event {
event: EventType::DaProposal { .. },
..
} => true,
Event {
event: EventType::QuorumProposal { .. },
..
} => true,
Event {
event: EventType::Transactions { .. },
..
} => true,
Event {
event: EventType::Decide { .. },
..
} => true,
Event { .. } => false,
};
if filter {
let builder_event = Arc::new(BuilderEvent::from(event));
let _status = self.subscriber_send_channel.broadcast(builder_event).await;
if let Err(e) = self.subscriber_send_channel.broadcast(event.into()).await {
tracing::error!("Error broadcasting the event: {:?}", e);
}
}
}

/// Wrapper struct representing a set of event filters.
#[derive(Clone, Debug)]
pub struct EventFilterSet<Types: NodeType>(pub(crate) Vec<EventFilter<Types>>);

/// `From` trait impl to create an `EventFilterSet` from a vector of `EventFilter`s.
impl<Types: NodeType> From<Vec<EventFilter<Types>>> for EventFilterSet<Types> {
fn from(filters: Vec<EventFilter<Types>>) -> Self {
EventFilterSet(filters)
}
}

/// `From` trait impl to create an `EventFilterSet` from a single `EventFilter`.
impl<Types: NodeType> From<EventFilter<Types>> for EventFilterSet<Types> {
fn from(filter: EventFilter<Types>) -> Self {
EventFilterSet(vec![filter])
}
}

impl<Types: NodeType> EventFilterSet<Types> {
/// Determines whether the given hotshot event should be broadcast based on the filters in the set.
///
/// Returns `true` if the event should be broadcast, `false` otherwise.
pub(crate) fn should_broadcast(&self, hotshot_event: &EventType<Types>) -> bool {
let filter = &self.0;

match hotshot_event {
EventType::Error { .. } => filter.contains(&EventFilter::Error),
EventType::Decide { .. } => filter.contains(&EventFilter::Decide),
EventType::ReplicaViewTimeout { .. } => {
filter.contains(&EventFilter::ReplicaViewTimeout)
}
EventType::ViewFinished { .. } => filter.contains(&EventFilter::ViewFinished),
EventType::ViewTimeout { .. } => filter.contains(&EventFilter::ViewTimeout),
EventType::Transactions { .. } => filter.contains(&EventFilter::Transactions),
EventType::DaProposal { .. } => filter.contains(&EventFilter::DaProposal),
EventType::QuorumProposal { .. } => filter.contains(&EventFilter::QuorumProposal),
EventType::UpgradeProposal { .. } => filter.contains(&EventFilter::UpgradeProposal),
_ => false,
}
}
}

/// Possible event filters
/// If the hotshot `EventType` enum is modified, this enum should also be updated
#[derive(Clone, Debug, PartialEq)]
pub enum EventFilter<Types: NodeType> {
Error,
Decide,
ReplicaViewTimeout,
ViewFinished,
ViewTimeout,
Transactions,
DaProposal,
QuorumProposal,
UpgradeProposal,
Pd(PhantomData<Types>),
}
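
For illustration, the `From` impls above let a caller build an `EventFilterSet` from either a single filter or a list; `should_broadcast` then admits only the matching `EventType` variants. A hypothetical sketch (the helper functions and the `crate::events_source` import path are assumptions, not part of this diff):

```rust
use hotshot_types::traits::node_implementation::NodeType;

use crate::events_source::{EventFilter, EventFilterSet};

// Hypothetical helper: a subscriber using this set only receives
// `Decide` and `Transactions` events.
fn decide_and_txn_filter<Types: NodeType>() -> EventFilterSet<Types> {
    vec![EventFilter::Decide, EventFilter::Transactions].into()
}

// Hypothetical helper: a single filter also converts via `From`.
fn decide_only_filter<Types: NodeType>() -> EventFilterSet<Types> {
    EventFilter::Decide.into()
}
```
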

#[async_trait]
impl<Types: NodeType> EventsSource<Types> for EventsStreamer<Types> {
type EventStream = BoxStream<'static, Arc<BuilderEvent<Types>>>;

async fn get_event_stream(&self) -> Self::EventStream {
let recv_channel = self.inactive_to_subscribe_clone_recv.activate_cloned();
let startup_event_initialized = false;
let startup_event = self.get_startup_event().clone();
stream::unfold(
(recv_channel, startup_event, startup_event_initialized),
|(mut recv_channel, startup_event, mut startup_event_initialized)| async move {
let event_res = if startup_event_initialized {
recv_channel.recv().await.ok()
} else {
startup_event_initialized = true;
Some(Arc::new(startup_event.clone()))
};
event_res.map(|event| {
(
event,
(recv_channel, startup_event, startup_event_initialized),
)
})
},
)
.boxed()
type EventStream = BoxStream<'static, Arc<Event<Types>>>;

async fn get_event_stream(&self, filter: Option<EventFilterSet<Types>>) -> Self::EventStream {
let receiver = self.inactive_to_subscribe_clone_recv.activate_cloned();

if let Some(filter) = filter {
receiver
.filter(move |event| future::ready(filter.should_broadcast(&event.as_ref().event)))
.boxed()
} else {
receiver.boxed()
}
}
}

impl<Types: NodeType> EventsStreamer<Types> {
pub fn new(
known_nodes_with_stake: Vec<PeerConfig<Types::SignatureKey>>,
non_staked_node_count: usize,
) -> Self {
let (mut subscriber_send_channel, to_subscribe_clone_recv) =
broadcast::<Arc<BuilderEvent<Types>>>(RETAINED_EVENTS_COUNT);
broadcast::<Arc<Event<Types>>>(RETAINED_EVENTS_COUNT);
// set the overflow to true to drop older messages from the channel
subscriber_send_channel.set_overflow(true);
// set the await active to false to not block the sender
@@ -210,15 +151,6 @@ impl<Types: NodeType> EventsStreamer<Types> {
non_staked_node_count,
}
}
pub fn get_startup_event(&self) -> BuilderEvent<Types> {
BuilderEvent {
view_number: Types::Time::genesis(),
event: BuilderEventType::StartupInfo {
known_node_with_stake: self.known_nodes_with_stake.clone(),
non_staked_node_count: self.non_staked_node_count,
},
}
}
}

#[async_trait]
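
Taken together, the streamer is now constructed and consumed purely in terms of hotshot events. A rough end-to-end sketch under assumed names (the `run` function, the `consensus_events` stream, and the `crate::events_source` import path are illustrative, not from this PR):

```rust
use futures::{Stream, StreamExt};
use hotshot_types::{event::Event, traits::node_implementation::NodeType, PeerConfig};

use crate::events_source::{EventConsumer, EventsSource, EventsStreamer};

// Hypothetical wiring: consensus events flow into the streamer, and API
// subscribers read copies from its broadcast channel.
async fn run<Types: NodeType>(
    known_nodes_with_stake: Vec<PeerConfig<Types::SignatureKey>>,
    non_staked_node_count: usize,
    mut consensus_events: impl Stream<Item = Event<Types>> + Unpin,
) {
    let mut streamer = EventsStreamer::<Types>::new(known_nodes_with_stake, non_staked_node_count);

    // Unfiltered subscription: `None` yields every event the streamer broadcasts.
    let mut all_events = streamer.get_event_stream(None).await;

    // Log broadcast events in the background (async-std executor, per the CI cfg above).
    async_std::task::spawn(async move {
        while let Some(event) = all_events.next().await {
            tracing::debug!(view = ?event.view_number, "event broadcast to subscribers");
        }
    });

    // Forward consensus events; `handle_event` decides which ones are broadcast.
    while let Some(event) = consensus_events.next().await {
        streamer.handle_event(event).await;
    }
}
```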