Skip to content

Commit

Permalink
resolve deadlock
Browse files Browse the repository at this point in the history
  • Loading branch information
emmazzz committed Jun 5, 2024
1 parent d7e0710 commit 5c067a4
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 3 deletions.
4 changes: 3 additions & 1 deletion crates/sui-indexer/src/handlers/checkpoint_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ pub async fn new_handlers<S, T>(
metrics: IndexerMetrics,
next_checkpoint_sequence_number: CheckpointSequenceNumber,
cancel: CancellationToken,
backfill_cancel: CancellationToken,
) -> Result<CheckpointHandler<S, T>, IndexerError>
where
S: IndexerStore + Clone + Sync + Send + 'static,
Expand Down Expand Up @@ -100,7 +101,8 @@ where
indexed_checkpoint_receiver,
tx,
next_checkpoint_sequence_number,
cancel.clone()
cancel.clone(),
backfill_cancel,
));
Ok(CheckpointHandler::new(
state,
Expand Down
7 changes: 6 additions & 1 deletion crates/sui-indexer/src/handlers/committer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ pub async fn start_tx_checkpoint_commit_task<S>(
commit_notifier: watch::Sender<Option<CheckpointSequenceNumber>>,
mut next_checkpoint_sequence_number: CheckpointSequenceNumber,
cancel: CancellationToken,
backfill_cancel: CancellationToken,
) -> IndexerResult<()>
where
S: IndexerStore + Clone + Sync + Send + 'static,
Expand All @@ -58,6 +59,7 @@ where
if latest_object_snapshot_seq != latest_cp_seq {
info!("Flipping object_snapshot_backfill_mode to false because objects_snapshot is behind already!");
object_snapshot_backfill_mode = false;
backfill_cancel.cancel();
}

let mut unprocessed = HashMap::new();
Expand Down Expand Up @@ -120,9 +122,12 @@ where
}
// This is a one-way flip, in case the indexer falls behind again, so that the objects snapshot
// table will not be populated by both the committer and the async snapshot processor at the same time.
if latest_committed_cp + OBJECTS_SNAPSHOT_MAX_CHECKPOINT_LAG > latest_fn_cp {
if object_snapshot_backfill_mode
&& latest_committed_cp + OBJECTS_SNAPSHOT_MAX_CHECKPOINT_LAG > latest_fn_cp
{
info!("Flipping object_snapshot_backfill_mode to false because objects_snapshot is close to up-to-date.");
object_snapshot_backfill_mode = false;
backfill_cancel.cancel();
}
}
Ok(())
Expand Down
8 changes: 8 additions & 0 deletions crates/sui-indexer/src/handlers/objects_snapshot_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use futures::pin_mut;
use futures::stream::{Peekable, ReadyChunks};
use futures::StreamExt;
use itertools::Itertools;
use rayon::iter::plumbing::bridge_unindexed;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::thread::sleep;
Expand Down Expand Up @@ -34,6 +35,7 @@ pub struct ObjectsSnapshotProcessor<S> {
metrics: IndexerMetrics,
pub config: SnapshotLagConfig,
cancel: CancellationToken,
backfill_cancel: CancellationToken,
}

pub struct CheckpointObjectChanges {
Expand Down Expand Up @@ -101,13 +103,15 @@ where
metrics: IndexerMetrics,
config: SnapshotLagConfig,
cancel: CancellationToken,
backfill_cancel: CancellationToken,
) -> ObjectsSnapshotProcessor<S> {
Self {
client,
store,
metrics,
config,
cancel,
backfill_cancel,
}
}

Expand Down Expand Up @@ -184,6 +188,10 @@ where
info!("Shutdown signal received, terminating object snapshot processor");
return Ok(());
}
_ = self.backfill_cancel.cancelled() => {
info!("Backfill is done, exiting the loop and start regular syncing");
break;
}
_ = tokio::time::sleep(std::time::Duration::from_secs(self.config.sleep_duration)) => {
let latest_cp = self
.store
Expand Down
4 changes: 3 additions & 1 deletion crates/sui-indexer/src/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,14 +104,15 @@ impl Indexer {
.with_label_values(&["object_change_buffering"]),
);

// let objects_snapshot_buffer = Arc::new(Mutex::new(ObjectChangeBuffer::default()));
let backfill_cancel = CancellationToken::new();

let objects_snapshot_processor = ObjectsSnapshotProcessor::new_with_config(
rest_client.clone(),
store.clone(),
metrics.clone(),
snapshot_config,
cancel.clone(),
backfill_cancel.clone(),
);
spawn_monitored_task!(objects_snapshot_processor.start(object_change_receiver));

Expand All @@ -135,6 +136,7 @@ impl Indexer {
metrics,
watermark,
cancel.clone(),
backfill_cancel,
)
.await?;
let worker_pool = WorkerPool::new(worker, "workflow".to_string(), download_queue_size);
Expand Down

0 comments on commit 5c067a4

Please sign in to comment.