diff --git a/Cargo.lock b/Cargo.lock
index 530db77742..2bf83813a2 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -8035,7 +8035,6 @@ dependencies = [
  "katana-core",
  "katana-executor",
  "katana-primitives",
- "katana-provider",
  "rstest 0.18.2",
  "serde",
  "serde_json",
diff --git a/crates/katana/primitives/src/event.rs b/crates/katana/primitives/src/event.rs
index 4fbbc470e9..3767f30d1c 100644
--- a/crates/katana/primitives/src/event.rs
+++ b/crates/katana/primitives/src/event.rs
@@ -1,6 +1,10 @@
 use core::fmt;
 use std::num::ParseIntError;
 
+pub type ChunkSize = u64;
+pub type EventContinuationToken = Option<String>;
+pub type EventFilter = starknet::core::types::EventFilter;
+pub type EventsPage = starknet::core::types::EventsPage;
 use crate::FieldElement;
 
 #[derive(Debug, Clone, PartialEq, Eq)]
diff --git a/crates/katana/primitives/src/transaction.rs b/crates/katana/primitives/src/transaction.rs
index e4a4d0537d..81659356bf 100644
--- a/crates/katana/primitives/src/transaction.rs
+++ b/crates/katana/primitives/src/transaction.rs
@@ -16,6 +16,7 @@ use crate::{utils, FieldElement};
 pub type TxHash = FieldElement;
 /// The sequential number for all the transactions.
 pub type TxNumber = u64;
+pub type Transaction = starknet::core::types::Transaction;
 
 #[derive(Debug, Clone, PartialEq, Eq)]
 #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
diff --git a/crates/katana/rpc/rpc-types/Cargo.toml b/crates/katana/rpc/rpc-types/Cargo.toml
index bc351aa0fc..82aba9df66 100644
--- a/crates/katana/rpc/rpc-types/Cargo.toml
+++ b/crates/katana/rpc/rpc-types/Cargo.toml
@@ -10,7 +10,6 @@ version.workspace = true
 katana-core.workspace = true
 katana-executor.workspace = true
 katana-primitives.workspace = true
-katana-provider.workspace = true
 
 anyhow.workspace = true
 derive_more.workspace = true
diff --git a/crates/katana/storage/provider/Cargo.toml b/crates/katana/storage/provider/Cargo.toml
index b9c0379152..4f1f5f4e41 100644
--- a/crates/katana/storage/provider/Cargo.toml
+++ b/crates/katana/storage/provider/Cargo.toml
@@ -9,7 +9,6 @@ version.workspace = true
 [dependencies]
 katana-db = { workspace = true, features = [ "test-utils" ] }
 katana-primitives = { workspace = true, features = [ "rpc" ] }
-
 anyhow.workspace = true
 auto_impl.workspace = true
 parking_lot.workspace = true
diff --git a/crates/katana/storage/provider/src/providers/fork/backend.rs b/crates/katana/storage/provider/src/providers/fork/backend.rs
index 5d3bfd9f6e..a9fa8fffbb 100644
--- a/crates/katana/storage/provider/src/providers/fork/backend.rs
+++ b/crates/katana/storage/provider/src/providers/fork/backend.rs
@@ -11,16 +11,18 @@ use futures::channel::mpsc::{channel as async_channel, Receiver, SendError, Send
 use futures::future::BoxFuture;
 use futures::stream::Stream;
 use futures::{Future, FutureExt};
-use katana_primitives::block::BlockHashOrNumber;
+use katana_primitives::block::{BlockHashOrNumber, BlockIdOrTag};
 use katana_primitives::class::{ClassHash, CompiledClass, CompiledClassHash, FlattenedSierraClass};
 use katana_primitives::contract::{ContractAddress, Nonce, StorageKey, StorageValue};
 use katana_primitives::conversion::rpc::{
     compiled_class_hash_from_flattened_sierra_class, flattened_sierra_to_compiled_class,
     legacy_rpc_to_compiled_class,
 };
+use katana_primitives::event::{ChunkSize, EventContinuationToken, EventFilter, EventsPage};
+use katana_primitives::transaction::{Transaction, TxHash, TxNumber};
 use katana_primitives::FieldElement;
 use parking_lot::Mutex;
-use starknet::core::types::{BlockId, ContractClass as RpcContractClass, StarknetError};
+use starknet::core::types::{BlockId, ContractClass as RpcContractClass, MaybePendingBlockWithTxHashes, MaybePendingBlockWithTxs, MaybePendingTransactionReceipt, StarknetError};
 use starknet::providers::{Provider, ProviderError as StarknetProviderError};
 use tracing::{error, info, trace};
 
@@ -38,6 +40,11 @@ type GetNonceResult = BackendResult<Nonce>;
 type GetStorageResult = BackendResult<StorageValue>;
 type GetClassHashAtResult = BackendResult<ClassHash>;
 type GetClassAtResult = BackendResult<RpcContractClass>;
+type GetEventResult = BackendResult<EventsPage>;
+type GetBlockWithTxsResult = BackendResult<MaybePendingBlockWithTxs>;
+type GetBlockWithTxHashesResult = BackendResult<MaybePendingBlockWithTxHashes>;
+type GetTransactionResult = BackendResult<Transaction>;
+type GetTransactionReceiptResult = BackendResult<MaybePendingTransactionReceipt>;
 
 #[derive(Debug, thiserror::Error)]
 pub enum BackendError {
@@ -67,6 +74,12 @@ enum BackendRequest {
     Class(Request<ClassHash, RpcContractClass>),
     ClassHash(Request<ContractAddress, ClassHash>),
     Storage(Request<(ContractAddress, StorageKey), StorageValue>),
+    Events(Request<(EventFilter, EventContinuationToken, ChunkSize), EventsPage>),
+    BlockWithTxHash(Request<BlockIdOrTag, MaybePendingBlockWithTxHashes>),
+    BlockWithTxs(Request<BlockIdOrTag, MaybePendingBlockWithTxs>),
+    TransactionByBlockIdAndIndex(Request<(BlockIdOrTag, TxNumber), Transaction>),
+    TransactionByHash(Request<TxHash, Transaction>),
+    TransactionReceipt(Request<TxHash, MaybePendingTransactionReceipt>),
     // Test-only request kind for requesting the backend stats
     #[cfg(test)]
     Stats(OneshotSender<usize>),
@@ -107,6 +120,44 @@ impl BackendRequest {
         let (sender, receiver) = oneshot();
         (BackendRequest::Stats(sender), receiver)
     }
+
+    /// Create a new request for fetching events.
+    fn events(filter: EventFilter, continuation_token: EventContinuationToken, chunk_size: ChunkSize) -> (BackendRequest, OneshotReceiver<GetEventResult>) {
+        let (sender, receiver) = oneshot();
+        (BackendRequest::Events(Request { payload: (filter, continuation_token, chunk_size), sender }), receiver)
+    }
+
+    /// Create a new request for fetching a block with transaction hashes.
+    fn block_with_tx_hashes(block_id: BlockIdOrTag) -> (BackendRequest, OneshotReceiver<GetBlockWithTxHashesResult>) {
+        let (sender, receiver) = oneshot();
+        (BackendRequest::BlockWithTxHash(Request { payload: block_id, sender }), receiver)
+    }
+
+    /// Create a new request for fetching a block with full transactions.
+    fn block_with_txs(block_id: BlockIdOrTag) -> (BackendRequest, OneshotReceiver<GetBlockWithTxsResult>) {
+        let (sender, receiver) = oneshot();
+        (BackendRequest::BlockWithTxs(Request { payload: block_id, sender }), receiver)
+    }
+
+    /// Create a new request for fetching a transaction by block id and index.
+    fn transaction_by_block_id_and_index(block_id: BlockIdOrTag, tx_number: TxNumber) -> (BackendRequest, OneshotReceiver<GetTransactionResult>) {
+        let (sender, receiver) = oneshot();
+        let payload = (block_id, tx_number);
+        (BackendRequest::TransactionByBlockIdAndIndex(Request { payload, sender }), receiver)
+    }
+
+    /// Create a new request for fetching a transaction by its hash.
+    fn transaction_by_hash(tx_hash: TxHash) -> (BackendRequest, OneshotReceiver<GetTransactionResult>) {
+        let (sender, receiver) = oneshot();
+        (BackendRequest::TransactionByHash(Request { payload: tx_hash, sender }), receiver)
+    }
+
+    /// Create a new request for fetching a transaction receipt by transaction hash.
+    fn transaction_receipt(tx_hash: TxHash) -> (BackendRequest, OneshotReceiver<GetTransactionReceiptResult>) {
+        let (sender, receiver) = oneshot();
+        (BackendRequest::TransactionReceipt(Request { payload: tx_hash, sender }), receiver)
+    }
+
 }
 
 type BackendRequestFuture = BoxFuture<'static, ()>;
@@ -234,11 +285,91 @@ where
                 self.pending_requests.push(fut);
             }
 
+            #[cfg(test)]
             BackendRequest::Stats(sender) => {
                 let total_ongoing_request = self.pending_requests.len();
                 sender.send(total_ongoing_request).expect("failed to send backend stats");
             }
+
+            BackendRequest::Events(Request { payload: (filter, continuation_token, chunk_size), sender }) => {
+                let fut = Box::pin(async move {
+                    let res = provider
+                        .get_events(filter, continuation_token, chunk_size)
+                        .await
+                        .map_err(BackendError::StarknetProvider);
+
+                    sender.send(res).expect("failed to send events result");
+                });
+
+                self.pending_requests.push(fut);
+            }
+
+            BackendRequest::BlockWithTxHash(Request { payload: block_id, sender }) => {
+                let fut = Box::pin(async move {
+                    let res = provider
+                        .get_block_with_tx_hashes(block_id)
+                        .await
+                        .map_err(BackendError::StarknetProvider);
+
+                    sender.send(res).expect("failed to send block result");
+                });
+
+                self.pending_requests.push(fut);
+            }
+
+            BackendRequest::BlockWithTxs(Request { payload: block_id, sender }) => {
+                let fut = Box::pin(async move {
+                    let res = provider
+                        .get_block_with_txs(block_id)
+                        .await
+                        .map_err(BackendError::StarknetProvider);
+
+                    sender.send(res).expect("failed to send block result");
+                });
+
+                self.pending_requests.push(fut);
+            }
+
+            BackendRequest::TransactionByBlockIdAndIndex(Request { payload: (block_id, index), sender }) => {
+                let fut = Box::pin(async move {
+                    let res = provider
+                        .get_transaction_by_block_id_and_index(block_id, index)
+                        .await
+                        .map_err(BackendError::StarknetProvider);
+
+                    sender.send(res).expect("failed to send transaction result");
+                });
+
+                self.pending_requests.push(fut);
+            }
+
+            BackendRequest::TransactionByHash(Request { payload: transaction_hash, sender }) => {
+                let fut = Box::pin(async move {
+                    let res = provider
+                        .get_transaction_by_hash(transaction_hash)
+                        .await
+                        .map_err(BackendError::StarknetProvider);
+
+                    sender.send(res).expect("failed to send transaction result");
+                });
+
+                self.pending_requests.push(fut);
+            }
+
+            BackendRequest::TransactionReceipt(Request { payload: transaction_hash, sender }) => {
+                let fut = Box::pin(async move {
+                    let res = provider
+                        .get_transaction_receipt(transaction_hash)
+                        .await
+                        .map_err(BackendError::StarknetProvider);
+
+                    sender.send(res).expect("failed to send transaction receipt result");
+                });
+
+                self.pending_requests.push(fut);
+
+            }
         }
     }
 }
