metrics for unprocessed and batch
wlmyng committed Nov 6, 2024
1 parent 8033aff commit 9caf33e
Showing 2 changed files with 113 additions and 0 deletions.
77 changes: 77 additions & 0 deletions crates/sui-indexer/src/handlers/committer.rs
@@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use std::collections::{BTreeMap, HashMap};
use std::mem;

use tap::tap::TapFallible;
use tokio_util::sync::CancellationToken;
@@ -48,10 +49,33 @@ where
break;
}

metrics.unprocessed_length.set(unprocessed.len() as i64);
metrics
.unprocessed_capacity
.set(unprocessed.capacity() as i64);
metrics
.unprocessed_memory_size
.set(mem::size_of_val(&unprocessed) as i64);

// split the batch into smaller batches per epoch to handle partitioning
for checkpoint in indexed_checkpoint_batch {
unprocessed.insert(checkpoint.checkpoint.sequence_number, checkpoint);
}

metrics.unprocessed_length.set(unprocessed.len() as i64);
metrics
.unprocessed_capacity
.set(unprocessed.capacity() as i64);
metrics
.unprocessed_memory_size
.set(mem::size_of_val(&unprocessed) as i64);

metrics.batch_length.set(batch.len() as i64);
metrics.batch_capacity.set(batch.capacity() as i64);
metrics
.batch_memory_size
.set(mem::size_of_val(&batch) as i64);

while let Some(checkpoint) = unprocessed.remove(&next_checkpoint_sequence_number) {
let epoch = checkpoint.epoch.clone();
batch.push(checkpoint);
@@ -60,8 +84,35 @@ where
// The batch will consist of contiguous checkpoints and at most one epoch boundary at
// the end.
if batch.len() == checkpoint_commit_batch_size || epoch.is_some() {
metrics.unprocessed_length.set(unprocessed.len() as i64);
metrics
.unprocessed_capacity
.set(unprocessed.capacity() as i64);
metrics
.unprocessed_memory_size
.set(mem::size_of_val(&unprocessed) as i64);

metrics.batch_length.set(batch.len() as i64);
metrics.batch_capacity.set(batch.capacity() as i64);
metrics
.batch_memory_size
.set(mem::size_of_val(&batch) as i64);

commit_checkpoints(&state, batch, epoch, &metrics).await;
batch = vec![];
metrics.unprocessed_length.set(unprocessed.len() as i64);
metrics
.unprocessed_capacity
.set(unprocessed.capacity() as i64);
metrics
.unprocessed_memory_size
.set(mem::size_of_val(&unprocessed) as i64);

metrics.batch_length.set(batch.len() as i64);
metrics.batch_capacity.set(batch.capacity() as i64);
metrics
.batch_memory_size
.set(mem::size_of_val(&batch) as i64);
}
if let Some(epoch_number) = epoch_number_option {
state.upload_display(epoch_number).await.tap_err(|e| {
@@ -73,9 +124,35 @@ where
})?;
}
}
metrics.unprocessed_length.set(unprocessed.len() as i64);
metrics
.unprocessed_capacity
.set(unprocessed.capacity() as i64);
metrics
.unprocessed_memory_size
.set(mem::size_of_val(&unprocessed) as i64);

metrics.batch_length.set(batch.len() as i64);
metrics.batch_capacity.set(batch.capacity() as i64);
metrics
.batch_memory_size
.set(mem::size_of_val(&batch) as i64);
if !batch.is_empty() {
commit_checkpoints(&state, batch, None, &metrics).await;
batch = vec![];
metrics.unprocessed_length.set(unprocessed.len() as i64);
metrics
.unprocessed_capacity
.set(unprocessed.capacity() as i64);
metrics
.unprocessed_memory_size
.set(mem::size_of_val(&unprocessed) as i64);

metrics.batch_length.set(batch.len() as i64);
metrics.batch_capacity.set(batch.capacity() as i64);
metrics
.batch_memory_size
.set(mem::size_of_val(&batch) as i64);
}
}
Ok(())
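
A note on the measurement itself, separate from the commit: `std::mem::size_of_val` on a `HashMap` or `Vec` returns only the size of the container struct itself (pointer, length, capacity), not the heap buffer behind it, so the `unprocessed_memory_size` and `batch_memory_size` gauges above will read as a small constant regardless of how many checkpoints are buffered. A minimal sketch of a heap-inclusive estimate, assuming `Sized` element types; the helper names are illustrative and not part of this commit:

use std::collections::HashMap;
use std::mem;

// Rough heap-inclusive estimates; they ignore any heap data owned by the
// elements themselves and hashbrown's per-bucket control bytes.
fn approx_vec_bytes<T>(v: &Vec<T>) -> usize {
    mem::size_of_val(v) + v.capacity() * mem::size_of::<T>()
}

fn approx_map_bytes<K, V>(m: &HashMap<K, V>) -> usize {
    mem::size_of_val(m) + m.capacity() * (mem::size_of::<K>() + mem::size_of::<V>())
}

fn main() {
    let mut unprocessed: HashMap<u64, [u8; 256]> = HashMap::new();
    for i in 0..1_000 {
        unprocessed.insert(i, [0u8; 256]);
    }
    // size_of_val reports just the HashMap header (typically 48 bytes on
    // 64-bit targets), while the estimate scales with the buffered entries.
    println!("shallow: {}", mem::size_of_val(&unprocessed));
    println!("approx:  {}", approx_map_bytes(&unprocessed));
}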
36 changes: 36 additions & 0 deletions crates/sui-indexer/src/metrics.rs
@@ -168,6 +168,12 @@ pub struct IndexerMetrics {
pub last_pruned_checkpoint: IntGauge,
pub last_pruned_transaction: IntGauge,
pub epoch_pruning_latency: Histogram,
pub unprocessed_length: IntGauge,
pub unprocessed_capacity: IntGauge,
pub unprocessed_memory_size: IntGauge,
pub batch_length: IntGauge,
pub batch_capacity: IntGauge,
pub batch_memory_size: IntGauge,
}

impl IndexerMetrics {
@@ -790,6 +796,36 @@ impl IndexerMetrics {
DB_UPDATE_QUERY_LATENCY_SEC_BUCKETS.to_vec(),
registry
).unwrap(),
unprocessed_length: register_int_gauge_with_registry!(
"unprocessed_length",
"Number of elements in the unprocessed hashmap",
registry,
).unwrap(),
unprocessed_capacity: register_int_gauge_with_registry!(
"unprocessed_capacity",
"Capacity of the unprocessed hashmap",
registry,
).unwrap(),
unprocessed_memory_size: register_int_gauge_with_registry!(
"unprocessed_memory_size",
"Memory size of the unprocessed hashmap",
registry,
).unwrap(),
batch_length: register_int_gauge_with_registry!(
"batch_length",
"Number of elements in the batch vec",
registry,
).unwrap(),
batch_capacity: register_int_gauge_with_registry!(
"batch_capacity",
"Capacity of the batch vec",
registry,
).unwrap(),
batch_memory_size: register_int_gauge_with_registry!(
"batch_memory_size",
"Memory size of the batch vec",
registry,
).unwrap(),
}
}
}
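
For orientation, a self-contained sketch of how gauges registered this way behave through the prometheus crate's public API; this is a standalone example, not code from the commit:

use prometheus::{register_int_gauge_with_registry, Encoder, Registry, TextEncoder};

fn main() {
    let registry = Registry::new();
    // Same registration pattern as above, against a fresh registry.
    let gauge = register_int_gauge_with_registry!(
        "unprocessed_length",
        "Number of elements in the unprocessed hashmap",
        registry,
    )
    .unwrap();

    gauge.set(42);

    // Gathering the registry emits the gauge in Prometheus text exposition
    // format, e.g. a line reading: unprocessed_length 42
    let mut buf = Vec::new();
    TextEncoder::new()
        .encode(&registry.gather(), &mut buf)
        .unwrap();
    print!("{}", String::from_utf8(buf).unwrap());
}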
