From f3cf5e20bbd875ab1b692cc71c286a7d29d03389 Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Thu, 4 Jul 2024 22:15:40 +0800 Subject: [PATCH] feat: add temporality state in update --- console-api/proto/instrument.proto | 57 +++++++++++-------- .../generated/rs.tokio.console.instrument.rs | 32 +++++++++++ console-subscriber/src/aggregator/mod.rs | 21 +++---- tokio-console/src/state/mod.rs | 23 ++++++-- 4 files changed, 92 insertions(+), 41 deletions(-) diff --git a/console-api/proto/instrument.proto b/console-api/proto/instrument.proto index b2b96c47a..f218445b7 100644 --- a/console-api/proto/instrument.proto +++ b/console-api/proto/instrument.proto @@ -10,14 +10,14 @@ import "async_ops.proto"; // `InstrumentServer` implements `Instrument` as a service. service Instrument { - // Produces a stream of updates representing the behavior of the instrumented async runtime. - rpc WatchUpdates(InstrumentRequest) returns (stream Update) {} - // Produces a stream of updates describing the activity of a specific task. - rpc WatchTaskDetails(TaskDetailsRequest) returns (stream tasks.TaskDetails) {} - // Registers that the console observer wants to pause the stream. - rpc Pause(PauseRequest) returns (PauseResponse) {} - // Registers that the console observer wants to resume the stream. - rpc Resume(ResumeRequest) returns (ResumeResponse) {} + // Produces a stream of updates representing the behavior of the instrumented async runtime. + rpc WatchUpdates(InstrumentRequest) returns (stream Update) {} + // Produces a stream of updates describing the activity of a specific task. + rpc WatchTaskDetails(TaskDetailsRequest) returns (stream tasks.TaskDetails) {} + // Registers that the console observer wants to pause the stream. + rpc Pause(PauseRequest) returns (PauseResponse) {} + // Registers that the console observer wants to resume the stream. + rpc Resume(ResumeRequest) returns (ResumeResponse) {} } // InstrumentRequest requests the stream of updates @@ -32,8 +32,8 @@ message InstrumentRequest { // TaskDetailsRequest requests the stream of updates about // the specific task identified in the request. message TaskDetailsRequest { - // Identifies the task for which details were requested. - common.Id id = 1; + // Identifies the task for which details were requested. + common.Id id = 1; } // PauseRequest requests the stream of updates to pause. @@ -53,23 +53,34 @@ message ResumeRequest { // - things such as async ops and resource ops do not make sense // on their own as they have relations to tasks and resources message Update { - // The system time when this update was recorded. - // - // This is the timestamp any durations in the included `Stats` were - // calculated relative to. - google.protobuf.Timestamp now = 1; + // The system time when this update was recorded. + // + // This is the timestamp any durations in the included `Stats` were + // calculated relative to. + google.protobuf.Timestamp now = 1; - // Task state update. - tasks.TaskUpdate task_update = 2; + // Task state update. + tasks.TaskUpdate task_update = 2; - // Resource state update. - resources.ResourceUpdate resource_update = 3; + // Resource state update. + resources.ResourceUpdate resource_update = 3; - // Async operations state update - async_ops.AsyncOpUpdate async_op_update = 4; + // Async operations state update + async_ops.AsyncOpUpdate async_op_update = 4; - // Any new span metadata that was registered since the last update. - common.RegisterMetadata new_metadata = 5; + // Any new span metadata that was registered since the last update. + common.RegisterMetadata new_metadata = 5; + + // The time "state" of the aggregator, such as paused or live. + Temporality temporality = 6; +} + +// The time "state" of the aggregator. +enum Temporality { + // The aggregator is currently live. + LIVE = 0; + // The aggregator is currently paused. + PAUSED = 1; } // `PauseResponse` is the value returned after a pause request. diff --git a/console-api/src/generated/rs.tokio.console.instrument.rs b/console-api/src/generated/rs.tokio.console.instrument.rs index 978f994fa..563dcb9c6 100644 --- a/console-api/src/generated/rs.tokio.console.instrument.rs +++ b/console-api/src/generated/rs.tokio.console.instrument.rs @@ -53,6 +53,9 @@ pub struct Update { /// Any new span metadata that was registered since the last update. #[prost(message, optional, tag = "5")] pub new_metadata: ::core::option::Option, + /// The time "state" of the aggregator, such as paused or live. + #[prost(enumeration = "Temporality", tag = "6")] + pub temporality: i32, } /// `PauseResponse` is the value returned after a pause request. #[allow(clippy::derive_partial_eq_without_eq)] @@ -62,6 +65,35 @@ pub struct PauseResponse {} #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct ResumeResponse {} +/// The time "state" of the aggregator. +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[repr(i32)] +pub enum Temporality { + /// The aggregator is currently live. + Live = 0, + /// The aggregator is currently paused. + Paused = 1, +} +impl Temporality { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + Temporality::Live => "LIVE", + Temporality::Paused => "PAUSED", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "LIVE" => Some(Self::Live), + "PAUSED" => Some(Self::Paused), + _ => None, + } + } +} /// Generated client implementations. pub mod instrument_client { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] diff --git a/console-subscriber/src/aggregator/mod.rs b/console-subscriber/src/aggregator/mod.rs index b6846d62e..e05c8d45c 100644 --- a/console-subscriber/src/aggregator/mod.rs +++ b/console-subscriber/src/aggregator/mod.rs @@ -6,17 +6,17 @@ use std::{ time::{Duration, Instant}, }; -use console_api as proto; -use prost::Message; -use proto::resources::resource; -use tokio::sync::{mpsc, Notify}; -use tracing_core::{span::Id, Metadata}; - use super::{Command, Event, Shared, Watch}; use crate::{ stats::{self, Unsent}, ToProto, WatchRequest, }; +use console_api as proto; +use console_api::instrument::Temporality; +use prost::Message; +use proto::resources::resource; +use tokio::sync::{mpsc, Notify}; +use tracing_core::{span::Id, Metadata}; mod id_data; mod shrink; @@ -89,7 +89,7 @@ pub struct Aggregator { poll_ops: Vec, /// The time "state" of the aggregator, such as paused or live. - temporality: Temporality, + temporality: proto::instrument::Temporality, /// Used to anchor monotonic timestamps to a base `SystemTime`, to produce a /// timestamp that can be sent over the wire. @@ -102,11 +102,6 @@ pub(crate) struct Flush { triggered: AtomicBool, } -#[derive(Debug)] -enum Temporality { - Live, - Paused, -} // Represent static data for resources struct Resource { id: Id, @@ -293,6 +288,7 @@ impl Aggregator { new_metadata: Some(proto::RegisterMetadata { metadata: (*self.all_metadata).clone(), }), + temporality: self.temporality.into(), }; let message_size = update.encoded_len(); if message_size < MAX_MESSAGE_SIZE { @@ -417,6 +413,7 @@ impl Aggregator { task_update, resource_update, async_op_update, + temporality: self.temporality.into(), }; self.watchers diff --git a/tokio-console/src/state/mod.rs b/tokio-console/src/state/mod.rs index 4b9928a8f..4864294c5 100644 --- a/tokio-console/src/state/mod.rs +++ b/tokio-console/src/state/mod.rs @@ -77,6 +77,22 @@ enum Temporality { Paused, } +impl Default for Temporality { + fn default() -> Self { + Self::Live + } +} + +impl From for Temporality { + fn from(value: i32) -> Self { + match value { + 0 => Temporality::Live, + 1 => Temporality::Paused, + _ => unreachable!(), + } + } +} + #[derive(Debug, Eq, PartialEq)] pub(crate) struct Attribute { field: Field, @@ -107,6 +123,7 @@ impl State { current_view: &view::ViewState, update: proto::instrument::Update, ) { + self.temporality = update.temporality.into(); if let Some(now) = update.now.map(|v| v.try_into().unwrap()) { self.last_updated_at = Some(now); } @@ -250,12 +267,6 @@ impl State { } } -impl Default for Temporality { - fn default() -> Self { - Self::Live - } -} - impl Metadata { fn from_proto(pb: proto::Metadata, id: u64, strings: &mut intern::Strings) -> Self { Self {