Skip to content

Commit

Permalink
Updates to Orchestrator API for OSI (#805)
Browse files Browse the repository at this point in the history
This changes introduces the following additions to the Console Orchestrator API

-Update OSI Create API model to use URI field instead of host and port and remove unnecessary migration type field
-Add serializer to verify received data for all API actions
-Add delete action as another OSI API action
-Add assume role ability to all OSI actions, this allows passing in a separate role for creating/updating/deleting pipelines that is different than the default task role of the container

---------

Signed-off-by: Tanner Lewis <[email protected]>
  • Loading branch information
lewijacn authored Jul 19, 2024
1 parent 8f78856 commit 600022b
Show file tree
Hide file tree
Showing 12 changed files with 1,136 additions and 75 deletions.
19 changes: 19 additions & 0 deletions .github/workflows/CI.yml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,25 @@ jobs:
files: ./coverage.xml
flags: python-test

console-api-tests:
runs-on: ubuntu-latest
defaults:
run:
working-directory: ./TrafficCapture/dockerSolution/src/main/docker/migrationConsole/console_api
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: ${{ env.python-version }}
- run: |
python -m pip install --upgrade pipenv
pipenv install --deploy --dev
pipenv run coverage run --source='.' manage.py test console_api
pipenv run coverage xml
- uses: codecov/codecov-action@v4
with:
files: ./coverage.xml

gradle-tests:
runs-on: ubuntu-latest
steps:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ sqlparse = "==0.5.0"
pyopenssl = "*"

[dev-packages]
coverage = "*"
moto = {version = "*", extras = ["sts"]}

[requires]
python_version = "3.10"

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@


class DataProviderSerializer(serializers.Serializer):
Host = serializers.CharField(max_length=2048)
Port = serializers.CharField(max_length=5)
Uri = serializers.URLField(max_length=2000)
AuthType = serializers.ChoiceField(choices=[e.name.upper() for e in AuthMethod])
SecretArn = serializers.CharField(max_length=2048, required=False)


class OpenSearchIngestionCreateRequestSerializer(serializers.Serializer):
PipelineRoleArn = serializers.CharField(max_length=2048)
PipelineManagerAssumeRoleArn = serializers.CharField(max_length=2048, required=False)
PipelineName = serializers.CharField(max_length=28)
AwsRegion = serializers.CharField(max_length=28)
IndexRegexSelections = serializers.ListField(
Expand All @@ -30,3 +30,8 @@ class OpenSearchIngestionCreateRequestSerializer(serializers.Serializer):
VpcSecurityGroupIds = serializers.ListField(
child=serializers.CharField(min_length=11, max_length=20)
)


class OpenSearchIngestionUpdateRequestSerializer(serializers.Serializer):
PipelineName = serializers.CharField(max_length=28)
PipelineManagerAssumeRoleArn = serializers.CharField(max_length=2048, required=False)
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
import os
import botocore
from django.test import Client, SimpleTestCase
from moto import mock_aws
from rest_framework import status
from unittest.mock import patch

PIPELINE_NAME = 'test-pipeline'
VALID_CREATE_PAYLOAD = {
'PipelineRoleArn': 'arn:aws:iam::123456789012:role/pipeline-access-role',
'PipelineName': PIPELINE_NAME,
'AwsRegion': 'us-east-1',
'Tags': ['test-tag=123'],
'IndexRegexSelections': ['index-test-*', 'stored-*'],
'LogGroupName': '/aws/vendedlogs/osi-logs',
'SourceDataProvider': {
'Uri': 'http://vpc-test-source-domain.com:9200',
'AuthType': 'BASIC_AUTH',
'SecretArn': 'arn:aws:secretsmanager:123456789012:secret:source-secret'
},
'TargetDataProvider': {
'Uri': 'https://vpc-test-target-domain.com:443',
'AuthType': 'SIGV4'
},
'VpcSubnetIds': ['subnet-123456789', 'subnet-789012345'],
'VpcSecurityGroupIds': ['sg-123456789', 'sg-789012345']
}
VALID_UPDATE_PAYLOAD = {
'PipelineName': PIPELINE_NAME
}
VALID_ASSUME_ROLE_UPDATE_PAYLOAD = {
'PipelineName': PIPELINE_NAME,
'PipelineManagerAssumeRoleArn': 'arn:aws:iam::123456789012:role/testRole',
}

# Original botocore _make_api_call function
orig = botocore.client.BaseClient._make_api_call


# Moto has not yet implemented mocking for OSI API actions, so must mock these API calls manually here
def mock_make_api_call(self, operation_name, kwarg):
# For example for the Access Analyzer service
# As you can see the operation_name has the list_analyzers snake_case form but
# we are using the ListAnalyzers form.
# Rationale -> https://github.com/boto/botocore/blob/develop/botocore/client.py#L810:L816
if (operation_name == 'CreatePipeline' or operation_name == 'DeletePipeline' or
operation_name == 'StartPipeline' or operation_name == 'StopPipeline'):
return {'Pipeline': {'PipelineName': PIPELINE_NAME}}
# If we don't want to patch the API call
return orig(self, operation_name, kwarg)


class OrchestratorViewsTest(SimpleTestCase):

def setUp(self):
self.client = Client()
os.environ['AWS_DEFAULT_REGION'] = 'us-east-1'

@patch('botocore.client.BaseClient._make_api_call', new=mock_make_api_call)
def test_osi_create_migration(self):
response = self.client.post('/orchestrator/osi-create-migration', data=VALID_CREATE_PAYLOAD,
content_type='application/json')
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertIn('Timestamp', response.data)

@mock_aws
@patch('botocore.client.BaseClient._make_api_call', new=mock_make_api_call)
def test_osi_create_migration_assume_role(self):
payload = dict(VALID_CREATE_PAYLOAD)
payload['PipelineManagerAssumeRoleArn'] = 'arn:aws:iam::123456789012:role/testRole'
response = self.client.post('/orchestrator/osi-create-migration', data=payload,
content_type='application/json')
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertIn('Timestamp', response.data)

@patch('botocore.client.BaseClient._make_api_call', new=mock_make_api_call)
def test_osi_create_migration_fails_for_missing_field(self):
payload = dict(VALID_CREATE_PAYLOAD)
payload['AwsRegion'] = None
response = self.client.post('/orchestrator/osi-create-migration', data=payload,
content_type='application/json')
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)

@patch('botocore.client.BaseClient._make_api_call', new=mock_make_api_call)
def test_osi_create_migration_fails_for_invalid_auth(self):
payload = dict(VALID_CREATE_PAYLOAD)
payload['SourceDataProvider'].pop('SecretArn')
response = self.client.post('/orchestrator/osi-create-migration', data=payload,
content_type='application/json')
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)

