Skip to content

Commit

Permalink
fix(l1): Use tokio mpsc::channel instead of std sync_channel in L…
Browse files Browse the repository at this point in the history
…1 watcher (#162)

fix: use tokio `mpsc::channel` instead of std `sync_channel` in l1 watcher
  • Loading branch information
borispovod authored Sep 5, 2023
1 parent 2f18889 commit 36e3e07
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 19 deletions.
4 changes: 2 additions & 2 deletions src/derive/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,8 @@ mod tests {

let mut pipeline = Pipeline::new(state.clone(), config.clone(), 0).unwrap();

chain_watcher.block_update_receiver.recv().unwrap();
let update = chain_watcher.block_update_receiver.recv().unwrap();
chain_watcher.block_update_receiver.recv().await.unwrap();
let update = chain_watcher.block_update_receiver.recv().await.unwrap();

let l1_info = match update {
BlockUpdate::NewBlock(block) => *block,
Expand Down
28 changes: 11 additions & 17 deletions src/l1/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,4 @@
use std::{
collections::HashMap,
sync::{
mpsc::{channel, sync_channel, Receiver, SyncSender},
Arc,
},
time::Duration,
};
use std::{collections::HashMap, sync::Arc, time::Duration};

use ethers::{
providers::{Http, HttpRateLimitRetryPolicy, Middleware, Provider, RetryClient},
Expand All @@ -19,7 +12,7 @@ use ethers::{
use eyre::Result;
use once_cell::sync::Lazy;
use reqwest::Url;
use tokio::{spawn, task::JoinHandle, time::sleep};
use tokio::{spawn, sync::mpsc, task::JoinHandle, time::sleep};

use crate::{
common::BlockInfo,
Expand Down Expand Up @@ -50,7 +43,7 @@ pub struct ChainWatcher {
/// The L2 starting block
l2_start_block: u64,
/// Channel for receiving block updates for each new block
pub block_update_receiver: Receiver<BlockUpdate>,
pub block_update_receiver: mpsc::Receiver<BlockUpdate>,
}

/// Updates L1Info
Expand Down Expand Up @@ -101,7 +94,7 @@ struct InnerWatcher {
/// Ethers provider for L1
provider: Arc<Provider<RetryClient<Http>>>,
/// Channel to send block updates
block_update_sender: SyncSender<BlockUpdate>,
block_update_sender: mpsc::Sender<BlockUpdate>,
/// Most recent ingested block
current_block: u64,
/// Most recent block
Expand Down Expand Up @@ -133,7 +126,7 @@ impl ChainWatcher {
/// Creates a new ChainWatcher and begins the monitoring task.
/// Errors if the rpc url in the config is invalid.
pub fn new(l1_start_block: u64, l2_start_block: u64, config: Arc<Config>) -> Result<Self> {
let (_, block_updates) = channel();
let (_, block_updates) = mpsc::channel(1);

Ok(Self {
handle: None,
Expand Down Expand Up @@ -182,7 +175,7 @@ impl ChainWatcher {
impl InnerWatcher {
async fn new(
config: Arc<Config>,
block_update_sender: SyncSender<BlockUpdate>,
block_update_sender: mpsc::Sender<BlockUpdate>,
l1_start_block: u64,
l2_start_block: u64,
) -> Self {
Expand Down Expand Up @@ -241,7 +234,8 @@ impl InnerWatcher {

self.finalized_block = finalized_block;
self.block_update_sender
.send(BlockUpdate::FinalityUpdate(finalized_block))?;
.send(BlockUpdate::FinalityUpdate(finalized_block))
.await?;

self.unfinalized_blocks
.retain(|b| b.number > self.finalized_block)
Expand Down Expand Up @@ -284,7 +278,7 @@ impl InnerWatcher {
BlockUpdate::NewBlock(Box::new(l1_info))
};

self.block_update_sender.send(update)?;
self.block_update_sender.send(update).await?;

self.current_block += 1;
} else {
Expand Down Expand Up @@ -479,8 +473,8 @@ fn start_watcher(
l1_start_block: u64,
l2_start_block: u64,
config: Arc<Config>,
) -> Result<(JoinHandle<()>, Receiver<BlockUpdate>)> {
let (block_update_sender, block_update_receiver) = sync_channel(1000);
) -> Result<(JoinHandle<()>, mpsc::Receiver<BlockUpdate>)> {
let (block_update_sender, block_update_receiver) = mpsc::channel(1000);

let handle = spawn(async move {
let mut watcher =
Expand Down

0 comments on commit 36e3e07

Please sign in to comment.