Skip to content

Commit

Permalink
transaction performance tracking -- streamer stage (#257)
Browse files Browse the repository at this point in the history
* transaction performance tracking -- streamer stage

(cherry picked from commit 2b03910)

# Conflicts:
#	Cargo.toml
  • Loading branch information
lijunwangs committed Apr 5, 2024
1 parent a093e23 commit 945020b
Show file tree
Hide file tree
Showing 9 changed files with 360 additions and 3 deletions.
16 changes: 16 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

45 changes: 45 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ members = [
"tokens",
"tpu-client",
"transaction-dos",
"transaction-metrics-tracker",
"transaction-status",
"turbine",
"udp-client",
Expand Down Expand Up @@ -348,6 +349,7 @@ solana-merkle-tree = { path = "merkle-tree", version = "=1.18.10" }
solana-metrics = { path = "metrics", version = "=1.18.10" }
solana-net-utils = { path = "net-utils", version = "=1.18.10" }
solana-nohash-hasher = "0.2.1"
<<<<<<< HEAD
solana-notifier = { path = "notifier", version = "=1.18.10" }
solana-perf = { path = "perf", version = "=1.18.10" }
solana-poh = { path = "poh", version = "=1.18.10" }
Expand Down Expand Up @@ -387,6 +389,49 @@ solana-wen-restart = { path = "wen-restart", version = "=1.18.10" }
solana-zk-keygen = { path = "zk-keygen", version = "=1.18.10" }
solana-zk-token-proof-program = { path = "programs/zk-token-proof", version = "=1.18.10" }
solana-zk-token-sdk = { path = "zk-token-sdk", version = "=1.18.10" }
=======
solana-notifier = { path = "notifier", version = "=2.0.0" }
solana-perf = { path = "perf", version = "=2.0.0" }
solana-poh = { path = "poh", version = "=2.0.0" }
solana-program = { path = "sdk/program", version = "=2.0.0" }
solana-program-runtime = { path = "program-runtime", version = "=2.0.0" }
solana-program-test = { path = "program-test", version = "=2.0.0" }
solana-pubsub-client = { path = "pubsub-client", version = "=2.0.0" }
solana-quic-client = { path = "quic-client", version = "=2.0.0" }
solana-rayon-threadlimit = { path = "rayon-threadlimit", version = "=2.0.0" }
solana-remote-wallet = { path = "remote-wallet", version = "=2.0.0", default-features = false }
solana-unified-scheduler-logic = { path = "unified-scheduler-logic", version = "=2.0.0" }
solana-unified-scheduler-pool = { path = "unified-scheduler-pool", version = "=2.0.0" }
solana-rpc = { path = "rpc", version = "=2.0.0" }
solana-rpc-client = { path = "rpc-client", version = "=2.0.0", default-features = false }
solana-rpc-client-api = { path = "rpc-client-api", version = "=2.0.0" }
solana-rpc-client-nonce-utils = { path = "rpc-client-nonce-utils", version = "=2.0.0" }
solana-runtime = { path = "runtime", version = "=2.0.0" }
solana-runtime-transaction = { path = "runtime-transaction", version = "=2.0.0" }
solana-sdk = { path = "sdk", version = "=2.0.0" }
solana-sdk-macro = { path = "sdk/macro", version = "=2.0.0" }
solana-send-transaction-service = { path = "send-transaction-service", version = "=2.0.0" }
solana-stake-program = { path = "programs/stake", version = "=2.0.0" }
solana-storage-bigtable = { path = "storage-bigtable", version = "=2.0.0" }
solana-storage-proto = { path = "storage-proto", version = "=2.0.0" }
solana-streamer = { path = "streamer", version = "=2.0.0" }
solana-svm = { path = "svm", version = "=2.0.0" }
solana-system-program = { path = "programs/system", version = "=2.0.0" }
solana-test-validator = { path = "test-validator", version = "=2.0.0" }
solana-thin-client = { path = "thin-client", version = "=2.0.0" }
solana-tpu-client = { path = "tpu-client", version = "=2.0.0", default-features = false }
solana-transaction-status = { path = "transaction-status", version = "=2.0.0" }
solana-transaction-metrics-tracker = { path = "transaction-metrics-tracker", version = "=2.0.0" }
solana-turbine = { path = "turbine", version = "=2.0.0" }
solana-udp-client = { path = "udp-client", version = "=2.0.0" }
solana-version = { path = "version", version = "=2.0.0" }
solana-vote = { path = "vote", version = "=2.0.0" }
solana-vote-program = { path = "programs/vote", version = "=2.0.0" }
solana-wen-restart = { path = "wen-restart", version = "=2.0.0" }
solana-zk-keygen = { path = "zk-keygen", version = "=2.0.0" }
solana-zk-token-proof-program = { path = "programs/zk-token-proof", version = "=2.0.0" }
solana-zk-token-sdk = { path = "zk-token-sdk", version = "=2.0.0" }
>>>>>>> 2b0391049d (transaction performance tracking -- streamer stage (#257))
solana_rbpf = "=0.8.0"
spl-associated-token-account = "=2.3.0"
spl-instruction-padding = "0.1"
Expand Down
16 changes: 16 additions & 0 deletions programs/sbf/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions sdk/src/packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ bitflags! {
/// the packet is built.
/// This field can be removed when the above feature gate is adopted by mainnet-beta.
const ROUND_COMPUTE_UNIT_PRICE = 0b0010_0000;
/// For tracking performance
const PERF_TRACK_PACKET = 0b0100_0000;
}
}

Expand Down Expand Up @@ -228,6 +230,12 @@ impl Meta {
self.flags.set(PacketFlags::TRACER_PACKET, is_tracer);
}

#[inline]
pub fn set_track_performance(&mut self, is_performance_track: bool) {
self.flags
.set(PacketFlags::PERF_TRACK_PACKET, is_performance_track);
}

#[inline]
pub fn set_simple_vote(&mut self, is_simple_vote: bool) {
self.flags.set(PacketFlags::SIMPLE_VOTE_TX, is_simple_vote);
Expand Down Expand Up @@ -261,6 +269,11 @@ impl Meta {
self.flags.contains(PacketFlags::TRACER_PACKET)
}

#[inline]
pub fn is_perf_track_packet(&self) -> bool {
self.flags.contains(PacketFlags::PERF_TRACK_PACKET)
}

#[inline]
pub fn round_compute_unit_price(&self) -> bool {
self.flags.contains(PacketFlags::ROUND_COMPUTE_UNIT_PRICE)
Expand Down
2 changes: 2 additions & 0 deletions streamer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@ quinn-proto = { workspace = true }
rand = { workspace = true }
rcgen = { workspace = true }
rustls = { workspace = true, features = ["dangerous_configuration"] }
solana-measure = { workspace = true }
solana-metrics = { workspace = true }
solana-perf = { workspace = true }
solana-sdk = { workspace = true }
solana-transaction-metrics-tracker = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["full"] }
x509-parser = { workspace = true }
Expand Down
44 changes: 43 additions & 1 deletion streamer/src/nonblocking/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use {
quinn::{Connecting, Connection, Endpoint, EndpointConfig, TokioRuntime, VarInt},
quinn_proto::VarIntBoundsExceeded,
rand::{thread_rng, Rng},
solana_measure::measure::Measure,
solana_perf::packet::{PacketBatch, PACKETS_PER_BATCH},
solana_sdk::{
packet::{Meta, PACKET_DATA_SIZE},
Expand All @@ -27,9 +28,10 @@ use {
QUIC_MIN_STAKED_CONCURRENT_STREAMS, QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO,
QUIC_TOTAL_STAKED_CONCURRENT_STREAMS, QUIC_UNSTAKED_RECEIVE_WINDOW_RATIO,
},
signature::Keypair,
signature::{Keypair, Signature},
timing,
},
solana_transaction_metrics_tracker::signature_if_should_track_packet,
std::{
iter::repeat_with,
net::{IpAddr, SocketAddr, UdpSocket},
Expand Down Expand Up @@ -94,6 +96,7 @@ struct PacketChunk {
struct PacketAccumulator {
pub meta: Meta,
pub chunks: Vec<PacketChunk>,
pub start_time: Instant,
}

#[derive(Copy, Clone, Debug)]
Expand Down Expand Up @@ -647,6 +650,7 @@ async fn packet_batch_sender(
trace!("enter packet_batch_sender");
let mut batch_start_time = Instant::now();
loop {
let mut packet_perf_measure: Vec<([u8; 64], std::time::Instant)> = Vec::default();
let mut packet_batch = PacketBatch::with_capacity(PACKETS_PER_BATCH);
let mut total_bytes: usize = 0;

Expand All @@ -666,6 +670,8 @@ async fn packet_batch_sender(
|| (!packet_batch.is_empty() && elapsed >= coalesce)
{
let len = packet_batch.len();
track_streamer_fetch_packet_performance(&mut packet_perf_measure, &stats);

if let Err(e) = packet_sender.send(packet_batch) {
stats
.total_packet_batch_send_err
Expand Down Expand Up @@ -711,6 +717,14 @@ async fn packet_batch_sender(

total_bytes += packet_batch[i].meta().size;

if let Some(signature) = signature_if_should_track_packet(&packet_batch[i])
.ok()
.flatten()
{
packet_perf_measure.push((*signature, packet_accumulator.start_time));
// we set the PERF_TRACK_PACKET on
packet_batch[i].meta_mut().set_track_performance(true);
}
stats
.total_chunks_processed_by_batcher
.fetch_add(num_chunks, Ordering::Relaxed);
Expand All @@ -719,6 +733,32 @@ async fn packet_batch_sender(
}
}

fn track_streamer_fetch_packet_performance(
packet_perf_measure: &mut [([u8; 64], Instant)],
stats: &Arc<StreamStats>,
) {
if packet_perf_measure.is_empty() {
return;
}
let mut measure = Measure::start("track_perf");
let mut process_sampled_packets_us_hist = stats.process_sampled_packets_us_hist.lock().unwrap();

for (signature, start_time) in packet_perf_measure.iter() {
let duration = Instant::now().duration_since(*start_time);
debug!(
"QUIC streamer fetch stage took {duration:?} for transaction {:?}",
Signature::from(*signature)
);
process_sampled_packets_us_hist
.increment(duration.as_micros() as u64)
.unwrap();
}
measure.stop();
stats
.perf_track_overhead_us
.fetch_add(measure.as_us(), Ordering::Relaxed);
}

async fn handle_connection(
connection: Connection,
remote_addr: SocketAddr,
Expand Down Expand Up @@ -873,6 +913,7 @@ async fn handle_chunk(
*packet_accum = Some(PacketAccumulator {
meta,
chunks: Vec::new(),
start_time: Instant::now(),
});
}

Expand Down Expand Up @@ -1476,6 +1517,7 @@ pub mod test {
offset,
end_of_chunk: size,
}],
start_time: Instant::now(),
};
ptk_sender.send(packet_accum).await.unwrap();
}
Expand Down
45 changes: 43 additions & 2 deletions streamer/src/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ use {
std::{
net::{IpAddr, UdpSocket},
sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc, RwLock,
atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
Arc, Mutex, RwLock,
},
thread,
time::{Duration, SystemTime},
Expand Down Expand Up @@ -179,10 +179,19 @@ pub struct StreamStats {
pub(crate) stream_load_ema: AtomicUsize,
pub(crate) stream_load_ema_overflow: AtomicUsize,
pub(crate) stream_load_capacity_overflow: AtomicUsize,
pub(crate) process_sampled_packets_us_hist: Mutex<histogram::Histogram>,
pub(crate) perf_track_overhead_us: AtomicU64,
}

impl StreamStats {
pub fn report(&self, name: &'static str) {
let process_sampled_packets_us_hist = {
let mut metrics = self.process_sampled_packets_us_hist.lock().unwrap();
let process_sampled_packets_us_hist = metrics.clone();
metrics.clear();
process_sampled_packets_us_hist
};

datapoint_info!(
name,
(
Expand Down Expand Up @@ -429,6 +438,38 @@ impl StreamStats {
self.stream_load_capacity_overflow.load(Ordering::Relaxed),
i64
),
(
"process_sampled_packets_us_90pct",
process_sampled_packets_us_hist
.percentile(90.0)
.unwrap_or(0),
i64
),
(
"process_sampled_packets_us_min",
process_sampled_packets_us_hist.minimum().unwrap_or(0),
i64
),
(
"process_sampled_packets_us_max",
process_sampled_packets_us_hist.maximum().unwrap_or(0),
i64
),
(
"process_sampled_packets_us_mean",
process_sampled_packets_us_hist.mean().unwrap_or(0),
i64
),
(
"process_sampled_packets_count",
process_sampled_packets_us_hist.entries(),
i64
),
(
"perf_track_overhead_us",
self.perf_track_overhead_us.swap(0, Ordering::Relaxed),
i64
),
);
}
}
Expand Down
25 changes: 25 additions & 0 deletions transaction-metrics-tracker/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
[package]
name = "solana-transaction-metrics-tracker"
description = "Solana transaction metrics tracker"
documentation = "https://docs.rs/solana-transaction-metrics-tracker"
version = { workspace = true }
authors = { workspace = true }
repository = { workspace = true }
homepage = { workspace = true }
license = { workspace = true }
edition = { workspace = true }
publish = false

[dependencies]
Inflector = { workspace = true }
base64 = { workspace = true }
bincode = { workspace = true }
# Update this borsh dependency to the workspace version once
lazy_static = { workspace = true }
log = { workspace = true }
rand = { workspace = true }
solana-perf = { workspace = true }
solana-sdk = { workspace = true }

[package.metadata.docs.rs]
targets = ["x86_64-unknown-linux-gnu"]
Loading

0 comments on commit 945020b

Please sign in to comment.