From 835482782d9e9d70ca2d57377bb4500fc1653336 Mon Sep 17 00:00:00 2001 From: nicolas <48695862+merklefruit@users.noreply.github.com> Date: Sat, 3 Feb 2024 14:36:24 +0100 Subject: [PATCH] feat: blob decoding wip --- src/derive/mod.rs | 12 +- src/derive/stages/batcher_transactions.rs | 4 +- src/driver/mod.rs | 10 +- src/l1/blob_encoding.rs | 95 ++++++++++ src/l1/blob_fetcher.rs | 208 ++++++++++++++++------ src/l1/chain_watcher.rs | 7 +- src/l1/mod.rs | 3 + 7 files changed, 269 insertions(+), 70 deletions(-) create mode 100644 src/l1/blob_encoding.rs diff --git a/src/derive/mod.rs b/src/derive/mod.rs index 22586ea8..90c105cc 100644 --- a/src/derive/mod.rs +++ b/src/derive/mod.rs @@ -1,5 +1,6 @@ use std::sync::{mpsc, Arc, RwLock}; +use bytes::Bytes; use eyre::Result; use crate::{config::Config, engine::PayloadAttributes}; @@ -53,7 +54,7 @@ impl Pipeline { }) } - pub fn push_batcher_transactions(&self, txs: Vec>, l1_origin: u64) -> Result<()> { + pub fn push_batcher_transactions(&self, txs: Vec, l1_origin: u64) -> Result<()> { self.batcher_transaction_sender .send(BatcherTransactionMessage { txs, l1_origin })?; Ok(()) @@ -142,12 +143,11 @@ mod tests { _ => panic!("wrong update type"), }; - // let calldata = l1_info.batcher_transactions.into(); - // TODO: get calldata from blob - let calldata = vec![]; - pipeline - .push_batcher_transactions(calldata, l1_info.block_info.number) + .push_batcher_transactions( + l1_info.batcher_transactions.clone(), + l1_info.block_info.number, + ) .unwrap(); state.write().unwrap().update_l1_info(l1_info); diff --git a/src/derive/stages/batcher_transactions.rs b/src/derive/stages/batcher_transactions.rs index ccd72809..cb2abed4 100644 --- a/src/derive/stages/batcher_transactions.rs +++ b/src/derive/stages/batcher_transactions.rs @@ -6,7 +6,7 @@ use std::collections::VecDeque; use crate::derive::PurgeableIterator; pub struct BatcherTransactionMessage { - pub txs: Vec>, + pub txs: Vec, pub l1_origin: u64, } @@ -156,7 +156,7 @@ mod tests { #[test] fn test_push_tx() { - let data = hex::decode(TX_DATA).unwrap(); + let data = bytes::Bytes::from(hex::decode(TX_DATA).unwrap()); let txs = vec![data]; let (tx, rx) = mpsc::channel(); diff --git a/src/driver/mod.rs b/src/driver/mod.rs index 615953d7..464926b0 100644 --- a/src/driver/mod.rs +++ b/src/driver/mod.rs @@ -264,11 +264,11 @@ impl Driver { self.unsafe_block_signer_sender .send(l1_info.system_config.unsafe_block_signer)?; - // let calldata = l1_info.batcher_transactions.into(); - // TODO: decode valid data from blob - let calldata = vec![]; - - self.pipeline.push_batcher_transactions(calldata, num)?; + self.pipeline.push_batcher_transactions( + // cloning `bytes::Bytes` is cheap + l1_info.batcher_transactions.clone(), + num, + )?; self.state .write() diff --git a/src/l1/blob_encoding.rs b/src/l1/blob_encoding.rs new file mode 100644 index 00000000..6121da46 --- /dev/null +++ b/src/l1/blob_encoding.rs @@ -0,0 +1,95 @@ +use bytes::Bytes; +use eyre::Result; + +const MAX_BLOB_DATA_SIZE: usize = (4 * 31 + 3) * 1024 - 4; +const ENCODING_VERSION: u8 = 0; +const VERSION_OFFSET: usize = 0; +const ROUNDS: usize = 1024; + +pub fn decode_blob_data(blob: &Bytes) -> Result { + let mut output = vec![0; MAX_BLOB_DATA_SIZE]; + + if blob[VERSION_OFFSET] != ENCODING_VERSION { + eyre::bail!("Invalid encoding version"); + } + + // decode the 3-byte big-endian length value into a 4-byte integer + let output_len = u32::from_be_bytes([0, blob[2], blob[3], blob[4]]) as usize; + if output_len > MAX_BLOB_DATA_SIZE { + eyre::bail!("Invalid length"); + } + + output[0..27].copy_from_slice(&blob[5..32]); + + let mut output_pos = 28; + let mut input_pos = 32; + + // buffer for the 4 6-bit chunks + let mut encoded_byte = [0; 4]; + + encoded_byte[0] = blob[0]; + for byte in encoded_byte.iter_mut().skip(1) { + *byte = decode_field_element(&mut output_pos, &mut input_pos, blob, &mut output)?; + } + reassemble_bytes(&mut output_pos, encoded_byte, &mut output); + + for _ in 1..ROUNDS { + if output_pos >= output_len { + break; + } + + for byte in encoded_byte.iter_mut() { + *byte = decode_field_element(&mut output_pos, &mut input_pos, blob, &mut output)?; + } + reassemble_bytes(&mut output_pos, encoded_byte, &mut output); + } + + for output_byte in output.iter().take(MAX_BLOB_DATA_SIZE).skip(output_len) { + if output_byte != &0 { + eyre::bail!("Extraneous data in field element {}", output_pos / 32); + } + } + + output.truncate(output_len); + + for byte in blob.iter().skip(input_pos) { + if byte != &0 { + eyre::bail!("Extraneous data in input position {}", input_pos); + } + } + + Ok(output.into()) +} + +fn decode_field_element( + output_pos: &mut usize, + input_pos: &mut usize, + blob: &Bytes, + output: &mut [u8], +) -> Result { + let result = blob[*input_pos]; + + // two highest order bits of the first byte of each field element should always be 0 + if result & 0b1100_0000 != 0 { + eyre::bail!("Invalid field element"); + } + + output[*output_pos..*output_pos + 31].copy_from_slice(&blob[*input_pos + 1..*input_pos + 32]); + + *output_pos += 32; + *input_pos += 32; + + Ok(result) +} + +fn reassemble_bytes(output_pos: &mut usize, encoded_byte: [u8; 4], output: &mut [u8]) { + *output_pos -= 1; + + let x = (encoded_byte[0] & 0b0011_1111) | ((encoded_byte[1] & 0b0011_0000) << 2); + let y = (encoded_byte[1] & 0b0000_1111) | ((encoded_byte[3] & 0b0000_1111) << 4); + let z = (encoded_byte[2] & 0b0011_1111) | ((encoded_byte[3] & 0b0011_0000) << 2); + + output[*output_pos - 32] = z; + output[*output_pos - (32 * 2)] = y; + output[*output_pos - (32 * 3)] = x; +} diff --git a/src/l1/blob_fetcher.rs b/src/l1/blob_fetcher.rs index 06732d0d..ef6801d7 100644 --- a/src/l1/blob_fetcher.rs +++ b/src/l1/blob_fetcher.rs @@ -1,33 +1,38 @@ +use std::sync::{ + atomic::{AtomicU64, Ordering}, + Arc, +}; + use bytes::Bytes; -use ethers::types::{Address, Block, Transaction}; +use ethers::types::{Block, Transaction, H256}; use eyre::Result; use serde::Deserialize; use serde_json::Value; +use super::decode_blob_data; +use crate::config::Config; + const BLOB_CARRYING_TRANSACTION_TYPE: u64 = 3; -#[derive(Debug)] -pub enum BatcherTransactionData { - Calldata(Vec), - Blob(BlobSidecar), -} +pub type BatcherTransactionData = Bytes; +/// The blob fetcher is responsible for fetching blob data from the L1 beacon chain, +/// along with relevant parsing and validation. +/// +/// Consensus layer info required for deriving the slot at which a specific blob was +/// included in the beacon chain is fetched on the first call to [`Self::get_slot_from_time`] +/// and cached for all subsequent calls. pub struct BlobFetcher { - l1_beacon_url: String, + config: Arc, client: reqwest::Client, - - batch_sender: Address, - batch_inbox: Address, -} - -pub enum FetchBlobFilter { - Slot(u64), - BlockRoot(String), + genesis_timestamp: AtomicU64, + seconds_per_slot: AtomicU64, } #[derive(Debug, Deserialize)] pub struct BlobSidecar { - pub index: String, + #[serde(deserialize_with = "deserialize_string_to_u64")] + pub index: u64, pub blob: Bytes, // kzg_commitment: String, // kzg_proof: String, @@ -36,41 +41,28 @@ pub struct BlobSidecar { } impl BlobFetcher { - pub fn new(l1_beacon_url: String, batch_inbox: Address, batch_sender: Address) -> Self { + pub fn new(config: Arc) -> Self { Self { - l1_beacon_url, - batch_inbox, - batch_sender, + config, client: reqwest::Client::new(), + genesis_timestamp: AtomicU64::new(0), + seconds_per_slot: AtomicU64::new(0), } } - pub async fn fetch_blob_sidecars(&self, filter: FetchBlobFilter) -> Result> { - let base_url = format!("{}/eth/v1/beacon/blob_sidecars", self.l1_beacon_url); - let full_url = match filter { - FetchBlobFilter::BlockRoot(block_root) => format!("{}/{}", base_url, block_root), - FetchBlobFilter::Slot(slot) => format!("{}/{}", base_url, slot), - }; - - let resp = self.client.get(full_url).send().await?.error_for_status()?; - let resp = serde_json::from_slice::(&resp.bytes().await?)?; - let resp = resp.get("data").ok_or(eyre::eyre!("No data in response"))?; - - let blobs = serde_json::from_value::>(resp.clone())?; - - Ok(blobs) - } - + /// Given a block, return a list of `BatcherTransactionData` containing either the + /// calldata or the decoded blob data for each batcher transaction in the block. pub async fn get_batcher_transactions( &self, block: &Block, ) -> Result> { - let mut batcher_transactions = Vec::new(); + let mut batcher_transactions_data = Vec::new(); + let mut indexed_blobs = Vec::new(); let mut blob_index = 0; for tx in block.transactions.iter() { if !self.is_valid_batcher_transaction(tx) { - blob_index += 1; + blob_index += 1; // TODO: += number of actual tx.blob_hashes continue; } @@ -81,22 +73,134 @@ impl BlobFetcher { }; if tx_type != BLOB_CARRYING_TRANSACTION_TYPE { - batcher_transactions.push(BatcherTransactionData::Calldata(tx.input.to_vec())); + // this is necessary because ethers-rs wraps `bytes::Bytes` into its own type + // that doesn't come with free conversion back to `bytes::Bytes`. + let calldata = Bytes::from(tx.input.to_vec().clone()); + batcher_transactions_data.push(calldata); continue; } - // TODO: - // download blobs from the beacon chain here... - // let slot = get_slot_from_block_number(block.number); - // let blobs = self.fetch_blob_sidecars(FetchBlobFilter::Slot(slot)).await?; + // TODO: retrieve tx.blob_hashes. might need to update/fork ethers-rs. + // are there other ways to see how many blobs are in a tx? + let blob_hashes = vec![H256::zero()]; + for blob_hash in blob_hashes { + indexed_blobs.push((blob_index, blob_hash)); + blob_index += 1; + } + } + + // if at this point there are no blobs, return early + if indexed_blobs.is_empty() { + return Ok(batcher_transactions_data); } - Ok(vec![]) + let slot = self.get_slot_from_time(block.timestamp.as_u64()).await?; + // perf: fetch only the required indexes instead of all + let blobs = self.fetch_blob_sidecars(slot).await?; + + for (blob_index, blob_hash) in indexed_blobs { + let Some(blob_sidecar) = blobs.iter().find(|b| b.index == blob_index) else { + // This can happen in the case the blob retention window has expired + // and the data is no longer available. This case is not handled yet. + eyre::bail!("blob index {} not found in fetched sidecars", blob_index); + }; + + // decode the full blob + let decoded_blob_data = decode_blob_data(&blob_sidecar.blob)?; + tracing::debug!("successfully decoded blob data for hash {:?}", blob_hash); + + batcher_transactions_data.push(decoded_blob_data); + } + + Ok(batcher_transactions_data) } + #[inline] fn is_valid_batcher_transaction(&self, tx: &Transaction) -> bool { - tx.from == self.batch_sender && tx.to.map(|to| to == self.batch_inbox).unwrap_or(false) + let batch_sender = self.config.chain.system_config.batch_sender; + let batch_inbox = self.config.chain.batch_inbox; + + tx.from == batch_sender && tx.to.map(|to| to == batch_inbox).unwrap_or(false) } + + #[inline] + async fn get_slot_from_time(&self, time: u64) -> Result { + let mut genesis_timestamp = self.genesis_timestamp.load(Ordering::Relaxed); + let mut seconds_per_slot = self.seconds_per_slot.load(Ordering::Relaxed); + + // If we don't have data about the genesis timestamp, we need to fetch it + // from the CL first along with the "SECONDS_PER_SLOT" value from the spec. + if genesis_timestamp == 0 { + genesis_timestamp = self.fetch_beacon_genesis_timestamp().await?; + self.genesis_timestamp + .store(genesis_timestamp, Ordering::Relaxed); + + let spec = self.fetch_beacon_spec().await?; + seconds_per_slot = spec + .get("SECONDS_PER_SLOT") + .ok_or(eyre::eyre!("No seconds per slot in beacon spec"))? + .as_str() + .ok_or(eyre::eyre!("Seconds per slot: expected string"))? + .parse::()?; + + if seconds_per_slot == 0 { + eyre::bail!("Seconds per slot is 0; cannot calculate slot number"); + } + + self.seconds_per_slot + .store(seconds_per_slot, Ordering::Relaxed); + } + + if time < genesis_timestamp { + eyre::bail!("Time is before genesis; cannot calculate slot number"); + } + + Ok((time - genesis_timestamp) / seconds_per_slot) + } + + async fn fetch_blob_sidecars(&self, slot: u64) -> Result> { + let base_url = format!("{}/eth/v1/beacon/blob_sidecars", self.config.l1_beacon_url); + let full_url = format!("{}/{}", base_url, slot); + + let res = self.client.get(full_url).send().await?.error_for_status()?; + let res = serde_json::from_slice::(&res.bytes().await?)?; + let res = res.get("data").ok_or(eyre::eyre!("No data in response"))?; + + let blobs = serde_json::from_value::>(res.clone())?; + + Ok(blobs) + } + + async fn fetch_beacon_genesis_timestamp(&self) -> Result { + let base_url = format!("{}/eth/v1/beacon/genesis", self.config.l1_beacon_url); + + let res = self.client.get(base_url).send().await?.error_for_status()?; + let res = serde_json::from_slice::(&res.bytes().await?)?; + let res = res.get("data").ok_or(eyre::eyre!("No data in response"))?; + let res = res.get("genesis_time").ok_or(eyre::eyre!("No time"))?; + + let genesis_time = serde_json::from_value::(res.clone())?; + + Ok(genesis_time) + } + + async fn fetch_beacon_spec(&self) -> Result { + let base_url = format!("{}/eth/v1/config/spec", self.config.l1_beacon_url); + + let res = self.client.get(base_url).send().await?.error_for_status()?; + let res = serde_json::from_slice::(&res.bytes().await?)?; + let res = res.get("data").ok_or(eyre::eyre!("No data in response"))?; + + Ok(res.clone()) + } +} + +fn deserialize_string_to_u64<'de, D>(deserializer: D) -> Result +where + D: serde::Deserializer<'de>, +{ + let s: String = Deserialize::deserialize(deserializer)?; + s.parse::().map_err(serde::de::Error::custom) } #[cfg(test)] @@ -110,12 +214,14 @@ mod tests { // let Ok(l1_beacon_url) = std::env::var("L1_BEACON_TEST_RPC_URL") else { // return; // }; - let l1_beacon_url = "https://remotelab.taila355b.ts.net".to_string(); - let retriever = BlobFetcher::new(l1_beacon_url, Address::zero(), Address::zero()); - let blobs = retriever - .fetch_blob_sidecars(FetchBlobFilter::Slot(4248703)) - .await - .unwrap(); + + let config = Arc::new(Config { + l1_beacon_url: "https://remotelab.taila355b.ts.net".to_string(), + ..Default::default() + }); + + let retriever = BlobFetcher::new(config); + let blobs = retriever.fetch_blob_sidecars(4248703).await.unwrap(); assert_eq!(blobs.len(), 3); } diff --git a/src/l1/chain_watcher.rs b/src/l1/chain_watcher.rs index e18347cc..9bff83a7 100644 --- a/src/l1/chain_watcher.rs +++ b/src/l1/chain_watcher.rs @@ -166,12 +166,7 @@ impl InnerWatcher { l2_start_block: u64, ) -> Self { let provider = generate_http_provider(&config.l1_rpc_url); - - let blob_fetcher = Arc::new(BlobFetcher::new( - config.l1_beacon_url.clone(), - config.chain.batch_inbox, - config.chain.system_config.batch_sender, - )); + let blob_fetcher = Arc::new(BlobFetcher::new(Arc::clone(&config))); let system_config = if l2_start_block == config.chain.l2_genesis.number { config.chain.system_config diff --git a/src/l1/mod.rs b/src/l1/mod.rs index 09035241..166e8749 100644 --- a/src/l1/mod.rs +++ b/src/l1/mod.rs @@ -9,3 +9,6 @@ pub use l1_info::L1Info; pub mod blob_fetcher; pub use blob_fetcher::{BatcherTransactionData, BlobFetcher, BlobSidecar}; + +pub mod blob_encoding; +pub use blob_encoding::decode_blob_data;