Skip to content

Commit

Permalink
feat: add temporality state in update
Browse files Browse the repository at this point in the history
  • Loading branch information
Rustin170506 committed Jul 4, 2024
1 parent 6ad0def commit f3cf5e2
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 41 deletions.
57 changes: 34 additions & 23 deletions console-api/proto/instrument.proto
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ import "async_ops.proto";

// `InstrumentServer<T>` implements `Instrument` as a service.
service Instrument {
// Produces a stream of updates representing the behavior of the instrumented async runtime.
rpc WatchUpdates(InstrumentRequest) returns (stream Update) {}
// Produces a stream of updates describing the activity of a specific task.
rpc WatchTaskDetails(TaskDetailsRequest) returns (stream tasks.TaskDetails) {}
// Registers that the console observer wants to pause the stream.
rpc Pause(PauseRequest) returns (PauseResponse) {}
// Registers that the console observer wants to resume the stream.
rpc Resume(ResumeRequest) returns (ResumeResponse) {}
// Produces a stream of updates representing the behavior of the instrumented async runtime.
rpc WatchUpdates(InstrumentRequest) returns (stream Update) {}
// Produces a stream of updates describing the activity of a specific task.
rpc WatchTaskDetails(TaskDetailsRequest) returns (stream tasks.TaskDetails) {}
// Registers that the console observer wants to pause the stream.
rpc Pause(PauseRequest) returns (PauseResponse) {}
// Registers that the console observer wants to resume the stream.
rpc Resume(ResumeRequest) returns (ResumeResponse) {}
}

