From c6cc9d99a8c54305db45fc58b2b426c457596f4b Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Fri, 5 Jan 2024 18:23:33 -0800 Subject: [PATCH 01/13] output txn signature for debug purpose trying txn mask matching output txn to figure out why txn is not exactly matched Use 62 and 61 portion track fetch performance using random txn mask track sigverify performance using random txn mask track banking stage performance using random txn mask adding missing cargo lock file add debug messages Revert "add debug messages" This reverts commit 96aead5cbc4acb5b2fc9d8a37fcd506c73ddf552. fixed some clippy issues check-crate issue Fix a clippy issue Fix a clippy issue debug why txns in banking stage shows fewer performance tracking points debug why txns in banking stage shows fewer performance tracking points debug why txns in banking stage shows fewer performance tracking points debug why txns in banking stage shows fewer performance tracking points get higher PPS for testing purpose more debug messages on why txn is skipped display if tracer packet in log add debug before calling processing_function debug at the initial of banking stage track if a txn is forwarded dependency order missing cargo file clean up debug messages Do not use TRACER_PACKET, use its own bit rename some functions addressed some comments from Trent Update core/src/banking_stage/immutable_deserialized_packet.rs Co-authored-by: Trent Nelson addressed some comments from Trent Do not use binary_search, do simple compare in one loop --- Cargo.lock | 16 ++++++ Cargo.toml | 2 + core/Cargo.toml | 1 + core/src/banking_stage/consumer.rs | 27 ++++++++++ .../immutable_deserialized_packet.rs | 13 ++++- .../unprocessed_transaction_storage.rs | 1 + core/src/sigverify_stage.rs | 29 +++++++++- programs/sbf/Cargo.lock | 16 ++++++ sdk/src/packet.rs | 13 +++++ sdk/src/transaction/versioned/sanitized.rs | 4 ++ streamer/Cargo.toml | 1 + streamer/src/nonblocking/quic.rs | 42 ++++++++++++++- transaction-metrics-tracker/Cargo.toml | 25 +++++++++ transaction-metrics-tracker/src/lib.rs | 54 +++++++++++++++++++ 14 files changed, 240 insertions(+), 4 deletions(-) create mode 100644 transaction-metrics-tracker/Cargo.toml create mode 100644 transaction-metrics-tracker/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index cc42e6da02df20..fc83d74372bed7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5828,6 +5828,7 @@ dependencies = [ "solana-streamer", "solana-svm", "solana-tpu-client", + "solana-transaction-metrics-tracker", "solana-transaction-status", "solana-turbine", "solana-unified-scheduler-pool", @@ -7193,6 +7194,7 @@ dependencies = [ "solana-metrics", "solana-perf", "solana-sdk", + "solana-transaction-metrics-tracker", "thiserror", "tokio", "x509-parser", @@ -7359,6 +7361,20 @@ dependencies = [ "solana-version", ] +[[package]] +name = "solana-transaction-metrics-tracker" +version = "1.19.0" +dependencies = [ + "Inflector", + "base64 0.21.7", + "bincode", + "lazy_static", + "log", + "rand 0.8.5", + "solana-perf", + "solana-sdk", +] + [[package]] name = "solana-transaction-status" version = "1.19.0" diff --git a/Cargo.toml b/Cargo.toml index 8cc38b69144d3d..8d467818a23b0e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -106,6 +106,7 @@ members = [ "tokens", "tpu-client", "transaction-dos", + "transaction-metrics-tracker", "transaction-status", "turbine", "udp-client", @@ -378,6 +379,7 @@ solana-system-program = { path = "programs/system", version = "=1.19.0" } solana-test-validator = { path = "test-validator", version = "=1.19.0" } solana-thin-client = { path = "thin-client", version = "=1.19.0" } solana-tpu-client = { path = "tpu-client", version = "=1.19.0", default-features = false } +solana-transaction-metrics-tracker = { path = "transaction-metrics-tracker", version = "=1.19.0" } solana-transaction-status = { path = "transaction-status", version = "=1.19.0" } solana-turbine = { path = "turbine", version = "=1.19.0" } solana-udp-client = { path = "udp-client", version = "=1.19.0" } diff --git a/core/Cargo.toml b/core/Cargo.toml index e2a936cdabc4c1..1fd25ec38a8d3b 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -67,6 +67,7 @@ solana-send-transaction-service = { workspace = true } solana-streamer = { workspace = true } solana-svm = { workspace = true } solana-tpu-client = { workspace = true } +solana-transaction-metrics-tracker = { workspace = true } solana-transaction-status = { workspace = true } solana-turbine = { workspace = true } solana-unified-scheduler-pool = { workspace = true } diff --git a/core/src/banking_stage/consumer.rs b/core/src/banking_stage/consumer.rs index f4ac6c6040eda8..55f57c0a684b5a 100644 --- a/core/src/banking_stage/consumer.rs +++ b/core/src/banking_stage/consumer.rs @@ -208,6 +208,33 @@ impl Consumer { .slot_metrics_tracker .increment_retryable_packets_count(retryable_transaction_indexes.len() as u64); + // Now we track the performance for the interested transactions which is not in the retryable_transaction_indexes + // We assume the retryable_transaction_indexes is already sorted. + let mut retryable_idx = 0; + for (index, packet) in packets_to_process.iter().enumerate() { + if packet.original_packet().meta().is_perf_track_packet() { + if let Some(start_time) = packet.start_time() { + if retryable_idx >= retryable_transaction_indexes.len() + || retryable_transaction_indexes[retryable_idx] != index + { + let duration = Instant::now().duration_since(*start_time); + + debug!( + "Banking stage processing took {duration:?} for transaction {:?}", + packet.transaction().get_signatures().first() + ); + inc_new_counter_info!( + "txn-metrics-banking-stage-process-us", + duration.as_micros() as usize + ); + } else { + // This packet is retried, advance the retry index to the next, as the next packet's index will + // certainly be > than this. + retryable_idx += 1; + } + } + } + } Some(retryable_transaction_indexes) } diff --git a/core/src/banking_stage/immutable_deserialized_packet.rs b/core/src/banking_stage/immutable_deserialized_packet.rs index 26ede7045d3480..6eb5d68ecaaca5 100644 --- a/core/src/banking_stage/immutable_deserialized_packet.rs +++ b/core/src/banking_stage/immutable_deserialized_packet.rs @@ -13,7 +13,7 @@ use { VersionedTransaction, }, }, - std::{cmp::Ordering, mem::size_of, sync::Arc}, + std::{cmp::Ordering, mem::size_of, sync::Arc, time::Instant}, thiserror::Error, }; @@ -41,10 +41,16 @@ pub struct ImmutableDeserializedPacket { message_hash: Hash, is_simple_vote: bool, compute_budget_details: ComputeBudgetDetails, + banking_stage_start_time: Option, } impl ImmutableDeserializedPacket { pub fn new(packet: Packet) -> Result { + let banking_stage_start_time = packet + .meta() + .is_perf_track_packet() + .then_some(Instant::now()); + let versioned_transaction: VersionedTransaction = packet.deserialize_slice(..)?; let sanitized_transaction = SanitizedVersionedTransaction::try_from(versioned_transaction)?; let message_bytes = packet_message(&packet)?; @@ -67,6 +73,7 @@ impl ImmutableDeserializedPacket { message_hash, is_simple_vote, compute_budget_details, + banking_stage_start_time, }) } @@ -98,6 +105,10 @@ impl ImmutableDeserializedPacket { self.compute_budget_details.clone() } + pub fn start_time(&self) -> &Option { + &self.banking_stage_start_time + } + // This function deserializes packets into transactions, computes the blake3 hash of transaction // messages, and verifies secp256k1 instructions. pub fn build_sanitized_transaction( diff --git a/core/src/banking_stage/unprocessed_transaction_storage.rs b/core/src/banking_stage/unprocessed_transaction_storage.rs index fcc68050b72d4c..52706f8c2bf63b 100644 --- a/core/src/banking_stage/unprocessed_transaction_storage.rs +++ b/core/src/banking_stage/unprocessed_transaction_storage.rs @@ -924,6 +924,7 @@ impl ThreadLocalUnprocessedPackets { .iter() .map(|p| (*p).clone()) .collect_vec(); + let retryable_packets = if let Some(retryable_transaction_indexes) = processing_function(&packets_to_process, payload) { diff --git a/core/src/sigverify_stage.rs b/core/src/sigverify_stage.rs index e5e06a3bc701c9..eb6f92f6bfaf7e 100644 --- a/core/src/sigverify_stage.rs +++ b/core/src/sigverify_stage.rs @@ -18,9 +18,11 @@ use { count_discarded_packets, count_packets_in_batches, count_valid_packets, shrink_batches, }, }, - solana_sdk::timing, + solana_sdk::{signature::Signature, timing}, solana_streamer::streamer::{self, StreamerError}, + solana_transaction_metrics_tracker::get_signature_from_packet, std::{ + collections::HashMap, thread::{self, Builder, JoinHandle}, time::Instant, }, @@ -296,8 +298,20 @@ impl SigVerifyStage { verifier: &mut T, stats: &mut SigVerifierStats, ) -> Result<(), T::SendType> { - let (mut batches, num_packets, recv_duration) = streamer::recv_packet_batches(recvr)?; + let mut packet_perf_measure: HashMap<[u8; 64], std::time::Instant> = HashMap::default(); + let (mut batches, num_packets, recv_duration) = streamer::recv_packet_batches(recvr)?; + // track sigverify start time for interested packets + for batch in &batches { + for packet in batch.iter() { + if packet.meta().is_perf_track_packet() { + let signature = get_signature_from_packet(packet); + if let Ok(signature) = signature { + packet_perf_measure.insert(*signature, Instant::now()); + } + } + } + } let batches_len = batches.len(); debug!( "@{:?} verifier: verifying: {}", @@ -370,6 +384,17 @@ impl SigVerifyStage { (num_packets as f32 / verify_time.as_s()) ); + for (signature, start_time) in packet_perf_measure.drain() { + let duration = Instant::now().duration_since(start_time); + debug!( + "Sigverify took {duration:?} for transaction {:?}", + Signature::from(signature) + ); + inc_new_counter_info!( + "txn-metrics-sigverify-packet-verify-us", + duration.as_micros() as usize + ); + } stats .recv_batches_us_hist .increment(recv_duration.as_micros() as u64) diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index 93e2a243e2004d..d1be0ac6a7d351 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -4908,6 +4908,7 @@ dependencies = [ "solana-streamer", "solana-svm", "solana-tpu-client", + "solana-transaction-metrics-tracker", "solana-transaction-status", "solana-turbine", "solana-unified-scheduler-pool", @@ -6267,6 +6268,7 @@ dependencies = [ "solana-metrics", "solana-perf", "solana-sdk", + "solana-transaction-metrics-tracker", "thiserror", "tokio", "x509-parser", @@ -6369,6 +6371,20 @@ dependencies = [ "tokio", ] +[[package]] +name = "solana-transaction-metrics-tracker" +version = "1.19.0" +dependencies = [ + "Inflector", + "base64 0.21.7", + "bincode", + "lazy_static", + "log", + "rand 0.8.5", + "solana-perf", + "solana-sdk", +] + [[package]] name = "solana-transaction-status" version = "1.19.0" diff --git a/sdk/src/packet.rs b/sdk/src/packet.rs index faea9ab4753c67..8300b57218c696 100644 --- a/sdk/src/packet.rs +++ b/sdk/src/packet.rs @@ -33,6 +33,8 @@ bitflags! { /// the packet is built. /// This field can be removed when the above feature gate is adopted by mainnet-beta. const ROUND_COMPUTE_UNIT_PRICE = 0b0010_0000; + /// For tracking performance + const PERF_TRACK_PACKET = 0b0100_0000; } } @@ -228,6 +230,12 @@ impl Meta { self.flags.set(PacketFlags::TRACER_PACKET, is_tracer); } + #[inline] + pub fn set_track_performance(&mut self, is_performance_track: bool) { + self.flags + .set(PacketFlags::PERF_TRACK_PACKET, is_performance_track); + } + #[inline] pub fn set_simple_vote(&mut self, is_simple_vote: bool) { self.flags.set(PacketFlags::SIMPLE_VOTE_TX, is_simple_vote); @@ -261,6 +269,11 @@ impl Meta { self.flags.contains(PacketFlags::TRACER_PACKET) } + #[inline] + pub fn is_perf_track_packet(&self) -> bool { + self.flags.contains(PacketFlags::PERF_TRACK_PACKET) + } + #[inline] pub fn round_compute_unit_price(&self) -> bool { self.flags.contains(PacketFlags::ROUND_COMPUTE_UNIT_PRICE) diff --git a/sdk/src/transaction/versioned/sanitized.rs b/sdk/src/transaction/versioned/sanitized.rs index 61ecdfea56bb2a..b6311d5886b0e3 100644 --- a/sdk/src/transaction/versioned/sanitized.rs +++ b/sdk/src/transaction/versioned/sanitized.rs @@ -33,6 +33,10 @@ impl SanitizedVersionedTransaction { &self.message } + pub fn get_signatures(&self) -> &Vec { + &self.signatures + } + /// Consumes the SanitizedVersionedTransaction, returning the fields individually. pub fn destruct(self) -> (Vec, SanitizedVersionedMessage) { (self.signatures, self.message) diff --git a/streamer/Cargo.toml b/streamer/Cargo.toml index 8e1eb12dff1d42..22170e6426c433 100644 --- a/streamer/Cargo.toml +++ b/streamer/Cargo.toml @@ -29,6 +29,7 @@ rustls = { workspace = true, features = ["dangerous_configuration"] } solana-metrics = { workspace = true } solana-perf = { workspace = true } solana-sdk = { workspace = true } +solana-transaction-metrics-tracker = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true, features = ["full"] } x509-parser = { workspace = true } diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index 225412dd08b315..23234231fd2750 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -27,10 +27,14 @@ use { QUIC_MIN_STAKED_CONCURRENT_STREAMS, QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO, QUIC_TOTAL_STAKED_CONCURRENT_STREAMS, QUIC_UNSTAKED_RECEIVE_WINDOW_RATIO, }, - signature::Keypair, + signature::{Keypair, Signature}, timing, }, + solana_transaction_metrics_tracker::{ + get_signature_from_packet, signature_if_should_track_packet, + }, std::{ + collections::HashMap, iter::repeat_with, net::{IpAddr, SocketAddr, UdpSocket}, sync::{ @@ -81,6 +85,7 @@ struct PacketChunk { struct PacketAccumulator { pub meta: Meta, pub chunks: Vec, + pub start_time: Instant, } #[derive(Copy, Clone, Debug)] @@ -628,6 +633,7 @@ async fn packet_batch_sender( trace!("enter packet_batch_sender"); let mut batch_start_time = Instant::now(); loop { + let mut packet_perf_measure: HashMap<[u8; 64], std::time::Instant> = HashMap::default(); let mut packet_batch = PacketBatch::with_capacity(PACKETS_PER_BATCH); let mut total_bytes: usize = 0; @@ -647,6 +653,7 @@ async fn packet_batch_sender( || (!packet_batch.is_empty() && elapsed >= coalesce) { let len = packet_batch.len(); + track_streamer_fetch_packet_performance(&packet_batch, &mut packet_perf_measure); if let Err(e) = packet_sender.send(packet_batch) { stats .total_packet_batch_send_err @@ -692,6 +699,13 @@ async fn packet_batch_sender( total_bytes += packet_batch[i].meta().size; + if let Some(signature) = + signature_if_should_track_packet(&packet_batch[i]).unwrap_or(None) + { + packet_perf_measure.insert(*signature, packet_accumulator.start_time); + // we set the PERF_TRACK_PACKET on + packet_batch[i].meta_mut().set_track_performance(true); + } stats .total_chunks_processed_by_batcher .fetch_add(num_chunks, Ordering::Relaxed); @@ -700,6 +714,30 @@ async fn packet_batch_sender( } } +fn track_streamer_fetch_packet_performance( + packet_batch: &PacketBatch, + packet_perf_measure: &mut HashMap<[u8; 64], Instant>, +) { + for packet in packet_batch.iter() { + if packet.meta().is_perf_track_packet() { + let signature = get_signature_from_packet(packet); + if let Ok(signature) = signature { + if let Some(start_time) = packet_perf_measure.remove(signature) { + let duration = Instant::now().duration_since(start_time); + debug!( + "QUIC streamer fetch stage took {duration:?} for transaction {:?}", + Signature::from(*signature) + ); + inc_new_counter_info!( + "txn-metrics-quic-streamer-packet-fetch-us", + duration.as_micros() as usize + ); + } + } + } + } +} + async fn handle_connection( connection: Connection, remote_addr: SocketAddr, @@ -854,6 +892,7 @@ async fn handle_chunk( *packet_accum = Some(PacketAccumulator { meta, chunks: Vec::new(), + start_time: Instant::now(), }); } @@ -1453,6 +1492,7 @@ pub mod test { offset, end_of_chunk: size, }], + start_time: Instant::now(), }; ptk_sender.send(packet_accum).await.unwrap(); } diff --git a/transaction-metrics-tracker/Cargo.toml b/transaction-metrics-tracker/Cargo.toml new file mode 100644 index 00000000000000..9bd82702a3ebb4 --- /dev/null +++ b/transaction-metrics-tracker/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "solana-transaction-metrics-tracker" +description = "Solana transaction metrics tracker" +documentation = "https://docs.rs/solana-transaction-metrics-tracker" +version = { workspace = true } +authors = { workspace = true } +repository = { workspace = true } +homepage = { workspace = true } +license = { workspace = true } +edition = { workspace = true } +publish = false + +[dependencies] +Inflector = { workspace = true } +base64 = { workspace = true } +bincode = { workspace = true } +# Update this borsh dependency to the workspace version once +lazy_static = { workspace = true } +log = { workspace = true } +rand = { workspace = true } +solana-perf = { workspace = true } +solana-sdk = { workspace = true } + +[package.metadata.docs.rs] +targets = ["x86_64-unknown-linux-gnu"] diff --git a/transaction-metrics-tracker/src/lib.rs b/transaction-metrics-tracker/src/lib.rs new file mode 100644 index 00000000000000..02ae2c14fa8ca5 --- /dev/null +++ b/transaction-metrics-tracker/src/lib.rs @@ -0,0 +1,54 @@ +use { + lazy_static::lazy_static, + log::*, + rand::Rng, + solana_perf::sigverify::PacketError, + solana_sdk::{packet::Packet, short_vec::decode_shortu16_len, signature::SIGNATURE_BYTES}, +}; + +// The mask is 12 bits long (1<<12 = 4096), it means the probability of matching +// the transaction is 1/4096 assuming the portion being matched is random. +lazy_static! { + static ref TXN_MASK: u16 = rand::thread_rng().gen_range(0..4096); +} + +/// Check if a transaction given its signature matches the randomly selected mask. +/// The signaure should be from the reference of Signature +pub fn should_track_transaction(signature: &[u8; SIGNATURE_BYTES]) -> bool { + // We do not use the highest signature byte as it is not really random + let match_portion: u16 = u16::from_le_bytes([signature[61], signature[62]]) >> 4; + trace!("Matching txn: {match_portion:b} {:b}", *TXN_MASK); + *TXN_MASK == match_portion +} + +/// Check if a transaction packet's signature matches the mask. +/// This does a rudimentry verification to make sure the packet at least +/// contains the signature data and it returns the reference to the signature. +pub fn signature_if_should_track_packet( + packet: &Packet, +) -> Result, PacketError> { + let signature = get_signature_from_packet(packet)?; + Ok(should_track_transaction(signature).then_some(signature)) +} + +/// Get the signature of the transaction packet +/// This does a rudimentry verification to make sure the packet at least +/// contains the signature data and it returns the reference to the signature. +pub fn get_signature_from_packet(packet: &Packet) -> Result<&[u8; SIGNATURE_BYTES], PacketError> { + let (sig_len_untrusted, sig_start) = packet + .data(..) + .and_then(|bytes| decode_shortu16_len(bytes).ok()) + .ok_or(PacketError::InvalidShortVec)?; + + if sig_len_untrusted < 1 { + return Err(PacketError::InvalidSignatureLen); + } + + let signature = packet + .data(sig_start..sig_start.saturating_add(SIGNATURE_BYTES)) + .ok_or(PacketError::InvalidSignatureLen)?; + let signature = signature + .try_into() + .map_err(|_| PacketError::InvalidSignatureLen)?; + Ok(signature) +} From 792095cbec41996365f0f121cf1c41b5c113f769 Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Thu, 29 Feb 2024 03:03:57 -0800 Subject: [PATCH 02/13] Use datapoint as opposed to counters --- core/src/banking_stage/consumer.rs | 7 ++-- core/src/banking_stage/leader_slot_metrics.rs | 11 ++++++ .../leader_slot_timing_metrics.rs | 25 +++++++++++++ core/src/sigverify_stage.rs | 35 +++++++++++++++---- streamer/src/nonblocking/quic.rs | 23 ++++++++---- streamer/src/quic.rs | 26 +++++++++++++- 6 files changed, 109 insertions(+), 18 deletions(-) diff --git a/core/src/banking_stage/consumer.rs b/core/src/banking_stage/consumer.rs index 55f57c0a684b5a..9d7cdca17f35dd 100644 --- a/core/src/banking_stage/consumer.rs +++ b/core/src/banking_stage/consumer.rs @@ -223,10 +223,9 @@ impl Consumer { "Banking stage processing took {duration:?} for transaction {:?}", packet.transaction().get_signatures().first() ); - inc_new_counter_info!( - "txn-metrics-banking-stage-process-us", - duration.as_micros() as usize - ); + payload + .slot_metrics_tracker + .increment_process_sampled_packets_us(duration.as_micros() as u64); } else { // This packet is retried, advance the retry index to the next, as the next packet's index will // certainly be > than this. diff --git a/core/src/banking_stage/leader_slot_metrics.rs b/core/src/banking_stage/leader_slot_metrics.rs index 88ea6b5ee340cf..1c255ca019bfe7 100644 --- a/core/src/banking_stage/leader_slot_metrics.rs +++ b/core/src/banking_stage/leader_slot_metrics.rs @@ -936,6 +936,17 @@ impl LeaderSlotMetricsTracker { ); } } + + pub(crate) fn increment_process_sampled_packets_us(&mut self, us: u64) { + if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics { + leader_slot_metrics + .timing_metrics + .process_packets_timings + .process_sampled_packets_us_hist + .increment(us) + .unwrap(); + } + } } #[cfg(test)] diff --git a/core/src/banking_stage/leader_slot_timing_metrics.rs b/core/src/banking_stage/leader_slot_timing_metrics.rs index 7727b6cf6c6563..34ce64b31c34f3 100644 --- a/core/src/banking_stage/leader_slot_timing_metrics.rs +++ b/core/src/banking_stage/leader_slot_timing_metrics.rs @@ -244,6 +244,9 @@ pub(crate) struct ProcessPacketsTimings { // Time spent running the cost model in processing transactions before executing // transactions pub cost_model_us: u64, + + // banking stage processing time histogram for sampled packets + pub process_sampled_packets_us_hist: histogram::Histogram, } impl ProcessPacketsTimings { @@ -264,6 +267,28 @@ impl ProcessPacketsTimings { i64 ), ("cost_model_us", self.cost_model_us, i64), + ( + "process_sampled_packets_us_90pct", + self.process_sampled_packets_us_hist + .percentile(90.0) + .unwrap_or(0), + i64 + ), + ( + "process_sampled_packets_us_min", + self.process_sampled_packets_us_hist.minimum().unwrap_or(0), + i64 + ), + ( + "process_sampled_packets_us_max", + self.process_sampled_packets_us_hist.maximum().unwrap_or(0), + i64 + ), + ( + "process_sampled_packets_us_mean", + self.process_sampled_packets_us_hist.mean().unwrap_or(0), + i64 + ), ); } } diff --git a/core/src/sigverify_stage.rs b/core/src/sigverify_stage.rs index eb6f92f6bfaf7e..5bf6410328c68c 100644 --- a/core/src/sigverify_stage.rs +++ b/core/src/sigverify_stage.rs @@ -80,8 +80,9 @@ struct SigVerifierStats { verify_batches_pp_us_hist: histogram::Histogram, // per-packet time to call verify_batch discard_packets_pp_us_hist: histogram::Histogram, // per-packet time to call verify_batch dedup_packets_pp_us_hist: histogram::Histogram, // per-packet time to call verify_batch - batches_hist: histogram::Histogram, // number of packet batches per verify call - packets_hist: histogram::Histogram, // number of packets per verify call + sampled_packets_pp_us_hist: histogram::Histogram, // per-packet time do do overall verify for sampled packets + batches_hist: histogram::Histogram, // number of packet batches per verify call + packets_hist: histogram::Histogram, // number of packets per verify call num_deduper_saturations: usize, total_batches: usize, total_packets: usize, @@ -183,6 +184,28 @@ impl SigVerifierStats { self.dedup_packets_pp_us_hist.mean().unwrap_or(0), i64 ), + ( + "sampled_verify_packets_pp_us_90pct", + self.sampled_packets_pp_us_hist + .percentile(90.0) + .unwrap_or(0), + i64 + ), + ( + "sampled_verify_packets_pp_us_min", + self.sampled_packets_pp_us_hist.minimum().unwrap_or(0), + i64 + ), + ( + "sampled_verify_packets_pp_us_max", + self.sampled_packets_pp_us_hist.maximum().unwrap_or(0), + i64 + ), + ( + "sampled_verify_packets_pp_us_mean", + self.sampled_packets_pp_us_hist.mean().unwrap_or(0), + i64 + ), ( "batches_90pct", self.batches_hist.percentile(90.0).unwrap_or(0), @@ -390,10 +413,10 @@ impl SigVerifyStage { "Sigverify took {duration:?} for transaction {:?}", Signature::from(signature) ); - inc_new_counter_info!( - "txn-metrics-sigverify-packet-verify-us", - duration.as_micros() as usize - ); + stats + .sampled_packets_pp_us_hist + .increment(duration.as_micros() as u64) + .unwrap(); } stats .recv_batches_us_hist diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index 23234231fd2750..b3d02d8287b77e 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -653,7 +653,12 @@ async fn packet_batch_sender( || (!packet_batch.is_empty() && elapsed >= coalesce) { let len = packet_batch.len(); - track_streamer_fetch_packet_performance(&packet_batch, &mut packet_perf_measure); + track_streamer_fetch_packet_performance( + &packet_batch, + &mut packet_perf_measure, + stats.clone(), + ); + if let Err(e) = packet_sender.send(packet_batch) { stats .total_packet_batch_send_err @@ -699,8 +704,9 @@ async fn packet_batch_sender( total_bytes += packet_batch[i].meta().size; - if let Some(signature) = - signature_if_should_track_packet(&packet_batch[i]).unwrap_or(None) + if let Some(signature) = signature_if_should_track_packet(&packet_batch[i]) + .ok() + .flatten() { packet_perf_measure.insert(*signature, packet_accumulator.start_time); // we set the PERF_TRACK_PACKET on @@ -717,6 +723,7 @@ async fn packet_batch_sender( fn track_streamer_fetch_packet_performance( packet_batch: &PacketBatch, packet_perf_measure: &mut HashMap<[u8; 64], Instant>, + stats: Arc, ) { for packet in packet_batch.iter() { if packet.meta().is_perf_track_packet() { @@ -728,10 +735,12 @@ fn track_streamer_fetch_packet_performance( "QUIC streamer fetch stage took {duration:?} for transaction {:?}", Signature::from(*signature) ); - inc_new_counter_info!( - "txn-metrics-quic-streamer-packet-fetch-us", - duration.as_micros() as usize - ); + stats + .process_sampled_packets_us_hist + .lock() + .unwrap() + .increment(duration.as_micros() as u64) + .unwrap(); } } } diff --git a/streamer/src/quic.rs b/streamer/src/quic.rs index 69a75532b8ca68..99c4b7c63f23f4 100644 --- a/streamer/src/quic.rs +++ b/streamer/src/quic.rs @@ -17,7 +17,7 @@ use { net::UdpSocket, sync::{ atomic::{AtomicBool, AtomicUsize, Ordering}, - Arc, RwLock, + Arc, Mutex, RwLock, }, thread, time::{Duration, SystemTime}, @@ -175,10 +175,12 @@ pub struct StreamStats { pub(crate) stream_load_ema: AtomicUsize, pub(crate) stream_load_ema_overflow: AtomicUsize, pub(crate) stream_load_capacity_overflow: AtomicUsize, + pub(crate) process_sampled_packets_us_hist: Mutex, } impl StreamStats { pub fn report(&self, name: &'static str) { + let process_sampled_packets_us_hist = self.process_sampled_packets_us_hist.lock().unwrap(); datapoint_info!( name, ( @@ -425,6 +427,28 @@ impl StreamStats { self.stream_load_capacity_overflow.load(Ordering::Relaxed), i64 ), + ( + "process_sampled_packets_us_90pct", + process_sampled_packets_us_hist + .percentile(90.0) + .unwrap_or(0), + i64 + ), + ( + "process_sampled_packets_us_min", + process_sampled_packets_us_hist.minimum().unwrap_or(0), + i64 + ), + ( + "process_sampled_packets_us_max", + process_sampled_packets_us_hist.maximum().unwrap_or(0), + i64 + ), + ( + "process_sampled_packets_us_mean", + process_sampled_packets_us_hist.mean().unwrap_or(0), + i64 + ), ); } } From 80a510e280da973eeb9e2b49e438f2b5abaf3f8b Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Thu, 29 Feb 2024 04:07:20 -0800 Subject: [PATCH 03/13] Add a unit test --- transaction-metrics-tracker/src/lib.rs | 29 ++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/transaction-metrics-tracker/src/lib.rs b/transaction-metrics-tracker/src/lib.rs index 02ae2c14fa8ca5..4b770eddb6c59b 100644 --- a/transaction-metrics-tracker/src/lib.rs +++ b/transaction-metrics-tracker/src/lib.rs @@ -40,6 +40,7 @@ pub fn get_signature_from_packet(packet: &Packet) -> Result<&[u8; SIGNATURE_BYTE .and_then(|bytes| decode_shortu16_len(bytes).ok()) .ok_or(PacketError::InvalidShortVec)?; + println!("Sig length is okay!!"); if sig_len_untrusted < 1 { return Err(PacketError::InvalidSignatureLen); } @@ -52,3 +53,31 @@ pub fn get_signature_from_packet(packet: &Packet) -> Result<&[u8; SIGNATURE_BYTE .map_err(|_| PacketError::InvalidSignatureLen)?; Ok(signature) } + +#[cfg(test)] +mod tests { + use { + super::*, + solana_sdk::{hash::Hash, signature::Keypair, system_transaction}, + }; + + #[test] + fn test_get_signature_from_packet() { + // default invalid txn packet + let packet = Packet::default(); + let sig = get_signature_from_packet(&packet); + assert_eq!(sig, Err(PacketError::InvalidShortVec)); + + // Use a valid transaction, it should succeed + let tx = system_transaction::transfer( + &Keypair::new(), + &solana_sdk::pubkey::new_rand(), + 1, + Hash::new_unique(), + ); + let packet = Packet::from_data(None, tx).unwrap(); + + let sig = get_signature_from_packet(&packet); + assert!(sig.is_ok()); + } +} From d2a5bdfffc0ced1f886637356306d4597efff546 Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Thu, 29 Feb 2024 04:10:07 -0800 Subject: [PATCH 04/13] removed a print --- transaction-metrics-tracker/src/lib.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/transaction-metrics-tracker/src/lib.rs b/transaction-metrics-tracker/src/lib.rs index 4b770eddb6c59b..02d7c03d94f068 100644 --- a/transaction-metrics-tracker/src/lib.rs +++ b/transaction-metrics-tracker/src/lib.rs @@ -40,7 +40,6 @@ pub fn get_signature_from_packet(packet: &Packet) -> Result<&[u8; SIGNATURE_BYTE .and_then(|bytes| decode_shortu16_len(bytes).ok()) .ok_or(PacketError::InvalidShortVec)?; - println!("Sig length is okay!!"); if sig_len_untrusted < 1 { return Err(PacketError::InvalidSignatureLen); } From 921c9f5ae370467d146ad35f16d29f412bbcf300 Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Thu, 29 Feb 2024 12:29:26 -0800 Subject: [PATCH 05/13] Added more unit tests --- transaction-metrics-tracker/src/lib.rs | 83 ++++++++++++++++++++++++-- 1 file changed, 79 insertions(+), 4 deletions(-) diff --git a/transaction-metrics-tracker/src/lib.rs b/transaction-metrics-tracker/src/lib.rs index 02d7c03d94f068..4ffe7fda9d7ece 100644 --- a/transaction-metrics-tracker/src/lib.rs +++ b/transaction-metrics-tracker/src/lib.rs @@ -17,7 +17,7 @@ lazy_static! { pub fn should_track_transaction(signature: &[u8; SIGNATURE_BYTES]) -> bool { // We do not use the highest signature byte as it is not really random let match_portion: u16 = u16::from_le_bytes([signature[61], signature[62]]) >> 4; - trace!("Matching txn: {match_portion:b} {:b}", *TXN_MASK); + trace!("Matching txn: {match_portion:016b} {:016b}", *TXN_MASK); *TXN_MASK == match_portion } @@ -57,12 +57,16 @@ pub fn get_signature_from_packet(packet: &Packet) -> Result<&[u8; SIGNATURE_BYTE mod tests { use { super::*, - solana_sdk::{hash::Hash, signature::Keypair, system_transaction}, + solana_sdk::{ + hash::Hash, + signature::{Keypair, Signature}, + system_transaction, + }, }; #[test] fn test_get_signature_from_packet() { - // default invalid txn packet + // Default invalid txn packet let packet = Packet::default(); let sig = get_signature_from_packet(&packet); assert_eq!(sig, Err(PacketError::InvalidShortVec)); @@ -74,9 +78,80 @@ mod tests { 1, Hash::new_unique(), ); - let packet = Packet::from_data(None, tx).unwrap(); + let mut packet = Packet::from_data(None, tx).unwrap(); let sig = get_signature_from_packet(&packet); assert!(sig.is_ok()); + + // Invalid signature length + packet.buffer_mut()[0] = 0x0; + let sig = get_signature_from_packet(&packet); + assert_eq!(sig, Err(PacketError::InvalidSignatureLen)); + } + + #[test] + fn test_should_track_transaction() { + let mut sig = [0x0; SIGNATURE_BYTES]; + let track = should_track_transaction(&sig); + assert!(!track); + + // Intentionally matching the randomly generated mask + // The lower four bits are ignored as only 12 highest bits from + // signature's 61 and 62 u8 are used for matching. + // We generate a random one + let mut rng = rand::thread_rng(); + let random_number: u8 = rng.gen_range(0..=15); + sig[61] = ((*TXN_MASK & 0xf as u16) << 4) as u8 | random_number; + sig[62] = (*TXN_MASK >> 4) as u8; + + let track = should_track_transaction(&sig); + assert!(track); + } + + #[test] + fn test_signature_if_should_track_packet() { + // Default invalid txn packet + let packet = Packet::default(); + let sig = signature_if_should_track_packet(&packet); + assert_eq!(sig, Err(PacketError::InvalidShortVec)); + + // Use a valid transaction which is not matched + let tx = system_transaction::transfer( + &Keypair::new(), + &solana_sdk::pubkey::new_rand(), + 1, + Hash::new_unique(), + ); + let packet = Packet::from_data(None, tx).unwrap(); + let sig = signature_if_should_track_packet(&packet); + assert_eq!(Ok(None), sig); + + // Now simulate a txn matching the signature mask + let mut tx = system_transaction::transfer( + &Keypair::new(), + &solana_sdk::pubkey::new_rand(), + 1, + Hash::new_unique(), + ); + let mut sig = [0x0; SIGNATURE_BYTES]; + sig[61] = ((*TXN_MASK & 0xf as u16) << 4) as u8; + sig[62] = (*TXN_MASK >> 4) as u8; + + let sig = Signature::from(sig); + tx.signatures[0] = sig; + let mut packet = Packet::from_data(None, tx).unwrap(); + let sig2 = signature_if_should_track_packet(&packet); + + match sig2 { + Ok(sig) => { + assert!(sig.is_some()); + } + Err(_) => assert!(false, "Expected to get a matching signature!"), + } + + // Invalid signature length + packet.buffer_mut()[0] = 0x0; + let sig = signature_if_should_track_packet(&packet); + assert_eq!(sig, Err(PacketError::InvalidSignatureLen)); } } From 15eb1cb706ec377186b9235e9a310a6b86e8fa1e Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Thu, 29 Feb 2024 13:08:46 -0800 Subject: [PATCH 06/13] Making stats names consistent across layers --- core/src/sigverify_stage.rs | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/core/src/sigverify_stage.rs b/core/src/sigverify_stage.rs index 5bf6410328c68c..ff88851cad65cf 100644 --- a/core/src/sigverify_stage.rs +++ b/core/src/sigverify_stage.rs @@ -80,9 +80,9 @@ struct SigVerifierStats { verify_batches_pp_us_hist: histogram::Histogram, // per-packet time to call verify_batch discard_packets_pp_us_hist: histogram::Histogram, // per-packet time to call verify_batch dedup_packets_pp_us_hist: histogram::Histogram, // per-packet time to call verify_batch - sampled_packets_pp_us_hist: histogram::Histogram, // per-packet time do do overall verify for sampled packets - batches_hist: histogram::Histogram, // number of packet batches per verify call - packets_hist: histogram::Histogram, // number of packets per verify call + process_sampled_packets_us_hist: histogram::Histogram, // per-packet time do do overall verify for sampled packets + batches_hist: histogram::Histogram, // number of packet batches per verify call + packets_hist: histogram::Histogram, // number of packets per verify call num_deduper_saturations: usize, total_batches: usize, total_packets: usize, @@ -185,25 +185,25 @@ impl SigVerifierStats { i64 ), ( - "sampled_verify_packets_pp_us_90pct", - self.sampled_packets_pp_us_hist + "process_sampled_packets_us_90pct", + self.process_sampled_packets_us_hist .percentile(90.0) .unwrap_or(0), i64 ), ( - "sampled_verify_packets_pp_us_min", - self.sampled_packets_pp_us_hist.minimum().unwrap_or(0), + "process_sampled_packets_us_min", + self.process_sampled_packets_us_hist.minimum().unwrap_or(0), i64 ), ( - "sampled_verify_packets_pp_us_max", - self.sampled_packets_pp_us_hist.maximum().unwrap_or(0), + "process_sampled_packets_us_max", + self.process_sampled_packets_us_hist.maximum().unwrap_or(0), i64 ), ( - "sampled_verify_packets_pp_us_mean", - self.sampled_packets_pp_us_hist.mean().unwrap_or(0), + "process_sampled_packets_us_mean", + self.process_sampled_packets_us_hist.mean().unwrap_or(0), i64 ), ( @@ -414,7 +414,7 @@ impl SigVerifyStage { Signature::from(signature) ); stats - .sampled_packets_pp_us_hist + .process_sampled_packets_us_hist .increment(duration.as_micros() as u64) .unwrap(); } From f252ff1ae70f551963256e3c25bcf675fc2ed428 Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Thu, 29 Feb 2024 14:17:44 -0800 Subject: [PATCH 07/13] Added more unit tests --- transaction-metrics-tracker/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/transaction-metrics-tracker/src/lib.rs b/transaction-metrics-tracker/src/lib.rs index 4ffe7fda9d7ece..36c0349620275e 100644 --- a/transaction-metrics-tracker/src/lib.rs +++ b/transaction-metrics-tracker/src/lib.rs @@ -134,7 +134,7 @@ mod tests { Hash::new_unique(), ); let mut sig = [0x0; SIGNATURE_BYTES]; - sig[61] = ((*TXN_MASK & 0xf as u16) << 4) as u8; + sig[61] = ((*TXN_MASK & 0xf_u16) << 4) as u8; sig[62] = (*TXN_MASK >> 4) as u8; let sig = Signature::from(sig); @@ -146,7 +146,7 @@ mod tests { Ok(sig) => { assert!(sig.is_some()); } - Err(_) => assert!(false, "Expected to get a matching signature!"), + Err(_) => panic!("Expected to get a matching signature!"), } // Invalid signature length From 4d1e1cc6ab4c566d7a46c74f62f5b2280f06e9d8 Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Thu, 29 Feb 2024 14:55:09 -0800 Subject: [PATCH 08/13] Added more unit tests --- transaction-metrics-tracker/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/transaction-metrics-tracker/src/lib.rs b/transaction-metrics-tracker/src/lib.rs index 36c0349620275e..2baec195de9b84 100644 --- a/transaction-metrics-tracker/src/lib.rs +++ b/transaction-metrics-tracker/src/lib.rs @@ -101,7 +101,7 @@ mod tests { // We generate a random one let mut rng = rand::thread_rng(); let random_number: u8 = rng.gen_range(0..=15); - sig[61] = ((*TXN_MASK & 0xf as u16) << 4) as u8 | random_number; + sig[61] = ((*TXN_MASK & 0xf_u16) << 4) as u8 | random_number; sig[62] = (*TXN_MASK >> 4) as u8; let track = should_track_transaction(&sig); From b12a19e192b476ba93d6d3d7574fbcf8fa7f9387 Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Thu, 29 Feb 2024 21:31:14 -0800 Subject: [PATCH 09/13] Do not use binary_search, do simple compare in one loop --- streamer/src/nonblocking/quic.rs | 53 ++++++++++++-------------------- 1 file changed, 20 insertions(+), 33 deletions(-) diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index b3d02d8287b77e..901076da10f9d0 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -30,11 +30,8 @@ use { signature::{Keypair, Signature}, timing, }, - solana_transaction_metrics_tracker::{ - get_signature_from_packet, signature_if_should_track_packet, - }, + solana_transaction_metrics_tracker::signature_if_should_track_packet, std::{ - collections::HashMap, iter::repeat_with, net::{IpAddr, SocketAddr, UdpSocket}, sync::{ @@ -633,7 +630,7 @@ async fn packet_batch_sender( trace!("enter packet_batch_sender"); let mut batch_start_time = Instant::now(); loop { - let mut packet_perf_measure: HashMap<[u8; 64], std::time::Instant> = HashMap::default(); + let mut packet_perf_measure: Vec<([u8; 64], std::time::Instant)> = Vec::default(); let mut packet_batch = PacketBatch::with_capacity(PACKETS_PER_BATCH); let mut total_bytes: usize = 0; @@ -653,11 +650,7 @@ async fn packet_batch_sender( || (!packet_batch.is_empty() && elapsed >= coalesce) { let len = packet_batch.len(); - track_streamer_fetch_packet_performance( - &packet_batch, - &mut packet_perf_measure, - stats.clone(), - ); + track_streamer_fetch_packet_performance(&mut packet_perf_measure, &stats); if let Err(e) = packet_sender.send(packet_batch) { stats @@ -708,7 +701,7 @@ async fn packet_batch_sender( .ok() .flatten() { - packet_perf_measure.insert(*signature, packet_accumulator.start_time); + packet_perf_measure.push((*signature, packet_accumulator.start_time)); // we set the PERF_TRACK_PACKET on packet_batch[i].meta_mut().set_track_performance(true); } @@ -721,29 +714,23 @@ async fn packet_batch_sender( } fn track_streamer_fetch_packet_performance( - packet_batch: &PacketBatch, - packet_perf_measure: &mut HashMap<[u8; 64], Instant>, - stats: Arc, + packet_perf_measure: &mut Vec<([u8; 64], Instant)>, + stats: &Arc, ) { - for packet in packet_batch.iter() { - if packet.meta().is_perf_track_packet() { - let signature = get_signature_from_packet(packet); - if let Ok(signature) = signature { - if let Some(start_time) = packet_perf_measure.remove(signature) { - let duration = Instant::now().duration_since(start_time); - debug!( - "QUIC streamer fetch stage took {duration:?} for transaction {:?}", - Signature::from(*signature) - ); - stats - .process_sampled_packets_us_hist - .lock() - .unwrap() - .increment(duration.as_micros() as u64) - .unwrap(); - } - } - } + if packet_perf_measure.is_empty() { + return; + } + let mut process_sampled_packets_us_hist = stats.process_sampled_packets_us_hist.lock().unwrap(); + + for (signature, start_time) in packet_perf_measure.iter() { + let duration = Instant::now().duration_since(*start_time); + debug!( + "QUIC streamer fetch stage took {duration:?} for transaction {:?}", + Signature::from(*signature) + ); + process_sampled_packets_us_hist + .increment(duration.as_micros() as u64) + .unwrap(); } } From caf6a3c4b168c094aa61eff4a286ea6af6e7bd6e Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Thu, 29 Feb 2024 21:47:32 -0800 Subject: [PATCH 10/13] measure perf track overhead --- Cargo.lock | 1 + streamer/Cargo.toml | 1 + streamer/src/nonblocking/quic.rs | 6 ++++++ streamer/src/quic.rs | 8 +++++++- 4 files changed, 15 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index fc83d74372bed7..540b5632db8627 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7191,6 +7191,7 @@ dependencies = [ "rand 0.8.5", "rustls", "solana-logger", + "solana-measure", "solana-metrics", "solana-perf", "solana-sdk", diff --git a/streamer/Cargo.toml b/streamer/Cargo.toml index 22170e6426c433..55d0030e734607 100644 --- a/streamer/Cargo.toml +++ b/streamer/Cargo.toml @@ -26,6 +26,7 @@ quinn = { workspace = true } quinn-proto = { workspace = true } rand = { workspace = true } rustls = { workspace = true, features = ["dangerous_configuration"] } +solana-measure = { workspace = true } solana-metrics = { workspace = true } solana-perf = { workspace = true } solana-sdk = { workspace = true } diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index 901076da10f9d0..0a5b0ef3fe6c03 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -17,6 +17,7 @@ use { quinn::{Connecting, Connection, Endpoint, EndpointConfig, TokioRuntime, VarInt}, quinn_proto::VarIntBoundsExceeded, rand::{thread_rng, Rng}, + solana_measure::measure::Measure, solana_perf::packet::{PacketBatch, PACKETS_PER_BATCH}, solana_sdk::{ packet::{Meta, PACKET_DATA_SIZE}, @@ -720,6 +721,7 @@ fn track_streamer_fetch_packet_performance( if packet_perf_measure.is_empty() { return; } + let mut measure = Measure::start("track_perf"); let mut process_sampled_packets_us_hist = stats.process_sampled_packets_us_hist.lock().unwrap(); for (signature, start_time) in packet_perf_measure.iter() { @@ -732,6 +734,10 @@ fn track_streamer_fetch_packet_performance( .increment(duration.as_micros() as u64) .unwrap(); } + measure.stop(); + stats + .perf_track_overhead_us + .fetch_add(measure.as_us(), Ordering::Relaxed); } async fn handle_connection( diff --git a/streamer/src/quic.rs b/streamer/src/quic.rs index 99c4b7c63f23f4..3c9d95b2333c42 100644 --- a/streamer/src/quic.rs +++ b/streamer/src/quic.rs @@ -16,7 +16,7 @@ use { std::{ net::UdpSocket, sync::{ - atomic::{AtomicBool, AtomicUsize, Ordering}, + atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}, Arc, Mutex, RwLock, }, thread, @@ -176,6 +176,7 @@ pub struct StreamStats { pub(crate) stream_load_ema_overflow: AtomicUsize, pub(crate) stream_load_capacity_overflow: AtomicUsize, pub(crate) process_sampled_packets_us_hist: Mutex, + pub(crate) perf_track_overhead_us: AtomicU64, } impl StreamStats { @@ -449,6 +450,11 @@ impl StreamStats { process_sampled_packets_us_hist.mean().unwrap_or(0), i64 ), + ( + "perf_track_overhead_us", + self.perf_track_overhead_us.swap(0, Ordering::Relaxed), + i64 + ), ); } } From ee7a5e270312f408682a6989638f6d2cbe07eb5f Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Thu, 29 Feb 2024 21:48:45 -0800 Subject: [PATCH 11/13] missing cargo.lock --- programs/sbf/Cargo.lock | 1 + 1 file changed, 1 insertion(+) diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index d1be0ac6a7d351..f466c7e879476c 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -6265,6 +6265,7 @@ dependencies = [ "quinn-proto", "rand 0.8.5", "rustls", + "solana-measure", "solana-metrics", "solana-perf", "solana-sdk", From d566aa0dd1626734b56587bc1752c80a34821b61 Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Fri, 1 Mar 2024 01:47:03 -0800 Subject: [PATCH 12/13] Do not use Hashmap for perf track. Using vec. Measure the overhead of perf track --- core/src/sigverify_stage.rs | 24 ++++++++++++++++++------ 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/core/src/sigverify_stage.rs b/core/src/sigverify_stage.rs index ff88851cad65cf..f41d2b1d192f16 100644 --- a/core/src/sigverify_stage.rs +++ b/core/src/sigverify_stage.rs @@ -22,7 +22,6 @@ use { solana_streamer::streamer::{self, StreamerError}, solana_transaction_metrics_tracker::get_signature_from_packet, std::{ - collections::HashMap, thread::{self, Builder, JoinHandle}, time::Instant, }, @@ -96,6 +95,7 @@ struct SigVerifierStats { total_discard_random_time_us: usize, total_verify_time_us: usize, total_shrink_time_us: usize, + perf_track_overhead_us: usize, } impl SigVerifierStats { @@ -239,6 +239,7 @@ impl SigVerifierStats { ), ("total_verify_time_us", self.total_verify_time_us, i64), ("total_shrink_time_us", self.total_shrink_time_us, i64), + ("perf_track_overhead_us", self.perf_track_overhead_us, i64), ); } } @@ -321,20 +322,26 @@ impl SigVerifyStage { verifier: &mut T, stats: &mut SigVerifierStats, ) -> Result<(), T::SendType> { - let mut packet_perf_measure: HashMap<[u8; 64], std::time::Instant> = HashMap::default(); + let mut packet_perf_measure: Vec<([u8; 64], std::time::Instant)> = Vec::default(); let (mut batches, num_packets, recv_duration) = streamer::recv_packet_batches(recvr)?; + + let mut start_perf_track_measure = Measure::start("start_perf_track"); // track sigverify start time for interested packets for batch in &batches { for packet in batch.iter() { if packet.meta().is_perf_track_packet() { let signature = get_signature_from_packet(packet); if let Ok(signature) = signature { - packet_perf_measure.insert(*signature, Instant::now()); + packet_perf_measure.push((*signature, Instant::now())); } } } } + start_perf_track_measure.stop(); + + stats.perf_track_overhead_us = start_perf_track_measure.as_us() as usize; + let batches_len = batches.len(); debug!( "@{:?} verifier: verifying: {}", @@ -407,17 +414,22 @@ impl SigVerifyStage { (num_packets as f32 / verify_time.as_s()) ); - for (signature, start_time) in packet_perf_measure.drain() { - let duration = Instant::now().duration_since(start_time); + let mut perf_track_end_measure = Measure::start("perf_track_end"); + for (signature, start_time) in packet_perf_measure.iter() { + let duration = Instant::now().duration_since(*start_time); debug!( "Sigverify took {duration:?} for transaction {:?}", - Signature::from(signature) + Signature::from(*signature) ); stats .process_sampled_packets_us_hist .increment(duration.as_micros() as u64) .unwrap(); } + + perf_track_end_measure.stop(); + stats.perf_track_overhead_us += perf_track_end_measure.as_us() as usize; + stats .recv_batches_us_hist .increment(recv_duration.as_micros() as u64) From 6a4f9740836625a614e7f1b449c851676a33eba4 Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Fri, 1 Mar 2024 02:03:37 -0800 Subject: [PATCH 13/13] Clippy issue --- streamer/src/nonblocking/quic.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index 0a5b0ef3fe6c03..3485e4fe585d06 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -715,7 +715,7 @@ async fn packet_batch_sender( } fn track_streamer_fetch_packet_performance( - packet_perf_measure: &mut Vec<([u8; 64], Instant)>, + packet_perf_measure: &mut [([u8; 64], Instant)], stats: &Arc, ) { if packet_perf_measure.is_empty() {