Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fetch Migration] Enable migration for identical, empty target cluster index #390

Merged
merged 7 commits
Nov 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions FetchMigration/python/fetch_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def write_inline_pipeline(pipeline_file_path: str, inline_pipeline: str, inline_


def write_inline_target_host(pipeline_file_path: str, inline_target_host: str):
with open(pipeline_file_path, 'rw') as pipeline_file:
with open(pipeline_file_path, 'r+') as pipeline_file:
pipeline_yaml = yaml.safe_load(pipeline_file)
update_target_host(pipeline_yaml, inline_target_host)
# Note - this does not preserve comments
Expand All @@ -84,7 +84,7 @@ def run(params: FetchOrchestratorParams) -> Optional[int]:
report=True, dryrun=params.is_dry_run)
logging.info("Running metadata migration...\n")
metadata_migration_result = metadata_migration.run(metadata_migration_params)
if len(metadata_migration_result.created_indices) > 0 and not params.is_only_metadata_migration():
if len(metadata_migration_result.migration_indices) > 0 and not params.is_only_metadata_migration():
# Kick off a subprocess for Data Prepper
logging.info("Running Data Prepper...\n")
proc = subprocess.Popen(dp_exec_path)
Expand Down
35 changes: 35 additions & 0 deletions FetchMigration/python/index_diff.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import utils
from index_operations import SETTINGS_KEY, MAPPINGS_KEY


# Compares index metadata between a "source" cluster and a "target" cluster
# and buckets the index names into three categories: indices that exist only
# on the source ("to-create"), indices present on both clusters whose
# settings and mappings match ("identical"), and indices present on both
# clusters whose settings or mappings differ ("conflicting").
class IndexDiff:
    indices_to_create: set
    identical_indices: set
    identical_empty_indices: set
    conflicting_indices: set

    def __init__(self, source: dict, target: dict):
        """Compute the index diff from the source and target index-metadata dicts."""
        self.identical_empty_indices = set()
        source_names = set(source.keys())
        # Index names present on both clusters
        common_indices = source_names & set(target.keys())
        # A common index conflicts if either its settings or its mappings
        # differ between the two clusters
        self.conflicting_indices = {
            name for name in common_indices
            if utils.has_differences(SETTINGS_KEY, source[name], target[name])
            or utils.has_differences(MAPPINGS_KEY, source[name], target[name])
        }
        # Common indices without metadata conflicts are identical
        self.identical_indices = common_indices - self.conflicting_indices
        # Indices absent from the target must be created there
        self.indices_to_create = source_names - common_indices

    def set_identical_empty_indices(self, indices: set):
        """Record the subset of identical indices that hold no documents on the target."""
        self.identical_empty_indices = indices
8 changes: 8 additions & 0 deletions FetchMigration/python/index_doc_count.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from dataclasses import dataclass


# Captures the doc_count for indices in a cluster, and also computes a total
@dataclass
class IndexDocCount:
    # Aggregate document count across all indices in the map below
    total: int
    # Maps index name -> number of documents in that index
    index_doc_count_map: dict
25 changes: 20 additions & 5 deletions FetchMigration/python/index_operations.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,22 @@
import jsonpath_ng
import requests

from endpoint_info import EndpointInfo

# Constants
from index_doc_count import IndexDocCount

SETTINGS_KEY = "settings"
MAPPINGS_KEY = "mappings"
COUNT_KEY = "count"
__INDEX_KEY = "index"
__ALL_INDICES_ENDPOINT = "*"
__COUNT_ENDPOINT = "/_count"
__SEARCH_COUNT_PATH = "/_search?size=0"
__SEARCH_COUNT_PAYLOAD = {"aggs": {"count": {"terms": {"field": "_index"}}}}
__TOTAL_COUNT_JSONPATH = jsonpath_ng.parse("$.hits.total.value")
__INDEX_COUNT_JSONPATH = jsonpath_ng.parse("$.aggregations.count.buckets")
__BUCKET_INDEX_NAME_KEY = "key"
__BUCKET_DOC_COUNT_KEY = "doc_count"
__INTERNAL_SETTINGS_KEYS = ["creation_date", "uuid", "provided_name", "version", "store"]


Expand Down Expand Up @@ -43,9 +51,16 @@ def create_indices(indices_data: dict, endpoint: EndpointInfo):
raise RuntimeError(f"Failed to create index [{index}] - {e!s}")