// InstrumentRequest requests the stream of updates
Expand All @@ -32,8 +32,8 @@ message InstrumentRequest {
// TaskDetailsRequest requests the stream of updates about
// the specific task identified in the request.
message TaskDetailsRequest {
// Identifies the task for which details were requested.
common.Id id = 1;
// Identifies the task for which details were requested.
common.Id id = 1;
}

// PauseRequest requests the stream of updates to pause.
Expand All @@ -53,23 +53,34 @@ message ResumeRequest {
// - things such as async ops and resource ops do not make sense
// on their own as they have relations to tasks and resources
message Update {
// The system time when this update was recorded.
//
// This is the timestamp any durations in the included `Stats` were
// calculated relative to.
google.protobuf.Timestamp now = 1;
// The system time when this update was recorded.
//
// This is the timestamp any durations in the included `Stats` were
// calculated relative to.
google.protobuf.Timestamp now = 1;

// Task state update.
tasks.TaskUpdate task_update = 2;
// Task state update.
tasks.TaskUpdate task_update = 2;

// Resource state update.
resources.ResourceUpdate resource_update = 3;
// Resource state update.
resources.ResourceUpdate resource_update = 3;

// Async operations state update
async_ops.AsyncOpUpdate async_op_update = 4;
// Async operations state update
async_ops.AsyncOpUpdate async_op_update = 4;

// Any new span metadata that was registered since the last update.
common.RegisterMetadata new_metadata = 5;
// Any new span metadata that was registered since the last update.
common.RegisterMetadata new_metadata = 5;

// The time "state" of the aggregator, such as paused or live.
Temporality temporality = 6;
}

// The time "state" of the aggregator.
enum Temporality {
// The aggregator is currently live.
LIVE = 0;
// The aggregator is currently paused.
PAUSED = 1;
}

// `PauseResponse` is the value returned after a pause request.
Expand Down
32 changes: 32 additions & 0 deletions console-api/src/generated/rs.tokio.console.instrument.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ pub struct Update {
/// Any new span metadata that was registered since the last update.
#[prost(message, optional, tag = "5")]
pub new_metadata: ::core::option::Option<super::common::RegisterMetadata>,
/// The time "state" of the aggregator, such as paused or live.
#[prost(enumeration = "Temporality", tag = "6")]
pub temporality: i32,
}
/// `PauseResponse` is the value returned after a pause request.
#[allow(clippy::derive_partial_eq_without_eq)]
Expand All @@ -62,6 +65,35 @@ pub struct PauseResponse {}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ResumeResponse {}
/// The time "state" of the aggregator.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
#[repr(i32)]
pub enum Temporality {
/// The aggregator is currently live.
Live = 0,
/// The aggregator is currently paused.
Paused = 1,
}
impl Temporality {
/// String value of the enum field names used in the ProtoBuf definition.
///
/// The values are not transformed in any way and thus are considered stable
/// (if the ProtoBuf definition does not change) and safe for programmatic use.
pub fn as_str_name(&self) -> &'static str {
match self {
Temporality::Live => "LIVE",
Temporality::Paused => "PAUSED",
}
}
/// Creates an enum from field names used in the ProtoBuf definition.
pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
match value {
"LIVE" => Some(Self::Live),
"PAUSED" => Some(Self::Paused),
_ => None,
}
}
}
/// Generated client implementations.
pub mod instrument_client {
#![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)]
Expand Down
21 changes: 9 additions & 12 deletions console-subscriber/src/aggregator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,17 @@ use std::{
time::{Duration, Instant},
};

use console_api as proto;
use prost::Message;
use proto::resources::resource;
use tokio::sync::{mpsc, Notify};
use tracing_core::{span::Id, Metadata};

use super::{Command, Event, Shared, Watch};
use crate::{
stats::{self, Unsent},
ToProto, WatchRequest,
};
use console_api as proto;
use console_api::instrument::Temporality;
use prost::Message;
use proto::resources::resource;
use tokio::sync::{mpsc, Notify};
use tracing_core::{span::Id, Metadata};

mod id_data;
mod shrink;
Expand Down Expand Up @@ -89,7 +89,7 @@ pub struct Aggregator {
poll_ops: Vec<proto::resources::PollOp>,

/// The time "state" of the aggregator, such as paused or live.
temporality: Temporality,
temporality: proto::instrument::Temporality,

/// Used to anchor monotonic timestamps to a base `SystemTime`, to produce a
/// timestamp that can be sent over the wire.
Expand All @@ -102,11 +102,6 @@ pub(crate) struct Flush {
triggered: AtomicBool,
}

#[derive(Debug)]
enum Temporality {
Live,
Paused,
}
// Represent static data for resources
struct Resource {
id: Id,
Expand Down Expand Up @@ -293,6 +288,7 @@ impl Aggregator {
new_metadata: Some(proto::RegisterMetadata {
metadata: (*self.all_metadata).clone(),
}),
temporality: self.temporality.into(),
};
let message_size = update.encoded_len();
if message_size < MAX_MESSAGE_SIZE {
Expand Down Expand Up @@ -417,6 +413,7 @@ impl Aggregator {
task_update,
resource_update,
async_op_update,
temporality: self.temporality.into(),
};

self.watchers
Expand Down
23 changes: 17 additions & 6 deletions tokio-console/src/state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,22 @@ enum Temporality {
Paused,
}

impl Default for Temporality {
fn default() -> Self {
Self::Live
}
}

impl From<i32> for Temporality {
fn from(value: i32) -> Self {
match value {
0 => Temporality::Live,
1 => Temporality::Paused,
_ => unreachable!(),
}
}
}

#[derive(Debug, Eq, PartialEq)]
pub(crate) struct Attribute {
field: Field,
Expand Down Expand Up @@ -107,6 +123,7 @@ impl State {
current_view: &view::ViewState,
update: proto::instrument::Update,
) {
self.temporality = update.temporality.into();
if let Some(now) = update.now.map(|v| v.try_into().unwrap()) {
self.last_updated_at = Some(now);
}
Expand Down Expand Up @@ -250,12 +267,6 @@ impl State {
}
}

impl Default for Temporality {
fn default() -> Self {
Self::Live
}
}

impl Metadata {
fn from_proto(pb: proto::Metadata, id: u64, strings: &mut intern::Strings) -> Self {
Self {
Expand Down

0 comments on commit f3cf5e2

Please sign in to comment.