Skip to content

Commit

Permalink
[Fetch Migration] Custom exceptions and other readability improvements (
Browse files Browse the repository at this point in the history
#506)

This PR incorporates two pieces of prior feedback:

1. The "index_operations.py" file has been renamed to "index_management.py" to eliminate confusion on whether the operations in this file involve indexing of data (they don't).

2. Custom exceptions have been created for "index_management.py" and "metadata_migration.py" to use instead of the generic "RuntimeError" class to help disambiguate errors in the call stack.

This change also includes a minor bugfix to the "showFetchMigrationCommand.sh" script to move default value declaration so that command-line arguments are parsed correctly.

---------

Signed-off-by: Kartik Ganesh <[email protected]>
  • Loading branch information
kartg authored Feb 13, 2024
1 parent 8c6fa53 commit d3820af
Show file tree
Hide file tree
Showing 7 changed files with 129 additions and 96 deletions.
22 changes: 22 additions & 0 deletions FetchMigration/python/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#
# Copyright OpenSearch Contributors
# SPDX-License-Identifier: Apache-2.0
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
#

class RequestError(RuntimeError):
def __init__(self, message=None):
super().__init__(message)


class IndexManagementError(RuntimeError):
def __init__(self, message=None):
super().__init__(message)


class MetadataMigrationError(RuntimeError):
def __init__(self, message=None):
super().__init__(message)
2 changes: 1 addition & 1 deletion FetchMigration/python/index_diff.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
#

import utils
from index_operations import SETTINGS_KEY, MAPPINGS_KEY
from index_management import SETTINGS_KEY, MAPPINGS_KEY


# Computes and captures differences in indices between a "source" cluster
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

from component_template_info import ComponentTemplateInfo
from endpoint_info import EndpointInfo
from exceptions import IndexManagementError, RequestError
from index_doc_count import IndexDocCount
from index_template_info import IndexTemplateInfo

Expand Down Expand Up @@ -46,21 +47,21 @@ def __send_get_request(url: str, endpoint: EndpointInfo, payload: Optional[dict]
timeout=__TIMEOUT_SECONDS)
resp.raise_for_status()
return resp
except requests.ConnectionError:
raise RuntimeError(f"ConnectionError on GET request to cluster endpoint: {endpoint.get_url()}")
except requests.ConnectionError as e:
raise RequestError(f"ConnectionError on GET request to cluster endpoint: {endpoint.get_url()}") from e
except requests.HTTPError as e:
raise RuntimeError(f"HTTPError on GET request to cluster endpoint: {endpoint.get_url()} - {e!s}")
except requests.Timeout:
raise RequestError(f"HTTPError on GET request to cluster endpoint: {endpoint.get_url()}") from e
except requests.Timeout as e:
# TODO retry mechanism
raise RuntimeError(f"Timed out on GET request to cluster endpoint: {endpoint.get_url()}")
raise RequestError(f"Timed out on GET request to cluster endpoint: {endpoint.get_url()}") from e
except requests.exceptions.RequestException as e:
raise RuntimeError(f"GET request failure to cluster endpoint: {endpoint.get_url()} - {e!s}")
raise RequestError(f"GET request failure to cluster endpoint: {endpoint.get_url()}") from e


def fetch_all_indices(endpoint: EndpointInfo) -> dict:
all_indices_url: str = endpoint.add_path(__ALL_INDICES_ENDPOINT)
try:
# raises RuntimeError in case of any request errors
# raises RequestError in case of any request errors
resp = __send_get_request(all_indices_url, endpoint)
result = dict(resp.json())
for index in list(result.keys()):
Expand All @@ -74,8 +75,8 @@ def fetch_all_indices(endpoint: EndpointInfo) -> dict:
if __INDEX_KEY in index_settings:
index_settings[__INDEX_KEY].pop(setting, None)
return result
except RuntimeError as e:
raise RuntimeError(f"Failed to fetch metadata from cluster endpoint: {e!s}")
except RequestError as e:
raise IndexManagementError("Failed to fetch metadata from cluster endpoint") from e


def create_indices(indices_data: dict, endpoint: EndpointInfo) -> dict:
Expand All @@ -101,7 +102,7 @@ def doc_count(indices: set, endpoint: EndpointInfo) -> IndexDocCount:
count_endpoint_suffix: str = ','.join(indices) + __SEARCH_COUNT_PATH
doc_count_endpoint: str = endpoint.add_path(count_endpoint_suffix)
try:
# raises RuntimeError in case of any request errors
# raises RequestError in case of any request errors
resp = __send_get_request(doc_count_endpoint, endpoint, __SEARCH_COUNT_PAYLOAD)
result = dict(resp.json())
total: int = __TOTAL_COUNT_JSONPATH.find(result)[0].value
Expand All @@ -110,41 +111,41 @@ def doc_count(indices: set, endpoint: EndpointInfo) -> IndexDocCount:
for entry in counts_list:
count_map[entry[__BUCKET_INDEX_NAME_KEY]] = entry[__BUCKET_DOC_COUNT_KEY]
return IndexDocCount(total, count_map)
except RuntimeError as e:
raise RuntimeError(f"Failed to fetch doc_count: {e!s}")
except RequestError as e:
raise IndexManagementError("Failed to fetch doc_count") from e


def __fetch_templates(endpoint: EndpointInfo, path: str, root_key: str, factory) -> set:
url: str = endpoint.add_path(path)
# raises RuntimeError in case of any request errors
# raises RequestError in case of any request errors
try:
resp = __send_get_request(url, endpoint)
result = set()
if root_key in resp.json():
for template in resp.json()[root_key]:
result.add(factory(template))
return result
except RuntimeError as e:
except RequestError as e:
# Chain the underlying exception as a cause
raise RuntimeError("Failed to fetch template metadata from cluster endpoint") from e
raise IndexManagementError("Failed to fetch template metadata from cluster endpoint") from e


def fetch_all_component_templates(endpoint: EndpointInfo) -> set[ComponentTemplateInfo]:
try:
# raises RuntimeError in case of any request errors
# raises RequestError in case of any request errors
return __fetch_templates(endpoint, __COMPONENT_TEMPLATES_PATH, __COMPONENT_TEMPLATE_LIST_KEY,
lambda t: ComponentTemplateInfo(t))
except RuntimeError as e:
raise RuntimeError("Failed to fetch component template metadata") from e
except IndexManagementError as e:
raise IndexManagementError("Failed to fetch component template metadata") from e


def fetch_all_index_templates(endpoint: EndpointInfo) -> set[IndexTemplateInfo]:
try:
# raises RuntimeError in case of any request errors
# raises RequestError in case of any request errors
return __fetch_templates(endpoint, __INDEX_TEMPLATES_PATH, __INDEX_TEMPLATE_LIST_KEY,
lambda t: IndexTemplateInfo(t))
except RuntimeError as e:
raise RuntimeError("Failed to fetch index template metadata") from e
except IndexManagementError as e:
raise IndexManagementError("Failed to fetch index template metadata") from e


def __create_templates(templates: set[ComponentTemplateInfo], endpoint: EndpointInfo, template_path: str) -> dict:
Expand Down
27 changes: 14 additions & 13 deletions FetchMigration/python/metadata_migration.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@
import yaml

import endpoint_utils
import index_operations
import index_management
import utils
from endpoint_info import EndpointInfo
from exceptions import MetadataMigrationError
from index_diff import IndexDiff
from metadata_migration_params import MetadataMigrationParams
from metadata_migration_result import MetadataMigrationResult
Expand Down Expand Up @@ -55,24 +56,24 @@ def index_metadata_migration(source: EndpointInfo, target: EndpointInfo,
args: MetadataMigrationParams) -> MetadataMigrationResult:
result = MetadataMigrationResult()
# Fetch indices
source_indices = index_operations.fetch_all_indices(source)
source_indices = index_management.fetch_all_indices(source)
# If source indices is empty, return immediately
if len(source_indices.keys()) == 0:
return result
target_indices = index_operations.fetch_all_indices(target)
target_indices = index_management.fetch_all_indices(target)
# Compute index differences and create result object
diff = IndexDiff(source_indices, target_indices)
if diff.identical_indices:
# Identical indices with zero documents on the target are eligible for migration
target_doc_count = index_operations.doc_count(diff.identical_indices, target)
target_doc_count = index_management.doc_count(diff.identical_indices, target)
# doc_count only returns indices that have non-zero counts, so the difference in responses
# gives us the set of identical, empty indices
result.migration_indices = diff.identical_indices.difference(target_doc_count.index_doc_count_map.keys())
diff.set_identical_empty_indices(result.migration_indices)
if diff.indices_to_create:
result.migration_indices.update(diff.indices_to_create)
if result.migration_indices:
doc_count_result = index_operations.doc_count(result.migration_indices, source)
doc_count_result = index_management.doc_count(result.migration_indices, source)
result.target_doc_count = doc_count_result.total
# Print report
if args.report:
Expand All @@ -82,13 +83,13 @@ def index_metadata_migration(source: EndpointInfo, target: EndpointInfo,
index_data = dict()
for index_name in diff.indices_to_create:
index_data[index_name] = source_indices[index_name]
failed_indices = index_operations.create_indices(index_data, target)
failed_indices = index_management.create_indices(index_data, target)
fail_count = len(failed_indices)
if fail_count > 0:
logging.error(f"Failed to create {fail_count} of {len(index_data)} indices")
for failed_index_name, error in failed_indices.items():
logging.error(f"Index name {failed_index_name} failed: {error!s}")
raise RuntimeError("Metadata migration failed, index creation unsuccessful")
raise MetadataMigrationError("Metadata migration failed, index creation unsuccessful")
return result


Expand All @@ -109,16 +110,16 @@ def __log_template_failures(failures: dict, target_count: int) -> bool:
# Raises RuntimeError if component/index template migration fails
def template_migration(source: EndpointInfo, target: EndpointInfo):
# Fetch and migrate component templates first
templates = index_operations.fetch_all_component_templates(source)
failures = index_operations.create_component_templates(templates, target)
templates = index_management.fetch_all_component_templates(source)
failures = index_management.create_component_templates(templates, target)
if not __log_template_failures(failures, len(templates)):
# Only migrate index templates if component template migration had no failures
templates = index_operations.fetch_all_index_templates(source)
failures = index_operations.create_index_templates(templates, target)
templates = index_management.fetch_all_index_templates(source)
failures = index_management.create_index_templates(templates, target)
if __log_template_failures(failures, len(templates)):
raise RuntimeError("Failed to create some index templates")
raise MetadataMigrationError("Failed to create some index templates")
else:
raise RuntimeError("Failed to create some component templates, aborting index template creation")
raise MetadataMigrationError("Failed to create some component templates, aborting index template creation")


def run(args: MetadataMigrationParams) -> MetadataMigrationResult:
Expand Down
Loading

0 comments on commit d3820af

Please sign in to comment.