From 9caf33e4662642b4138c18d8a2a8cb4f1d63de03 Mon Sep 17 00:00:00 2001 From: Will Yang Date: Tue, 5 Nov 2024 17:54:29 -0800 Subject: [PATCH] metrics for unprocessed and batch --- crates/sui-indexer/src/handlers/committer.rs | 77 ++++++++++++++++++++ crates/sui-indexer/src/metrics.rs | 36 +++++++++ 2 files changed, 113 insertions(+) diff --git a/crates/sui-indexer/src/handlers/committer.rs b/crates/sui-indexer/src/handlers/committer.rs index 2a4eb386828c5..66308d309edc3 100644 --- a/crates/sui-indexer/src/handlers/committer.rs +++ b/crates/sui-indexer/src/handlers/committer.rs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use std::collections::{BTreeMap, HashMap}; +use std::mem; use tap::tap::TapFallible; use tokio_util::sync::CancellationToken; @@ -48,10 +49,33 @@ where break; } + metrics.unprocessed_length.set(unprocessed.len() as i64); + metrics + .unprocessed_capacity + .set(unprocessed.capacity() as i64); + metrics + .unprocessed_memory_size + .set(mem::size_of_val(&unprocessed) as i64); + // split the batch into smaller batches per epoch to handle partitioning for checkpoint in indexed_checkpoint_batch { unprocessed.insert(checkpoint.checkpoint.sequence_number, checkpoint); } + + metrics.unprocessed_length.set(unprocessed.len() as i64); + metrics + .unprocessed_capacity + .set(unprocessed.capacity() as i64); + metrics + .unprocessed_memory_size + .set(mem::size_of_val(&unprocessed) as i64); + + metrics.batch_length.set(batch.len() as i64); + metrics.batch_capacity.set(batch.capacity() as i64); + metrics + .batch_memory_size + .set(mem::size_of_val(&batch) as i64); + while let Some(checkpoint) = unprocessed.remove(&next_checkpoint_sequence_number) { let epoch = checkpoint.epoch.clone(); batch.push(checkpoint); @@ -60,8 +84,35 @@ where // The batch will consist of contiguous checkpoints and at most one epoch boundary at // the end. if batch.len() == checkpoint_commit_batch_size || epoch.is_some() { + metrics.unprocessed_length.set(unprocessed.len() as i64); + metrics + .unprocessed_capacity + .set(unprocessed.capacity() as i64); + metrics + .unprocessed_memory_size + .set(mem::size_of_val(&unprocessed) as i64); + + metrics.batch_length.set(batch.len() as i64); + metrics.batch_capacity.set(batch.capacity() as i64); + metrics + .batch_memory_size + .set(mem::size_of_val(&batch) as i64); + commit_checkpoints(&state, batch, epoch, &metrics).await; batch = vec![]; + metrics.unprocessed_length.set(unprocessed.len() as i64); + metrics + .unprocessed_capacity + .set(unprocessed.capacity() as i64); + metrics + .unprocessed_memory_size + .set(mem::size_of_val(&unprocessed) as i64); + + metrics.batch_length.set(batch.len() as i64); + metrics.batch_capacity.set(batch.capacity() as i64); + metrics + .batch_memory_size + .set(mem::size_of_val(&batch) as i64); } if let Some(epoch_number) = epoch_number_option { state.upload_display(epoch_number).await.tap_err(|e| { @@ -73,9 +124,35 @@ where })?; } } + metrics.unprocessed_length.set(unprocessed.len() as i64); + metrics + .unprocessed_capacity + .set(unprocessed.capacity() as i64); + metrics + .unprocessed_memory_size + .set(mem::size_of_val(&unprocessed) as i64); + + metrics.batch_length.set(batch.len() as i64); + metrics.batch_capacity.set(batch.capacity() as i64); + metrics + .batch_memory_size + .set(mem::size_of_val(&batch) as i64); if !batch.is_empty() { commit_checkpoints(&state, batch, None, &metrics).await; batch = vec![]; + metrics.unprocessed_length.set(unprocessed.len() as i64); + metrics + .unprocessed_capacity + .set(unprocessed.capacity() as i64); + metrics + .unprocessed_memory_size + .set(mem::size_of_val(&unprocessed) as i64); + + metrics.batch_length.set(batch.len() as i64); + metrics.batch_capacity.set(batch.capacity() as i64); + metrics + .batch_memory_size + .set(mem::size_of_val(&batch) as i64); } } Ok(()) diff --git a/crates/sui-indexer/src/metrics.rs b/crates/sui-indexer/src/metrics.rs index 0b1f8c1e5bed5..391d402430c5d 100644 --- a/crates/sui-indexer/src/metrics.rs +++ b/crates/sui-indexer/src/metrics.rs @@ -168,6 +168,12 @@ pub struct IndexerMetrics { pub last_pruned_checkpoint: IntGauge, pub last_pruned_transaction: IntGauge, pub epoch_pruning_latency: Histogram, + pub unprocessed_length: IntGauge, + pub unprocessed_capacity: IntGauge, + pub unprocessed_memory_size: IntGauge, + pub batch_length: IntGauge, + pub batch_capacity: IntGauge, + pub batch_memory_size: IntGauge, } impl IndexerMetrics { @@ -790,6 +796,36 @@ impl IndexerMetrics { DB_UPDATE_QUERY_LATENCY_SEC_BUCKETS.to_vec(), registry ).unwrap(), + unprocessed_length: register_int_gauge_with_registry!( + "unprocessed_length", + "Number of elements in the unprocessed hashmap", + registry, + ).unwrap(), + unprocessed_capacity: register_int_gauge_with_registry!( + "unprocessed_capacity", + "Capacity of the unprocessed hashmap", + registry, + ).unwrap(), + unprocessed_memory_size: register_int_gauge_with_registry!( + "unprocessed_memory_size", + "Memory size of the unprocessed hashmap", + registry, + ).unwrap(), + batch_length: register_int_gauge_with_registry!( + "batch_length", + "Number of elements in the batch vec", + registry, + ).unwrap(), + batch_capacity: register_int_gauge_with_registry!( + "batch_capacity", + "Capacity of the batch vec", + registry, + ).unwrap(), + batch_memory_size: register_int_gauge_with_registry!( + "batch_memory_size", + "Memory size of the batch vec", + registry, + ).unwrap(), } } }