Skip to content

Commit

Permalink
Merge pull request #328 from momentohq/improve-progress-bars
Browse files Browse the repository at this point in the history
chore: improve progress bars in cloud-linter
  • Loading branch information
nand4011 authored Jun 6, 2024
2 parents 5e5a622 + d88d56d commit 7664098
Show file tree
Hide file tree
Showing 4 changed files with 163 additions and 159 deletions.
156 changes: 76 additions & 80 deletions momento/src/commands/cloud_linter/elasticache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,23 +139,73 @@ async fn process_resources(
region: &str,
sender: Sender<Resource>,
) -> Result<(), CliError> {
let bar = ProgressBar::new_spinner().with_message("Describing ElastiCache clusters");
bar.enable_steady_tick(Duration::from_millis(100));
bar.set_style(
ProgressStyle::with_template("{spinner:.green} {pos:>7} {msg}")
.expect("template should be valid")
// For more spinners check out the cli-spinners project:
// https://github.com/sindresorhus/cli-spinners/blob/master/spinners.json
.tick_strings(&[
"▹▹▹▹▹",
"▸▹▹▹▹",
"▹▸▹▹▹",
"▹▹▸▹▹",
"▹▹▹▸▹",
"▹▹▹▹▸",
"▪▪▪▪▪",
]),
let describe_bar = ProgressBar::new_spinner().with_message("Listing ElastiCache resources");
describe_bar.enable_steady_tick(Duration::from_millis(100));
let mut resources =
describe_clusters(elasticache_client, control_plane_limiter, region).await?;
describe_bar.finish();

let process_bar =
ProgressBar::new(resources.len() as u64).with_message("Processing ElastiCache resources");
process_bar.set_style(
ProgressStyle::with_template(" {pos:>7}/{len:7} {msg}").expect("invalid template"),
);

while !resources.is_empty() {
let chunk: Vec<ElastiCacheResource> = resources
.drain(..std::cmp::min(10, resources.len()))
.collect();

let futures = FuturesUnordered::new();
for mut resource in chunk {
let metrics_limiter_clone = Arc::clone(&metrics_limiter);
let sender_clone = sender.clone();
let process_bar_clone = process_bar.clone();
let metrics_client_clone = metrics_client.clone();

futures.push(tokio::spawn(async move {
resource
.append_metrics(&metrics_client_clone, metrics_limiter_clone)
.await?;

let wrapped_resource = Resource::ElastiCache(resource);
sender_clone
.send(wrapped_resource)
.await
.map_err(|err| CliError {
msg: format!("Failed to send elasticache resource: {}", err),
})?;
process_bar_clone.inc(1);
Ok::<(), CliError>(())
}));
}

let all_results = futures::future::join_all(futures).await;
for result in all_results {
match result {
// bubble up any cli errors that we came across
Ok(res) => res?,
Err(_) => {
println!("failed to process elasticache resources");
return Err(CliError {
msg: "failed to wait for all elasticache resources to collect data"
.to_string(),
});
}
}
}
}

process_bar.finish();
Ok(())
}

async fn describe_clusters(
elasticache_client: &aws_sdk_elasticache::Client,
control_plane_limiter: Arc<DefaultDirectRateLimiter>,
region: &str,
) -> Result<Vec<ElastiCacheResource>, CliError> {
let mut resources = Vec::new();
let mut elasticache_stream = elasticache_client
.describe_cache_clusters()
.show_cache_node_info(true)
Expand All @@ -175,38 +225,9 @@ async fn process_resources(
chunks.push(chunk.to_owned());
}
for clusters in chunks {
let futures = FuturesUnordered::new();
for cluster in clusters {
let metrics_client_clone = metrics_client.clone();
let region_clone = region.to_string().clone();
let sender_clone = sender.clone();
let metrics_limiter_clone = Arc::clone(&metrics_limiter);
let bar_clone = bar.clone();
let spawn = tokio::spawn(async move {
write_resource(
cluster,
metrics_client_clone,
region_clone.as_str(),
sender_clone,
metrics_limiter_clone,
bar_clone,
)
.await
});
futures.push(spawn);
}
let all_results = futures::future::join_all(futures).await;
for result in all_results {
match result {
// bubble up any cli errors that we came across
Ok(res) => res?,
Err(_) => {
println!("failed to process elasticache resources");
return Err(CliError {
msg: "failed to wait for all elasticache resources to collect data".to_string(),
});
}
}
let cluster_resources = convert_to_resources(cluster, region).await?;
resources.extend(cluster_resources);
}
}
}
Expand All @@ -218,19 +239,14 @@ async fn process_resources(
}
}
}
bar.finish();

Ok(())
Ok(resources)
}

