Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(katana): forked provider fetch remote non-state data #1700

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
4 changes: 4 additions & 0 deletions crates/katana/primitives/src/event.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
use core::fmt;
use std::num::ParseIntError;

pub type ChunkSize = u64;
pub type EventContinuationToken = Option<String>;
pub type EventFilter = starknet::core::types::EventFilter;
pub type EventsPage = starknet::core::types::EventsPage;
use crate::FieldElement;

#[derive(Debug, Clone, PartialEq, Eq)]
Expand Down
1 change: 1 addition & 0 deletions crates/katana/primitives/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::{utils, FieldElement};
pub type TxHash = FieldElement;
/// The sequential number for all the transactions.
pub type TxNumber = u64;
pub type Transaction = starknet::core::types::Transaction;

#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
Expand Down
239 changes: 235 additions & 4 deletions crates/katana/storage/provider/src/providers/fork/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use futures::channel::mpsc::{channel, Receiver, SendError, Sender};
use futures::future::BoxFuture;
use futures::stream::Stream;
use futures::{Future, FutureExt};
use katana_primitives::block::BlockHashOrNumber;
use katana_primitives::block::{BlockHashOrNumber, BlockIdOrTag};
use katana_primitives::class::{ClassHash, CompiledClass, CompiledClassHash, FlattenedSierraClass};
use katana_primitives::contract::{
ContractAddress, GenericContractInfo, Nonce, StorageKey, StorageValue,
Expand All @@ -18,6 +18,8 @@ use katana_primitives::conversion::rpc::{
compiled_class_hash_from_flattened_sierra_class, flattened_sierra_to_compiled_class,
legacy_rpc_to_compiled_class,
};
use katana_primitives::event::{ChunkSize, EventContinuationToken, EventFilter, EventsPage};
use katana_primitives::transaction::{Transaction, TxHash, TxNumber};
use katana_primitives::FieldElement;
use parking_lot::Mutex;
use starknet::core::types::{BlockId, ContractClass, StarknetError};
Expand All @@ -35,6 +37,14 @@ type GetNonceResult = Result<Nonce, ForkedBackendError>;
type GetStorageResult = Result<StorageValue, ForkedBackendError>;
type GetClassHashAtResult = Result<ClassHash, ForkedBackendError>;
type GetClassAtResult = Result<starknet::core::types::ContractClass, ForkedBackendError>;
type GetEventResult = Result<EventsPage, ForkedBackendError>;
type GetBlockWithTxHashesResult =
Result<starknet::core::types::MaybePendingBlockWithTxHashes, ForkedBackendError>;
type GetBlockWithTxsResult =
Result<starknet::core::types::MaybePendingBlockWithTxs, ForkedBackendError>;
type GetTransactionResult = Result<Transaction, ForkedBackendError>;
type GetTransactionReceiptResult =
Result<starknet::core::types::MaybePendingTransactionReceipt, ForkedBackendError>;

pub(crate) const LOG_TARGET: &str = "forked_backend";

Expand Down Expand Up @@ -62,6 +72,12 @@ pub enum BackendRequest {
GetNonce(ContractAddress, OneshotSender<GetNonceResult>),
GetClassHashAt(ContractAddress, OneshotSender<GetClassHashAtResult>),
GetStorage(ContractAddress, StorageKey, OneshotSender<GetStorageResult>),
GetEvents(EventFilter, EventContinuationToken, ChunkSize, OneshotSender<GetEventResult>),
GetBlockWithTxHash(BlockIdOrTag, OneshotSender<GetBlockWithTxHashesResult>),
GetBlockWithTxs(BlockIdOrTag, OneshotSender<GetBlockWithTxsResult>),
GetTransactionByBlockIdAndIndex(BlockIdOrTag, TxNumber, OneshotSender<GetTransactionResult>),
GetTransactionByHash(TxHash, OneshotSender<GetTransactionResult>),
GetTransactionReceipt(TxHash, OneshotSender<GetTransactionReceiptResult>),
}

type BackendRequestFuture = BoxFuture<'static, ()>;
Expand Down Expand Up @@ -144,6 +160,84 @@ impl Backend {

self.pending_requests.push(fut);
}

BackendRequest::GetEvents(filter, continuation_token, chunks_size, sender) => {
let fut = Box::pin(async move {
let res = provider
.get_events(filter, continuation_token, chunks_size)
.await
.map_err(ForkedBackendError::StarknetProvider);

sender.send(res).expect("failed to send events result")
});

self.pending_requests.push(fut);
}

BackendRequest::GetBlockWithTxHash(block_id, sender) => {
let fut = Box::pin(async move {
let res = provider
.get_block_with_tx_hashes(block_id)
.await
.map_err(ForkedBackendError::StarknetProvider);

sender.send(res).expect("failed to send block result")
});

self.pending_requests.push(fut);
}

BackendRequest::GetBlockWithTxs(block_id, sender) => {
let fut = Box::pin(async move {
let res = provider
.get_block_with_txs(block_id)
.await
.map_err(ForkedBackendError::StarknetProvider);

sender.send(res).expect("failed to send block result")
});

self.pending_requests.push(fut);
}

BackendRequest::GetTransactionByBlockIdAndIndex(block_id, index, sender) => {
let fut = Box::pin(async move {
let res = provider
.get_transaction_by_block_id_and_index(block_id, index)
.await
.map_err(ForkedBackendError::StarknetProvider);

sender.send(res).expect("failed to send transaction result")
});

self.pending_requests.push(fut);
}

BackendRequest::GetTransactionByHash(transaction_hash, sender) => {
let fut = Box::pin(async move {
let res = provider
.get_transaction_by_hash(transaction_hash)
.await
.map_err(ForkedBackendError::StarknetProvider);

sender.send(res).expect("failed to send transaction result")
});

self.pending_requests.push(fut);
}

BackendRequest::GetTransactionReceipt(transaction_hash, sender) => {
let fut = Box::pin(async move {
let res = provider
.get_transaction_receipt(transaction_hash)
.await
.map_err(ForkedBackendError::StarknetProvider);

sender.send(res).expect("failed to send transaction result")
});

self.pending_requests.push(fut);
}
}
}
}
Expand Down Expand Up @@ -270,6 +364,7 @@ impl ForkedBackend {
key = %format!("{:#x}", key),
"Requesting storage."
);

