Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Track transaction performance through various stage using random mask #34789

Closed
17 changes: 17 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ members = [
"tokens",
"tpu-client",
"transaction-dos",
"transaction-metrics-tracker",
"transaction-status",
"turbine",
"udp-client",
Expand Down Expand Up @@ -378,6 +379,7 @@ solana-system-program = { path = "programs/system", version = "=1.19.0" }
solana-test-validator = { path = "test-validator", version = "=1.19.0" }
solana-thin-client = { path = "thin-client", version = "=1.19.0" }
solana-tpu-client = { path = "tpu-client", version = "=1.19.0", default-features = false }
solana-transaction-metrics-tracker = { path = "transaction-metrics-tracker", version = "=1.19.0" }
solana-transaction-status = { path = "transaction-status", version = "=1.19.0" }
solana-turbine = { path = "turbine", version = "=1.19.0" }
solana-udp-client = { path = "udp-client", version = "=1.19.0" }
Expand Down
1 change: 1 addition & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ solana-send-transaction-service = { workspace = true }
solana-streamer = { workspace = true }
solana-svm = { workspace = true }
solana-tpu-client = { workspace = true }
solana-transaction-metrics-tracker = { workspace = true }
solana-transaction-status = { workspace = true }
solana-turbine = { workspace = true }
solana-unified-scheduler-pool = { workspace = true }
Expand Down
26 changes: 26 additions & 0 deletions core/src/banking_stage/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,32 @@ impl Consumer {
.slot_metrics_tracker
.increment_retryable_packets_count(retryable_transaction_indexes.len() as u64);

// Now we track the performance for the interested transactions which is not in the retryable_transaction_indexes
// We assume the retryable_transaction_indexes is already sorted.
let mut retryable_idx = 0;
for (index, packet) in packets_to_process.iter().enumerate() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there somewhere that we can put this where we aren't adding an iteration?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not see an easy way to do that -- I would like avoid smudge on the SanitizedTransaction. But I think I can make this more efficient -- I do not need the binary search on the retryable index, I could do it in one simple loop.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are the actual iterations not like one stack frame deeper in most cases?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is done many levels down using SanitizedTransaction which does not have information about the start_time.

Copy link
Contributor

@apfitzge apfitzge Feb 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This probably needs to be done elsewhere; this is never called with the new scheduler (for non-votes), so we'd never get any metrics if that's enabled. By the time this change gets in, it will be the default.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@apfitzge what is the new scheduler's name?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Talked to @apfitzge offline, will address both issue raised by Trent and Andrew.

if packet.original_packet().meta().is_perf_track_packet() {
if let Some(start_time) = packet.start_time() {
if retryable_idx >= retryable_transaction_indexes.len()
|| retryable_transaction_indexes[retryable_idx] != index
{
let duration = Instant::now().duration_since(*start_time);

debug!(
"Banking stage processing took {duration:?} for transaction {:?}",
packet.transaction().get_signatures().first()
);
Comment on lines +222 to +225
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is really misleading. It didn't take that amount of time to process this transaction. It took that time to process the batch of transactions.

payload
.slot_metrics_tracker
.increment_process_sampled_packets_us(duration.as_micros() as u64);
} else {
// This packet is retried, advance the retry index to the next, as the next packet's index will
// certainly be > than this.
retryable_idx += 1;
}
}
}
}
Some(retryable_transaction_indexes)
}

Expand Down
13 changes: 12 additions & 1 deletion core/src/banking_stage/immutable_deserialized_packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use {
VersionedTransaction,
},
},
std::{cmp::Ordering, mem::size_of, sync::Arc},
std::{cmp::Ordering, mem::size_of, sync::Arc, time::Instant},
thiserror::Error,
};

Expand Down Expand Up @@ -41,10 +41,16 @@ pub struct ImmutableDeserializedPacket {
message_hash: Hash,
is_simple_vote: bool,
compute_budget_details: ComputeBudgetDetails,
banking_stage_start_time: Option<Instant>,
}

