Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updates to Orchestrator API for OSI #805

Merged
merged 11 commits into from
Jul 19, 2024
19 changes: 19 additions & 0 deletions .github/workflows/CI.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,25 @@ jobs:
files: ./coverage.xml
flags: python-test

console-api-tests:
runs-on: ubuntu-latest
defaults:
run:
working-directory: ./TrafficCapture/dockerSolution/src/main/docker/migrationConsole/console_api
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: ${{ env.python-version }}
- run: |
python -m pip install --upgrade pipenv
pipenv install --deploy --dev
pipenv run coverage run --source='.' manage.py test console_api
pipenv run coverage xml
- uses: codecov/codecov-action@v4
with:
files: ./coverage.xml

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note for a follow up change in the sonarqube stuff to incorporate this

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for creating this, yes we need to add

gradle-tests:
runs-on: ubuntu-latest
steps:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ sqlparse = "==0.5.0"
pyopenssl = "*"

[dev-packages]
coverage = "*"
moto = "*"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like you can install moto with just the specific dependencies you want to mock (https://docs.getmoto.org/en/latest/docs/getting_started.html#installing-moto) -- worth doing? I assume we're mocking a pretty narrow list

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Went ahead and added the specifier for sts for moto, from my testing all decorators like mock_sts have been removed in favor of using the single mock_aws


[requires]
python_version = "3.10"

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@


class DataProviderSerializer(serializers.Serializer):
Host = serializers.CharField(max_length=2048)
Port = serializers.CharField(max_length=5)
Uri = serializers.URLField(max_length=2000)
AuthType = serializers.ChoiceField(choices=[e.name.upper() for e in AuthMethod])
SecretArn = serializers.CharField(max_length=2048, required=False)


class OpenSearchIngestionCreateRequestSerializer(serializers.Serializer):
PipelineRoleArn = serializers.CharField(max_length=2048)
PipelineManagerAssumeRoleArn = serializers.CharField(max_length=2048, required=False)
PipelineName = serializers.CharField(max_length=28)
AwsRegion = serializers.CharField(max_length=28)
IndexRegexSelections = serializers.ListField(
Expand All @@ -30,3 +30,8 @@ class OpenSearchIngestionCreateRequestSerializer(serializers.Serializer):
VpcSecurityGroupIds = serializers.ListField(
child=serializers.CharField(min_length=11, max_length=20)
)


class OpenSearchIngestionUpdateRequestSerializer(serializers.Serializer):
PipelineName = serializers.CharField(max_length=28)
PipelineManagerAssumeRoleArn = serializers.CharField(max_length=2048, required=False)
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
import os
from django.test import Client, SimpleTestCase
from moto import mock_aws
from rest_framework import status
from unittest.mock import patch


VALID_CREATE_PAYLOAD = {
'PipelineRoleArn': 'arn:aws:iam::123456789012:role/pipeline-access-role',
'PipelineName': 'test-pipeline',
'AwsRegion': 'us-east-1',
'Tags': ['test-tag=123'],
'IndexRegexSelections': ['index-test-*', 'stored-*'],
'LogGroupName': '/aws/vendedlogs/osi-logs',
'SourceDataProvider': {
'Uri': 'http://vpc-test-source-domain.com:9200',
'AuthType': 'BASIC_AUTH',
'SecretArn': 'arn:aws:secretsmanager:123456789012:secret:source-secret'
},
'TargetDataProvider': {
'Uri': 'https://vpc-test-target-domain.com:443',
'AuthType': 'SIGV4'
},
'VpcSubnetIds': ['subnet-123456789', 'subnet-789012345'],
'VpcSecurityGroupIds': ['sg-123456789', 'sg-789012345']
}
VALID_UPDATE_PAYLOAD = {
'PipelineName': 'test-pipeline'
}
VALID_ASSUME_ROLE_UPDATE_PAYLOAD = {
'PipelineName': 'test-pipeline',
'PipelineManagerAssumeRoleArn': 'arn:aws:iam::123456789012:role/testRole',
}


# Moto has not yet implemented mocking for OSI API actions, using @patch for these calls
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So moto is basically handling/mocking creating the boto3 client? It looks like we could add the OSI actions directly to moto (like https://docs.getmoto.org/en/latest/docs/services/patching_other_services.html or via PR to their repo), but I think this approach is fine with me for now

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Switched to using patch for moto like you mentioned to give a little more flexibility here. Thanks for suggestion

class OrchestratorViewsTest(SimpleTestCase):

def setUp(self):
self.client = Client()
os.environ['AWS_DEFAULT_REGION'] = 'us-east-1'

@patch('console_api.apps.orchestrator.views.create_pipeline_from_json')
def test_osi_create_migration(self, mock_create_pipeline_from_json):
mock_create_pipeline_from_json.return_value = None

response = self.client.post('/orchestrator/osi-create-migration', data=VALID_CREATE_PAYLOAD,
content_type='application/json')
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertIn('Timestamp', response.data)

@mock_aws
@patch('console_api.apps.orchestrator.views.create_pipeline_from_json')
def test_osi_create_migration_assume_role(self, mock_create_pipeline_from_json):
mock_create_pipeline_from_json.return_value = None

payload = dict(VALID_CREATE_PAYLOAD)
payload['PipelineManagerAssumeRoleArn'] = 'arn:aws:iam::123456789012:role/testRole'
response = self.client.post('/orchestrator/osi-create-migration', data=payload,
content_type='application/json')
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertIn('Timestamp', response.data)

@patch('console_api.apps.orchestrator.views.create_pipeline_from_json')
def test_osi_create_migration_fails_for_missing_field(self, mock_create_pipeline_from_json):
mock_create_pipeline_from_json.return_value = None

payload = dict(VALID_CREATE_PAYLOAD)
payload['AwsRegion'] = None
response = self.client.post('/orchestrator/osi-create-migration', data=payload,
content_type='application/json')
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)

@patch('console_api.apps.orchestrator.views.start_pipeline')
def test_osi_start_migration(self, mock_start_pipeline):
mock_start_pipeline.return_value = None

response = self.client.post('/orchestrator/osi-start-migration', data=VALID_UPDATE_PAYLOAD,
content_type='application/json')

self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertIn('Timestamp', response.data)

@mock_aws
@patch('console_api.apps.orchestrator.views.start_pipeline')
def test_osi_start_migration_assume_role(self, mock_start_pipeline):
mock_start_pipeline.return_value = None

response = self.client.post('/orchestrator/osi-start-migration', data=VALID_ASSUME_ROLE_UPDATE_PAYLOAD,
content_type='application/json')

self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertIn('Timestamp', response.data)

@patch('console_api.apps.orchestrator.views.stop_pipeline')
def test_osi_stop_migration(self, mock_stop_pipeline):
mock_stop_pipeline.return_value = None

response = self.client.post('/orchestrator/osi-stop-migration', data=VALID_UPDATE_PAYLOAD,
content_type='application/json')

self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertIn('Timestamp', response.data)

@mock_aws
@patch('console_api.apps.orchestrator.views.stop_pipeline')
def test_osi_stop_migration_assume_role(self, mock_stop_pipeline):
mock_stop_pipeline.return_value = None

response = self.client.post('/orchestrator/osi-stop-migration', data=VALID_ASSUME_ROLE_UPDATE_PAYLOAD,
content_type='application/json')

self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertIn('Timestamp', response.data)

@patch('console_api.apps.orchestrator.views.delete_pipeline')
def test_osi_delete_migration(self, mock_delete_pipeline):
mock_delete_pipeline.return_value = None

response = self.client.post('/orchestrator/osi-delete-migration', data=VALID_UPDATE_PAYLOAD,
content_type='application/json')

self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertIn('Timestamp', response.data)

@mock_aws
@patch('console_api.apps.orchestrator.views.delete_pipeline')
def test_osi_delete_migration_assume_role(self, mock_delete_pipeline):
mock_delete_pipeline.return_value = None

response = self.client.post('/orchestrator/osi-delete-migration', data=VALID_ASSUME_ROLE_UPDATE_PAYLOAD,
content_type='application/json')

self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertIn('Timestamp', response.data)
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@
path('osi-create-migration', views.osi_create_migration, name='osi-create-migration'),
path('osi-start-migration', views.osi_start_migration, name='osi-start-migration'),
path('osi-stop-migration', views.osi_stop_migration, name='osi-stop-migration'),
path('osi-delete-migration', views.osi_delete_migration, name='osi-delete-migration'),
]
Original file line number Diff line number Diff line change
@@ -1,22 +1,21 @@
from console_api.apps.orchestrator.serializers import OpenSearchIngestionCreateRequestSerializer
from console_api.apps.orchestrator.serializers import (OpenSearchIngestionCreateRequestSerializer,
OpenSearchIngestionUpdateRequestSerializer)
from console_link.models.osi_utils import (InvalidAuthParameters, create_pipeline_from_json, start_pipeline,
stop_pipeline)
stop_pipeline, delete_pipeline, get_assume_role_session)
from rest_framework.decorators import api_view, parser_classes
from rest_framework.parsers import JSONParser
from rest_framework.response import Response
from rest_framework import status
from pathlib import Path
from typing import Callable
import boto3
import datetime
from enum import Enum
import logging

