From fdcc6d3b3091535c3f798fe94838c33823604736 Mon Sep 17 00:00:00 2001 From: Greg Schohn Date: Fri, 22 Mar 2024 08:38:30 -0400 Subject: [PATCH 1/3] When closing connections in the ClientConnectionPool, the right key was never used to grab the connection from the cache to close it down. I'll add tighter tests in a subsequent commit. Signed-off-by: Greg Schohn --- .../migrations/replay/ClientConnectionPool.java | 8 ++++---- .../migrations/replay/RequestSenderOrchestrator.java | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ClientConnectionPool.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ClientConnectionPool.java index 09390bd9d..b67c9188b 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ClientConnectionPool.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ClientConnectionPool.java @@ -132,12 +132,12 @@ public DiagnosticTrackableCompletableFuture closeConnectionsAndShu () -> "Final shutdown for " + this.getClass().getSimpleName()); } - public void closeConnection(IReplayContexts.IChannelKeyContext ctx) { + public void closeConnection(IReplayContexts.IChannelKeyContext ctx, int sessionNumber) { var connId = ctx.getConnectionId(); log.atInfo().setMessage(() -> "closing connection for " + connId).log(); - var channelsFuture = connectionId2ChannelCache.getIfPresent(connId); - if (channelsFuture != null) { - closeClientConnectionChannel(channelsFuture); + var connectionReplaySession = connectionId2ChannelCache.getIfPresent(getKey(connId, sessionNumber)); + if (connectionReplaySession != null) { + closeClientConnectionChannel(connectionReplaySession); connectionId2ChannelCache.invalidate(connId); } else { log.atTrace().setMessage(()->"No ChannelFuture for " + ctx + diff --git 
a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/RequestSenderOrchestrator.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/RequestSenderOrchestrator.java index 08dc5c845..3e397b3dd 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/RequestSenderOrchestrator.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/RequestSenderOrchestrator.java @@ -100,7 +100,7 @@ public StringTrackableCompletableFuture scheduleClose(IReplayContexts.ICha channelFutureAndRequestSchedule, finalTunneledResponse, timestamp, new ChannelTask(ChannelTaskType.CLOSE, () -> { log.trace("Closing client connection " + channelInteraction); - clientConnectionPool.closeConnection(ctx); + clientConnectionPool.closeConnection(ctx, sessionNumber); finalTunneledResponse.future.complete(null); }))); return finalTunneledResponse; From 0666b69fdb927e4a4cd9d5a8a507e0fadf254faa Mon Sep 17 00:00:00 2001 From: Greg Schohn Date: Fri, 22 Mar 2024 13:47:57 -0400 Subject: [PATCH 2/3] Fix up when connections are forcibly closed so that unit tests can check BEFORE shutdown cleanup and make sure that ongoing cleanup is working as expected. I've also turned up the timeout for the FullReplayerWithTracingChecks so that the local http server won't time out immediately and force connections to close. I want to make sure that the close comes from the client side. That change also makes the goofy numTraces loop that was in the test unnecessary. In retrospect, that loop was masking an underlying issue that the tcpConnections weren't being properly closed at the right times. 
Signed-off-by: Greg Schohn --- .../replay/ClientConnectionPool.java | 40 ++++------------ .../migrations/replay/ReplayEngine.java | 4 -- .../migrations/replay/TrafficReplayer.java | 31 +++++------- .../replay/HttpByteBufFormatterTest.java | 4 -- .../NettyPacketToHttpConsumerTest.java | 9 ++-- .../FullReplayerWithTracingChecksTest.java | 47 ++++++------------- 6 files changed, 39 insertions(+), 96 deletions(-) diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ClientConnectionPool.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ClientConnectionPool.java index b67c9188b..0672e02e5 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ClientConnectionPool.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ClientConnectionPool.java @@ -97,41 +97,19 @@ public ConnectionReplaySession buildConnectionReplaySession(final IReplayContext return clientConnectionChannelCreatedFuture; } - public Future shutdownNow() { + public CompletableFuture shutdownNow() { + CompletableFuture shutdownFuture = new CompletableFuture<>(); connectionId2ChannelCache.invalidateAll(); - return eventLoopGroup.shutdownGracefully(); - } - - public DiagnosticTrackableCompletableFuture closeConnectionsAndShutdown() { - StringTrackableCompletableFuture eventLoopFuture = - new StringTrackableCompletableFuture<>(new CompletableFuture<>(), () -> "all channels closed"); - this.eventLoopGroup.submit(() -> { - try { - var channelClosedFuturesArray = - connectionId2ChannelCache.asMap().values().stream() - .map(this::closeClientConnectionChannel) - .collect(Collectors.toList()); - StringTrackableCompletableFuture.allOf(channelClosedFuturesArray.stream(), - () -> "all channels closed") - .handle((v, t) -> { - if (t == null) { - eventLoopFuture.future.complete(v); - } else { - eventLoopFuture.future.completeExceptionally(t); - } - return null; - }, - () -> "Waiting 
for all channels to close: Remaining=" + - (channelClosedFuturesArray.stream().filter(c -> !c.isDone()).count())); - } catch (Exception e) { - log.atError().setCause(e).setMessage("Caught error while closing cached connections").log(); - eventLoopFuture.future.completeExceptionally(e); + eventLoopGroup.shutdownGracefully().addListener(f->{ + if (f.isSuccess()) { + shutdownFuture.complete(null); + } else { + shutdownFuture.completeExceptionally(f.cause()); } }); - return eventLoopFuture.map(f -> f.whenComplete((c, t) -> shutdownNow()), - () -> "Final shutdown for " + this.getClass().getSimpleName()); + return shutdownFuture; } - + public void closeConnection(IReplayContexts.IChannelKeyContext ctx, int sessionNumber) { var connId = ctx.getConnectionId(); log.atInfo().setMessage(() -> "closing connection for " + connId).log(); diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ReplayEngine.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ReplayEngine.java index e98149224..779f894be 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ReplayEngine.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ReplayEngine.java @@ -165,10 +165,6 @@ private static void logStartOfWork(Object stringableKey, long newCount, Instant return hookWorkFinishingUpdates(future, timestamp, channelKey, label); } - public DiagnosticTrackableCompletableFuture closeConnectionsAndShutdown() { - return networkSendOrchestrator.clientConnectionPool.closeConnectionsAndShutdown(); - } - public void setFirstTimestamp(Instant firstPacketTimestamp) { timeShifter.setFirstTimestamp(firstPacketTimestamp); } diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java index 6e35a348e..1e00c1d96 100644 --- 
a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java @@ -96,7 +96,6 @@ public class TrafficReplayer implements AutoCloseable { private final AtomicReference shutdownReasonRef; private final AtomicReference> shutdownFutureRef; private final AtomicReference>> nextChunkFutureRef; - private Future nettyShutdownFuture; public static class DualException extends Exception { public final Throwable originalCause; @@ -502,10 +501,10 @@ public void close() { } } - void setupRunAndWaitForReplay(Duration observedPacketConnectionTimeout, - BlockingTrafficSource trafficSource, - TimeShifter timeShifter, - Consumer resultTupleConsumer) + public void setupRunAndWaitForReplayToFinish(Duration observedPacketConnectionTimeout, + BlockingTrafficSource trafficSource, + TimeShifter timeShifter, + Consumer resultTupleConsumer) throws InterruptedException, ExecutionException { var senderOrchestrator = new RequestSenderOrchestrator(clientConnectionPool); @@ -578,7 +577,7 @@ public void setupRunAndWaitForReplayWithShutdownChecks(Duration observedPacketCo Consumer resultTupleConsumer) throws TerminationException, ExecutionException, InterruptedException { try { - setupRunAndWaitForReplay(observedPacketConnectionTimeout, trafficSource, + setupRunAndWaitForReplayToFinish(observedPacketConnectionTimeout, trafficSource, timeShifter, resultTupleConsumer); } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -591,7 +590,6 @@ public void setupRunAndWaitForReplayWithShutdownChecks(Duration observedPacketCo } // if nobody has run shutdown yet, do so now so that we can tear down the netty resources shutdown(null).get(); // if somebody already HAD run shutdown, it will return the future already created - nettyShutdownFuture.sync(); } @AllArgsConstructor @@ -794,14 +792,6 @@ private void waitForRemainingWork(Level logLevel, } finally 
{ allRemainingWorkFutureOrShutdownSignalRef.set(null); } - allWorkFuture.getDeferredFutureThroughHandle((t, v) -> { - log.info("stopping packetHandlerFactory's group"); - replayEngine.closeConnectionsAndShutdown(); - // squash exceptions for individual requests - return StringTrackableCompletableFuture.completedFuture(null, () -> "finished all work"); - }, () -> "TrafficReplayer.PacketHandlerFactory->stopGroup") - .get(); // allWorkFuture already completed - here we're just going to wait for the - // rest of the cleanup to finish, as per the name of the function } private void handleAlreadySetFinishedSignal() throws InterruptedException, ExecutionException { @@ -964,12 +954,13 @@ private static String formatWorkItem(DiagnosticTrackableCompletableFuture{ - if (f.isSuccess()) { - shutdownFutureRef.get().complete(null); + + var nettyShutdownFuture = clientConnectionPool.shutdownNow(); + nettyShutdownFuture.whenComplete((v,t) -> { + if (t != null) { + shutdownFutureRef.get().completeExceptionally(t); } else { - shutdownFutureRef.get().completeExceptionally(f.cause()); + shutdownFutureRef.get().complete(null); } }); Optional.ofNullable(this.nextChunkFutureRef.get()).ifPresent(f->f.cancel(true)); diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/HttpByteBufFormatterTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/HttpByteBufFormatterTest.java index 601b8e470..c2c4e550f 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/HttpByteBufFormatterTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/HttpByteBufFormatterTest.java @@ -30,7 +30,6 @@ public static void setup() { "Connection: Keep-Alive\r\n" + "Host: localhost\r\n" + "User-Agent: UnitTest\r\n" + - "Connection: Keep-Alive\r\n" + "\r\n"; final static String SAMPLE_REQUEST_AS_BLOCKS = "[G],[E],[T],[ ],[/],[ ],[H],[T],[T],[P],[/],[1],[.],[1],[\r" + @@ -41,8 
+40,6 @@ public static void setup() { "],[\n" + "],[U],[s],[e],[r],[-],[A],[g],[e],[n],[t],[:],[ ],[U],[n],[i],[t],[T],[e],[s],[t],[\r" + "],[\n" + - "],[C],[o],[n],[n],[e],[c],[t],[i],[o],[n],[:],[ ],[K],[e],[e],[p],[-],[A],[l],[i],[v],[e],[\r" + - "],[\n" + "],[\r" + "],[\n" + "]"; @@ -50,7 +47,6 @@ public static void setup() { "Connection: Keep-Alive\n" + "Host: localhost\n" + "User-Agent: UnitTest\n" + - "Connection: Keep-Alive\n" + "content-length: 0\n" + "\n"; diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumerTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumerTest.java index 9b09a82e4..dc7407dfe 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumerTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumerTest.java @@ -164,10 +164,9 @@ public void testThatConnectionsAreKeptAliveAndShared(boolean useTls) new TransformationLoader().getTransformerFactoryLoader(null), null); var timeShifter = new TimeShifter(); timeShifter.setFirstTimestamp(Instant.now()); - var sendingFactory = new ReplayEngine( - new RequestSenderOrchestrator( - new ClientConnectionPool(testServer.localhostEndpoint(), sslContext, - "targetPool for testThatConnectionsAreKeptAliveAndShared", 1)), + var clientConnectionPool = new ClientConnectionPool(testServer.localhostEndpoint(), sslContext, + "targetPool for testThatConnectionsAreKeptAliveAndShared", 1); + var sendingFactory = new ReplayEngine(new RequestSenderOrchestrator(clientConnectionPool), new TestFlowController(), timeShifter); for (int j = 0; j < 2; ++j) { for (int i = 0; i < 2; ++i) { @@ -187,7 +186,7 @@ public void testThatConnectionsAreKeptAliveAndShared(boolean useTls) normalizeMessage(responseAsString)); } } - var stopFuture = 
sendingFactory.closeConnectionsAndShutdown(); + var stopFuture = clientConnectionPool.shutdownNow(); log.info("waiting for factory to shutdown: " + stopFuture); stopFuture.get(); } diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/e2etests/FullReplayerWithTracingChecksTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/e2etests/FullReplayerWithTracingChecksTest.java index c91fb3134..c763482e7 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/e2etests/FullReplayerWithTracingChecksTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/e2etests/FullReplayerWithTracingChecksTest.java @@ -49,11 +49,11 @@ protected TestContext makeInstrumentationContext() { } @ParameterizedTest - @ValueSource(ints = {1,2}) + @ValueSource(ints = {1, 2}) @ResourceLock("TrafficReplayerRunner") public void testStreamWithRequestsWithCloseIsCommittedOnce(int numRequests) throws Throwable { var random = new Random(1); - try (var httpServer = SimpleNettyHttpServer.makeServer(false, Duration.ofMillis(2), + try (var httpServer = SimpleNettyHttpServer.makeServer(false, Duration.ofMinutes(10), response->TestHttpServerContext.makeResponse(random, response))) { var baseTime = Instant.now(); var fixedTimestamp = @@ -63,19 +63,21 @@ public void testStreamWithRequestsWithCloseIsCommittedOnce(int numRequests) thro tsb = tsb .addSubStream(TrafficObservation.newBuilder().setTs(fixedTimestamp) .setRead(ReadObservation.newBuilder() - .setData(ByteString.copyFrom(("GET /" + i + " HTTP/1.0\r\n") + .setData(ByteString.copyFrom(("GET /" + i + " HTTP/1.1\r\n" + + "Connection: Keep-Alive\r\n" + + "Host: localhost\r\n") .getBytes(StandardCharsets.UTF_8))) .build()) .build()) .addSubStream(TrafficObservation.newBuilder().setTs(fixedTimestamp) .setEndOfMessageIndicator(EndOfMessageIndication.newBuilder() .setFirstLineByteLength(14) - .setHeadersByteLength(14) + 
.setHeadersByteLength(58) .build()) .build()) .addSubStream(TrafficObservation.newBuilder().setTs(fixedTimestamp) .setWrite(WriteObservation.newBuilder() - .setData(ByteString.copyFrom("HTTP/1.0 OK 200\r\n".getBytes(StandardCharsets.UTF_8))) + .setData(ByteString.copyFrom("HTTP/1.1 OK 200\r\n".getBytes(StandardCharsets.UTF_8))) .build()) .build()); } @@ -92,16 +94,18 @@ public void testStreamWithRequestsWithCloseIsCommittedOnce(int numRequests) thro true, 10, 10 * 1024, "targetConnectionPool for testStreamWithRequestsWithCloseIsCommittedOnce"); var blockingTrafficSource = new BlockingTrafficSource(trafficSource, Duration.ofMinutes(2))) { - tr.setupRunAndWaitForReplayWithShutdownChecks(Duration.ofSeconds(70), blockingTrafficSource, - new TimeShifter(10 * 1000), (t) -> { + tr.setupRunAndWaitForReplayToFinish(Duration.ofSeconds(70), blockingTrafficSource, + new TimeShifter(10 * 1000), + t -> { var wasNew = tuplesReceived.add(t.getRequestKey().toString()); Assertions.assertTrue(wasNew); }); + Assertions.assertEquals(numRequests, tuplesReceived.size()); + Thread.sleep(1000); + checkSpansForSimpleReplayedTransactions(rootContext.inMemoryInstrumentationBundle, numRequests); + tr.shutdown(null).get(); } - Assertions.assertEquals(numRequests, tuplesReceived.size()); } - checkSpansForSimpleReplayedTransactions(rootContext.inMemoryInstrumentationBundle, numRequests); - log.info("done"); } private static class TraceProcessor { @@ -128,31 +132,10 @@ public String getRemainingItemsString() { private void checkSpansForSimpleReplayedTransactions(InMemoryInstrumentationBundle inMemoryBundle, int numRequests) { var traceProcessor = new TraceProcessor(inMemoryBundle.getFinishedSpans()); - for (int numTries=1; ; ++numTries) { - final String TCP_CONNECTION_SCOPE_NAME = "tcpConnection"; - var numTraces = traceProcessor.getCountAndRemoveSpan(TCP_CONNECTION_SCOPE_NAME); - switch (numTraces) { - case 1: - break; - case -1: - traceProcessor = new 
TraceProcessor(inMemoryBundle.getFinishedSpans()); - if (numTries > 5) { - Assertions.fail("Even after waiting/polling, no " + TCP_CONNECTION_SCOPE_NAME + " was found."); - } - try { - Thread.sleep(250); - } catch (InterruptedException e) { - throw Lombok.sneakyThrow(e); - } - continue; - default: - Assertions.fail("Found " + numTraces + " for " + TCP_CONNECTION_SCOPE_NAME); - } - break; - } Assertions.assertEquals(1, traceProcessor.getCountAndRemoveSpan("channel")); Assertions.assertEquals(1, traceProcessor.getCountAndRemoveSpan("trafficStreamLifetime")); + Assertions.assertEquals(1, traceProcessor.getCountAndRemoveSpan("tcpConnection")); Assertions.assertEquals(numRequests, traceProcessor.getCountAndRemoveSpan("httpTransaction")); Assertions.assertEquals(numRequests, traceProcessor.getCountAndRemoveSpan("accumulatingRequest")); Assertions.assertEquals(numRequests, traceProcessor.getCountAndRemoveSpan("accumulatingResponse")); From be597d73b075554ef6cefbace0ac3ee7ba7e7b3b Mon Sep 17 00:00:00 2001 From: Greg Schohn Date: Fri, 22 Mar 2024 14:10:18 -0400 Subject: [PATCH 3/3] PR Feedback - elevate the loglevel of a message for what might be an error case given the way that the system is implemented. This will let us keep an eye on it better. 
Signed-off-by: Greg Schohn --- .../org/opensearch/migrations/replay/ClientConnectionPool.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ClientConnectionPool.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ClientConnectionPool.java index 0672e02e5..bcd49257a 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ClientConnectionPool.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ClientConnectionPool.java @@ -118,7 +118,7 @@ public void closeConnection(IReplayContexts.IChannelKeyContext ctx, int sessionN closeClientConnectionChannel(connectionReplaySession); connectionId2ChannelCache.invalidate(connId); } else { - log.atTrace().setMessage(()->"No ChannelFuture for " + ctx + + log.atInfo().setMessage(()->"No ChannelFuture for " + ctx + " in closeConnection. The connection may have already been closed").log(); } }