Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SVLS-5714] Add lambda network enhanced metrics #424

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion bottlecap/src/bin/bottlecap/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,9 @@ async fn extension_loop_active(
request_id, deadline_ms, invoked_function_arn
);
lambda_enhanced_metrics.increment_invocation_metric();
let mut p = invocation_processor.lock().await;
p.on_invoke_event(request_id);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is always invoked first, I wonder whether we could change the constructor for the context buffer and simplify our logic. I'm not sure this guarantees that the operation always runs before any other hook from the telemetry API, though, so we might need an experiment to check!

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Had a discussion about this with Jordan, will do in a future PR!

drop(p);
}
Ok(NextEventResponse::Shutdown {
shutdown_reason,
Expand Down Expand Up @@ -473,10 +476,11 @@ async fn extension_loop_active(
);
lambda_enhanced_metrics.set_report_log_metrics(&metrics);
let mut p = invocation_processor.lock().await;
if let Some(post_runtime_duration_ms) = p.on_platform_report(&request_id, metrics.duration_ms) {
if let Some((post_runtime_duration_ms, network_offset)) = p.on_platform_report(&request_id, metrics.duration_ms) {
lambda_enhanced_metrics.set_post_runtime_duration_metric(
post_runtime_duration_ms,
);
lambda_enhanced_metrics.set_network_enhanced_metrics(network_offset);
}
drop(p);

Expand Down
1 change: 1 addition & 0 deletions bottlecap/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub mod lifecycle;
pub mod logger;
pub mod logs;
pub mod metrics;
pub mod proc;
pub mod secrets;
pub mod tags;
pub mod telemetry;
Expand Down
81 changes: 69 additions & 12 deletions bottlecap/src/lifecycle/invocation/context.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::proc::NetworkData;
use std::collections::VecDeque;

use tracing::debug;
Expand All @@ -8,6 +9,7 @@ pub struct Context {
pub runtime_duration_ms: f64,
pub init_duration_ms: f64,
pub start_time: i64,
pub network_offset: Option<NetworkData>,
}

impl Context {
Expand All @@ -17,12 +19,14 @@ impl Context {
runtime_duration_ms: f64,
init_duration_ms: f64,
start_time: i64,
network_offset: Option<NetworkData>,
) -> Self {
Context {
request_id,
runtime_duration_ms,
init_duration_ms,
start_time,
network_offset,
}
}
}
Expand Down Expand Up @@ -100,7 +104,13 @@ impl ContextBuffer {
{
context.init_duration_ms = init_duration_ms;
} else {
self.insert(Context::new(request_id.clone(), 0.0, init_duration_ms, 0));
self.insert(Context::new(
request_id.clone(),
0.0,
init_duration_ms,
0,
None,
));
}
}

Expand All @@ -115,7 +125,7 @@ impl ContextBuffer {
{
context.start_time = start_time;
} else {
self.insert(Context::new(request_id.clone(), 0.0, 0.0, start_time));
self.insert(Context::new(request_id.clone(), 0.0, 0.0, start_time, None));
}
}

Expand All @@ -135,10 +145,26 @@ impl ContextBuffer {
runtime_duration_ms,
0.0,
0,
None,
));
}
}

/// Records `network_data` as the network offset of the `Context` whose `request_id` matches.
///
/// When no such `Context` is in the buffer, a new one carrying the offset is inserted, with
/// its durations and start time set to zero.
///
pub fn add_network_offset(&mut self, request_id: &String, network_data: Option<NetworkData>) {
    let existing = self
        .buffer
        .iter_mut()
        .find(|candidate| candidate.request_id == *request_id);

    match existing {
        // Known request: overwrite whatever offset was stored before.
        Some(found) => found.network_offset = network_data,
        // Unknown request: create a placeholder context that only carries the offset.
        None => {
            let fresh = Context::new(request_id.clone(), 0.0, 0.0, 0, network_data);
            self.insert(fresh);
        }
    }
}