logger = logging.getLogger(__name__)

PIPELINE_TEMPLATE_PATH = f"{Path(__file__).parents[4]}/osiPipelineTemplate.yaml"

MigrationType = Enum('MigrationType', ['OSI_HISTORICAL_MIGRATION'])


def pretty_request(request, data):
headers = ''
Expand All @@ -37,70 +36,77 @@
)


@api_view(['POST'])
@parser_classes([JSONParser])
def osi_create_migration(request):
def determine_osi_client(request_data, action_name: str):
pipeline_manager_role_arn = request_data.get('PipelineManagerAssumeRoleArn')
if pipeline_manager_role_arn:
logger.debug(f'Assuming provided role: {pipeline_manager_role_arn}')
session = get_assume_role_session(role_arn=pipeline_manager_role_arn,
session_name=f'Console{action_name}PipelineAssumeRole')
osi_client = session.client('osis')
else:
osi_client = boto3.client('osis')
return osi_client


def osi_update_workflow(request, osi_action_func: Callable, action_name: str):
request_data = request.data
logger.info(pretty_request(request, request_data))

osi_serializer = OpenSearchIngestionCreateRequestSerializer(data=request_data)
osi_serializer = OpenSearchIngestionUpdateRequestSerializer(data=request_data)
osi_serializer.is_valid(raise_exception=True)
pipeline_name = request_data.get('PipelineName')
try:
osi_client = boto3.client('osis')
create_pipeline_from_json(osi_client=osi_client,
input_json=request_data,
pipeline_template_path=PIPELINE_TEMPLATE_PATH)
except InvalidAuthParameters as i:
logger.error(f"Error performing osi_create_migration API: {i}")
return Response(str(i), status=status.HTTP_400_BAD_REQUEST)
osi_action_func(osi_client=determine_osi_client(request_data=request_data, action_name=action_name),
pipeline_name=pipeline_name)
except Exception as e:
logger.error(f"Error performing osi_create_migration API: {e}")
logger.error(f'Error performing OSI {action_name} Pipeline API: {e}')