let (sender, rx) = oneshot();
self.0
.lock()
Expand Down Expand Up @@ -328,6 +423,87 @@ impl ForkedBackend {
}
}
}

pub fn do_get_events(
&self,
filter: EventFilter,
continuation_token: Option<String>,
chunks_size: ChunkSize,
) -> Result<EventsPage, ForkedBackendError> {
trace!(target: LOG_TARGET, "requesting evetns at filter{filter:#?}, continuation_token {continuation_token:#?}, and chunks_size {chunks_size:#?} ");
let (sender, rx) = oneshot();
self.0
.lock()
.try_send(BackendRequest::GetEvents(filter, continuation_token, chunks_size, sender))
.map_err(|e| e.into_send_error())?;
rx.recv()?
}

pub fn do_get_block_with_tx_hashes(
&self,
block_id: BlockIdOrTag,
) -> Result<starknet::core::types::MaybePendingBlockWithTxHashes, ForkedBackendError> {
trace!(target: LOG_TARGET, "requesting block with tx_hashes at block {block_id:#?} ");
let (sender, rx) = oneshot();
self.0
.lock()
.try_send(BackendRequest::GetBlockWithTxHash(block_id, sender))
.map_err(|e| e.into_send_error())?;
rx.recv()?
}

pub fn do_get_block_with_txs(
&self,
block_id: BlockIdOrTag,
) -> Result<starknet::core::types::MaybePendingBlockWithTxs, ForkedBackendError> {
trace!(target: LOG_TARGET, "requesting block with txs at block {block_id:#?} ");
let (sender, rx) = oneshot();
self.0
.lock()
.try_send(BackendRequest::GetBlockWithTxs(block_id, sender))
.map_err(|e| e.into_send_error())?;
rx.recv()?
}

pub fn do_get_transaction_by_block_id_and_index(
&self,
block_id: BlockIdOrTag,
index: TxNumber,
) -> Result<Transaction, ForkedBackendError> {
trace!(target: LOG_TARGET, "requesting transaction at block {block_id:#?}, index {index:#?}");
let (sender, rx) = oneshot();
self.0
.lock()
.try_send(BackendRequest::GetTransactionByBlockIdAndIndex(block_id, index, sender))
.map_err(|e| e.into_send_error())?;
rx.recv()?
}

