Skip to content

Commit

Permalink
feat: add api gateway metrics (#323)
Browse files Browse the repository at this point in the history
* feat: add cloud linter metics for api gateway

* chore: formatting

* chore: add rate limiting and pagination to get rest apis call
  • Loading branch information
pgautier404 authored May 29, 2024
1 parent 5aab591 commit c5a2376
Show file tree
Hide file tree
Showing 7 changed files with 228 additions and 1 deletion.
23 changes: 23 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions momento/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ webbrowser = "^0.8.4"
humantime = "2.1.0"
governor = "0.6.3"
aws-config = "1.1.8"
aws-sdk-apigateway = "1.25.0"
aws-sdk-cloudwatch = "1.19.0"
aws-sdk-dynamodb = "1.19.0"
aws-sdk-elasticache = "1.18.0"
Expand Down
177 changes: 177 additions & 0 deletions momento/src/commands/cloud_linter/api_gateway.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
use crate::commands::cloud_linter::metrics::{
AppendMetrics, Metric, MetricTarget, ResourceWithMetrics,
};
use crate::commands::cloud_linter::resource::{ApiGatewayResource, Resource, ResourceType};
use crate::commands::cloud_linter::utils::rate_limit;
use crate::error::CliError;
use aws_config::SdkConfig;
use aws_sdk_apigateway::types::RestApi;
use governor::DefaultDirectRateLimiter;
use indicatif::{ProgressBar, ProgressStyle};
use phf::{phf_map, Map};
use serde::Serialize;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::mpsc::Sender;

const API_GATEWAY_METRICS: Map<&'static str, &'static [&'static str]> = phf_map! {
"Sum" => &[
"CacheHitCount",
"CacheMissCount",
"Count",
"IntegrationLatency",
"Latency",
],
"Average" => &[
"CacheHitCount",
"CacheMissCount",
"Count",
"IntegrationLatency",
"Latency",
],
"Maximum" => &[
"CacheHitCount",
"CacheMissCount",
"Count",
"IntegrationLatency",
"Latency",
],
};

#[derive(Serialize, Clone, Debug)]
pub(crate) struct ApiGatewayMetadata {
#[serde(rename = "name")]
name: String,
}

impl ResourceWithMetrics for ApiGatewayResource {
fn create_metric_targets(&self) -> Result<Vec<MetricTarget>, CliError> {
let targets = vec![MetricTarget {
namespace: "AWS/ApiGateway".to_string(),
expression: "".to_string(),
dimensions: HashMap::from([("ApiName".to_string(), self.metadata.name.clone())]),
targets: API_GATEWAY_METRICS,
}];
match self.resource_type {
ResourceType::ApiGateway => Ok(targets),
_ => Err(CliError {
msg: "Invalid resource type".to_string(),
}),
}
}

fn set_metrics(&mut self, metrics: Vec<Metric>) {
self.metrics = metrics;
}

fn set_metric_period_seconds(&mut self, period: i32) {
self.metric_period_seconds = period;
}
}

pub(crate) async fn process_api_gateway_resources(
config: &SdkConfig,
control_plane_limiter: Arc<DefaultDirectRateLimiter>,
metrics_limiter: Arc<DefaultDirectRateLimiter>,
sender: Sender<Resource>,
) -> Result<(), CliError> {
let region = config.region().map(|r| r.as_ref()).ok_or(CliError {
msg: "No region configured for client".to_string(),
})?;
let apig_client = aws_sdk_apigateway::Client::new(config);
let metrics_client = aws_sdk_cloudwatch::Client::new(config);

let list_apis_bar = ProgressBar::new_spinner().with_message("Listing API Gateway resources");
list_apis_bar.enable_steady_tick(std::time::Duration::from_millis(100));

let mut apis = Vec::new();
let mut resp_stream = apig_client.get_rest_apis().into_paginator().send();
while let Some(result) =
rate_limit(Arc::clone(&control_plane_limiter), || resp_stream.next()).await
{
match result {
Ok(result) => {
apis.extend(result.items.unwrap_or_default());
}
Err(e) => {
return Err(CliError {
msg: format!("Failed to list API Gateway resources: {}", e),
});
}
}
}
list_apis_bar.finish();
process_apis(
apig_client.clone(),
&apis,
region,
sender,
&metrics_client,
&metrics_limiter,
)
.await?;

Ok(())
}

async fn process_apis(
apig_client: aws_sdk_apigateway::Client,
apis: &[RestApi],
region: &str,
sender: Sender<Resource>,
metrics_client: &aws_sdk_cloudwatch::Client,
metrics_limiter: &Arc<DefaultDirectRateLimiter>,
) -> Result<(), CliError> {
let mut resources: Vec<Resource> = Vec::with_capacity(apis.len());
let get_apis_bar =
ProgressBar::new((apis.len() * 2) as u64).with_message("Processing API Gateway resources");
get_apis_bar
.set_style(ProgressStyle::with_template(" {msg} {bar} {eta}").expect("invalid template"));
for api in apis {
let the_api = apig_client
.get_rest_api()
.rest_api_id(api.id.clone().unwrap_or_default())
.send()
.await?;

let metadata = ApiGatewayMetadata {
name: the_api.name.clone().unwrap_or_default(),
};

let apig_resource = ApiGatewayResource {
resource_type: ResourceType::ApiGateway,
region: region.to_string(),
id: the_api.id.clone().unwrap_or_default(),
metrics: vec![],
metric_period_seconds: 0,
metadata,
};
resources.push(Resource::ApiGateway(apig_resource));
get_apis_bar.inc(1);
}

for resource in resources {
match resource {
Resource::ApiGateway(mut apig_resource) => {
apig_resource
.append_metrics(metrics_client, Arc::clone(metrics_limiter))
.await?;
sender
.send(Resource::ApiGateway(apig_resource))
.await
.map_err(|_| CliError {
msg: "Failed to send API Gateway resource".to_string(),
})?;
get_apis_bar.inc(1);
}
_ => {
return Err(CliError {
msg: "Invalid resource type".to_string(),
});
}
}
}

get_apis_bar.finish();
Ok(())
}
9 changes: 9 additions & 0 deletions momento/src/commands/cloud_linter/linter_cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::io::{copy, BufReader};
use std::path::Path;
use std::sync::Arc;

use crate::commands::cloud_linter::api_gateway::process_api_gateway_resources;
use aws_config::{BehaviorVersion, Region};
use flate2::write::GzEncoder;
use flate2::Compression;
Expand Down Expand Up @@ -78,8 +79,16 @@ async fn process_data(region: String, sender: Sender<Resource>) -> Result<(), Cl

process_s3_resources(
&config,
Arc::clone(&control_plane_limiter),
Arc::clone(&metrics_limiter),
sender.clone(),
)
.await?;

process_api_gateway_resources(
&config,
Arc::clone(&control_plane_limiter),
Arc::clone(&metrics_limiter),
sender.clone(),
)
.await?;
Expand Down
1 change: 1 addition & 0 deletions momento/src/commands/cloud_linter/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
mod api_gateway;
mod dynamodb;
mod elasticache;
pub mod linter_cli;
Expand Down
16 changes: 16 additions & 0 deletions momento/src/commands/cloud_linter/resource.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::commands::cloud_linter::api_gateway::ApiGatewayMetadata;
use serde::Serialize;

use crate::commands::cloud_linter::dynamodb::DynamoDbMetadata;
Expand All @@ -9,6 +10,7 @@ use crate::commands::cloud_linter::serverless_elasticache::ServerlessElastiCache
#[derive(Serialize, Debug)]
#[serde(untagged)]
pub(crate) enum Resource {
ApiGateway(ApiGatewayResource),
DynamoDb(DynamoDbResource),
ElastiCache(ElastiCacheResource),
ServerlessElastiCache(ServerlessElastiCacheResource),
Expand All @@ -17,6 +19,8 @@ pub(crate) enum Resource {

#[derive(Debug, Serialize, PartialEq)]
pub(crate) enum ResourceType {
#[serde(rename = "AWS::ApiGateway::API")]
ApiGateway,
#[serde(rename = "AWS::DynamoDB::GSI")]
DynamoDbGsi,
#[serde(rename = "AWS::DynamoDB::Table")]
Expand Down Expand Up @@ -78,3 +82,15 @@ pub(crate) struct S3Resource {
pub(crate) metric_period_seconds: i32,
pub(crate) metadata: S3Metadata,
}

#[derive(Serialize, Debug)]
pub(crate) struct ApiGatewayResource {
#[serde(rename = "type")]
pub(crate) resource_type: ResourceType,
pub(crate) region: String,
pub(crate) id: String,
pub(crate) metrics: Vec<Metric>,
#[serde(rename = "metricPeriodSeconds")]
pub(crate) metric_period_seconds: i32,
pub(crate) metadata: ApiGatewayMetadata,
}
2 changes: 1 addition & 1 deletion momento/src/commands/cloud_linter/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,8 @@ impl ResourceWithMetrics for S3Resource {

pub(crate) async fn process_s3_resources(
config: &SdkConfig,
metrics_limiter: Arc<DefaultDirectRateLimiter>,
control_plane_limiter: Arc<DefaultDirectRateLimiter>,
metrics_limiter: Arc<DefaultDirectRateLimiter>,
sender: Sender<Resource>,
) -> Result<(), CliError> {
let region = config.region().map(|r| r.as_ref()).ok_or(CliError {
Expand Down

0 comments on commit c5a2376

Please sign in to comment.