Skip to content

Commit

Permalink
feat: add temporality state to update message
Browse files Browse the repository at this point in the history
This field can be used to check if the stream has been paused.
  • Loading branch information
Rustin170506 committed Jul 22, 2024
1 parent 4543901 commit 5f847ec
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 17 deletions.
11 changes: 11 additions & 0 deletions console-api/proto/instrument.proto
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,17 @@ message Update {

// Any new span metadata that was registered since the last update.
common.RegisterMetadata new_metadata = 5;

// The time "state" of the aggregator, such as paused or live.
Temporality temporality = 6;
}

// The time "state" of the aggregator.
enum Temporality {
// The aggregator is currently live.
LIVE = 0;
// The aggregator is currently paused.
PAUSED = 1;
}

// `PauseResponse` is the value returned after a pause request.
Expand Down
32 changes: 32 additions & 0 deletions console-api/src/generated/rs.tokio.console.instrument.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ pub struct Update {
/// Any new span metadata that was registered since the last update.
#[prost(message, optional, tag = "5")]
pub new_metadata: ::core::option::Option<super::common::RegisterMetadata>,
/// The time "state" of the aggregator, such as paused or live.
#[prost(enumeration = "Temporality", tag = "6")]
pub temporality: i32,
}
/// `PauseResponse` is the value returned after a pause request.
#[allow(clippy::derive_partial_eq_without_eq)]
Expand All @@ -63,6 +66,35 @@ pub struct PauseResponse {}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct ResumeResponse {}
/// The time "state" of the aggregator.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
#[repr(i32)]
pub enum Temporality {
/// The aggregator is currently live.
Live = 0,
/// The aggregator is currently paused.
Paused = 1,
}
impl Temporality {
/// String value of the enum field names used in the ProtoBuf definition.
///
/// The values are not transformed in any way and thus are considered stable
/// (if the ProtoBuf definition does not change) and safe for programmatic use.
pub fn as_str_name(&self) -> &'static str {
match self {
Temporality::Live => "LIVE",
Temporality::Paused => "PAUSED",
}
}
/// Creates an enum from field names used in the ProtoBuf definition.
pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
match value {
"LIVE" => Some(Self::Live),
"PAUSED" => Some(Self::Paused),
_ => None,
}
}
}
/// Generated client implementations.
pub mod instrument_client {
#![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)]
Expand Down
34 changes: 34 additions & 0 deletions console-subscriber/examples/grpc_web/app/src/gen/instrument_pb.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,32 @@ import { TaskUpdate } from "./tasks_pb.js";
import { ResourceUpdate } from "./resources_pb.js";
import { AsyncOpUpdate } from "./async_ops_pb.js";

/**
* The time "state" of the aggregator.
*
* @generated from enum rs.tokio.console.instrument.Temporality
*/
export enum Temporality {
/**
* The aggregator is currently live.
*
* @generated from enum value: LIVE = 0;
*/
LIVE = 0,

/**
* The aggregator is currently paused.
*
* @generated from enum value: PAUSED = 1;
*/
PAUSED = 1,
}
// Retrieve enum metadata with: proto3.getEnumType(Temporality)
proto3.util.setEnumType(Temporality, "rs.tokio.console.instrument.Temporality", [
{ no: 0, name: "LIVE" },
{ no: 1, name: "PAUSED" },
]);

/**
* InstrumentRequest requests the stream of updates
* to observe the async runtime state over time.
Expand Down Expand Up @@ -207,6 +233,13 @@ export class Update extends Message<Update> {
*/
newMetadata?: RegisterMetadata;

/**
* The time "state" of the aggregator, such as paused or live.
*
* @generated from field: rs.tokio.console.instrument.Temporality temporality = 6;
*/
temporality = Temporality.LIVE;

constructor(data?: PartialMessage<Update>) {
super();
proto3.util.initPartial(data, this);
Expand All @@ -220,6 +253,7 @@ export class Update extends Message<Update> {
{ no: 3, name: "resource_update", kind: "message", T: ResourceUpdate },
{ no: 4, name: "async_op_update", kind: "message", T: AsyncOpUpdate },
{ no: 5, name: "new_metadata", kind: "message", T: RegisterMetadata },
{ no: 6, name: "temporality", kind: "enum", T: proto3.getEnumType(Temporality) },
]);

static fromBinary(bytes: Uint8Array, options?: Partial<BinaryReadOptions>): Update {
Expand Down
8 changes: 3 additions & 5 deletions console-subscriber/src/aggregator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::{
};

use console_api as proto;
use console_api::instrument::Temporality;
use prost::Message;
use proto::resources::resource;
use tokio::sync::{mpsc, Notify};
Expand Down Expand Up @@ -102,11 +103,6 @@ pub(crate) struct Flush {
triggered: AtomicBool,
}

#[derive(Debug)]
enum Temporality {
Live,
Paused,
}
// Represent static data for resources
struct Resource {
id: Id,
Expand Down Expand Up @@ -293,6 +289,7 @@ impl Aggregator {
new_metadata: Some(proto::RegisterMetadata {
metadata: (*self.all_metadata).clone(),
}),
temporality: self.temporality.into(),
};
let message_size = update.encoded_len();
if message_size < MAX_MESSAGE_SIZE {
Expand Down Expand Up @@ -417,6 +414,7 @@ impl Aggregator {
task_update,
resource_update,
async_op_update,
temporality: self.temporality.into(),
};

self.watchers
Expand Down
21 changes: 9 additions & 12 deletions tokio-console/src/state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::{
warnings::Linter,
};
use console_api as proto;
use console_api::instrument::Temporality;
use ratatui::{
style::{Color, Modifier},
text::Span,
Expand Down Expand Up @@ -71,12 +72,6 @@ pub(crate) enum FieldValue {
Debug(String),
}

#[derive(Debug)]
enum Temporality {
Live,
Paused,
}

#[derive(Debug, Eq, PartialEq)]
pub(crate) struct Attribute {
field: Field,
Expand Down Expand Up @@ -107,6 +102,14 @@ impl State {
current_view: &view::ViewState,
update: proto::instrument::Update,
) {
match Temporality::try_from(update.temporality) {
Ok(temporality) => {
self.temporality = temporality;
}
Err(..) => {
tracing::warn!(?update.temporality, "invalid temporality");
}
}
if let Some(now) = update.now.map(|v| v.try_into().unwrap()) {
self.last_updated_at = Some(now);
}
Expand Down Expand Up @@ -250,12 +253,6 @@ impl State {
}
}

impl Default for Temporality {
fn default() -> Self {
Self::Live
}
}

impl Metadata {
fn from_proto(pb: proto::Metadata, id: u64, strings: &mut intern::Strings) -> Self {
Self {
Expand Down

0 comments on commit 5f847ec

Please sign in to comment.