Skip to content

Commit

Permalink
feat(torii): limit number of blocks processed in one go
Browse files Browse the repository at this point in the history
  • Loading branch information
lambda-0x committed Oct 9, 2024
1 parent 5ab6cde commit 975f3e4
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 2 deletions.
5 changes: 5 additions & 0 deletions bin/torii/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ struct Args {
#[arg(long, default_value = "1024")]
events_chunk_size: u64,

/// Number of blocks to process before commiting to DB
#[arg(long, default_value = "10240")]
blocks_chunk_size: u64,

/// Enable indexing pending blocks
#[arg(long, action = ArgAction::Set, default_value_t = true)]
index_pending: bool,
Expand Down Expand Up @@ -239,6 +243,7 @@ async fn main() -> anyhow::Result<()> {
EngineConfig {
max_concurrent_tasks: args.max_concurrent_tasks,
start_block: 0,
blocks_chunk_size: args.blocks_chunk_size,
events_chunk_size: args.events_chunk_size,
index_pending: args.index_pending,
polling_interval: Duration::from_millis(args.polling_interval),
Expand Down
11 changes: 9 additions & 2 deletions crates/torii/core/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ bitflags! {
pub struct EngineConfig {
pub polling_interval: Duration,
pub start_block: u64,
pub blocks_chunk_size: u64,
pub events_chunk_size: u64,
pub index_pending: bool,
pub max_concurrent_tasks: usize,
Expand All @@ -142,6 +143,7 @@ impl Default for EngineConfig {
Self {
polling_interval: Duration::from_millis(500),
start_block: 0,
blocks_chunk_size: 10240,
events_chunk_size: 1024,
index_pending: true,
max_concurrent_tasks: 100,
Expand Down Expand Up @@ -286,15 +288,20 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
}
}

// TODO: since we now process blocks in chunks we can parallelize the fetching of data
pub async fn fetch_data(&mut self, cursors: &Cursors) -> Result<FetchDataResult> {
let latest_block_number = self.provider.block_hash_and_number().await?.block_number;

let from = cursors.head.unwrap_or(0);
let total_remaining_blocks = latest_block_number - from;
let blocks_to_process = total_remaining_blocks.min(self.config.blocks_chunk_size);
let to = from + blocks_to_process;

let instant = Instant::now();
let result = if from < latest_block_number {
let from = if from == 0 { from } else { from + 1 };
let data = self.fetch_range(from, latest_block_number, &cursors.cursor_map).await?;
debug!(target: LOG_TARGET, duration = ?instant.elapsed(), from = %from, to = %latest_block_number, "Fetched data for range.");
let data = self.fetch_range(from, to, &cursors.cursor_map).await?;
debug!(target: LOG_TARGET, duration = ?instant.elapsed(), from = %from, to = %to, "Fetched data for range.");
FetchDataResult::Range(data)
} else if self.config.index_pending {
let data =
Expand Down

0 comments on commit 975f3e4

Please sign in to comment.