/// Returns the size of the buffer.
///
#[must_use]
Expand All @@ -157,20 +183,20 @@ mod tests {
let mut buffer = ContextBuffer::with_capacity(2);

let request_id = String::from("1");
let context = Context::new(request_id.clone(), 0.0, 0.0, 0);
let context = Context::new(request_id.clone(), 0.0, 0.0, 0, None);
buffer.insert(context.clone());
assert_eq!(buffer.size(), 1);
assert_eq!(buffer.get(&request_id).unwrap(), &context);

let request_id_2 = String::from("2");
let context = Context::new(request_id_2.clone(), 0.0, 0.0, 0);
let context = Context::new(request_id_2.clone(), 0.0, 0.0, 0, None);
buffer.insert(context.clone());
assert_eq!(buffer.size(), 2);
assert_eq!(buffer.get(&request_id_2).unwrap(), &context);

// This should replace the first context
let request_id_3 = String::from("3");
let context = Context::new(request_id_3.clone(), 0.0, 0.0, 0);
let context = Context::new(request_id_3.clone(), 0.0, 0.0, 0, None);
buffer.insert(context.clone());
assert_eq!(buffer.size(), 2);
assert_eq!(buffer.get(&request_id_3).unwrap(), &context);
Expand All @@ -184,13 +210,13 @@ mod tests {
let mut buffer = ContextBuffer::with_capacity(2);

let request_id = String::from("1");
let context = Context::new(request_id.clone(), 0.0, 0.0, 0);
let context = Context::new(request_id.clone(), 0.0, 0.0, 0, None);
buffer.insert(context.clone());
assert_eq!(buffer.size(), 1);
assert_eq!(buffer.get(&request_id).unwrap(), &context);

let request_id_2 = String::from("2");
let context = Context::new(request_id_2.clone(), 0.0, 0.0, 0);
let context = Context::new(request_id_2.clone(), 0.0, 0.0, 0, None);
buffer.insert(context.clone());
assert_eq!(buffer.size(), 2);
assert_eq!(buffer.get(&request_id_2).unwrap(), &context);
Expand All @@ -211,13 +237,13 @@ mod tests {
let mut buffer = ContextBuffer::with_capacity(2);

let request_id = String::from("1");
let context = Context::new(request_id.clone(), 0.0, 0.0, 0);
let context = Context::new(request_id.clone(), 0.0, 0.0, 0, None);
buffer.insert(context.clone());
assert_eq!(buffer.size(), 1);
assert_eq!(buffer.get(&request_id).unwrap(), &context);

let request_id_2 = String::from("2");
let context = Context::new(request_id_2.clone(), 0.0, 0.0, 0);
let context = Context::new(request_id_2.clone(), 0.0, 0.0, 0, None);
buffer.insert(context.clone());
assert_eq!(buffer.size(), 2);
assert_eq!(buffer.get(&request_id_2).unwrap(), &context);
Expand All @@ -232,7 +258,7 @@ mod tests {
let mut buffer = ContextBuffer::with_capacity(2);

let request_id = String::from("1");
let context = Context::new(request_id.clone(), 0.0, 0.0, 0);
let context = Context::new(request_id.clone(), 0.0, 0.0, 0, None);
buffer.insert(context.clone());
assert_eq!(buffer.size(), 1);
assert_eq!(buffer.get(&request_id).unwrap(), &context);
Expand All @@ -255,7 +281,7 @@ mod tests {
let mut buffer = ContextBuffer::with_capacity(2);

let request_id = String::from("1");
let context = Context::new(request_id.clone(), 0.0, 0.0, 0);
let context = Context::new(request_id.clone(), 0.0, 0.0, 0, None);
buffer.insert(context.clone());
assert_eq!(buffer.size(), 1);
assert_eq!(buffer.get(&request_id).unwrap(), &context);
Expand All @@ -275,7 +301,7 @@ mod tests {
let mut buffer = ContextBuffer::with_capacity(2);

let request_id = String::from("1");
let context = Context::new(request_id.clone(), 0.0, 0.0, 0);
let context = Context::new(request_id.clone(), 0.0, 0.0, 0, None);
buffer.insert(context.clone());
assert_eq!(buffer.size(), 1);
assert_eq!(buffer.get(&request_id).unwrap(), &context);
Expand All @@ -295,4 +321,35 @@ mod tests {
200.0
);
}

#[test]
fn test_add_network_offset() {
    let mut buffer = ContextBuffer::with_capacity(2);

    // Seed the buffer with a context that has no network offset yet.
    let request_id = String::from("1");
    let seeded = Context::new(request_id.clone(), 0.0, 0.0, 0, None);
    buffer.insert(seeded.clone());
    assert_eq!(buffer.size(), 1);
    assert_eq!(buffer.get(&request_id).unwrap(), &seeded);

    let network_offset = Some(NetworkData {
        rx_bytes: 180.0,
        tx_bytes: 254.0,
    });

    // The offset is attached to the context that is already buffered.
    buffer.add_network_offset(&request_id, network_offset);
    let stored = buffer.get(&request_id).unwrap().network_offset;
    assert_eq!(stored, network_offset);

    // A request id that is not in the buffer gets a new context carrying the offset.
    let missing_request_id = String::from("unexistent");
    buffer.add_network_offset(&missing_request_id, network_offset);
    assert_eq!(buffer.size(), 2);
    let created = buffer.get(&missing_request_id).unwrap().network_offset;
    assert_eq!(created, network_offset);
}
}
19 changes: 17 additions & 2 deletions bottlecap/src/lifecycle/invocation/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use tracing::debug;
use crate::{
config::{self, AwsConfig},
lifecycle::invocation::{context::ContextBuffer, span_inferrer::SpanInferrer},
proc::{self, NetworkData},
tags::provider,
traces::trace_processor,
};
Expand Down Expand Up @@ -63,6 +64,14 @@ impl Processor {
}
}

/// Given a `request_id`, add the enhanced metric offsets to the context buffer.
///
pub fn on_invoke_event(&mut self, request_id: String) {
let network_offset: Option<NetworkData> = proc::get_network_data().ok();
self.context_buffer
.add_network_offset(&request_id, network_offset);
}

/// Given a `request_id` and the time of the platform start, add the start time to the context buffer.
///
/// Also, set the start time of the current span.
Expand Down Expand Up @@ -151,13 +160,19 @@ impl Processor {
/// If the `request_id` is not found in the context buffer, return `None`.
/// If the `runtime_duration_ms` hasn't been seen, return `None`.
///
pub fn on_platform_report(&mut self, request_id: &String, duration_ms: f64) -> Option<f64> {
pub fn on_platform_report(
&mut self,
request_id: &String,
duration_ms: f64,
) -> Option<(f64, Option<NetworkData>)> {
if let Some(context) = self.context_buffer.remove(request_id) {
if context.runtime_duration_ms == 0.0 {
return None;
}

return Some(duration_ms - context.runtime_duration_ms);
let post_runtime_duration_ms = duration_ms - context.runtime_duration_ms;

return Some((post_runtime_duration_ms, context.network_offset));
}

None
Expand Down
2 changes: 1 addition & 1 deletion bottlecap/src/logs/lambda/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ impl LambdaProcessor {
service,
tags,
rules,
invocation_context: InvocationContext::new(String::new(), 0.0, 0.0, 0),
invocation_context: InvocationContext::new(String::new(), 0.0, 0.0, 0, None),
orphan_logs: Vec::new(),
ready_logs: Vec::new(),
event_bus,
Expand Down
3 changes: 3 additions & 0 deletions bottlecap/src/metrics/enhanced/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,8 @@ pub const OUT_OF_MEMORY_METRIC: &str = "aws.lambda.enhanced.out_of_memory";
pub const TIMEOUTS_METRIC: &str = "aws.lambda.enhanced.timeouts";
pub const ERRORS_METRIC: &str = "aws.lambda.enhanced.errors";
pub const INVOCATIONS_METRIC: &str = "aws.lambda.enhanced.invocations";
// Names of the network enhanced metrics.
// NOTE(review): presumably received bytes, transmitted bytes, and their combined total —
// confirm against the code that emits them.
pub const RX_BYTES_METRIC: &str = "aws.lambda.enhanced.rx_bytes";
pub const TX_BYTES_METRIC: &str = "aws.lambda.enhanced.tx_bytes";
pub const TOTAL_NETWORK_METRIC: &str = "aws.lambda.enhanced.total_network";
//pub const ASM_INVOCATIONS_METRIC: &str = "aws.lambda.enhanced.asm.invocations";
// Name of the environment variable associated with enhanced metrics.
pub const ENHANCED_METRICS_ENV_VAR: &str = "DD_ENHANCED_METRICS";
Loading
Loading