@@ -314,6 +445,7 @@ impl BackendHandle {
         &self,
         address: ContractAddress,
         key: StorageKey,
+
     ) -> Result<StorageValue, BackendError> {
         trace!(target: LOG_TARGET, %address, key = %format!("{key:#x}"), "Requesting contract storage.");
         let (req, rx) = BackendRequest::storage(address, key);
@@ -352,6 +484,7 @@ impl BackendHandle {
         }
     }
 
+    /// Send a request to the backend thread.
     fn request(&self, req: BackendRequest) -> Result<(), BackendError> {
         self.0.lock().try_send(req).map_err(|e| e.into_send_error())?;
         Ok(())
@@ -364,6 +497,70 @@ impl BackendHandle {
         self.request(req)?;
         Ok(rx.recv()?)
     }
+
+    pub fn do_get_events(
+        &self,
+        filter: EventFilter,
+        continuation_token: Option<String>,
+        chunk_size: ChunkSize,
+    ) -> Result<EventsPage, BackendError> {
+        trace!(target: LOG_TARGET, events = %format!("{filter:#?}, {continuation_token:#?}, {chunk_size:#?}"), "Requesting events.");
+        let (req, rx) = BackendRequest::events(filter, continuation_token, chunk_size);
+        self.request(req)?;
+        rx.recv()?
+    }
+
+    pub fn do_get_block_with_tx_hashes(
+        &self,
+        block_id: BlockIdOrTag,
+    ) -> Result<MaybePendingBlockWithTxHashes, BackendError> {
+        trace!(target: LOG_TARGET, block_id = %format!("{block_id:#?}"), "Requesting block with tx hashes.");
+        let (req, rx) = BackendRequest::block_with_tx_hashes(block_id);
+        self.request(req)?;
+        rx.recv()?
+    }
+
+    pub fn do_get_block_with_txs(
+        &self,
+        block_id: BlockIdOrTag,
+    ) -> Result<MaybePendingBlockWithTxs, BackendError> {
+        trace!(target: LOG_TARGET, block_id = %format!("{block_id:#?}"), "Requesting block with txs.");
+        let (req, rx) = BackendRequest::block_with_txs(block_id);
+        self.request(req)?;
+        rx.recv()?
+    }
+
+    pub fn do_get_transaction_by_block_id_and_index(
+        &self,
+        block_id: BlockIdOrTag,
+        index: TxNumber,
+    ) -> Result<Transaction, BackendError> {
+        trace!(target: LOG_TARGET, block_id = %format!("{block_id:#?}"), index = %format!("{index:#?}"), "Requesting transaction by block id and index.");
+        let (req, rx) = BackendRequest::transaction_by_block_id_and_index(block_id, index);
+        self.request(req)?;
+        rx.recv()?
+    }
+
+    pub fn do_get_transaction_by_hash(
+        &self,
+        transaction_hash: TxHash,
+    ) -> Result<Transaction, BackendError> {
+        trace!(target: LOG_TARGET, transaction_hash = %format!("{transaction_hash:#?}"), "Requesting transaction by transaction hash.");
+        let (req, rx) = BackendRequest::transaction_by_hash(transaction_hash);
+        self.request(req)?;
+        rx.recv()?
+    }
+
+    pub fn do_get_transaction_receipt(
+        &self,
+        transaction_hash: TxHash,
+    ) -> Result<MaybePendingTransactionReceipt, BackendError> {
+        trace!(target: LOG_TARGET, transaction_hash = %format!("{transaction_hash:#?}"), "Requesting transaction receipt by transaction hash.");
+        let (req, rx) = BackendRequest::transaction_receipt(transaction_hash);
+        self.request(req)?;
+        rx.recv()?
+
+    }
 }
 
 /// A shared cache that stores data fetched from the forked network.
