feat: speed up resource collection, add resource specific cli flags (#…
bruuuuuuuce authored May 31, 2024
1 parent c5a2376 commit 81f77fc
Showing 11 changed files with 288 additions and 86 deletions.
4 changes: 4 additions & 0 deletions .gitignore
@@ -7,3 +7,7 @@ client-cli.iml
bin/
momento.exe
obj/

linter_results.json
linter_results.json.gz

25 changes: 25 additions & 0 deletions momento-cli-opts/src/lib.rs
@@ -224,9 +224,34 @@ to help find opportunities for optimizations with Momento.
CloudLinter {
#[arg(long, short, help = "The AWS region to examine")]
region: String,
#[arg(
long = "enable-ddb-ttl-check",
help = "Opt in to check whether ddb tables have ttl enabled. If there are lots of tables, could slow down data collection"
)]
enable_ddb_ttl_check: bool,
#[arg(
value_enum,
long = "resource",
help = "Pass in a specific resource type to only collect data on that resource. Example: --resource dynamo"
)]
resource: Option<CloudLinterResources>,
#[arg(
long = "metric-collection-rate",
help = "tps at which to invoke the aws `get-metric-data` api",
default_value = "20"
)]
metric_collection_rate: u32,
},
}

#[derive(clap::ValueEnum, PartialEq, Eq, Debug, Clone, Copy)]
pub enum CloudLinterResources {
ApiGateway,
S3,
Dynamo,
ElastiCache,
}

#[derive(Debug, Parser)]
pub enum CloudSignupCommand {
#[command(about = SIGNUP_DEPRECATED_MSG)]
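The three new flags compose with the existing `--region` argument, and `--resource` is optional: when it is omitted, the linter falls through to collecting every resource type (see `process_data` in linter_cli.rs below). As a quick illustration, here is a minimal sketch of how clap parses these flags. The standalone `CloudLinterArgs` struct and `main` are hypothetical stand-ins for the real `CloudLinter` subcommand plumbing; the `#[arg]` attributes mirror the diff:

```rust
use clap::{Parser, ValueEnum};

// Mirrors the enum added above; clap derives kebab-case value names
// from the variants: api-gateway, s3, dynamo, elasti-cache.
#[derive(ValueEnum, PartialEq, Eq, Debug, Clone, Copy)]
enum CloudLinterResources {
    ApiGateway,
    S3,
    Dynamo,
    ElastiCache,
}

// Hypothetical standalone wrapper; the real flags live on the
// CloudLinter subcommand variant shown in the diff.
#[derive(Parser, Debug)]
struct CloudLinterArgs {
    #[arg(long, short)]
    region: String,
    #[arg(long = "enable-ddb-ttl-check")]
    enable_ddb_ttl_check: bool,
    #[arg(value_enum, long = "resource")]
    resource: Option<CloudLinterResources>,
    #[arg(long = "metric-collection-rate", default_value = "20")]
    metric_collection_rate: u32,
}

fn main() {
    let args = CloudLinterArgs::parse_from([
        "cloud-linter",
        "--region",
        "us-east-1",
        "--resource",
        "dynamo",
        "--enable-ddb-ttl-check",
    ]);
    assert_eq!(args.metric_collection_rate, 20); // default applies when the flag is absent
    assert!(args.enable_ddb_ttl_check);
    println!("{args:?}");
}
```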
2 changes: 1 addition & 1 deletion momento/src/commands/cloud_linter/api_gateway.rs
@@ -38,7 +38,7 @@ const API_GATEWAY_METRICS: Map<&'static str, &'static [&'static str]> = phf_map!
],
};

#[derive(Serialize, Clone, Debug)]
#[derive(Serialize, Clone, Debug, PartialEq, Eq)]
pub(crate) struct ApiGatewayMetadata {
#[serde(rename = "name")]
name: String,
68 changes: 53 additions & 15 deletions momento/src/commands/cloud_linter/dynamodb.rs
@@ -4,6 +4,7 @@ use std::time::Duration;

use aws_config::SdkConfig;
use aws_sdk_dynamodb::types::{TimeToLiveDescription, TimeToLiveStatus};
use futures::stream::FuturesUnordered;
use governor::DefaultDirectRateLimiter;
use indicatif::{ProgressBar, ProgressStyle};
use phf::{phf_map, Map};
@@ -70,7 +71,7 @@ const DDB_GSI_METRICS: Map<&'static str, &'static [&'static str]> = phf_map! {
],
};

#[derive(Serialize, Clone, Debug)]
#[derive(Serialize, Clone, Debug, PartialEq, Eq)]
pub(crate) struct DynamoDbMetadata {
#[serde(rename = "avgItemSizeBytes")]
avg_item_size_bytes: i64,
@@ -110,7 +111,7 @@ impl DynamoDbMetadata {
}
}

#[derive(Serialize, Deserialize, Clone, Debug)]
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub(crate) struct GsiMetadata {
#[serde(rename = "gsiName")]
gsi_name: String,
@@ -177,6 +178,7 @@ pub(crate) async fn process_ddb_resources(
metrics_limiter: Arc<DefaultDirectRateLimiter>,
describe_ttl_limiter: Arc<DefaultDirectRateLimiter>,
sender: Sender<Resource>,
enable_ddb_ttl_check: bool,
) -> Result<(), CliError> {
let ddb_client = aws_sdk_dynamodb::Client::new(config);
let metrics_client = aws_sdk_cloudwatch::Client::new(config);
@@ -190,18 +192,47 @@
ProgressBar::new(table_names.len() as u64).with_message("Processing Dynamo DB tables");
describe_ddb_tables_bar
.set_style(ProgressStyle::with_template(" {msg} {bar} {eta}").expect("invalid template"));

let futures = FuturesUnordered::new();

for table_name in table_names {
process_table_resources(
&ddb_client,
&metrics_client,
&table_name,
Arc::clone(&control_plane_limiter),
Arc::clone(&metrics_limiter),
Arc::clone(&describe_ttl_limiter),
sender.clone(),
)
.await?;
describe_ddb_tables_bar.inc(1);
let sender_clone = sender.clone();
let ddb_client_clone = ddb_client.clone();
let metrics_client_clone = metrics_client.clone();
let table_name_clone = table_name.clone();
let control_plane_limiter_clone = control_plane_limiter.clone();
let metrics_limiter_clone = metrics_limiter.clone();
let describe_ttl_limiter_clone = describe_ttl_limiter.clone();
let progress_bar_clone = describe_ddb_tables_bar.clone();
let spawn = tokio::spawn(async move {
let res = process_table_resources(
&ddb_client_clone,
&metrics_client_clone,
&table_name_clone,
control_plane_limiter_clone,
metrics_limiter_clone,
describe_ttl_limiter_clone,
sender_clone,
enable_ddb_ttl_check,
)
.await;
progress_bar_clone.inc(1);
res
});
futures.push(spawn);
}

let all_results = futures::future::join_all(futures).await;
for result in all_results {
match result {
// bubble up any cli errors that we came across
Ok(res) => res?,
Err(_) => {
return Err(CliError {
msg: "failed to wait for all dynamo resources to collect data".to_string(),
})
}
}
}

describe_ddb_tables_bar.finish();
@@ -282,6 +313,7 @@ async fn is_ddb_ttl_enabled(
Ok(ttl_enabled)
}

#[allow(clippy::too_many_arguments)]
async fn process_table_resources(
ddb_client: &aws_sdk_dynamodb::Client,
metrics_client: &aws_sdk_cloudwatch::Client,
@@ -290,6 +322,7 @@ async fn process_table_resources(
metrics_limiter: Arc<DefaultDirectRateLimiter>,
describe_ttl_limiter: Arc<DefaultDirectRateLimiter>,
sender: Sender<Resource>,
enable_ddb_ttl_check: bool,
) -> Result<(), CliError> {
let region = ddb_client
.config()
@@ -451,8 +484,13 @@
resource
.append_metrics(metrics_client, Arc::clone(&metrics_limiter))
.await?;
let ttl_enabled =
is_ddb_ttl_enabled(ddb_client, &resource, Arc::clone(&describe_ttl_limiter)).await?;
let ttl_enabled = match enable_ddb_ttl_check {
true => {
is_ddb_ttl_enabled(ddb_client, &resource, Arc::clone(&describe_ttl_limiter)).await?
}
false => false,
};

resource.metadata.ttl_enabled = ttl_enabled;
sender
.send(Resource::DynamoDb(resource))
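The speed-up in `process_ddb_resources` comes from replacing the sequential per-table `await` with one spawned task per table, joined at the end with `futures::future::join_all`. Here is a minimal, self-contained sketch of that fan-out; `process_table` is a stand-in for the real `process_table_resources`, and the shared `governor` limiter bounds the call rate the way the real control-plane and metrics limiters do:

```rust
use std::sync::Arc;

use futures::stream::FuturesUnordered;
use governor::{DefaultDirectRateLimiter, Quota, RateLimiter};

// Stand-in for process_table_resources: wait for a rate-limit permit,
// then pretend to collect data for one table.
async fn process_table(
    table_name: String,
    limiter: Arc<DefaultDirectRateLimiter>,
) -> Result<(), String> {
    limiter.until_ready().await;
    println!("collected {table_name}");
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), String> {
    let quota = Quota::per_second(core::num::NonZeroU32::new(20).expect("non-zero"));
    let limiter = Arc::new(RateLimiter::direct(quota));

    let futures = FuturesUnordered::new();
    for table in ["users", "orders", "sessions"] {
        let limiter = Arc::clone(&limiter);
        // One task per table: the per-table awaits now overlap instead of
        // running strictly one after another.
        futures.push(tokio::spawn(process_table(table.to_string(), limiter)));
    }

    // join_all waits for every task; a JoinError (panicked or cancelled
    // task) is surfaced separately from the task's own Result, as in the diff.
    for joined in futures::future::join_all(futures).await {
        joined.map_err(|_| "task failed to complete".to_string())??;
    }
    Ok(())
}
```

Spawning handles rather than pushing bare futures means the per-table work starts immediately on the runtime's workers; `FuturesUnordered` serves here only as a grow-only collection, since `join_all` drains it wholesale.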
2 changes: 1 addition & 1 deletion momento/src/commands/cloud_linter/elasticache.rs
@@ -53,7 +53,7 @@ pub(crate) const CACHE_METRICS: Map<&'static str, &'static [&'static str]> = phf_map! {
],
};

#[derive(Serialize, Clone, Debug)]
#[derive(Serialize, Clone, Debug, PartialEq, Eq)]
pub(crate) struct ElastiCacheMetadata {
#[serde(rename = "clusterId")]
cluster_id: String,
86 changes: 81 additions & 5 deletions momento/src/commands/cloud_linter/linter_cli.rs
@@ -7,6 +7,7 @@ use aws_config::{BehaviorVersion, Region};
use flate2::write::GzEncoder;
use flate2::Compression;
use governor::{Quota, RateLimiter};
use momento_cli_opts::CloudLinterResources;
use struson::writer::{JsonStreamWriter, JsonWriter};
use tokio::fs::{metadata, File};
use tokio::sync::mpsc::{self, Sender};
@@ -20,7 +21,12 @@ use crate::error::CliError;
use super::elasticache::process_elasticache_resources;
use super::resource::Resource;

pub async fn run_cloud_linter(region: String) -> Result<(), CliError> {
pub async fn run_cloud_linter(
region: String,
enable_ddb_ttl_check: bool,
only_collect_for_resource: Option<CloudLinterResources>,
metric_collection_rate: u32,
) -> Result<(), CliError> {
let (tx, mut rx) = mpsc::channel::<Resource>(32);
let file_path = "linter_results.json";
// first we check to make sure we have perms to write files to the current directory
@@ -34,7 +40,14 @@ pub async fn run_cloud_linter(region: String) -> Result<(), CliError> {
json_writer.name("resources")?;
json_writer.begin_array()?;
tokio::spawn(async move {
let _ = process_data(region, tx).await;
let _ = process_data(
region,
tx,
enable_ddb_ttl_check,
only_collect_for_resource,
metric_collection_rate,
)
.await;
});
while let Some(message) = rx.recv().await {
let _ = json_writer.serialize_value(&message);
@@ -56,7 +69,13 @@ pub async fn run_cloud_linter(region: String) -> Result<(), CliError> {
Ok(())
}

async fn process_data(region: String, sender: Sender<Resource>) -> Result<(), CliError> {
async fn process_data(
region: String,
sender: Sender<Resource>,
enable_ddb_ttl_check: bool,
only_collect_for_resource: Option<CloudLinterResources>,
metric_collection_rate: u32,
) -> Result<(), CliError> {
let config = aws_config::defaults(BehaviorVersion::latest())
.region(Region::new(region))
.load()
@@ -73,10 +92,66 @@ async fn process_data(region: String, sender: Sender<Resource>) -> Result<(), CliError> {
);
let describe_ttl_limiter = Arc::new(RateLimiter::direct(describe_ttl_quota));

let metrics_quota =
Quota::per_second(core::num::NonZeroU32::new(20).expect("should create non-zero quota"));
let metrics_quota = Quota::per_second(
core::num::NonZeroU32::new(metric_collection_rate).expect("should create non-zero quota"),
);
let metrics_limiter = Arc::new(RateLimiter::direct(metrics_quota));

if let Some(resource) = only_collect_for_resource {
match resource {
CloudLinterResources::ApiGateway => {
process_api_gateway_resources(
&config,
Arc::clone(&control_plane_limiter),
Arc::clone(&metrics_limiter),
sender.clone(),
)
.await?;
return Ok(());
}
CloudLinterResources::S3 => {
process_s3_resources(
&config,
Arc::clone(&control_plane_limiter),
Arc::clone(&metrics_limiter),
sender.clone(),
)
.await?;
return Ok(());
}
CloudLinterResources::Dynamo => {
process_ddb_resources(
&config,
Arc::clone(&control_plane_limiter),
Arc::clone(&metrics_limiter),
Arc::clone(&describe_ttl_limiter),
sender.clone(),
enable_ddb_ttl_check,
)
.await?;
return Ok(());
}
CloudLinterResources::ElastiCache => {
process_elasticache_resources(
&config,
Arc::clone(&control_plane_limiter),
Arc::clone(&metrics_limiter),
sender.clone(),
)
.await?;

process_serverless_elasticache_resources(
&config,
Arc::clone(&control_plane_limiter),
Arc::clone(&metrics_limiter),
sender.clone(),
)
.await?;
return Ok(());
}
}
};

process_s3_resources(
&config,
Arc::clone(&control_plane_limiter),
Expand All @@ -99,6 +174,7 @@ async fn process_data(region: String, sender: Sender<Resource>) -> Result<(), Cl
Arc::clone(&metrics_limiter),
Arc::clone(&describe_ttl_limiter),
sender.clone(),
enable_ddb_ttl_check,
)
.await?;

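Each arm of the `if let Some(resource)` dispatch returns early, so a single `--resource` flag short-circuits the full sweep that follows it; the ElastiCache arm is the only one that runs two collectors (dedicated plus serverless) before returning. One edge worth flagging: `NonZeroU32::new` returns `None` for zero, so `--metric-collection-rate 0` would trip the `expect` and panic. A hedged sketch, not part of the diff, of a hypothetical `metrics_quota` helper that validates the flag into a `governor` quota with a user-facing error instead:

```rust
use core::num::NonZeroU32;
use governor::Quota;

// Hypothetical helper: turn the user-supplied rate into a quota,
// rejecting zero up front instead of panicking inside process_data.
fn metrics_quota(metric_collection_rate: u32) -> Result<Quota, String> {
    NonZeroU32::new(metric_collection_rate)
        .map(Quota::per_second)
        .ok_or_else(|| "metric-collection-rate must be at least 1".to_string())
}

fn main() {
    assert!(metrics_quota(20).is_ok()); // the CLI default
    assert!(metrics_quota(0).is_err()); // clean error instead of a panic
}
```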
29 changes: 25 additions & 4 deletions momento/src/commands/cloud_linter/metrics.rs
@@ -6,14 +6,16 @@ use aws_sdk_cloudwatch::types::Metric as CloudwatchMetric;
use aws_sdk_cloudwatch::types::{Dimension, MetricDataQuery, MetricStat};
use aws_sdk_cloudwatch::Client;
use chrono::{Duration, Utc};
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use governor::DefaultDirectRateLimiter;
use phf::Map;
use serde::{Deserialize, Serialize};

use crate::commands::cloud_linter::utils::rate_limit;
use crate::error::CliError;

#[derive(Serialize, Deserialize, Debug)]
#[derive(Serialize, Deserialize, Debug, PartialEq)]
pub(crate) struct Metric {
pub name: String,
pub values: Vec<f64>,
@@ -54,11 +56,30 @@
) -> Result<(), CliError> {
let metric_targets = self.create_metric_targets()?;
let mut metrics: Vec<Vec<Metric>> = Vec::new();
let mut futures = FuturesUnordered::new();

for target in metric_targets {
metrics.push(
query_metrics_for_target(metrics_client, Arc::clone(&limiter), target).await?,
);
let client = metrics_client.clone();
let moved_limiter = Arc::clone(&limiter);
let spawn = tokio::spawn(async move {
query_metrics_for_target(&client, moved_limiter, target).await
});
futures.push(spawn);
}
while let Some(finished_future) = futures.next().await {
match finished_future {
Err(_e) => {
return Err(CliError {
msg: "failed to retrieve metrics from cloudwatch".to_string(),
})
}
Ok(result) => {
let resource_metrics = result?;
metrics.push(resource_metrics);
}
}
}

self.set_metrics(metrics.into_iter().flatten().collect());
self.set_metric_period_seconds(60 * 60 * 24);

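`append_metrics` applies the same fan-out to CloudWatch: each `query_metrics_for_target` call is spawned onto the runtime, pushed into a `FuturesUnordered`, and drained with `StreamExt::next`, which yields results in completion order rather than submission order. A small sketch of that consumption pattern, with sleeps standing in for the real CloudWatch queries:

```rust
use std::time::Duration;

use futures::stream::FuturesUnordered;
use futures::StreamExt;

#[tokio::main]
async fn main() {
    let mut futures = FuturesUnordered::new();
    for delay_ms in [300u64, 100, 200] {
        futures.push(tokio::spawn(async move {
            // Stand-in for a CloudWatch query of varying latency.
            tokio::time::sleep(Duration::from_millis(delay_ms)).await;
            delay_ms
        }));
    }
    // Prints 100, 200, 300: whichever task finishes first is handled
    // first, so one slow query never delays handling of the others.
    while let Some(finished) = futures.next().await {
        match finished {
            Ok(delay) => println!("query finished after {delay} ms"),
            Err(_) => eprintln!("a query task panicked"),
        }
    }
}
```

Unlike the `join_all` in dynamodb.rs, this loop can push each result vector into `metrics` the moment its task lands, though both styles still wait for every task before returning.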