def doc_count(indices: set, endpoint: EndpointInfo) -> int:
count_endpoint_suffix: str = ','.join(indices) + __COUNT_ENDPOINT
def doc_count(indices: set, endpoint: EndpointInfo) -> IndexDocCount:
count_endpoint_suffix: str = ','.join(indices) + __SEARCH_COUNT_PATH
doc_count_endpoint: str = endpoint.add_path(count_endpoint_suffix)
resp = requests.get(doc_count_endpoint, auth=endpoint.get_auth(), verify=endpoint.is_verify_ssl())
resp = requests.get(doc_count_endpoint, auth=endpoint.get_auth(), verify=endpoint.is_verify_ssl(),
json=__SEARCH_COUNT_PAYLOAD)
# TODO Handle resp.status_code for non successful requests
result = dict(resp.json())
return int(result[COUNT_KEY])
total: int = __TOTAL_COUNT_JSONPATH.find(result)[0].value
counts_list: list = __INDEX_COUNT_JSONPATH.find(result)[0].value
count_map = dict()
for entry in counts_list:
count_map[entry[__BUCKET_INDEX_NAME_KEY]] = entry[__BUCKET_DOC_COUNT_KEY]
return IndexDocCount(total, count_map)
73 changes: 30 additions & 43 deletions FetchMigration/python/metadata_migration.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import argparse
import logging

import yaml

import endpoint_utils
import index_operations
import utils
from index_diff import IndexDiff
from metadata_migration_params import MetadataMigrationParams
from metadata_migration_result import MetadataMigrationResult

Expand All @@ -14,50 +16,29 @@
INDEX_NAME_KEY = "index_name_regex"


def write_output(yaml_data: dict, new_indices: set, output_path: str):
def write_output(yaml_data: dict, indices_to_migrate: set, output_path: str):
pipeline_config = next(iter(yaml_data.values()))
# Result is a tuple of (type, config)
source_config = endpoint_utils.get_supported_endpoint_config(pipeline_config, endpoint_utils.SOURCE_KEY)[1]
source_indices = source_config.get(INDICES_KEY, dict())
included_indices = source_indices.get(INCLUDE_KEY, list())
for index in new_indices:
for index in indices_to_migrate:
included_indices.append({INDEX_NAME_KEY: index})
source_indices[INCLUDE_KEY] = included_indices
source_config[INDICES_KEY] = source_indices
with open(output_path, 'w') as out_file:
yaml.dump(yaml_data, out_file)


# Computes differences in indices between source and target.
# Returns a tuple with 3 elements:
# - The 1st element is the set of indices to create on the target
# - The 2nd element is a set of indices that are identical on source and target
# - The 3rd element is a set of indices that are present on both source and target,
# but differ in their settings or mappings.
def get_index_differences(source: dict, target: dict) -> tuple[set, set, set]:
index_conflicts = set()
indices_in_target = set(source.keys()) & set(target.keys())
for index in indices_in_target:
# Check settings
if utils.has_differences(index_operations.SETTINGS_KEY, source[index], target[index]):
index_conflicts.add(index)
# Check mappings
if utils.has_differences(index_operations.MAPPINGS_KEY, source[index], target[index]):
index_conflicts.add(index)
identical_indices = set(indices_in_target) - set(index_conflicts)
indices_to_create = set(source.keys()) - set(indices_in_target)
return indices_to_create, identical_indices, index_conflicts


# The order of data in the tuple is:
# (indices to create), (identical indices), (indices with conflicts)
def print_report(index_differences: tuple[set, set, set], count: int): # pragma no cover
print("Identical indices in the target cluster (no changes will be made): " +
utils.string_from_set(index_differences[1]))
print("Indices in target cluster with conflicting settings/mappings: " +
utils.string_from_set(index_differences[2]))
print("Indices to create: " + utils.string_from_set(index_differences[0]))
print("Total documents to be moved: " + str(count))
def print_report(diff: IndexDiff, total_doc_count: int):  # pragma: no cover
    """Log a human-readable summary of the index diff and total doc count.

    Excluded from coverage: the original marker "pragma no cover" lacked the
    colon that coverage.py's default exclusion regex requires, so it had no
    effect; corrected to "pragma: no cover".
    """
    logging.info("Identical indices in the target cluster: " + utils.string_from_set(diff.identical_indices))
    logging.info("Identical empty indices in the target cluster (data will be migrated): " +
                 utils.string_from_set(diff.identical_empty_indices))
    logging.info("Indices present in both clusters with conflicting settings/mappings (data will not be migrated): " +
                 utils.string_from_set(diff.conflicting_indices))
    logging.info("Indices to be created in the target cluster (data will be migrated): " +
                 utils.string_from_set(diff.indices_to_create))
    logging.info("Total number of documents to be moved: " + str(total_doc_count))


