From b8d819468a0f545f4d08a7b1e43bc81153d07041 Mon Sep 17 00:00:00 2001 From: Xun Li Date: Thu, 19 Sep 2024 08:29:25 -0700 Subject: [PATCH] [Indexer] A few improvements to backfill tool (#19441) ## Description This PR adds 3 improvements to the sql backfill tool: 1. It allows ON CONFLICT DO NOTHING, so that we can safely backfill gaps without being too precise. 2. It tunes down the default concurrency and chunk size, and allows for override through command line args. 3. It prints out the minimum in-progress checkpoint so that if it ever stops for some reason, you can restart using that number. ## Test plan Run again locally --- ## Release notes Check each box that your changes affect. If none of the boxes relate to your changes, release notes aren't required. For each box you select, include information after the relevant heading that describes the impact of your changes that a user might notice and any actions they must take to implement updates. - [ ] Protocol: - [ ] Nodes (Validators and Full nodes): - [ ] Indexer: - [ ] JSON-RPC: - [ ] GraphQL: - [ ] CLI: - [ ] Rust SDK: - [ ] REST API: --- crates/sui-indexer/src/config.rs | 23 ++++++++++++++++++ crates/sui-indexer/src/main.rs | 2 ++ crates/sui-indexer/src/sql_backfill.rs | 33 +++++++++++++++++++------- 3 files changed, 50 insertions(+), 8 deletions(-) diff --git a/crates/sui-indexer/src/config.rs b/crates/sui-indexer/src/config.rs index d96c719ea10b8..648f9eba06baf 100644 --- a/crates/sui-indexer/src/config.rs +++ b/crates/sui-indexer/src/config.rs @@ -143,6 +143,27 @@ impl Default for IngestionConfig { } } +#[derive(Args, Debug, Clone)] +pub struct SqlBackFillConfig { + /// Maximum number of concurrent tasks to run. + #[arg( + long, + default_value_t = Self::DEFAULT_MAX_CONCURRENCY, + )] + pub max_concurrency: usize, + /// Number of checkpoints to backfill in a single SQL command. 
+ #[arg( + long, + default_value_t = Self::DEFAULT_CHUNK_SIZE, + )] + pub chunk_size: usize, +} + +impl SqlBackFillConfig { + const DEFAULT_MAX_CONCURRENCY: usize = 10; + const DEFAULT_CHUNK_SIZE: usize = 1000; +} + #[derive(Subcommand, Clone, Debug)] pub enum Command { Indexer { @@ -177,6 +198,8 @@ pub enum Command { checkpoint_column_name: String, first_checkpoint: u64, last_checkpoint: u64, + #[command(flatten)] + backfill_config: SqlBackFillConfig, }, } diff --git a/crates/sui-indexer/src/main.rs b/crates/sui-indexer/src/main.rs index 1c1523efbbb31..038c0f3c8af0a 100644 --- a/crates/sui-indexer/src/main.rs +++ b/crates/sui-indexer/src/main.rs @@ -79,6 +79,7 @@ async fn main() -> anyhow::Result<()> { checkpoint_column_name, first_checkpoint, last_checkpoint, + backfill_config, } => { run_sql_backfill( &sql, @@ -86,6 +87,7 @@ async fn main() -> anyhow::Result<()> { first_checkpoint, last_checkpoint, pool, + backfill_config, ) .await; } diff --git a/crates/sui-indexer/src/sql_backfill.rs b/crates/sui-indexer/src/sql_backfill.rs index a594e19d2be91..ea8103b6a2539 100644 --- a/crates/sui-indexer/src/sql_backfill.rs +++ b/crates/sui-indexer/src/sql_backfill.rs @@ -1,13 +1,14 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 +use crate::config::SqlBackFillConfig; use crate::database::ConnectionPool; use diesel_async::RunQueryDsl; use futures::{stream, StreamExt}; +use std::collections::BTreeSet; +use std::sync::Arc; use std::time::Instant; - -const CHUNK_SIZE: u64 = 10000; -const MAX_CONCURRENCY: usize = 100; +use tokio::sync::Mutex; pub async fn run_sql_backfill( sql: &str, @@ -15,24 +16,40 @@ pub async fn run_sql_backfill( first_checkpoint: u64, last_checkpoint: u64, pool: ConnectionPool, + backfill_config: SqlBackFillConfig, ) { let cur_time = Instant::now(); + // Keeps track of the checkpoint ranges (using starting checkpoint number) + // that are in progress. 
+ let in_progress = Arc::new(Mutex::new(BTreeSet::new())); let chunks: Vec<(u64, u64)> = (first_checkpoint..=last_checkpoint) - .step_by(CHUNK_SIZE as usize) + .step_by(backfill_config.chunk_size) .map(|chunk_start| { - let chunk_end = std::cmp::min(chunk_start + CHUNK_SIZE - 1, last_checkpoint); + let chunk_end = std::cmp::min( + chunk_start + backfill_config.chunk_size as u64 - 1, + last_checkpoint, + ); (chunk_start, chunk_end) }) .collect(); stream::iter(chunks) - .for_each_concurrent(MAX_CONCURRENCY, |(start_id, end_id)| { + .for_each_concurrent(backfill_config.max_concurrency, |(start_id, end_id)| { let pool_clone = pool.clone(); // Clone the pool for async operation + let in_progress_clone = in_progress.clone(); async move { + in_progress_clone.lock().await.insert(start_id); // Run the copy in a batch and add a delay backfill_data_batch(sql, checkpoint_column_name, start_id, end_id, pool_clone) .await; - println!("Finished checkpoint range: {} - {}", start_id, end_id); + println!("Finished checkpoint range: {} - {}.", start_id, end_id); + in_progress_clone.lock().await.remove(&start_id); + let cur_min_in_progress = in_progress_clone.lock().await.iter().next().cloned(); + println!( + "Minimum checkpoint number still in progress: {:?}.\ + If the binary ever fails, you can restart from this checkpoint", + cur_min_in_progress + ); } }) .await; @@ -49,7 +66,7 @@ async fn backfill_data_batch( let mut conn = pool.get().await.unwrap(); let query = format!( - "{} WHERE {} BETWEEN {} AND {}", + "{} WHERE {} BETWEEN {} AND {} ON CONFLICT DO NOTHING", sql, checkpoint_column_name, first_checkpoint, last_checkpoint );