Skip to content

Commit

Permalink
[Fetch Migration] [Refactoring] Update module names and change source…
Browse files Browse the repository at this point in the history
… directory (#292)

This change consists of several commits that refactor the Fetch Migration codebase. No functionality has been added.
* The source code path is now FetchMigration/python instead of FetchMigration/index_configuration_tool (which was a misleading name given the expanded functionality of Fetch Migration)
* main.py has been renamed to pre_migration.py to clarify that these are pre-migration steps
* monitor.py has been renamed to migration_monitor.py for clarity
* Finally, the central methods of pre_migration and migration_monitor (run) now accept dataclasses for input parameters and return a dataclass as a result. This decouples the business logic from the ArgParse interface logic.

Signed-off-by: Kartik Ganesh <[email protected]>

---------

Signed-off-by: Kartik Ganesh <[email protected]>
  • Loading branch information
kartg authored Sep 5, 2023
1 parent 81f8a99 commit d5444e5
Show file tree
Hide file tree
Showing 21 changed files with 114 additions and 112 deletions.
4 changes: 2 additions & 2 deletions FetchMigration/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
FROM opensearch-data-prepper:2.4.0-SNAPSHOT
COPY index_configuration_tool/requirements.txt .
COPY python/requirements.txt .

# Install dependencies to local user directory
RUN apk update
Expand All @@ -9,7 +9,7 @@ RUN pip install --user -r requirements.txt
ENV ICT_CODE_PATH /code
WORKDIR $ICT_CODE_PATH
# Copy only source code
COPY ./index_configuration_tool/*.py .
COPY python/*.py .

# update PATH
ENV PATH=/root/.local:$PATH
Expand Down
2 changes: 1 addition & 1 deletion FetchMigration/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ python -m pip install -r index_configuration_tool/requirements.txt
After [setup](#setup), the tool can be executed using:

```shell
python index_configuration_tool/main.py <pipeline_yaml_path> <output_file>
python python/pre_migration.py <pipeline_yaml_path> <output_file>
```

### Docker
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from prometheus_client.parser import text_string_to_metric_families

from endpoint_info import EndpointInfo
from migration_monitor_params import MigrationMonitorParams

__PROMETHEUS_METRICS_ENDPOINT = "/metrics/prometheus"
__SHUTDOWN_ENDPOINT = "/shutdown"
Expand Down Expand Up @@ -53,7 +54,7 @@ def check_if_complete(doc_count: Optional[int], in_flight: Optional[int], no_par
return False


def run(args: argparse.Namespace, wait_seconds: int) -> None:
def run(args: MigrationMonitorParams, wait_seconds: int = 30) -> None:
# TODO Remove hardcoded EndpointInfo
default_auth = ('admin', 'admin')
endpoint = EndpointInfo(args.dp_endpoint, default_auth, False)
Expand Down Expand Up @@ -97,7 +98,7 @@ def run(args: argparse.Namespace, wait_seconds: int) -> None:
type=int,
help="Target doc_count to reach, after which the Data Prepper pipeline will be terminated"
)
cli_args = arg_parser.parse_args()
namespace = arg_parser.parse_args()
print("\n##### Starting monitor tool... #####\n")
run(cli_args, 30)
run(MigrationMonitorParams(namespace.target_count, namespace.dp_endpoint))
print("\n##### Ending monitor tool... #####\n")
7 changes: 7 additions & 0 deletions FetchMigration/python/migration_monitor_params.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from dataclasses import dataclass


@dataclass
class MigrationMonitorParams:
    """Input parameters for the migration monitor's run() entry point.

    Decouples the monitor's business logic from the argparse CLI layer:
    the __main__ block parses flags and builds one of these instead of
    passing an argparse.Namespace through.
    """

    # Target doc_count to reach, after which the Data Prepper pipeline
    # will be terminated (mirrors the --target-count CLI help text).
    target_count: int
    # Data Prepper endpoint polled for Prometheus metrics and used for
    # the shutdown call; defaults to the local Data Prepper instance.
    dp_endpoint: str = "https://localhost:4900"
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

# Constants
from endpoint_info import EndpointInfo
from pre_migration_params import PreMigrationParams
from pre_migration_result import PreMigrationResult

SUPPORTED_ENDPOINTS = ["opensearch", "elasticsearch"]
SOURCE_KEY = "source"
Expand Down Expand Up @@ -143,12 +145,6 @@ def print_report(index_differences: tuple[set, set, set], count: int): # pragma
print("Total documents to be moved: " + str(count))


def dump_count_and_indices(count: int, indices: set): # pragma no cover
print(count)
for index_name in indices:
print(index_name)


def compute_endpoint_and_fetch_indices(config: dict, key: str) -> tuple[EndpointInfo, dict]:
endpoint = get_supported_endpoint(config, key)
# Endpoint is a tuple of (type, config)
Expand All @@ -157,7 +153,7 @@ def compute_endpoint_and_fetch_indices(config: dict, key: str) -> tuple[Endpoint
return endpoint_info, indices


def run(args: argparse.Namespace) -> None:
def run(args: PreMigrationParams) -> PreMigrationResult:
# Sanity check
if not args.report and len(args.output_file) == 0:
raise ValueError("No output file specified")
Expand All @@ -174,14 +170,13 @@ def run(args: argparse.Namespace) -> None:
diff = get_index_differences(source_indices, target_indices)
# The first element in the tuple is the set of indices to create
indices_to_create = diff[0]
doc_count = 0
result = PreMigrationResult()
if indices_to_create:
doc_count = index_operations.doc_count(indices_to_create, source_endpoint_info)
result.created_indices = indices_to_create
result.target_doc_count = index_operations.doc_count(indices_to_create, source_endpoint_info)
if args.report:
print_report(diff, doc_count)
print_report(diff, result.target_doc_count)
if indices_to_create:
if not args.report:
dump_count_and_indices(doc_count, indices_to_create)
# Write output YAML
if len(args.output_file) > 0:
write_output(dp_config, indices_to_create, args.output_file)
Expand All @@ -192,12 +187,13 @@ def run(args: argparse.Namespace) -> None:
for index_name in indices_to_create:
index_data[index_name] = source_indices[index_name]
index_operations.create_indices(index_data, target_endpoint_info)
return result


if __name__ == '__main__': # pragma no cover
# Set up parsing for command line arguments
arg_parser = argparse.ArgumentParser(
prog="python main.py",
prog="python pre_migration.py",
description="This tool creates indices on a target cluster based on the contents of a source cluster.\n" +
"The first input to the tool is a path to a Data Prepper pipeline YAML file, which is parsed to obtain " +
"the source and target cluster endpoints.\nThe second input is an output path to which a modified version " +
Expand All @@ -223,4 +219,5 @@ def run(args: argparse.Namespace) -> None:
help="Print a report of the index differences")
arg_parser.add_argument("--dryrun", action="store_true",
help="Skips the actual creation of indices on the target cluster")
run(arg_parser.parse_args())
namespace = arg_parser.parse_args()
run(PreMigrationParams(namespace.config_file_path, namespace.output_file, namespace.report, namespace.dryrun))
9 changes: 9 additions & 0 deletions FetchMigration/python/pre_migration_params.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from dataclasses import dataclass


@dataclass
class PreMigrationParams:
    """Input parameters for pre_migration's run() entry point.

    Replaces the argparse.Namespace previously passed to run(), keeping
    the CLI parsing concerns out of the business logic.
    """

    # Path to the Data Prepper pipeline YAML file to read (required).
    config_file_path: str
    # Path for the modified pipeline YAML; run() raises ValueError when
    # this is empty and report is False, and skips writing when empty.
    output_file: str = ""
    # When True, print a report of index differences instead of
    # requiring an output file.
    report: bool = False
    # When True, skip the actual creation of indices on the target
    # cluster (mirrors the --dryrun CLI help text).
    dryrun: bool = False
7 changes: 7 additions & 0 deletions FetchMigration/python/pre_migration_result.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from dataclasses import dataclass, field


@dataclass
class PreMigrationResult:
    """Result returned by pre_migration's run() entry point.

    Populated only when there are indices to create on the target
    cluster; otherwise both fields keep their empty defaults.
    """

    # Total number of documents to be moved for the created indices,
    # as computed by index_operations.doc_count.
    target_doc_count: int = 0
    # Names of the indices that exist on the source but not the target
    # (presumably index-name strings — confirm against callers).
    created_indices: set = field(default_factory=set)
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
import argparse
import unittest
from unittest.mock import patch, MagicMock, PropertyMock

import requests
import responses
from prometheus_client.parser import text_string_to_metric_families

import monitor
import migration_monitor
from endpoint_info import EndpointInfo

# Constants
from migration_monitor_params import MigrationMonitorParams

TEST_ENDPOINT = "test"
TEST_AUTH = ("user", "pass")
TEST_FLAG = False
Expand All @@ -20,12 +21,12 @@
+ TEST_METRIC_NAME + "{serviceName=\"unittest\",} " + str(TEST_METRIC_VALUE)


class TestMonitor(unittest.TestCase):
class TestMigrationMonitor(unittest.TestCase):
@patch('requests.post')
def test_shutdown(self, mock_post: MagicMock):
expected_shutdown_url = TEST_ENDPOINT + "/shutdown"
test_endpoint = EndpointInfo(TEST_ENDPOINT, TEST_AUTH, TEST_FLAG)
monitor.shutdown_pipeline(test_endpoint)
migration_monitor.shutdown_pipeline(test_endpoint)
mock_post.assert_called_once_with(expected_shutdown_url, auth=TEST_AUTH, verify=TEST_FLAG)

@patch('requests.get')
Expand All @@ -38,7 +39,7 @@ def test_fetch_prometheus_metrics(self, mock_get: MagicMock):
type(mock_response).content = mock_content
mock_get.return_value = mock_response
# Test fetch
raw_metrics_list = monitor.fetch_prometheus_metrics(EndpointInfo(TEST_ENDPOINT))
raw_metrics_list = migration_monitor.fetch_prometheus_metrics(EndpointInfo(TEST_ENDPOINT))
mock_get.assert_called_once_with(expected_url, auth=None, verify=True)
self.assertEqual(1, len(raw_metrics_list))
test_metric = raw_metrics_list[0]
Expand All @@ -57,38 +58,36 @@ def test_fetch_prometheus_metrics_failure(self):
expected_url = TEST_ENDPOINT + "/metrics/prometheus"
responses.get(expected_url, body=requests.Timeout())
# Test fetch
result = monitor.fetch_prometheus_metrics(EndpointInfo(TEST_ENDPOINT))
result = migration_monitor.fetch_prometheus_metrics(EndpointInfo(TEST_ENDPOINT))
self.assertIsNone(result)

def test_get_metric_value(self):
# Return value is an int
expected_val = int(TEST_METRIC_VALUE)
test_input = list(text_string_to_metric_families(TEST_PROMETHEUS_METRIC_STRING))
# Should fetch by suffix
val = monitor.get_metric_value(test_input, "metric")
val = migration_monitor.get_metric_value(test_input, "metric")
self.assertEqual(expected_val, val)
# No matching metric returns None
val = monitor.get_metric_value(test_input, "invalid")
val = migration_monitor.get_metric_value(test_input, "invalid")
self.assertEqual(None, val)

@patch('monitor.shutdown_pipeline')
@patch('migration_monitor.shutdown_pipeline')
@patch('time.sleep')
@patch('monitor.check_if_complete')
@patch('monitor.get_metric_value')
@patch('monitor.fetch_prometheus_metrics')
@patch('migration_monitor.check_if_complete')
@patch('migration_monitor.get_metric_value')
@patch('migration_monitor.fetch_prometheus_metrics')
# Note that mock objects are passed bottom-up from the patch order above
def test_run(self, mock_fetch: MagicMock, mock_get: MagicMock, mock_check: MagicMock, mock_sleep: MagicMock,
mock_shut: MagicMock):
test_input = argparse.Namespace()
# The values here don't matter since we've mocked the check method
test_input.dp_endpoint = "test"
test_input.target_count = 1
# The param values don't matter since we've mocked the check method
test_input = MigrationMonitorParams(1, "test")
mock_get.return_value = None
# Check will first fail, then pass
mock_check.side_effect = [False, True]
# Run test method
wait_time = 3
monitor.run(test_input, wait_time)
migration_monitor.run(test_input, wait_time)
# Test that fetch was called with the expected EndpointInfo
expected_endpoint_info = EndpointInfo(test_input.dp_endpoint, ('admin', 'admin'), False)
self.assertEqual(2, mock_fetch.call_count)
Expand All @@ -97,25 +96,23 @@ def test_run(self, mock_fetch: MagicMock, mock_get: MagicMock, mock_check: Magic
mock_sleep.assert_called_once_with(wait_time)
mock_shut.assert_called_once_with(expected_endpoint_info)

@patch('monitor.shutdown_pipeline')
@patch('migration_monitor.shutdown_pipeline')
@patch('time.sleep')
@patch('monitor.check_if_complete')
@patch('monitor.get_metric_value')
@patch('monitor.fetch_prometheus_metrics')
@patch('migration_monitor.check_if_complete')
@patch('migration_monitor.get_metric_value')
@patch('migration_monitor.fetch_prometheus_metrics')
# Note that mock objects are passed bottom-up from the patch order above
def test_run_with_fetch_failure(self, mock_fetch: MagicMock, mock_get: MagicMock, mock_check: MagicMock,
mock_sleep: MagicMock, mock_shut: MagicMock):
test_input = argparse.Namespace()
# The values here don't matter since we've mocked the check method
test_input.dp_endpoint = "test"
test_input.target_count = 1
# The param values don't matter since we've mocked the check method
test_input = MigrationMonitorParams(1, "test")
mock_get.return_value = None
mock_check.return_value = True
# Fetch call will first fail, then succeed
mock_fetch.side_effect = [None, MagicMock()]
# Run test method
wait_time = 3
monitor.run(test_input, wait_time)
migration_monitor.run(test_input, wait_time)
# Test that fetch was called with the expected EndpointInfo
expected_endpoint_info = EndpointInfo(test_input.dp_endpoint, ('admin', 'admin'), False)
self.assertEqual(2, mock_fetch.call_count)
Expand All @@ -126,17 +123,17 @@ def test_run_with_fetch_failure(self, mock_fetch: MagicMock, mock_get: MagicMock

def test_check_if_complete(self):
# If any of the optional values are missing, we are not complete
self.assertFalse(monitor.check_if_complete(None, 0, 1, 0, 2))
self.assertFalse(monitor.check_if_complete(2, None, 1, 0, 2))
self.assertFalse(monitor.check_if_complete(2, 0, None, 0, 2))
self.assertFalse(migration_monitor.check_if_complete(None, 0, 1, 0, 2))
self.assertFalse(migration_monitor.check_if_complete(2, None, 1, 0, 2))
self.assertFalse(migration_monitor.check_if_complete(2, 0, None, 0, 2))
# Target count not reached
self.assertFalse(monitor.check_if_complete(1, None, None, 0, 2))
self.assertFalse(migration_monitor.check_if_complete(1, None, None, 0, 2))
# Target count reached, but has records in flight
self.assertFalse(monitor.check_if_complete(2, 1, None, 0, 2))
self.assertFalse(migration_monitor.check_if_complete(2, 1, None, 0, 2))
# Target count reached, no records in flight, but no prev no_part_count
self.assertFalse(monitor.check_if_complete(2, 0, 1, 0, 2))
self.assertFalse(migration_monitor.check_if_complete(2, 0, 1, 0, 2))
# Terminal state
self.assertTrue(monitor.check_if_complete(2, 0, 2, 1, 2))
self.assertTrue(migration_monitor.check_if_complete(2, 0, 2, 1, 2))


if __name__ == '__main__':
Expand Down
Loading

0 comments on commit d5444e5

Please sign in to comment.