@patch('botocore.client.BaseClient._make_api_call', new=mock_make_api_call)
def test_osi_start_migration(self):
response = self.client.post('/orchestrator/osi-start-migration', data=VALID_UPDATE_PAYLOAD,
content_type='application/json')

self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertIn('Timestamp', response.data)

@mock_aws
@patch('botocore.client.BaseClient._make_api_call', new=mock_make_api_call)
def test_osi_start_migration_assume_role(self):
response = self.client.post('/orchestrator/osi-start-migration', data=VALID_ASSUME_ROLE_UPDATE_PAYLOAD,
content_type='application/json')

self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertIn('Timestamp', response.data)

@patch('botocore.client.BaseClient._make_api_call', new=mock_make_api_call)
def test_osi_stop_migration(self):
response = self.client.post('/orchestrator/osi-stop-migration', data=VALID_UPDATE_PAYLOAD,
content_type='application/json')
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertIn('Timestamp', response.data)

@mock_aws
@patch('botocore.client.BaseClient._make_api_call', new=mock_make_api_call)
def test_osi_stop_migration_assume_role(self):
response = self.client.post('/orchestrator/osi-stop-migration', data=VALID_ASSUME_ROLE_UPDATE_PAYLOAD,
content_type='application/json')
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertIn('Timestamp', response.data)

@patch('botocore.client.BaseClient._make_api_call', new=mock_make_api_call)
def test_osi_delete_migration(self):
response = self.client.post('/orchestrator/osi-delete-migration', data=VALID_UPDATE_PAYLOAD,
content_type='application/json')
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertIn('Timestamp', response.data)

@mock_aws
@patch('botocore.client.BaseClient._make_api_call', new=mock_make_api_call)
def test_osi_delete_migration_assume_role(self):
response = self.client.post('/orchestrator/osi-delete-migration', data=VALID_ASSUME_ROLE_UPDATE_PAYLOAD,
content_type='application/json')
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertIn('Timestamp', response.data)
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@
path('osi-create-migration', views.osi_create_migration, name='osi-create-migration'),
path('osi-start-migration', views.osi_start_migration, name='osi-start-migration'),
path('osi-stop-migration', views.osi_stop_migration, name='osi-stop-migration'),
path('osi-delete-migration', views.osi_delete_migration, name='osi-delete-migration'),
]
Original file line number Diff line number Diff line change
@@ -1,22 +1,21 @@
from console_api.apps.orchestrator.serializers import OpenSearchIngestionCreateRequestSerializer
from console_api.apps.orchestrator.serializers import (OpenSearchIngestionCreateRequestSerializer,
OpenSearchIngestionUpdateRequestSerializer)
from console_link.models.osi_utils import (InvalidAuthParameters, create_pipeline_from_json, start_pipeline,
stop_pipeline)
stop_pipeline, delete_pipeline, get_assume_role_session)
from rest_framework.decorators import api_view, parser_classes
from rest_framework.parsers import JSONParser
from rest_framework.response import Response
from rest_framework import status
from pathlib import Path
from typing import Callable
import boto3
import datetime
from enum import Enum
import logging

logger = logging.getLogger(__name__)

PIPELINE_TEMPLATE_PATH = f"{Path(__file__).parents[4]}/osiPipelineTemplate.yaml"

MigrationType = Enum('MigrationType', ['OSI_HISTORICAL_MIGRATION'])


