diff --git a/TrafficCapture/testUtilities/src/testFixtures/java/org/opensearch/migrations/testutils/PortFinder.java b/TrafficCapture/testUtilities/src/testFixtures/java/org/opensearch/migrations/testutils/PortFinder.java index 3a5d5112b..7ef0bae24 100644 --- a/TrafficCapture/testUtilities/src/testFixtures/java/org/opensearch/migrations/testutils/PortFinder.java +++ b/TrafficCapture/testUtilities/src/testFixtures/java/org/opensearch/migrations/testutils/PortFinder.java @@ -1,6 +1,8 @@ package org.opensearch.migrations.testutils; +import java.io.IOException; +import java.net.ServerSocket; import lombok.extern.slf4j.Slf4j; import java.util.Random; @@ -17,7 +19,6 @@ public class PortFinder { private PortFinder() {} private static final int MAX_PORT_TRIES = 100; - private static final Random random = new Random(); public static class ExceededMaxPortAssigmentAttemptException extends Exception { public ExceededMaxPortAssigmentAttemptException(Throwable cause) { @@ -30,11 +31,11 @@ public static int retryWithNewPortUntilNoThrow(IntConsumer r) int numTries = 0; while (true) { try { - int port = random.nextInt((2 << 15) - 1025) + 1025; - r.accept(Integer.valueOf(port)); + int port = findOpenPort(); + r.accept(port); return port; } catch (Exception e) { - if (++numTries <= MAX_PORT_TRIES) { + if (++numTries >= MAX_PORT_TRIES) { log.atError().setCause(e).setMessage(()->"Exceeded max tries {} giving up") .addArgument(MAX_PORT_TRIES).log(); throw new ExceededMaxPortAssigmentAttemptException(e); @@ -44,4 +45,14 @@ public static int retryWithNewPortUntilNoThrow(IntConsumer r) } } + public static int findOpenPort() { + try (ServerSocket serverSocket = new ServerSocket(0)) { + int port = serverSocket.getLocalPort(); + log.info("Open port found: " + port); + return port; + } catch (IOException e) { + log.error("Failed to find an open port: " + e.getMessage()); + throw new RuntimeException(e); + } + } } \ No newline at end of file diff --git a/TrafficCapture/testUtilities/src/testFixtures/java/org/opensearch/migrations/testutils/SimpleHttpClientForTesting.java b/TrafficCapture/testUtilities/src/testFixtures/java/org/opensearch/migrations/testutils/SimpleHttpClientForTesting.java index a2bba88f2..13799499b 100644 --- a/TrafficCapture/testUtilities/src/testFixtures/java/org/opensearch/migrations/testutils/SimpleHttpClientForTesting.java +++ b/TrafficCapture/testUtilities/src/testFixtures/java/org/opensearch/migrations/testutils/SimpleHttpClientForTesting.java @@ -5,6 +5,8 @@ import lombok.AllArgsConstructor; import org.apache.hc.client5.http.classic.methods.HttpGet; import org.apache.hc.client5.http.classic.methods.HttpPut; +import org.apache.hc.client5.http.config.ConnectionConfig; +import org.apache.hc.client5.http.config.RequestConfig; import org.apache.hc.client5.http.impl.classic.CloseableHttpClient; import org.apache.hc.client5.http.impl.classic.HttpClients; import org.apache.hc.client5.http.impl.io.BasicHttpClientConnectionManager; @@ -19,6 +21,8 @@ import org.apache.hc.core5.http.io.entity.InputStreamEntity; import org.apache.hc.core5.http.io.entity.StringEntity; import org.apache.hc.core5.ssl.SSLContexts; +import org.apache.hc.core5.util.Timeout; + import java.nio.charset.Charset; import java.io.IOException; @@ -38,6 +42,9 @@ */ public class SimpleHttpClientForTesting implements AutoCloseable { + private final static Timeout DEFAULT_RESPONSE_TIMEOUT = Timeout.ofSeconds(5); + private final static Timeout DEFAULT_CONNECTION_TIMEOUT = Timeout.ofSeconds(5); + private final CloseableHttpClient httpClient; private static BasicHttpClientConnectionManager getInsecureTlsConnectionManager() @@ -65,7 +72,15 @@ public SimpleHttpClientForTesting(boolean useTlsAndInsecurelyInsteadOfClearText) } private SimpleHttpClientForTesting(BasicHttpClientConnectionManager connectionManager) { - httpClient = HttpClients.custom().setConnectionManager(connectionManager).build(); + var requestConfig = RequestConfig.custom() + .setConnectionRequestTimeout(DEFAULT_CONNECTION_TIMEOUT) + .setResponseTimeout(DEFAULT_RESPONSE_TIMEOUT) + .build(); + + httpClient = HttpClients.custom() + .setConnectionManager(connectionManager) + .setDefaultRequestConfig(requestConfig) + .build(); } @AllArgsConstructor diff --git a/TrafficCapture/trafficCaptureProxyServer/build.gradle b/TrafficCapture/trafficCaptureProxyServer/build.gradle index 74fd3dfe8..fdec3d131 100644 --- a/TrafficCapture/trafficCaptureProxyServer/build.gradle +++ b/TrafficCapture/trafficCaptureProxyServer/build.gradle @@ -41,6 +41,11 @@ dependencies { testImplementation testFixtures(project(path: ':testUtilities')) testImplementation testFixtures(project(path: ':captureOffloader')) testImplementation testFixtures(project(path: ':coreUtilities')) + testImplementation group: 'eu.rekawek.toxiproxy', name: 'toxiproxy-java', version: '2.1.7' + testImplementation group: 'org.testcontainers', name: 'junit-jupiter', version: '1.19.5' + testImplementation group: 'org.testcontainers', name: 'kafka', version: '1.19.5' + testImplementation group: 'org.testcontainers', name: 'testcontainers', version: '1.19.5' + testImplementation group: 'org.testcontainers', name: 'toxiproxy', version: '1.19.5' } tasks.withType(Tar){ diff --git a/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/KafkaConfigurationCaptureProxyTest.java b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/KafkaConfigurationCaptureProxyTest.java new file mode 100644 index 000000000..cfae95c80 --- /dev/null +++ b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/KafkaConfigurationCaptureProxyTest.java @@ -0,0 +1,217 @@ +package org.opensearch.migrations.trafficcapture.proxyserver; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import eu.rekawek.toxiproxy.Proxy; +import eu.rekawek.toxiproxy.model.ToxicDirection; +import java.io.IOException; +import java.net.URI; +import java.time.Duration; +import java.util.stream.IntStream; +import java.util.stream.Stream; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.function.ThrowingConsumer; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; +import org.opensearch.migrations.testutils.SimpleHttpClientForTesting; +import org.opensearch.migrations.trafficcapture.proxyserver.testcontainers.CaptureProxyContainer; +import org.opensearch.migrations.trafficcapture.proxyserver.testcontainers.HttpdContainerTestBase; +import org.opensearch.migrations.trafficcapture.proxyserver.testcontainers.KafkaContainerTestBase; +import org.opensearch.migrations.trafficcapture.proxyserver.testcontainers.ToxiproxyContainerTestBase; +import org.opensearch.migrations.trafficcapture.proxyserver.testcontainers.annotations.HttpdContainerTest; +import org.opensearch.migrations.trafficcapture.proxyserver.testcontainers.annotations.KafkaContainerTest; +import org.opensearch.migrations.trafficcapture.proxyserver.testcontainers.annotations.ToxiproxyContainerTest; + +@Slf4j +@KafkaContainerTest +@HttpdContainerTest +@ToxiproxyContainerTest +public class KafkaConfigurationCaptureProxyTest { + + private static final KafkaContainerTestBase kafkaTestBase = new KafkaContainerTestBase(); + private static final HttpdContainerTestBase httpdTestBase = new HttpdContainerTestBase(); + private static final ToxiproxyContainerTestBase toxiproxyTestBase = new ToxiproxyContainerTestBase(); + private static final String HTTPD_GET_EXPECTED_RESPONSE = "

It works!

\n"; + private static final int DEFAULT_NUMBER_OF_CALLS = 3; + private static final long PROXY_EXPECTED_MAX_LATENCY_MS = Duration.ofSeconds(1).toMillis(); + private Proxy kafkaProxy; + private Proxy destinationProxy; + + @BeforeAll + public static void setUp() { + kafkaTestBase.start(); + httpdTestBase.start(); + toxiproxyTestBase.start(); + } + + @AfterAll + public static void tearDown() { + kafkaTestBase.stop(); + httpdTestBase.stop(); + toxiproxyTestBase.stop(); + } + + private static void assertLessThan(long ceiling, long actual) { + Assertions.assertTrue(actual < ceiling, + () -> "Expected actual value to be less than " + ceiling + " but was " + actual + "."); + } + + @BeforeEach + public void setUpTest() { + kafkaProxy = toxiproxyTestBase.getProxy(kafkaTestBase.getContainer()); + destinationProxy = toxiproxyTestBase.getProxy(httpdTestBase.getContainer()); + } + + @AfterEach + public void tearDownTest() { + toxiproxyTestBase.deleteProxy(kafkaProxy); + toxiproxyTestBase.deleteProxy(destinationProxy); + } + + @ParameterizedTest + @EnumSource(FailureMode.class) + @Disabled + // TODO: Fix proxy bug and enable test + public void testCaptureProxyWithKafkaImpairedBeforeStart(FailureMode failureMode) { + try (var captureProxy = new CaptureProxyContainer(toxiproxyTestBase.getProxyUrlHttp(destinationProxy), + toxiproxyTestBase.getProxyUrlHttp(kafkaProxy))) { + failureMode.apply(kafkaProxy); + + captureProxy.start(); + + var latency = assertBasicCalls(captureProxy, DEFAULT_NUMBER_OF_CALLS); + + assertLessThan(PROXY_EXPECTED_MAX_LATENCY_MS, latency.toMillis()); + } + } + + @ParameterizedTest + @EnumSource(FailureMode.class) + public void testCaptureProxyWithKafkaImpairedAfterStart(FailureMode failureMode) { + try (var captureProxy = new CaptureProxyContainer(toxiproxyTestBase.getProxyUrlHttp(destinationProxy), + toxiproxyTestBase.getProxyUrlHttp(kafkaProxy))) { + captureProxy.start(); + + failureMode.apply(kafkaProxy); + + var latency = assertBasicCalls(captureProxy, DEFAULT_NUMBER_OF_CALLS); + + assertLessThan(PROXY_EXPECTED_MAX_LATENCY_MS, latency.toMillis()); + } + } + + @ParameterizedTest + @EnumSource(FailureMode.class) + public void testCaptureProxyWithKafkaImpairedDoesNotAffectRequest_proxysRequest(FailureMode failureMode) { + try (var captureProxy = new CaptureProxyContainer(toxiproxyTestBase.getProxyUrlHttp(destinationProxy), + toxiproxyTestBase.getProxyUrlHttp(kafkaProxy))) { + captureProxy.start(); + final int numberOfTests = 20; + + // Performance is different for first few calls so throw them away + assertBasicCalls(captureProxy, 3); + + var averageBaselineDuration = assertBasicCalls(captureProxy, numberOfTests); + + failureMode.apply(kafkaProxy); + + // Calculate average duration of impaired calls + var averageImpairedDuration = assertBasicCalls(captureProxy, numberOfTests); + + long acceptableDifference = Duration.ofMillis(25).toMillis(); + + log.info("Baseline Duration: {}ms, Impaired Duration: {}ms", averageBaselineDuration.toMillis(), + averageImpairedDuration.toMillis()); + + assertEquals(averageBaselineDuration.toMillis(), averageImpairedDuration.toMillis(), acceptableDifference, + "The average durations are not close enough"); + } + } + + @Test + public void testCaptureProxyLatencyAddition() { + try (var captureProxy = new CaptureProxyContainer(toxiproxyTestBase.getProxyUrlHttp(destinationProxy), + toxiproxyTestBase.getProxyUrlHttp(kafkaProxy))) { + captureProxy.start(); + final int numberOfTests = 25; + + // Performance is different for first few calls so throw them away + assertBasicCalls(captureProxy, 3); + + var averageRequestDurationWithProxy = assertBasicCalls(captureProxy, numberOfTests); + + var averageNoProxyDuration = assertBasicCalls(toxiproxyTestBase.getProxyUrlHttp(destinationProxy), + numberOfTests); + + var acceptableProxyLatencyAdd = Duration.ofMillis(25); + + assertLessThan(averageNoProxyDuration.plus(acceptableProxyLatencyAdd).toMillis(), + averageRequestDurationWithProxy.toMillis()); + } + } + + private Duration assertBasicCalls(CaptureProxyContainer proxy, int numberOfCalls) { + return assertBasicCalls(CaptureProxyContainer.getUriFromContainer(proxy), numberOfCalls); + } + + private Duration assertBasicCalls(String endpoint, int numberOfCalls) { + return IntStream.range(0, numberOfCalls).mapToObj(i -> assertBasicCall(endpoint)) + .reduce(Duration.ZERO, Duration::plus).dividedBy(numberOfCalls); + } + + + private Duration assertBasicCall(String endpoint) { + try (var client = new SimpleHttpClientForTesting()) { + long startTimeNanos = System.nanoTime(); + var response = client.makeGetRequest(URI.create(endpoint), Stream.empty()); + long endTimeNanos = System.nanoTime(); + + var responseBody = new String(response.payloadBytes); + assertEquals(HTTPD_GET_EXPECTED_RESPONSE, responseBody); + return Duration.ofNanos(endTimeNanos - startTimeNanos); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public enum FailureMode { + LATENCY( + (proxy) -> proxy.toxics().latency("latency", ToxicDirection.UPSTREAM, 5000)), + BANDWIDTH( + (proxy) -> proxy.toxics().bandwidth("bandwidth", ToxicDirection.DOWNSTREAM, 1)), + TIMEOUT( + (proxy) -> proxy.toxics().timeout("timeout", ToxicDirection.UPSTREAM, 5000)), + SLICER( + (proxy) -> { + proxy.toxics().slicer("slicer_down", ToxicDirection.DOWNSTREAM, 1, 1000); + proxy.toxics().slicer("slicer_up", ToxicDirection.UPSTREAM, 1, 1000); + }), + SLOW_CLOSE( + (proxy) -> proxy.toxics().slowClose("slow_close", ToxicDirection.UPSTREAM, 5000)), + RESET_PEER( + (proxy) -> proxy.toxics().resetPeer("reset_peer", ToxicDirection.UPSTREAM, 5000)), + LIMIT_DATA( + (proxy) -> proxy.toxics().limitData("limit_data", ToxicDirection.UPSTREAM, 10)), + DISCONNECT(Proxy::disable); + private final ThrowingConsumer failureModeApplier; + + FailureMode(ThrowingConsumer applier) { + this.failureModeApplier = applier; + } + + public void apply(Proxy proxy) { + try { + this.failureModeApplier.accept(proxy); + } catch (Throwable t) { + throw new RuntimeException(t); + } + } + } +} diff --git a/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/CaptureProxyContainer.java b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/CaptureProxyContainer.java new file mode 100644 index 000000000..ed9f44d67 --- /dev/null +++ b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/CaptureProxyContainer.java @@ -0,0 +1,134 @@ +package org.opensearch.migrations.trafficcapture.proxyserver.testcontainers; + +import com.github.dockerjava.api.command.InspectContainerResponse; +import java.time.Duration; +import java.util.List; +import java.util.Set; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.opensearch.migrations.testutils.PortFinder; +import org.opensearch.migrations.trafficcapture.proxyserver.CaptureProxy; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.containers.wait.strategy.HttpWaitStrategy; +import org.testcontainers.containers.wait.strategy.WaitStrategyTarget; +import org.testcontainers.lifecycle.Startable; + +@Slf4j +public class CaptureProxyContainer + extends GenericContainer implements AutoCloseable, WaitStrategyTarget, Startable { + + private static final Duration TIMEOUT_DURATION = Duration.ofSeconds(30); + private final Supplier destinationUriSupplier; + private final Supplier kafkaUriSupplier; + private Integer listeningPort; + private Thread serverThread; + + public CaptureProxyContainer(final Supplier destinationUriSupplier, + final Supplier kafkaUriSupplier) { + this.destinationUriSupplier = destinationUriSupplier; + this.kafkaUriSupplier = kafkaUriSupplier; + } + + public CaptureProxyContainer(final String destinationUri, final String kafkaUri) { + this.destinationUriSupplier = () -> destinationUri; + this.kafkaUriSupplier = () -> kafkaUri; + } + + public CaptureProxyContainer(final Container destination, final KafkaContainer kafka) { + this(() -> getUriFromContainer(destination), () -> getUriFromContainer(kafka)); + } + + public static String getUriFromContainer(final Container container) { + return "http://" + container.getHost() + ":" + container.getFirstMappedPort(); + } + + @Override + public void start() { + this.listeningPort = PortFinder.findOpenPort(); + serverThread = new Thread(() -> { + try { + String[] args = { + "--kafkaConnection", kafkaUriSupplier.get(), + "--destinationUri", destinationUriSupplier.get(), + "--listenPort", String.valueOf(listeningPort), + "--insecureDestination" + }; + + CaptureProxy.main(args); + } catch (Exception e) { + throw new AssertionError("Should not have exception", e); + } + }); + + serverThread.start(); + new HttpWaitStrategy().forPort(listeningPort) + .withStartupTimeout(TIMEOUT_DURATION) + .waitUntilReady(this); + } + + @Override + public boolean isRunning() { + return serverThread != null; + } + + @Override + public void stop() { + if (serverThread != null) { + serverThread.interrupt(); + this.serverThread = null; + } + this.listeningPort = null; + close(); + } + + @Override + public void close() { + } + + @Override + public Set getLivenessCheckPortNumbers() { + return getExposedPorts() + .stream() + .map(this::getMappedPort) + .collect(Collectors.toSet()); + } + + @Override + public String getHost() { + return "localhost"; + } + + @Override + public Integer getMappedPort(int originalPort) { + if (getExposedPorts().contains(originalPort)) { + return listeningPort; + } + return null; + } + + @Override + public List getExposedPorts() { + // Internal and External ports are the same + return List.of(listeningPort); + } + + @Override + public InspectContainerResponse getContainerInfo() { + return new InspectNonContainerResponse("captureProxy"); + } + + @AllArgsConstructor + static class InspectNonContainerResponse extends InspectContainerResponse { + + private String name; + + @Override + public String getName() { + return name; + } + } +} diff --git a/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/HttpdContainerTestBase.java b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/HttpdContainerTestBase.java new file mode 100644 index 000000000..f16993eb7 --- /dev/null +++ b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/HttpdContainerTestBase.java @@ -0,0 +1,13 @@ +package org.opensearch.migrations.trafficcapture.proxyserver.testcontainers; + +import org.testcontainers.containers.GenericContainer; + +public class HttpdContainerTestBase extends TestContainerTestBase> { + + private static final GenericContainer httpd = new GenericContainer("httpd:alpine") + .withExposedPorts(80); // Container Port + + public GenericContainer getContainer() { + return httpd; + } +} diff --git a/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/KafkaContainerTestBase.java b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/KafkaContainerTestBase.java new file mode 100644 index 000000000..be90048b6 --- /dev/null +++ b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/KafkaContainerTestBase.java @@ -0,0 +1,14 @@ +package org.opensearch.migrations.trafficcapture.proxyserver.testcontainers; + +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.utility.DockerImageName; + +public class KafkaContainerTestBase extends TestContainerTestBase { + + private static final KafkaContainer kafka = new KafkaContainer( + DockerImageName.parse("confluentinc/cp-kafka:latest")); + + public KafkaContainer getContainer() { + return kafka; + } +} diff --git a/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/TestContainerTestBase.java b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/TestContainerTestBase.java new file mode 100644 index 000000000..0d1e23f50 --- /dev/null +++ b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/TestContainerTestBase.java @@ -0,0 +1,16 @@ +package org.opensearch.migrations.trafficcapture.proxyserver.testcontainers; + +import org.testcontainers.containers.GenericContainer; + +abstract class TestContainerTestBase> { + + public void start() { + getContainer().start(); + } + + public void stop() { + getContainer().start(); + } + + abstract T getContainer(); +} diff --git a/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/ToxiproxyContainerTestBase.java b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/ToxiproxyContainerTestBase.java new file mode 100644 index 000000000..de773a9b9 --- /dev/null +++ b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/ToxiproxyContainerTestBase.java @@ -0,0 +1,76 @@ +package org.opensearch.migrations.trafficcapture.proxyserver.testcontainers; + +import eu.rekawek.toxiproxy.Proxy; +import eu.rekawek.toxiproxy.ToxiproxyClient; +import java.io.IOException; +import java.util.HashSet; +import java.util.concurrent.ConcurrentSkipListSet; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.ToxiproxyContainer; + +public class ToxiproxyContainerTestBase extends TestContainerTestBase { + + private static final ToxiproxyContainer toxiproxy = new ToxiproxyContainer( + "ghcr.io/shopify/toxiproxy:latest") + .withAccessToHost(true); + + final ConcurrentSkipListSet toxiproxyUnusedExposedPorts = new ConcurrentSkipListSet<>(); + + static int getListeningPort(Proxy proxy) { + return Integer.parseInt(proxy.getListen().replaceAll(".*:", "")); + } + + public ToxiproxyContainer getContainer() { + return toxiproxy; + } + + @Override + public void start() { + final int TOXIPROXY_CONTROL_PORT = 8474; + getContainer().start(); + var concurrentPortSet = new HashSet<>(getContainer().getExposedPorts()); + concurrentPortSet.remove(TOXIPROXY_CONTROL_PORT); + toxiproxyUnusedExposedPorts.addAll(concurrentPortSet); + } + + @Override + public void stop() { + toxiproxyUnusedExposedPorts.clear(); + getContainer().stop(); + } + + public void deleteProxy(Proxy proxy) { + var proxyPort = getListeningPort(proxy); + try { + proxy.delete(); + toxiproxyUnusedExposedPorts.add(proxyPort); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public Proxy getProxy(GenericContainer container) { + var containerPort = container.getFirstMappedPort(); + final ToxiproxyClient toxiproxyClient = new ToxiproxyClient(toxiproxy.getHost(), + getContainer().getControlPort()); + org.testcontainers.Testcontainers.exposeHostPorts(containerPort); + try { + var containerName = (container.getDockerImageName() + "_" + container.getContainerName() + "_" + + Thread.currentThread().getId()).replaceAll("[^a-zA-Z0-9_]+", "_"); + synchronized (toxiproxyUnusedExposedPorts) { + var proxyPort = toxiproxyUnusedExposedPorts.first(); + var proxy = toxiproxyClient.createProxy(containerName, "0.0.0.0:" + proxyPort, + "host.testcontainers.internal" + ":" + containerPort); + toxiproxyUnusedExposedPorts.remove(proxyPort); + proxy.enable(); + return proxy; + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public String getProxyUrlHttp(Proxy proxy) { + return "http://" + getContainer().getHost() + ":" + getContainer().getMappedPort(getListeningPort(proxy)); + } +} \ No newline at end of file diff --git a/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/annotations/HttpdContainerTest.java b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/annotations/HttpdContainerTest.java new file mode 100644 index 000000000..719069bae --- /dev/null +++ b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/annotations/HttpdContainerTest.java @@ -0,0 +1,11 @@ +package org.opensearch.migrations.trafficcapture.proxyserver.testcontainers.annotations; + +import java.lang.annotation.Inherited; +import org.junit.jupiter.api.parallel.ResourceLock; + +@Inherited +@ResourceLock("HttpdContainer") +@TestContainerTest +public @interface HttpdContainerTest { + +} diff --git a/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/annotations/KafkaContainerTest.java b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/annotations/KafkaContainerTest.java new file mode 100644 index 000000000..30c5a2cd2 --- /dev/null +++ b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/annotations/KafkaContainerTest.java @@ -0,0 +1,11 @@ +package org.opensearch.migrations.trafficcapture.proxyserver.testcontainers.annotations; + +import java.lang.annotation.Inherited; +import org.junit.jupiter.api.parallel.ResourceLock; + +@Inherited +@ResourceLock("KafkaContainer") +@TestContainerTest +public @interface KafkaContainerTest { + +} diff --git a/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/annotations/TestContainerTest.java b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/annotations/TestContainerTest.java new file mode 100644 index 000000000..1163dff3e --- /dev/null +++ b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/annotations/TestContainerTest.java @@ -0,0 +1,11 @@ +package org.opensearch.migrations.trafficcapture.proxyserver.testcontainers.annotations; + +import java.lang.annotation.Inherited; +import org.junit.jupiter.api.Tag; +import org.testcontainers.junit.jupiter.Testcontainers; + +@Inherited +@Tag("longTest") +@Testcontainers(disabledWithoutDocker = true, parallel = true) +public @interface TestContainerTest { +} diff --git a/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/annotations/ToxiproxyContainerTest.java b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/annotations/ToxiproxyContainerTest.java new file mode 100644 index 000000000..85146b080 --- /dev/null +++ b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/annotations/ToxiproxyContainerTest.java @@ -0,0 +1,10 @@ +package org.opensearch.migrations.trafficcapture.proxyserver.testcontainers.annotations; + +import java.lang.annotation.Inherited; +import org.junit.jupiter.api.parallel.ResourceLock; + +@Inherited +@ResourceLock("ToxiproxyContainerTestBase") +@TestContainerTest +public @interface ToxiproxyContainerTest { +} diff --git a/TrafficCapture/trafficReplayer/build.gradle b/TrafficCapture/trafficReplayer/build.gradle index 368eb1901..4e0a52e03 100644 --- a/TrafficCapture/trafficReplayer/build.gradle +++ b/TrafficCapture/trafficReplayer/build.gradle @@ -82,9 +82,9 @@ dependencies { testImplementation group: 'org.apache.httpcomponents.client5', name: 'httpclient5', version: '5.2.1' testImplementation group: 'org.junit.jupiter', name:'junit-jupiter-api', version:'5.x.x' - testImplementation group: 'org.testcontainers', name: 'junit-jupiter', version: '1.19.0' - testImplementation group: 'org.testcontainers', name: 'kafka', version: '1.19.0' - testImplementation group: 'org.testcontainers', name: 'testcontainers', version: '1.19.0' + testImplementation group: 'org.testcontainers', name: 'junit-jupiter', version: '1.19.5' + testImplementation group: 'org.testcontainers', name: 'kafka', version: '1.19.5' + testImplementation group: 'org.testcontainers', name: 'testcontainers', version: '1.19.5' testImplementation group: 'org.mockito', name:'mockito-core', version:'4.6.1' testImplementation group: 'org.mockito', name:'mockito-junit-jupiter', version:'4.6.1' testRuntimeOnly group:'org.junit.jupiter', name:'junit-jupiter-engine', version:'5.x.x'