From 2e4bd39fd54d77ee800dea8f6eed5e8335bc169a Mon Sep 17 00:00:00 2001 From: Greg Schohn Date: Fri, 16 Aug 2024 11:18:24 -0400 Subject: [PATCH] Fix a bug that caused lost signals for scheduled items when they were scheduled on eventLoops that were already stopping. Signed-off-by: Greg Schohn --- .../migrations/NettyFutureBinders.java | 21 ++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/NettyFutureBinders.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/NettyFutureBinders.java index f20e5fbdd..b82939617 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/NettyFutureBinders.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/NettyFutureBinders.java @@ -1,6 +1,7 @@ package org.opensearch.migrations; import java.time.Duration; +import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.function.Function; @@ -61,8 +62,14 @@ public static TrackedFuture bindNettyScheduleToCompletableFuture( Duration delay ) { var delayMs = Math.max(0, delay.toMillis()); - return bindNettyFutureToTrackableFuture( - eventLoop.schedule(() -> {}, delayMs, TimeUnit.MILLISECONDS), + var scheduledFuture = eventLoop.schedule(() -> {}, delayMs, TimeUnit.MILLISECONDS); + if (eventLoop.isShuttingDown()) { + // This is safe to do even though the event was scheduled because the scheduled future hasn't + // been wired to trigger anything else yet. This gives us an opportunity for an easy 2-phase-commit. + return TextTrackedFuture.failedFuture(new CancellationException("event loop is already shutting down"), + () -> "Signalling that work cannot be scheduled because the even loop is already being shut down"); + } + return bindNettyFutureToTrackableFuture(scheduledFuture, "scheduling to run next send in " + delay + " (clipped: " + delayMs + "ms)" ); } @@ -73,6 +80,14 @@ public static CompletableFuture bindNettyScheduleToCompletableFuture( CompletableFuture cf ) { var delayMs = Math.max(0, delay.toMillis()); - return bindNettyFutureToCompletableFuture(eventLoop.schedule(() -> {}, delayMs, TimeUnit.MILLISECONDS), cf); + eventLoop.schedule(() -> {}, delayMs, TimeUnit.MILLISECONDS); + var scheduledFuture = eventLoop.schedule(() -> {}, delayMs, TimeUnit.MILLISECONDS); + if (eventLoop.isShuttingDown()) { + // This is safe to do even though the event was scheduled because the scheduled future hasn't + // been wired to trigger anything else yet. This gives us an opportunity for an easy 2-phase-commit. + cf.completeExceptionally(new CancellationException("event loop is already shutting down")); + return cf; + } + return bindNettyFutureToCompletableFuture(scheduledFuture, cf); } }