Skip to content

Commit

Permalink
Convert remaining netty Future 'addListener' calls to use NettyToComp…
Browse files Browse the repository at this point in the history
…letableFutureBinders + other assorted tweaks.

I've also stripped the longRunningActivity file of the log levels and timestamps.  They were really just getting in the way of understanding what was going on and being able to quickly copy-paste from there into a json file for further analysis.

Signed-off-by: Greg Schohn <[email protected]>
  • Loading branch information
gregschohn committed Apr 28, 2024
1 parent b540e91 commit e35ca8c
Show file tree
Hide file tree
Showing 11 changed files with 120 additions and 202 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
import java.util.function.Supplier;

public class NettyToCompletableFutureBinders {

private NettyToCompletableFutureBinders() {}

public static CompletableFuture<Void>
bindNettyFutureToCompletableFuture(Future<?> nettyFuture, CompletableFuture<Void> cf) {
nettyFuture.addListener(f -> {
Expand Down Expand Up @@ -54,7 +57,7 @@ public class NettyToCompletableFutureBinders {
bindNettyScheduleToCompletableFuture(EventLoop eventLoop, Duration delay) {
var delayMs = Math.max(0, delay.toMillis());
return bindNettyFutureToTrackableFuture(eventLoop.schedule(() -> {}, delayMs, TimeUnit.MILLISECONDS),
"scheduling to run next send at " + delay + " in " + delayMs +" ms (clipped)");
"scheduling to run next send in " + delay + " (clipped: " + delayMs + "ms)");
}

public static CompletableFuture<Void>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,21 @@
import io.netty.channel.EventLoop;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.handler.ssl.SslContext;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.Future;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.NonNull;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.opensearch.migrations.NettyToCompletableFutureBinders;
import org.opensearch.migrations.replay.datahandlers.NettyPacketToHttpConsumer;
import org.opensearch.migrations.replay.datatypes.ConnectionReplaySession;
import org.opensearch.migrations.replay.tracing.IReplayContexts;
import org.opensearch.migrations.replay.util.DiagnosticTrackableCompletableFuture;
import org.opensearch.migrations.replay.util.StringTrackableCompletableFuture;

import java.net.URI;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

@Slf4j
public class ClientConnectionPool {
Expand Down Expand Up @@ -81,41 +77,24 @@ public ConnectionReplaySession buildConnectionReplaySession(IReplayContexts.ICha
private DiagnosticTrackableCompletableFuture<String, ChannelFuture>
getResilientClientChannelProducer(EventLoop eventLoop, IReplayContexts.IChannelKeyContext connectionContext) {
return new AdaptiveRateLimiter<String, ChannelFuture>()
.get(() -> {
var channelFuture = NettyPacketToHttpConsumer.createClientConnection(eventLoop,
sslContext, serverUri, connectionContext, timeout);
return getCompletedChannelFutureAsCompletableFuture(connectionContext, channelFuture);
});
}

public static StringTrackableCompletableFuture<ChannelFuture>
getCompletedChannelFutureAsCompletableFuture(IReplayContexts.IChannelKeyContext connectionContext,
ChannelFuture channelFuture) {
var clientConnectionChannelCreatedFuture =
new StringTrackableCompletableFuture<ChannelFuture>("waiting for createClientConnection to finish");
channelFuture.addListener(f -> {
log.atInfo().setMessage(()->
"New network connection result for " + connectionContext + "=" + f.isSuccess()).log();
if (f.isSuccess()) {
clientConnectionChannelCreatedFuture.future.complete(channelFuture);
} else {
clientConnectionChannelCreatedFuture.future.completeExceptionally(f.cause());
}
});
return clientConnectionChannelCreatedFuture;
.get(() ->
NettyPacketToHttpConsumer.createClientConnection(eventLoop,
sslContext, serverUri, connectionContext, timeout)
.whenComplete((v,t)-> {
if (t == null) {
log.atDebug().setMessage(() -> "New network connection result for " +
connectionContext + " =" + v).log();
} else {
log.atInfo().setMessage(() -> "got exception for " + connectionContext)
.setCause(t).log();
}
}, () -> "waiting for createClientConnection to finish"));
}

public CompletableFuture<Void> shutdownNow() {
CompletableFuture<Void> shutdownFuture = new CompletableFuture<>();
connectionId2ChannelCache.invalidateAll();
eventLoopGroup.shutdownGracefully().addListener(f->{
if (f.isSuccess()) {
shutdownFuture.complete(null);
} else {
shutdownFuture.completeExceptionally(f.cause());
}
});
return shutdownFuture;
return NettyToCompletableFutureBinders.bindNettyFutureToCompletableFuture(eventLoopGroup.shutdownGracefully());
}

public void closeConnection(IReplayContexts.IChannelKeyContext ctx, int sessionNumber) {
Expand Down Expand Up @@ -143,25 +122,16 @@ public void closeConnection(IReplayContexts.IChannelKeyContext ctx, int sessionN

private DiagnosticTrackableCompletableFuture<String, Channel>
closeClientConnectionChannel(ConnectionReplaySession channelAndFutureWork) {
var channelClosedFuture =
new StringTrackableCompletableFuture<Channel>("Waiting for closeFuture() on channel");

channelAndFutureWork.getFutureThatReturnsChannelFuture(false)
.thenAccept(channelFuture-> {
if (channelFuture == null) {
return;
}
return channelAndFutureWork.getFutureThatReturnsChannelFutureInAnyState(false)
.thenCompose(channelFuture-> {
log.atTrace().setMessage(() -> "closing channel " + channelFuture.channel() +
"(" + channelAndFutureWork.getChannelKeyContext() + ")...").log();
channelFuture.channel().close()
.addListener(closeFuture -> {
return NettyToCompletableFutureBinders.bindNettyFutureToTrackableFuture(
channelFuture.channel().close(),
"calling channel.close()")
.thenApply(v -> {
log.atTrace().setMessage(() -> "channel.close() has finished for " +
channelAndFutureWork.getChannelKeyContext()).log();
if (closeFuture.isSuccess()) {
channelClosedFuture.future.complete(channelFuture.channel());
} else {
channelClosedFuture.future.completeExceptionally(closeFuture.cause());
}
channelAndFutureWork.getChannelKeyContext() + " with value=" + v).log();
if (channelAndFutureWork.hasWorkRemaining()) {
log.atWarn().setMessage(() ->
"Work items are still remaining for this connection session" +
Expand All @@ -170,19 +140,9 @@ public void closeConnection(IReplayContexts.IChannelKeyContext ctx, int sessionN
"). " + channelAndFutureWork.calculateSizeSlowly() +
" requests that were enqueued won't be run").log();
}
var schedule = channelAndFutureWork.schedule;
while (channelAndFutureWork.schedule.hasPendingTransmissions()) {
var scheduledItemToKill = schedule.peekFirstItem();
schedule.removeFirstItem();
}
});
}, () -> "calling channel.close()")
.exceptionally(t->{
log.atWarn().setMessage(()->"client connection encountered an exception while closing")
.setCause(t).log();
channelClosedFuture.future.completeExceptionally(t);
return null;
}, () -> "handling any potential exceptions");
return channelClosedFuture;
channelAndFutureWork.schedule.clear();
return channelFuture.channel();
}, () -> "clearing work");
}, () -> "");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -244,8 +244,8 @@ private Duration getDelayFromNowMs(Instant to) {
return consumeFuture.thenCompose(cf ->
NettyToCompletableFutureBinders.bindNettyScheduleToCompletableFuture(eventLoop,
Duration.between(now(), startAt.plus(interval.multipliedBy(counter.get()))))
.getDeferredFutureThroughHandle((v,t)-> sendSendingRestOfPackets(packetReceiver, eventLoop,
iterator, startAt, interval, counter), () -> "sending next packet"),
.thenCompose(v -> sendSendingRestOfPackets(packetReceiver, eventLoop, iterator,
startAt, interval, counter), () -> "sending next packet"),
() -> "recursing, once ready");
} else {
return consumeFuture.getDeferredFutureThroughHandle((v,t) -> packetReceiver.finalizeRequest(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -316,7 +317,8 @@ public static void main(String[] args) throws Exception {
cf-> DiagnosticTrackableCompletableFutureJsonFormatter.format(cf, TrafficReplayerTopLevel::formatWorkItem), activeContextLogger);
ActiveContextMonitor finalActiveContextMonitor = activeContextMonitor;
scheduledExecutorService.scheduleAtFixedRate(()->{
activeContextLogger.atInfo().setMessage(()->"Total requests outstanding: " + tr.requestWorkTracker.size()).log();
activeContextLogger.atInfo().setMessage(()->"Total requests outstanding at " + Instant.now() +
": " + tr.requestWorkTracker.size()).log();
finalActiveContextMonitor.run();
},
ACTIVE_WORK_MONITOR_CADENCE_MS, ACTIVE_WORK_MONITOR_CADENCE_MS, TimeUnit.MILLISECONDS);
Expand Down
Loading

0 comments on commit e35ca8c

Please sign in to comment.