Skip to content

Commit

Permalink
Fix a bug that caused lost signals for scheduled items when they were…
Browse files Browse the repository at this point in the history
… scheduled on eventLoops that were already stopping.

Signed-off-by: Greg Schohn <[email protected]>
  • Loading branch information
gregschohn committed Aug 16, 2024
1 parent 900630e commit 2e4bd39
Showing 1 changed file with 18 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.opensearch.migrations;

import java.time.Duration;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
Expand Down Expand Up @@ -61,8 +62,14 @@ public static TrackedFuture<String, Void> bindNettyScheduleToCompletableFuture(
Duration delay
) {
var delayMs = Math.max(0, delay.toMillis());
return bindNettyFutureToTrackableFuture(
eventLoop.schedule(() -> {}, delayMs, TimeUnit.MILLISECONDS),
var scheduledFuture = eventLoop.schedule(() -> {}, delayMs, TimeUnit.MILLISECONDS);
if (eventLoop.isShuttingDown()) {
// This is safe to do even though the event was scheduled because the scheduled future hasn't
// been wired to trigger anything else yet. This gives us an opportunity for an easy 2-phase-commit.
return TextTrackedFuture.failedFuture(new CancellationException("event loop is already shutting down"),
() -> "Signalling that work cannot be scheduled because the even loop is already being shut down");
}
return bindNettyFutureToTrackableFuture(scheduledFuture,
"scheduling to run next send in " + delay + " (clipped: " + delayMs + "ms)"
);
}
Expand All @@ -73,6 +80,14 @@ public static CompletableFuture<Void> bindNettyScheduleToCompletableFuture(
CompletableFuture<Void> cf
) {
var delayMs = Math.max(0, delay.toMillis());
return bindNettyFutureToCompletableFuture(eventLoop.schedule(() -> {}, delayMs, TimeUnit.MILLISECONDS), cf);
eventLoop.schedule(() -> {}, delayMs, TimeUnit.MILLISECONDS);
var scheduledFuture = eventLoop.schedule(() -> {}, delayMs, TimeUnit.MILLISECONDS);
if (eventLoop.isShuttingDown()) {
// This is safe to do even though the event was scheduled because the scheduled future hasn't
// been wired to trigger anything else yet. This gives us an opportunity for an easy 2-phase-commit.
cf.completeExceptionally(new CancellationException("event loop is already shutting down"));
return cf;
}
return bindNettyFutureToCompletableFuture(scheduledFuture, cf);
}
}

0 comments on commit 2e4bd39

Please sign in to comment.