From e35ca8cd3e024c618c0c18ccb22025f09c91b456 Mon Sep 17 00:00:00 2001 From: Greg Schohn Date: Sun, 28 Apr 2024 17:21:43 -0400 Subject: [PATCH] Convert remaining netty Future 'addListener' calls to use NettyToCompletableFutureBinders + other assorted tweaks. I've also stripped the longRunningActivity file of the log levels and timestamps. They were really just getting in the way of understanding what was going on and being able to quickly copy-paste from there into a json file for further analysis. Signed-off-by: Greg Schohn --- .../NettyToCompletableFutureBinders.java | 5 +- .../replay/ClientConnectionPool.java | 90 +++------- .../replay/RequestSenderOrchestrator.java | 4 +- .../migrations/replay/TrafficReplayer.java | 4 +- .../NettyPacketToHttpConsumer.java | 166 +++++++----------- .../datatypes/ConnectionReplaySession.java | 11 +- .../TimeToResponseFulfillmentFutureMap.java | 11 +- .../replay/util/OnlineRadixSorter.java | 16 +- .../src/main/resources/log4j2.properties | 2 +- .../NettyPacketToHttpConsumerTest.java | 7 +- .../e2etests/FullTrafficReplayerTest.java | 6 +- 11 files changed, 120 insertions(+), 202 deletions(-) diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/NettyToCompletableFutureBinders.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/NettyToCompletableFutureBinders.java index 155de099a..7a945f0f9 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/NettyToCompletableFutureBinders.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/NettyToCompletableFutureBinders.java @@ -12,6 +12,9 @@ import java.util.function.Supplier; public class NettyToCompletableFutureBinders { + + private NettyToCompletableFutureBinders() {} + public static CompletableFuture bindNettyFutureToCompletableFuture(Future nettyFuture, CompletableFuture cf) { nettyFuture.addListener(f -> { @@ -54,7 +57,7 @@ public class NettyToCompletableFutureBinders { bindNettyScheduleToCompletableFuture(EventLoop eventLoop, Duration delay) { var delayMs = Math.max(0, delay.toMillis()); return bindNettyFutureToTrackableFuture(eventLoop.schedule(() -> {}, delayMs, TimeUnit.MILLISECONDS), - "scheduling to run next send at " + delay + " in " + delayMs +" ms (clipped)"); + "scheduling to run next send in " + delay + " (clipped: " + delayMs + "ms)"); } public static CompletableFuture diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ClientConnectionPool.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ClientConnectionPool.java index d3f30a550..783712600 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ClientConnectionPool.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ClientConnectionPool.java @@ -8,25 +8,21 @@ import io.netty.channel.EventLoop; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.handler.ssl.SslContext; -import io.netty.util.concurrent.DefaultPromise; import io.netty.util.concurrent.DefaultThreadFactory; -import io.netty.util.concurrent.Future; import lombok.AllArgsConstructor; -import lombok.Builder; import lombok.EqualsAndHashCode; import lombok.NonNull; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; +import org.opensearch.migrations.NettyToCompletableFutureBinders; import org.opensearch.migrations.replay.datahandlers.NettyPacketToHttpConsumer; import org.opensearch.migrations.replay.datatypes.ConnectionReplaySession; import org.opensearch.migrations.replay.tracing.IReplayContexts; import org.opensearch.migrations.replay.util.DiagnosticTrackableCompletableFuture; -import org.opensearch.migrations.replay.util.StringTrackableCompletableFuture; import java.net.URI; import java.time.Duration; import java.util.concurrent.CompletableFuture; -import java.util.stream.Collectors; @Slf4j public class ClientConnectionPool { @@ -81,41 +77,24 @@ public ConnectionReplaySession buildConnectionReplaySession(IReplayContexts.ICha private DiagnosticTrackableCompletableFuture getResilientClientChannelProducer(EventLoop eventLoop, IReplayContexts.IChannelKeyContext connectionContext) { return new AdaptiveRateLimiter() - .get(() -> { - var channelFuture = NettyPacketToHttpConsumer.createClientConnection(eventLoop, - sslContext, serverUri, connectionContext, timeout); - return getCompletedChannelFutureAsCompletableFuture(connectionContext, channelFuture); - }); - } - - public static StringTrackableCompletableFuture - getCompletedChannelFutureAsCompletableFuture(IReplayContexts.IChannelKeyContext connectionContext, - ChannelFuture channelFuture) { - var clientConnectionChannelCreatedFuture = - new StringTrackableCompletableFuture("waiting for createClientConnection to finish"); - channelFuture.addListener(f -> { - log.atInfo().setMessage(()-> - "New network connection result for " + connectionContext + "=" + f.isSuccess()).log(); - if (f.isSuccess()) { - clientConnectionChannelCreatedFuture.future.complete(channelFuture); - } else { - clientConnectionChannelCreatedFuture.future.completeExceptionally(f.cause()); - } - }); - return clientConnectionChannelCreatedFuture; + .get(() -> + NettyPacketToHttpConsumer.createClientConnection(eventLoop, + sslContext, serverUri, connectionContext, timeout) + .whenComplete((v,t)-> { + if (t == null) { + log.atDebug().setMessage(() -> "New network connection result for " + + connectionContext + " =" + v).log(); + } else { + log.atInfo().setMessage(() -> "got exception for " + connectionContext) + .setCause(t).log(); + } + }, () -> "waiting for createClientConnection to finish")); } public CompletableFuture shutdownNow() { CompletableFuture shutdownFuture = new CompletableFuture<>(); connectionId2ChannelCache.invalidateAll(); - eventLoopGroup.shutdownGracefully().addListener(f->{ - if (f.isSuccess()) { - shutdownFuture.complete(null); - } else { - shutdownFuture.completeExceptionally(f.cause()); - } - }); - return shutdownFuture; + return NettyToCompletableFutureBinders.bindNettyFutureToCompletableFuture(eventLoopGroup.shutdownGracefully()); } public void closeConnection(IReplayContexts.IChannelKeyContext ctx, int sessionNumber) { @@ -143,25 +122,16 @@ public void closeConnection(IReplayContexts.IChannelKeyContext ctx, int sessionN private DiagnosticTrackableCompletableFuture closeClientConnectionChannel(ConnectionReplaySession channelAndFutureWork) { - var channelClosedFuture = - new StringTrackableCompletableFuture("Waiting for closeFuture() on channel"); - - channelAndFutureWork.getFutureThatReturnsChannelFuture(false) - .thenAccept(channelFuture-> { - if (channelFuture == null) { - return; - } + return channelAndFutureWork.getFutureThatReturnsChannelFutureInAnyState(false) + .thenCompose(channelFuture-> { log.atTrace().setMessage(() -> "closing channel " + channelFuture.channel() + "(" + channelAndFutureWork.getChannelKeyContext() + ")...").log(); - channelFuture.channel().close() - .addListener(closeFuture -> { + return NettyToCompletableFutureBinders.bindNettyFutureToTrackableFuture( + channelFuture.channel().close(), + "calling channel.close()") + .thenApply(v -> { log.atTrace().setMessage(() -> "channel.close() has finished for " + - channelAndFutureWork.getChannelKeyContext()).log(); - if (closeFuture.isSuccess()) { - channelClosedFuture.future.complete(channelFuture.channel()); - } else { - channelClosedFuture.future.completeExceptionally(closeFuture.cause()); - } + channelAndFutureWork.getChannelKeyContext() + " with value=" + v).log(); if (channelAndFutureWork.hasWorkRemaining()) { log.atWarn().setMessage(() -> "Work items are still remaining for this connection session" + @@ -170,19 +140,9 @@ public void closeConnection(IReplayContexts.IChannelKeyContext ctx, int sessionN "). " + channelAndFutureWork.calculateSizeSlowly() + " requests that were enqueued won't be run").log(); } - var schedule = channelAndFutureWork.schedule; - while (channelAndFutureWork.schedule.hasPendingTransmissions()) { - var scheduledItemToKill = schedule.peekFirstItem(); - schedule.removeFirstItem(); - } - }); - }, () -> "calling channel.close()") - .exceptionally(t->{ - log.atWarn().setMessage(()->"client connection encountered an exception while closing") - .setCause(t).log(); - channelClosedFuture.future.completeExceptionally(t); - return null; - }, () -> "handling any potential exceptions"); - return channelClosedFuture; + channelAndFutureWork.schedule.clear(); + return channelFuture.channel(); + }, () -> "clearing work"); + }, () -> ""); } } diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/RequestSenderOrchestrator.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/RequestSenderOrchestrator.java index 094e36e1d..6709b863c 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/RequestSenderOrchestrator.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/RequestSenderOrchestrator.java @@ -244,8 +244,8 @@ private Duration getDelayFromNowMs(Instant to) { return consumeFuture.thenCompose(cf -> NettyToCompletableFutureBinders.bindNettyScheduleToCompletableFuture(eventLoop, Duration.between(now(), startAt.plus(interval.multipliedBy(counter.get())))) - .getDeferredFutureThroughHandle((v,t)-> sendSendingRestOfPackets(packetReceiver, eventLoop, - iterator, startAt, interval, counter), () -> "sending next packet"), + .thenCompose(v -> sendSendingRestOfPackets(packetReceiver, eventLoop, iterator, + startAt, interval, counter), () -> "sending next packet"), () -> "recursing, once ready"); } else { return consumeFuture.getDeferredFutureThroughHandle((v,t) -> packetReceiver.finalizeRequest(), diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java index de4cbc52f..e149cd847 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java @@ -32,6 +32,7 @@ import java.nio.file.Files; import java.nio.file.Paths; import java.time.Duration; +import java.time.Instant; import java.util.List; import java.util.Optional; import java.util.concurrent.Executors; @@ -316,7 +317,8 @@ public static void main(String[] args) throws Exception { cf-> DiagnosticTrackableCompletableFutureJsonFormatter.format(cf, TrafficReplayerTopLevel::formatWorkItem), activeContextLogger); ActiveContextMonitor finalActiveContextMonitor = activeContextMonitor; scheduledExecutorService.scheduleAtFixedRate(()->{ - activeContextLogger.atInfo().setMessage(()->"Total requests outstanding: " + tr.requestWorkTracker.size()).log(); + activeContextLogger.atInfo().setMessage(()->"Total requests outstanding at " + Instant.now() + + ": " + tr.requestWorkTracker.size()).log(); finalActiveContextMonitor.run(); }, ACTIVE_WORK_MONITOR_CADENCE_MS, ACTIVE_WORK_MONITOR_CADENCE_MS, TimeUnit.MILLISECONDS); diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumer.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumer.java index 17aa3db3f..e64aac458 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumer.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumer.java @@ -3,7 +3,6 @@ import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; -import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; @@ -19,8 +18,10 @@ import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslHandler; import io.netty.handler.timeout.ReadTimeoutHandler; +import lombok.Lombok; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; +import org.opensearch.migrations.NettyToCompletableFutureBinders; import org.opensearch.migrations.replay.AggregatedRawResponse; import org.opensearch.migrations.replay.datahandlers.http.helpers.ReadMeteringHandler; import org.opensearch.migrations.replay.datahandlers.http.helpers.WriteMeteringHandler; @@ -41,7 +42,6 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; @Slf4j public class NettyPacketToHttpConsumer implements IPacketFinalizingConsumer { @@ -89,43 +89,40 @@ public NettyPacketToHttpConsumer(ConnectionReplaySession replaySession, var parentContext = ctx.createTargetRequestContext(); this.setCurrentMessageContext(parentContext.createHttpSendingContext()); responseBuilder = AggregatedRawResponse.builder(Instant.now()); - this.activeChannelFuture = new StringTrackableCompletableFuture<>( - () -> "incoming connection is ready for " + replaySession); - var initialFuture = this.activeChannelFuture; - - log.atDebug().setMessage(() -> - "C'tor: incoming session=" + replaySession).log(); - activateLiveChannel(initialFuture); + log.atDebug().setMessage(() -> "C'tor: incoming session=" + replaySession).log(); + this.activeChannelFuture = activateLiveChannel(); } - private void activateLiveChannel(DiagnosticTrackableCompletableFuture initialFuture) { - replaySession.getFutureThatReturnsChannelFuture(true).thenAccept(channelFuture-> { - channelFuture.addListener(connectFuture -> { - final var ctx = replaySession.getChannelKeyContext(); - if (connectFuture.isSuccess()) { - final var c = channelFuture.channel(); - if (c.isActive()) { - this.channel = c; - initializeChannelPipeline(); - log.atDebug().setMessage(()->"Channel initialized for " + ctx + " signaling future").log(); - initialFuture.future.complete(null); - } else { - // this may loop forever - until the event loop is shutdown - // (see the ClientConnectionPool::shutdownNow()) - ctx.addFailedChannelCreation(); - log.atWarn().setMessage(()->"Channel wasn't active, trying to create another for this request") - .log(); - activateLiveChannel(initialFuture); - } - } else { - ctx.addFailedChannelCreation(); - ctx.addTraceException(channelFuture.cause(), true); - log.atWarn().setMessage(()->"error creating channel, not retrying") - .setCause(connectFuture.cause()).log(); - initialFuture.future.completeExceptionally(connectFuture.cause()); - } - }); - }, () -> "creating an alive connection"); + private DiagnosticTrackableCompletableFuture activateLiveChannel() { + final var ctx = replaySession.getChannelKeyContext(); + return replaySession.getFutureThatReturnsChannelFutureInAnyState(true) + .thenCompose(channelFuture -> NettyToCompletableFutureBinders.bindNettyFutureToTrackableFuture(channelFuture, + "waiting for newly acquired channel to be ready") + .getDeferredFutureThroughHandle((connectFuture,t)->{ + if (t != null) { + ctx.addFailedChannelCreation(); + ctx.addTraceException(channelFuture.cause(), true); + log.atWarn().setMessage(()->"error creating channel, not retrying") + .setCause(t).log(); + throw Lombok.sneakyThrow(t); + } + + final var c = channelFuture.channel(); + if (c.isActive()) { + this.channel = c; + initializeChannelPipeline(); + log.atDebug().setMessage(()->"Channel initialized for " + ctx + " signaling future").log(); + return StringTrackableCompletableFuture.completedFuture(null, ()->"Done"); + } else { + // this may recurse forever - until the event loop is shutdown + // (see the ClientConnectionPool::shutdownNow()) + ctx.addFailedChannelCreation(); + log.atWarn().setMessage(()->"Channel wasn't active, trying to create another for this request") + .log(); + return activateLiveChannel(); + } + }, () -> "acting on ready channelFuture to retry if inactive or to return"), + () -> "taking newly acquired channel and making it active"); } private & @@ -142,11 +139,12 @@ public IReplayContexts.ITargetRequestContext getParentContext() { return currentRequestContextUnion.getLogicalEnclosingScope(); } - public static ChannelFuture createClientConnection(EventLoopGroup eventLoopGroup, - SslContext sslContext, - URI serverUri, - IReplayContexts.IChannelKeyContext channelKeyContext, - Duration timeout) { + public static DiagnosticTrackableCompletableFuture + createClientConnection(EventLoopGroup eventLoopGroup, + SslContext sslContext, + URI serverUri, + IReplayContexts.IChannelKeyContext channelKeyContext, + Duration timeout) { String host = serverUri.getHost(); int port = serverUri.getPort(); log.atTrace().setMessage(()->"Active - setting up backend connection to " + host + ":" + port).log(); @@ -165,38 +163,29 @@ protected void initChannel(@NonNull Channel ch) throws Exception { var outboundChannelFuture = b.connect(host, port); - var rval = new DefaultChannelPromise(outboundChannelFuture.channel()); - outboundChannelFuture.addListener((ChannelFutureListener) connectFuture -> { - if (connectFuture.isSuccess()) { - final var channel = connectFuture.channel(); - log.atTrace().setMessage(()-> channelKeyContext.getChannelKey() + - " Done setting up client channel & it was successful for " + channel).log(); - var pipeline = channel.pipeline(); - if (sslContext != null) { - var sslEngine = sslContext.newEngine(channel.alloc()); - sslEngine.setUseClientMode(true); - var sslHandler = new SslHandler(sslEngine); - addLoggingHandlerLast(pipeline, "A"); - pipeline.addLast(SSL_HANDLER_NAME, sslHandler); - sslHandler.handshakeFuture().addListener(handshakeFuture -> { - if (handshakeFuture.isSuccess()) { - rval.setSuccess(); + return NettyToCompletableFutureBinders.bindNettyFutureToTrackableFuture(outboundChannelFuture, "") + .thenCompose(voidVal-> { + if (outboundChannelFuture.isSuccess()) { + final var channel = outboundChannelFuture.channel(); + log.atTrace().setMessage(() -> channelKeyContext.getChannelKey() + + " Done setting up client channel & it was successful for " + channel).log(); + var pipeline = channel.pipeline(); + if (sslContext != null) { + var sslEngine = sslContext.newEngine(channel.alloc()); + sslEngine.setUseClientMode(true); + var sslHandler = new SslHandler(sslEngine); + addLoggingHandlerLast(pipeline, "A"); + pipeline.addLast(SSL_HANDLER_NAME, sslHandler); + return NettyToCompletableFutureBinders.bindNettyFutureToTrackableFuture(sslHandler.handshakeFuture(), + ()->"") + .thenApply(voidVal2->outboundChannelFuture, ()->""); } else { - rval.setFailure(handshakeFuture.cause()); + return StringTrackableCompletableFuture.completedFuture(outboundChannelFuture, ()->""); } - }); - } else { - rval.setSuccess(); - } - } else { - // Close the connection if the connection attempt has failed. - log.atWarn().setCause(connectFuture.cause()) - .setMessage(() -> channelKeyContext.getChannelKey() + " CONNECT future was not successful, " + - "so setting the channel future's result to an exception").log(); - rval.setFailure(connectFuture.cause()); - } - }); - return rval; + } else { + return StringTrackableCompletableFuture.failedFuture(outboundChannelFuture.cause(), ()->""); + } + }, () -> ""); } private static boolean channelIsInUse(Channel c) { @@ -309,37 +298,8 @@ private IReplayContexts.IReplayerHttpTransactionContext httpContext() { private DiagnosticTrackableCompletableFuture writePacketAndUpdateFuture(ByteBuf packetData) { - final var completableFuture = new DiagnosticTrackableCompletableFuture(new CompletableFuture<>(), - ()->"CompletableFuture that will wait for the netty future to fill in the completion value"); - channel.writeAndFlush(packetData) - .addListener((ChannelFutureListener) future -> { - Throwable cause = null; - try { - if (!future.isSuccess()) { - log.atWarn().setMessage(()-> httpContext().getReplayerRequestKey() + "closing outbound channel " + - "because WRITE future was not successful " + future.cause() + " hash=" + - System.identityHashCode(packetData) + " will be sending the exception to " + - completableFuture).log(); - future.channel().close(); // close the backside - cause = future.cause(); - } - } catch (Exception e) { - cause = e; - } - if (cause == null) { - log.atTrace().setMessage(()->"Previously returned CompletableFuture packet write was " + - "successful: " + packetData + " hash=" + System.identityHashCode(packetData)).log(); - completableFuture.future.complete(null); - } else { - log.atInfo().setMessage(()->"Previously returned CompletableFuture packet write had " + - " an exception :" + packetData + " hash=" + System.identityHashCode(packetData)).log(); - completableFuture.future.completeExceptionally(cause); - channel.close(); - } - }); - log.atTrace().setMessage(()->"Writing packet data=" + packetData + - ". Created future for writing data="+completableFuture).log(); - return completableFuture; + return NettyToCompletableFutureBinders.bindNettyFutureToTrackableFuture(channel.writeAndFlush(packetData), + "CompletableFuture that will wait for the netty future to fill in the completion value"); } @Override diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datatypes/ConnectionReplaySession.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datatypes/ConnectionReplaySession.java index a1d1f0df7..4fe33d8ab 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datatypes/ConnectionReplaySession.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datatypes/ConnectionReplaySession.java @@ -2,7 +2,6 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.EventLoop; -import lombok.AllArgsConstructor; import lombok.Getter; import lombok.NonNull; import lombok.SneakyThrows; @@ -13,13 +12,7 @@ import org.opensearch.migrations.replay.util.StringTrackableCompletableFuture; import java.io.IOException; -import java.util.Comparator; -import java.util.Map; -import java.util.Optional; -import java.util.PriorityQueue; -import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; -import java.util.function.UnaryOperator; /** * This class contains everything that is needed to replay packets to a specific channel. @@ -61,7 +54,7 @@ public ConnectionReplaySession(EventLoop eventLoop, IReplayContexts.IChannelKeyC } public DiagnosticTrackableCompletableFuture - getFutureThatReturnsChannelFuture(boolean requireActiveChannel) { + getFutureThatReturnsChannelFutureInAnyState(boolean requireActiveChannel) { StringTrackableCompletableFuture eventLoopFuture = new StringTrackableCompletableFuture<>("procuring a connection"); eventLoop.submit(() -> { @@ -113,6 +106,6 @@ public boolean hasWorkRemaining() { } public long calculateSizeSlowly() { - return schedule.calculateSizeSlowly() + scheduleSequencer.size(); + return (long) schedule.timeToRunnableMap.size() + scheduleSequencer.size(); } } diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datatypes/TimeToResponseFulfillmentFutureMap.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datatypes/TimeToResponseFulfillmentFutureMap.java index 255952d86..da1fb9bf3 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datatypes/TimeToResponseFulfillmentFutureMap.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datatypes/TimeToResponseFulfillmentFutureMap.java @@ -10,6 +10,7 @@ public class TimeToResponseFulfillmentFutureMap { + public static class FutureWorkPoint { public final Instant startTime; public final DiagnosticTrackableCompletableFuture scheduleFuture; @@ -42,6 +43,10 @@ public boolean isEmpty() { return timeToRunnableMap.isEmpty(); } + public void clear() { + timeToRunnableMap.clear(); + } + public boolean hasPendingTransmissions() { if (timeToRunnableMap.isEmpty()) { return false; @@ -50,13 +55,9 @@ public boolean hasPendingTransmissions() { } } - public long calculateSizeSlowly() { - return timeToRunnableMap.size(); - } - @Override public String toString() { - return "[" + this.calculateSizeSlowly() + "]: {" + formatBookends() + "}"; + return "[" + (long) timeToRunnableMap.size() + "]: {" + formatBookends() + "}"; } private String formatBookends() { diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/OnlineRadixSorter.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/OnlineRadixSorter.java index 7979e95a0..e96e8a158 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/OnlineRadixSorter.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/OnlineRadixSorter.java @@ -72,13 +72,13 @@ public OnlineRadixSorter(int startingOffset) { */ public DiagnosticTrackableCompletableFuture addFutureForWork(final int index, FutureTransformer processor) { - var oldWorkItem = items.get(index); - if (oldWorkItem == null) { + var workItem = items.get(index); + if (workItem == null) { if (index < currentOffset) { throw new IllegalArgumentException("index (" + index + ")" + " must be > last processed item (" + currentOffset + ")"); } - for (int nextKey = Math.max(currentOffset, items.isEmpty() ? 0 : items.lastKey()+1); + for (int nextKey = Math.max(currentOffset, items.isEmpty() ? Integer.MIN_VALUE : items.lastKey()+1); nextKey<=index; ++nextKey) { int finalNextKey = nextKey; @@ -86,20 +86,20 @@ public OnlineRadixSorter(int startingOffset) { new StringTrackableCompletableFuture( CompletableFuture.completedFuture(null), "unlinked signaling future for slot #" + finalNextKey) : - items.get(items.lastKey()).signalWorkCompletedFuture + items.get(finalNextKey-1).signalWorkCompletedFuture .thenAccept(v-> {}, ()->"Kickoff for slot #" + finalNextKey); - oldWorkItem = new IndexedWork(signalFuture, null, + workItem = new IndexedWork(signalFuture, null, new StringTrackableCompletableFuture(()->"Work to finish for slot #" + finalNextKey + " is awaiting [" + getAwaitingText() + "]")); - oldWorkItem.signalWorkCompletedFuture.whenComplete((v,t)->{ + workItem.signalWorkCompletedFuture.whenComplete((v,t)->{ ++currentOffset; items.remove(finalNextKey); }, ()->"cleaning up spent work for idx #" + finalNextKey); - items.put(nextKey, oldWorkItem); + items.put(nextKey, workItem); } } - return oldWorkItem.addWorkFuture(processor, index); + return workItem.addWorkFuture(processor, index); } public String getAwaitingText() { diff --git a/TrafficCapture/trafficReplayer/src/main/resources/log4j2.properties b/TrafficCapture/trafficReplayer/src/main/resources/log4j2.properties index 36a75d032..1fb20bd93 100644 --- a/TrafficCapture/trafficReplayer/src/main/resources/log4j2.properties +++ b/TrafficCapture/trafficReplayer/src/main/resources/log4j2.properties @@ -63,7 +63,7 @@ appender.ALL_ACTIVE_WORK_MONITOR_LOGFILE.name = AllActiveWorkMonitorFile appender.ALL_ACTIVE_WORK_MONITOR_LOGFILE.fileName = logs/longRunningActivity.log appender.ALL_ACTIVE_WORK_MONITOR_LOGFILE.filePattern = logs/%d{yyyy-MM}{UTC}/longRunningActivity-%d{yyyy-MM-dd-HH-mm}{UTC}-%i.log.gz appender.ALL_ACTIVE_WORK_MONITOR_LOGFILE.layout.type = PatternLayout -appender.ALL_ACTIVE_WORK_MONITOR_LOGFILE.layout.pattern = %msg ([%-5level] %d{yyyy-MM-dd HH:mm:ss,SSS}{UTC})%n +appender.ALL_ACTIVE_WORK_MONITOR_LOGFILE.layout.pattern = %msg%n appender.ALL_ACTIVE_WORK_MONITOR_LOGFILE.policies.type = Policies appender.ALL_ACTIVE_WORK_MONITOR_LOGFILE.policies.time.type = TimeBasedTriggeringPolicy appender.ALL_ACTIVE_WORK_MONITOR_LOGFILE.policies.time.interval = 60 diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumerTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumerTest.java index 8a01d9dae..5a44c3054 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumerTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumerTest.java @@ -15,6 +15,7 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; import org.junit.jupiter.params.provider.ValueSource; +import org.opensearch.migrations.NettyToCompletableFutureBinders; import org.opensearch.migrations.replay.AggregatedRawResponse; import org.opensearch.migrations.replay.ClientConnectionPool; import org.opensearch.migrations.replay.PacketToTransformingHttpHandlerFactory; @@ -149,10 +150,8 @@ public void testHttpResponseIsSuccessfullyCaptured(boolean useTls, boolean large var channelContext = httpContext.getChannelKeyContext(); var eventLoop = new NioEventLoopGroup(1, new DefaultThreadFactory("test")).next(); var replaySession = new ConnectionReplaySession(eventLoop, channelContext, - () -> ClientConnectionPool.getCompletedChannelFutureAsCompletableFuture( - httpContext.getChannelKeyContext(), - NettyPacketToHttpConsumer.createClientConnection(eventLoop, sslContext, - testServer.localhostEndpoint(), channelContext, REGULAR_RESPONSE_TIMEOUT))); + () -> NettyPacketToHttpConsumer.createClientConnection(eventLoop, sslContext, + testServer.localhostEndpoint(), channelContext, REGULAR_RESPONSE_TIMEOUT)); var nphc = new NettyPacketToHttpConsumer(replaySession, httpContext); nphc.consumeBytes((EXPECTED_REQUEST_STRING).getBytes(StandardCharsets.UTF_8)); var aggregatedResponse = nphc.finalizeRequest().get(); diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/e2etests/FullTrafficReplayerTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/e2etests/FullTrafficReplayerTest.java index d49e5ac9a..1020d9757 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/e2etests/FullTrafficReplayerTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/e2etests/FullTrafficReplayerTest.java @@ -300,9 +300,9 @@ public void makeSureThatCollateralDamageDoesntFreezeTests() throws Throwable { @ParameterizedTest @CsvSource(value = { -// "3,false", -// "-1,false", -// "3,true", + "3,false", + "-1,false", + "3,true", "-1,true", }) @Tag("longTest")