async fn write_resource(
async fn convert_to_resources(
cluster: CacheCluster,
metrics_client: aws_sdk_cloudwatch::Client,
region: &str,
sender: Sender<Resource>,
metrics_limiter: Arc<DefaultDirectRateLimiter>,
bar: ProgressBar,
) -> Result<(), CliError> {
) -> Result<Vec<ElastiCacheResource>, CliError> {
let mut resources = Vec::new();

let cache_cluster_id = cluster.cache_cluster_id.ok_or(CliError {
Expand Down Expand Up @@ -267,14 +283,14 @@ async fn write_resource(
cluster_mode_enabled,
};

let resource = Resource::ElastiCache(ElastiCacheResource {
let resource = ElastiCacheResource {
resource_type: ResourceType::ElastiCacheRedisNode,
region: region.to_string(),
id: cache_cluster_id.clone(),
metrics: vec![],
metric_period_seconds: 0,
metadata,
});
};

resources.push(resource);
}
Expand All @@ -292,14 +308,14 @@ async fn write_resource(
let cache_node_id = node.cache_node_id.ok_or(CliError {
msg: "Cache node has no ID".to_string(),
})?;
let resource = Resource::ElastiCache(ElastiCacheResource {
let resource = ElastiCacheResource {
resource_type: ResourceType::ElastiCacheMemcachedNode,
region: region.to_string(),
id: cache_node_id,
metrics: vec![],
metric_period_seconds: 0,
metadata: metadata.clone(),
});
};
resources.push(resource)
}
}
Expand All @@ -311,25 +327,5 @@ async fn write_resource(
}
};

for resource in resources {
match resource {
Resource::ElastiCache(mut er) => {
er.append_metrics(&metrics_client, Arc::clone(&metrics_limiter))
.await?;
sender
.send(Resource::ElastiCache(er))
.await
.map_err(|err| CliError {
msg: format!("Failed to send elasticache resource: {}", err),
})?;
bar.inc(1);
}
_ => {
return Err(CliError {
msg: "Invalid resource type".to_string(),
});
}
}
}
Ok(())
Ok(resources)
}
21 changes: 14 additions & 7 deletions momento/src/commands/cloud_linter/linter_cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use aws_config::{BehaviorVersion, Region};
use flate2::write::GzEncoder;
use flate2::Compression;
use governor::{Quota, RateLimiter};
use indicatif::ProgressBar;
use momento_cli_opts::CloudLinterResources;
use struson::writer::{JsonStreamWriter, JsonWriter};
use tokio::fs::{metadata, File};
Expand Down Expand Up @@ -61,15 +62,21 @@ pub async fn run_cloud_linter(
json_writer.finish_document()?;

// now we compress the json into a .gz file for the customer to upload
let compression_bar = ProgressBar::new_spinner().with_message(format!(
"Compressing and writing to {} and {}.gz",
file_path, file_path
));
compression_bar.enable_steady_tick(Duration::from_millis(100));
let opened_file_tokio = File::open(file_path).await?;
let opened_file = opened_file_tokio.into_std().await;
let mut unzipped_file = BufReader::new(opened_file);
let zipped_file_output_tokio = File::create("linter_results.json.gz").await?;
let zipped_file_output_tokio = File::create(format!("{}.gz", file_path)).await?;
let zipped_file_output = zipped_file_output_tokio.into_std().await;
let mut gz = GzEncoder::new(zipped_file_output, Compression::default());
copy(&mut unzipped_file, &mut gz)?;
gz.finish()?;

compression_bar.finish();
Ok(())
}

Expand Down Expand Up @@ -108,7 +115,7 @@ async fn process_data(
let metrics_limiter = Arc::new(RateLimiter::direct(metrics_quota));

if let Some(resource) = only_collect_for_resource {
match resource {
return match resource {
CloudLinterResources::ApiGateway => {
process_api_gateway_resources(
&config,
Expand All @@ -117,7 +124,7 @@ async fn process_data(
sender.clone(),
)
.await?;
return Ok(());
Ok(())
}
CloudLinterResources::S3 => {
process_s3_resources(
Expand All @@ -127,7 +134,7 @@ async fn process_data(
sender.clone(),
)
.await?;
return Ok(());
Ok(())
}
CloudLinterResources::Dynamo => {
process_ddb_resources(
Expand All @@ -140,7 +147,7 @@ async fn process_data(
enable_gsi,
)
.await?;
return Ok(());
Ok(())
}
CloudLinterResources::ElastiCache => {
process_elasticache_resources(
Expand All @@ -158,9 +165,9 @@ async fn process_data(
sender.clone(),
)
.await?;
return Ok(());
Ok(())
}
}
};
};

process_s3_resources(
Expand Down
3 changes: 1 addition & 2 deletions momento/src/commands/cloud_linter/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ async fn process_buckets(
control_plane_limiter: Arc<DefaultDirectRateLimiter>,
) -> Result<(), CliError> {
let process_buckets_bar =
ProgressBar::new((buckets.len()) as u64).with_message("Processing S3 Buckets");
ProgressBar::new(buckets.len() as u64).with_message("Processing S3 Buckets");
process_buckets_bar.set_style(
ProgressStyle::with_template(" {pos:>7}/{len:7} {msg}").expect("invalid template"),
);
Expand All @@ -258,7 +258,6 @@ async fn process_buckets(
let region_clone = region.to_string().clone();
let progress_bar_clone = process_buckets_bar.clone();
let spawn = tokio::spawn(async move {
progress_bar_clone.inc(1);
let res = process_bucket(
s3_client_clone,
bucket,
Expand Down
Loading

0 comments on commit 7664098

Please sign in to comment.