Check warning on line 62 in TrafficCapture/dockerSolution/src/main/docker/migrationConsole/console_api/console_api/apps/orchestrator/views.py

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/dockerSolution/src/main/docker/migrationConsole/console_api/console_api/apps/orchestrator/views.py#L62

Added line #L62 was not covered by tests
return Response(str(e), status=status.HTTP_500_INTERNAL_SERVER_ERROR)

response_data = {
'Timestamp': datetime.datetime.now(datetime.timezone.utc),
'MigrationType': MigrationType.OSI_HISTORICAL_MIGRATION
'Timestamp': datetime.datetime.now(datetime.timezone.utc)
}
return Response(response_data, status=status.HTTP_200_OK)


@api_view(['POST'])
@parser_classes([JSONParser])
def osi_start_migration(request):
def osi_create_migration(request):
request_data = request.data
logger.info(pretty_request(request, request_data))
action_name = 'Create'

pipeline_name = request_data.get("PipelineName")
osi_serializer = OpenSearchIngestionCreateRequestSerializer(data=request_data)
osi_serializer.is_valid(raise_exception=True)
try:
osi_client = boto3.client('osis')
start_pipeline(osi_client=osi_client, pipeline_name=pipeline_name)
create_pipeline_from_json(osi_client=determine_osi_client(request_data=request_data, action_name=action_name),
input_json=request_data,
pipeline_template_path=PIPELINE_TEMPLATE_PATH)
except InvalidAuthParameters as i:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor note that at some point, it would be great to have a test that exercised this branch, but not a blocker for this PR

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have added a test for this now 👍