def run(args: MetadataMigrationParams) -> MetadataMigrationResult:
Expand All @@ -83,23 +64,29 @@ def run(args: MetadataMigrationParams) -> MetadataMigrationResult:
return result
target_indices = index_operations.fetch_all_indices(target_endpoint_info)
# Compute index differences and print report
diff = get_index_differences(source_indices, target_indices)
# The first element in the tuple is the set of indices to create
indices_to_create = diff[0]
if indices_to_create:
result.created_indices = indices_to_create
result.target_doc_count = index_operations.doc_count(indices_to_create, source_endpoint_info)
diff = IndexDiff(source_indices, target_indices)
if diff.identical_indices:
# Identical indices with zero documents on the target are eligible for migration
target_doc_count = index_operations.doc_count(diff.identical_indices, target_endpoint_info)
# doc_count only returns indices that have non-zero counts, so the difference in responses
# gives us the set of identical, empty indices
result.migration_indices = diff.identical_indices.difference(target_doc_count.index_doc_count_map.keys())
diff.set_identical_empty_indices(result.migration_indices)
if diff.indices_to_create:
result.migration_indices.update(diff.indices_to_create)
if result.migration_indices:
doc_count_result = index_operations.doc_count(result.migration_indices, source_endpoint_info)
result.target_doc_count = doc_count_result.total
if args.report:
print_report(diff, result.target_doc_count)
if indices_to_create:
if result.migration_indices:
# Write output YAML
if len(args.output_file) > 0:
write_output(dp_config, indices_to_create, args.output_file)
if args.report: # pragma no cover
print("Wrote output YAML pipeline to: " + args.output_file)
write_output(dp_config, result.migration_indices, args.output_file)
logging.debug("Wrote output YAML pipeline to: " + args.output_file)
if not args.dryrun:
index_data = dict()
for index_name in indices_to_create:
for index_name in diff.indices_to_create:
index_data[index_name] = source_indices[index_name]
index_operations.create_indices(index_data, target_endpoint_info)
return result
Expand Down
3 changes: 2 additions & 1 deletion FetchMigration/python/metadata_migration_result.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@
@dataclass
class MetadataMigrationResult:
target_doc_count: int = 0
created_indices: set = field(default_factory=set)
# Set of indices for which data needs to be migrated
migration_indices: set = field(default_factory=set)
4 changes: 2 additions & 2 deletions FetchMigration/python/migration_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,6 @@ def run(args: MigrationMonitorParams, dp_process: Optional[Popen] = None, poll_i
help="Target doc_count to reach, after which the Data Prepper pipeline will be terminated"
)
namespace = arg_parser.parse_args()
print("\n##### Starting monitor tool... #####\n")
logging.info("\n##### Starting monitor tool... #####\n")
run(MigrationMonitorParams(namespace.target_count, namespace.data_prepper_endpoint))
print("\n##### Ending monitor tool... #####\n")
logging.info("\n##### Ending monitor tool... #####\n")
1 change: 1 addition & 0 deletions FetchMigration/python/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
botocore>=1.31.70
jsondiff>=2.0.0
jsonpath-ng>=1.6.0
prometheus-client>=0.17.1
pyyaml>=6.0.1
requests>=2.31.0
Expand Down
5 changes: 5 additions & 0 deletions FetchMigration/python/tests/test_endpoint_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from moto import mock_iam

import endpoint_utils
from endpoint_info import EndpointInfo
from tests import test_constants

# Constants
Expand Down Expand Up @@ -71,6 +72,10 @@ def test_is_insecure_missing_nested(self):
test_input = {"key1": 123, CONNECTION_KEY: {"key2": "val"}}
self.assertFalse(endpoint_utils.is_insecure(test_input))

def test_auth_normalized_url(self):
val = EndpointInfo("test")
self.assertEqual("test/", val.get_url())

