Skip to content

Commit

Permalink
feat: add s3 metrics (#322)
Browse files Browse the repository at this point in the history
* feat: add cloud linter metrics for S3

* chore: add request metrics

* chore: adding metric targets for all storage types

* chore: add type prefix to storage metrics

* chore: remove logic for unused metadata

* chore: clean up metric target logic

* chore: detect and use appropriate request metrics filter if any

* chore: search expression WIP

* chore: remove old storage type handling logic

* chore: support both expressions and metric stats

* chore: inline bucket size metric target creation

* chore: formatting

* chore: remove unused limiter

* chore: clippy formatting, make request_metrics_filter non optional

* chore: more formatting

* chore: cleanup

* chore: stop processing directory buckets and handle bucket redirects

* chore: formatting

* chore: remove clippy allow directive

* chore: change print to log
  • Loading branch information
pgautier404 authored May 29, 2024
1 parent b6541fb commit 5aab591
Show file tree
Hide file tree
Showing 10 changed files with 790 additions and 70 deletions.
413 changes: 383 additions & 30 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions momento/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ aws-config = "1.1.8"
aws-sdk-cloudwatch = "1.19.0"
aws-sdk-dynamodb = "1.19.0"
aws-sdk-elasticache = "1.18.0"
aws-sdk-s3 = "1.28.0"
indicatif = "0.17.8"
flate2 = "1.0.28"

Expand Down
12 changes: 7 additions & 5 deletions momento/src/commands/cloud_linter/dynamodb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,13 +129,14 @@ pub(crate) struct GsiMetadata {
}

impl ResourceWithMetrics for DynamoDbResource {
fn create_metric_target(&self) -> Result<MetricTarget, CliError> {
fn create_metric_targets(&self) -> Result<Vec<MetricTarget>, CliError> {
match self.resource_type {
ResourceType::DynamoDbTable => Ok(MetricTarget {
ResourceType::DynamoDbTable => Ok(vec![MetricTarget {
namespace: "AWS/DynamoDB".to_string(),
expression: "".to_string(),
dimensions: HashMap::from([("TableName".to_string(), self.id.clone())]),
targets: DDB_TABLE_METRICS,
}),
}]),
ResourceType::DynamoDbGsi => {
let gsi_name = self
.metadata
Expand All @@ -145,14 +146,15 @@ impl ResourceWithMetrics for DynamoDbResource {
.ok_or(CliError {
msg: "Global secondary index name not found".to_string(),
})?;
Ok(MetricTarget {
Ok(vec![MetricTarget {
namespace: "AWS/DynamoDB".to_string(),
expression: "".to_string(),
dimensions: HashMap::from([
("TableName".to_string(), self.id.clone()),
("GlobalSecondaryIndexName".to_string(), gsi_name),
]),
targets: DDB_GSI_METRICS,
})
}])
}
_ => Err(CliError {
msg: "Invalid resource type".to_string(),
Expand Down
12 changes: 7 additions & 5 deletions momento/src/commands/cloud_linter/elasticache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,18 +67,20 @@ pub(crate) struct ElastiCacheMetadata {
}

impl ResourceWithMetrics for ElastiCacheResource {
fn create_metric_target(&self) -> Result<MetricTarget, CliError> {
fn create_metric_targets(&self) -> Result<Vec<MetricTarget>, CliError> {
match self.resource_type {
ResourceType::ElastiCacheRedisNode => Ok(MetricTarget {
ResourceType::ElastiCacheRedisNode => Ok(vec![MetricTarget {
namespace: "AWS/ElastiCache".to_string(),
expression: "".to_string(),
dimensions: HashMap::from([
("CacheClusterId".to_string(), self.id.clone()),
("CacheNodeId".to_string(), "0001".to_string()),
]),
targets: CACHE_METRICS,
}),
ResourceType::ElastiCacheMemcachedNode => Ok(MetricTarget {
}]),
ResourceType::ElastiCacheMemcachedNode => Ok(vec![MetricTarget {
namespace: "AWS/ElastiCache".to_string(),
expression: "".to_string(),
dimensions: HashMap::from([
(
"CacheClusterId".to_string(),
Expand All @@ -87,7 +89,7 @@ impl ResourceWithMetrics for ElastiCacheResource {
("CacheNodeId".to_string(), self.id.clone()),
]),
targets: CACHE_METRICS,
}),
}]),
_ => Err(CliError {
msg: "Invalid resource type".to_string(),
}),
Expand Down
9 changes: 9 additions & 0 deletions momento/src/commands/cloud_linter/linter_cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use tokio::fs::{metadata, File};
use tokio::sync::mpsc::{self, Sender};

use crate::commands::cloud_linter::dynamodb::process_ddb_resources;
use crate::commands::cloud_linter::s3::process_s3_resources;
use crate::commands::cloud_linter::serverless_elasticache::process_serverless_elasticache_resources;
use crate::commands::cloud_linter::utils::check_aws_credentials;
use crate::error::CliError;
Expand Down Expand Up @@ -75,6 +76,14 @@ async fn process_data(region: String, sender: Sender<Resource>) -> Result<(), Cl
Quota::per_second(core::num::NonZeroU32::new(20).expect("should create non-zero quota"));
let metrics_limiter = Arc::new(RateLimiter::direct(metrics_quota));

process_s3_resources(
&config,
Arc::clone(&metrics_limiter),
Arc::clone(&control_plane_limiter),
sender.clone(),
)
.await?;

process_ddb_resources(
&config,
Arc::clone(&control_plane_limiter),
Expand Down
74 changes: 49 additions & 25 deletions momento/src/commands/cloud_linter/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@ pub(crate) struct Metric {

pub(crate) struct MetricTarget {
pub(crate) namespace: String,
// a metric target should have either an expression or dimensions but not both
pub(crate) expression: String,
pub(crate) dimensions: HashMap<String, String>,
pub(crate) targets: Map<&'static str, &'static [&'static str]>,
}

pub(crate) trait ResourceWithMetrics {
fn create_metric_target(&self) -> Result<MetricTarget, CliError>;
fn create_metric_targets(&self) -> Result<Vec<MetricTarget>, CliError>;

fn set_metrics(&mut self, metrics: Vec<Metric>);

Expand All @@ -50,10 +52,14 @@ where
metrics_client: &Client,
limiter: Arc<DefaultDirectRateLimiter>,
) -> Result<(), CliError> {
let metric_target = self.create_metric_target()?;
let metrics =
query_metrics_for_target(metrics_client, Arc::clone(&limiter), metric_target).await?;
self.set_metrics(metrics);
let metric_targets = self.create_metric_targets()?;
let mut metrics: Vec<Vec<Metric>> = Vec::new();
for target in metric_targets {
metrics.push(
query_metrics_for_target(metrics_client, Arc::clone(&limiter), target).await?,
);
}
self.set_metrics(metrics.into_iter().flatten().collect());
self.set_metric_period_seconds(60 * 60 * 24);

Ok(())
Expand All @@ -71,29 +77,47 @@ async fn query_metrics_for_target(
.into_iter()
.map(|(name, value)| Dimension::builder().name(name).value(value).build())
.collect();
let mut metric_data_query: MetricDataQuery;
for (stat_type, metrics) in metric_target.targets.entries() {
let mut metric_data_queries: Vec<MetricDataQuery> = Vec::with_capacity(metrics.len());
for metric in *metrics {
let metric_data_query = MetricDataQuery::builder()
.metric_stat(
MetricStat::builder()
.metric(
CloudwatchMetric::builder()
.metric_name(metric.to_string())
.namespace(metric_target.namespace.clone())
.set_dimensions(Some(dimensions.clone()))
.build(),
)
.period(60 * 60 * 24)
.stat(stat_type.to_string())
.build(),
)
.id(format!(
"{}_{}",
metric.to_lowercase(),
stat_type.to_lowercase()
))
.build();
if metric_target.expression.is_empty() {
metric_data_query = MetricDataQuery::builder()
.metric_stat(
MetricStat::builder()
.metric(
CloudwatchMetric::builder()
.metric_name(metric.to_string())
.namespace(metric_target.namespace.clone())
.set_dimensions(Some(dimensions.clone()))
.build(),
)
.period(60 * 60 * 24)
.stat(stat_type.to_string())
.build(),
)
.id(format!(
"{}_{}",
metric.to_lowercase(),
stat_type.to_lowercase()
))
.build();
} else {
let search_expression = format!(
"SEARCH(\' {} \', \'{}\')",
metric_target.expression, stat_type
);
metric_data_query = MetricDataQuery::builder()
.expression(search_expression)
.period(60 * 60 * 24)
.return_data(true)
.id(format!(
"{}_{}",
metric.to_lowercase(),
stat_type.to_lowercase()
))
.build();
}
metric_data_queries.push(metric_data_query);
}

Expand Down
1 change: 1 addition & 0 deletions momento/src/commands/cloud_linter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@ mod elasticache;
pub mod linter_cli;
mod metrics;
mod resource;
mod s3;
mod serverless_elasticache;
mod utils;
15 changes: 13 additions & 2 deletions momento/src/commands/cloud_linter/resource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use serde::Serialize;
use crate::commands::cloud_linter::dynamodb::DynamoDbMetadata;
use crate::commands::cloud_linter::elasticache::ElastiCacheMetadata;
use crate::commands::cloud_linter::metrics::Metric;
use crate::commands::cloud_linter::s3::S3Metadata;
use crate::commands::cloud_linter::serverless_elasticache::ServerlessElastiCacheMetadata;

#[derive(Serialize, Debug)]
Expand All @@ -11,6 +12,7 @@ pub(crate) enum Resource {
DynamoDb(DynamoDbResource),
ElastiCache(ElastiCacheResource),
ServerlessElastiCache(ServerlessElastiCacheResource),
S3(S3Resource),
}

#[derive(Debug, Serialize, PartialEq)]
Expand All @@ -25,6 +27,8 @@ pub(crate) enum ResourceType {
ElastiCacheMemcachedNode,
#[serde(rename = "AWS::Elasticache::Serverless")]
ServerlessElastiCache,
#[serde(rename = "AWS::S3::Bucket")]
S3,
}

#[derive(Serialize, Debug)]
Expand Down Expand Up @@ -64,6 +68,13 @@ pub(crate) struct ServerlessElastiCacheResource {
}

#[derive(Serialize, Debug)]
pub(crate) struct DataFormat {
pub(crate) resources: Vec<Resource>,
pub(crate) struct S3Resource {
#[serde(rename = "type")]
pub(crate) resource_type: ResourceType,
pub(crate) region: String,
pub(crate) id: String,
pub(crate) metrics: Vec<Metric>,
#[serde(rename = "metricPeriodSeconds")]
pub(crate) metric_period_seconds: i32,
pub(crate) metadata: S3Metadata,
}
Loading

0 comments on commit 5aab591

Please sign in to comment.