Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(client, host): redundant state caching #59

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 11 additions & 13 deletions crates/executor/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use reth_evm_optimism::OpExecutorProvider;
use reth_execution_types::ExecutionOutcome;
use reth_optimism_consensus::validate_block_post_execution as validate_block_post_execution_optimism;
use reth_primitives::{proofs, Block, BlockWithSenders, Bloom, Header, Receipt, Receipts, Request};
use revm::{db::CacheDB, Database};
use revm::{db::WrapDatabaseRef, Database};
use revm_primitives::{address, U256};

/// Chain ID for Ethereum Mainnet.
Expand All @@ -43,7 +43,7 @@ pub trait Variant {
fn execute<DB>(
executor_block_input: &BlockWithSenders,
executor_difficulty: U256,
cache_db: DB,
db: DB,
) -> eyre::Result<BlockExecutionOutput<Receipt>>
where
DB: Database<Error: Into<ProviderError> + Display>;
Expand Down Expand Up @@ -101,7 +101,6 @@ impl ClientExecutor {
{
// Initialize the witnessed database with verified storage proofs.
let witness_db = input.witness_db()?;
let cache_db = CacheDB::new(&witness_db);

// Execute the block.
let spec = V::spec();
Expand All @@ -113,9 +112,8 @@ impl ClientExecutor {
.ok_or(eyre!("failed to recover senders"))
})?;
let executor_difficulty = input.current_block.header.difficulty;
let executor_output = profile!("execute", {
V::execute(&executor_block_input, executor_difficulty, cache_db)
})?;
let executor_output =
profile!("execute", { V::execute(&executor_block_input, executor_difficulty, witness_db) })?;

// Validate the block post execution.
profile!("validate block post-execution", {
Expand Down Expand Up @@ -165,7 +163,7 @@ impl ClientExecutor {
header.withdrawals_root = input
.current_block
.withdrawals
.clone()
.take()
.map(|w| proofs::calculate_withdrawals_root(w.into_inner().as_slice()));
header.logs_bloom = logs_bloom;
header.requests_root =
Expand All @@ -183,7 +181,7 @@ impl Variant for EthereumVariant {
fn execute<DB>(
executor_block_input: &BlockWithSenders,
executor_difficulty: U256,
cache_db: DB,
db: DB,
) -> eyre::Result<BlockExecutionOutput<Receipt>>
where
DB: Database<Error: Into<ProviderError> + Display>,
Expand All @@ -192,7 +190,7 @@ impl Variant for EthereumVariant {
Self::spec().into(),
CustomEvmConfig::from_variant(ChainVariant::Ethereum),
)
.executor(cache_db)
.executor(db)
.execute((executor_block_input, executor_difficulty).into())?)
}

Expand All @@ -214,7 +212,7 @@ impl Variant for OptimismVariant {
fn execute<DB>(
executor_block_input: &BlockWithSenders,
executor_difficulty: U256,
cache_db: DB,
db: DB,
) -> eyre::Result<BlockExecutionOutput<Receipt>>
where
DB: Database<Error: Into<ProviderError> + Display>,
Expand All @@ -223,7 +221,7 @@ impl Variant for OptimismVariant {
Self::spec().into(),
CustomEvmConfig::from_variant(ChainVariant::Optimism),
)
.executor(cache_db)
.executor(db)
.execute((executor_block_input, executor_difficulty).into())?)
}

Expand All @@ -245,7 +243,7 @@ impl Variant for LineaVariant {
fn execute<DB>(
executor_block_input: &BlockWithSenders,
executor_difficulty: U256,
cache_db: DB,
db: DB,
) -> eyre::Result<BlockExecutionOutput<Receipt>>
where
DB: Database<Error: Into<ProviderError> + Display>,
Expand All @@ -254,7 +252,7 @@ impl Variant for LineaVariant {
Self::spec().into(),
CustomEvmConfig::from_variant(ChainVariant::Linea),
)
.executor(cache_db)
.executor(db)
.execute((executor_block_input, executor_difficulty).into())?)
}

Expand Down
12 changes: 5 additions & 7 deletions crates/executor/host/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use alloy_transport::Transport;
use eyre::{eyre, Ok};
use reth_execution_types::ExecutionOutcome;
use reth_primitives::{proofs, Block, Bloom, Receipts, B256};
use revm::db::CacheDB;
use rsp_client_executor::{
io::ClientExecutorInput, ChainVariant, EthereumVariant, LineaVariant, OptimismVariant, Variant,
};
Expand All @@ -15,14 +14,14 @@ use rsp_rpc_db::RpcDb;

/// An executor that fetches data from a [Provider] to execute blocks in the [ClientExecutor].
#[derive(Debug, Clone)]
pub struct HostExecutor<T: Transport + Clone, P: Provider<T, AnyNetwork> + Clone> {
pub struct HostExecutor<T: Transport + Clone, P: Provider<T, AnyNetwork>> {
/// The provider which fetches data.
pub provider: P,
/// A phantom type to make the struct generic over the transport.
pub phantom: PhantomData<T>,
}

impl<T: Transport + Clone, P: Provider<T, AnyNetwork> + Clone> HostExecutor<T, P> {
impl<T: Transport + Clone, P: Provider<T, AnyNetwork>> HostExecutor<T, P> {
/// Create a new [`HostExecutor`] with a specific [Provider] and [Transport].
pub fn new(provider: P) -> Self {
Self { provider, phantom: PhantomData }
Expand Down Expand Up @@ -68,8 +67,7 @@ impl<T: Transport + Clone, P: Provider<T, AnyNetwork> + Clone> HostExecutor<T, P

// Setup the database for the block executor.
tracing::info!("setting up the database for the block executor");
let rpc_db = RpcDb::new(self.provider.clone(), block_number - 1);
let cache_db = CacheDB::new(&rpc_db);
let mut rpc_db = RpcDb::new(&self.provider, block_number - 1);

// Execute the block and fetch all the necessary data along the way.
tracing::info!(
Expand All @@ -82,7 +80,7 @@ impl<T: Transport + Clone, P: Provider<T, AnyNetwork> + Clone> HostExecutor<T, P
.with_recovered_senders()
.ok_or(eyre!("failed to recover senders"))?;
let executor_difficulty = current_block.header.difficulty;
let executor_output = V::execute(&executor_block_input, executor_difficulty, cache_db)?;
let executor_output = V::execute(&executor_block_input, executor_difficulty, &mut rpc_db)?;

// Validate the block post execution.
tracing::info!("validating the block post execution");
Expand Down Expand Up @@ -196,7 +194,7 @@ impl<T: Transport + Clone, P: Provider<T, AnyNetwork> + Clone> HostExecutor<T, P
);

// Fetch the parent headers needed to constrain the BLOCKHASH opcode.
let oldest_ancestor = *rpc_db.oldest_ancestor.borrow();
let oldest_ancestor = rpc_db.oldest_ancestor;
let mut ancestor_headers = vec![];
tracing::info!("fetching {} ancestor headers", block_number - oldest_ancestor);
for height in (oldest_ancestor..=(block_number - 1)).rev() {
Expand Down
9 changes: 7 additions & 2 deletions crates/executor/host/tests/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,13 @@ where
.try_init();

// Setup the provider.
let rpc_url =
Url::parse(std::env::var(env_var_key).unwrap().as_str()).expect("invalid rpc url");
let rpc_url = Url::parse(
std::env::var(env_var_key)
.unwrap_or_else(|_| panic!("Env var {} to be set", env_var_key))
.as_str(),
)
.expect("invalid rpc url");

let provider = ReqwestProvider::new_http(rpc_url);

// Setup the host executor.
Expand Down
87 changes: 56 additions & 31 deletions crates/storage/rpc-db/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::{
cell::RefCell,
collections::{BTreeMap, BTreeSet},
marker::PhantomData,
};
Expand All @@ -11,23 +10,28 @@ use reth_primitives::{
revm_primitives::{AccountInfo, Bytecode},
Address, B256, U256,
};
use reth_revm::DatabaseRef;
use reth_revm::Database;
use reth_storage_errors::{db::DatabaseError, provider::ProviderError};
use revm_primitives::HashMap;

/// A database that fetches data from a [Provider] over a [Transport].
///
/// This type is very similar to a CacheDb as exposed by revm
/// in the sense that it will cache any RPC responses in memory.
#[derive(Debug, Clone)]
pub struct RpcDb<T, P> {
/// The provider which fetches data.
pub provider: P,
/// The block to fetch data from.
pub block: BlockId,
/// The cached accounts.
pub accounts: RefCell<HashMap<Address, AccountInfo>>,
pub accounts: HashMap<Address, AccountInfo>,
/// The cached storage values.
pub storage: RefCell<HashMap<Address, HashMap<U256, U256>>>,
pub storage: HashMap<Address, HashMap<U256, U256>>,
/// The cached block hashes
pub block_hash_by_number: HashMap<u64, B256>,
/// The oldest block whose header/hash has been requested.
pub oldest_ancestor: RefCell<u64>,
pub oldest_ancestor: u64,
/// A phantom type to make the struct generic over the transport.
pub _phantom: PhantomData<T>,
}
Expand All @@ -43,23 +47,35 @@ pub enum RpcDbError {
PreimageNotFound,
}

impl<T: Transport + Clone, P: Provider<T, AnyNetwork> + Clone> RpcDb<T, P> {
impl<T, P> RpcDb<T, P>
where
T: Transport + Clone,
P: Provider<T, AnyNetwork>,
{
/// Create a new [`RpcDb`].
pub fn new(provider: P, block: u64) -> Self {
RpcDb {
provider,
block: block.into(),
accounts: RefCell::new(HashMap::new()),
storage: RefCell::new(HashMap::new()),
oldest_ancestor: RefCell::new(block),
accounts: HashMap::new(),
storage: HashMap::new(),
block_hash_by_number: HashMap::new(),
oldest_ancestor: block,
_phantom: PhantomData,
}
}

/// Fetch the [AccountInfo] for an [Address].
pub async fn fetch_account_info(&self, address: Address) -> Result<AccountInfo, RpcDbError> {
pub async fn fetch_account_info(
&mut self,
address: Address,
) -> Result<AccountInfo, RpcDbError> {
tracing::info!("fetching account info for address: {}", address);

if let Some(account_info) = self.accounts.get(&address) {
return Ok(account_info.clone());
}

// Fetch the proof for the account.
let proof = self
.provider
Expand All @@ -86,19 +102,25 @@ impl<T: Transport + Clone, P: Provider<T, AnyNetwork> + Clone> RpcDb<T, P> {
};

// Record the account info to the state.
self.accounts.borrow_mut().insert(address, account_info.clone());
self.accounts.insert(address, account_info.clone());

Ok(account_info)
}

/// Fetch the storage value at an [Address] and [U256] index.
pub async fn fetch_storage_at(
&self,
&mut self,
address: Address,
index: U256,
) -> Result<U256, RpcDbError> {
tracing::info!("fetching storage value at address: {}, index: {}", address, index);

if let Some(account) = self.storage.get(&address) {
if let Some(value) = account.get(&index) {
return Ok(*value);
}
}

// Fetch the storage value.
let value = self
.provider
Expand All @@ -108,17 +130,20 @@ impl<T: Transport + Clone, P: Provider<T, AnyNetwork> + Clone> RpcDb<T, P> {
.map_err(|e| RpcDbError::RpcError(e.to_string()))?;

// Record the storage value to the state.
let mut storage_values = self.storage.borrow_mut();
let entry = storage_values.entry(address).or_default();
let entry = self.storage.entry(address).or_default();
entry.insert(index, value);

Ok(value)
}

/// Fetch the block hash for a block number.
pub async fn fetch_block_hash(&self, number: u64) -> Result<B256, RpcDbError> {
pub async fn fetch_block_hash(&mut self, number: u64) -> Result<B256, RpcDbError> {
tracing::info!("fetching block hash for block number: {}", number);

if let Some(hash) = self.block_hash_by_number.get(&number) {
return Ok(*hash);
}

// Fetch the block.
let block = self
.provider
Expand All @@ -130,22 +155,20 @@ impl<T: Transport + Clone, P: Provider<T, AnyNetwork> + Clone> RpcDb<T, P> {
let block = block.ok_or(RpcDbError::BlockNotFound)?;
let hash = block.header.hash;

let mut oldest_ancestor = self.oldest_ancestor.borrow_mut();
*oldest_ancestor = number.min(*oldest_ancestor);
self.block_hash_by_number.insert(number, hash);
self.oldest_ancestor = number.min(self.oldest_ancestor);

Ok(hash)
}

/// Gets all the state keys used. The client uses this to read the actual state data from tries.
pub fn get_state_requests(&self) -> HashMap<Address, Vec<U256>> {
let accounts = self.accounts.borrow();
let storage = self.storage.borrow();

accounts
self.accounts
.keys()
.chain(storage.keys())
.chain(self.storage.keys())
.map(|&address| {
let storage_keys_for_address: BTreeSet<U256> = storage
let storage_keys_for_address: BTreeSet<U256> = self
.storage
.get(&address)
.map(|storage_map| storage_map.keys().cloned().collect())
.unwrap_or_default();
Expand All @@ -157,9 +180,7 @@ impl<T: Transport + Clone, P: Provider<T, AnyNetwork> + Clone> RpcDb<T, P> {

/// Gets all account bytecodes.
pub fn get_bytecodes(&self) -> Vec<Bytecode> {
let accounts = self.accounts.borrow();

accounts
self.accounts
.values()
.flat_map(|account| account.code.clone())
.map(|code| (code.hash_slow(), code))
Expand All @@ -169,10 +190,14 @@ impl<T: Transport + Clone, P: Provider<T, AnyNetwork> + Clone> RpcDb<T, P> {
}
}

impl<T: Transport + Clone, P: Provider<T, AnyNetwork> + Clone> DatabaseRef for RpcDb<T, P> {
impl<T, P> Database for RpcDb<T, P>
where
T: Transport + Clone,
P: Provider<T, AnyNetwork>,
{
type Error = ProviderError;

fn basic_ref(&self, address: Address) -> Result<Option<AccountInfo>, Self::Error> {
fn basic(&mut self, address: Address) -> Result<Option<AccountInfo>, Self::Error> {
let handle = tokio::runtime::Handle::try_current().map_err(|_| {
ProviderError::Database(DatabaseError::Other("no tokio runtime found".to_string()))
})?;
Expand All @@ -183,11 +208,11 @@ impl<T: Transport + Clone, P: Provider<T, AnyNetwork> + Clone> DatabaseRef for R
Ok(Some(account_info))
}

fn code_by_hash_ref(&self, _code_hash: B256) -> Result<Bytecode, Self::Error> {
fn code_by_hash(&mut self, _code_hash: B256) -> Result<Bytecode, Self::Error> {
unimplemented!()
}

fn storage_ref(&self, address: Address, index: U256) -> Result<U256, Self::Error> {
fn storage(&mut self, address: Address, index: U256) -> Result<U256, Self::Error> {
let handle = tokio::runtime::Handle::try_current().map_err(|_| {
ProviderError::Database(DatabaseError::Other("no tokio runtime found".to_string()))
})?;
Expand All @@ -198,7 +223,7 @@ impl<T: Transport + Clone, P: Provider<T, AnyNetwork> + Clone> DatabaseRef for R
Ok(value)
}

fn block_hash_ref(&self, number: u64) -> Result<B256, Self::Error> {
fn block_hash(&mut self, number: u64) -> Result<B256, Self::Error> {
let handle = tokio::runtime::Handle::try_current().map_err(|_| {
ProviderError::Database(DatabaseError::Other("no tokio runtime found".to_string()))
})?;
Expand Down
Loading
Loading