Commit c45b991

Add Backfill basic integ tests / Refactor integ tests to use Console Library (#759)

--Add Backfill basic integ tests
Lays the groundwork for generic backfill integration tests (usable across any backfill migration) that operate through the Console Library. These tests currently expect a single execution flow for a backfill migration: all data needed by the different test cases is preloaded onto the source cluster, the backfill migration is kicked off once, and the test cases then verify the results. This keeps execution time reasonable for our common tests (containers are not started and stopped multiple times); my expectation is that we will later add more targeted test cases that need to control this flow themselves and can operate independently. A sketch of this flow appears below.

--Refactor all integ tests to use Console Library
This change moves our integ tests into a library on the Migration Console. Since we plan on having the Migration Console execute these tests, this placement makes sense on its own, but the larger goals it enables are to have these tests available on the Migration Console from startup (allowing us to remove some troublesome code that tried to pull them in with git) and to let these tests easily use the Console Library for performing operations (a desperately needed addition to our tests).
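For illustration only, a generic backfill test following the single-execution flow described above might look like the following sketch. This is not the test code in this change; the `Environment` constructor argument, the `env.backfill` attribute, and the preloaded document are assumptions:

    import json

    import pytest

    from console_link.environment import Environment    # module from this change; usage is illustrative
    from console_link.models.cluster import HttpMethod  # import path assumed


    @pytest.fixture(scope="session")
    def env():
        # services.yaml is mounted at this path on the Migration Console.
        return Environment(config_file="/etc/migration_services.yaml")  # parameter name assumed


    @pytest.fixture(scope="session", autouse=True)
    def preloaded_backfill(env):
        # Preload all data the test cases need, then kick off the backfill once.
        env.source_cluster.call_api("/test_index/_doc/1", method=HttpMethod.PUT,
                                    data=json.dumps({"title": "hello"}),
                                    headers={"Content-Type": "application/json"})
        env.backfill.start()  # attribute and method names assumed
        # A real test would poll here until the backfill reports completion.
        yield


    def test_index_migrated(env):
        r = env.target_cluster.call_api("/_cat/indices/test_index?format=json",
                                        raise_error=False)
        assert r.status_code == 200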
---------

Signed-off-by: Tanner Lewis <[email protected]>
lewijacn authored Jul 8, 2024
1 parent ad3fc40 commit c45b991
Showing 38 changed files with 2,089 additions and 944 deletions.
9 changes: 2 additions & 7 deletions .github/workflows/CI.yml
@@ -124,15 +124,10 @@ jobs:
         run: ./gradlew -p TrafficCapture dockerSolution:ComposeUp -x test --scan --info --stacktrace
         env:
           OS_MIGRATIONS_GRADLE_SCAN_TOS_AGREE_AND_ENABLED: ''
-      - name: Install python dependencies
-        working-directory: test
-        run: |
-          python -m pip install --upgrade pipenv
-          pipenv install --deploy --dev
       - name: Run E2E test script
-        working-directory: test
+        working-directory: TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test
         run: |
-          pipenv run pytest tests.py --unique_id="testindex"
+          docker exec $(docker ps --filter "name=migration-console" -q) pytest /root/lib/integ_test/integ_test/replayer_tests.py --unique_id="testindex" -s
       - name: Clean up migrations docker images before caching
         run: |
           docker stop $(docker ps -q) && docker rm $(docker ps -aq)
@@ -24,9 +24,6 @@ COPY kafkaCmdRef.md /root/kafka-tools
 COPY humanReadableLogs.py /root/
 RUN chmod ug+x /root/humanReadableLogs.py

-COPY setupIntegTests.sh /root/
-RUN chmod ug+x /root/setupIntegTests.sh
-
 COPY showFetchMigrationCommand.sh /root/
 RUN chmod ug+x /root/showFetchMigrationCommand.sh

@@ -39,6 +36,9 @@ RUN chmod ug+x /root/loadServicesFromParameterStore.sh
 COPY lib /root/lib
 WORKDIR /root/lib/console_link
 RUN pipenv install --system --deploy --ignore-pipfile
+WORKDIR /root/lib/integ_test
+RUN pipenv install --system --deploy --ignore-pipfile
+

 # Experimental console API, not started by default
 COPY console_api /root/console_api
@@ -13,14 +13,14 @@ services:
     volumes:
       - ./lib/console_link/services.yaml:/etc/migration_services.yaml
       # this is a convenience thing for testing -- it should be removed before this makes it to prod.
-      - ./lib/console_link:/root/lib/console_link
+      - ./lib:/root/lib
       - ~/.aws:/root/.aws
     environment:
       # Copy local AWS env to Docker container
       #- ~/.aws:/root/.aws
       - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}
      - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
       - AWS_SESSION_TOKEN=${AWS_SESSION_TOKEN}
-      - AWS_DEFAULT_REGION=${AWS_DEFAULT_REGION}
+      - AWS_DEFAULT_REGION=us-east-1
       - API_ALLOWED_HOSTS=localhost
     ports:
       - "8000:8000"
@@ -5,6 +5,7 @@ name = "pypi"

 [packages]
 requests = ">=2.32.3"
+opensearch-benchmark = "*"
 boto3 = "*"
 pyyaml = "*"
 Click = "*"

Large diffs are not rendered by default.

@@ -57,27 +57,28 @@ def cluster_group(ctx):


 @cluster_group.command(name="cat-indices")
+@click.option("--refresh", is_flag=True, default=False)
 @click.pass_obj
-def cat_indices_cmd(ctx):
+def cat_indices_cmd(ctx, refresh):
     """Simple program that calls `_cat/indices` on both a source and target cluster."""
     if ctx.json:
         click.echo(
             json.dumps(
                 {
                     "source_cluster": logic_clusters.cat_indices(
-                        ctx.env.source_cluster, as_json=True
+                        ctx.env.source_cluster, as_json=True, refresh=refresh
                     ),
                     "target_cluster": logic_clusters.cat_indices(
-                        ctx.env.target_cluster, as_json=True
+                        ctx.env.target_cluster, as_json=True, refresh=refresh
                     ),
                 }
             )
         )
         return
     click.echo("SOURCE CLUSTER")
-    click.echo(logic_clusters.cat_indices(ctx.env.source_cluster))
+    click.echo(logic_clusters.cat_indices(ctx.env.source_cluster, refresh=refresh))
     click.echo("TARGET CLUSTER")
-    click.echo(logic_clusters.cat_indices(ctx.env.target_cluster))
+    click.echo(logic_clusters.cat_indices(ctx.env.target_cluster, refresh=refresh))


 @cluster_group.command(name="connection-check")
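To illustrate the new flag, here is a minimal sketch using click's test runner. It assumes the top-level group is exposed as `cli` in `console_link.cli`, that the cluster group above is registered as `clusters`, and that a valid services.yaml is already in place:

    from click.testing import CliRunner

    from console_link.cli import cli  # top-level group name assumed

    runner = CliRunner()
    # --refresh forces a /_refresh on both clusters before listing indices,
    # so documents indexed moments ago show up in the doc counts.
    result = runner.invoke(cli, ["clusters", "cat-indices", "--refresh"])
    print(result.output)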
@@ -8,6 +8,7 @@
 from console_link.models.snapshot import FileSystemSnapshot, Snapshot, S3Snapshot
 from console_link.models.replayer_base import Replayer
 from console_link.models.replayer_ecs import ECSReplayer
+from console_link.models.replayer_docker import DockerReplayer
 from console_link.models.kafka import Kafka, MSK, StandardKafka
 import yaml
 from cerberus import Validator
@@ -26,7 +27,7 @@ def get_snapshot(config: Dict, source_cluster: Cluster):
 def get_replayer(config: Dict):
     if 'ecs' in config:
         return ECSReplayer(config)
-    raise ValueError("Invalid replayer config")
+    return DockerReplayer(config)


 def get_kafka(config: Dict):
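With this fallback, any replayer config that lacks an `ecs` key now produces a `DockerReplayer` instead of raising. A minimal sketch of the dispatch; the config shapes here are assumptions for illustration, not taken from this diff:

    from console_link.environment import get_replayer  # import path assumed
    from console_link.models.replayer_docker import DockerReplayer
    from console_link.models.replayer_ecs import ECSReplayer

    # Field names inside the ecs block are assumptions.
    ecs_config = {"ecs": {"cluster_name": "migration-ecs-cluster", "service_name": "replayer"}}
    docker_config = {"docker": None}  # nullable, per DOCKER_REPLAY_SCHEMA later in this commit

    assert isinstance(get_replayer(ecs_config), ECSReplayer)
    assert isinstance(get_replayer(docker_config), DockerReplayer)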
@@ -12,7 +12,16 @@ class ConnectionResult:
     cluster_version: str


-def cat_indices(cluster: Cluster, as_json=False):
+def call_api(cluster: Cluster, path: str, method=HttpMethod.GET, data=None, headers=None, timeout=None,
+             session=None, raise_error=False):
+    r = cluster.call_api(path=path, method=method, data=data, headers=headers, timeout=timeout, session=session,
+                         raise_error=raise_error)
+    return r
+
+
+def cat_indices(cluster: Cluster, refresh=False, as_json=False):
+    if refresh:
+        cluster.call_api('/_refresh')
     as_json_suffix = "?format=json" if as_json else "?v"
     cat_indices_path = f"/_cat/indices/_all{as_json_suffix}"
     r = cluster.call_api(cat_indices_path)
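A short sketch of how these two helpers might be used together; the `Cluster` config shape and module path here are assumptions:

    from console_link.logic.clusters import call_api, cat_indices  # module path assumed
    from console_link.models.cluster import Cluster                # import path assumed

    cluster = Cluster({"endpoint": "http://localhost:9200", "no_auth": None})  # config shape assumed

    # Pass-through helper: hit an arbitrary endpoint without raising on non-2xx.
    r = call_api(cluster, "/_cat/health", raise_error=False)
    print(r.status_code, r.text)

    # refresh=True issues /_refresh first, so just-indexed documents are counted.
    print(cat_indices(cluster, refresh=True))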
@@ -25,7 +25,7 @@ def start(replayer: Replayer, *args, **kwargs) -> Tuple[ExitCode, str]:
     except Exception as e:
         logger.error(f"Failed to start replayer: {e}")
         return ExitCode.FAILURE, f"Failure when starting replayer: {type(e).__name__} {e}"

     if result.success:
         return ExitCode.SUCCESS, "Replayer started successfully." + "\n" + result.display()
     return ExitCode.FAILURE, "Replayer start failed." + "\n" + result.display()
@@ -64,13 +64,13 @@ def scale(replayer: Replayer, units: int, *args, **kwargs) -> Tuple[ExitCode, str]:
 def status(replayer: Replayer, *args, **kwargs) -> Tuple[ExitCode, str]:
     logger.info("Getting replayer status")
     try:
-        status = replayer.get_status(*args, **kwargs)
+        result = replayer.get_status(*args, **kwargs)
     except NotImplementedError:
         logger.error(f"Status is not implemented for replayer {type(replayer).__name__}")
         return ExitCode.FAILURE, f"Status is not implemented for replayer: {type(replayer).__name__}"
     except Exception as e:
         logger.error(f"Failed to get status of replayer: {e}")
         return ExitCode.FAILURE, f"Failure when getting status of replayer: {type(e).__name__} {e}"
-    if status:
-        return ExitCode.SUCCESS, status.value
-    return ExitCode.FAILURE, "Replayer status retrieval failed." + "\n" + status
+    if result.success:
+        return ExitCode.SUCCESS, result.value[0].name
+    return ExitCode.FAILURE, "Replayer status retrieval failed." + "\n" + result.value[1]
@@ -17,3 +17,12 @@ def create(snapshot: Snapshot, *args, **kwargs) -> CommandResult:
 def status(snapshot: Snapshot, *args, **kwargs) -> CommandResult:
     logger.info("Getting snapshot status")
     return snapshot.status(*args, **kwargs)
+
+
+def delete(snapshot: Snapshot, *args, **kwargs) -> CommandResult:
+    logger.info(f"Deleting snapshot with {args=} and {kwargs=}")
+    try:
+        return snapshot.delete(*args, **kwargs)
+    except Exception as e:
+        logger.error(f"Failure running delete snapshot: {e}")
+        return CommandResult(status=False, message=f"Failure running delete snapshot: {e}")
@@ -106,13 +106,13 @@ def stop(self, *args, **kwargs) -> CommandResult:
     def scale(self, units: int, *args, **kwargs) -> CommandResult:
         logger.info(f"Scaling RFS backfill by setting desired count to {units} instances")
         return self.ecs_client.set_desired_count(units)

     def get_status(self, deep_check, *args, **kwargs) -> CommandResult:
         logger.info(f"Getting status of RFS backfill, with {deep_check=}")
         instance_statuses = self.ecs_client.get_instance_statuses()
         if not instance_statuses:
             return CommandResult(False, "Failed to get instance statuses")

         status_string = str(instance_statuses)
         if deep_check:
             try:
@@ -185,7 +185,7 @@ def _get_detailed_status(self) -> Optional[str]:

 def parse_query_response(query: dict, cluster: Cluster, label: str) -> Optional[int]:
     try:
-        response = cluster.call_api("/.migrations_working_state/_search", json_body=query)
+        response = cluster.call_api("/.migrations_working_state/_search", data=query)
     except Exception as e:
         logger.error(f"Failed to execute query: {e}")
         return None
@@ -76,7 +76,8 @@ def __init__(self, config: Dict) -> None:
             raise ValueError("Invalid config file for cluster", v.errors)

         self.endpoint = config["endpoint"]
-        self.allow_insecure = config.get("allow_insecure", False)
+        self.allow_insecure = config.get("allow_insecure", False) if self.endpoint.startswith(
+            "https") else config.get("allow_insecure", True)
         if 'no_auth' in config:
             self.auth_type = AuthMethod.NO_AUTH
         elif 'basic_auth' in config:
@@ -85,11 +86,13 @@ def __init__(self, config: Dict) -> None:
         elif 'sigv4' in config:
             self.auth_type = AuthMethod.SIGV4

-    def call_api(self, path, method: HttpMethod = HttpMethod.GET, timeout=None,
-                 json_body=None, **kwargs) -> requests.Response:
+    def call_api(self, path, method: HttpMethod = HttpMethod.GET, data=None, headers=None,
+                 timeout=None, session=None, raise_error=True, **kwargs) -> requests.Response:
         """
         Calls an API on the cluster.
         """
+        if session is None:
+            session = requests.Session()
         if self.auth_type == AuthMethod.BASIC_AUTH:
             assert self.auth_details is not None  # for mypy's sake
             auth = HTTPBasicAuth(
@@ -101,38 +104,34 @@ def call_api(self, path, method: HttpMethod = HttpMethod.GET, timeout=None,
         else:
             raise NotImplementedError(f"Auth type {self.auth_type} not implemented")

-        if json_body is not None:
-            data = json_body
-        else:
-            data = None
-
-        logger.info(f"Making api call to {self.endpoint}{path}")
-
         # Extract query parameters from kwargs
         params = kwargs.get('params', {})

-        r = requests.request(
+        logger.info(f"Performing request: {method.name} {self.endpoint}{path}")
+        r = session.request(
             method.name,
             f"{self.endpoint}{path}",
-            params=params,
             verify=(not self.allow_insecure),
+            params=params,
             auth=auth,
-            timeout=timeout,
-            json=data
+            data=data,
+            headers=headers,
+            timeout=timeout
         )
-        logger.debug(f"Cluster API call request: {r.request}")
-        r.raise_for_status()
+        logger.info(f"Received response: {r.status_code} {method.name} {self.endpoint}{path} - {r.text[:1000]}")
+        if raise_error:
+            r.raise_for_status()
         return r

     def execute_benchmark_workload(self, workload: str,
                                    workload_params='target_throughput:0.5,bulk_size:10,bulk_indexing_clients:1,'
                                                    'search_clients:1'):
-        client_options = ""
+        client_options = "verify_certs:false"
         if not self.allow_insecure:
-            client_options += "use_ssl:true,verify_certs:false"
+            client_options += ",use_ssl:true"
         if self.auth_type == AuthMethod.BASIC_AUTH:
             if self.auth_details['password'] is not None:
-                client_options += (f"basic_auth_user:{self.auth_details['username']},"
+                client_options += (f",basic_auth_user:{self.auth_details['username']},"
                                    f"basic_auth_password:{self.auth_details['password']}")
             else:
                 raise NotImplementedError(f"Auth type {self.auth_type} with AWS Secret ARN is not currently support "
@@ -142,7 +141,8 @@ def execute_benchmark_workload(self, workload: str,
                                           f"benchmark workloads")
         # Note -- we should censor the password when logging this command
        logger.info(f"Running opensearch-benchmark with '{workload}' workload")
-        subprocess.run(f"opensearch-benchmark execute-test --distribution-version=1.0.0 "
-                       f"--target-host={self.endpoint} --workload={workload} --pipeline=benchmark-only --test-mode "
-                       f"--kill-running-processes --workload-params={workload_params} "
-                       f"--client-options={client_options}", shell=True)
+        command = (f"opensearch-benchmark execute-test --distribution-version=1.0.0 --target-host={self.endpoint} "
+                   f"--workload={workload} --pipeline=benchmark-only --test-mode --kill-running-processes "
+                   f"--workload-params={workload_params} --client-options={client_options}")
+        logger.info(f"Executing command: {command}")
+        subprocess.run(command, shell=True)
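One motivation for the new `session` parameter is connection reuse when the integ tests poll an endpoint repeatedly. A sketch under assumed config shapes and import paths:

    import requests

    from console_link.models.cluster import Cluster, HttpMethod  # import path assumed

    # Config shape assumed; keys match those checked in __init__ above.
    cluster = Cluster({"endpoint": "https://localhost:9200",
                       "basic_auth": {"username": "admin", "password": "admin"}})

    session = requests.Session()  # one connection pool shared across polling calls
    for _ in range(30):
        r = cluster.call_api("/_cluster/health", method=HttpMethod.GET,
                             session=session, raise_error=False)
        if r.ok and r.json().get("status") in ("green", "yellow"):
            break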
@@ -9,6 +9,7 @@

 DOCKER_REPLAY_SCHEMA = {
     "type": "dict",
+    "nullable": True,
     "schema": {
         "socket": {"type": "string", "required": False}
     }
@@ -36,7 +37,7 @@
 }


-ReplayStatus = Enum("ReplayStatus", ["NOT_STARTED", "RUNNING", "STOPPED", "FAILED"])
+ReplayStatus = Enum("ReplayStatus", ["NOT_STARTED", "STARTING", "RUNNING", "STOPPED", "FAILED"])


 class Replayer(ABC):
@@ -62,7 +63,7 @@ def stop(self, *args, **kwargs) -> CommandResult:
         pass

     @abstractmethod
-    def get_status(self, *args, **kwargs) -> ReplayStatus:
+    def get_status(self, *args, **kwargs) -> CommandResult:
         """Return a status"""
         pass

@@ -0,0 +1,28 @@
+from typing import Dict
+from console_link.models.command_result import CommandResult
+from console_link.models.replayer_base import Replayer, ReplayStatus
+
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+class DockerReplayer(Replayer):
+    def __init__(self, config: Dict) -> None:
+        super().__init__(config)
+
+    def start(self, *args, **kwargs) -> CommandResult:
+        logger.warning("Start command is not implemented for Docker Replayer")
+        return CommandResult(success=True, value="No action performed, action is unimplemented")
+
+    def stop(self, *args, **kwargs) -> CommandResult:
+        logger.warning("Stop command is not implemented for Docker Replayer")
+        return CommandResult(success=True, value="No action performed, action is unimplemented")
+
+    def get_status(self, *args, **kwargs) -> CommandResult:
+        logger.warning("Get status command is not implemented for Docker Replayer and "
+                       "always assumes service is running")
+        return CommandResult(True, (ReplayStatus.RUNNING, "Docker Replayer is assumed to be running"))
+
+    def scale(self, units: int, *args, **kwargs) -> CommandResult:
+        raise NotImplementedError()
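Since `get_status` now returns a `CommandResult` whose `value` is a `(ReplayStatus, message)` tuple, a caller unpacks it like this sketch (the config accepted by the constructor is an assumption):

    replayer = DockerReplayer({"docker": None})  # config shape assumed
    result = replayer.get_status()
    if result.success:
        status, message = result.value
        print(f"{status.name}: {message}")  # RUNNING: Docker Replayer is assumed to be running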
@@ -23,8 +23,19 @@ def stop(self, *args, **kwargs) -> CommandResult:
         logger.info("Stopping ECS replayer by setting desired count to 0 instances")
         return self.ecs_client.set_desired_count(0)

-    def get_status(self, *args, **kwargs) -> ReplayStatus:
-        raise NotImplementedError()
+    def get_status(self, *args, **kwargs) -> CommandResult:
+        # Simple implementation that only checks ECS service status currently
+        instance_statuses = self.ecs_client.get_instance_statuses()
+        if not instance_statuses:
+            return CommandResult(False, "Failed to get instance statuses")
+
+        status_string = str(instance_statuses)
+
+        if instance_statuses.running > 0:
+            return CommandResult(True, (ReplayStatus.RUNNING, status_string))
+        elif instance_statuses.pending > 0:
+            return CommandResult(True, (ReplayStatus.STARTING, status_string))
+        return CommandResult(True, (ReplayStatus.STOPPED, status_string))

     def scale(self, units: int, *args, **kwargs) -> CommandResult:
         logger.info(f"Scaling ECS replayer by setting desired count to {units} instances")