Skip to content

Commit

Permalink
[Indexer] A few improvements to backfill tool (#19441)
Browse files Browse the repository at this point in the history
## Description 

This PR adds 3 improvements to the sql backfill tool:
1. It allows ON CONFLICT DO NOTHING, so that we can safely backfill gaps
without been too precise.
2. It tunes down the default concurrency and chunk size, and allows for
override through command line args.
3. It prints out the minimum in-progress checkpoint so that if it ever
stops for some reason, you can restart using that number.

## Test plan 

Run again locally

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
  • Loading branch information
lxfind authored Sep 19, 2024
1 parent f9698a6 commit b8d8194
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 8 deletions.
23 changes: 23 additions & 0 deletions crates/sui-indexer/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,27 @@ impl Default for IngestionConfig {
}
}

#[derive(Args, Debug, Clone)]
pub struct SqlBackFillConfig {
/// Maximum number of concurrent tasks to run.
#[arg(
long,
default_value_t = Self::DEFAULT_MAX_CONCURRENCY,
)]
pub max_concurrency: usize,
/// Number of checkpoints to backfill in a single SQL command.
#[arg(
long,
default_value_t = Self::DEFAULT_CHUNK_SIZE,
)]
pub chunk_size: usize,
}

impl SqlBackFillConfig {
const DEFAULT_MAX_CONCURRENCY: usize = 10;
const DEFAULT_CHUNK_SIZE: usize = 1000;
}

#[derive(Subcommand, Clone, Debug)]
pub enum Command {
Indexer {
Expand Down Expand Up @@ -177,6 +198,8 @@ pub enum Command {
checkpoint_column_name: String,
first_checkpoint: u64,
last_checkpoint: u64,
#[command(flatten)]
backfill_config: SqlBackFillConfig,
},
}

Expand Down
2 changes: 2 additions & 0 deletions crates/sui-indexer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,15 @@ async fn main() -> anyhow::Result<()> {
checkpoint_column_name,
first_checkpoint,
last_checkpoint,
backfill_config,
} => {
run_sql_backfill(
&sql,
&checkpoint_column_name,
first_checkpoint,
last_checkpoint,
pool,
backfill_config,
)
.await;
}
Expand Down
33 changes: 25 additions & 8 deletions crates/sui-indexer/src/sql_backfill.rs
Original file line number Diff line number Diff line change
@@ -1,38 +1,55 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::config::SqlBackFillConfig;
use crate::database::ConnectionPool;
use diesel_async::RunQueryDsl;
use futures::{stream, StreamExt};
use std::collections::BTreeSet;
use std::sync::Arc;
use std::time::Instant;

const CHUNK_SIZE: u64 = 10000;
const MAX_CONCURRENCY: usize = 100;
use tokio::sync::Mutex;

pub async fn run_sql_backfill(
sql: &str,
checkpoint_column_name: &str,
first_checkpoint: u64,
last_checkpoint: u64,
pool: ConnectionPool,
backfill_config: SqlBackFillConfig,
) {
let cur_time = Instant::now();
// Keeps track of the checkpoint ranges (using starting checkpoint number)
// that are in progress.
let in_progress = Arc::new(Mutex::new(BTreeSet::new()));
let chunks: Vec<(u64, u64)> = (first_checkpoint..=last_checkpoint)
.step_by(CHUNK_SIZE as usize)
.step_by(backfill_config.chunk_size)
.map(|chunk_start| {
let chunk_end = std::cmp::min(chunk_start + CHUNK_SIZE - 1, last_checkpoint);
let chunk_end = std::cmp::min(
chunk_start + backfill_config.chunk_size as u64 - 1,
last_checkpoint,
);
(chunk_start, chunk_end)
})
.collect();

stream::iter(chunks)
.for_each_concurrent(MAX_CONCURRENCY, |(start_id, end_id)| {
.for_each_concurrent(backfill_config.max_concurrency, |(start_id, end_id)| {
let pool_clone = pool.clone(); // Clone the pool for async operation
let in_progress_clone = in_progress.clone();
async move {
in_progress_clone.lock().await.insert(start_id);
// Run the copy in a batch and add a delay
backfill_data_batch(sql, checkpoint_column_name, start_id, end_id, pool_clone)
.await;
println!("Finished checkpoint range: {} - {}", start_id, end_id);
println!("Finished checkpoint range: {} - {}.", start_id, end_id);
in_progress_clone.lock().await.remove(&start_id);
let cur_min_in_progress = in_progress_clone.lock().await.iter().next().cloned();
println!(
"Minimum checkpoint number still in progress: {:?}.\
If the binary ever fails, you can restart from this checkpoint",
cur_min_in_progress
);
}
})
.await;
Expand All @@ -49,7 +66,7 @@ async fn backfill_data_batch(
let mut conn = pool.get().await.unwrap();

let query = format!(
"{} WHERE {} BETWEEN {} AND {}",
"{} WHERE {} BETWEEN {} AND {} ON CONFLICT DO NOTHING",
sql, checkpoint_column_name, first_checkpoint, last_checkpoint
);

Expand Down

0 comments on commit b8d8194

Please sign in to comment.