From aab98e7e643f0e5a90c30afac5669e01e0fe40b5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BA=8C=E6=89=8B=E6=8E=89=E5=8C=85=E5=B7=A5=E7=A8=8B?= =?UTF-8?q?=E5=B8=88?= Date: Mon, 22 Jan 2024 18:05:51 +0800 Subject: [PATCH 1/6] chore: add comments to h2 dependency (#499) We need to keep our `h2` dependency in sync with the version of `tonic` that we use as we're downcasting an error. This is not really supported as stable in the `tonic` API, so it may need to be updated in the future. Signed-off-by: hi-rustin --- tokio-console/Cargo.toml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tokio-console/Cargo.toml b/tokio-console/Cargo.toml index 68740071a..ca58b68f2 100644 --- a/tokio-console/Cargo.toml +++ b/tokio-console/Cargo.toml @@ -44,6 +44,9 @@ prost-types = "0.12" crossterm = { version = "0.26.1", features = ["event-stream"] } color-eyre = { version = "0.6", features = ["issue-url"] } hdrhistogram = { version = "7.3.0", default-features = false, features = ["serialization"] } +# Keep this in sync with the version from `tonic`. +# Because we inspect the error from tonic, we need to make sure that the +# version of h2 we use is compatible with the version of tonic we use. h2 = "0.3" regex = "1.5" once_cell = "1.8" From 1656c791af35bb0500bb6dd3c60344a0ceb12520 Mon Sep 17 00:00:00 2001 From: Graham King Date: Mon, 22 Jan 2024 12:47:51 -0500 Subject: [PATCH 2/6] fix(subscriber): Don't save poll_ops if no-one is receiving them (#501) Do not record poll_ops if there are no current connected clients (watchers). Without this `Aggregator::poll_ops` would grow forever. Follow up to https://github.com/tokio-rs/console/pull/311 and fix for these two: - https://github.com/tokio-rs/console/issues/184 - https://github.com/tokio-rs/console/pull/500 Fixes #184 Co-authored-by: Graham King Co-authored-by: Hayden Stainsby --- console-subscriber/src/aggregator/mod.rs | 32 +++++++++++++----------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/console-subscriber/src/aggregator/mod.rs b/console-subscriber/src/aggregator/mod.rs index 88d89ca1f..e3a1eb62a 100644 --- a/console-subscriber/src/aggregator/mod.rs +++ b/console-subscriber/src/aggregator/mod.rs @@ -1,12 +1,3 @@ -use super::{Command, Event, Shared, Watch}; -use crate::{ - stats::{self, Unsent}, - ToProto, WatchRequest, -}; -use console_api as proto; -use proto::resources::resource; -use tokio::sync::{mpsc, Notify}; - use std::{ sync::{ atomic::{AtomicBool, Ordering::*}, @@ -14,8 +5,18 @@ use std::{ }, time::{Duration, Instant}, }; + +use console_api as proto; +use proto::resources::resource; +use tokio::sync::{mpsc, Notify}; use tracing_core::{span::Id, Metadata}; +use super::{Command, Event, Shared, Watch}; +use crate::{ + stats::{self, Unsent}, + ToProto, WatchRequest, +}; + mod id_data; mod shrink; use self::id_data::{IdData, Include}; @@ -269,6 +270,9 @@ impl Aggregator { .drop_closed(&mut self.resource_stats, now, self.retention, has_watchers); self.async_ops .drop_closed(&mut self.async_op_stats, now, self.retention, has_watchers); + if !has_watchers { + self.poll_ops.clear(); + } } /// Add the task subscription to the watchers after sending the first update @@ -305,14 +309,10 @@ impl Aggregator { } fn resource_update(&mut self, include: Include) -> proto::resources::ResourceUpdate { - let new_poll_ops = match include { - Include::All => self.poll_ops.clone(), - Include::UpdatedOnly => std::mem::take(&mut self.poll_ops), - }; proto::resources::ResourceUpdate { new_resources: self.resources.as_proto_list(include, &self.base_time), stats_update: self.resource_stats.as_proto(include, &self.base_time), - new_poll_ops, + new_poll_ops: std::mem::take(&mut self.poll_ops), dropped_events: self.shared.dropped_resources.swap(0, AcqRel) as u64, } } @@ -472,6 +472,10 @@ impl Aggregator { task_id, is_ready, } => { + // CLI doesn't show historical poll ops, so don't save them if no-one is watching + if self.watchers.is_empty() { + return; + } let poll_op = proto::resources::PollOp { metadata: Some(metadata.into()), resource_id: Some(resource_id.into()), From 4ce24bffd1efc5384208019a53db1f90e690b395 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 6 Feb 2024 09:25:32 -0800 Subject: [PATCH 3/6] chore(deps): bump shlex from 1.2.0 to 1.3.0 (#511) --- Cargo.lock | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9fc6acbc6..6f62162ea 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1498,9 +1498,9 @@ dependencies = [ [[package]] name = "shlex" -version = "1.2.0" +version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a7cee0529a6d40f580e7a5e6c495c8fbfe21b7b52795ed4bb5e62cdf92bc6380" +checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" [[package]] name = "signal-hook" From b0e3196bdd25f7b191e76c52e532930212684c66 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 6 Feb 2024 17:30:44 +0000 Subject: [PATCH 4/6] chore(deps): bump h2 from 0.3.18 to 0.3.24 (#509) Bumps [h2](https://github.com/hyperium/h2) from 0.3.18 to 0.3.24. - [Release notes](https://github.com/hyperium/h2/releases) - [Changelog](https://github.com/hyperium/h2/blob/v0.3.24/CHANGELOG.md) - [Commits](https://github.com/hyperium/h2/compare/v0.3.18...v0.3.24) --- updated-dependencies: - dependency-name: h2 dependency-type: direct:production ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- Cargo.lock | 36 +++++++++++++++++++++++++++++------- 1 file changed, 29 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6f62162ea..78dda3bb1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -463,6 +463,12 @@ version = "1.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457" +[[package]] +name = "equivalent" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" + [[package]] name = "errno" version = "0.3.3" @@ -663,9 +669,9 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "h2" -version = "0.3.18" +version = "0.3.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17f8a914c2987b688368b5138aa05321db91f4090cf26118185672ad588bce21" +checksum = "bb2c4422095b67ee78da96fbb51a4cc413b3b25883c7717ff7ca1ab31022c9c9" dependencies = [ "bytes", "fnv", @@ -673,7 +679,7 @@ dependencies = [ "futures-sink", "futures-util", "http", - "indexmap", + "indexmap 2.1.0", "slab", "tokio", "tokio-util", @@ -686,6 +692,12 @@ version = "0.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e" +[[package]] +name = "hashbrown" +version = "0.14.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604" + [[package]] name = "hdrhistogram" version = "7.5.0" @@ -830,7 +842,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0f647032dfaa1f8b6dc29bd3edb7bbef4861b8b8007ebb118d6db284fd59f6ee" dependencies = [ "autocfg", - "hashbrown", + "hashbrown 0.11.2", +] + +[[package]] +name = "indexmap" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d530e1a18b1cb4c484e6e34556a0d948706958449fca0cab753d649f2bce3d1f" +dependencies = [ + "equivalent", + "hashbrown 0.14.3", ] [[package]] @@ -1128,7 +1150,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4a13a2fa9d0b63e5f22328828741e523766fff0ee9e779316902290dff3f824f" dependencies = [ "fixedbitset", - "indexmap", + "indexmap 1.8.1", ] [[package]] @@ -1811,7 +1833,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5376256e44f2443f8896ac012507c19a012df0fe8758b55246ae51a2279db51f" dependencies = [ "combine", - "indexmap", + "indexmap 1.8.1", "itertools 0.10.3", "serde", ] @@ -1864,7 +1886,7 @@ checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" dependencies = [ "futures-core", "futures-util", - "indexmap", + "indexmap 1.8.1", "pin-project", "pin-project-lite", "rand", From bd3dd71eb0645c028858967ed5b3f14ed34d0605 Mon Sep 17 00:00:00 2001 From: Graham King Date: Wed, 7 Feb 2024 12:31:00 -0500 Subject: [PATCH 5/6] feat(subscriber): Reduce retention period to fit in max message size (#503) If the initial update message would be too big for tokio-console's grpc decoder, reduce the retention period and try again. Currently the default retention period is 1 hour. That can easily grow to more than the max grpc decode message size (4 MiB), at which point tokio-console won't connect. There's really no minimum safe duration for retention. It depends on how busy the app is and on how much trace data runtime and tokio log. Here we repeatedly divide the retention period in half until it fits in the message. Co-authored-by: Graham King --- Cargo.lock | 1 + console-subscriber/Cargo.toml | 1 + console-subscriber/src/aggregator/mod.rs | 65 ++++++++++++++++++------ 3 files changed, 52 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 78dda3bb1..0b1341c7d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -329,6 +329,7 @@ dependencies = [ "hdrhistogram", "humantime", "parking_lot", + "prost", "prost-types", "serde", "serde_json", diff --git a/console-subscriber/Cargo.toml b/console-subscriber/Cargo.toml index 113a4ceb3..7bdf8c8ef 100644 --- a/console-subscriber/Cargo.toml +++ b/console-subscriber/Cargo.toml @@ -46,6 +46,7 @@ hdrhistogram = { version = "7.3.0", default-features = false, features = ["seria # feature to also enable `tracing-subscriber`'s parking_lot feature flag. parking_lot_crate = { package = "parking_lot", version = "0.12", optional = true } humantime = "2.1.0" +prost = "0.12" prost-types = "0.12.0" # Required for recording: diff --git a/console-subscriber/src/aggregator/mod.rs b/console-subscriber/src/aggregator/mod.rs index e3a1eb62a..b6846d62e 100644 --- a/console-subscriber/src/aggregator/mod.rs +++ b/console-subscriber/src/aggregator/mod.rs @@ -7,6 +7,7 @@ use std::{ }; use console_api as proto; +use prost::Message; use proto::resources::resource; use tokio::sync::{mpsc, Notify}; use tracing_core::{span::Id, Metadata}; @@ -22,6 +23,9 @@ mod shrink; use self::id_data::{IdData, Include}; use self::shrink::{ShrinkMap, ShrinkVec}; +/// Should match tonic's (private) codec::DEFAULT_MAX_RECV_MESSAGE_SIZE +const MAX_MESSAGE_SIZE: usize = 4 * 1024 * 1024; + /// Aggregates instrumentation traces and prepares state for the instrument /// server. /// @@ -278,26 +282,57 @@ impl Aggregator { /// Add the task subscription to the watchers after sending the first update fn add_instrument_subscription(&mut self, subscription: Watch) { tracing::debug!("new instrument subscription"); - - let task_update = Some(self.task_update(Include::All)); - let resource_update = Some(self.resource_update(Include::All)); - let async_op_update = Some(self.async_op_update(Include::All)); let now = Instant::now(); - let update = &proto::instrument::Update { - task_update, - resource_update, - async_op_update, - now: Some(self.base_time.to_timestamp(now)), - new_metadata: Some(proto::RegisterMetadata { - metadata: (*self.all_metadata).clone(), - }), + let update = loop { + let update = proto::instrument::Update { + task_update: Some(self.task_update(Include::All)), + resource_update: Some(self.resource_update(Include::All)), + async_op_update: Some(self.async_op_update(Include::All)), + now: Some(self.base_time.to_timestamp(now)), + new_metadata: Some(proto::RegisterMetadata { + metadata: (*self.all_metadata).clone(), + }), + }; + let message_size = update.encoded_len(); + if message_size < MAX_MESSAGE_SIZE { + // normal case + break Some(update); + } + // If the grpc message is bigger than tokio-console will accept, throw away the oldest + // inactive data and try again + self.retention /= 2; + self.cleanup_closed(); + tracing::debug!( + retention = ?self.retention, + message_size, + max_message_size = MAX_MESSAGE_SIZE, + "Message too big, reduced retention", + ); + + if self.retention <= self.publish_interval { + self.retention = self.publish_interval; + break None; + } }; - // Send the initial state --- if this fails, the subscription is already dead - if subscription.update(update) { - self.watchers.push(subscription) + match update { + // Send the initial state + Some(update) => { + if !subscription.update(&update) { + // If sending the initial update fails, the subscription is already dead, + // so don't add it to `watchers`. + return; + } + } + // User will only get updates. + None => tracing::error!( + min_retention = ?self.publish_interval, + "Message too big. Start with smaller retention.", + ), } + + self.watchers.push(subscription); } fn task_update(&mut self, include: Include) -> proto::tasks::TaskUpdate { From 45932229fb5aea7a4994a7644bded9baf2776ea8 Mon Sep 17 00:00:00 2001 From: mersey <73640929+javihernant@users.noreply.github.com> Date: Thu, 8 Feb 2024 18:49:29 +0100 Subject: [PATCH 6/6] fix(console): only trigger lints on async tasks (#517) This branch changes existing lints to only trigger with tasks that are async; those are all except those with kind 'blocking' and 'block_on'. Fixes #516 --- tokio-console/src/state/tasks.rs | 4 ++++ tokio-console/src/warnings.rs | 12 ++++++++++++ 2 files changed, 16 insertions(+) diff --git a/tokio-console/src/state/tasks.rs b/tokio-console/src/state/tasks.rs index d39278664..59b966a5b 100644 --- a/tokio-console/src/state/tasks.rs +++ b/tokio-console/src/state/tasks.rs @@ -354,6 +354,10 @@ impl Task { self.stats.last_wake > self.stats.last_poll_started } + pub(crate) fn is_blocking(&self) -> bool { + matches!(self.kind.as_ref(), "block_on" | "blocking") + } + pub(crate) fn is_completed(&self) -> bool { self.stats.total.is_some() } diff --git a/tokio-console/src/warnings.rs b/tokio-console/src/warnings.rs index 59d43f99f..b7d6ec44e 100644 --- a/tokio-console/src/warnings.rs +++ b/tokio-console/src/warnings.rs @@ -148,6 +148,10 @@ impl Warn for SelfWakePercent { } fn check(&self, task: &Task) -> Warning { + // Don't fire warning for tasks that are not async + if task.is_blocking() { + return Warning::Ok; + } let self_wakes = task.self_wake_percent(); if self_wakes > self.min_percent { Warning::Warn @@ -174,6 +178,10 @@ impl Warn for LostWaker { } fn check(&self, task: &Task) -> Warning { + // Don't fire warning for tasks that are not async + if task.is_blocking() { + return Warning::Ok; + } if !task.is_completed() && task.waker_count() == 0 && !task.is_running() @@ -222,6 +230,10 @@ impl Warn for NeverYielded { } fn check(&self, task: &Task) -> Warning { + // Don't fire warning for tasks that are not async + if task.is_blocking() { + return Warning::Ok; + } // Don't fire warning for tasks that are waiting to run if task.state() != TaskState::Running { return Warning::Ok;