Skip to content

Commit

Permalink
Merge pull request #538 from gregschohn/FixConnectionLeaks
Browse files Browse the repository at this point in the history
Fix connection leaks
  • Loading branch information
gregschohn authored Mar 28, 2024
2 parents 0cbcdbc + 077976c commit 5138484
Show file tree
Hide file tree
Showing 7 changed files with 45 additions and 102 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,50 +97,28 @@ public ConnectionReplaySession buildConnectionReplaySession(final IReplayContext
return clientConnectionChannelCreatedFuture;
}

public Future shutdownNow() {
public CompletableFuture<Void> shutdownNow() {
CompletableFuture<Void> shutdownFuture = new CompletableFuture<>();
connectionId2ChannelCache.invalidateAll();
return eventLoopGroup.shutdownGracefully();
}

public DiagnosticTrackableCompletableFuture<String, Void> closeConnectionsAndShutdown() {
StringTrackableCompletableFuture<Void> eventLoopFuture =
new StringTrackableCompletableFuture<>(new CompletableFuture<>(), () -> "all channels closed");
this.eventLoopGroup.submit(() -> {
try {
var channelClosedFuturesArray =
connectionId2ChannelCache.asMap().values().stream()
.map(this::closeClientConnectionChannel)
.collect(Collectors.toList());
StringTrackableCompletableFuture.<Channel>allOf(channelClosedFuturesArray.stream(),
() -> "all channels closed")
.handle((v, t) -> {
if (t == null) {
eventLoopFuture.future.complete(v);
} else {
eventLoopFuture.future.completeExceptionally(t);
}
return null;
},
() -> "Waiting for all channels to close: Remaining=" +
(channelClosedFuturesArray.stream().filter(c -> !c.isDone()).count()));
} catch (Exception e) {
log.atError().setCause(e).setMessage("Caught error while closing cached connections").log();
eventLoopFuture.future.completeExceptionally(e);
eventLoopGroup.shutdownGracefully().addListener(f->{
if (f.isSuccess()) {
shutdownFuture.complete(null);
} else {
shutdownFuture.completeExceptionally(f.cause());
}
});
return eventLoopFuture.map(f -> f.whenComplete((c, t) -> shutdownNow()),
() -> "Final shutdown for " + this.getClass().getSimpleName());
return shutdownFuture;
}

public void closeConnection(IReplayContexts.IChannelKeyContext ctx) {
public void closeConnection(IReplayContexts.IChannelKeyContext ctx, int sessionNumber) {
var connId = ctx.getConnectionId();
log.atInfo().setMessage(() -> "closing connection for " + connId).log();
var channelsFuture = connectionId2ChannelCache.getIfPresent(connId);
if (channelsFuture != null) {
closeClientConnectionChannel(channelsFuture);
var connectionReplaySession = connectionId2ChannelCache.getIfPresent(getKey(connId, sessionNumber));
if (connectionReplaySession != null) {
closeClientConnectionChannel(connectionReplaySession);
connectionId2ChannelCache.invalidate(connId);
} else {
log.atTrace().setMessage(()->"No ChannelFuture for " + ctx +
log.atInfo().setMessage(()->"No ChannelFuture for " + ctx +
" in closeConnection. The connection may have already been closed").log();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,10 +165,6 @@ private static void logStartOfWork(Object stringableKey, long newCount, Instant
return hookWorkFinishingUpdates(future, timestamp, channelKey, label);
}

public DiagnosticTrackableCompletableFuture<String, Void> closeConnectionsAndShutdown() {
return networkSendOrchestrator.clientConnectionPool.closeConnectionsAndShutdown();
}

public void setFirstTimestamp(Instant firstPacketTimestamp) {
timeShifter.setFirstTimestamp(firstPacketTimestamp);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public StringTrackableCompletableFuture<Void> scheduleClose(IReplayContexts.ICha
channelFutureAndRequestSchedule, finalTunneledResponse, timestamp,
new ChannelTask(ChannelTaskType.CLOSE, () -> {
log.trace("Closing client connection " + channelInteraction);
clientConnectionPool.closeConnection(ctx);
clientConnectionPool.closeConnection(ctx, sessionNumber);
finalTunneledResponse.future.complete(null);
})));
return finalTunneledResponse;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ public class TrafficReplayer implements AutoCloseable {
private final AtomicReference<Error> shutdownReasonRef;
private final AtomicReference<CompletableFuture<Void>> shutdownFutureRef;
private final AtomicReference<CompletableFuture<List<ITrafficStreamWithKey>>> nextChunkFutureRef;
private Future nettyShutdownFuture;

public static class DualException extends Exception {
public final Throwable originalCause;
Expand Down Expand Up @@ -502,10 +501,10 @@ public void close() {
}
}

void setupRunAndWaitForReplay(Duration observedPacketConnectionTimeout,
BlockingTrafficSource trafficSource,
TimeShifter timeShifter,
Consumer<SourceTargetCaptureTuple> resultTupleConsumer)
public void setupRunAndWaitForReplayToFinish(Duration observedPacketConnectionTimeout,
BlockingTrafficSource trafficSource,
TimeShifter timeShifter,
Consumer<SourceTargetCaptureTuple> resultTupleConsumer)
throws InterruptedException, ExecutionException {

var senderOrchestrator = new RequestSenderOrchestrator(clientConnectionPool);
Expand Down Expand Up @@ -578,7 +577,7 @@ public void setupRunAndWaitForReplayWithShutdownChecks(Duration observedPacketCo
Consumer<SourceTargetCaptureTuple> resultTupleConsumer)
throws TerminationException, ExecutionException, InterruptedException {
try {
setupRunAndWaitForReplay(observedPacketConnectionTimeout, trafficSource,
setupRunAndWaitForReplayToFinish(observedPacketConnectionTimeout, trafficSource,
timeShifter, resultTupleConsumer);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
Expand All @@ -591,7 +590,6 @@ public void setupRunAndWaitForReplayWithShutdownChecks(Duration observedPacketCo
}
// if nobody has run shutdown yet, do so now so that we can tear down the netty resources
shutdown(null).get(); // if somebody already HAD run shutdown, it will return the future already created
nettyShutdownFuture.sync();
}

@AllArgsConstructor
Expand Down Expand Up @@ -794,14 +792,6 @@ private void waitForRemainingWork(Level logLevel,
} finally {
allRemainingWorkFutureOrShutdownSignalRef.set(null);
}
allWorkFuture.getDeferredFutureThroughHandle((t, v) -> {
log.info("stopping packetHandlerFactory's group");
replayEngine.closeConnectionsAndShutdown();
// squash exceptions for individual requests
return StringTrackableCompletableFuture.completedFuture(null, () -> "finished all work");
}, () -> "TrafficReplayer.PacketHandlerFactory->stopGroup")
.get(); // allWorkFuture already completed - here we're just going to wait for the
// rest of the cleanup to finish, as per the name of the function
}

private void handleAlreadySetFinishedSignal() throws InterruptedException, ExecutionException {
Expand Down Expand Up @@ -964,12 +954,13 @@ private static String formatWorkItem(DiagnosticTrackableCompletableFuture<String
}
stopReadingRef.set(true);
liveTrafficStreamLimiter.close();
nettyShutdownFuture = clientConnectionPool.shutdownNow()
.addListener(f->{
if (f.isSuccess()) {
shutdownFutureRef.get().complete(null);

var nettyShutdownFuture = clientConnectionPool.shutdownNow();
nettyShutdownFuture.whenComplete((v,t) -> {
if (t != null) {
shutdownFutureRef.get().completeExceptionally(t);
} else {
shutdownFutureRef.get().completeExceptionally(f.cause());
shutdownFutureRef.get().complete(null);
}
});
Optional.ofNullable(this.nextChunkFutureRef.get()).ifPresent(f->f.cancel(true));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ public static void setup() {
"Connection: Keep-Alive\r\n" +
"Host: localhost\r\n" +
"User-Agent: UnitTest\r\n" +
"Connection: Keep-Alive\r\n" +
"\r\n";

final static String SAMPLE_REQUEST_AS_BLOCKS = "[G],[E],[T],[ ],[/],[ ],[H],[T],[T],[P],[/],[1],[.],[1],[\r" +
Expand All @@ -41,16 +40,13 @@ public static void setup() {
"],[\n" +
"],[U],[s],[e],[r],[-],[A],[g],[e],[n],[t],[:],[ ],[U],[n],[i],[t],[T],[e],[s],[t],[\r" +
"],[\n" +
"],[C],[o],[n],[n],[e],[c],[t],[i],[o],[n],[:],[ ],[K],[e],[e],[p],[-],[A],[l],[i],[v],[e],[\r" +
"],[\n" +
"],[\r" +
"],[\n" +
"]";
final static String SAMPLE_REQUEST_AS_PARSED_HTTP = "GET / HTTP/1.1\n" +
"Connection: Keep-Alive\n" +
"Host: localhost\n" +
"User-Agent: UnitTest\n" +
"Connection: Keep-Alive\n" +
"content-length: 0\n" +
"\n";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,10 +181,9 @@ public void testThatConnectionsAreKeptAliveAndShared(boolean useTls, boolean lar
new TransformationLoader().getTransformerFactoryLoader(null), null);
var timeShifter = new TimeShifter();
timeShifter.setFirstTimestamp(Instant.now());
var sendingFactory = new ReplayEngine(
new RequestSenderOrchestrator(
new ClientConnectionPool(testServer.localhostEndpoint(), sslContext,
"targetPool for testThatConnectionsAreKeptAliveAndShared", 1)),
var clientConnectionPool = new ClientConnectionPool(testServer.localhostEndpoint(), sslContext,
"targetPool for testThatConnectionsAreKeptAliveAndShared", 1);
var sendingFactory = new ReplayEngine(new RequestSenderOrchestrator(clientConnectionPool),
new TestFlowController(), timeShifter);
for (int j = 0; j < 2; ++j) {
for (int i = 0; i < 2; ++i) {
Expand All @@ -210,7 +209,7 @@ public void testThatConnectionsAreKeptAliveAndShared(boolean useTls, boolean lar
}
}
}
var stopFuture = sendingFactory.closeConnectionsAndShutdown();
var stopFuture = clientConnectionPool.shutdownNow();
log.info("waiting for factory to shutdown: " + stopFuture);
stopFuture.get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,11 @@ protected TestContext makeInstrumentationContext() {
}

@ParameterizedTest
@ValueSource(ints = {1,2})
@ValueSource(ints = {1, 2})
@ResourceLock("TrafficReplayerRunner")
public void testStreamWithRequestsWithCloseIsCommittedOnce(int numRequests) throws Throwable {
var random = new Random(1);
try (var httpServer = SimpleNettyHttpServer.makeServer(false, Duration.ofMillis(2),
try (var httpServer = SimpleNettyHttpServer.makeServer(false, Duration.ofMinutes(10),
response->TestHttpServerContext.makeResponse(random, response))) {
var baseTime = Instant.now();
var fixedTimestamp =
Expand All @@ -63,19 +63,21 @@ public void testStreamWithRequestsWithCloseIsCommittedOnce(int numRequests) thro
tsb = tsb
.addSubStream(TrafficObservation.newBuilder().setTs(fixedTimestamp)
.setRead(ReadObservation.newBuilder()
.setData(ByteString.copyFrom(("GET /" + i + " HTTP/1.0\r\n")
.setData(ByteString.copyFrom(("GET /" + i + " HTTP/1.1\r\n" +
"Connection: Keep-Alive\r\n" +
"Host: localhost\r\n")
.getBytes(StandardCharsets.UTF_8)))
.build())
.build())
.addSubStream(TrafficObservation.newBuilder().setTs(fixedTimestamp)
.setEndOfMessageIndicator(EndOfMessageIndication.newBuilder()
.setFirstLineByteLength(14)
.setHeadersByteLength(14)
.setHeadersByteLength(58)
.build())
.build())
.addSubStream(TrafficObservation.newBuilder().setTs(fixedTimestamp)
.setWrite(WriteObservation.newBuilder()
.setData(ByteString.copyFrom("HTTP/1.0 OK 200\r\n".getBytes(StandardCharsets.UTF_8)))
.setData(ByteString.copyFrom("HTTP/1.1 OK 200\r\n".getBytes(StandardCharsets.UTF_8)))
.build())
.build());
}
Expand All @@ -92,16 +94,18 @@ public void testStreamWithRequestsWithCloseIsCommittedOnce(int numRequests) thro
true, 10, 10 * 1024,
"targetConnectionPool for testStreamWithRequestsWithCloseIsCommittedOnce");
var blockingTrafficSource = new BlockingTrafficSource(trafficSource, Duration.ofMinutes(2))) {
tr.setupRunAndWaitForReplayWithShutdownChecks(Duration.ofSeconds(70), blockingTrafficSource,
new TimeShifter(10 * 1000), (t) -> {
tr.setupRunAndWaitForReplayToFinish(Duration.ofSeconds(70), blockingTrafficSource,
new TimeShifter(10 * 1000),
t -> {
var wasNew = tuplesReceived.add(t.getRequestKey().toString());
Assertions.assertTrue(wasNew);
});
Assertions.assertEquals(numRequests, tuplesReceived.size());
Thread.sleep(1000);
checkSpansForSimpleReplayedTransactions(rootContext.inMemoryInstrumentationBundle, numRequests);
tr.shutdown(null).get();
}
Assertions.assertEquals(numRequests, tuplesReceived.size());
}
checkSpansForSimpleReplayedTransactions(rootContext.inMemoryInstrumentationBundle, numRequests);
log.info("done");
}

private static class TraceProcessor {
Expand All @@ -128,31 +132,10 @@ public String getRemainingItemsString() {
private void checkSpansForSimpleReplayedTransactions(InMemoryInstrumentationBundle inMemoryBundle,
int numRequests) {
var traceProcessor = new TraceProcessor(inMemoryBundle.getFinishedSpans());
for (int numTries=1; ; ++numTries) {
final String TCP_CONNECTION_SCOPE_NAME = "tcpConnection";
var numTraces = traceProcessor.getCountAndRemoveSpan(TCP_CONNECTION_SCOPE_NAME);
switch (numTraces) {
case 1:
break;
case -1:
traceProcessor = new TraceProcessor(inMemoryBundle.getFinishedSpans());
if (numTries > 5) {
Assertions.fail("Even after waiting/polling, no " + TCP_CONNECTION_SCOPE_NAME + " was found.");
}
try {
Thread.sleep(250);
} catch (InterruptedException e) {
throw Lombok.sneakyThrow(e);
}
continue;
default:
Assertions.fail("Found " + numTraces + " for " + TCP_CONNECTION_SCOPE_NAME);
}
break;
}

Assertions.assertEquals(1, traceProcessor.getCountAndRemoveSpan("channel"));
Assertions.assertEquals(1, traceProcessor.getCountAndRemoveSpan("trafficStreamLifetime"));
Assertions.assertEquals(1, traceProcessor.getCountAndRemoveSpan("tcpConnection"));
Assertions.assertEquals(numRequests, traceProcessor.getCountAndRemoveSpan("httpTransaction"));
Assertions.assertEquals(numRequests, traceProcessor.getCountAndRemoveSpan("accumulatingRequest"));
Assertions.assertEquals(numRequests, traceProcessor.getCountAndRemoveSpan("accumulatingResponse"));
Expand Down

0 comments on commit 5138484

Please sign in to comment.