Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[indexer] use in-memory buffer to store obj changes and update snapshot #18007

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions crates/sui-indexer/src/handlers/checkpoint_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use tokio_util::sync::CancellationToken;
use tokio::sync::watch;

use diesel::r2d2::R2D2Connection;
use mysten_metrics::metered_channel::Sender;
use std::collections::hash_map::Entry;
use std::collections::HashSet;
use sui_data_ingestion_core::Worker;
Expand All @@ -46,8 +47,9 @@ use crate::errors::IndexerError;
use crate::metrics::IndexerMetrics;

use crate::db::ConnectionPool;
use crate::handlers::objects_snapshot_processor::CheckpointObjectChanges;
use crate::store::package_resolver::{IndexerStorePackageResolver, InterimPackageResolver};
use crate::store::{IndexerStore, PgIndexerStore};
use crate::store::{IndexerStore, ObjectChangeToCommit, PgIndexerStore};
use crate::types::{
IndexedCheckpoint, IndexedDeletedObject, IndexedEpochInfo, IndexedEvent, IndexedObject,
IndexedPackage, IndexedTransaction, IndexerResult, TransactionKind, TxIndex,
Expand All @@ -64,9 +66,11 @@ const CHECKPOINT_QUEUE_SIZE: usize = 100;
pub async fn new_handlers<S, T>(
state: S,
client: Client,
object_change_sender: Sender<CheckpointObjectChanges>,
metrics: IndexerMetrics,
next_checkpoint_sequence_number: CheckpointSequenceNumber,
cancel: CancellationToken,
backfill_cancel: CancellationToken,
) -> Result<CheckpointHandler<S, T>, IndexerError>
where
S: IndexerStore + Clone + Sync + Send + 'static,
Expand All @@ -92,11 +96,13 @@ where
spawn_monitored_task!(start_tx_checkpoint_commit_task(
state_clone,
client_clone,
object_change_sender,
metrics_clone,
indexed_checkpoint_receiver,
tx,
next_checkpoint_sequence_number,
cancel.clone()
cancel.clone(),
backfill_cancel,
));
Ok(CheckpointHandler::new(
state,
Expand Down
40 changes: 37 additions & 3 deletions crates/sui-indexer/src/handlers/committer.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use futures::TryFutureExt;
use std::collections::{BTreeMap, HashMap};
use std::sync::{Arc, Mutex};

use mysten_metrics::metered_channel::Sender;
use tap::tap::TapFallible;
use tokio::sync::watch;
use tokio_util::sync::CancellationToken;
use tracing::instrument;
use tracing::{error, info};

use crate::handlers::objects_snapshot_processor::CheckpointObjectChanges;
use sui_rest_api::Client;
use sui_types::messages_checkpoint::CheckpointSequenceNumber;

Expand All @@ -24,11 +28,13 @@ const OBJECTS_SNAPSHOT_MAX_CHECKPOINT_LAG: u64 = 900;
pub async fn start_tx_checkpoint_commit_task<S>(
state: S,
client: Client,
object_change_sender: Sender<CheckpointObjectChanges>,
metrics: IndexerMetrics,
tx_indexing_receiver: mysten_metrics::metered_channel::Receiver<CheckpointDataToCommit>,
commit_notifier: watch::Sender<Option<CheckpointSequenceNumber>>,
mut next_checkpoint_sequence_number: CheckpointSequenceNumber,
cancel: CancellationToken,
backfill_cancel: CancellationToken,
) -> IndexerResult<()>
where
S: IndexerStore + Clone + Sync + Send + 'static,
Expand All @@ -49,10 +55,11 @@ where
let latest_object_snapshot_seq = state
.get_latest_object_snapshot_checkpoint_sequence_number()
.await?;
let latest_cp_seq = state.get_latest_checkpoint_sequence_number().await?;
let latest_cp_seq = latest_object_snapshot_seq;
if latest_object_snapshot_seq != latest_cp_seq {
info!("Flipping object_snapshot_backfill_mode to false because objects_snapshot is behind already!");
object_snapshot_backfill_mode = false;
backfill_cancel.cancel();
}

let mut unprocessed = HashMap::new();
Expand Down Expand Up @@ -91,6 +98,7 @@ where
&state,
batch,
epoch,
object_change_sender.clone(),
&metrics,
&commit_notifier,
object_snapshot_backfill_mode,
Expand All @@ -104,6 +112,7 @@ where
&state,
batch,
None,
object_change_sender.clone(),
&metrics,
&commit_notifier,
object_snapshot_backfill_mode,
Expand All @@ -113,9 +122,12 @@ where
}
// this is a one-way flip in case indexer falls behind again, so that the objects snapshot
// table will not be populated by both committer and async snapshot processor at the same time.
if latest_committed_cp + OBJECTS_SNAPSHOT_MAX_CHECKPOINT_LAG > latest_fn_cp {
if object_snapshot_backfill_mode
&& latest_committed_cp + OBJECTS_SNAPSHOT_MAX_CHECKPOINT_LAG > latest_fn_cp
{
info!("Flipping object_snapshot_backfill_mode to false because objects_snapshot is close to up-to-date.");
object_snapshot_backfill_mode = false;
backfill_cancel.cancel();
}
}
Ok(())
Expand All @@ -130,6 +142,7 @@ async fn commit_checkpoints<S>(
state: &S,
indexed_checkpoint_batch: Vec<CheckpointDataToCommit>,
epoch: Option<EpochToCommit>,
object_change_sender: Sender<CheckpointObjectChanges>,
metrics: &IndexerMetrics,
commit_notifier: &watch::Sender<Option<CheckpointSequenceNumber>>,
object_snapshot_backfill_mode: bool,
Expand All @@ -144,6 +157,7 @@ async fn commit_checkpoints<S>(
let mut object_changes_batch = vec![];
let mut object_history_changes_batch = vec![];
let mut packages_batch = vec![];
let mut buffered_object_changes = vec![];

for indexed_checkpoint in indexed_checkpoint_batch {
let CheckpointDataToCommit {
Expand All @@ -157,14 +171,19 @@ async fn commit_checkpoints<S>(
packages,
epoch: _,
} = indexed_checkpoint;
let sequence_number = checkpoint.sequence_number;
checkpoint_batch.push(checkpoint);
tx_batch.push(transactions);
events_batch.push(events);
tx_indices_batch.push(tx_indices);
display_updates_batch.extend(display_updates.into_iter());
object_changes_batch.push(object_changes);
object_changes_batch.push(object_changes.clone());
object_history_changes_batch.push(object_history_changes);
packages_batch.push(packages);
buffered_object_changes.push(CheckpointObjectChanges {
checkpoint: sequence_number,
object_changes,
});
}

let first_checkpoint_seq = checkpoint_batch.first().as_ref().unwrap().sequence_number;
Expand All @@ -189,9 +208,24 @@ async fn commit_checkpoints<S>(
state.persist_objects(object_changes_batch.clone()),
state.persist_object_history(object_history_changes_batch.clone()),
];

if object_snapshot_backfill_mode {
persist_tasks.push(state.backfill_objects_snapshot(object_changes_batch));
} else {
// Fill up the buffer otherwise
for object_changes in buffered_object_changes {
let _ = object_change_sender
.send(object_changes)
.await
.map_err(|e| {
error!(
"Failed to send object changes to buffer with error: {}",
e.to_string()
);
});
}
}

if let Some(epoch_data) = epoch.clone() {
persist_tasks.push(state.persist_epoch(epoch_data));
}
Expand Down
Loading
Loading