Skip to content

Commit

Permalink
feat: blob fetcher
Browse files Browse the repository at this point in the history
  • Loading branch information
merklefruit committed Feb 2, 2024
1 parent 3c091e9 commit 90d038a
Show file tree
Hide file tree
Showing 7 changed files with 171 additions and 119 deletions.
9 changes: 5 additions & 4 deletions src/derive/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,12 @@ mod tests {
_ => panic!("wrong update type"),
};

// let calldata = l1_info.batcher_transactions.into();
// TODO: get calldata from blob
let calldata = vec![];

pipeline
.push_batcher_transactions(
l1_info.batcher_transactions.clone(),
l1_info.block_info.number,
)
.push_batcher_transactions(calldata, l1_info.block_info.number)
.unwrap();

state.write().unwrap().update_l1_info(l1_info);
Expand Down
7 changes: 5 additions & 2 deletions src/driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,8 +264,11 @@ impl<E: Engine> Driver<E> {
self.unsafe_block_signer_sender
.send(l1_info.system_config.unsafe_block_signer)?;

self.pipeline
.push_batcher_transactions(l1_info.batcher_transactions.clone(), num)?;
// let calldata = l1_info.batcher_transactions.into();
// TODO: decode valid data from blob
let calldata = vec![];

self.pipeline.push_batcher_transactions(calldata, num)?;

self.state
.write()
Expand Down
61 changes: 0 additions & 61 deletions src/l1/beacon_api.rs

This file was deleted.

122 changes: 122 additions & 0 deletions src/l1/blob_fetcher.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
use bytes::Bytes;
use ethers::types::{Address, Block, Transaction};
use eyre::Result;
use serde::Deserialize;
use serde_json::Value;

const BLOB_CARRYING_TRANSACTION_TYPE: u64 = 3;

#[derive(Debug)]
pub enum BatcherTransactionData {
Calldata(Vec<u8>),
Blob(BlobSidecar),
}

pub struct BlobFetcher {
l1_beacon_url: String,
client: reqwest::Client,

batch_sender: Address,
batch_inbox: Address,
}

pub enum FetchBlobFilter {
Slot(u64),
BlockRoot(String),
}

#[derive(Debug, Deserialize)]
pub struct BlobSidecar {
pub index: String,
pub blob: Bytes,
// kzg_commitment: String,
// kzg_proof: String,
// signed_block_header: Value,
// kzg_commitment_inclusion_proof: Vec<String>,
}

impl BlobFetcher {
pub fn new(l1_beacon_url: String, batch_inbox: Address, batch_sender: Address) -> Self {
Self {
l1_beacon_url,
batch_inbox,
batch_sender,
client: reqwest::Client::new(),
}
}

pub async fn fetch_blob_sidecars(&self, filter: FetchBlobFilter) -> Result<Vec<BlobSidecar>> {
let base_url = format!("{}/eth/v1/beacon/blob_sidecars", self.l1_beacon_url);
let full_url = match filter {
FetchBlobFilter::BlockRoot(block_root) => format!("{}/{}", base_url, block_root),
FetchBlobFilter::Slot(slot) => format!("{}/{}", base_url, slot),
};

let resp = self.client.get(full_url).send().await?.error_for_status()?;
let resp = serde_json::from_slice::<Value>(&resp.bytes().await?)?;
let resp = resp.get("data").ok_or(eyre::eyre!("No data in response"))?;

let blobs = serde_json::from_value::<Vec<BlobSidecar>>(resp.clone())?;

Ok(blobs)
}

pub async fn get_batcher_transactions(
&self,
block: &Block<Transaction>,
) -> Result<Vec<BatcherTransactionData>> {
let mut batcher_transactions = Vec::new();
let mut blob_index = 0;

Check failure on line 69 in src/l1/blob_fetcher.rs

View workflow job for this annotation

GitHub Actions / clippy

variable `blob_index` is assigned to, but never used

Check warning on line 69 in src/l1/blob_fetcher.rs

View workflow job for this annotation

GitHub Actions / check

variable `blob_index` is assigned to, but never used

Check warning on line 69 in src/l1/blob_fetcher.rs

View workflow job for this annotation

GitHub Actions / build

variable `blob_index` is assigned to, but never used

Check warning on line 69 in src/l1/blob_fetcher.rs

View workflow job for this annotation

GitHub Actions / test

variable `blob_index` is assigned to, but never used

for tx in block.transactions.iter() {
if !self.is_valid_batcher_transaction(tx) {
blob_index += 1;
continue;
}

// sanity check: transactions here should always have a transaction type
let Some(tx_type) = tx.transaction_type.map(|t| t.as_u64()) else {
tracing::error!("found batcher tx without tx_type. This shouldn't happen.");
continue;
};

if tx_type != BLOB_CARRYING_TRANSACTION_TYPE {
batcher_transactions.push(BatcherTransactionData::Calldata(tx.input.to_vec()));
continue;
}

// TODO:
// download blobs from the beacon chain here...
// let slot = get_slot_from_block_number(block.number);
// let blobs = self.fetch_blob_sidecars(FetchBlobFilter::Slot(slot)).await?;
}

Ok(vec![])
}

fn is_valid_batcher_transaction(&self, tx: &Transaction) -> bool {
tx.from == self.batch_sender && tx.to.map(|to| to == self.batch_inbox).unwrap_or(false)
}
}

#[cfg(test)]
mod tests {
use super::*;

#[tokio::test]
// TODO: update with a test from mainnet after dencun is active
async fn test_get_blobs() {
// TODO: use env vars in tests
// let Ok(l1_beacon_url) = std::env::var("L1_BEACON_TEST_RPC_URL") else {
// return;
// };
let l1_beacon_url = "https://remotelab.taila355b.ts.net".to_string();
let retriever = BlobFetcher::new(l1_beacon_url, Address::zero(), Address::zero());
let blobs = retriever
.fetch_blob_sidecars(FetchBlobFilter::Slot(4248703))
.await
.unwrap();

assert_eq!(blobs.len(), 3);
}
}
23 changes: 17 additions & 6 deletions src/l1/chain_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::{
derive::stages::attributes::UserDeposited,
};

use super::{L1Info, SystemConfigUpdate};
use super::{l1_info::L1BlockInfo, BlobFetcher, L1Info, SystemConfigUpdate};

static CONFIG_UPDATE_TOPIC: Lazy<H256> =
Lazy::new(|| H256::from_slice(&keccak256("ConfigUpdate(uint256,uint8,bytes)")));
Expand Down Expand Up @@ -61,6 +61,8 @@ struct InnerWatcher {
config: Arc<Config>,
/// Ethers provider for L1
provider: Arc<Provider<RetryClient<Http>>>,
/// L1 beacon node to fetch blobs
blob_fetcher: Arc<BlobFetcher>,
/// Channel to send block updates
block_update_sender: mpsc::Sender<BlockUpdate>,
/// Most recent ingested block
Expand Down Expand Up @@ -165,6 +167,12 @@ impl InnerWatcher {
) -> Self {
let provider = generate_http_provider(&config.l1_rpc_url);

let blob_fetcher = Arc::new(BlobFetcher::new(
config.l1_beacon_url.clone(),
config.chain.batch_inbox,
config.chain.system_config.batch_sender,
));

let system_config = if l2_start_block == config.chain.l2_genesis.number {
config.chain.system_config
} else {
Expand Down Expand Up @@ -201,6 +209,7 @@ impl InnerWatcher {
Self {
config,
provider,
blob_fetcher,
block_update_sender,
current_block: l1_start_block,
head_block: 0,
Expand Down Expand Up @@ -241,13 +250,15 @@ impl InnerWatcher {
let user_deposits = self.get_deposits(self.current_block).await?;
let finalized = self.current_block >= self.finalized_block;

let l1_info = L1Info::new(
&block,
let batcher_transactions = self.blob_fetcher.get_batcher_transactions(&block).await?;

let l1_info = L1Info {
system_config: self.system_config,
block_info: L1BlockInfo::try_from(&block)?,
batcher_transactions,
user_deposits,
self.config.chain.batch_inbox,
finalized,
self.system_config,
)?;
};

if l1_info.block_info.number >= self.finalized_block {
let block_info = BlockInfo {
Expand Down
64 changes: 20 additions & 44 deletions src/l1/l1_info.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
use ethers::types::{Address, Block, Transaction, H256, U256};
use eyre::Result;
use ethers::types::{Block, Transaction, H256, U256};

use super::BatcherTransactionData;
use crate::{config::SystemConfig, derive::stages::attributes::UserDeposited};

type BatcherTransactionData = Vec<u8>;

/// Data tied to a specific L1 block
#[derive(Debug)]
pub struct L1Info {
Expand Down Expand Up @@ -35,53 +33,31 @@ pub struct L1BlockInfo {
pub mix_hash: H256,
}

impl L1Info {
pub fn new(
block: &Block<Transaction>,
user_deposits: Vec<UserDeposited>,
batch_inbox: Address,
finalized: bool,
system_config: SystemConfig,
) -> Result<Self> {
let block_number = block
impl TryFrom<&Block<Transaction>> for L1BlockInfo {
type Error = eyre::Error;

fn try_from(value: &Block<Transaction>) -> std::result::Result<Self, Self::Error> {
let number = value
.number
.ok_or(eyre::eyre!("block not included"))?
.as_u64();

let block_hash = block.hash.ok_or(eyre::eyre!("block not included"))?;
let hash = value.hash.ok_or(eyre::eyre!("block not included"))?;

let timestamp = value.timestamp.as_u64();

let block_info = L1BlockInfo {
number: block_number,
hash: block_hash,
timestamp: block.timestamp.as_u64(),
base_fee: block
.base_fee_per_gas
.ok_or(eyre::eyre!("block is pre london"))?,
mix_hash: block.mix_hash.ok_or(eyre::eyre!("block not included"))?,
};
let base_fee = value
.base_fee_per_gas
.ok_or(eyre::eyre!("block is pre london"))?;

let batcher_transactions =
create_batcher_transactions(block, system_config.batch_sender, batch_inbox);
let mix_hash = value.mix_hash.ok_or(eyre::eyre!("block not included"))?;

Ok(L1Info {
block_info,
system_config,
user_deposits,
batcher_transactions,
finalized,
Ok(L1BlockInfo {
number,
hash,
timestamp,
base_fee,
mix_hash,
})
}
}

fn create_batcher_transactions(
block: &Block<Transaction>,
batch_sender: Address,
batch_inbox: Address,
) -> Vec<BatcherTransactionData> {
block
.transactions
.iter()
.filter(|tx| tx.from == batch_sender && tx.to.map(|to| to == batch_inbox).unwrap_or(false))
.map(|tx| tx.input.to_vec())
.collect()
}
4 changes: 2 additions & 2 deletions src/l1/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ pub use config_updates::SystemConfigUpdate;
pub mod l1_info;
pub use l1_info::L1Info;

pub mod beacon_api;
pub use beacon_api::{BlobFilter, L1BeaconApi};
pub mod blob_fetcher;
pub use blob_fetcher::{BatcherTransactionData, BlobFetcher, BlobSidecar};

0 comments on commit 90d038a

Please sign in to comment.