Skip to content

Commit

Permalink
Merge branch 'main' into whitesource-remediate/aws-cdk-lib-2.x
Browse files Browse the repository at this point in the history
  • Loading branch information
okhasawn authored Sep 8, 2023
2 parents d5cd15e + 0afb7c7 commit 46f8c31
Show file tree
Hide file tree
Showing 246 changed files with 3,674 additions and 1,442 deletions.
2 changes: 1 addition & 1 deletion .flake8
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@ exclude = .tox,.git,*/migrations/*,*/static/CACHE/*,docs,node_modules,.venv

# F401 - Unused imports -- this is the only way to have a file-wide rule exception
per-file-ignores =
upgrades/upgrade_testing_framework/steps/__init__.py:F401
experimental/upgrades/upgrade_testing_framework/steps/__init__.py:F401
8 changes: 4 additions & 4 deletions .github/workflows/python-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ name: python-tests
on:
push:
paths:
- 'cluster_migration_core/**.py'
- 'experimental/cluster_migration_core/**.py'
pull_request:
paths:
- 'cluster_migration_core/**.py'
- 'experimental/cluster_migration_core/**.py'

jobs:
test-linux:
Expand All @@ -17,7 +17,7 @@ jobs:
runs-on: ${{ matrix.os }}
defaults:
run:
working-directory: ./cluster_migration_core
working-directory: ./experimental/cluster_migration_core
steps:
- name: Checkout Repository
uses: actions/checkout@v3
Expand All @@ -34,4 +34,4 @@ jobs:
- name: Upload Coverage Report
uses: codecov/codecov-action@v3
with:
files: cluster_migration_core/coverage.xml
files: cluster_migration_core/coverage.xml
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ coverage.xml
.venv
__pycache__
*.egg-info*
.python-version
.python-version
logs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
FROM opensearch-data-prepper:2.4.0-SNAPSHOT
COPY requirements.txt .
COPY python/requirements.txt .

# Install dependencies to local user directory
RUN apk update
Expand All @@ -9,7 +9,7 @@ RUN pip install --user -r requirements.txt
ENV ICT_CODE_PATH /code
WORKDIR $ICT_CODE_PATH
# Copy only source code
COPY ./*.py .
COPY python/*.py .

# update PATH
ENV PATH=/root/.local:$PATH
Expand Down
46 changes: 30 additions & 16 deletions index_configuration_tool/README.md → FetchMigration/README.md
Original file line number Diff line number Diff line change
@@ -1,16 +1,30 @@
# Index Configuration Tool

Python package that automates the creation of indices on a target cluster based on the contents of a source cluster. Index settings and index mappings are correctly copied over.
Python package that automates the creation of indices on a target cluster based on the contents of a source cluster.
Index settings and index mappings are correctly copied over, but no data is transferred.
This tool seeks to eliminate the need to [specify index templates](https://github.com/awslabs/logstash-output-amazon_es#optional-parameters) when migrating data from one cluster to another.
The tool currently supports ElasticSearch or OpenSearch as source and target.

This tool seeks to automate [the steps outlined here](https://github.com/kartg/opensearch-migrations/tree/datastash/datastash#2-configure-index-templates-on-the-target-cluster) and eliminate the need for defining index templates on the target cluster when migrating data from one cluster to another. The tool currently supports ElasticSearch or OpenSearch as source and target.
## Parameters

If an identical index or an index with the same name but differing settings/mappings is already present on the target cluster, that index is skipped and left as-is on the target cluster. The tool completes by printing a report of all processed indices under 3 buckets:
The first required input to the tool is a path to a [Data Prepper](https://github.com/opensearch-project/data-prepper) pipeline YAML file, which is parsed to obtain the source and target cluster endpoints.
The second required input is an output path to which a modified version of the pipeline YAML file is written.
This version of the pipeline adds an index inclusion configuration to the sink, specifying only those indices that were created by the index configuration tool.
The tool also supports several optional flags:

| Flag | Purpose |
| ------------- | ------------- |
| `-h, --help` | Prints help text and exits |
| `--report, -r` | Prints a report of indices indicating which ones will be created, along with indices that are identical or have conflicting settings/mappings. |
| `--dryrun` | Skips the actual creation of indices on the target cluster |

### Reporting

If `--report` is specified, the tool prints all processed indices organized into 3 buckets:
* Successfully created on the target cluster
* Skipped due to a conflict in settings/mappings
* Skipped since the index configuration is identical on source and target

The input to the tool is a Logstash configuration file that is then parsed to obtain source and target endpoints.

## Current Limitations

* Only supports ElasticSearch and OpenSearch endpoints for source and target
Expand Down Expand Up @@ -41,17 +55,22 @@ python -m pip install -r index_configuration_tool/requirements.txt
After [setup](#setup), the tool can be executed using:

```shell
python index_configuration_tool/main.py <path-to-Logstash-file>
python index_configuration_tool/pre_migration.py <pipeline_yaml_path> <output_file>
```

Usage information can also be printed by supplying the `-h` flag.

### Docker

Replace `<path-to-Logstash-config-file>` in the command below with the path to your Logstash config file.
First build the Docker image from the `Dockerfile`:

```shell
docker build -t fetch-migration .
```

Then run the `fetch-migration` image.
Replace `<pipeline_yaml_path>` in the command below with the path to your Logstash config file:

```shell
confPath=<path-to-Logstash-config-file>; docker run -v $confPath:/tmp/conf.json -t kartg/index-configuration-tool /tmp/conf.json
docker run -p 4900:4900 -v <pipeline_yaml_path>:/code/input.yaml ict
```

## Development
Expand Down Expand Up @@ -86,9 +105,4 @@ Note that the `--omit` parameter must be specified to avoid tracking code covera
```shell
python -m coverage report --omit "*/tests/*"
python -m coverage html --omit "*/tests/*"
```

### Lark

The code uses [Lark](https://github.com/lark-parser/lark) for grammar definition, tree parsing and transformation.
The Logstash parser grammar is adapted from [node-logstash](https://github.com/bpaquet/node-logstash/blob/master/lib/logstash_config.jison).
```
File renamed without changes.
File renamed without changes.
8 changes: 8 additions & 0 deletions FetchMigration/python/endpoint_info.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from dataclasses import dataclass


@dataclass
class EndpointInfo:
url: str
auth: tuple = None
verify_ssl: bool = True
45 changes: 45 additions & 0 deletions FetchMigration/python/index_operations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import requests

from endpoint_info import EndpointInfo

# Constants
SETTINGS_KEY = "settings"
MAPPINGS_KEY = "mappings"
COUNT_KEY = "count"
__INDEX_KEY = "index"
__ALL_INDICES_ENDPOINT = "*"
__COUNT_ENDPOINT = "/_count"
__INTERNAL_SETTINGS_KEYS = ["creation_date", "uuid", "provided_name", "version", "store"]


def fetch_all_indices(endpoint: EndpointInfo) -> dict:
actual_endpoint = endpoint.url + __ALL_INDICES_ENDPOINT
resp = requests.get(actual_endpoint, auth=endpoint.auth, verify=endpoint.verify_ssl)
# Remove internal settings
result = dict(resp.json())
for index in result:
for setting in __INTERNAL_SETTINGS_KEYS:
index_settings = result[index][SETTINGS_KEY]
if __INDEX_KEY in index_settings:
index_settings[__INDEX_KEY].pop(setting, None)
return result


def create_indices(indices_data: dict, endpoint: EndpointInfo):
for index in indices_data:
actual_endpoint = endpoint.url + index
data_dict = dict()
data_dict[SETTINGS_KEY] = indices_data[index][SETTINGS_KEY]
data_dict[MAPPINGS_KEY] = indices_data[index][MAPPINGS_KEY]
try:
resp = requests.put(actual_endpoint, auth=endpoint.auth, verify=endpoint.verify_ssl, json=data_dict)
resp.raise_for_status()
except requests.exceptions.RequestException as e:
raise RuntimeError(f"Failed to create index [{index}] - {e!s}")


def doc_count(indices: set, endpoint: EndpointInfo) -> int:
actual_endpoint = endpoint.url + ','.join(indices) + __COUNT_ENDPOINT
resp = requests.get(actual_endpoint, auth=endpoint.auth, verify=endpoint.verify_ssl)
result = dict(resp.json())
return int(result[COUNT_KEY])
104 changes: 104 additions & 0 deletions FetchMigration/python/migration_monitor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import argparse
import time
from typing import Optional, List

import requests
from prometheus_client import Metric
from prometheus_client.parser import text_string_to_metric_families

from endpoint_info import EndpointInfo
from migration_monitor_params import MigrationMonitorParams

__PROMETHEUS_METRICS_ENDPOINT = "/metrics/prometheus"
__SHUTDOWN_ENDPOINT = "/shutdown"
__DOC_SUCCESS_METRIC = "_opensearch_documentsSuccess"
__RECORDS_IN_FLIGHT_METRIC = "_BlockingBuffer_recordsInFlight"
__NO_PARTITIONS_METRIC = "_noPartitionsAcquired"


def shutdown_pipeline(endpoint: EndpointInfo):
shutdown_endpoint = endpoint.url + __SHUTDOWN_ENDPOINT
requests.post(shutdown_endpoint, auth=endpoint.auth, verify=endpoint.verify_ssl)


def fetch_prometheus_metrics(endpoint: EndpointInfo) -> Optional[List[Metric]]:
metrics_endpoint = endpoint.url + __PROMETHEUS_METRICS_ENDPOINT
try:
response = requests.get(metrics_endpoint, auth=endpoint.auth, verify=endpoint.verify_ssl)
response.raise_for_status()
except requests.exceptions.RequestException:
return None
# Based on response headers defined in Data Prepper's PrometheusMetricsHandler.java class
metrics = response.content.decode('utf-8')
# Collect generator return values into list
return list(text_string_to_metric_families(metrics))


def get_metric_value(metric_families: List, metric_suffix: str) -> Optional[int]:
for metric_family in metric_families:
if metric_family.name.endswith(metric_suffix):
return int(metric_family.samples[0].value)
return None


def check_if_complete(doc_count: Optional[int], in_flight: Optional[int], no_part_count: Optional[int],
prev_no_part_count: int, target: int) -> bool:
# Check for target doc_count
# TODO Add a check for partitionsCompleted = indices
if doc_count is not None and doc_count >= target:
# Check for idle pipeline
if in_flight is not None and in_flight == 0:
# No-partitions metrics should steadily tick up
if no_part_count is not None and no_part_count > prev_no_part_count > 0:
return True
return False


def run(args: MigrationMonitorParams, wait_seconds: int = 30) -> None:
# TODO Remove hardcoded EndpointInfo
default_auth = ('admin', 'admin')
endpoint = EndpointInfo(args.dp_endpoint, default_auth, False)
prev_no_partitions_count = 0
terminal = False
while not terminal:
# If the API call fails, the response is empty
metrics = fetch_prometheus_metrics(endpoint)
if metrics is not None:
success_docs = get_metric_value(metrics, __DOC_SUCCESS_METRIC)
rec_in_flight = get_metric_value(metrics, __RECORDS_IN_FLIGHT_METRIC)
no_partitions_count = get_metric_value(metrics, __NO_PARTITIONS_METRIC)
terminal = check_if_complete(success_docs, rec_in_flight, no_partitions_count,
prev_no_partitions_count, args.target_count)
if not terminal:
# Save no_partitions_count
prev_no_partitions_count = no_partitions_count

if not terminal:
time.sleep(wait_seconds)
# Loop terminated, shut down the Data Prepper pipeline
shutdown_pipeline(endpoint)


if __name__ == '__main__': # pragma no cover
# Set up parsing for command line arguments
arg_parser = argparse.ArgumentParser(
prog="python monitor.py",
description="""Monitoring process for a running Data Prepper pipeline.
The first input is the Data Prepper URL endpoint.
The second input is the target doc_count for termination.""",
formatter_class=argparse.RawTextHelpFormatter
)
# Required positional arguments
arg_parser.add_argument(
"dp_endpoint",
help="URL endpoint for the running Data Prepper process"
)
arg_parser.add_argument(
"target_count",
type=int,
help="Target doc_count to reach, after which the Data Prepper pipeline will be terminated"
)
namespace = arg_parser.parse_args()
print("\n##### Starting monitor tool... #####\n")
run(MigrationMonitorParams(namespace.target_count, namespace.dp_endpoint))
print("\n##### Ending monitor tool... #####\n")
7 changes: 7 additions & 0 deletions FetchMigration/python/migration_monitor_params.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from dataclasses import dataclass


@dataclass
class MigrationMonitorParams:
target_count: int
dp_endpoint: str = "https://localhost:4900"
Loading

0 comments on commit 46f8c31

Please sign in to comment.