impl ImmutableDeserializedPacket {
pub fn new(packet: Packet) -> Result<Self, DeserializedPacketError> {
let banking_stage_start_time = packet
.meta()
.is_perf_track_packet()
.then_some(Instant::now());

let versioned_transaction: VersionedTransaction = packet.deserialize_slice(..)?;
let sanitized_transaction = SanitizedVersionedTransaction::try_from(versioned_transaction)?;
let message_bytes = packet_message(&packet)?;
Expand All @@ -67,6 +73,7 @@ impl ImmutableDeserializedPacket {
message_hash,
is_simple_vote,
compute_budget_details,
banking_stage_start_time,
})
}

Expand Down Expand Up @@ -98,6 +105,10 @@ impl ImmutableDeserializedPacket {
self.compute_budget_details.clone()
}

pub fn start_time(&self) -> &Option<Instant> {
&self.banking_stage_start_time
}

// This function deserializes packets into transactions, computes the blake3 hash of transaction
// messages, and verifies secp256k1 instructions.
pub fn build_sanitized_transaction(
Expand Down
11 changes: 11 additions & 0 deletions core/src/banking_stage/leader_slot_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -936,6 +936,17 @@ impl LeaderSlotMetricsTracker {
);
}
}

pub(crate) fn increment_process_sampled_packets_us(&mut self, us: u64) {
if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics {
leader_slot_metrics
.timing_metrics
.process_packets_timings
.process_sampled_packets_us_hist
.increment(us)
.unwrap();
}
}
}

#[cfg(test)]
Expand Down
25 changes: 25 additions & 0 deletions core/src/banking_stage/leader_slot_timing_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,9 @@ pub(crate) struct ProcessPacketsTimings {
// Time spent running the cost model in processing transactions before executing
// transactions
pub cost_model_us: u64,

// banking stage processing time histogram for sampled packets
pub process_sampled_packets_us_hist: histogram::Histogram,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will almost certainly have between 0-2 counts per block on mnb, meaning it will probably be so noisy as to be useless.

Copy link
Contributor

@apfitzge apfitzge Feb 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we care much more about time from sigverify to banking stage picking up the packet from channel, i.e. "time-to-scheduler"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The sampling mechanism goal was to sample the system without tracking everything. Even a couple of data points per slot over long time time can still provide insight into where the time is spent over various stage. Time to scheduler is good stats to have. I will defer to future PRs to reduce this change set.

}

