Skip to content

Commit

Permalink
Merge branch 'main' into GithubGradleImprovements
Browse files Browse the repository at this point in the history
  • Loading branch information
AndreKurait authored Feb 14, 2024
2 parents 183fc80 + e016226 commit 8cdca98
Show file tree
Hide file tree
Showing 14 changed files with 551 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package org.opensearch.migrations.testutils;


import java.io.IOException;
import java.net.ServerSocket;
import lombok.extern.slf4j.Slf4j;

import java.util.Random;
Expand All @@ -17,7 +19,6 @@ public class PortFinder {
private PortFinder() {}

private static final int MAX_PORT_TRIES = 100;
private static final Random random = new Random();

public static class ExceededMaxPortAssigmentAttemptException extends Exception {
public ExceededMaxPortAssigmentAttemptException(Throwable cause) {
Expand All @@ -30,8 +31,8 @@ public static int retryWithNewPortUntilNoThrow(IntConsumer r)
int numTries = 0;
while (true) {
try {
int port = random.nextInt((2 << 15) - 1025) + 1025;
r.accept(Integer.valueOf(port));
int port = findOpenPort();
r.accept(port);
return port;
} catch (Exception e) {
if (++numTries >= MAX_PORT_TRIES) {
Expand All @@ -44,4 +45,14 @@ public static int retryWithNewPortUntilNoThrow(IntConsumer r)
}
}

public static int findOpenPort() {
try (ServerSocket serverSocket = new ServerSocket(0)) {
int port = serverSocket.getLocalPort();
log.info("Open port found: " + port);
return port;
} catch (IOException e) {
log.error("Failed to find an open port: " + e.getMessage());
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import lombok.AllArgsConstructor;
import org.apache.hc.client5.http.classic.methods.HttpGet;
import org.apache.hc.client5.http.classic.methods.HttpPut;
import org.apache.hc.client5.http.config.ConnectionConfig;
import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.HttpClients;
import org.apache.hc.client5.http.impl.io.BasicHttpClientConnectionManager;
Expand All @@ -19,6 +21,8 @@
import org.apache.hc.core5.http.io.entity.InputStreamEntity;
import org.apache.hc.core5.http.io.entity.StringEntity;
import org.apache.hc.core5.ssl.SSLContexts;
import org.apache.hc.core5.util.Timeout;

import java.nio.charset.Charset;

import java.io.IOException;
Expand All @@ -38,6 +42,9 @@
*/
public class SimpleHttpClientForTesting implements AutoCloseable {

private final static Timeout DEFAULT_RESPONSE_TIMEOUT = Timeout.ofSeconds(5);
private final static Timeout DEFAULT_CONNECTION_TIMEOUT = Timeout.ofSeconds(5);

private final CloseableHttpClient httpClient;

private static BasicHttpClientConnectionManager getInsecureTlsConnectionManager()
Expand Down Expand Up @@ -65,7 +72,15 @@ public SimpleHttpClientForTesting(boolean useTlsAndInsecurelyInsteadOfClearText)
}

private SimpleHttpClientForTesting(BasicHttpClientConnectionManager connectionManager) {
httpClient = HttpClients.custom().setConnectionManager(connectionManager).build();
var requestConfig = RequestConfig.custom()
.setConnectionRequestTimeout(DEFAULT_CONNECTION_TIMEOUT)
.setResponseTimeout(DEFAULT_RESPONSE_TIMEOUT)
.build();

httpClient = HttpClients.custom()
.setConnectionManager(connectionManager)
.setDefaultRequestConfig(requestConfig)
.build();
}

@AllArgsConstructor
Expand Down
5 changes: 5 additions & 0 deletions TrafficCapture/trafficCaptureProxyServer/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ dependencies {
testImplementation testFixtures(project(path: ':testUtilities'))
testImplementation testFixtures(project(path: ':captureOffloader'))
testImplementation testFixtures(project(path: ':coreUtilities'))
testImplementation group: 'eu.rekawek.toxiproxy', name: 'toxiproxy-java', version: '2.1.7'
testImplementation group: 'org.testcontainers', name: 'junit-jupiter', version: '1.19.5'
testImplementation group: 'org.testcontainers', name: 'kafka', version: '1.19.5'
testImplementation group: 'org.testcontainers', name: 'testcontainers', version: '1.19.5'
testImplementation group: 'org.testcontainers', name: 'toxiproxy', version: '1.19.5'
}

tasks.withType(Tar){
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
package org.opensearch.migrations.trafficcapture.proxyserver;

import static org.junit.jupiter.api.Assertions.assertEquals;

import eu.rekawek.toxiproxy.Proxy;
import eu.rekawek.toxiproxy.model.ToxicDirection;
import java.io.IOException;
import java.net.URI;
import java.time.Duration;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.function.ThrowingConsumer;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.opensearch.migrations.testutils.SimpleHttpClientForTesting;
import org.opensearch.migrations.trafficcapture.proxyserver.testcontainers.CaptureProxyContainer;
import org.opensearch.migrations.trafficcapture.proxyserver.testcontainers.HttpdContainerTestBase;
import org.opensearch.migrations.trafficcapture.proxyserver.testcontainers.KafkaContainerTestBase;
import org.opensearch.migrations.trafficcapture.proxyserver.testcontainers.ToxiproxyContainerTestBase;
import org.opensearch.migrations.trafficcapture.proxyserver.testcontainers.annotations.HttpdContainerTest;
import org.opensearch.migrations.trafficcapture.proxyserver.testcontainers.annotations.KafkaContainerTest;
import org.opensearch.migrations.trafficcapture.proxyserver.testcontainers.annotations.ToxiproxyContainerTest;

@Slf4j
@KafkaContainerTest
@HttpdContainerTest
@ToxiproxyContainerTest
public class KafkaConfigurationCaptureProxyTest {

private static final KafkaContainerTestBase kafkaTestBase = new KafkaContainerTestBase();
private static final HttpdContainerTestBase httpdTestBase = new HttpdContainerTestBase();
private static final ToxiproxyContainerTestBase toxiproxyTestBase = new ToxiproxyContainerTestBase();
private static final String HTTPD_GET_EXPECTED_RESPONSE = "<html><body><h1>It works!</h1></body></html>\n";
private static final int DEFAULT_NUMBER_OF_CALLS = 3;
private static final long PROXY_EXPECTED_MAX_LATENCY_MS = Duration.ofSeconds(1).toMillis();
private Proxy kafkaProxy;
private Proxy destinationProxy;

@BeforeAll
public static void setUp() {
kafkaTestBase.start();
httpdTestBase.start();
toxiproxyTestBase.start();
}

@AfterAll
public static void tearDown() {
kafkaTestBase.stop();
httpdTestBase.stop();
toxiproxyTestBase.stop();
}

private static void assertLessThan(long ceiling, long actual) {
Assertions.assertTrue(actual < ceiling,
() -> "Expected actual value to be less than " + ceiling + " but was " + actual + ".");
}

@BeforeEach
public void setUpTest() {
kafkaProxy = toxiproxyTestBase.getProxy(kafkaTestBase.getContainer());
destinationProxy = toxiproxyTestBase.getProxy(httpdTestBase.getContainer());
}

@AfterEach
public void tearDownTest() {
toxiproxyTestBase.deleteProxy(kafkaProxy);
toxiproxyTestBase.deleteProxy(destinationProxy);
}

@ParameterizedTest
@EnumSource(FailureMode.class)
@Disabled
// TODO: Fix proxy bug and enable test
public void testCaptureProxyWithKafkaImpairedBeforeStart(FailureMode failureMode) {
try (var captureProxy = new CaptureProxyContainer(toxiproxyTestBase.getProxyUrlHttp(destinationProxy),
toxiproxyTestBase.getProxyUrlHttp(kafkaProxy))) {
failureMode.apply(kafkaProxy);

captureProxy.start();

var latency = assertBasicCalls(captureProxy, DEFAULT_NUMBER_OF_CALLS);

assertLessThan(PROXY_EXPECTED_MAX_LATENCY_MS, latency.toMillis());
}
}

@ParameterizedTest
@EnumSource(FailureMode.class)
public void testCaptureProxyWithKafkaImpairedAfterStart(FailureMode failureMode) {
try (var captureProxy = new CaptureProxyContainer(toxiproxyTestBase.getProxyUrlHttp(destinationProxy),
toxiproxyTestBase.getProxyUrlHttp(kafkaProxy))) {
captureProxy.start();

failureMode.apply(kafkaProxy);

var latency = assertBasicCalls(captureProxy, DEFAULT_NUMBER_OF_CALLS);

assertLessThan(PROXY_EXPECTED_MAX_LATENCY_MS, latency.toMillis());
}
}

@ParameterizedTest
@EnumSource(FailureMode.class)
public void testCaptureProxyWithKafkaImpairedDoesNotAffectRequest_proxysRequest(FailureMode failureMode) {
try (var captureProxy = new CaptureProxyContainer(toxiproxyTestBase.getProxyUrlHttp(destinationProxy),
toxiproxyTestBase.getProxyUrlHttp(kafkaProxy))) {
captureProxy.start();
final int numberOfTests = 20;

// Performance is different for first few calls so throw them away
assertBasicCalls(captureProxy, 3);

var averageBaselineDuration = assertBasicCalls(captureProxy, numberOfTests);

failureMode.apply(kafkaProxy);

// Calculate average duration of impaired calls
var averageImpairedDuration = assertBasicCalls(captureProxy, numberOfTests);

long acceptableDifference = Duration.ofMillis(25).toMillis();

log.info("Baseline Duration: {}ms, Impaired Duration: {}ms", averageBaselineDuration.toMillis(),
averageImpairedDuration.toMillis());

assertEquals(averageBaselineDuration.toMillis(), averageImpairedDuration.toMillis(), acceptableDifference,
"The average durations are not close enough");
}
}

@Test
public void testCaptureProxyLatencyAddition() {
try (var captureProxy = new CaptureProxyContainer(toxiproxyTestBase.getProxyUrlHttp(destinationProxy),
toxiproxyTestBase.getProxyUrlHttp(kafkaProxy))) {
captureProxy.start();
final int numberOfTests = 25;

// Performance is different for first few calls so throw them away
assertBasicCalls(captureProxy, 3);

var averageRequestDurationWithProxy = assertBasicCalls(captureProxy, numberOfTests);

var averageNoProxyDuration = assertBasicCalls(toxiproxyTestBase.getProxyUrlHttp(destinationProxy),
numberOfTests);

var acceptableProxyLatencyAdd = Duration.ofMillis(25);

assertLessThan(averageNoProxyDuration.plus(acceptableProxyLatencyAdd).toMillis(),
averageRequestDurationWithProxy.toMillis());
}
}

private Duration assertBasicCalls(CaptureProxyContainer proxy, int numberOfCalls) {
return assertBasicCalls(CaptureProxyContainer.getUriFromContainer(proxy), numberOfCalls);
}

private Duration assertBasicCalls(String endpoint, int numberOfCalls) {
return IntStream.range(0, numberOfCalls).mapToObj(i -> assertBasicCall(endpoint))
.reduce(Duration.ZERO, Duration::plus).dividedBy(numberOfCalls);
}


private Duration assertBasicCall(String endpoint) {
try (var client = new SimpleHttpClientForTesting()) {
long startTimeNanos = System.nanoTime();
var response = client.makeGetRequest(URI.create(endpoint), Stream.empty());
long endTimeNanos = System.nanoTime();

var responseBody = new String(response.payloadBytes);
assertEquals(HTTPD_GET_EXPECTED_RESPONSE, responseBody);
return Duration.ofNanos(endTimeNanos - startTimeNanos);
} catch (IOException e) {
throw new RuntimeException(e);
}
}

public enum FailureMode {
LATENCY(
(proxy) -> proxy.toxics().latency("latency", ToxicDirection.UPSTREAM, 5000)),
BANDWIDTH(
(proxy) -> proxy.toxics().bandwidth("bandwidth", ToxicDirection.DOWNSTREAM, 1)),
TIMEOUT(
(proxy) -> proxy.toxics().timeout("timeout", ToxicDirection.UPSTREAM, 5000)),
SLICER(
(proxy) -> {
proxy.toxics().slicer("slicer_down", ToxicDirection.DOWNSTREAM, 1, 1000);
proxy.toxics().slicer("slicer_up", ToxicDirection.UPSTREAM, 1, 1000);
}),
SLOW_CLOSE(
(proxy) -> proxy.toxics().slowClose("slow_close", ToxicDirection.UPSTREAM, 5000)),
RESET_PEER(
(proxy) -> proxy.toxics().resetPeer("reset_peer", ToxicDirection.UPSTREAM, 5000)),
LIMIT_DATA(
(proxy) -> proxy.toxics().limitData("limit_data", ToxicDirection.UPSTREAM, 10)),
DISCONNECT(Proxy::disable);
private final ThrowingConsumer<Proxy> failureModeApplier;

FailureMode(ThrowingConsumer<Proxy> applier) {
this.failureModeApplier = applier;
}

public void apply(Proxy proxy) {
try {
this.failureModeApplier.accept(proxy);
} catch (Throwable t) {
throw new RuntimeException(t);
}
}
}
}
Loading

0 comments on commit 8cdca98

Please sign in to comment.