Skip to content

Commit

Permalink
Merge branch 'main' into patch-1
Browse files Browse the repository at this point in the history
  • Loading branch information
hawkw authored Feb 10, 2024
2 parents b70cb12 + 4593222 commit 87ddb11
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 38 deletions.
41 changes: 32 additions & 9 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions console-subscriber/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ hdrhistogram = { version = "7.3.0", default-features = false, features = ["seria
# feature to also enable `tracing-subscriber`'s parking_lot feature flag.
parking_lot_crate = { package = "parking_lot", version = "0.12", optional = true }
humantime = "2.1.0"
prost = "0.12"
prost-types = "0.12.0"

# Required for recording:
Expand Down
97 changes: 68 additions & 29 deletions console-subscriber/src/aggregator/mod.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,31 @@
use super::{Command, Event, Shared, Watch};
use crate::{
stats::{self, Unsent},
ToProto, WatchRequest,
};
use console_api as proto;
use proto::resources::resource;
use tokio::sync::{mpsc, Notify};

use std::{
sync::{
atomic::{AtomicBool, Ordering::*},
Arc,
},
time::{Duration, Instant},
};

use console_api as proto;
use prost::Message;
use proto::resources::resource;
use tokio::sync::{mpsc, Notify};
use tracing_core::{span::Id, Metadata};

use super::{Command, Event, Shared, Watch};
use crate::{
stats::{self, Unsent},
ToProto, WatchRequest,
};

mod id_data;
mod shrink;
use self::id_data::{IdData, Include};
use self::shrink::{ShrinkMap, ShrinkVec};

/// Should match tonic's (private) codec::DEFAULT_MAX_RECV_MESSAGE_SIZE
const MAX_MESSAGE_SIZE: usize = 4 * 1024 * 1024;

/// Aggregates instrumentation traces and prepares state for the instrument
/// server.
///
Expand Down Expand Up @@ -269,31 +274,65 @@ impl Aggregator {
.drop_closed(&mut self.resource_stats, now, self.retention, has_watchers);
self.async_ops
.drop_closed(&mut self.async_op_stats, now, self.retention, has_watchers);
if !has_watchers {
self.poll_ops.clear();
}
}

/// Add the task subscription to the watchers after sending the first update
fn add_instrument_subscription(&mut self, subscription: Watch<proto::instrument::Update>) {
tracing::debug!("new instrument subscription");

let task_update = Some(self.task_update(Include::All));
let resource_update = Some(self.resource_update(Include::All));
let async_op_update = Some(self.async_op_update(Include::All));
let now = Instant::now();

let update = &proto::instrument::Update {
task_update,
resource_update,
async_op_update,
now: Some(self.base_time.to_timestamp(now)),
new_metadata: Some(proto::RegisterMetadata {
metadata: (*self.all_metadata).clone(),
}),
let update = loop {
let update = proto::instrument::Update {
task_update: Some(self.task_update(Include::All)),
resource_update: Some(self.resource_update(Include::All)),
async_op_update: Some(self.async_op_update(Include::All)),
now: Some(self.base_time.to_timestamp(now)),
new_metadata: Some(proto::RegisterMetadata {
metadata: (*self.all_metadata).clone(),
}),
};
let message_size = update.encoded_len();
if message_size < MAX_MESSAGE_SIZE {
// normal case
break Some(update);
}
// If the grpc message is bigger than tokio-console will accept, throw away the oldest
// inactive data and try again
self.retention /= 2;
self.cleanup_closed();
tracing::debug!(
retention = ?self.retention,
message_size,
max_message_size = MAX_MESSAGE_SIZE,
"Message too big, reduced retention",
);

if self.retention <= self.publish_interval {
self.retention = self.publish_interval;
break None;
}
};

// Send the initial state --- if this fails, the subscription is already dead
if subscription.update(update) {
self.watchers.push(subscription)
match update {
// Send the initial state
Some(update) => {
if !subscription.update(&update) {
// If sending the initial update fails, the subscription is already dead,
// so don't add it to `watchers`.
return;
}
}
// User will only get updates.
None => tracing::error!(
min_retention = ?self.publish_interval,
"Message too big. Start with smaller retention.",
),
}

self.watchers.push(subscription);
}

fn task_update(&mut self, include: Include) -> proto::tasks::TaskUpdate {
Expand All @@ -305,14 +344,10 @@ impl Aggregator {
}

fn resource_update(&mut self, include: Include) -> proto::resources::ResourceUpdate {
let new_poll_ops = match include {
Include::All => self.poll_ops.clone(),
Include::UpdatedOnly => std::mem::take(&mut self.poll_ops),
};
proto::resources::ResourceUpdate {
new_resources: self.resources.as_proto_list(include, &self.base_time),
stats_update: self.resource_stats.as_proto(include, &self.base_time),
new_poll_ops,
new_poll_ops: std::mem::take(&mut self.poll_ops),
dropped_events: self.shared.dropped_resources.swap(0, AcqRel) as u64,
}
}
Expand Down Expand Up @@ -472,6 +507,10 @@ impl Aggregator {
task_id,
is_ready,
} => {
// CLI doesn't show historical poll ops, so don't save them if no-one is watching
if self.watchers.is_empty() {
return;
}
let poll_op = proto::resources::PollOp {
metadata: Some(metadata.into()),
resource_id: Some(resource_id.into()),
Expand Down
3 changes: 3 additions & 0 deletions tokio-console/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ prost-types = "0.12"
crossterm = { version = "0.26.1", features = ["event-stream"] }
color-eyre = { version = "0.6", features = ["issue-url"] }
hdrhistogram = { version = "7.3.0", default-features = false, features = ["serialization"] }
# Keep this in sync with the version from `tonic`.
# Because we inspect the error from tonic, we need to make sure that the
# version of h2 we use is compatible with the version of tonic we use.
h2 = "0.3"
regex = "1.5"
once_cell = "1.8"
Expand Down
4 changes: 4 additions & 0 deletions tokio-console/src/state/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,10 @@ impl Task {
self.stats.last_wake > self.stats.last_poll_started
}

pub(crate) fn is_blocking(&self) -> bool {
matches!(self.kind.as_ref(), "block_on" | "blocking")
}

pub(crate) fn is_completed(&self) -> bool {
self.stats.total.is_some()
}
Expand Down
12 changes: 12 additions & 0 deletions tokio-console/src/warnings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,10 @@ impl Warn<Task> for SelfWakePercent {
}

fn check(&self, task: &Task) -> Warning {
// Don't fire warning for tasks that are not async
if task.is_blocking() {
return Warning::Ok;
}
let self_wakes = task.self_wake_percent();
if self_wakes > self.min_percent {
Warning::Warn
Expand All @@ -174,6 +178,10 @@ impl Warn<Task> for LostWaker {
}

fn check(&self, task: &Task) -> Warning {
// Don't fire warning for tasks that are not async
if task.is_blocking() {
return Warning::Ok;
}
if !task.is_completed()
&& task.waker_count() == 0
&& !task.is_running()
Expand Down Expand Up @@ -222,6 +230,10 @@ impl Warn<Task> for NeverYielded {
}

fn check(&self, task: &Task) -> Warning {
// Don't fire warning for tasks that are not async
if task.is_blocking() {
return Warning::Ok;
}
// Don't fire warning for tasks that are waiting to run
if task.state() != TaskState::Running {
return Warning::Ok;
Expand Down

0 comments on commit 87ddb11

Please sign in to comment.