Merge branch 'main' into MeteringAndFanout
* main:
  [Fetch Migration] Monitoring script for Data Prepper (opensearch-project#264)
  Refactor secrets cache to sdk client (opensearch-project#278)
  Update humanReadableLogs script
  Update primary->source and shadow->target and documentation
  Add shadowRequest and connectionId to tuple
  Update tests for CRLF
  Use CRLF in NettyJsonToByteBufHandler

Signed-off-by: Greg Schohn <[email protected]>

# Conflicts:
#	TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/ExpiringSubstitutableItemPoolTest.java
#	TrafficCapture/trafficReplayer/build.gradle
#	TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/SourceTargetCaptureTuple.java
#	TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java
gregschohn committed Sep 5, 2023
2 parents 7d40d64 + 81f8a99 commit eb00d38
Showing 14 changed files with 384 additions and 76 deletions.
103 changes: 103 additions & 0 deletions FetchMigration/index_configuration_tool/monitor.py
@@ -0,0 +1,103 @@
import argparse
import time
from typing import Optional, List

import requests
from prometheus_client import Metric
from prometheus_client.parser import text_string_to_metric_families

from endpoint_info import EndpointInfo

__PROMETHEUS_METRICS_ENDPOINT = "/metrics/prometheus"
__SHUTDOWN_ENDPOINT = "/shutdown"
__DOC_SUCCESS_METRIC = "_opensearch_documentsSuccess"
__RECORDS_IN_FLIGHT_METRIC = "_BlockingBuffer_recordsInFlight"
__NO_PARTITIONS_METRIC = "_noPartitionsAcquired"


def shutdown_pipeline(endpoint: EndpointInfo):
    shutdown_endpoint = endpoint.url + __SHUTDOWN_ENDPOINT
    requests.post(shutdown_endpoint, auth=endpoint.auth, verify=endpoint.verify_ssl)


def fetch_prometheus_metrics(endpoint: EndpointInfo) -> Optional[List[Metric]]:
    metrics_endpoint = endpoint.url + __PROMETHEUS_METRICS_ENDPOINT
    try:
        response = requests.get(metrics_endpoint, auth=endpoint.auth, verify=endpoint.verify_ssl)
        response.raise_for_status()
    except requests.exceptions.RequestException:
        return None
    # Based on response headers defined in Data Prepper's PrometheusMetricsHandler.java class
    metrics = response.content.decode('utf-8')
    # Collect generator return values into a list
    return list(text_string_to_metric_families(metrics))


def get_metric_value(metric_families: List, metric_suffix: str) -> Optional[int]:
    for metric_family in metric_families:
        if metric_family.name.endswith(metric_suffix):
            return int(metric_family.samples[0].value)
    return None


def check_if_complete(doc_count: Optional[int], in_flight: Optional[int], no_part_count: Optional[int],
                      prev_no_part_count: int, target: int) -> bool:
    # Check for target doc_count
    # TODO Add a check for partitionsCompleted = indices
    if doc_count is not None and doc_count >= target:
        # Check for idle pipeline
        if in_flight is not None and in_flight == 0:
            # The no-partitions metric should steadily tick up once the pipeline is idle
            if no_part_count is not None and no_part_count > prev_no_part_count > 0:
                return True
    return False


def run(args: argparse.Namespace, wait_seconds: int) -> None:
    # TODO Remove hardcoded EndpointInfo
    default_auth = ('admin', 'admin')
    endpoint = EndpointInfo(args.dp_endpoint, default_auth, False)
    prev_no_partitions_count = 0
    terminal = False
    while not terminal:
        # If the API call fails, the result is None
        metrics = fetch_prometheus_metrics(endpoint)
        if metrics is not None:
            success_docs = get_metric_value(metrics, __DOC_SUCCESS_METRIC)
            rec_in_flight = get_metric_value(metrics, __RECORDS_IN_FLIGHT_METRIC)
            no_partitions_count = get_metric_value(metrics, __NO_PARTITIONS_METRIC)
            terminal = check_if_complete(success_docs, rec_in_flight, no_partitions_count,
                                         prev_no_partitions_count, args.target_count)
            if not terminal:
                # Save no_partitions_count for comparison on the next iteration
                prev_no_partitions_count = no_partitions_count

        if not terminal:
            time.sleep(wait_seconds)
    # Loop terminated, shut down the Data Prepper pipeline
    shutdown_pipeline(endpoint)


if __name__ == '__main__':  # pragma: no cover
    # Set up parsing for command-line arguments
    arg_parser = argparse.ArgumentParser(
        prog="python monitor.py",
        description="""Monitoring process for a running Data Prepper pipeline.
        The first input is the Data Prepper URL endpoint.
        The second input is the target doc_count for termination.""",
        formatter_class=argparse.RawTextHelpFormatter
    )
    # Required positional arguments
    arg_parser.add_argument(
        "dp_endpoint",
        help="URL endpoint for the running Data Prepper process"
    )
    arg_parser.add_argument(
        "target_count",
        type=int,
        help="Target doc_count to reach, after which the Data Prepper pipeline will be terminated"
    )
    cli_args = arg_parser.parse_args()
    print("\n##### Starting monitor tool... #####\n")
    run(cli_args, 30)
    print("\n##### Ending monitor tool... #####\n")
1 change: 1 addition & 0 deletions FetchMigration/index_configuration_tool/requirements.txt
@@ -1,4 +1,5 @@
 jsondiff>=2.0.0
+prometheus-client>=0.17.1
 pyyaml>=6.0
 requests>=2.28.2
 responses>=0.23.1
143 changes: 143 additions & 0 deletions FetchMigration/index_configuration_tool/tests/test_monitor.py
@@ -0,0 +1,143 @@
import argparse
import unittest
from unittest.mock import patch, MagicMock, PropertyMock

import requests
import responses
from prometheus_client.parser import text_string_to_metric_families

import monitor
from endpoint_info import EndpointInfo

# Constants
TEST_ENDPOINT = "test"
TEST_AUTH = ("user", "pass")
TEST_FLAG = False
TEST_METRIC_NAME = "test_metric"
TEST_METRIC_VALUE = 123.45
TEST_PROMETHEUS_METRIC_STRING = "# HELP " + TEST_METRIC_NAME + " Unit Test Metric\n" \
                                + "# TYPE " + TEST_METRIC_NAME + " gauge\n" \
                                + TEST_METRIC_NAME + "{serviceName=\"unittest\",} " + str(TEST_METRIC_VALUE)


class TestMonitor(unittest.TestCase):
    @patch('requests.post')
    def test_shutdown(self, mock_post: MagicMock):
        expected_shutdown_url = TEST_ENDPOINT + "/shutdown"
        test_endpoint = EndpointInfo(TEST_ENDPOINT, TEST_AUTH, TEST_FLAG)
        monitor.shutdown_pipeline(test_endpoint)
        mock_post.assert_called_once_with(expected_shutdown_url, auth=TEST_AUTH, verify=TEST_FLAG)

    @patch('requests.get')
    def test_fetch_prometheus_metrics(self, mock_get: MagicMock):
        expected_url = TEST_ENDPOINT + "/metrics/prometheus"
        # Set up GET response
        mock_response = MagicMock()
        # content is a property
        mock_content = PropertyMock(return_value=bytes(TEST_PROMETHEUS_METRIC_STRING, "utf-8"))
        type(mock_response).content = mock_content
        mock_get.return_value = mock_response
        # Test fetch
        raw_metrics_list = monitor.fetch_prometheus_metrics(EndpointInfo(TEST_ENDPOINT))
        mock_get.assert_called_once_with(expected_url, auth=None, verify=True)
        self.assertEqual(1, len(raw_metrics_list))
        test_metric = raw_metrics_list[0]
        self.assertEqual(TEST_METRIC_NAME, test_metric.name)
        self.assertTrue(len(test_metric.type) > 0)
        self.assertTrue(len(test_metric.documentation) > 0)
        self.assertEqual(1, len(test_metric.samples))
        test_sample = test_metric.samples[0]
        self.assertEqual(TEST_METRIC_NAME, test_sample.name)
        self.assertEqual(TEST_METRIC_VALUE, test_sample.value)
        self.assertTrue(len(test_sample.labels) > 0)

    @responses.activate
    def test_fetch_prometheus_metrics_failure(self):
        # Set up expected GET call with a mock exception
        expected_url = TEST_ENDPOINT + "/metrics/prometheus"
        responses.get(expected_url, body=requests.Timeout())
        # Test fetch
        result = monitor.fetch_prometheus_metrics(EndpointInfo(TEST_ENDPOINT))
        self.assertIsNone(result)

    def test_get_metric_value(self):
        # Return value is an int
        expected_val = int(TEST_METRIC_VALUE)
        test_input = list(text_string_to_metric_families(TEST_PROMETHEUS_METRIC_STRING))
        # Should fetch by suffix
        val = monitor.get_metric_value(test_input, "metric")
        self.assertEqual(expected_val, val)
        # No matching metric returns None
        val = monitor.get_metric_value(test_input, "invalid")
        self.assertEqual(None, val)

    @patch('monitor.shutdown_pipeline')
    @patch('time.sleep')
    @patch('monitor.check_if_complete')
    @patch('monitor.get_metric_value')
    @patch('monitor.fetch_prometheus_metrics')
    # Note that mock objects are passed bottom-up from the patch order above
    def test_run(self, mock_fetch: MagicMock, mock_get: MagicMock, mock_check: MagicMock, mock_sleep: MagicMock,
                 mock_shut: MagicMock):
        test_input = argparse.Namespace()
        # The values here don't matter since we've mocked the check method
        test_input.dp_endpoint = "test"
        test_input.target_count = 1
        mock_get.return_value = None
        # Check will first fail, then pass
        mock_check.side_effect = [False, True]
        # Run test method
        wait_time = 3
        monitor.run(test_input, wait_time)
        # Test that fetch was called with the expected EndpointInfo
        expected_endpoint_info = EndpointInfo(test_input.dp_endpoint, ('admin', 'admin'), False)
        self.assertEqual(2, mock_fetch.call_count)
        mock_fetch.assert_called_with(expected_endpoint_info)
        # We expect one wait cycle
        mock_sleep.assert_called_once_with(wait_time)
        mock_shut.assert_called_once_with(expected_endpoint_info)

    @patch('monitor.shutdown_pipeline')
    @patch('time.sleep')
    @patch('monitor.check_if_complete')
    @patch('monitor.get_metric_value')
    @patch('monitor.fetch_prometheus_metrics')
    # Note that mock objects are passed bottom-up from the patch order above
    def test_run_with_fetch_failure(self, mock_fetch: MagicMock, mock_get: MagicMock, mock_check: MagicMock,
                                    mock_sleep: MagicMock, mock_shut: MagicMock):
        test_input = argparse.Namespace()
        # The values here don't matter since we've mocked the check method
        test_input.dp_endpoint = "test"
        test_input.target_count = 1
        mock_get.return_value = None
        mock_check.return_value = True
        # Fetch call will first fail, then succeed
        mock_fetch.side_effect = [None, MagicMock()]
        # Run test method
        wait_time = 3
        monitor.run(test_input, wait_time)
        # Test that fetch was called with the expected EndpointInfo
        expected_endpoint_info = EndpointInfo(test_input.dp_endpoint, ('admin', 'admin'), False)
        self.assertEqual(2, mock_fetch.call_count)
        mock_fetch.assert_called_with(expected_endpoint_info)
        # We expect one wait cycle
        mock_sleep.assert_called_once_with(wait_time)
        mock_shut.assert_called_once_with(expected_endpoint_info)

    def test_check_if_complete(self):
        # If any of the optional values are missing, we are not complete
        self.assertFalse(monitor.check_if_complete(None, 0, 1, 0, 2))
        self.assertFalse(monitor.check_if_complete(2, None, 1, 0, 2))
        self.assertFalse(monitor.check_if_complete(2, 0, None, 0, 2))
        # Target count not reached
        self.assertFalse(monitor.check_if_complete(1, None, None, 0, 2))
        # Target count reached, but still has records in flight
        self.assertFalse(monitor.check_if_complete(2, 1, None, 0, 2))
        # Target count reached, no records in flight, but no previous no_part_count
        self.assertFalse(monitor.check_if_complete(2, 0, 1, 0, 2))
        # Terminal state
        self.assertTrue(monitor.check_if_complete(2, 0, 2, 1, 2))


if __name__ == '__main__':
    unittest.main()
humanReadableLogs.py
@@ -14,29 +14,32 @@
 logger = logging.getLogger(__name__)

 LOG_JSON_TUPLE_FIELD = "message"
-BASE64_ENCODED_TUPLE_PATHS = ["request.body", "primaryResponse.body", "shadowResponse.body"]
+BASE64_ENCODED_TUPLE_PATHS = ["sourceRequest.body", "targetRequest.body", "sourceResponse.body", "targetResponse.body"]
 # TODO: I'm not positive about the capitalization of the Content-Encoding and Content-Type headers.
 # This version worked on my test cases, but it is not guaranteed to work in all cases.
 CONTENT_ENCODING_PATH = {
-    BASE64_ENCODED_TUPLE_PATHS[0]: "request.Content-Encoding",
-    BASE64_ENCODED_TUPLE_PATHS[1]: "primaryResponse.Content-Encoding",
-    BASE64_ENCODED_TUPLE_PATHS[2]: "shadowResponse.Content-Encoding"
+    BASE64_ENCODED_TUPLE_PATHS[0]: "sourceRequest.Content-Encoding",
+    BASE64_ENCODED_TUPLE_PATHS[1]: "targetRequest.Content-Encoding",
+    BASE64_ENCODED_TUPLE_PATHS[2]: "sourceResponse.Content-Encoding",
+    BASE64_ENCODED_TUPLE_PATHS[3]: "targetResponse.Content-Encoding"
 }
 CONTENT_TYPE_PATH = {
-    BASE64_ENCODED_TUPLE_PATHS[0]: "request.Content-Type",
-    BASE64_ENCODED_TUPLE_PATHS[1]: "primaryResponse.Content-Type",
-    BASE64_ENCODED_TUPLE_PATHS[2]: "shadowResponse.Content-Type"
+    BASE64_ENCODED_TUPLE_PATHS[0]: "sourceRequest.Content-Type",
+    BASE64_ENCODED_TUPLE_PATHS[1]: "targetRequest.Content-Type",
+    BASE64_ENCODED_TUPLE_PATHS[2]: "sourceResponse.Content-Type",
+    BASE64_ENCODED_TUPLE_PATHS[3]: "targetResponse.Content-Type"
 }
 TRANSFER_ENCODING_PATH = {
-    BASE64_ENCODED_TUPLE_PATHS[0]: "request.Transfer-Encoding",
-    BASE64_ENCODED_TUPLE_PATHS[1]: "primaryResponse.Transfer-Encoding",
-    BASE64_ENCODED_TUPLE_PATHS[2]: "shadowResponse.Transfer-Encoding"
+    BASE64_ENCODED_TUPLE_PATHS[0]: "sourceRequest.Transfer-Encoding",
+    BASE64_ENCODED_TUPLE_PATHS[1]: "targetRequest.Transfer-Encoding",
+    BASE64_ENCODED_TUPLE_PATHS[2]: "sourceResponse.Transfer-Encoding",
+    BASE64_ENCODED_TUPLE_PATHS[3]: "targetResponse.Transfer-Encoding"
 }

 CONTENT_TYPE_JSON = "application/json"
 CONTENT_ENCODING_GZIP = "gzip"
 TRANSFER_ENCODING_CHUNKED = "chunked"
-URI_PATH = "request.Request-URI"
+URI_PATH = "sourceRequest.Request-URI"
 BULK_URI_PATH = "_bulk"
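The TODO above about header-name capitalization could be sidestepped by matching header names case-insensitively rather than by exact key. Below is a minimal sketch of that idea, assuming the headers are available as a dict-like mapping; the helper is hypothetical and not part of this commit.

# Hypothetical helper: look up an HTTP header value while ignoring the
# capitalization of the header name; not part of this commit.
from typing import Mapping, Optional


def get_header_case_insensitive(headers: Mapping[str, str], name: str) -> Optional[str]:
    target = name.lower()
    for key, value in headers.items():
        # "Content-Encoding", "content-encoding", etc. all match
        if key.lower() == target:
            return value
    return None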
TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/ExpiringSubstitutableItemPoolTest.java
@@ -121,9 +121,9 @@ void get() throws Exception {
             Assertions.assertEquals(NUM_POOLED_ITEMS+i, getNextItem(pool));
         }

-        Assertions.assertTrue(15 <= pool.getStats().getNItemsCreated());
+        Assertions.assertTrue(pool.getStats().getNItemsCreated() >= 15);
         Assertions.assertEquals(11, pool.getStats().getNHotGets()+pool.getStats().getNColdGets());
-        Assertions.assertTrue(4 <= pool.getStats().getNItemsExpired());
+        Assertions.assertTrue(pool.getStats().getNItemsExpired() >= 4);

         Assertions.assertTrue(pool.getStats().averageBuildTime().toMillis() > 0);
         Assertions.assertTrue(pool.getStats().averageWaitTime().toMillis() <
4 changes: 1 addition & 3 deletions TrafficCapture/trafficReplayer/build.gradle
@@ -44,9 +44,6 @@ dependencies {

     implementation project(':captureProtobufs')

-    // TODO - upgrade this to 2.x so that we don't pollute the jar-space with two versions of AWS SDK
-    implementation group: 'com.amazonaws.secretsmanager', name: 'aws-secretsmanager-caching-java', version: '1.0.2'
-
     implementation group: 'com.beust', name: 'jcommander', version: '1.82'
     implementation group: 'com.bazaarvoice.jolt', name: 'jolt-core', version: '0.1.7'
     implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.15.0'
@@ -66,6 +63,7 @@ dependencies {
     implementation group: 'software.amazon.msk', name: 'aws-msk-iam-auth', version: '1.1.7'
     implementation group: 'software.amazon.awssdk', name: 'sdk-core', version: '2.20.102'
     implementation group: 'software.amazon.awssdk', name: 'auth', version: '2.20.102'
+    implementation group: 'software.amazon.awssdk', name: 'secretsmanager', version: '2.20.127'

     testImplementation project(':testUtilities')
     testImplementation group: 'org.apache.httpcomponents.client5', name: 'httpclient5', version: '5.2.1'
TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/AWSAuthService.java
@@ -1,43 +1,44 @@
 package org.opensearch.migrations.replay;

-import com.amazonaws.secretsmanager.caching.SecretCache;
 import lombok.extern.slf4j.Slf4j;
+import software.amazon.awssdk.services.secretsmanager.SecretsManagerClient;

 import java.nio.charset.Charset;
 import java.util.Base64;

 @Slf4j
 public class AWSAuthService implements AutoCloseable {

-    private final SecretCache secretCache;
+    private final SecretsManagerClient secretsManagerClient;

-    public AWSAuthService(SecretCache secretCache) {
-        this.secretCache = secretCache;
+    public AWSAuthService(SecretsManagerClient secretsManagerClient) {
+        this.secretsManagerClient = secretsManagerClient;
     }

     public AWSAuthService() {
-        this(new SecretCache());
+        this(SecretsManagerClient.builder().build());
     }

     // SecretId here can be either the unique name of the secret or the secret ARN
     public String getSecret(String secretId) {
-        return secretCache.getSecretString(secretId);
+        return secretsManagerClient.getSecretValue(builder -> builder.secretId(secretId)).secretString();
     }

     /**
-     * This method returns a Basic Auth header string, with the username:password Base64 encoded
+     * This method synchronously returns a Basic Auth header string, with the username:password Base64 encoded
      * @param username The plaintext username
      * @param secretId The unique name of the secret or the secret ARN from AWS Secrets Manager. Its retrieved value
      *                 will fill the password part of the Basic Auth header
      * @return Basic Auth header string
      */
     public String getBasicAuthHeaderFromSecret(String username, String secretId) {
-        String authHeaderString = username + ":" + getSecret(secretId);
+        String secretValue = getSecret(secretId);
+        String authHeaderString = username + ":" + secretValue;
         return "Basic " + Base64.getEncoder().encodeToString(authHeaderString.getBytes(Charset.defaultCharset()));
     }

     @Override
     public void close() {
-        secretCache.close();
+        secretsManagerClient.close();
     }
 }
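For readers following the Python side of this repository, the refactored getSecret and getBasicAuthHeaderFromSecret flow is roughly equivalent to the boto3 sketch below. This is an illustration of the same Secrets Manager lookup and Base64 encoding, not code from this commit; the function name is hypothetical.

# Illustrative Python analogue of the refactored AWSAuthService, using boto3;
# not part of this commit.
import base64

import boto3


def get_basic_auth_header_from_secret(username: str, secret_id: str) -> str:
    client = boto3.client("secretsmanager")
    # secret_id may be either the secret's unique name or its ARN
    secret_value = client.get_secret_value(SecretId=secret_id)["SecretString"]
    auth_string = username + ":" + secret_value
    return "Basic " + base64.b64encode(auth_string.encode()).decode()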