@@ -409,10 +606,12 @@ impl StateProvider for SharedStateProvider {
             return Ok(nonce);
         }
 
+
         if let Some(nonce) = handle_not_found_err(self.0.get_nonce(address)).map_err(|error| {
             error!(target: LOG_TARGET, %address, %error, "Fetching nonce.");
             error
         })? {
+
             self.0.contract_state.write().entry(address).or_default().nonce = nonce;
             Ok(Some(nonce))
         } else {
@@ -430,7 +629,6 @@ impl StateProvider for SharedStateProvider {
         {
             return Ok(value.copied());
         }
-
         let value = handle_not_found_err(self.0.get_storage(address, storage_key)).map_err(|error| {
             error!(target: LOG_TARGET, %address, storage_key = %format!("{storage_key:#x}"), %error, "Fetching storage value.");
             error
@@ -599,6 +797,7 @@ fn handle_not_found_err<T>(result: Result<T, BackendError>) -> Result<Option<T>,
 }
 
 #[cfg(test)]
+
 pub(crate) mod test_utils {
     use std::sync::mpsc::sync_channel;
 
@@ -645,6 +844,7 @@ mod tests {
     use std::time::Duration;
 
+    use katana_primitives::contract::GenericContractInfo;
     use starknet::macros::felt;
 
    use super::*;
 
@@ -788,4 +988,61 @@ mod tests {
         assert_eq!(class_hash, class_hash_in_cache, "value must be stored in cache");
         assert_eq!(storage_value, storage_value_in_cache, "value must be stored in cache");
     }
+
+    #[test]
+    fn fetch_non_state_data_from_fork() {
+        let (backend, _) = create_forked_backend(LOCAL_RPC_URL.into(), 1);
+
+        assert!(
+            backend
+                .do_get_events(
+                    EventFilter {
+                        from_block: Some(starknet::core::types::BlockId::Tag(BlockTag::Latest)),
+                        to_block: Some(starknet::core::types::BlockId::Tag(BlockTag::Latest)),
+                        address: None,
+                        keys: None,
+                    },
+                    None,
+                    10,
+                )
+                .is_ok()
+        );
+
+        assert!(
+            backend.do_get_block_with_tx_hashes(starknet::core::types::BlockId::Number(0)).is_ok()
+        );
+
+        assert!(backend.do_get_block_with_txs(starknet::core::types::BlockId::Number(0)).is_ok());
+
+        assert!(
+            backend
+                .do_get_transaction_by_block_id_and_index(
+                    starknet::core::types::BlockId::Number(0),
+                    1
+                )
+                .is_ok()
+        );
+
+        assert!(
+            backend
+                .do_get_transaction_by_hash(
+                    FieldElement::from_hex_be(
+                        "0x41a78e741e5af2fec34b695679bc6891742439f7afb8484ecd7766661ad02bf",
+                    )
+                    .unwrap()
+                )
+                .is_err()
+        );
+
+        assert!(
+            backend
+                .do_get_transaction_receipt(
+                    FieldElement::from_hex_be(
+                        "0x41a78e741e5af2fec34b695679bc6891742439f7afb8484ecd7766661ad02bf",
+                    )
+                    .unwrap()
+                )
+                .is_err()
+        );
+    }
 }