From 2ae2959d5fb2486f1feae68f37182706f8105b5a Mon Sep 17 00:00:00 2001 From: Andre Kurait Date: Thu, 9 May 2024 00:32:16 -0500 Subject: [PATCH] Address PR Comments in NettyPacketToHttpConsumerTest Signed-off-by: Andre Kurait --- .../NettyPacketToHttpConsumerTest.java | 38 ++++++++++++------- 1 file changed, 24 insertions(+), 14 deletions(-) diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumerTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumerTest.java index 4ca89cb44..5f106787b 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumerTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumerTest.java @@ -196,7 +196,7 @@ private void testPeerResets(boolean useTls, boolean withServerReadTimeout, Duration readTimeout, Duration resultWaitTimeout) throws Exception { ClientConnectionPool clientConnectionPool = null; try (var testServer = SimpleNettyHttpServer.makeServer(useTls, - withServerReadTimeout ? Duration.ofMillis(100) : null, + withServerReadTimeout ? readTimeout : null, NettyPacketToHttpConsumerTest::makeResponseContext)) { log.atError().setMessage("Got port " + testServer.port).log(); var sslContext = !useTls ? null : @@ -208,7 +208,7 @@ private void testPeerResets(boolean useTls, boolean withServerReadTimeout, var reqCtx = rootContext.getTestConnectionRequestContext(1); var nphc = new NettyPacketToHttpConsumer(clientConnectionPool - .buildConnectionReplaySession(reqCtx.getChannelKeyContext()), reqCtx, REGULAR_RESPONSE_TIMEOUT); + .buildConnectionReplaySession(reqCtx.getChannelKeyContext()), reqCtx, readTimeout); // purposefully send ONLY the beginning of a request nphc.consumeBytes("GET ".getBytes(StandardCharsets.UTF_8)); if (resultWaitTimeout.minus(readTimeout).isNegative()) { @@ -318,9 +318,7 @@ public void testMetricCountsFor_testThatConnectionsAreKeptAliveAndShared(boolean @ParameterizedTest @CsvSource({"false", "true"}) - @Tag("longTest") public void testResponseTakesLongerThanTimeout(boolean useTls) throws Exception { - var testDurationBuffer = Duration.ofSeconds(10); var responseTimeout = Duration.ofMillis(50); // Response shouldn't come back before everything else finishes var responseDuration = Duration.ofHours(1); @@ -337,7 +335,9 @@ public void testResponseTakesLongerThanTimeout(boolean useTls) throws Exception "targetPool for testReadTimeoutHandler_responseTakesLongerThanTimeout", 1); var timeShifter = new TimeShifter(); - timeShifter.setFirstTimestamp(Instant.now()); + var firstRequestTime = Instant.now(); + timeShifter.setFirstTimestamp(firstRequestTime); + log.atInfo().setMessage("Initial Timestamp: " + firstRequestTime).log(); var sendingFactory = new ReplayEngine( new RequestSenderOrchestrator(clientConnectionPool, @@ -348,8 +348,9 @@ public void testResponseTakesLongerThanTimeout(boolean useTls) throws Exception var requestFinishFuture = TrafficReplayerTopLevel.transformAndSendRequest(transformingHttpHandlerFactory, sendingFactory, ctx, Instant.now(), Instant.now(), () -> Stream.of(EXPECTED_REQUEST_STRING.getBytes(StandardCharsets.UTF_8))); - // Make sure finalize is triggered shortly after the response timeout - var aggregatedResponse = requestFinishFuture.get(responseTimeout.plus(testDurationBuffer)); + var maxTimeToWaitForTimeoutOrResponse = Duration.ofSeconds(10); + var aggregatedResponse = requestFinishFuture.get(maxTimeToWaitForTimeoutOrResponse); + log.atInfo().setMessage("RequestFinishFuture finished").log(); Assertions.assertInstanceOf(ReadTimeoutException.class, aggregatedResponse.getError()); } } @@ -357,9 +358,10 @@ public void testResponseTakesLongerThanTimeout(boolean useTls) throws Exception @ParameterizedTest @CsvSource({"false", "true"}) public void testTimeBetweenRequestsLongerThanResponseTimeout(boolean useTls) throws Exception { - var testDurationBuffer = Duration.ofSeconds(10); var responseTimeout = Duration.ofMillis(100); var timeBetweenRequests = responseTimeout.plus(Duration.ofMillis(10)); + log.atInfo().setMessage("Running testTimeBetweenRequestsLongerThanResponseTimeout with responseTimeout " + + responseTimeout + " and timeBetweenRequests" + timeBetweenRequests).log(); try (var testServer = SimpleNettyHttpServer.makeServer(useTls, NettyPacketToHttpConsumerTest::makeResponseContext)) { var sslContext = !testServer.localhostEndpoint().getScheme().equalsIgnoreCase("https") ? null : @@ -371,24 +373,32 @@ public void testTimeBetweenRequestsLongerThanResponseTimeout(boolean useTls) thr "targetPool for testTimeBetweenRequestsLongerThanResponseTimeout", 1); var timeShifter = new TimeShifter(); - timeShifter.setFirstTimestamp(Instant.now()); + var firstRequestTime = Instant.now(); + timeShifter.setFirstTimestamp(firstRequestTime); + log.atInfo().setMessage("Initial Timestamp: " + firstRequestTime).log(); var sendingFactory = new ReplayEngine( new RequestSenderOrchestrator(clientConnectionPool, (replaySession, ctx1) -> new NettyPacketToHttpConsumer(replaySession, ctx1, responseTimeout)), new TestFlowController(), timeShifter); - for (int i = 0; i < 2; ++i) { - if (i != 0) { - parkForAtLeast(timeBetweenRequests); - } + int i = 0; + while (true) { var ctx = rootContext.getTestConnectionRequestContext("TEST", i); + log.atInfo().setMessage("Starting transformAndSendRequest for request " + i).log(); var requestFinishFuture = TrafficReplayerTopLevel.transformAndSendRequest( transformingHttpHandlerFactory, sendingFactory, ctx, Instant.now(), Instant.now(), () -> Stream.of(EXPECTED_REQUEST_STRING.getBytes(StandardCharsets.UTF_8))); - var aggregatedResponse = requestFinishFuture.get(testDurationBuffer); + var maxTimeToWaitForTimeoutOrResponse = Duration.ofSeconds(10); + var aggregatedResponse = requestFinishFuture.get(maxTimeToWaitForTimeoutOrResponse); + log.atInfo().setMessage("RequestFinishFuture finished for request " + i).log(); Assertions.assertNull(aggregatedResponse.getError()); var responseAsString = getResponsePacketsAsString(aggregatedResponse); Assertions.assertEquals(EXPECTED_RESPONSE_STRING, responseAsString); + if (i > 1) { + break; + } + parkForAtLeast(timeBetweenRequests); + i++; } } }