diff --git a/src/config/mod.rs b/src/config/mod.rs
index 0dc6967c..98137de0 100644
--- a/src/config/mod.rs
+++ b/src/config/mod.rs
@@ -37,7 +37,7 @@ impl FromStr for SyncMode {
 }
 
 /// A system configuration
-#[derive(Debug, Clone, Deserialize)]
+#[derive(Debug, Clone, Deserialize, Default)]
 pub struct Config {
     /// The base chain RPC URL
     pub l1_rpc_url: String,
@@ -143,6 +143,8 @@ pub struct ChainConfig {
     pub regolith_time: u64,
     /// Timestamp of the canyon hardfork
     pub canyon_time: u64,
+    /// Timestamp of the delta hardfork
+    pub delta_time: u64,
     /// Network blocktime
     #[serde(default = "default_blocktime")]
     pub blocktime: u64,
@@ -150,6 +152,12 @@ pub struct ChainConfig {
     pub l2_to_l1_message_passer: Address,
 }
 
+impl Default for ChainConfig {
+    fn default() -> Self {
+        ChainConfig::optimism()
+    }
+}
+
 /// Optimism system config contract values
 #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
 pub struct SystemConfig {
@@ -255,6 +263,7 @@ impl ChainConfig {
             blocktime: 2,
             regolith_time: 0,
             canyon_time: 170499240,
+            delta_time: u64::MAX,
         }
     }
 
@@ -293,6 +302,7 @@ impl ChainConfig {
             max_seq_drift: 600,
             regolith_time: 1679079600,
             canyon_time: 1699981200,
+            delta_time: 1703116800,
             blocktime: 2,
         }
     }
@@ -331,6 +341,7 @@ impl ChainConfig {
             max_seq_drift: 600,
             regolith_time: 0,
             canyon_time: 1699981200,
+            delta_time: 1703203200,
             blocktime: 2,
         }
     }
@@ -369,6 +380,7 @@ impl ChainConfig {
             blocktime: 2,
             regolith_time: 0,
             canyon_time: 1704992401,
+            delta_time: u64::MAX,
         }
     }
 
@@ -405,6 +417,7 @@ impl ChainConfig {
             max_seq_drift: 600,
             regolith_time: 1683219600,
             canyon_time: 1699981200,
+            delta_time: 1703116800,
             blocktime: 2,
         }
     }
@@ -442,6 +455,7 @@ impl ChainConfig {
             max_seq_drift: 600,
             regolith_time: 0,
             canyon_time: 1699981200,
+            delta_time: 1703203200,
             blocktime: 2,
         }
     }
@@ -485,6 +499,7 @@ pub struct ExternalChainConfig {
     l2_chain_id: u64,
     regolith_time: u64,
     canyon_time: u64,
+    delta_time: u64,
     batch_inbox_address: Address,
     deposit_contract_address: Address,
     l1_system_config_address: Address,
@@ -547,6 +562,7 @@ impl From<ExternalChainConfig> for ChainConfig {
             max_seq_drift: external.max_sequencer_drift,
             regolith_time: external.regolith_time,
             canyon_time: external.canyon_time,
+            delta_time: external.delta_time,
             blocktime: external.block_time,
             l2_to_l1_message_passer: addr("0x4200000000000000000000000000000000000016"),
         }
@@ -593,6 +609,7 @@ impl From<ChainConfig> for ExternalChainConfig {
             l2_chain_id: chain_config.l2_chain_id,
             regolith_time: chain_config.regolith_time,
             canyon_time: chain_config.canyon_time,
+            delta_time: chain_config.delta_time,
             batch_inbox_address: chain_config.batch_inbox,
             deposit_contract_address: chain_config.deposit_contract,
             l1_system_config_address: chain_config.system_config_contract,
@@ -718,8 +735,9 @@ mod test {
             "channel_timeout": 120,
             "l1_chain_id": 900,
             "l2_chain_id": 901,
-            "regolith_time": 0,
-            "canyon_time": 0,
+            "regolith_time": 1,
+            "canyon_time": 2,
+            "delta_time": 3,
             "batch_inbox_address": "0xff00000000000000000000000000000000000000",
             "deposit_contract_address": "0x6900000000000000000000000000000000000001",
             "l1_system_config_address": "0x6900000000000000000000000000000000000009"
@@ -765,8 +783,9 @@ mod test {
         assert_eq!(chain.channel_timeout, 120);
         assert_eq!(chain.seq_window_size, 200);
         assert_eq!(chain.max_seq_drift, 300);
-        assert_eq!(chain.regolith_time, 0);
-        assert_eq!(chain.canyon_time, 0);
+        assert_eq!(chain.regolith_time, 1);
+        assert_eq!(chain.canyon_time, 2);
+        assert_eq!(chain.delta_time, 3);
         assert_eq!(chain.blocktime, 2);
         assert_eq!(
             chain.l2_to_l1_message_passer,
diff --git
a/src/derive/mod.rs b/src/derive/mod.rs index 3f1ef0bc..ab2dcd07 100644 --- a/src/derive/mod.rs +++ b/src/derive/mod.rs @@ -120,11 +120,16 @@ mod tests { chain_watcher.start().unwrap(); - let state = Arc::new(RwLock::new(State::new( - config.chain.l2_genesis, - config.chain.l1_start_epoch, - config.clone(), - ))); + let provider = Provider::try_from(env::var("L2_TEST_RPC_URL").unwrap()).unwrap(); + let state = Arc::new(RwLock::new( + State::new( + config.chain.l2_genesis, + config.chain.l1_start_epoch, + &provider, + config.clone(), + ) + .await, + )); let mut pipeline = Pipeline::new(state.clone(), config.clone(), 0).unwrap(); diff --git a/src/derive/stages/attributes.rs b/src/derive/stages/attributes.rs index daacffd6..3f1b3959 100644 --- a/src/derive/stages/attributes.rs +++ b/src/derive/stages/attributes.rs @@ -13,10 +13,10 @@ use crate::derive::PurgeableIterator; use crate::engine::PayloadAttributes; use crate::l1::L1Info; -use super::batches::Batch; +use super::block_input::BlockInput; pub struct Attributes { - batch_iter: Box>, + block_input_iter: Box>>, state: Arc>, sequence_number: u64, epoch_hash: H256, @@ -27,15 +27,16 @@ impl Iterator for Attributes { type Item = PayloadAttributes; fn next(&mut self) -> Option { - self.batch_iter + self.block_input_iter .next() + .map(|input| input.with_full_epoch(&self.state).unwrap()) .map(|batch| self.derive_attributes(batch)) } } impl PurgeableIterator for Attributes { fn purge(&mut self) { - self.batch_iter.purge(); + self.block_input_iter.purge(); self.sequence_number = 0; self.epoch_hash = self.state.read().unwrap().safe_epoch.hash; } @@ -43,7 +44,7 @@ impl PurgeableIterator for Attributes { impl Attributes { pub fn new( - batch_iter: Box>, + block_input_iter: Box>>, state: Arc>, config: Arc, seq: u64, @@ -51,7 +52,7 @@ impl Attributes { let epoch_hash = state.read().unwrap().safe_epoch.hash; Self { - batch_iter, + block_input_iter, state, sequence_number: seq, epoch_hash, @@ -59,32 +60,27 @@ impl Attributes { } } - fn derive_attributes(&mut self, batch: Batch) -> PayloadAttributes { - tracing::debug!("attributes derived from block {}", batch.epoch_num); - tracing::debug!("batch epoch hash {:?}", batch.epoch_hash); + fn derive_attributes(&mut self, input: BlockInput) -> PayloadAttributes { + tracing::debug!("attributes derived from block {}", input.epoch.number); + tracing::debug!("batch epoch hash {:?}", input.epoch.hash); - self.update_sequence_number(batch.epoch_hash); + self.update_sequence_number(input.epoch.hash); let state = self.state.read().unwrap(); - let l1_info = state.l1_info_by_hash(batch.epoch_hash).unwrap(); + let l1_info = state.l1_info_by_hash(input.epoch.hash).unwrap(); - let epoch = Some(Epoch { - number: batch.epoch_num, - hash: batch.epoch_hash, - timestamp: l1_info.block_info.timestamp, - }); - - let withdrawals = if batch.timestamp >= self.config.chain.canyon_time { + let withdrawals = if input.timestamp >= self.config.chain.canyon_time { Some(Vec::new()) } else { None }; - let timestamp = U64([batch.timestamp]); - let l1_inclusion_block = Some(batch.l1_inclusion_block); + let timestamp = U64([input.timestamp]); + let l1_inclusion_block = Some(input.l1_inclusion_block); let seq_number = Some(self.sequence_number); let prev_randao = l1_info.block_info.mix_hash; - let transactions = Some(self.derive_transactions(batch, l1_info)); + let epoch = Some(input.epoch); + let transactions = Some(self.derive_transactions(input, l1_info)); let suggested_fee_recipient = SystemAccounts::default().fee_vault; PayloadAttributes { 
@@ -101,10 +97,14 @@ impl Attributes { } } - fn derive_transactions(&self, batch: Batch, l1_info: &L1Info) -> Vec { + fn derive_transactions( + &self, + input: BlockInput, + l1_info: &L1Info, + ) -> Vec { let mut transactions = Vec::new(); - let attributes_tx = self.derive_attributes_deposited(l1_info, batch.timestamp); + let attributes_tx = self.derive_attributes_deposited(l1_info, input.timestamp); transactions.push(attributes_tx); if self.sequence_number == 0 { @@ -112,7 +112,7 @@ impl Attributes { transactions.append(&mut user_deposited_txs); } - let mut rest = batch.transactions; + let mut rest = input.transactions; transactions.append(&mut rest); transactions diff --git a/src/derive/stages/batches.rs b/src/derive/stages/batches.rs index 03b195ab..768c98ab 100644 --- a/src/derive/stages/batches.rs +++ b/src/derive/stages/batches.rs @@ -4,22 +4,24 @@ use std::collections::BTreeMap; use std::io::Read; use std::sync::{Arc, RwLock}; -use ethers::types::H256; -use ethers::utils::rlp::{DecoderError, Rlp}; - +use ethers::utils::rlp::Rlp; use eyre::Result; use libflate::zlib::Decoder; -use crate::common::RawTransaction; use crate::config::Config; use crate::derive::state::State; use crate::derive::PurgeableIterator; +use super::block_input::BlockInput; use super::channels::Channel; +use super::single_batch::SingleBatch; +use super::span_batch::SpanBatch; pub struct Batches { /// Mapping of timestamps to batches batches: BTreeMap, + /// Pending block inputs to be outputed + pending_inputs: Vec>, channel_iter: I, state: Arc>, config: Arc, @@ -29,7 +31,7 @@ impl Iterator for Batches where I: Iterator, { - type Item = Batch; + type Item = BlockInput; fn next(&mut self) -> Option { self.try_next().unwrap_or_else(|_| { @@ -46,6 +48,7 @@ where fn purge(&mut self) { self.channel_iter.purge(); self.batches.clear(); + self.pending_inputs.clear(); } } @@ -53,6 +56,7 @@ impl Batches { pub fn new(channel_iter: I, state: Arc>, config: Arc) -> Self { Self { batches: BTreeMap::new(), + pending_inputs: Vec::new(), channel_iter, state, config, @@ -64,32 +68,32 @@ impl Batches where I: Iterator, { - fn try_next(&mut self) -> Result> { + fn try_next(&mut self) -> Result>> { + if !self.pending_inputs.is_empty() { + return Ok(Some(self.pending_inputs.remove(0))); + } + let channel = self.channel_iter.next(); if let Some(channel) = channel { - let batches = decode_batches(&channel)?; + let batches = decode_batches(&channel, self.config.chain.l2_chain_id)?; batches.into_iter().for_each(|batch| { - tracing::debug!( - "saw batch: t={}, ph={:?}, e={}", - batch.timestamp, - batch.parent_hash, - batch.epoch_num - ); - self.batches.insert(batch.timestamp, batch); + let timestamp = batch.timestamp(&self.config); + tracing::debug!("saw batch: t={}", timestamp); + self.batches.insert(timestamp, batch); }); } let derived_batch = loop { if let Some((_, batch)) = self.batches.first_key_value() { + let timestamp = batch.timestamp(&self.config); match self.batch_status(batch) { BatchStatus::Accept => { let batch = batch.clone(); - self.batches.remove(&batch.timestamp); + self.batches.remove(×tamp); break Some(batch); } BatchStatus::Drop => { tracing::warn!("dropping invalid batch"); - let timestamp = batch.timestamp; self.batches.remove(×tamp); } BatchStatus::Future | BatchStatus::Undecided => { @@ -101,7 +105,16 @@ where } }; - let batch = if derived_batch.is_none() { + Ok(if let Some(derived_batch) = derived_batch { + let mut inputs = self.filter_inputs(derived_batch.as_inputs(&self.config)); + if !inputs.is_empty() { + let 
first = inputs.remove(0); + self.pending_inputs.append(&mut inputs); + Some(first) + } else { + None + } + } else { let state = self.state.read().unwrap(); let current_l1_block = state.current_epoch_num; @@ -119,10 +132,8 @@ where next_epoch }; - Some(Batch { - epoch_num: epoch.number, - epoch_hash: epoch.hash, - parent_hash: safe_head.parent_hash, + Some(BlockInput { + epoch: epoch.number, timestamp: next_timestamp, transactions: Vec::new(), l1_inclusion_block: current_l1_block, @@ -133,14 +144,24 @@ where } else { None } - } else { - derived_batch - }; + }) + } - Ok(batch) + fn filter_inputs(&self, inputs: Vec>) -> Vec> { + inputs + .into_iter() + .filter(|input| input.timestamp > self.state.read().unwrap().safe_head.timestamp) + .collect() } fn batch_status(&self, batch: &Batch) -> BatchStatus { + match batch { + Batch::Single(batch) => self.single_batch_status(batch), + Batch::Span(batch) => self.span_batch_status(batch), + } + } + + fn single_batch_status(&self, batch: &SingleBatch) -> BatchStatus { let state = self.state.read().unwrap(); let epoch = state.safe_epoch; let next_epoch = state.epoch_by_number(epoch.number + 1); @@ -150,7 +171,10 @@ where // check timestamp range match batch.timestamp.cmp(&next_timestamp) { Ordering::Greater => return BatchStatus::Future, - Ordering::Less => return BatchStatus::Drop, + Ordering::Less => { + tracing::warn!("past batch"); + return BatchStatus::Drop; + } Ordering::Equal => (), } @@ -218,12 +242,152 @@ where BatchStatus::Accept } + + fn span_batch_status(&self, batch: &SpanBatch) -> BatchStatus { + let state = self.state.read().unwrap(); + let epoch = state.safe_epoch; + let next_epoch = state.epoch_by_number(epoch.number + 1); + let head = state.safe_head; + let next_timestamp = head.timestamp + self.config.chain.blocktime; + + let start_epoch_num = batch.start_epoch_num(); + let end_epoch_num = batch.l1_origin_num; + let span_start_timestamp = batch.rel_timestamp + self.config.chain.l2_genesis.timestamp; + let span_end_timestamp = + span_start_timestamp + batch.block_count * self.config.chain.blocktime; + + // check batch timestamp + + if span_end_timestamp < next_timestamp { + tracing::warn!("past batch"); + return BatchStatus::Drop; + } + + if span_start_timestamp > next_timestamp { + return BatchStatus::Future; + } + + // check for delta activation + + let batch_origin = if start_epoch_num == epoch.number + 1 { + next_epoch + } else { + Some(epoch) + }; + + if let Some(batch_origin) = batch_origin { + if batch_origin.timestamp < self.config.chain.delta_time { + tracing::warn!("span batch seen before delta start"); + return BatchStatus::Drop; + } + } else { + return BatchStatus::Undecided; + } + + // find previous l2 block + + let prev_timestamp = span_start_timestamp - self.config.chain.blocktime; + let (prev_l2_block, prev_l2_epoch) = + if let Some(block) = state.l2_info_by_timestamp(prev_timestamp) { + block + } else { + tracing::warn!("prev l2 block not found"); + return BatchStatus::Drop; + }; + + // check that block builds on existing chain + + if prev_l2_block.hash.as_bytes()[..20] != batch.parent_check { + tracing::warn!("batch parent check failed"); + return BatchStatus::Drop; + } + + // sequencer window checks + + if start_epoch_num + self.config.chain.seq_window_size < batch.l1_inclusion_block { + tracing::warn!("sequence window check failed"); + return BatchStatus::Drop; + } + + if start_epoch_num > prev_l2_block.number + 1 { + tracing::warn!("invalid start epoch number"); + return BatchStatus::Drop; + } + + if let 
Some(l1_origin) = state.epoch_by_number(end_epoch_num) { + if batch.l1_origin_check != l1_origin.hash.as_bytes()[..20] { + tracing::warn!("origin check failed"); + return BatchStatus::Drop; + } + } else { + tracing::warn!("origin not found"); + return BatchStatus::Drop; + } + + if start_epoch_num < prev_l2_epoch.number { + tracing::warn!("invalid start epoch number"); + return BatchStatus::Drop; + } + + // check sequencer drift + + let block_inputs = batch.block_inputs(&self.config); + for (i, input) in block_inputs.iter().enumerate() { + let input_epoch = state.epoch_by_number(input.epoch).unwrap(); + let next_epoch = state.epoch_by_number(input.epoch + 1); + + if input.timestamp < input_epoch.timestamp { + tracing::warn!("input timestamp too low"); + return BatchStatus::Drop; + } + + if input.timestamp > input_epoch.timestamp + self.config.chain.max_seq_drift { + if input.transactions.is_empty() { + if !batch.origin_bits[i] { + if let Some(next_epoch) = next_epoch { + if input.timestamp >= next_epoch.timestamp { + return BatchStatus::Drop; + } + } else { + return BatchStatus::Undecided; + } + } + } else { + return BatchStatus::Drop; + } + } + } + + // overlapped block checks + + for input in block_inputs { + if input.timestamp < next_timestamp { + if let Some((_, epoch)) = state.l2_info_by_timestamp(input.timestamp) { + if input.epoch != epoch.number { + tracing::warn!("epoch mismatch in overlapped blocks"); + return BatchStatus::Drop; + } + } else { + tracing::warn!("overlapped l2 block not found"); + return BatchStatus::Drop; + } + } + } + + BatchStatus::Accept + } } -fn decode_batches(channel: &Channel) -> Result> { +fn decode_batches(channel: &Channel, chain_id: u64) -> Result> { let mut channel_data = Vec::new(); - let mut d = Decoder::new(channel.data.as_slice())?; - d.read_to_end(&mut channel_data)?; + let d = Decoder::new(channel.data.as_slice())?; + for b in d.bytes() { + if let Ok(b) = b { + channel_data.push(b); + } else { + break; + } + } let mut batches = Vec::new(); let mut offset = 0; @@ -234,27 +398,51 @@ fn decode_batches(channel: &Channel) -> Result> { let batch_data: Vec = batch_rlp.as_val()?; + let version = batch_data[0]; let batch_content = &batch_data[1..]; - let rlp = Rlp::new(batch_content); - let size = rlp.payload_info()?.total(); - let batch = Batch::decode(&rlp, channel.l1_inclusion_block)?; - batches.push(batch); + match version { + 0 => { + let rlp = Rlp::new(batch_content); + let size = rlp.payload_info()?.total(); - offset += size + batch_info.header_len + 1; + let batch = SingleBatch::decode(&rlp, channel.l1_inclusion_block)?; + batches.push(Batch::Single(batch)); + + offset += size + batch_info.header_len + 1; + } + 1 => { + let batch = SpanBatch::decode(batch_content, channel.l1_inclusion_block, chain_id)?; + batches.push(Batch::Span(batch)); + break; + } + _ => eyre::bail!("invalid batch version"), + }; } Ok(batches) } #[derive(Debug, Clone)] -pub struct Batch { - pub parent_hash: H256, - pub epoch_num: u64, - pub epoch_hash: H256, - pub timestamp: u64, - pub transactions: Vec, - pub l1_inclusion_block: u64, +pub enum Batch { + Single(SingleBatch), + Span(SpanBatch), +} + +impl Batch { + pub fn timestamp(&self, config: &Config) -> u64 { + match self { + Batch::Single(batch) => batch.timestamp, + Batch::Span(batch) => batch.rel_timestamp + config.chain.l2_genesis.timestamp, + } + } + + pub fn as_inputs(&self, config: &Config) -> Vec> { + match self { + Batch::Single(batch) => vec![batch.block_input()], + Batch::Span(batch) => 
batch.block_inputs(config),
+        }
+    }
 }
 
 #[derive(Debug, Clone, PartialEq)]
@@ -264,28 +452,3 @@ enum BatchStatus {
     Accept,
     Drop,
     Undecided,
     Future,
 }
-
-impl Batch {
-    fn decode(rlp: &Rlp, l1_inclusion_block: u64) -> Result<Self, DecoderError> {
-        let parent_hash = rlp.val_at(0)?;
-        let epoch_num = rlp.val_at(1)?;
-        let epoch_hash = rlp.val_at(2)?;
-        let timestamp = rlp.val_at(3)?;
-        let transactions = rlp.list_at(4)?;
-
-        Ok(Batch {
-            parent_hash,
-            epoch_num,
-            epoch_hash,
-            timestamp,
-            transactions,
-            l1_inclusion_block,
-        })
-    }
-
-    fn has_invalid_transactions(&self) -> bool {
-        self.transactions
-            .iter()
-            .any(|tx| tx.0.is_empty() || tx.0[0] == 0x7E)
-    }
-}
diff --git a/src/derive/stages/block_input.rs b/src/derive/stages/block_input.rs
new file mode 100644
index 00000000..403cb3e3
--- /dev/null
+++ b/src/derive/stages/block_input.rs
@@ -0,0 +1,36 @@
+use std::sync::{Arc, RwLock};
+
+use eyre::Result;
+
+use crate::{
+    common::{Epoch, RawTransaction},
+    derive::state::State,
+};
+
+pub trait EpochType {}
+impl EpochType for u64 {}
+impl EpochType for Epoch {}
+
+#[derive(Debug)]
+pub struct BlockInput<E: EpochType> {
+    pub timestamp: u64,
+    pub epoch: E,
+    pub transactions: Vec<RawTransaction>,
+    pub l1_inclusion_block: u64,
+}
+
+impl BlockInput<u64> {
+    pub fn with_full_epoch(self, state: &Arc<RwLock<State>>) -> Result<BlockInput<Epoch>> {
+        let state = state.read().map_err(|_| eyre::eyre!("lock poisoned"))?;
+        let epoch = state
+            .epoch_by_number(self.epoch)
+            .ok_or(eyre::eyre!("epoch not found"))?;
+
+        Ok(BlockInput {
+            timestamp: self.timestamp,
+            epoch,
+            transactions: self.transactions,
+            l1_inclusion_block: self.l1_inclusion_block,
+        })
+    }
+}
diff --git a/src/derive/stages/channels.rs b/src/derive/stages/channels.rs
index c43bb160..a7dcb75a 100644
--- a/src/derive/stages/channels.rs
+++ b/src/derive/stages/channels.rs
@@ -142,7 +142,7 @@ where
 
 /// An intermediate pending channel
 #[derive(Debug)]
-struct PendingChannel {
+pub struct PendingChannel {
     channel_id: u128,
     frames: Vec<Frame>,
     size: Option<u16>,
diff --git a/src/derive/stages/mod.rs b/src/derive/stages/mod.rs
index aaa29bb1..561ecc37 100644
--- a/src/derive/stages/mod.rs
+++ b/src/derive/stages/mod.rs
@@ -1,4 +1,7 @@
 pub mod attributes;
 pub mod batcher_transactions;
 pub mod batches;
+mod block_input;
 pub mod channels;
+mod single_batch;
+mod span_batch;
diff --git a/src/derive/stages/single_batch.rs b/src/derive/stages/single_batch.rs
new file mode 100644
index 00000000..b9e2570d
--- /dev/null
+++ b/src/derive/stages/single_batch.rs
@@ -0,0 +1,52 @@
+use ethers::{
+    types::H256,
+    utils::rlp::{DecoderError, Rlp},
+};
+
+use crate::common::RawTransaction;
+
+use super::block_input::BlockInput;
+
+#[derive(Debug, Clone)]
+pub struct SingleBatch {
+    pub parent_hash: H256,
+    pub epoch_num: u64,
+    pub epoch_hash: H256,
+    pub timestamp: u64,
+    pub transactions: Vec<RawTransaction>,
+    pub l1_inclusion_block: u64,
+}
+
+impl SingleBatch {
+    pub fn decode(rlp: &Rlp, l1_inclusion_block: u64) -> Result<Self, DecoderError> {
+        let parent_hash = rlp.val_at(0)?;
+        let epoch_num = rlp.val_at(1)?;
+        let epoch_hash = rlp.val_at(2)?;
+        let timestamp = rlp.val_at(3)?;
+        let transactions = rlp.list_at(4)?;
+
+        Ok(SingleBatch {
+            parent_hash,
+            epoch_num,
+            epoch_hash,
+            timestamp,
+            transactions,
+            l1_inclusion_block,
+        })
+    }
+
+    pub fn has_invalid_transactions(&self) -> bool {
+        self.transactions
+            .iter()
+            .any(|tx| tx.0.is_empty() || tx.0[0] == 0x7E)
+    }
+
+    pub fn block_input(&self) -> BlockInput<u64> {
+        BlockInput {
+            timestamp: self.timestamp,
+            epoch: self.epoch_num,
+            transactions: self.transactions.clone(),
+            l1_inclusion_block: self.l1_inclusion_block,
+        }
+    }
+}
diff
--git a/src/derive/stages/span_batch.rs b/src/derive/stages/span_batch.rs new file mode 100644 index 00000000..349a6dcf --- /dev/null +++ b/src/derive/stages/span_batch.rs @@ -0,0 +1,492 @@ +use ethers::{ + types::{transaction::eip2930::AccessList, Address, Bytes, U256}, + utils::rlp::{Rlp, RlpStream}, +}; +use eyre::Result; + +use crate::{common::RawTransaction, config::Config}; + +use super::block_input::BlockInput; + +#[derive(Debug, Clone)] +pub struct SpanBatch { + pub rel_timestamp: u64, + pub l1_origin_num: u64, + pub parent_check: [u8; 20], + pub l1_origin_check: [u8; 20], + pub block_count: u64, + pub origin_bits: Vec, + pub block_tx_counts: Vec, + pub transactions: Vec, + pub l1_inclusion_block: u64, +} + +impl SpanBatch { + pub fn decode(data: &[u8], l1_inclusion_block: u64, chain_id: u64) -> Result { + let (rel_timestamp, data) = unsigned_varint::decode::u64(data)?; + let (l1_origin_num, data) = unsigned_varint::decode::u64(data)?; + let (parent_check, data) = take_data(data, 20); + let (l1_origin_check, data) = take_data(data, 20); + let (block_count, data) = unsigned_varint::decode::u64(data)?; + let (origin_bits, data) = decode_bitlist(data, block_count); + let (block_tx_counts, data) = decode_block_tx_counts(data, block_count)?; + + let total_txs = block_tx_counts.iter().sum(); + let (transactions, _) = decode_transactions(chain_id, data, total_txs)?; + + Ok(SpanBatch { + rel_timestamp, + l1_origin_num, + parent_check: parent_check.try_into()?, + l1_origin_check: l1_origin_check.try_into()?, + block_count, + block_tx_counts, + origin_bits, + transactions, + l1_inclusion_block, + }) + } + + pub fn block_inputs(&self, config: &Config) -> Vec> { + let init_epoch_num = self.l1_origin_num + - self + .origin_bits + .iter() + .map(|b| if *b { 1 } else { 0 }) + .sum::(); + + let mut inputs = Vec::new(); + let mut epoch_num = init_epoch_num; + let mut tx_index = 0usize; + for i in 0..self.block_count as usize { + if self.origin_bits[i] { + epoch_num += 1; + } + + let tx_end = self.block_tx_counts[i] as usize; + let transactions = self.transactions[tx_index..tx_index + tx_end].to_vec(); + tx_index += self.block_tx_counts[i] as usize; + + let timestamp = self.rel_timestamp + + config.chain.l2_genesis.timestamp + + i as u64 * config.chain.blocktime; + + let block_input = BlockInput:: { + timestamp, + epoch: epoch_num, + transactions, + l1_inclusion_block: self.l1_inclusion_block, + }; + + inputs.push(block_input); + } + + inputs + } + + pub fn start_epoch_num(&self) -> u64 { + self.l1_origin_num + - self + .origin_bits + .iter() + .map(|b| if *b { 1 } else { 0 }) + .sum::() + + if self.origin_bits[0] { 1 } else { 0 } + } +} + +fn take_data(data: &[u8], length: usize) -> (&[u8], &[u8]) { + (&data[0..length], &data[length..]) +} + +fn decode_bitlist(data: &[u8], len: u64) -> (Vec, &[u8]) { + let mut bitlist = Vec::new(); + + let len_up = (len + 7) / 8; + let (bytes, data) = take_data(data, len_up as usize); + + for byte in bytes.iter().rev() { + for i in 0..8 { + let bit = (byte >> i) & 1 == 1; + bitlist.push(bit); + } + } + + let bitlist = bitlist[..len as usize].to_vec(); + + (bitlist, data) +} + +fn decode_block_tx_counts(data: &[u8], block_count: u64) -> Result<(Vec, &[u8])> { + let mut tx_counts = Vec::new(); + let mut data_ref = data; + for _ in 0..block_count { + let (count, d) = unsigned_varint::decode::u64(data_ref).unwrap(); + data_ref = d; + tx_counts.push(count); + } + + Ok((tx_counts, data_ref)) +} + +fn decode_transactions( + chain_id: u64, + data: &[u8], + tx_count: u64, 
+) -> Result<(Vec, &[u8])> { + let (contract_creation_bits, data) = decode_bitlist(data, tx_count); + let (y_parity_bits, data) = decode_bitlist(data, tx_count); + let (signatures, data) = decode_signatures(data, tx_count); + + let tos_count = contract_creation_bits.iter().filter(|b| !**b).count() as u64; + let (tos, data) = decode_tos(data, tos_count); + + let (tx_datas, data) = decode_tx_data(data, tx_count); + let (tx_nonces, data) = decode_uvarint_list(data, tx_count); + let (tx_gas_limits, data) = decode_uvarint_list(data, tx_count); + + let legacy_tx_count = tx_datas + .iter() + .filter(|tx| matches!(tx, TxData::Legacy { .. })) + .count() as u64; + + let (protected_bits, data) = decode_bitlist(data, legacy_tx_count); + + let mut txs = Vec::new(); + let mut legacy_i = 0; + let mut tos_i = 0; + + for i in 0..tx_count as usize { + let mut encoder = RlpStream::new(); + encoder.begin_unbounded_list(); + + match &tx_datas[i] { + TxData::Legacy { + value, + gas_price, + data, + } => { + encoder.append(&tx_nonces[i]); + encoder.append(gas_price); + encoder.append(&tx_gas_limits[i]); + + if contract_creation_bits[i] { + encoder.append(&""); + } else { + encoder.append(&tos[tos_i]); + tos_i += 1; + } + + encoder.append(value); + encoder.append(&data.to_vec()); + + let parity = if y_parity_bits[i] { 1 } else { 0 }; + let v = if protected_bits[legacy_i] { + chain_id * 2 + 35 + parity + } else { + 27 + parity + }; + + encoder.append(&v); + encoder.append(&signatures[i].0); + encoder.append(&signatures[i].1); + + encoder.finalize_unbounded_list(); + let raw_tx = RawTransaction(encoder.out().to_vec()); + txs.push(raw_tx); + + legacy_i += 1; + } + TxData::Type1 { + value, + gas_price, + data, + access_list, + } => { + encoder.append(&chain_id); + encoder.append(&tx_nonces[i]); + encoder.append(gas_price); + encoder.append(&tx_gas_limits[i]); + + if contract_creation_bits[i] { + encoder.append(&""); + } else { + encoder.append(&tos[tos_i]); + tos_i += 1; + } + + encoder.append(value); + encoder.append(&data.to_vec()); + encoder.append(access_list); + + let parity = if y_parity_bits[i] { 1u64 } else { 0u64 }; + encoder.append(&parity); + encoder.append(&signatures[i].0); + encoder.append(&signatures[i].1); + + encoder.finalize_unbounded_list(); + let mut raw = encoder.out().to_vec(); + raw.insert(0, 1); + let raw_tx = RawTransaction(raw); + txs.push(raw_tx); + } + TxData::Type2 { + value, + max_fee, + max_priority_fee, + data, + access_list, + } => { + encoder.append(&chain_id); + encoder.append(&tx_nonces[i]); + encoder.append(max_priority_fee); + encoder.append(max_fee); + encoder.append(&tx_gas_limits[i]); + + if contract_creation_bits[i] { + encoder.append(&""); + } else { + encoder.append(&tos[tos_i]); + tos_i += 1; + } + + encoder.append(value); + encoder.append(&data.to_vec()); + encoder.append(access_list); + + let parity = if y_parity_bits[i] { 1u64 } else { 0u64 }; + + encoder.append(&parity); + encoder.append(&signatures[i].0); + encoder.append(&signatures[i].1); + + encoder.finalize_unbounded_list(); + let mut raw = encoder.out().to_vec(); + raw.insert(0, 2); + let raw_tx = RawTransaction(raw); + txs.push(raw_tx); + } + } + } + + Ok((txs, data)) +} + +fn decode_uvarint_list(data: &[u8], count: u64) -> (Vec, &[u8]) { + let mut list = Vec::new(); + let mut data_ref = data; + + for _ in 0..count { + let (nonce, d) = unsigned_varint::decode::u64(data_ref).unwrap(); + data_ref = d; + list.push(nonce); + } + + (list, data_ref) +} + +fn decode_tx_data(data: &[u8], tx_count: u64) -> (Vec, &[u8]) { 
+ let mut data_ref = data; + let mut tx_datas = Vec::new(); + + for _ in 0..tx_count { + let (next, data) = match data_ref[0] { + 1 => { + let rlp = Rlp::new(&data_ref[1..]); + let value = rlp.val_at::(0).unwrap(); + let gas_price = rlp.val_at::(1).unwrap(); + let data = rlp.val_at::>(2).unwrap(); + let access_list = rlp.val_at::(3).unwrap(); + + let next = rlp.payload_info().unwrap().total() + 1; + let data = TxData::Type1 { + value, + gas_price, + data: data.into(), + access_list, + }; + + (next, data) + } + 2 => { + let rlp = Rlp::new(&data_ref[1..]); + let value = rlp.val_at::(0).unwrap(); + let max_priority_fee = rlp.val_at::(1).unwrap(); + let max_fee = rlp.val_at::(2).unwrap(); + let data = rlp.val_at::>(3).unwrap(); + let access_list = rlp.val_at::(4).unwrap(); + + let next = rlp.payload_info().unwrap().total() + 1; + let data = TxData::Type2 { + value, + max_fee, + max_priority_fee, + data: data.into(), + access_list, + }; + + (next, data) + } + _ => { + let rlp = Rlp::new(&data_ref[0..]); + let value = rlp.val_at::(0).unwrap(); + let gas_price = rlp.val_at::(1).unwrap(); + let data = rlp.val_at::>(2).unwrap(); + + let next = rlp.payload_info().unwrap().total(); + let data = TxData::Legacy { + value, + gas_price, + data: data.into(), + }; + + (next, data) + } + }; + + tx_datas.push(data); + data_ref = &data_ref[next..]; + } + + (tx_datas, data_ref) +} + +#[derive(Debug)] +enum TxData { + Legacy { + value: U256, + gas_price: U256, + data: Bytes, + }, + Type1 { + value: U256, + gas_price: U256, + data: Bytes, + access_list: AccessList, + }, + Type2 { + value: U256, + max_fee: U256, + max_priority_fee: U256, + data: Bytes, + access_list: AccessList, + }, +} + +fn decode_tos(data: &[u8], count: u64) -> (Vec
, &[u8]) { + let mut data_ref = data; + let mut tos = Vec::new(); + for _ in 0..count { + let (addr, d) = decode_address(data_ref); + tos.push(addr); + data_ref = d; + } + + (tos, data_ref) +} + +fn decode_address(data: &[u8]) -> (Address, &[u8]) { + let (address_bytes, data) = take_data(data, 20); + let address = Address::from_slice(address_bytes); + (address, data) +} + +fn decode_signatures(data: &[u8], tx_count: u64) -> (Vec<(U256, U256)>, &[u8]) { + let mut sigs = Vec::new(); + let mut data_ref = data; + for _ in 0..tx_count { + let (r, d) = decode_u256(data_ref); + data_ref = d; + + let (s, d) = decode_u256(data_ref); + data_ref = d; + + sigs.push((r, s)); + } + + (sigs, data_ref) +} + +fn decode_u256(data: &[u8]) -> (U256, &[u8]) { + let (bytes, data) = take_data(data, 32); + let value = U256::from_big_endian(bytes); + (value, data) +} + +#[cfg(test)] +mod test { + use std::io::Read; + + use ethers::{ + types::H256, + utils::{keccak256, rlp::Rlp}, + }; + use libflate::zlib::Decoder; + + use crate::{ + config::ChainConfig, + derive::stages::{ + batcher_transactions::BatcherTransaction, + channels::{Channel, PendingChannel}, + span_batch::SpanBatch, + }, + }; + + #[test] + fn test_decode() { + let batcher_tx_data = "00656531d7fca1ad32740ea3adca85922a0000000005dc78dadac9f58b71c9d7edacb77bd6323dd823c8ffeb44c059dee7ffb405f9b68b2feb9a3ef3508cc78be9f9edab1ea8557c09e3b1e83cffc05f2a8445c09141c08145c0914580010e181930012332c588a68c114323238c603cffb8e3e20ecb8f4f0d365a15b4ffe09abf6ddad1b7755a79ac67ff39b7bb9ddf3c67ab929e46cd439bf56c7757a8f67dddd968dbf1fc647b4498f6929c0b75a5f2d5557d491b6293a37343b33f681e2c37ae551763b8fc8c598271c67aed7426ff8e2dd7170a31ffbdfce97bb5d9ed0b1dfb94efcb6eb5efdb1bfb7152f8c4b9ae321c5b73af7f12517f3ec15e6effd5f0ddae251cd7673eb65b5d26a1b1e5e68e4b328587b5e6dd56717fb93d6cb3d5ea07b7ffdc0c0af2f86ab8485c73cd3fef280316fe282d96b4be42fd9df28d562c77edecef9c923fe9f6a069a346c1b7b33e9cc76c3e46dc4bacfc191cd3c8afcbc12e52eeaa7c9127ed6412c70ebee6b52dbc825971322c5eaea9adfb6673a54fddf37696757ff4aafa433f6da3531b23988abba61d3ba7beeecbb40db56935f1e7661d3812798fb95131b69eefe68f25fbf7ee7dd870517a79b4cecf0bb73ac439d5a7b7942c3cdef156ac284f31467ba5e0b39a4d8f569c303bba2c52e1b8f98c0ce91d4a96b33ffcaa985c94b2c06ec781a0c9e9d3bc2670ef1429e09b782fb323d9692607dbe9a30589dbbb6e479efbbe72d62af9f038b605f38ced7d32266f751189ff6a68f2d4b63d94c5f88cf575f7cfbbc3e3fae64b5cdc7d4cadf8ebc24bb2894b657e733d78fb3e6d47dca4bdfc1d264c9d2562dfaff4396cb83cfd94c2dc7766cbd3d218fde61f12e6b9767ed36dc625138d6778f7187a28075597196a6d522f9ac9b8e60a77dc094daf395ec7175c0f63f1326a5f257762b172c517dfbdf6ce7ed7f518129fac14fa77d84140d9e2f92791a34b7e3d7f27a4e82c7c66fbf38589266a16d3a2db4eba4e0d7b646e98fdbdea9af4e3a7739a0acb5c53f65c70c24ca002361a978eee8e5a59adbce3c786730719839d1fce3e894d8c12bdc48a31fd64126c68e6777268e677cedbc9c4a2bf26538a011f60725ecb801f24e097665c40403fe7fefa0f719efb64a6f1b7ca591d5aaa36bfece6cb15dfc37ea65d6cf37fd3b971b6848de6dc1bd7debe378909b2bdd6afc061fd29fa6e59a3935dea85d34213658e093f3a776abee3b523ab2eb933771ee2f0718c8d55ce0fff7e4b4a3395fba9bd8949656292c2a18d5cb97dcfcfccaeba72f6d59b2f824df5f5ca6eff5f1db96e57b14fe370a9b0cca7aeca4e7d4b5b33a9b06496a936455325669e8b489e2c1e5bf5e55666cf0b57070f7585cf35d922eaf6a57f4d583f2e8d8e6cbf31b7f1d3c9d432b377166db5f61bf7695b6ed67cc4f2e58bc4d1a7b39fe79e63f1582adbac7831454fc322c952de71f9d463ff73b86ec5bcd0e5519176645bc29572fa7df1cf49d3df24ea2e10d00b9f1fdd2c3c4b32d0f3e8a6355bf57708142c6ae3e8e0ff97ae2fe0e9f1a09b5b488140f8317dbed5ba6f8acc3e09bb0299aae517394dea2eb96419548530587fbffde1a7c734b7a625d2193a1796
30bf3634942998f4517fd6c71b0155779c7f7ff9686daf705934ed00d38f9dedfc5a8b58ba2f30b44466e88308831f3b96186d67c845b6e8de5a7488c75550f328040d84141c60faf181bb59e0e45710def1242c523632b128a984814ae088bb4a55457efea747cf9ec61a2a7aaf7f74cc600b012d5c145a49483f37162f2715270f772f6f6ac097342f74698aa7dafab9714c563029fcc0c0a1f6dbc1049769bc0fb66d5e9ec230104933a9b8b86058c7d3ab866681ea0b4b362847edd3ecff7e22df3661dd5a9eb50c6c4e57171c5c67bebef4ec9e87d33bb9773f9e9f701a49a9492dd781dfb5075a6f58cfdb32d3edd0546dbd035167b8c4266d0c083cb22f5479fa8f6eae66c12d293b5a18577c48fd3355d363bdd5ef7cb6acc5fb7630cf3feda55f5678d57b87f786794f055d8eb1c5d23a8c7e08c91cf439e4237bd867c71da69d779876dd61dab794e5e73ef6090bf9272ce46f5fca3161217fcb69c923b7246ecc976407000000ffff01"; + let batcher_transaction = + BatcherTransaction::new(&hex::decode(batcher_tx_data).unwrap(), 10254359).unwrap(); + + let mut pending_channel = PendingChannel::new(batcher_transaction.frames[0].clone()); + + for frame in batcher_transaction.frames[1..].iter() { + pending_channel.push_frame(frame.clone()) + } + + let channel = Channel::from(pending_channel); + let d = Decoder::new(channel.data.as_slice()).unwrap(); + + let mut vec = Vec::new(); + for b in d.bytes() { + if let Ok(b) = b { + vec.push(b); + } else { + break; + } + } + + let raw = vec.as_slice(); + let batch_rlp = Rlp::new(raw); + let batch_data: Vec = batch_rlp.as_val().unwrap(); + + let version = batch_data[0]; + assert_eq!(version, 1); + + let config = crate::config::Config { + chain: ChainConfig::optimism_sepolia(), + ..Default::default() + }; + let batch = SpanBatch::decode(&batch_data[1..], 0, config.chain.l2_chain_id).unwrap(); + + assert_eq!( + batch.transactions.len(), + batch.block_tx_counts.iter().sum::() as usize + ); + + assert_eq!(batch.l1_inclusion_block, 0); + + println!("starting epoch: {}", batch.start_epoch_num()); + + let inputs = batch.block_inputs(&config); + inputs.iter().for_each(|input| { + let block_number = (input.timestamp - config.chain.l2_genesis.timestamp) / 2; + println!("block: {}, epoch: {}", block_number, input.epoch); + input.transactions.iter().for_each(|tx| { + println!("{:?}", H256::from(keccak256(&tx.0))); + }); + }); + + // println!("{:?}", batch.block_inputs(&config)) + } +} diff --git a/src/derive/state.rs b/src/derive/state.rs index eb7041cc..c4ead277 100644 --- a/src/derive/state.rs +++ b/src/derive/state.rs @@ -1,16 +1,21 @@ use std::{collections::BTreeMap, sync::Arc}; -use ethers::types::H256; +use ethers::{ + providers::{Http, Middleware, Provider}, + types::H256, +}; use crate::{ common::{BlockInfo, Epoch}, config::Config, + driver::HeadInfo, l1::L1Info, }; pub struct State { l1_info: BTreeMap, l1_hashes: BTreeMap, + l2_refs: BTreeMap, pub safe_head: BlockInfo, pub safe_epoch: Epoch, pub current_epoch_num: u64, @@ -18,10 +23,18 @@ pub struct State { } impl State { - pub fn new(finalized_head: BlockInfo, finalized_epoch: Epoch, config: Arc) -> Self { + pub async fn new( + finalized_head: BlockInfo, + finalized_epoch: Epoch, + provider: &Provider, + config: Arc, + ) -> Self { + let l2_refs = l2_refs(finalized_head.number, provider, &config).await; + Self { l1_info: BTreeMap::new(), l1_hashes: BTreeMap::new(), + l2_refs, safe_head: finalized_head, safe_epoch: finalized_epoch, current_epoch_num: 0, @@ -39,6 +52,14 @@ impl State { .and_then(|hash| self.l1_info.get(hash)) } + pub fn l2_info_by_timestamp(&self, timestmap: u64) -> Option<&(BlockInfo, Epoch)> { + let block_num = (timestmap - self.config.chain.l2_genesis.timestamp) + / self.config.chain.blocktime + + 
self.config.chain.l2_genesis.number;
+
+        self.l2_refs.get(&block_num)
+    }
+
     pub fn epoch_by_hash(&self, hash: H256) -> Option<Epoch> {
         self.l1_info_by_hash(hash).map(|info| Epoch {
             number: info.block_info.number,
@@ -70,13 +91,15 @@ impl State {
         self.l1_info.clear();
         self.l1_hashes.clear();
 
-        self.safe_head = safe_head;
-        self.safe_epoch = safe_epoch;
+        self.update_safe_head(safe_head, safe_epoch);
     }
 
     pub fn update_safe_head(&mut self, safe_head: BlockInfo, safe_epoch: Epoch) {
         self.safe_head = safe_head;
         self.safe_epoch = safe_epoch;
+
+        self.l2_refs
+            .insert(self.safe_head.number, (self.safe_head, self.safe_epoch));
     }
 
     fn prune(&mut self) {
@@ -93,5 +116,42 @@ impl State {
             self.l1_info.remove(block_hash);
             self.l1_hashes.pop_first();
         }
+
+        let prune_until =
+            self.safe_head.number - self.config.chain.max_seq_drift / self.config.chain.blocktime;
+
+        while let Some((num, _)) = self.l2_refs.first_key_value() {
+            if *num >= prune_until {
+                break;
+            }
+
+            self.l2_refs.pop_first();
+        }
     }
 }
+
+async fn l2_refs(
+    head_num: u64,
+    provider: &Provider<Http>,
+    config: &Config,
+) -> BTreeMap<u64, (BlockInfo, Epoch)> {
+    let lookback = config.chain.max_seq_drift / config.chain.blocktime;
+    let start = head_num
+        .saturating_sub(lookback)
+        .max(config.chain.l2_genesis.number);
+
+    let mut refs = BTreeMap::new();
+    for i in start..=head_num {
+        let block = provider.get_block_with_txs(i).await;
+        if let Ok(Some(block)) = block {
+            if let Ok(head_info) = HeadInfo::try_from(block) {
+                refs.insert(
+                    head_info.l2_block_info.number,
+                    (head_info.l2_block_info, head_info.l1_epoch),
+                );
+            }
+        }
+    }
+
+    refs
+}
diff --git a/src/driver/mod.rs b/src/driver/mod.rs
index 4553d036..a405ad55 100644
--- a/src/driver/mod.rs
+++ b/src/driver/mod.rs
@@ -88,11 +88,8 @@ impl Driver<EngineApi> {
         let chain_watcher =
             ChainWatcher::new(l1_start_block, finalized_head.number, config.clone())?;
 
-        let state = Arc::new(RwLock::new(State::new(
-            finalized_head,
-            finalized_epoch,
-            config.clone(),
-        )));
+        let state = State::new(finalized_head, finalized_epoch, &provider, config.clone()).await;
+        let state = Arc::new(RwLock::new(state));
 
         let engine_driver = EngineDriver::new(finalized_head, finalized_epoch, provider, &config)?;
         let pipeline = Pipeline::new(state.clone(), config.clone(), finalized_seq)?;
diff --git a/src/network/service/mod.rs b/src/network/service/mod.rs
index 68fdaf2a..a0859539 100644
--- a/src/network/service/mod.rs
+++ b/src/network/service/mod.rs
@@ -174,6 +174,7 @@ impl Behaviour {
 }
 
 enum Event {
+    #[allow(dead_code)]
     Ping(ping::Event),
     Gossipsub(gossipsub::Event),
 }
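
The new `delta_time` field follows the same convention as `regolith_time` and `canyon_time`: the fork counts as active once the relevant L1 origin timestamp reaches the configured value, and `u64::MAX` leaves it unscheduled so span batches are always rejected. A minimal sketch of that comparison, using one of the testnet activation times from the patch (the `is_delta_active` helper is illustrative, not part of the codebase):

```rust
/// Hypothetical helper mirroring how `span_batch_status` compares the batch
/// origin's timestamp against `delta_time`: span batches are only accepted
/// once the origin block is at or past the Delta activation time.
fn is_delta_active(delta_time: u64, origin_timestamp: u64) -> bool {
    origin_timestamp >= delta_time
}

fn main() {
    // Chains without a scheduled Delta fork use u64::MAX, so the check can
    // never pass and every span batch is dropped.
    assert!(!is_delta_active(u64::MAX, 1_700_000_000));

    // A testnet activation timestamp taken from the diff above.
    let delta_time = 1_703_203_200;
    assert!(!is_delta_active(delta_time, delta_time - 1));
    assert!(is_delta_active(delta_time, delta_time));
}
```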
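
`Batches::try_next` now yields `BlockInput`s instead of whole batches: a span batch expands into several inputs, the first one is returned immediately, and the rest wait in `pending_inputs` for subsequent calls. A stripped-down sketch of that buffer-then-pull iterator shape, with made-up item types rather than the real stage:

```rust
/// Simplified stand-in for a derivation stage that expands one upstream item
/// into several downstream items and drains its buffer before pulling again.
struct Expander<I: Iterator<Item = Vec<u32>>> {
    upstream: I,
    pending: Vec<u32>,
}

impl<I: Iterator<Item = Vec<u32>>> Iterator for Expander<I> {
    type Item = u32;

    fn next(&mut self) -> Option<u32> {
        // Serve buffered items first, just like `pending_inputs.remove(0)`
        // at the top of `Batches::try_next`.
        if !self.pending.is_empty() {
            return Some(self.pending.remove(0));
        }

        let mut items = self.upstream.next()?;
        if items.is_empty() {
            return None;
        }
        let first = items.remove(0);
        self.pending.append(&mut items);
        Some(first)
    }
}

fn main() {
    let upstream = vec![vec![1u32, 2, 3], vec![4]].into_iter();
    let expander = Expander { upstream, pending: Vec::new() };
    assert_eq!(expander.collect::<Vec<_>>(), vec![1, 2, 3, 4]);
}
```

Note that `purge` clears `pending_inputs` along with the batch map, so a reorg cannot leak block inputs derived from a discarded span batch.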
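
`BlockInput<E>` is parameterised over its epoch representation: the batches stage only knows the L1 epoch number, and the attributes stage widens it into a full `Epoch` via `with_full_epoch` once the corresponding L1 data is available in `State`. A self-contained illustration of the same idea, with a simplified `Epoch` and the state lookup replaced by a closure:

```rust
/// Simplified stand-ins for the types in `block_input.rs`.
#[derive(Debug, Clone, Copy, PartialEq)]
struct Epoch {
    number: u64,
    timestamp: u64,
}

trait EpochType {}
impl EpochType for u64 {}
impl EpochType for Epoch {}

#[derive(Debug)]
struct BlockInput<E: EpochType> {
    timestamp: u64,
    epoch: E,
}

impl BlockInput<u64> {
    /// Mirror of `with_full_epoch`, with the `State` lookup replaced by a
    /// closure so the example stays self-contained.
    fn with_full_epoch(
        self,
        lookup: impl Fn(u64) -> Option<Epoch>,
    ) -> Option<BlockInput<Epoch>> {
        let epoch = lookup(self.epoch)?;
        Some(BlockInput { timestamp: self.timestamp, epoch })
    }
}

fn main() {
    let input = BlockInput { timestamp: 1_700_000_012, epoch: 42u64 };
    let full = input
        .with_full_epoch(|n| Some(Epoch { number: n, timestamp: 1_700_000_000 }))
        .unwrap();
    assert_eq!(full.timestamp, 1_700_000_012);
    assert_eq!(full.epoch, Epoch { number: 42, timestamp: 1_700_000_000 });
}
```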
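
The span-batch prefix (`rel_timestamp`, `l1_origin_num`, `block_count`), the per-block transaction counts, and the per-transaction nonces and gas limits are all unsigned varints, which is why `SpanBatch::decode` keeps re-binding the remaining slice returned by `unsigned_varint::decode::u64`. A small round-trip example against the same `unsigned_varint` crate the patch depends on (the field values are invented):

```rust
fn main() {
    // Encode two fields the way a span-batch prefix carries them.
    let mut buf = Vec::new();
    let mut tmp = unsigned_varint::encode::u64_buffer();
    buf.extend_from_slice(unsigned_varint::encode::u64(7_200, &mut tmp));
    buf.extend_from_slice(unsigned_varint::encode::u64(10_254_000, &mut tmp));

    // Decoding returns the value plus the rest of the slice, so each field
    // can be peeled off in sequence exactly as `SpanBatch::decode` does.
    let (rel_timestamp, rest) = unsigned_varint::decode::u64(&buf).expect("valid varint");
    let (l1_origin_num, rest) = unsigned_varint::decode::u64(rest).expect("valid varint");

    assert_eq!(rel_timestamp, 7_200);
    assert_eq!(l1_origin_num, 10_254_000);
    assert!(rest.is_empty());
}
```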
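
`decode_bitlist` unpacks the packed bitfields (`origin_bits`, `contract_creation_bits`, `y_parity_bits`, `protected_bits`): it consumes ceil(len / 8) bytes, walks them from the last byte to the first, and emits each byte's bits least-significant first, truncating the result to the requested length. A standalone copy of that logic with a tiny worked example:

```rust
/// Same bit-unpacking strategy as `decode_bitlist` in `span_batch.rs`.
fn decode_bitlist(data: &[u8], len: u64) -> (Vec<bool>, &[u8]) {
    let len_up = (len + 7) / 8;
    let (bytes, rest) = data.split_at(len_up as usize);

    let mut bitlist = Vec::new();
    for byte in bytes.iter().rev() {
        for i in 0..8 {
            bitlist.push((byte >> i) & 1 == 1);
        }
    }

    // Only the first `len` bits are meaningful; the rest is padding.
    bitlist.truncate(len as usize);
    (bitlist, rest)
}

fn main() {
    // Two blocks' origin bits packed into one byte (0b0000_0010): block 0
    // keeps the previous L1 epoch, block 1 advances to the next one.
    let (bits, rest) = decode_bitlist(&[0b0000_0010, 0xAA], 2);
    assert_eq!(bits, vec![false, true]);
    assert_eq!(rest, &[0xAA_u8][..]);
}
```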
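
Span batches address L2 blocks purely by arithmetic: block `i` of the span is assigned `l2_genesis.timestamp + rel_timestamp + i * blocktime` in `block_inputs`, and `State::l2_info_by_timestamp` inverts the same formula to turn a timestamp back into a block number for the parent and overlap checks. A worked example with invented genesis values:

```rust
fn main() {
    // Assumed values for illustration only.
    let genesis_timestamp: u64 = 1_691_802_540;
    let genesis_number: u64 = 0;
    let blocktime: u64 = 2;
    let rel_timestamp: u64 = 7_200; // span batch offset from L2 genesis

    // Forward direction: timestamp of the i-th block in the span.
    let i: u64 = 5;
    let timestamp = genesis_timestamp + rel_timestamp + i * blocktime;
    assert_eq!(timestamp, genesis_timestamp + 7_210);

    // Inverse direction: recover the L2 block number from a timestamp,
    // the same formula used by `l2_info_by_timestamp`.
    let block_num = (timestamp - genesis_timestamp) / blocktime + genesis_number;
    assert_eq!(block_num, rel_timestamp / blocktime + i);
}
```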
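
Span batches do not carry a full `v` for legacy transactions, only a y-parity bit plus a "protected" bit, so `decode_transactions` rebuilds `v` per EIP-155; type-1 and type-2 transactions instead store the parity directly and get their type byte pushed onto the front of the re-encoded RLP payload. A sketch of the `v` reconstruction:

```rust
/// Rebuild a legacy transaction's `v` the way `decode_transactions` does:
/// EIP-155 ("protected") txs fold the chain id into `v`, older ones use 27/28.
fn legacy_v(chain_id: u64, y_parity: bool, protected: bool) -> u64 {
    let parity = if y_parity { 1 } else { 0 };
    if protected {
        chain_id * 2 + 35 + parity
    } else {
        27 + parity
    }
}

fn main() {
    // OP Mainnet chain id 10: protected legacy transactions use v = 55 / 56.
    assert_eq!(legacy_v(10, false, true), 55);
    assert_eq!(legacy_v(10, true, true), 56);

    // Pre-EIP-155 transactions fall back to the classic 27 / 28 values.
    assert_eq!(legacy_v(10, false, false), 27);
    assert_eq!(legacy_v(10, true, false), 28);
}
```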
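
`State` now keeps `l2_refs`, a rolling `BTreeMap` of recent L2 blocks spanning roughly `max_seq_drift / blocktime` entries: it is seeded from the L2 RPC at startup, extended on every `update_safe_head`, and pruned from the front as the safe head advances, so span-batch parent and overlap checks can look up recent blocks without extra RPC calls. A simplified version of the insert-and-prune step (plain block numbers stand in for the real `(BlockInfo, Epoch)` values, and `saturating_sub` guards the underflow near genesis):

```rust
use std::collections::BTreeMap;

/// Insert the new safe head, then drop entries older than the lookback window.
fn insert_and_prune(refs: &mut BTreeMap<u64, u64>, new_head: u64, lookback: u64) {
    refs.insert(new_head, new_head);

    let prune_until = new_head.saturating_sub(lookback);
    while let Some((num, _)) = refs.first_key_value() {
        if *num >= prune_until {
            break;
        }
        refs.pop_first();
    }
}

fn main() {
    let mut refs = BTreeMap::new();
    for n in 0..400u64 {
        // max_seq_drift = 600 and blocktime = 2 give a 300-block window.
        insert_and_prune(&mut refs, n, 300);
    }
    assert_eq!(*refs.first_key_value().unwrap().0, 99);
    assert_eq!(refs.len(), 301);
}
```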