impl ProcessPacketsTimings {
Expand All @@ -264,6 +267,28 @@ impl ProcessPacketsTimings {
i64
),
("cost_model_us", self.cost_model_us, i64),
(
"process_sampled_packets_us_90pct",
self.process_sampled_packets_us_hist
.percentile(90.0)
.unwrap_or(0),
i64
),
(
"process_sampled_packets_us_min",
self.process_sampled_packets_us_hist.minimum().unwrap_or(0),
i64
),
(
"process_sampled_packets_us_max",
self.process_sampled_packets_us_hist.maximum().unwrap_or(0),
i64
),
(
"process_sampled_packets_us_mean",
self.process_sampled_packets_us_hist.mean().unwrap_or(0),
i64
),
);
}
}
1 change: 1 addition & 0 deletions core/src/banking_stage/unprocessed_transaction_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -924,6 +924,7 @@ impl ThreadLocalUnprocessedPackets {
.iter()
.map(|p| (*p).clone())
.collect_vec();

let retryable_packets = if let Some(retryable_transaction_indexes) =
processing_function(&packets_to_process, payload)
{
Expand Down
66 changes: 63 additions & 3 deletions core/src/sigverify_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ use {
count_discarded_packets, count_packets_in_batches, count_valid_packets, shrink_batches,
},
},
solana_sdk::timing,
solana_sdk::{signature::Signature, timing},
solana_streamer::streamer::{self, StreamerError},
solana_transaction_metrics_tracker::get_signature_from_packet,
std::{
thread::{self, Builder, JoinHandle},
time::Instant,
Expand Down Expand Up @@ -78,8 +79,9 @@ struct SigVerifierStats {
verify_batches_pp_us_hist: histogram::Histogram, // per-packet time to call verify_batch
discard_packets_pp_us_hist: histogram::Histogram, // per-packet time to call verify_batch
dedup_packets_pp_us_hist: histogram::Histogram, // per-packet time to call verify_batch
batches_hist: histogram::Histogram, // number of packet batches per verify call
packets_hist: histogram::Histogram, // number of packets per verify call
process_sampled_packets_us_hist: histogram::Histogram, // per-packet time do do overall verify for sampled packets
batches_hist: histogram::Histogram, // number of packet batches per verify call
packets_hist: histogram::Histogram, // number of packets per verify call
num_deduper_saturations: usize,
total_batches: usize,
total_packets: usize,
Expand All @@ -93,6 +95,7 @@ struct SigVerifierStats {
total_discard_random_time_us: usize,
total_verify_time_us: usize,
total_shrink_time_us: usize,
perf_track_overhead_us: usize,
}

impl SigVerifierStats {
Expand Down Expand Up @@ -181,6 +184,28 @@ impl SigVerifierStats {
self.dedup_packets_pp_us_hist.mean().unwrap_or(0),
i64
),
(
"process_sampled_packets_us_90pct",
self.process_sampled_packets_us_hist
.percentile(90.0)
.unwrap_or(0),
i64
),
(
"process_sampled_packets_us_min",
self.process_sampled_packets_us_hist.minimum().unwrap_or(0),
i64
),
(
"process_sampled_packets_us_max",
self.process_sampled_packets_us_hist.maximum().unwrap_or(0),
i64
),
(
"process_sampled_packets_us_mean",
self.process_sampled_packets_us_hist.mean().unwrap_or(0),
i64
),
(
"batches_90pct",
self.batches_hist.percentile(90.0).unwrap_or(0),
Expand Down Expand Up @@ -214,6 +239,7 @@ impl SigVerifierStats {
),
("total_verify_time_us", self.total_verify_time_us, i64),
("total_shrink_time_us", self.total_shrink_time_us, i64),
("perf_track_overhead_us", self.perf_track_overhead_us, i64),
);
}
}
Expand Down Expand Up @@ -296,8 +322,26 @@ impl SigVerifyStage {
verifier: &mut T,
stats: &mut SigVerifierStats,
) -> Result<(), T::SendType> {
lijunwangs marked this conversation as resolved.
Show resolved Hide resolved
let mut packet_perf_measure: Vec<([u8; 64], std::time::Instant)> = Vec::default();

let (mut batches, num_packets, recv_duration) = streamer::recv_packet_batches(recvr)?;

let mut start_perf_track_measure = Measure::start("start_perf_track");
// track sigverify start time for interested packets
for batch in &batches {
for packet in batch.iter() {
if packet.meta().is_perf_track_packet() {
let signature = get_signature_from_packet(packet);
if let Ok(signature) = signature {
packet_perf_measure.push((*signature, Instant::now()));
}
}
}
}
start_perf_track_measure.stop();

stats.perf_track_overhead_us = start_perf_track_measure.as_us() as usize;

let batches_len = batches.len();
debug!(
"@{:?} verifier: verifying: {}",
Expand Down Expand Up @@ -370,6 +414,22 @@ impl SigVerifyStage {
(num_packets as f32 / verify_time.as_s())
);

let mut perf_track_end_measure = Measure::start("perf_track_end");
for (signature, start_time) in packet_perf_measure.iter() {
let duration = Instant::now().duration_since(*start_time);
debug!(
"Sigverify took {duration:?} for transaction {:?}",
Signature::from(*signature)
);
stats
.process_sampled_packets_us_hist
.increment(duration.as_micros() as u64)
.unwrap();
}

perf_track_end_measure.stop();
stats.perf_track_overhead_us += perf_track_end_measure.as_us() as usize;

stats
.recv_batches_us_hist
.increment(recv_duration.as_micros() as u64)
Expand Down
17 changes: 17 additions & 0 deletions programs/sbf/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading