Skip to content

Commit

Permalink
[Console] Implement start/stop for RFS backfill on ECS (#740)
Browse files Browse the repository at this point in the history
* Clean up handling of CommandResults

Signed-off-by: Mikayla Thompson <[email protected]>

* Move the aws utils

Signed-off-by: Mikayla Thompson <[email protected]>

* Add ECS service and implement rfs start/stop

Signed-off-by: Mikayla Thompson <[email protected]>

* Add tests

Signed-off-by: Mikayla Thompson <[email protected]>

* Fix exit code handling

Signed-off-by: Mikayla Thompson <[email protected]>

---------

Signed-off-by: Mikayla Thompson <[email protected]>
  • Loading branch information
mikaylathompson authored Jun 19, 2024
1 parent aee343f commit 39cf3b7
Show file tree
Hide file tree
Showing 12 changed files with 753 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import console_link.logic.clusters as logic_clusters
import console_link.logic.metrics as logic_metrics
import console_link.logic.backfill as logic_backfill
from console_link.logic.backfill import ExitCode
from console_link.environment import Environment
from console_link.models.metrics_source import Component, MetricStatistic
import logging
Expand Down Expand Up @@ -116,7 +117,7 @@ def create_backfill_cmd(ctx, pipeline_template_file, print_config_only):
exitcode, message = logic_backfill.create(ctx.env.backfill,
pipeline_template_path=pipeline_template_file,
print_config_only=print_config_only)
if exitcode != 0:
if exitcode != ExitCode.SUCCESS:
raise click.ClickException(message)
click.echo(message)

Expand All @@ -126,7 +127,7 @@ def create_backfill_cmd(ctx, pipeline_template_file, print_config_only):
@click.pass_obj
def start_backfill_cmd(ctx, pipeline_name):
exitcode, message = logic_backfill.start(ctx.env.backfill, pipeline_name=pipeline_name)
if exitcode != 0:
if exitcode != ExitCode.SUCCESS:
raise click.ClickException(message)
click.echo(message)

Expand All @@ -136,7 +137,7 @@ def start_backfill_cmd(ctx, pipeline_name):
@click.pass_obj
def stop_backfill_cmd(ctx, pipeline_name):
exitcode, message = logic_backfill.stop(ctx.env.backfill, pipeline_name=pipeline_name)
if exitcode != 0:
if exitcode != ExitCode.SUCCESS:
raise click.ClickException(message)
click.echo(message)

Expand All @@ -146,7 +147,7 @@ def stop_backfill_cmd(ctx, pipeline_name):
@click.pass_obj
def scale_backfill_cmd(ctx, units: int):
exitcode, message = logic_backfill.scale(ctx.env.backfill, units)
if exitcode != 0:
if exitcode != ExitCode.SUCCESS:
raise click.ClickException(message)
click.echo(message)

Expand All @@ -155,7 +156,7 @@ def scale_backfill_cmd(ctx, units: int):
@click.pass_obj
def status_backfill_cmd(ctx):
exitcode, message = logic_backfill.status(ctx.env.backfill)
if exitcode != 0:
if exitcode != ExitCode.SUCCESS:
raise click.ClickException(message)
click.echo(message)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ def create(backfill: Backfill, *args, **kwargs) -> Tuple[ExitCode, str]:
return ExitCode.FAILURE, f"Failure when creating backfill: {type(e).__name__} {e}"

if result.success:
return ExitCode.SUCCESS, "Backfill created successfully." + "\n" + result.value
return ExitCode.FAILURE, "Backfill creation failed." + "\n" + result.value
return ExitCode.SUCCESS, "Backfill created successfully." + "\n" + result.display()
return ExitCode.FAILURE, "Backfill creation failed." + "\n" + result.display()


def start(backfill: Backfill, *args, **kwargs) -> Tuple[ExitCode, str]:
Expand All @@ -90,8 +90,8 @@ def start(backfill: Backfill, *args, **kwargs) -> Tuple[ExitCode, str]:
return ExitCode.FAILURE, f"Failure when starting backfill: {type(e).__name__} {e}"

if result.success:
return ExitCode.SUCCESS, "Backfill started successfully." + "\n" + result.value
return ExitCode.FAILURE, "Backfill start failed." + "\n" + result.value
return ExitCode.SUCCESS, "Backfill started successfully." + "\n" + result.display()
return ExitCode.FAILURE, "Backfill start failed." + "\n" + result.display()


def stop(backfill: Backfill, *args, **kwargs) -> Tuple[ExitCode, str]:
Expand All @@ -105,8 +105,8 @@ def stop(backfill: Backfill, *args, **kwargs) -> Tuple[ExitCode, str]:
logger.error(f"Failed to stop backfill: {e}")
return ExitCode.FAILURE, f"Failure when stopping backfill: {type(e).__name__} {e}"
if result.success:
return ExitCode.SUCCESS, "Backfill stopped successfully." + "\n" + result.value
return ExitCode.FAILURE, "Backfill stop failed." + "\n" + result.value
return ExitCode.SUCCESS, "Backfill stopped successfully." + "\n" + result.display()
return ExitCode.FAILURE, "Backfill stop failed." + "\n" + result.display()


def scale(backfill: Backfill, units: int, *args, **kwargs) -> Tuple[ExitCode, str]:
Expand All @@ -120,20 +120,20 @@ def scale(backfill: Backfill, units: int, *args, **kwargs) -> Tuple[ExitCode, st
logger.error(f"Failed to scale backfill: {e}")
return ExitCode.FAILURE, f"Failure when scaling backfill: {type(e).__name__} {e}"
if result.success:
return ExitCode.SUCCESS, "Backfill scaled successfully." + "\n" + result.value
return ExitCode.FAILURE, "Backfill scale failed." + "\n" + result.value
return ExitCode.SUCCESS, "Backfill scaled successfully." + "\n" + result.display()
return ExitCode.FAILURE, "Backfill scale failed." + "\n" + result.display()


def status(backfill: Backfill, *args, **kwargs) -> Tuple[ExitCode, str]:
logger.info("Getting backfill status")
try:
result = backfill.get_status(*args, **kwargs)
status = backfill.get_status(*args, **kwargs)
except NotImplementedError:
logger.error(f"Status is not implemented for backfill {type(backfill).__name__}")
return ExitCode.FAILURE, f"Status is not implemented for backfill: {type(backfill).__name__}"
except Exception as e:
logger.error(f"Failed to get status of backfill: {e}")
return ExitCode.FAILURE, f"Failure when getting status of backfill: {type(e).__name__} {e}"
if result.success:
return ExitCode.SUCCESS, result.value
return ExitCode.FAILURE, "Backfill status retrieval failed." + "\n" + result.value
if status:
return ExitCode.SUCCESS, status.value
return ExitCode.FAILURE, "Backfill status retrieval failed." + "\n" + status
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@
from console_link.models.cluster import Cluster
from console_link.models.schema_tools import contains_one_of
from console_link.models.command_result import CommandResult
from console_link.models.ecs_service import ECSService

from cerberus import Validator
import boto3

import logging

logger = logging.getLogger(__name__)

DOCKER_RFS_SCHEMA = {
"type": "dict",
Expand Down Expand Up @@ -76,5 +79,16 @@ class ECSRFSBackfill(RFSBackfill):
def __init__(self, config: Dict, target_cluster: Cluster) -> None:
super().__init__(config)
self.target_cluster = target_cluster
self.default_scale = self.config["reindex_from_snapshot"].get("scale", 1)

self.ecs_config = self.config["reindex_from_snapshot"]["ecs"]
self.client = boto3.client("ecs", region_name=self.ecs_config.get("aws_region", None))
self.ecs_client = ECSService(self.ecs_config["cluster_name"], self.ecs_config["service_name"],
self.ecs_config.get("aws_region", None))

def start(self, *args, **kwargs) -> CommandResult:
logger.info(f"Starting RFS backfill by setting desired count to {self.default_scale} instances")
return self.ecs_client.set_desired_count(self.default_scale)

def stop(self, *args, **kwargs) -> CommandResult:
logger.info("Stopping RFS backfill by setting desired count to 0 instances")
return self.ecs_client.set_desired_count(0)
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,8 @@
class CommandResult(NamedTuple):
success: bool
value: Any

def display(self) -> str:
if self.value:
return str(self.value)
return ""
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import logging

import boto3

from console_link.models.command_result import CommandResult
from console_link.models.utils import AWSAPIError, raise_for_aws_api_error


logger = logging.getLogger(__name__)


class ECSService:
def __init__(self, cluster_name, service_name, aws_region=None):
self.cluster_name = cluster_name
self.service_name = service_name
self.aws_region = aws_region

logger.info(f"Creating ECS client for region {aws_region}, if specified")
self.client = boto3.client("ecs", region_name=self.aws_region)

def set_desired_count(self, desired_count: int) -> CommandResult:
logger.info(f"Setting desired count for service {self.service_name} to {desired_count}")
response = self.client.update_service(
cluster=self.cluster_name,
service=self.service_name,
desiredCount=desired_count
)
logger.debug(f"Response from update_service: {response}")

try:
raise_for_aws_api_error(response)
except AWSAPIError as e:
return CommandResult(False, e)

logger.info(f"Service status: {response['service']['status']}")
running_count = response["service"]["runningCount"]
pending_count = response["service"]["pendingCount"]
desired_count = response["service"]["desiredCount"]
return CommandResult(True, f"Service {self.service_name} set to {desired_count} desired count."
f" Currently {running_count} running and {pending_count} pending.")
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import boto3
import botocore
from cerberus import Validator
from console_link.logic.utils import raise_for_aws_api_error
from console_link.models.utils import raise_for_aws_api_error
import requests
import logging

Expand Down
Loading

0 comments on commit 39cf3b7

Please sign in to comment.