def test_get_auth_returns_none(self):
# The following inputs should not return an auth tuple:
# - Empty input
Expand Down
2 changes: 1 addition & 1 deletion FetchMigration/python/tests/test_fetch_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ def test_write_inline_target_host(self, mock_file_open: MagicMock, mock_yaml_loa
mock_file_open.reset_mock()
mock_yaml_dump.reset_mock()
orchestrator.write_inline_target_host("test", val)
mock_file_open.assert_called_once_with("test", "rw")
mock_file_open.assert_called_once_with("test", "r+")
mock_yaml_dump.assert_called_once_with(expected_pipeline, ANY)

def test_update_target_host_bad_config(self):
Expand Down
73 changes: 73 additions & 0 deletions FetchMigration/python/tests/test_index_diff.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import copy
import unittest

from index_diff import IndexDiff
from tests import test_constants


class TestIndexDiff(unittest.TestCase):
    def test_index_diff_empty(self):
        # Diffing two empty clusters yields empty sets across the board
        diff = IndexDiff(dict(), dict())
        self.assertEqual(set(), diff.indices_to_create)
        self.assertEqual(set(), diff.identical_indices)
        self.assertEqual(set(), diff.conflicting_indices)

    def test_index_diff_empty_target(self):
        diff = IndexDiff(test_constants.BASE_INDICES_DATA, dict())
        # Nothing in common, so no identical or conflicting indices
        self.assertEqual(set(), diff.conflicting_indices)
        self.assertEqual(set(), diff.identical_indices)
        # Every source index must be created on the empty target
        expected_to_create = {test_constants.INDEX1_NAME, test_constants.INDEX2_NAME, test_constants.INDEX3_NAME}
        self.assertEqual(expected_to_create, diff.indices_to_create)

    def test_index_diff_identical_index(self):
        source = copy.deepcopy(test_constants.BASE_INDICES_DATA)
        del source[test_constants.INDEX2_NAME]
        del source[test_constants.INDEX3_NAME]
        # Diffing a cluster against itself yields only identical indices
        diff = IndexDiff(source, source)
        self.assertEqual(set(), diff.indices_to_create)
        self.assertEqual(set(), diff.conflicting_indices)
        self.assertEqual({test_constants.INDEX1_NAME}, diff.identical_indices)

    def test_index_diff_settings_conflict(self):
        target = copy.deepcopy(test_constants.BASE_INDICES_DATA)
        # Introduce a settings mismatch on index 2
        index_settings = target[test_constants.INDEX2_NAME][test_constants.SETTINGS_KEY]
        index_settings[test_constants.INDEX_KEY][test_constants.NUM_REPLICAS_SETTING] += 1
        diff = IndexDiff(test_constants.BASE_INDICES_DATA, target)
        # All indices exist on the target, so nothing to create
        self.assertEqual(set(), diff.indices_to_create)
        # The untouched indices remain identical; index 2 conflicts
        self.assertEqual({test_constants.INDEX1_NAME, test_constants.INDEX3_NAME}, diff.identical_indices)
        self.assertEqual({test_constants.INDEX2_NAME}, diff.conflicting_indices)

    def test_index_diff_mappings_conflict(self):
        target = copy.deepcopy(test_constants.BASE_INDICES_DATA)
        # Introduce a mappings mismatch on index 3
        target[test_constants.INDEX3_NAME][test_constants.MAPPINGS_KEY] = {}
        diff = IndexDiff(test_constants.BASE_INDICES_DATA, target)
        # All indices exist on the target, so nothing to create
        self.assertEqual(set(), diff.indices_to_create)
        # The untouched indices remain identical; index 3 conflicts
        self.assertEqual({test_constants.INDEX1_NAME, test_constants.INDEX2_NAME}, diff.identical_indices)
        self.assertEqual({test_constants.INDEX3_NAME}, diff.conflicting_indices)


if __name__ == '__main__':
unittest.main()
14 changes: 10 additions & 4 deletions FetchMigration/python/tests/test_index_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,18 @@ def test_create_indices_exception(self):
@responses.activate
def test_doc_count(self):
test_indices = {test_constants.INDEX1_NAME, test_constants.INDEX2_NAME}
expected_count_endpoint = test_constants.SOURCE_ENDPOINT + ",".join(test_indices) + "/_count"
mock_count_response = {"count": "10"}
index_doc_count: int = 5
test_buckets = list()
for index_name in test_indices:
test_buckets.append({"key": index_name, "doc_count": index_doc_count})
total_docs: int = index_doc_count * len(test_buckets)
expected_count_endpoint = test_constants.SOURCE_ENDPOINT + ",".join(test_indices) + "/_search?size=0"
mock_count_response = {"hits": {"total": {"value": total_docs}},
"aggregations": {"count": {"buckets": test_buckets}}}
responses.get(expected_count_endpoint, json=mock_count_response)
# Now send request
count_value = index_operations.doc_count(test_indices, EndpointInfo(test_constants.SOURCE_ENDPOINT))
self.assertEqual(10, count_value)
doc_count_result = index_operations.doc_count(test_indices, EndpointInfo(test_constants.SOURCE_ENDPOINT))
self.assertEqual(total_docs, doc_count_result.total)


if __name__ == '__main__':
Expand Down
Loading
Loading