diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/NettyToCompletableFutureBinders.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/NettyToCompletableFutureBinders.java index 155de099a..7a945f0f9 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/NettyToCompletableFutureBinders.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/NettyToCompletableFutureBinders.java @@ -12,6 +12,9 @@ import java.util.function.Supplier; public class NettyToCompletableFutureBinders { + + private NettyToCompletableFutureBinders() {} + public static CompletableFuture bindNettyFutureToCompletableFuture(Future nettyFuture, CompletableFuture cf) { nettyFuture.addListener(f -> { @@ -54,7 +57,7 @@ public class NettyToCompletableFutureBinders { bindNettyScheduleToCompletableFuture(EventLoop eventLoop, Duration delay) { var delayMs = Math.max(0, delay.toMillis()); return bindNettyFutureToTrackableFuture(eventLoop.schedule(() -> {}, delayMs, TimeUnit.MILLISECONDS), - "scheduling to run next send at " + delay + " in " + delayMs +" ms (clipped)"); + "scheduling to run next send in " + delay + " (clipped: " + delayMs + "ms)"); } public static CompletableFuture diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ClientConnectionPool.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ClientConnectionPool.java index d3f30a550..783712600 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ClientConnectionPool.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ClientConnectionPool.java @@ -8,25 +8,21 @@ import io.netty.channel.EventLoop; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.handler.ssl.SslContext; -import io.netty.util.concurrent.DefaultPromise; import io.netty.util.concurrent.DefaultThreadFactory; -import io.netty.util.concurrent.Future; import lombok.AllArgsConstructor; -import lombok.Builder; import lombok.EqualsAndHashCode; import lombok.NonNull; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; +import org.opensearch.migrations.NettyToCompletableFutureBinders; import org.opensearch.migrations.replay.datahandlers.NettyPacketToHttpConsumer; import org.opensearch.migrations.replay.datatypes.ConnectionReplaySession; import org.opensearch.migrations.replay.tracing.IReplayContexts; import org.opensearch.migrations.replay.util.DiagnosticTrackableCompletableFuture; -import org.opensearch.migrations.replay.util.StringTrackableCompletableFuture; import java.net.URI; import java.time.Duration; import java.util.concurrent.CompletableFuture; -import java.util.stream.Collectors; @Slf4j public class ClientConnectionPool { @@ -81,41 +77,24 @@ public ConnectionReplaySession buildConnectionReplaySession(IReplayContexts.ICha private DiagnosticTrackableCompletableFuture getResilientClientChannelProducer(EventLoop eventLoop, IReplayContexts.IChannelKeyContext connectionContext) { return new AdaptiveRateLimiter() - .get(() -> { - var channelFuture = NettyPacketToHttpConsumer.createClientConnection(eventLoop, - sslContext, serverUri, connectionContext, timeout); - return getCompletedChannelFutureAsCompletableFuture(connectionContext, channelFuture); - }); - } - - public static StringTrackableCompletableFuture - getCompletedChannelFutureAsCompletableFuture(IReplayContexts.IChannelKeyContext connectionContext, - ChannelFuture channelFuture) { - var clientConnectionChannelCreatedFuture = - new StringTrackableCompletableFuture("waiting for createClientConnection to finish"); - channelFuture.addListener(f -> { - log.atInfo().setMessage(()-> - "New network connection result for " + connectionContext + "=" + f.isSuccess()).log(); - if (f.isSuccess()) { - clientConnectionChannelCreatedFuture.future.complete(channelFuture); - } else { - clientConnectionChannelCreatedFuture.future.completeExceptionally(f.cause()); - } - }); - return clientConnectionChannelCreatedFuture; + .get(() -> + NettyPacketToHttpConsumer.createClientConnection(eventLoop, + sslContext, serverUri, connectionContext, timeout) + .whenComplete((v,t)-> { + if (t == null) { + log.atDebug().setMessage(() -> "New network connection result for " + + connectionContext + " =" + v).log(); + } else { + log.atInfo().setMessage(() -> "got exception for " + connectionContext) + .setCause(t).log(); + } + }, () -> "waiting for createClientConnection to finish")); } public CompletableFuture shutdownNow() { CompletableFuture shutdownFuture = new CompletableFuture<>(); connectionId2ChannelCache.invalidateAll(); - eventLoopGroup.shutdownGracefully().addListener(f->{ - if (f.isSuccess()) { - shutdownFuture.complete(null); - } else { - shutdownFuture.completeExceptionally(f.cause()); - } - }); - return shutdownFuture; + return NettyToCompletableFutureBinders.bindNettyFutureToCompletableFuture(eventLoopGroup.shutdownGracefully()); } public void closeConnection(IReplayContexts.IChannelKeyContext ctx, int sessionNumber) { @@ -143,25 +122,16 @@ public void closeConnection(IReplayContexts.IChannelKeyContext ctx, int sessionN private DiagnosticTrackableCompletableFuture closeClientConnectionChannel(ConnectionReplaySession channelAndFutureWork) { - var channelClosedFuture = - new StringTrackableCompletableFuture("Waiting for closeFuture() on channel"); - - channelAndFutureWork.getFutureThatReturnsChannelFuture(false) - .thenAccept(channelFuture-> { - if (channelFuture == null) { - return; - } + return channelAndFutureWork.getFutureThatReturnsChannelFutureInAnyState(false) + .thenCompose(channelFuture-> { log.atTrace().setMessage(() -> "closing channel " + channelFuture.channel() + "(" + channelAndFutureWork.getChannelKeyContext() + ")...").log(); - channelFuture.channel().close() - .addListener(closeFuture -> { + return NettyToCompletableFutureBinders.bindNettyFutureToTrackableFuture( + channelFuture.channel().close(), + "calling channel.close()") + .thenApply(v -> { log.atTrace().setMessage(() -> "channel.close() has finished for " + - channelAndFutureWork.getChannelKeyContext()).log(); - if (closeFuture.isSuccess()) { - channelClosedFuture.future.complete(channelFuture.channel()); - } else { - channelClosedFuture.future.completeExceptionally(closeFuture.cause()); - } + channelAndFutureWork.getChannelKeyContext() + " with value=" + v).log(); if (channelAndFutureWork.hasWorkRemaining()) { log.atWarn().setMessage(() -> "Work items are still remaining for this connection session" + @@ -170,19 +140,9 @@ public void closeConnection(IReplayContexts.IChannelKeyContext ctx, int sessionN "). " + channelAndFutureWork.calculateSizeSlowly() + " requests that were enqueued won't be run").log(); } - var schedule = channelAndFutureWork.schedule; - while (channelAndFutureWork.schedule.hasPendingTransmissions()) { - var scheduledItemToKill = schedule.peekFirstItem(); - schedule.removeFirstItem(); - } - }); - }, () -> "calling channel.close()") - .exceptionally(t->{ - log.atWarn().setMessage(()->"client connection encountered an exception while closing") - .setCause(t).log(); - channelClosedFuture.future.completeExceptionally(t); - return null; - }, () -> "handling any potential exceptions"); - return channelClosedFuture; + channelAndFutureWork.schedule.clear(); + return channelFuture.channel(); + }, () -> "clearing work"); + }, () -> ""); } } diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/RequestSenderOrchestrator.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/RequestSenderOrchestrator.java index 094e36e1d..6709b863c 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/RequestSenderOrchestrator.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/RequestSenderOrchestrator.java @@ -244,8 +244,8 @@ private Duration getDelayFromNowMs(Instant to) { return consumeFuture.thenCompose(cf -> NettyToCompletableFutureBinders.bindNettyScheduleToCompletableFuture(eventLoop, Duration.between(now(), startAt.plus(interval.multipliedBy(counter.get())))) - .getDeferredFutureThroughHandle((v,t)-> sendSendingRestOfPackets(packetReceiver, eventLoop, - iterator, startAt, interval, counter), () -> "sending next packet"), + .thenCompose(v -> sendSendingRestOfPackets(packetReceiver, eventLoop, iterator, + startAt, interval, counter), () -> "sending next packet"), () -> "recursing, once ready"); } else { return consumeFuture.getDeferredFutureThroughHandle((v,t) -> packetReceiver.finalizeRequest(), diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java index de4cbc52f..e149cd847 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java @@ -32,6 +32,7 @@ import java.nio.file.Files; import java.nio.file.Paths; import java.time.Duration; +import java.time.Instant; import java.util.List; import java.util.Optional; import java.util.concurrent.Executors; @@ -316,7 +317,8 @@ public static void main(String[] args) throws Exception { cf-> DiagnosticTrackableCompletableFutureJsonFormatter.format(cf, TrafficReplayerTopLevel::formatWorkItem), activeContextLogger); ActiveContextMonitor finalActiveContextMonitor = activeContextMonitor; scheduledExecutorService.scheduleAtFixedRate(()->{ - activeContextLogger.atInfo().setMessage(()->"Total requests outstanding: " + tr.requestWorkTracker.size()).log(); + activeContextLogger.atInfo().setMessage(()->"Total requests outstanding at " + Instant.now() + + ": " + tr.requestWorkTracker.size()).log(); finalActiveContextMonitor.run(); }, ACTIVE_WORK_MONITOR_CADENCE_MS, ACTIVE_WORK_MONITOR_CADENCE_MS, TimeUnit.MILLISECONDS); diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumer.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumer.java index 17aa3db3f..e64aac458 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumer.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumer.java @@ -3,7 +3,6 @@ import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; -import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; @@ -19,8 +18,10 @@ import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslHandler; import io.netty.handler.timeout.ReadTimeoutHandler; +import lombok.Lombok; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; +import org.opensearch.migrations.NettyToCompletableFutureBinders; import org.opensearch.migrations.replay.AggregatedRawResponse; import org.opensearch.migrations.replay.datahandlers.http.helpers.ReadMeteringHandler; import org.opensearch.migrations.replay.datahandlers.http.helpers.WriteMeteringHandler; @@ -41,7 +42,6 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; @Slf4j public class NettyPacketToHttpConsumer implements IPacketFinalizingConsumer { @@ -89,43 +89,40 @@ public NettyPacketToHttpConsumer(ConnectionReplaySession replaySession, var parentContext = ctx.createTargetRequestContext(); this.setCurrentMessageContext(parentContext.createHttpSendingContext()); responseBuilder = AggregatedRawResponse.builder(Instant.now()); - this.activeChannelFuture = new StringTrackableCompletableFuture<>( - () -> "incoming connection is ready for " + replaySession); - var initialFuture = this.activeChannelFuture; - - log.atDebug().setMessage(() -> - "C'tor: incoming session=" + replaySession).log(); - activateLiveChannel(initialFuture); + log.atDebug().setMessage(() -> "C'tor: incoming session=" + replaySession).log(); + this.activeChannelFuture = activateLiveChannel(); } - private void activateLiveChannel(DiagnosticTrackableCompletableFuture initialFuture) { - replaySession.getFutureThatReturnsChannelFuture(true).thenAccept(channelFuture-> { - channelFuture.addListener(connectFuture -> { - final var ctx = replaySession.getChannelKeyContext(); - if (connectFuture.isSuccess()) { - final var c = channelFuture.channel(); - if (c.isActive()) { - this.channel = c; - initializeChannelPipeline(); - log.atDebug().setMessage(()->"Channel initialized for " + ctx + " signaling future").log(); - initialFuture.future.complete(null); - } else { - // this may loop forever - until the event loop is shutdown - // (see the ClientConnectionPool::shutdownNow()) - ctx.addFailedChannelCreation(); - log.atWarn().setMessage(()->"Channel wasn't active, trying to create another for this request") - .log(); - activateLiveChannel(initialFuture); - } - } else { - ctx.addFailedChannelCreation(); - ctx.addTraceException(channelFuture.cause(), true); - log.atWarn().setMessage(()->"error creating channel, not retrying") - .setCause(connectFuture.cause()).log(); - initialFuture.future.completeExceptionally(connectFuture.cause()); - } - }); - }, () -> "creating an alive connection"); + private DiagnosticTrackableCompletableFuture activateLiveChannel() { + final var ctx = replaySession.getChannelKeyContext(); + return replaySession.getFutureThatReturnsChannelFutureInAnyState(true) + .thenCompose(channelFuture -> NettyToCompletableFutureBinders.bindNettyFutureToTrackableFuture(channelFuture, + "waiting for newly acquired channel to be ready") + .getDeferredFutureThroughHandle((connectFuture,t)->{ + if (t != null) { + ctx.addFailedChannelCreation(); + ctx.addTraceException(channelFuture.cause(), true); + log.atWarn().setMessage(()->"error creating channel, not retrying") + .setCause(t).log(); + throw Lombok.sneakyThrow(t); + } + + final var c = channelFuture.channel(); + if (c.isActive()) { + this.channel = c; + initializeChannelPipeline(); + log.atDebug().setMessage(()->"Channel initialized for " + ctx + " signaling future").log(); + return StringTrackableCompletableFuture.completedFuture(null, ()->"Done"); + } else { + // this may recurse forever - until the event loop is shutdown + // (see the ClientConnectionPool::shutdownNow()) + ctx.addFailedChannelCreation(); + log.atWarn().setMessage(()->"Channel wasn't active, trying to create another for this request") + .log(); + return activateLiveChannel(); + } + }, () -> "acting on ready channelFuture to retry if inactive or to return"), + () -> "taking newly acquired channel and making it active"); } private & @@ -142,11 +139,12 @@ public IReplayContexts.ITargetRequestContext getParentContext() { return currentRequestContextUnion.getLogicalEnclosingScope(); } - public static ChannelFuture createClientConnection(EventLoopGroup eventLoopGroup, - SslContext sslContext, - URI serverUri, - IReplayContexts.IChannelKeyContext channelKeyContext, - Duration timeout) { + public static DiagnosticTrackableCompletableFuture + createClientConnection(EventLoopGroup eventLoopGroup, + SslContext sslContext, + URI serverUri, + IReplayContexts.IChannelKeyContext channelKeyContext, + Duration timeout) { String host = serverUri.getHost(); int port = serverUri.getPort(); log.atTrace().setMessage(()->"Active - setting up backend connection to " + host + ":" + port).log(); @@ -165,38 +163,29 @@ protected void initChannel(@NonNull Channel ch) throws Exception { var outboundChannelFuture = b.connect(host, port); - var rval = new DefaultChannelPromise(outboundChannelFuture.channel()); - outboundChannelFuture.addListener((ChannelFutureListener) connectFuture -> { - if (connectFuture.isSuccess()) { - final var channel = connectFuture.channel(); - log.atTrace().setMessage(()-> channelKeyContext.getChannelKey() + - " Done setting up client channel & it was successful for " + channel).log(); - var pipeline = channel.pipeline(); - if (sslContext != null) { - var sslEngine = sslContext.newEngine(channel.alloc()); - sslEngine.setUseClientMode(true); - var sslHandler = new SslHandler(sslEngine); - addLoggingHandlerLast(pipeline, "A"); - pipeline.addLast(SSL_HANDLER_NAME, sslHandler); - sslHandler.handshakeFuture().addListener(handshakeFuture -> { - if (handshakeFuture.isSuccess()) { - rval.setSuccess(); + return NettyToCompletableFutureBinders.bindNettyFutureToTrackableFuture(outboundChannelFuture, "") + .thenCompose(voidVal-> { + if (outboundChannelFuture.isSuccess()) { + final var channel = outboundChannelFuture.channel(); + log.atTrace().setMessage(() -> channelKeyContext.getChannelKey() + + " Done setting up client channel & it was successful for " + channel).log(); + var pipeline = channel.pipeline(); + if (sslContext != null) { + var sslEngine = sslContext.newEngine(channel.alloc()); + sslEngine.setUseClientMode(true); + var sslHandler = new SslHandler(sslEngine); + addLoggingHandlerLast(pipeline, "A"); + pipeline.addLast(SSL_HANDLER_NAME, sslHandler); + return NettyToCompletableFutureBinders.bindNettyFutureToTrackableFuture(sslHandler.handshakeFuture(), + ()->"") + .thenApply(voidVal2->outboundChannelFuture, ()->""); } else { - rval.setFailure(handshakeFuture.cause()); + return StringTrackableCompletableFuture.completedFuture(outboundChannelFuture, ()->""); } - }); - } else { - rval.setSuccess(); - } - } else { - // Close the connection if the connection attempt has failed. - log.atWarn().setCause(connectFuture.cause()) - .setMessage(() -> channelKeyContext.getChannelKey() + " CONNECT future was not successful, " + - "so setting the channel future's result to an exception").log(); - rval.setFailure(connectFuture.cause()); - } - }); - return rval; + } else { + return StringTrackableCompletableFuture.failedFuture(outboundChannelFuture.cause(), ()->""); + } + }, () -> ""); } private static boolean channelIsInUse(Channel c) { @@ -309,37 +298,8 @@ private IReplayContexts.IReplayerHttpTransactionContext httpContext() { private DiagnosticTrackableCompletableFuture writePacketAndUpdateFuture(ByteBuf packetData) { - final var completableFuture = new DiagnosticTrackableCompletableFuture(new CompletableFuture<>(), - ()->"CompletableFuture that will wait for the netty future to fill in the completion value"); - channel.writeAndFlush(packetData) - .addListener((ChannelFutureListener) future -> { - Throwable cause = null; - try { - if (!future.isSuccess()) { - log.atWarn().setMessage(()-> httpContext().getReplayerRequestKey() + "closing outbound channel " + - "because WRITE future was not successful " + future.cause() + " hash=" + - System.identityHashCode(packetData) + " will be sending the exception to " + - completableFuture).log(); - future.channel().close(); // close the backside - cause = future.cause(); - } - } catch (Exception e) { - cause = e; - } - if (cause == null) { - log.atTrace().setMessage(()->"Previously returned CompletableFuture packet write was " + - "successful: " + packetData + " hash=" + System.identityHashCode(packetData)).log(); - completableFuture.future.complete(null); - } else { - log.atInfo().setMessage(()->"Previously returned CompletableFuture packet write had " + - " an exception :" + packetData + " hash=" + System.identityHashCode(packetData)).log(); - completableFuture.future.completeExceptionally(cause); - channel.close(); - } - }); - log.atTrace().setMessage(()->"Writing packet data=" + packetData + - ". Created future for writing data="+completableFuture).log(); - return completableFuture; + return NettyToCompletableFutureBinders.bindNettyFutureToTrackableFuture(channel.writeAndFlush(packetData), + "CompletableFuture that will wait for the netty future to fill in the completion value"); } @Override diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datatypes/ConnectionReplaySession.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datatypes/ConnectionReplaySession.java index a1d1f0df7..4fe33d8ab 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datatypes/ConnectionReplaySession.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datatypes/ConnectionReplaySession.java @@ -2,7 +2,6 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.EventLoop; -import lombok.AllArgsConstructor; import lombok.Getter; import lombok.NonNull; import lombok.SneakyThrows; @@ -13,13 +12,7 @@ import org.opensearch.migrations.replay.util.StringTrackableCompletableFuture; import java.io.IOException; -import java.util.Comparator; -import java.util.Map; -import java.util.Optional; -import java.util.PriorityQueue; -import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; -import java.util.function.UnaryOperator; /** * This class contains everything that is needed to replay packets to a specific channel. @@ -61,7 +54,7 @@ public ConnectionReplaySession(EventLoop eventLoop, IReplayContexts.IChannelKeyC } public DiagnosticTrackableCompletableFuture - getFutureThatReturnsChannelFuture(boolean requireActiveChannel) { + getFutureThatReturnsChannelFutureInAnyState(boolean requireActiveChannel) { StringTrackableCompletableFuture eventLoopFuture = new StringTrackableCompletableFuture<>("procuring a connection"); eventLoop.submit(() -> { @@ -113,6 +106,6 @@ public boolean hasWorkRemaining() { } public long calculateSizeSlowly() { - return schedule.calculateSizeSlowly() + scheduleSequencer.size(); + return (long) schedule.timeToRunnableMap.size() + scheduleSequencer.size(); } } diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datatypes/TimeToResponseFulfillmentFutureMap.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datatypes/TimeToResponseFulfillmentFutureMap.java index 255952d86..da1fb9bf3 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datatypes/TimeToResponseFulfillmentFutureMap.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datatypes/TimeToResponseFulfillmentFutureMap.java @@ -10,6 +10,7 @@ public class TimeToResponseFulfillmentFutureMap { + public static class FutureWorkPoint { public final Instant startTime; public final DiagnosticTrackableCompletableFuture scheduleFuture; @@ -42,6 +43,10 @@ public boolean isEmpty() { return timeToRunnableMap.isEmpty(); } + public void clear() { + timeToRunnableMap.clear(); + } + public boolean hasPendingTransmissions() { if (timeToRunnableMap.isEmpty()) { return false; @@ -50,13 +55,9 @@ public boolean hasPendingTransmissions() { } } - public long calculateSizeSlowly() { - return timeToRunnableMap.size(); - } - @Override public String toString() { - return "[" + this.calculateSizeSlowly() + "]: {" + formatBookends() + "}"; + return "[" + (long) timeToRunnableMap.size() + "]: {" + formatBookends() + "}"; } private String formatBookends() { diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/OnlineRadixSorter.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/OnlineRadixSorter.java index 7979e95a0..e96e8a158 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/OnlineRadixSorter.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/OnlineRadixSorter.java @@ -72,13 +72,13 @@ public OnlineRadixSorter(int startingOffset) { */ public DiagnosticTrackableCompletableFuture addFutureForWork(final int index, FutureTransformer processor) { - var oldWorkItem = items.get(index); - if (oldWorkItem == null) { + var workItem = items.get(index); + if (workItem == null) { if (index < currentOffset) { throw new IllegalArgumentException("index (" + index + ")" + " must be > last processed item (" + currentOffset + ")"); } - for (int nextKey = Math.max(currentOffset, items.isEmpty() ? 0 : items.lastKey()+1); + for (int nextKey = Math.max(currentOffset, items.isEmpty() ? Integer.MIN_VALUE : items.lastKey()+1); nextKey<=index; ++nextKey) { int finalNextKey = nextKey; @@ -86,20 +86,20 @@ public OnlineRadixSorter(int startingOffset) { new StringTrackableCompletableFuture( CompletableFuture.completedFuture(null), "unlinked signaling future for slot #" + finalNextKey) : - items.get(items.lastKey()).signalWorkCompletedFuture + items.get(finalNextKey-1).signalWorkCompletedFuture .thenAccept(v-> {}, ()->"Kickoff for slot #" + finalNextKey); - oldWorkItem = new IndexedWork(signalFuture, null, + workItem = new IndexedWork(signalFuture, null, new StringTrackableCompletableFuture(()->"Work to finish for slot #" + finalNextKey + " is awaiting [" + getAwaitingText() + "]")); - oldWorkItem.signalWorkCompletedFuture.whenComplete((v,t)->{ + workItem.signalWorkCompletedFuture.whenComplete((v,t)->{ ++currentOffset; items.remove(finalNextKey); }, ()->"cleaning up spent work for idx #" + finalNextKey); - items.put(nextKey, oldWorkItem); + items.put(nextKey, workItem); } } - return oldWorkItem.addWorkFuture(processor, index); + return workItem.addWorkFuture(processor, index); } public String getAwaitingText() { diff --git a/TrafficCapture/trafficReplayer/src/main/resources/log4j2.properties b/TrafficCapture/trafficReplayer/src/main/resources/log4j2.properties index 36a75d032..1fb20bd93 100644 --- a/TrafficCapture/trafficReplayer/src/main/resources/log4j2.properties +++ b/TrafficCapture/trafficReplayer/src/main/resources/log4j2.properties @@ -63,7 +63,7 @@ appender.ALL_ACTIVE_WORK_MONITOR_LOGFILE.name = AllActiveWorkMonitorFile appender.ALL_ACTIVE_WORK_MONITOR_LOGFILE.fileName = logs/longRunningActivity.log appender.ALL_ACTIVE_WORK_MONITOR_LOGFILE.filePattern = logs/%d{yyyy-MM}{UTC}/longRunningActivity-%d{yyyy-MM-dd-HH-mm}{UTC}-%i.log.gz appender.ALL_ACTIVE_WORK_MONITOR_LOGFILE.layout.type = PatternLayout -appender.ALL_ACTIVE_WORK_MONITOR_LOGFILE.layout.pattern = %msg ([%-5level] %d{yyyy-MM-dd HH:mm:ss,SSS}{UTC})%n +appender.ALL_ACTIVE_WORK_MONITOR_LOGFILE.layout.pattern = %msg%n appender.ALL_ACTIVE_WORK_MONITOR_LOGFILE.policies.type = Policies appender.ALL_ACTIVE_WORK_MONITOR_LOGFILE.policies.time.type = TimeBasedTriggeringPolicy appender.ALL_ACTIVE_WORK_MONITOR_LOGFILE.policies.time.interval = 60 diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumerTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumerTest.java index 8a01d9dae..5a44c3054 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumerTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumerTest.java @@ -15,6 +15,7 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; import org.junit.jupiter.params.provider.ValueSource; +import org.opensearch.migrations.NettyToCompletableFutureBinders; import org.opensearch.migrations.replay.AggregatedRawResponse; import org.opensearch.migrations.replay.ClientConnectionPool; import org.opensearch.migrations.replay.PacketToTransformingHttpHandlerFactory; @@ -149,10 +150,8 @@ public void testHttpResponseIsSuccessfullyCaptured(boolean useTls, boolean large var channelContext = httpContext.getChannelKeyContext(); var eventLoop = new NioEventLoopGroup(1, new DefaultThreadFactory("test")).next(); var replaySession = new ConnectionReplaySession(eventLoop, channelContext, - () -> ClientConnectionPool.getCompletedChannelFutureAsCompletableFuture( - httpContext.getChannelKeyContext(), - NettyPacketToHttpConsumer.createClientConnection(eventLoop, sslContext, - testServer.localhostEndpoint(), channelContext, REGULAR_RESPONSE_TIMEOUT))); + () -> NettyPacketToHttpConsumer.createClientConnection(eventLoop, sslContext, + testServer.localhostEndpoint(), channelContext, REGULAR_RESPONSE_TIMEOUT)); var nphc = new NettyPacketToHttpConsumer(replaySession, httpContext); nphc.consumeBytes((EXPECTED_REQUEST_STRING).getBytes(StandardCharsets.UTF_8)); var aggregatedResponse = nphc.finalizeRequest().get(); diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/e2etests/FullTrafficReplayerTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/e2etests/FullTrafficReplayerTest.java index d49e5ac9a..1020d9757 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/e2etests/FullTrafficReplayerTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/e2etests/FullTrafficReplayerTest.java @@ -300,9 +300,9 @@ public void makeSureThatCollateralDamageDoesntFreezeTests() throws Throwable { @ParameterizedTest @CsvSource(value = { -// "3,false", -// "-1,false", -// "3,true", + "3,false", + "-1,false", + "3,true", "-1,true", }) @Tag("longTest")