def pretty_request(request, data):
headers = ''
Expand All @@ -37,70 +36,77 @@ def pretty_request(request, data):
)


@api_view(['POST'])
@parser_classes([JSONParser])
def osi_create_migration(request):
def determine_osi_client(request_data, action_name: str):
pipeline_manager_role_arn = request_data.get('PipelineManagerAssumeRoleArn')
if pipeline_manager_role_arn:
logger.debug(f'Assuming provided role: {pipeline_manager_role_arn}')
session = get_assume_role_session(role_arn=pipeline_manager_role_arn,
session_name=f'Console{action_name}PipelineAssumeRole')
osi_client = session.client('osis')
else:
osi_client = boto3.client('osis')
return osi_client


def osi_update_workflow(request, osi_action_func: Callable, action_name: str):
request_data = request.data
logger.info(pretty_request(request, request_data))

osi_serializer = OpenSearchIngestionCreateRequestSerializer(data=request_data)
osi_serializer = OpenSearchIngestionUpdateRequestSerializer(data=request_data)
osi_serializer.is_valid(raise_exception=True)
pipeline_name = request_data.get('PipelineName')
try:
osi_client = boto3.client('osis')
create_pipeline_from_json(osi_client=osi_client,
input_json=request_data,
pipeline_template_path=PIPELINE_TEMPLATE_PATH)
except InvalidAuthParameters as i:
logger.error(f"Error performing osi_create_migration API: {i}")
return Response(str(i), status=status.HTTP_400_BAD_REQUEST)
osi_action_func(osi_client=determine_osi_client(request_data=request_data, action_name=action_name),
pipeline_name=pipeline_name)
except Exception as e:
logger.error(f"Error performing osi_create_migration API: {e}")
logger.error(f'Error performing OSI {action_name} Pipeline API: {e}')
return Response(str(e), status=status.HTTP_500_INTERNAL_SERVER_ERROR)

response_data = {
'Timestamp': datetime.datetime.now(datetime.timezone.utc),
'MigrationType': MigrationType.OSI_HISTORICAL_MIGRATION
'Timestamp': datetime.datetime.now(datetime.timezone.utc)
}
return Response(response_data, status=status.HTTP_200_OK)


@api_view(['POST'])
@parser_classes([JSONParser])
def osi_start_migration(request):
def osi_create_migration(request):
request_data = request.data
logger.info(pretty_request(request, request_data))
action_name = 'Create'

pipeline_name = request_data.get("PipelineName")
osi_serializer = OpenSearchIngestionCreateRequestSerializer(data=request_data)
osi_serializer.is_valid(raise_exception=True)
try:
osi_client = boto3.client('osis')
start_pipeline(osi_client=osi_client, pipeline_name=pipeline_name)
create_pipeline_from_json(osi_client=determine_osi_client(request_data=request_data, action_name=action_name),
input_json=request_data,
pipeline_template_path=PIPELINE_TEMPLATE_PATH)
except InvalidAuthParameters as i:
logger.error(f'Error performing OSI {action_name} Pipeline API: {i}')
return Response(str(i), status=status.HTTP_400_BAD_REQUEST)
except Exception as e:
logger.error(f"Error performing osi_start_migration API: {e}")
logger.error(f'Error performing OSI {action_name} Pipeline API: {e}')
return Response(str(e), status=status.HTTP_500_INTERNAL_SERVER_ERROR)

response_data = {
'Timestamp': datetime.datetime.now(datetime.timezone.utc),
'MigrationType': MigrationType.OSI_HISTORICAL_MIGRATION
'Timestamp': datetime.datetime.now(datetime.timezone.utc)
}
return Response(response_data, status=status.HTTP_200_OK)


@api_view(['POST'])
@parser_classes([JSONParser])
def osi_start_migration(request):
return osi_update_workflow(request=request, osi_action_func=start_pipeline, action_name='Start')


@api_view(['POST'])
@parser_classes([JSONParser])
def osi_stop_migration(request):
request_data = request.data
logger.info(pretty_request(request, request_data))
return osi_update_workflow(request=request, osi_action_func=stop_pipeline, action_name='Stop')

pipeline_name = request_data.get("PipelineName")
try:
osi_client = boto3.client('osis')
stop_pipeline(osi_client=osi_client, pipeline_name=pipeline_name)
except Exception as e:
logger.error(f"Error performing osi_stop_migration API: {e}")
return Response(str(e), status=status.HTTP_500_INTERNAL_SERVER_ERROR)

response_data = {
'Timestamp': datetime.datetime.now(datetime.timezone.utc),
'MigrationType': MigrationType.OSI_HISTORICAL_MIGRATION
}
return Response(response_data, status=status.HTTP_200_OK)
@api_view(['POST'])
@parser_classes([JSONParser])
def osi_delete_migration(request):
return osi_update_workflow(request=request, osi_action_func=delete_pipeline, action_name='Delete')
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pytest = "*"
pytest-mock = "*"
requests-mock = "*"
coverage = "*"
moto = "*"

[requires]
python_version = "3.10"
Loading

0 comments on commit 600022b

Please sign in to comment.