logger.error(f'Error performing OSI {action_name} Pipeline API: {i}')
return Response(str(i), status=status.HTTP_400_BAD_REQUEST)

Check warning on line 86 in TrafficCapture/dockerSolution/src/main/docker/migrationConsole/console_api/console_api/apps/orchestrator/views.py

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/dockerSolution/src/main/docker/migrationConsole/console_api/console_api/apps/orchestrator/views.py#L84-L86

Added lines #L84 - L86 were not covered by tests
except Exception as e:
logger.error(f"Error performing osi_start_migration API: {e}")
logger.error(f'Error performing OSI {action_name} Pipeline API: {e}')

Check warning on line 88 in TrafficCapture/dockerSolution/src/main/docker/migrationConsole/console_api/console_api/apps/orchestrator/views.py

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/dockerSolution/src/main/docker/migrationConsole/console_api/console_api/apps/orchestrator/views.py#L88

Added line #L88 was not covered by tests
return Response(str(e), status=status.HTTP_500_INTERNAL_SERVER_ERROR)

response_data = {
'Timestamp': datetime.datetime.now(datetime.timezone.utc),
'MigrationType': MigrationType.OSI_HISTORICAL_MIGRATION
'Timestamp': datetime.datetime.now(datetime.timezone.utc)
}
return Response(response_data, status=status.HTTP_200_OK)


@api_view(['POST'])
@parser_classes([JSONParser])
def osi_start_migration(request):
return osi_update_workflow(request=request, osi_action_func=start_pipeline, action_name='Start')


@api_view(['POST'])
@parser_classes([JSONParser])
def osi_stop_migration(request):
request_data = request.data
logger.info(pretty_request(request, request_data))
return osi_update_workflow(request=request, osi_action_func=stop_pipeline, action_name='Stop')
Comment on lines +99 to +106
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there/should there be validation that the pipeline exists for these actions? What happens if it doesn't?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I imagine users will get a 500 with the error message, that the pipeline doesn't exist. But agree this is likely a common error case we should test for and be more specific for like a 404 response code. Have created a task around this here: https://opensearch.atlassian.net/browse/MIGRATIONS-1877


pipeline_name = request_data.get("PipelineName")
try:
osi_client = boto3.client('osis')
stop_pipeline(osi_client=osi_client, pipeline_name=pipeline_name)
except Exception as e:
logger.error(f"Error performing osi_stop_migration API: {e}")
return Response(str(e), status=status.HTTP_500_INTERNAL_SERVER_ERROR)

response_data = {
'Timestamp': datetime.datetime.now(datetime.timezone.utc),
'MigrationType': MigrationType.OSI_HISTORICAL_MIGRATION
}
return Response(response_data, status=status.HTTP_200_OK)
@api_view(['POST'])
@parser_classes([JSONParser])
def osi_delete_migration(request):
return osi_update_workflow(request=request, osi_action_func=delete_pipeline, action_name='Delete')
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pytest = "*"
pytest-mock = "*"
requests-mock = "*"
coverage = "*"
moto = "*"

[requires]
python_version = "3.10"
Loading
Loading