pub fn do_get_transaction_by_hash(
&self,
transaction_hash: TxHash,
) -> Result<Transaction, ForkedBackendError> {
trace!(target: LOG_TARGET, "requesting transaction at trasanction hash {transaction_hash:#?} ");
let (sender, rx) = oneshot();
self.0
.lock()
.try_send(BackendRequest::GetTransactionByHash(transaction_hash, sender))
.map_err(|e| e.into_send_error())?;
rx.recv()?
}

pub fn do_get_transaction_receipt(
&self,
transaction_hash: TxHash,
) -> Result<starknet::core::types::MaybePendingTransactionReceipt, ForkedBackendError> {
trace!(target: LOG_TARGET, "requesting transaction receipt at trasanction hash {transaction_hash:#?} ");
let (sender, rx) = oneshot();
self.0
.lock()
.try_send(BackendRequest::GetTransactionReceipt(transaction_hash, sender))
.map_err(|e| e.into_send_error())?;
rx.recv()?
}
}

/// A shared cache that stores data fetched from the forked network.
Expand Down Expand Up @@ -356,7 +532,6 @@ impl StateProvider for SharedStateProvider {
if let nonce @ Some(_) = self.contract(address)?.map(|i| i.nonce) {
return Ok(nonce);
}

if let Some(nonce) = handle_contract_or_class_not_found_err(self.0.do_get_nonce(address))
.map_err(|e| {
error!(
Expand Down Expand Up @@ -385,7 +560,6 @@ impl StateProvider for SharedStateProvider {
{
return Ok(value.copied());
}

let value =
handle_contract_or_class_not_found_err(self.0.do_get_storage(address, storage_key))
.map_err(|e| {
Expand Down Expand Up @@ -585,7 +759,7 @@ fn handle_contract_or_class_not_found_err<T>(

#[cfg(test)]
mod tests {
use katana_primitives::block::BlockNumber;
use katana_primitives::block::{BlockNumber, BlockTag};
use katana_primitives::contract::GenericContractInfo;
use starknet::macros::felt;
use url::Url;
Expand Down Expand Up @@ -704,4 +878,61 @@ mod tests {
assert_eq!(class_hash, class_hash_in_cache, "value must be stored in cache");
assert_eq!(storage_value, storage_value_in_cache, "value must be stored in cache");
}

#[test]
fn fetch_non_state_data_from_fork() {
let (backend, _) = create_forked_backend(LOCAL_RPC_URL.into(), 1);

assert!(
backend
.do_get_events(
EventFilter {
from_block: Some(starknet::core::types::BlockId::Tag(BlockTag::Latest)),
to_block: Some(starknet::core::types::BlockId::Tag(BlockTag::Latest)),
Comment on lines +1000 to +1001
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't pending that you want to use here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@glihm Thank you for your comments. Could you share more details or relevant documentation?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently you're reading BlockTag::Latest, and I was wondering if instead we want to use BlockTag::Pending.
But re-reading your test, I think it's better what you've did with the latest. @kariy could you confirm?

address: None,
keys: None,
},
None,
10,
)
.is_ok()
);

assert!(
backend.do_get_block_with_tx_hashes(starknet::core::types::BlockId::Number(0)).is_ok()
);

assert!(backend.do_get_block_with_txs(starknet::core::types::BlockId::Number(0)).is_ok());

assert!(
backend
.do_get_transaction_by_block_id_and_index(
starknet::core::types::BlockId::Number(0),
1
)
.is_ok()
);

assert!(
backend
.do_get_transaction_by_hash(
FieldElement::from_hex_be(
"0x41a78e741e5af2fec34b695679bc6891742439f7afb8484ecd7766661ad02bf",
)
.unwrap()
)
.is_err()
);

assert!(
backend
.do_get_transaction_receipt(
FieldElement::from_hex_be(
"0x41a78e741e5af2fec34b695679bc6891742439f7afb8484ecd7766661ad02bf",
)
.unwrap()
)
.is_err()
);
}
}
Loading