diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index 1282d1a34..9c8d09a88 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -16,10 +16,10 @@ jobs: python-version: ["3.10"] steps: - name: Checkout Repository - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v3 + uses: actions/setup-python@v5 with: python-version: ${{ matrix.python-version }} - name: Install dependencies diff --git a/.github/workflows/add-untriaged.yml b/.github/workflows/add-untriaged.yml index 9dcc7020d..74e494be1 100644 --- a/.github/workflows/add-untriaged.yml +++ b/.github/workflows/add-untriaged.yml @@ -8,7 +8,7 @@ jobs: apply-label: runs-on: ubuntu-latest steps: - - uses: actions/github-script@v6 + - uses: actions/github-script@v7 with: script: | github.rest.issues.addLabels({ diff --git a/.github/workflows/backport.yml b/.github/workflows/backport.yml index 5a75d2c87..0400f8daf 100644 --- a/.github/workflows/backport.yml +++ b/.github/workflows/backport.yml @@ -16,14 +16,14 @@ jobs: steps: - name: GitHub App token id: github_app_token - uses: tibdex/github-app-token@v1.5.0 + uses: tibdex/github-app-token@v2 with: app_id: ${{ secrets.APP_ID }} private_key: ${{ secrets.APP_PRIVATE_KEY }} installation_id: 22958780 - name: Backport - uses: VachaShah/backport@v2.1.0 + uses: VachaShah/backport@v2 with: github_token: ${{ steps.github_app_token.outputs.token }} head_template: backport/backport-<%= number %>-to-<%= base %> diff --git a/.github/workflows/e2eTest.yml b/.github/workflows/e2eTest.yml index 7393ca28c..7e4419dc6 100644 --- a/.github/workflows/e2eTest.yml +++ b/.github/workflows/e2eTest.yml @@ -14,22 +14,27 @@ jobs: steps: - name: Check out code - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Set up JDK - uses: actions/setup-java@v2 + uses: actions/setup-java@v4 with: java-version: '11' - distribution: 'adopt' + distribution: 'corretto' + + - name: Setup Gradle + uses: gradle/actions/setup-gradle@v3 + with: + gradle-version: 8.0.2 + gradle-home-cache-cleanup: true + cache-read-only: false - name: Start Docker Solution - run: | - cd TrafficCapture - chmod +x ./gradlew - ./gradlew dockerSolution:ComposeUp + run: ./gradlew dockerSolution:ComposeUp -x test + working-directory: TrafficCapture - name: Set up Python - uses: actions/setup-python@v3 + uses: actions/setup-python@v5 with: python-version: '3.10' diff --git a/.github/workflows/gradle-build-and-test.yml b/.github/workflows/gradle-build-and-test.yml index 5ddda0506..fc3b82db4 100644 --- a/.github/workflows/gradle-build-and-test.yml +++ b/.github/workflows/gradle-build-and-test.yml @@ -8,15 +8,23 @@ jobs: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - name: Checkout project sources + uses: actions/checkout@v4 + - name: Set up JDK 11 - uses: actions/setup-java@v2 + uses: actions/setup-java@v4 with: java-version: '11' - distribution: 'adopt' + distribution: 'corretto' + + - name: Setup Gradle + uses: gradle/actions/setup-gradle@v3 + with: + gradle-version: 8.0.2 + gradle-home-cache-cleanup: true - name: Run Gradle Build - run: ./gradlew build -x test + run: ./gradlew build -x test --info working-directory: TrafficCapture - name: Run Tests with Coverage @@ -24,7 +32,7 @@ jobs: working-directory: TrafficCapture - name: Upload to Codecov - uses: codecov/codecov-action@v3 + uses: codecov/codecov-action@v4 with: files: "TrafficCapture/**/jacocoTestReport.xml" flags: unittests diff --git a/.github/workflows/linkCheck.yml b/.github/workflows/linkCheck.yml index 382a2a6bd..23af1a6da 100644 --- a/.github/workflows/linkCheck.yml +++ b/.github/workflows/linkCheck.yml @@ -13,10 +13,10 @@ jobs: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 - name: lychee Link Checker id: lychee - uses: lycheeverse/lychee-action@v1.5.0 + uses: lycheeverse/lychee-action@v1 with: args: --verbose --accept=200,403,429 "**/*.html" "**/*.md" "**/*.txt" "**/*.json" --exclude "file:///github/workspace/*" diff --git a/.github/workflows/python-tests.yml b/.github/workflows/python-tests.yml index dfbd65288..fcfe5b1a4 100644 --- a/.github/workflows/python-tests.yml +++ b/.github/workflows/python-tests.yml @@ -20,9 +20,9 @@ jobs: working-directory: ./experimental/cluster_migration_core steps: - name: Checkout Repository - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v3 + uses: actions/setup-python@v5 with: python-version: ${{ matrix.python-version }} - name: Install Dependencies for Framework and Test Coverage @@ -32,6 +32,6 @@ jobs: run: | python -m pytest unit_tests/ --cov=cluster_migration_core --cov-report=xml --cov-branch - name: Upload Coverage Report - uses: codecov/codecov-action@v3 + uses: codecov/codecov-action@v4 with: files: cluster_migration_core/coverage.xml diff --git a/.github/workflows/release-drafter.yml b/.github/workflows/release-drafter.yml index 1c91b32f0..5f9c786c6 100644 --- a/.github/workflows/release-drafter.yml +++ b/.github/workflows/release-drafter.yml @@ -11,7 +11,7 @@ jobs: name: Draft a release runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 - id: get_data run: | echo "approvers=$(cat .github/CODEOWNERS | grep @ | tr -d '*\n ' | sed 's/@/,/g' | sed 's/,//1')" >> $GITHUB_OUTPUT diff --git a/TrafficCapture/build.gradle b/TrafficCapture/build.gradle index d8c352490..58afc0aa0 100644 --- a/TrafficCapture/build.gradle +++ b/TrafficCapture/build.gradle @@ -3,8 +3,10 @@ plugins { id 'org.owasp.dependencycheck' version '8.2.1' } -repositories { - mavenCentral() +allprojects { + repositories { + mavenCentral() + } } allprojects { @@ -19,38 +21,34 @@ allprojects { } jacoco { - toolVersion = '0.8.9' + toolVersion = '0.8.11' } - test { + tasks.withType(Test) { // Provide way to exclude particular tests from CLI // e.g. ./gradlew test -PexcludeTests=**/KafkaProtobufConsumerLongTermTest* if (project.hasProperty('excludeTests')) { exclude project.property('excludeTests') } + useJUnitPlatform { + systemProperty 'junit.jupiter.execution.parallel.enabled', 'true' + systemProperty 'junit.jupiter.execution.parallel.mode.default', "concurrent" + systemProperty 'junit.jupiter.execution.parallel.mode.classes.default', 'concurrent' + } + } + + test { systemProperty 'disableMemoryLeakTests', 'true' useJUnitPlatform { excludeTags 'longTest' } - jvmArgs '-ea' - jacoco { enabled = false } } task slowTest(type: Test) { - dependsOn test - // Provide way to exclude particular tests from CLI - // e.g. ./gradlew test -PexcludeTests=**/KafkaProtobufConsumerLongTermTest* - if (project.hasProperty('excludeTests')) { - exclude project.property('excludeTests') - } systemProperty 'disableMemoryLeakTests', 'false' - useJUnitPlatform { - } - jvmArgs '-ea' - jacoco { enabled = true } @@ -86,4 +84,4 @@ jacocoTestReport { html.required = true html.destination file("${buildDir}/reports/jacoco/test/html") } -} \ No newline at end of file +} diff --git a/TrafficCapture/gradle.properties b/TrafficCapture/gradle.properties new file mode 100644 index 000000000..5ca1b8ea9 --- /dev/null +++ b/TrafficCapture/gradle.properties @@ -0,0 +1,7 @@ +org.gradle.caching=true +org.gradle.configuration-cache=true +org.gradle.configureondemand=true + +# Set Gradle Daemon's idle timeout to 30 minutes +org.gradle.daemon.idletimeout=1800000 +org.gradle.parallel=true \ No newline at end of file diff --git a/TrafficCapture/nettyWireLogging/src/test/java/org/opensearch/migrations/trafficcapture/netty/ConditionallyReliableLoggingHttpHandlerTest.java b/TrafficCapture/nettyWireLogging/src/test/java/org/opensearch/migrations/trafficcapture/netty/ConditionallyReliableLoggingHttpHandlerTest.java index f4fe7b893..9fecf3e17 100644 --- a/TrafficCapture/nettyWireLogging/src/test/java/org/opensearch/migrations/trafficcapture/netty/ConditionallyReliableLoggingHttpHandlerTest.java +++ b/TrafficCapture/nettyWireLogging/src/test/java/org/opensearch/migrations/trafficcapture/netty/ConditionallyReliableLoggingHttpHandlerTest.java @@ -11,6 +11,7 @@ import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.parallel.ResourceLock; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; import org.opensearch.migrations.testutils.TestUtilities; @@ -128,6 +129,7 @@ private static byte[] consumeIntoArray(ByteBuf m) { @ParameterizedTest @ValueSource(booleans = {true, false}) + @ResourceLock("OpenTelemetryExtension") public void testThatAPostInASinglePacketBlocksFutureActivity(boolean usePool) throws IOException { byte[] fullTrafficBytes = SimpleRequests.SMALL_POST.getBytes(StandardCharsets.UTF_8); var bb = TestUtilities.getByteBuf(fullTrafficBytes, usePool); @@ -137,6 +139,7 @@ public void testThatAPostInASinglePacketBlocksFutureActivity(boolean usePool) th @ParameterizedTest @ValueSource(booleans = {true, false}) + @ResourceLock("OpenTelemetryExtension") public void testThatAPostInTinyPacketsBlocksFutureActivity(boolean usePool) throws IOException { byte[] fullTrafficBytes = SimpleRequests.SMALL_POST.getBytes(StandardCharsets.UTF_8); writeMessageAndVerify(fullTrafficBytes, getSingleByteAtATimeWriter(usePool, fullTrafficBytes)); @@ -261,6 +264,7 @@ private Consumer getWriter(boolean singleBytes, boolean usePool @ParameterizedTest @ValueSource(booleans = {true, false}) @WrapWithNettyLeakDetection(repetitions = 16) + @ResourceLock("OpenTelemetryExtension") public void testThatAPostInTinyPacketsBlocksFutureActivity_withLeakDetection(boolean usePool) throws Exception { testThatAPostInTinyPacketsBlocksFutureActivity(usePool); //MyResourceLeakDetector.dumpHeap("nettyWireLogging_"+COUNT+"_"+ Instant.now() +".hprof", true); @@ -269,6 +273,7 @@ public void testThatAPostInTinyPacketsBlocksFutureActivity_withLeakDetection(boo @ParameterizedTest @ValueSource(booleans = {true, false}) @WrapWithNettyLeakDetection(repetitions = 32) + @ResourceLock("OpenTelemetryExtension") public void testThatAPostInASinglePacketBlocksFutureActivity_withLeakDetection(boolean usePool) throws Exception { testThatAPostInASinglePacketBlocksFutureActivity(usePool); //MyResourceLeakDetector.dumpHeap("nettyWireLogging_"+COUNT+"_"+ Instant.now() +".hprof", true); diff --git a/TrafficCapture/replayerPlugins/build.gradle b/TrafficCapture/replayerPlugins/build.gradle index fb3c3f88a..6b416abc4 100644 --- a/TrafficCapture/replayerPlugins/build.gradle +++ b/TrafficCapture/replayerPlugins/build.gradle @@ -2,4 +2,4 @@ subprojects { repositories { mavenCentral() } -} \ No newline at end of file +} diff --git a/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/ExpiringSubstitutableItemPool.java b/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/ExpiringSubstitutableItemPool.java index 46a20b517..6c165e39b 100644 --- a/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/ExpiringSubstitutableItemPool.java +++ b/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/ExpiringSubstitutableItemPool.java @@ -319,8 +319,6 @@ private String toStringOnThread() { sb.append(", numInProgressItems=").append(inProgressItems.size()); sb.append(", numReadyItems=").append(readyItems.size()); } - sb.append(", inProgressItems=").append(inProgressItems); - sb.append(", readyItems=").append(readyItems); sb.append(", itemSupplier=").append(itemSupplier); sb.append(", onExpirationConsumer=").append(onExpirationConsumer); sb.append(", eventLoop=").append(eventLoop); diff --git a/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/ExpiringSubstitutableItemPoolTest.java b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/ExpiringSubstitutableItemPoolTest.java index 47944c922..03f162fe1 100644 --- a/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/ExpiringSubstitutableItemPoolTest.java +++ b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/ExpiringSubstitutableItemPoolTest.java @@ -20,8 +20,10 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import java.util.stream.IntStream; +import org.junit.jupiter.api.parallel.Isolated; @Slf4j +@Isolated("Isolation based on temporal checks") class ExpiringSubstitutableItemPoolTest { public static final int NUM_POOLED_ITEMS = 5; @@ -69,7 +71,6 @@ void get() throws Exception { return rval; }, item->{ - expireCountdownLatch.countDown(); log.info("Expiring item: "+item); try { expiredItems.add(item.get()); @@ -79,6 +80,7 @@ void get() throws Exception { } catch (ExecutionException e) { throw Lombok.sneakyThrow(e); } + expireCountdownLatch.countDown(); }); for (int i = 0; iblockingSource.readNextTrafficStreamChunk(rootContext::createReadChunkContext) - .get(10, TimeUnit.MILLISECONDS)); + .get(10000, TimeUnit.MILLISECONDS)); Assertions.assertInstanceOf(EOFException.class, exception.getCause()); } diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/FullReplayerWithTracingChecksTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/FullReplayerWithTracingChecksTest.java index 74fbb8c49..b3ea65f49 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/FullReplayerWithTracingChecksTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/FullReplayerWithTracingChecksTest.java @@ -7,6 +7,7 @@ import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.parallel.ResourceLock; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; import org.opensearch.migrations.replay.traffic.source.BlockingTrafficSource; @@ -41,6 +42,7 @@ protected TestContext makeInstrumentationContext() { } @Test + @ResourceLock("TrafficReplayerRunner") public void testSingleStreamWithCloseIsCommitted() throws Throwable { var random = new Random(1); var httpServer = SimpleNettyHttpServer.makeServer(false, Duration.ofMillis(2), @@ -57,11 +59,13 @@ public void testSingleStreamWithCloseIsCommitted() throws Throwable { () -> TestContext.withAllTracking(), trafficSourceSupplier); Assertions.assertEquals(1, trafficSourceSupplier.nextReadCursor.get()); + httpServer.close(); log.info("done"); } @ParameterizedTest @ValueSource(ints = {1,2}) + @ResourceLock("TrafficReplayerRunner") public void testStreamWithRequestsWithCloseIsCommittedOnce(int numRequests) throws Throwable { var random = new Random(1); var httpServer = SimpleNettyHttpServer.makeServer(false, Duration.ofMillis(2), @@ -108,7 +112,8 @@ public void testStreamWithRequestsWithCloseIsCommittedOnce(int numRequests) thro Assertions.assertTrue(wasNew); }); } finally { - tr.shutdown(null); + tr.shutdown(null).join(); + httpServer.close(); } Assertions.assertEquals(numRequests, tuplesReceived.size()); diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/FullTrafficReplayerTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/FullTrafficReplayerTest.java index b5bdb7e5c..5188af001 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/FullTrafficReplayerTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/FullTrafficReplayerTest.java @@ -6,6 +6,7 @@ import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.parallel.ResourceLock; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; import org.opensearch.migrations.replay.datatypes.ITrafficStreamKey; @@ -74,6 +75,7 @@ public Consumer get() { "-1,true", }) @Tag("longTest") + @ResourceLock("TrafficReplayerRunner") public void fullTest(int testSize, boolean randomize) throws Throwable { var random = new Random(1); var httpServer = SimpleNettyHttpServer.makeServer(false, Duration.ofMillis(200), diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/KafkaRestartingTrafficReplayerTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/KafkaRestartingTrafficReplayerTest.java index 9dd8164c7..8b6f16a19 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/KafkaRestartingTrafficReplayerTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/KafkaRestartingTrafficReplayerTest.java @@ -10,6 +10,8 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.parallel.ResourceLock; +import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; import org.opensearch.migrations.replay.kafka.KafkaTestUtils; import org.opensearch.migrations.replay.kafka.KafkaTrafficCaptureSource; @@ -76,7 +78,7 @@ public Consumer get() { } } - //@ParameterizedTest + @ParameterizedTest @CsvSource(value = { "3,false", "-1,false", @@ -84,6 +86,7 @@ public Consumer get() { "-1,true", }) @Tag("longTest") + @ResourceLock("TrafficReplayerRunner") public void fullTest(int testSize, boolean randomize) throws Throwable { var random = new Random(1); var httpServer = SimpleNettyHttpServer.makeServer(false, Duration.ofMillis(2), @@ -102,6 +105,7 @@ public void fullTest(int testSize, boolean randomize) throws Throwable { rootContext -> new SentinelSensingTrafficSource( new KafkaTrafficCaptureSource(rootContext, buildKafkaConsumer(), TEST_TOPIC_NAME, Duration.ofMillis(DEFAULT_POLL_INTERVAL_MS)))); + httpServer.close(); log.info("done"); } diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/RequestSenderOrchestratorTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/RequestSenderOrchestratorTest.java index 7f01c7d56..fe335a252 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/RequestSenderOrchestratorTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/RequestSenderOrchestratorTest.java @@ -8,6 +8,8 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.parallel.Execution; +import org.junit.jupiter.api.parallel.ExecutionMode; import org.opensearch.migrations.replay.util.DiagnosticTrackableCompletableFuture; import org.opensearch.migrations.testutils.SimpleHttpServer; import org.opensearch.migrations.testutils.WrapWithNettyLeakDetection; @@ -30,6 +32,7 @@ class RequestSenderOrchestratorTest extends InstrumentationTest { @Test @Tag("longTest") + @Execution(ExecutionMode.SAME_THREAD) public void testThatSchedulingWorks() throws Exception { var httpServer = SimpleHttpServer.makeServer(false, r -> TestHttpServerContext.makeResponse(r, Duration.ofMillis(100))); @@ -77,6 +80,7 @@ public void testThatSchedulingWorks() throws Exception { } } closeFuture.get(); + httpServer.close(); } private List makeRequest(int i) { diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/ResultsToLogsConsumerTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/ResultsToLogsConsumerTest.java index 3ff102293..3a294e8bc 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/ResultsToLogsConsumerTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/ResultsToLogsConsumerTest.java @@ -3,15 +3,23 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import io.netty.buffer.Unpooled; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.core.LogEvent; -import org.apache.logging.log4j.core.Logger; -import org.apache.logging.log4j.core.LoggerContext; import org.apache.logging.log4j.core.appender.AbstractAppender; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.parallel.ResourceLock; import org.opensearch.migrations.replay.datahandlers.NettyPacketToHttpConsumerTest; import org.opensearch.migrations.replay.datatypes.HttpRequestTransformationStatus; import org.opensearch.migrations.replay.datatypes.PojoTrafficStreamKeyAndContext; @@ -20,15 +28,7 @@ import org.opensearch.migrations.testutils.WrapWithNettyLeakDetection; import org.opensearch.migrations.tracing.InstrumentationTest; import org.opensearch.migrations.tracing.TestContext; - -import java.io.IOException; -import java.io.InputStream; -import java.nio.charset.StandardCharsets; -import java.time.Duration; -import java.time.Instant; -import java.util.ArrayList; -import java.util.List; -import java.util.stream.Collectors; +import org.slf4j.LoggerFactory; @Slf4j @WrapWithNettyLeakDetection(repetitions = 4) @@ -43,28 +43,38 @@ protected TestContext makeInstrumentationContext() { } private static class CloseableLogSetup implements AutoCloseable { - List logEvents = new ArrayList<>(); + List logEvents = Collections.synchronizedList(new ArrayList<>()); AbstractAppender testAppender; + org.slf4j.Logger testLogger; + org.apache.logging.log4j.core.Logger internalLogger; + + final String instanceName; + public CloseableLogSetup() { - testAppender = new AbstractAppender(ResultsToLogsConsumer.OUTPUT_TUPLE_JSON_LOGGER, + instanceName = this.getClass().getName() + ".Thread" + Thread.currentThread().getId(); + + testAppender = new AbstractAppender(instanceName, null, null, false, null) { @Override public void append(LogEvent event) { logEvents.add(event.getMessage().getFormattedMessage()); } }; - var tupleLogger = (Logger) LogManager.getLogger(ResultsToLogsConsumer.OUTPUT_TUPLE_JSON_LOGGER); - tupleLogger.setLevel(Level.ALL); + testAppender.start(); - tupleLogger.setAdditive(false); - tupleLogger.addAppender(testAppender); - var loggerCtx = ((LoggerContext) LogManager.getContext(false)); + + internalLogger = (org.apache.logging.log4j.core.Logger) LogManager.getLogger(instanceName); + testLogger = LoggerFactory.getLogger(instanceName); + + // Cast to core.Logger to access internal methods + internalLogger.setLevel(Level.ALL); + internalLogger.setAdditive(false); + internalLogger.addAppender(testAppender); } @Override public void close() { - var tupleLogger = (Logger) LogManager.getLogger(ResultsToLogsConsumer.OUTPUT_TUPLE_JSON_LOGGER); - tupleLogger.removeAppender(testAppender); + internalLogger.removeAppender(testAppender); testAppender.stop(); } } @@ -80,6 +90,7 @@ public void testTupleNewWithNullKeyThrows() { } @Test + @ResourceLock("TestContext") public void testOutputterWithNulls() throws IOException { var urk = new UniqueReplayerRequestKey(PojoTrafficStreamKeyAndContext.build(NODE_ID, "c", 0, @@ -87,7 +98,8 @@ public void testOutputterWithNulls() throws IOException { var emptyTuple = new SourceTargetCaptureTuple(rootContext.getTestTupleContext(), null, null, null, null, null, null); try (var closeableLogSetup = new CloseableLogSetup()) { - var consumer = new TupleParserChainConsumer(new ResultsToLogsConsumer()); + var resultsToLogsConsumer = new ResultsToLogsConsumer(closeableLogSetup.testLogger, null); + var consumer = new TupleParserChainConsumer(resultsToLogsConsumer); consumer.accept(emptyTuple); Assertions.assertEquals(1, closeableLogSetup.logEvents.size()); var contents = closeableLogSetup.logEvents.get(0); @@ -97,13 +109,15 @@ public void testOutputterWithNulls() throws IOException { } @Test - public void testOutputterWithException() throws IOException { + @ResourceLock("TestContext") + public void testOutputterWithException() { var exception = new Exception(TEST_EXCEPTION_MESSAGE); var emptyTuple = new SourceTargetCaptureTuple(rootContext.getTestTupleContext(), null, null, null, null, exception, null); try (var closeableLogSetup = new CloseableLogSetup()) { - var consumer = new TupleParserChainConsumer(new ResultsToLogsConsumer()); + var resultsToLogsConsumer = new ResultsToLogsConsumer(closeableLogSetup.testLogger, null); + var consumer = new TupleParserChainConsumer(resultsToLogsConsumer); consumer.accept(emptyTuple); Assertions.assertEquals(1, closeableLogSetup.logEvents.size()); var contents = closeableLogSetup.logEvents.get(0); @@ -113,13 +127,14 @@ public void testOutputterWithException() throws IOException { } } - public static byte[] loadResourceAsBytes(String path) throws IOException { + private static byte[] loadResourceAsBytes(String path) throws IOException { try (InputStream inputStream = ResultsToLogsConsumerTest.class.getResourceAsStream(path)) { return inputStream.readAllBytes(); } } @Test + @ResourceLock("TestContext") public void testOutputterForGet() throws IOException { final String EXPECTED_LOGGED_OUTPUT = "" + @@ -178,6 +193,7 @@ public void testOutputterForGet() throws IOException { } @Test + @ResourceLock("TestContext") public void testOutputterForPost() throws IOException { final String EXPECTED_LOGGED_OUTPUT = "" + "{\n" + @@ -232,7 +248,7 @@ public void testOutputterForPost() throws IOException { testOutputterForRequest("post_formUrlEncoded_withFixedLength.txt", EXPECTED_LOGGED_OUTPUT); } - public void testOutputterForRequest(String requestResourceName, String expected) throws IOException { + private void testOutputterForRequest(String requestResourceName, String expected) throws IOException { var trafficStreamKey = PojoTrafficStreamKeyAndContext.build(NODE_ID, "c", 0, rootContext::createTrafficStreamContextForTest); var sourcePair = new RequestResponsePacketPair(trafficStreamKey, Instant.EPOCH, @@ -251,7 +267,7 @@ public void testOutputterForRequest(String requestResourceName, String expected) var closeableLogSetup = new CloseableLogSetup()) { var tuple = new SourceTargetCaptureTuple(tupleContext, sourcePair, targetRequest, targetResponse, HttpRequestTransformationStatus.SKIPPED, null, Duration.ofMillis(267)); - var streamConsumer = new ResultsToLogsConsumer(); + var streamConsumer = new ResultsToLogsConsumer(closeableLogSetup.testLogger, null); var consumer = new TupleParserChainConsumer(streamConsumer); consumer.accept(tuple); Assertions.assertEquals(1, closeableLogSetup.logEvents.size()); diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumerTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumerTest.java index c79986c89..049c9b527 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumerTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumerTest.java @@ -10,8 +10,11 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.parallel.Execution; +import org.junit.jupiter.api.parallel.ExecutionMode; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.internal.matchers.Same; import org.opensearch.migrations.replay.ClientConnectionPool; import org.opensearch.migrations.replay.PacketToTransformingHttpHandlerFactory; import org.opensearch.migrations.replay.ReplayEngine; diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaTrafficCaptureSourceLongTermTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaTrafficCaptureSourceLongTermTest.java index 81ad5d8ef..7790f16f0 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaTrafficCaptureSourceLongTermTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaTrafficCaptureSourceLongTermTest.java @@ -61,7 +61,7 @@ public void testTrafficCaptureSource() throws Exception { for (int i = 0; i < TEST_RECORD_COUNT; ) { Thread.sleep(getSleepAmountMsForProducerRun(i)); var nextChunkFuture = kafkaTrafficCaptureSource.readNextTrafficStreamChunk(rootContext::createReadChunkContext); - var recordsList = nextChunkFuture.get((2 + TEST_RECORD_COUNT) * PRODUCER_SLEEP_INTERVAL_MS, TimeUnit.MILLISECONDS); + var recordsList = nextChunkFuture.get((2 * TEST_RECORD_COUNT) * PRODUCER_SLEEP_INTERVAL_MS, TimeUnit.MILLISECONDS); for (int j = 0; j < recordsList.size(); ++j) { Assertions.assertEquals(KafkaTestUtils.getConnectionId(i + j), recordsList.get(j).getStream().getConnectionId()); } diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaTrafficCaptureSourceTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaTrafficCaptureSourceTest.java index e2b13fdca..521943f01 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaTrafficCaptureSourceTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaTrafficCaptureSourceTest.java @@ -38,6 +38,8 @@ class KafkaTrafficCaptureSourceTest extends InstrumentationTest { public static final int NUM_READ_ITEMS_BOUND = 1000; public static final String TEST_TOPIC_NAME = "TEST_TOPIC_NAME"; + private static final Duration TEST_TIMEOUT = Duration.ofSeconds(5); + @Test public void testRecordToString() { var ts = TrafficStream.newBuilder() @@ -74,7 +76,7 @@ public void testSupplyTrafficFromSource() { // were missing traffic streams. Its task currently is limited to the numTrafficStreams where it will stop the stream var tsCount = new AtomicInteger(); - Assertions.assertTimeoutPreemptively(Duration.ofSeconds(1), () -> { + Assertions.assertTimeoutPreemptively(TEST_TIMEOUT, () -> { while (tsCount.get() < numTrafficStreams) { protobufConsumer.readNextTrafficStreamChunk(rootContext::createReadChunkContext).get().stream() .forEach(streamWithKey -> { @@ -125,7 +127,7 @@ public void testSupplyTrafficWithUnformattedMessages() { // were missing traffic streams. Its task currently is limited to the numTrafficStreams where it will stop the stream var tsCount = new AtomicInteger(); - Assertions.assertTimeoutPreemptively(Duration.ofSeconds(1), () -> { + Assertions.assertTimeoutPreemptively(TEST_TIMEOUT, () -> { while (tsCount.get() < numTrafficStreams) { protobufConsumer.readNextTrafficStreamChunk(rootContext::createReadChunkContext).get().stream() .forEach(streamWithKey->{