Skip to content

Commit

Permalink
refactor(indexer-alt): configurable processor
Browse files Browse the repository at this point in the history
## Description

Change the pipeline/handler/processor interface to accept a value
representing the handler/processor. This allows the processor to include
configuration.

This will be used by upcoming pipelines for processing protocol configs
and feature flags.

## Test plan

CI
  • Loading branch information
amnn committed Nov 4, 2024
1 parent 719db00 commit 9923987
Show file tree
Hide file tree
Showing 23 changed files with 72 additions and 50 deletions.
2 changes: 1 addition & 1 deletion crates/sui-indexer-alt/src/handlers/ev_emit_mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ impl Processor for EvEmitMod {

type Value = StoredEvEmitMod;

fn process(checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
fn process(&self, checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
let CheckpointData {
transactions,
checkpoint_summary,
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-indexer-alt/src/handlers/ev_struct_inst.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ impl Processor for EvStructInst {

type Value = StoredEvStructInst;

fn process(checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
fn process(&self, checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
let CheckpointData {
transactions,
checkpoint_summary,
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-indexer-alt/src/handlers/kv_checkpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ impl Processor for KvCheckpoints {

type Value = StoredCheckpoint;

fn process(checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
fn process(&self, checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
let sequence_number = checkpoint.checkpoint_summary.sequence_number as i64;
Ok(vec![StoredCheckpoint {
sequence_number,
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-indexer-alt/src/handlers/kv_objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ impl Processor for KvObjects {
const NAME: &'static str = "kv_objects";
type Value = StoredObject;

fn process(checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
fn process(&self, checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
let deleted_objects = checkpoint
.eventually_removed_object_refs_post_version()
.into_iter()
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-indexer-alt/src/handlers/kv_transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ impl Processor for KvTransactions {

type Value = StoredTransaction;

fn process(checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
fn process(&self, checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
let CheckpointData {
transactions,
checkpoint_summary,
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-indexer-alt/src/handlers/obj_versions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ impl Processor for ObjVersions {
const NAME: &'static str = "obj_versions";
type Value = StoredObjVersion;

fn process(checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
fn process(&self, checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
let CheckpointData {
transactions,
checkpoint_summary,
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-indexer-alt/src/handlers/sum_coin_balances.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ impl Processor for SumCoinBalances {

type Value = StoredObjectUpdate<StoredSumCoinBalance>;

fn process(checkpoint: &Arc<CheckpointData>) -> anyhow::Result<Vec<Self::Value>> {
fn process(&self, checkpoint: &Arc<CheckpointData>) -> anyhow::Result<Vec<Self::Value>> {
let CheckpointData {
transactions,
checkpoint_summary,
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-indexer-alt/src/handlers/sum_displays.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ impl Processor for SumDisplays {

type Value = StoredDisplay;

fn process(checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
fn process(&self, checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
let CheckpointData { transactions, .. } = checkpoint.as_ref();

let mut values = vec![];
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-indexer-alt/src/handlers/sum_obj_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ impl Processor for SumObjTypes {

type Value = StoredObjectUpdate<StoredSumObjType>;

fn process(checkpoint: &Arc<CheckpointData>) -> anyhow::Result<Vec<Self::Value>> {
fn process(&self, checkpoint: &Arc<CheckpointData>) -> anyhow::Result<Vec<Self::Value>> {
let CheckpointData {
transactions,
checkpoint_summary,
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-indexer-alt/src/handlers/sum_packages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ impl Processor for SumPackages {

type Value = StoredPackage;

fn process(checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
fn process(&self, checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
let CheckpointData {
checkpoint_summary,
transactions,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ impl Processor for TxAffectedAddress {

type Value = StoredTxAffectedAddress;

fn process(checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
fn process(&self, checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
let CheckpointData {
transactions,
checkpoint_summary,
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-indexer-alt/src/handlers/tx_affected_objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ impl Processor for TxAffectedObjects {

type Value = StoredTxAffectedObject;

fn process(checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
fn process(&self, checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
let CheckpointData {
transactions,
checkpoint_summary,
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-indexer-alt/src/handlers/tx_balance_changes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ impl Processor for TxBalanceChanges {

type Value = StoredTxBalanceChange;

fn process(checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
fn process(&self, checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
let CheckpointData {
transactions,
checkpoint_summary,
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-indexer-alt/src/handlers/tx_calls_fun.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ impl Processor for TxCallsFun {

type Value = StoredTxCalls;

fn process(checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
fn process(&self, checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
let CheckpointData {
transactions,
checkpoint_summary,
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-indexer-alt/src/handlers/tx_digests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ impl Processor for TxDigests {

type Value = StoredTxDigest;

fn process(checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
fn process(&self, checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
let CheckpointData {
transactions,
checkpoint_summary,
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-indexer-alt/src/handlers/tx_kinds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ impl Processor for TxKinds {

type Value = StoredTxKind;

fn process(checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
fn process(&self, checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
let CheckpointData {
transactions,
checkpoint_summary,
Expand Down
4 changes: 2 additions & 2 deletions crates/sui-indexer-alt/src/handlers/wal_coin_balances.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ impl Processor for WalCoinBalances {

type Value = StoredObjectUpdate<StoredSumCoinBalance>;

fn process(checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
SumCoinBalances::process(checkpoint)
fn process(&self, checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
SumCoinBalances.process(checkpoint)
}
}

Expand Down
4 changes: 2 additions & 2 deletions crates/sui-indexer-alt/src/handlers/wal_obj_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ impl Processor for WalObjTypes {

type Value = StoredObjectUpdate<StoredSumObjType>;

fn process(checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
SumObjTypes::process(checkpoint)
fn process(&self, checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
SumObjTypes.process(checkpoint)
}
}

Expand Down
14 changes: 10 additions & 4 deletions crates/sui-indexer-alt/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,10 @@ impl Indexer {
/// Concurrent pipelines commit checkpoint data out-of-order to maximise throughput, and they
/// keep the watermark table up-to-date with the highest point they can guarantee all data
/// exists for, for their pipeline.
pub async fn concurrent_pipeline<H: concurrent::Handler + 'static>(&mut self) -> Result<()> {
pub async fn concurrent_pipeline<H: concurrent::Handler + Send + Sync + 'static>(
&mut self,
handler: H,
) -> Result<()> {
let Some(watermark) = self.add_pipeline::<H>().await? else {
return Ok(());
};
Expand All @@ -163,7 +166,8 @@ impl Indexer {
self.check_first_checkpoint_consistency::<H>(&watermark)?;
}

let (processor, collector, committer, watermark) = concurrent::pipeline::<H>(
let (processor, collector, committer, watermark) = concurrent::pipeline(
handler,
watermark,
self.pipeline_config.clone(),
self.db.clone(),
Expand All @@ -190,8 +194,9 @@ impl Indexer {
///
/// The pipeline can optionally be configured to lag behind the ingestion service by a fixed
/// number of checkpoints (configured by `checkpoint_lag`).
pub async fn sequential_pipeline<H: sequential::Handler + 'static>(
pub async fn sequential_pipeline<H: sequential::Handler + Send + Sync + 'static>(
&mut self,
handler: H,
checkpoint_lag: Option<u64>,
) -> Result<()> {
let Some(watermark) = self.add_pipeline::<H>().await? else {
Expand All @@ -204,7 +209,8 @@ impl Indexer {

let (checkpoint_rx, watermark_tx) = self.ingestion_service.subscribe();

let (processor, committer) = sequential::pipeline::<H>(
let (processor, committer) = sequential::pipeline(
handler,
watermark,
self.pipeline_config.clone(),
checkpoint_lag,
Expand Down
37 changes: 18 additions & 19 deletions crates/sui-indexer-alt/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,25 +42,24 @@ async fn main() -> Result<()> {

bootstrap(&indexer, retry_interval, cancel.clone()).await?;

indexer.concurrent_pipeline::<EvEmitMod>().await?;
indexer.concurrent_pipeline::<EvStructInst>().await?;
indexer.concurrent_pipeline::<KvCheckpoints>().await?;
indexer.concurrent_pipeline::<KvObjects>().await?;
indexer.concurrent_pipeline::<KvTransactions>().await?;
indexer.concurrent_pipeline::<ObjVersions>().await?;
indexer.concurrent_pipeline::<TxAffectedAddress>().await?;
indexer.concurrent_pipeline::<TxAffectedObjects>().await?;
indexer.concurrent_pipeline::<TxBalanceChanges>().await?;
indexer.concurrent_pipeline::<TxCallsFun>().await?;
indexer.concurrent_pipeline::<TxDigests>().await?;
indexer.concurrent_pipeline::<TxKinds>().await?;
indexer.concurrent_pipeline::<TxKinds>().await?;
indexer.concurrent_pipeline::<WalCoinBalances>().await?;
indexer.concurrent_pipeline::<WalObjTypes>().await?;
indexer.sequential_pipeline::<SumCoinBalances>(lag).await?;
indexer.sequential_pipeline::<SumDisplays>(None).await?;
indexer.sequential_pipeline::<SumObjTypes>(lag).await?;
indexer.sequential_pipeline::<SumPackages>(None).await?;
indexer.concurrent_pipeline(EvEmitMod).await?;
indexer.concurrent_pipeline(EvStructInst).await?;
indexer.concurrent_pipeline(KvCheckpoints).await?;
indexer.concurrent_pipeline(KvObjects).await?;
indexer.concurrent_pipeline(KvTransactions).await?;
indexer.concurrent_pipeline(ObjVersions).await?;
indexer.concurrent_pipeline(TxAffectedAddress).await?;
indexer.concurrent_pipeline(TxAffectedObjects).await?;
indexer.concurrent_pipeline(TxBalanceChanges).await?;
indexer.concurrent_pipeline(TxCallsFun).await?;
indexer.concurrent_pipeline(TxDigests).await?;
indexer.concurrent_pipeline(TxKinds).await?;
indexer.concurrent_pipeline(WalCoinBalances).await?;
indexer.concurrent_pipeline(WalObjTypes).await?;
indexer.sequential_pipeline(SumCoinBalances, lag).await?;
indexer.sequential_pipeline(SumDisplays, None).await?;
indexer.sequential_pipeline(SumObjTypes, lag).await?;
indexer.sequential_pipeline(SumPackages, None).await?;

let h_indexer = indexer.run().await.context("Failed to start indexer")?;

Expand Down
11 changes: 9 additions & 2 deletions crates/sui-indexer-alt/src/pipeline/concurrent/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ impl<H: Handler> Batched<H> {
/// channels are created to communicate between its various components. The pipeline can be
/// shutdown using its `cancel` token, and will also shutdown if any of its independent tasks
/// reports an issue.
pub(crate) fn pipeline<H: Handler + 'static>(
pub(crate) fn pipeline<H: Handler + Send + Sync + 'static>(
handler: H,
initial_watermark: Option<CommitterWatermark<'static>>,
config: PipelineConfig,
db: Db,
Expand All @@ -128,7 +129,13 @@ pub(crate) fn pipeline<H: Handler + 'static>(
let (collector_tx, committer_rx) = mpsc::channel(config.write_concurrency + PIPELINE_BUFFER);
let (committer_tx, watermark_rx) = mpsc::channel(config.write_concurrency + PIPELINE_BUFFER);

let processor = processor::<H>(checkpoint_rx, processor_tx, metrics.clone(), cancel.clone());
let processor = processor(
handler,
checkpoint_rx,
processor_tx,
metrics.clone(),
cancel.clone(),
);

let collector = collector::<H>(
config.clone(),
Expand Down
9 changes: 6 additions & 3 deletions crates/sui-indexer-alt/src/pipeline/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub trait Processor {
type Value: Send + Sync + 'static;

/// The processing logic for turning a checkpoint into rows of the table.
fn process(checkpoint: &Arc<CheckpointData>) -> anyhow::Result<Vec<Self::Value>>;
fn process(&self, checkpoint: &Arc<CheckpointData>) -> anyhow::Result<Vec<Self::Value>>;
}

/// The processor task is responsible for taking checkpoint data and breaking it down into rows
Expand All @@ -41,7 +41,8 @@ pub trait Processor {
///
/// The task will shutdown if the `cancel` token is cancelled, or if any of the workers encounters
/// an error -- there is no retry logic at this level.
pub(super) fn processor<P: Processor + 'static>(
pub(super) fn processor<P: Processor + Send + Sync + 'static>(
processor: P,
rx: mpsc::Receiver<Arc<CheckpointData>>,
tx: mpsc::Sender<Indexed<P>>,
metrics: Arc<IndexerMetrics>,
Expand All @@ -56,6 +57,8 @@ pub(super) fn processor<P: Processor + 'static>(
let tx = tx.clone();
let metrics = metrics.clone();
let cancel = cancel.clone();
let processor = &processor;

async move {
if cancel.is_cancelled() {
return Err(Break::Cancel);
Expand All @@ -71,7 +74,7 @@ pub(super) fn processor<P: Processor + 'static>(
.with_label_values(&[P::NAME])
.start_timer();

let values = P::process(&checkpoint)?;
let values = processor.process(&checkpoint)?;
let elapsed = guard.stop_and_record();

let epoch = checkpoint.checkpoint_summary.epoch;
Expand Down
11 changes: 9 additions & 2 deletions crates/sui-indexer-alt/src/pipeline/sequential/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ pub trait Handler: Processor {
/// channels are created to communicate between its various components. The pipeline can be
/// shutdown using its `cancel` token, and will also shutdown if any of its input or output
/// channels close, or any of its independent tasks fail.
pub(crate) fn pipeline<H: Handler + 'static>(
pub(crate) fn pipeline<H: Handler + Send + Sync + 'static>(
handler: H,
initial_watermark: Option<CommitterWatermark<'static>>,
config: PipelineConfig,
checkpoint_lag: Option<u64>,
Expand All @@ -97,7 +98,13 @@ pub(crate) fn pipeline<H: Handler + 'static>(
) -> (JoinHandle<()>, JoinHandle<()>) {
let (processor_tx, committer_rx) = mpsc::channel(H::FANOUT + PIPELINE_BUFFER);

let processor = processor::<H>(checkpoint_rx, processor_tx, metrics.clone(), cancel.clone());
let processor = processor(
handler,
checkpoint_rx,
processor_tx,
metrics.clone(),
cancel.clone(),
);

let committer = committer::<H>(
config.clone(),
Expand Down

0 comments on commit 9923987

Please sign in to comment.