From b00d1297b0e47ab475ad8050e12fd0fdcedf6e6f Mon Sep 17 00:00:00 2001 From: "a.fink" Date: Sat, 26 Nov 2022 16:34:29 +0300 Subject: [PATCH 1/6] DelegatingScheduler: Bill Pugh Singleton Implementation instead of volatile+synchronized --- .../internal/util/DelegatingScheduler.java | 66 +++++++++++-------- 1 file changed, 37 insertions(+), 29 deletions(-) diff --git a/core/src/main/java/dev/failsafe/internal/util/DelegatingScheduler.java b/core/src/main/java/dev/failsafe/internal/util/DelegatingScheduler.java index 15f5558c..d7e64007 100644 --- a/core/src/main/java/dev/failsafe/internal/util/DelegatingScheduler.java +++ b/core/src/main/java/dev/failsafe/internal/util/DelegatingScheduler.java @@ -17,7 +17,16 @@ import dev.failsafe.spi.Scheduler; -import java.util.concurrent.*; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Delayed; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; /** * A {@link Scheduler} implementation that schedules delays on an internal, common ScheduledExecutorService and executes @@ -34,8 +43,6 @@ */ public final class DelegatingScheduler implements Scheduler { public static final DelegatingScheduler INSTANCE = new DelegatingScheduler(); - private static volatile ForkJoinPool FORK_JOIN_POOL; - private static volatile ScheduledThreadPoolExecutor DELAYER; private final ExecutorService executorService; @@ -47,8 +54,16 @@ public DelegatingScheduler(ExecutorService executor) { this.executorService = executor; } - private static final class DelayerThreadFactory implements ThreadFactory { - public Thread newThread(Runnable r) { + private static final class LazyDelayerHolder { + private static final ScheduledThreadPoolExecutor DELAYER = create(); + + private static ScheduledThreadPoolExecutor create() { + ScheduledThreadPoolExecutor delayer = new ScheduledThreadPoolExecutor(1, LazyDelayerHolder::newThread); + delayer.setRemoveOnCancelPolicy(true); + return delayer; + } + + public static Thread newThread(Runnable r) { Thread t = new Thread(r); t.setDaemon(true); t.setName("FailsafeDelayScheduler"); @@ -56,6 +71,18 @@ public Thread newThread(Runnable r) { } } + private static final class LazyForkJoinPoolHolder { + private static final ForkJoinPool FORK_JOIN_POOL = create(); + + private static ForkJoinPool create(){ + return ForkJoinPool.getCommonPoolParallelism() > 1 + ? ForkJoinPool.commonPool() + : new ForkJoinPool(Math.max(Runtime.getRuntime().availableProcessors(), 2), + ForkJoinPool.defaultForkJoinWorkerThreadFactory, + null, true); + } + } + static final class ScheduledCompletableFuture extends CompletableFuture implements ScheduledFuture { // Guarded by this volatile Future delegate; @@ -96,32 +123,13 @@ public boolean cancel(boolean mayInterruptIfRunning) { } private static ScheduledExecutorService delayer() { - if (DELAYER == null) { - synchronized (DelegatingScheduler.class) { - if (DELAYER == null) { - ScheduledThreadPoolExecutor delayer = new ScheduledThreadPoolExecutor(1, new DelayerThreadFactory()); - delayer.setRemoveOnCancelPolicy(true); - DELAYER = delayer; - } - } - } - return DELAYER; + return LazyDelayerHolder.DELAYER; } private ExecutorService executorService() { - if (executorService != null) - return executorService; - if (FORK_JOIN_POOL == null) { - synchronized (DelegatingScheduler.class) { - if (FORK_JOIN_POOL == null) { - if (ForkJoinPool.getCommonPoolParallelism() > 1) - FORK_JOIN_POOL = ForkJoinPool.commonPool(); - else - FORK_JOIN_POOL = new ForkJoinPool(2); - } - } - } - return FORK_JOIN_POOL; + return executorService != null + ? executorService + : LazyForkJoinPoolHolder.FORK_JOIN_POOL; } @Override @@ -133,7 +141,7 @@ public ScheduledFuture schedule(Callable callable, long delay, TimeUnit un Callable completingCallable = () -> { try { if (isForkJoinPool) { - // Guard against race with promise.cancel + // Guard against race with promise.cancel synchronized (promise) { promise.forkJoinPoolThread = Thread.currentThread(); } From 95627ff290d53cc71e48002f52c63f0329708adf Mon Sep 17 00:00:00 2001 From: "a.fink" Date: Sat, 26 Nov 2022 20:33:20 +0300 Subject: [PATCH 2/6] bug (delay == 0); fix bad commonPool() as executor; use less memory: don't capture variable with commonPool --- .../internal/util/DelegatingScheduler.java | 30 ++++++++++++++----- 1 file changed, 23 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/dev/failsafe/internal/util/DelegatingScheduler.java b/core/src/main/java/dev/failsafe/internal/util/DelegatingScheduler.java index d7e64007..5a7980f1 100644 --- a/core/src/main/java/dev/failsafe/internal/util/DelegatingScheduler.java +++ b/core/src/main/java/dev/failsafe/internal/util/DelegatingScheduler.java @@ -50,8 +50,11 @@ private DelegatingScheduler() { this.executorService = null; } - public DelegatingScheduler(ExecutorService executor) { - this.executorService = executor; + public DelegatingScheduler(ExecutorService executor){ + if (executor != ForkJoinPool.commonPool() || useCommonPool()) + this.executorService = executor; + else // don't use commonPool(): cannot support parallelism @see CompletableFuture#useCommonPool + this.executorService = null; } private static final class LazyDelayerHolder { @@ -75,14 +78,17 @@ private static final class LazyForkJoinPoolHolder { private static final ForkJoinPool FORK_JOIN_POOL = create(); private static ForkJoinPool create(){ - return ForkJoinPool.getCommonPoolParallelism() > 1 - ? ForkJoinPool.commonPool() + return useCommonPool() ? ForkJoinPool.commonPool() : new ForkJoinPool(Math.max(Runtime.getRuntime().availableProcessors(), 2), ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); } } + static boolean useCommonPool () { + return ForkJoinPool.getCommonPoolParallelism() > 1; + } + static final class ScheduledCompletableFuture extends CompletableFuture implements ScheduledFuture { // Guarded by this volatile Future delegate; @@ -159,12 +165,22 @@ public ScheduledFuture schedule(Callable callable, long delay, TimeUnit un return null; }; - if (delay == 0) + if (delay <= 0) promise.delegate = es.submit(completingCallable); + + // use less memory: don't capture variable with commonPool + else if (es == LazyForkJoinPoolHolder.FORK_JOIN_POOL) + promise.delegate = delayer().schedule(()->{ + // Guard against race with promise.cancel + synchronized(promise) { + if (!promise.isCancelled()) + promise.delegate = LazyForkJoinPoolHolder.FORK_JOIN_POOL.submit(completingCallable); + } + }, delay, unit); else - promise.delegate = delayer().schedule(() -> { + promise.delegate = delayer().schedule(()->{ // Guard against race with promise.cancel - synchronized (promise) { + synchronized(promise) { if (!promise.isCancelled()) promise.delegate = es.submit(completingCallable); } From 953aa91acf297b0da8515161d58ec5e647c6d810 Mon Sep 17 00:00:00 2001 From: "a.fink" Date: Sun, 27 Nov 2022 17:00:27 +0300 Subject: [PATCH 3/6] Optimizations pre-calculate everything use less memory compact lambdas --- .../internal/util/DelegatingScheduler.java | 129 ++++++++++++------ 1 file changed, 86 insertions(+), 43 deletions(-) diff --git a/core/src/main/java/dev/failsafe/internal/util/DelegatingScheduler.java b/core/src/main/java/dev/failsafe/internal/util/DelegatingScheduler.java index 5a7980f1..cd8eea83 100644 --- a/core/src/main/java/dev/failsafe/internal/util/DelegatingScheduler.java +++ b/core/src/main/java/dev/failsafe/internal/util/DelegatingScheduler.java @@ -28,6 +28,8 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import static java.util.concurrent.ForkJoinPool.commonPool; + /** * A {@link Scheduler} implementation that schedules delays on an internal, common ScheduledExecutorService and executes * tasks on either a provided ExecutorService, {@link ForkJoinPool#commonPool()}, or an internal {@link ForkJoinPool} @@ -45,16 +47,42 @@ public final class DelegatingScheduler implements Scheduler { public static final DelegatingScheduler INSTANCE = new DelegatingScheduler(); private final ExecutorService executorService; + private final int executorType; + + private static final int EX_FORK_JOIN = 1; + private static final int EX_SCHEDULED = 2; + private static final int EX_COMMON = 4; + private static final int EX_INTERNAL = 8; + private DelegatingScheduler() { - this.executorService = null; + this(null, false); } - public DelegatingScheduler(ExecutorService executor){ - if (executor != ForkJoinPool.commonPool() || useCommonPool()) - this.executorService = executor; - else // don't use commonPool(): cannot support parallelism @see CompletableFuture#useCommonPool - this.executorService = null; + public DelegatingScheduler(ExecutorService executor) { + this(executor, false); + } + + public DelegatingScheduler(ExecutorService executor, boolean canUseScheduledExecutorService) { + final int type; + if (executor == null || executor == commonPool()) { + if (ForkJoinPool.getCommonPoolParallelism() > 1) {// @see CompletableFuture#useCommonPool + executorService = commonPool(); + type = EX_COMMON | EX_FORK_JOIN; + + } else {// don't use commonPool(): cannot support parallelism + executorService = null; + type = EX_INTERNAL | EX_FORK_JOIN; + } + } else { + executorService = executor; + type = executor instanceof ForkJoinPool + ? EX_FORK_JOIN + : 0; + } + executorType = canUseScheduledExecutorService && executorService instanceof ScheduledExecutorService + ? type | EX_SCHEDULED + : type; } private static final class LazyDelayerHolder { @@ -75,19 +103,12 @@ public static Thread newThread(Runnable r) { } private static final class LazyForkJoinPoolHolder { - private static final ForkJoinPool FORK_JOIN_POOL = create(); - - private static ForkJoinPool create(){ - return useCommonPool() ? ForkJoinPool.commonPool() - : new ForkJoinPool(Math.max(Runtime.getRuntime().availableProcessors(), 2), - ForkJoinPool.defaultForkJoinWorkerThreadFactory, - null, true); - } + private static final ForkJoinPool FORK_JOIN_POOL = new ForkJoinPool( + Math.max(Runtime.getRuntime().availableProcessors(), 2), + ForkJoinPool.defaultForkJoinWorkerThreadFactory, + null, true/*asyncMode*/); } - static boolean useCommonPool () { - return ForkJoinPool.getCommonPoolParallelism() > 1; - } static final class ScheduledCompletableFuture extends CompletableFuture implements ScheduledFuture { // Guarded by this @@ -128,13 +149,14 @@ public boolean cancel(boolean mayInterruptIfRunning) { } } - private static ScheduledExecutorService delayer() { - return LazyDelayerHolder.DELAYER; + private ScheduledExecutorService delayer() { + return ((executorType & EX_SCHEDULED) == EX_SCHEDULED) + ? (ScheduledExecutorService) executorService() + : LazyDelayerHolder.DELAYER; } private ExecutorService executorService() { - return executorService != null - ? executorService + return executorService != null ? executorService : LazyForkJoinPoolHolder.FORK_JOIN_POOL; } @@ -142,50 +164,71 @@ private ExecutorService executorService() { @SuppressWarnings({ "unchecked", "rawtypes" }) public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { ScheduledCompletableFuture promise = new ScheduledCompletableFuture<>(delay, unit); - ExecutorService es = executorService(); - boolean isForkJoinPool = es instanceof ForkJoinPool; - Callable completingCallable = () -> { - try { - if (isForkJoinPool) { + final Callable completingCallable; + if ((executorType & EX_FORK_JOIN) == EX_FORK_JOIN) {// but why? Other ExecutorServices also support cancellation + completingCallable = () -> { + try { // Guard against race with promise.cancel synchronized (promise) { promise.forkJoinPoolThread = Thread.currentThread(); } - } - promise.complete(callable.call()); - } catch (Throwable t) { - promise.completeExceptionally(t); - } finally { - if (isForkJoinPool) { + promise.complete(callable.call()); + } catch (Throwable t) { + promise.completeExceptionally(t); + } finally { synchronized (promise) { promise.forkJoinPoolThread = null; } } - } - return null; - }; + return null; + }; + } else {// not forkJoin + completingCallable = () ->{ + try { + promise.complete(callable.call()); + } catch (Throwable t) { + promise.completeExceptionally(t); + } + return null; + }; + } - if (delay <= 0) - promise.delegate = es.submit(completingCallable); + if (delay <= 0) { + promise.delegate = executorService().submit(completingCallable); + return promise; + } + + final ExecutorService es = executorService(); + final Runnable r;// use less memory: don't capture variable with commonPool - // use less memory: don't capture variable with commonPool - else if (es == LazyForkJoinPoolHolder.FORK_JOIN_POOL) - promise.delegate = delayer().schedule(()->{ + if ((executorType & EX_COMMON) == EX_COMMON) + r = ()->{ + // Guard against race with promise.cancel + synchronized(promise) { + if (!promise.isCancelled()) + promise.delegate = commonPool().submit(completingCallable); + } + }; + + else if ((executorType & EX_INTERNAL) == EX_INTERNAL) + r = ()->{ // Guard against race with promise.cancel synchronized(promise) { if (!promise.isCancelled()) promise.delegate = LazyForkJoinPoolHolder.FORK_JOIN_POOL.submit(completingCallable); } - }, delay, unit); + }; + else - promise.delegate = delayer().schedule(()->{ + r = ()->{ // Guard against race with promise.cancel synchronized(promise) { if (!promise.isCancelled()) promise.delegate = es.submit(completingCallable); } - }, delay, unit); + }; + promise.delegate = delayer().schedule(r, delay, unit); return promise; } } \ No newline at end of file From 2651f47a578f7fbdfa4e44d76ea9d041c8a31e99 Mon Sep 17 00:00:00 2001 From: "a.fink" Date: Sun, 27 Nov 2022 19:34:24 +0300 Subject: [PATCH 4/6] DelegatingScheduler 100% test coverage -1 lambda --- .../internal/util/DelegatingScheduler.java | 23 +++-- .../util/DelegatingSchedulerTest.java | 90 ++++++++++++++++++- 2 files changed, 100 insertions(+), 13 deletions(-) diff --git a/core/src/main/java/dev/failsafe/internal/util/DelegatingScheduler.java b/core/src/main/java/dev/failsafe/internal/util/DelegatingScheduler.java index cd8eea83..5ca1e69e 100644 --- a/core/src/main/java/dev/failsafe/internal/util/DelegatingScheduler.java +++ b/core/src/main/java/dev/failsafe/internal/util/DelegatingScheduler.java @@ -26,6 +26,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import static java.util.concurrent.ForkJoinPool.commonPool; @@ -76,8 +77,7 @@ public DelegatingScheduler(ExecutorService executor, boolean canUseScheduledExec } } else { executorService = executor; - type = executor instanceof ForkJoinPool - ? EX_FORK_JOIN + type = executor instanceof ForkJoinPool ? EX_FORK_JOIN : 0; } executorType = canUseScheduledExecutorService && executorService instanceof ScheduledExecutorService @@ -85,16 +85,21 @@ public DelegatingScheduler(ExecutorService executor, boolean canUseScheduledExec : type; } - private static final class LazyDelayerHolder { - private static final ScheduledThreadPoolExecutor DELAYER = create(); + DelegatingScheduler (byte flags) { + executorService = null; + executorType = flags; + }//new for tests - private static ScheduledThreadPoolExecutor create() { - ScheduledThreadPoolExecutor delayer = new ScheduledThreadPoolExecutor(1, LazyDelayerHolder::newThread); - delayer.setRemoveOnCancelPolicy(true); - return delayer; + private static final class LazyDelayerHolder extends ScheduledThreadPoolExecutor implements ThreadFactory { + private static final ScheduledThreadPoolExecutor DELAYER = new LazyDelayerHolder(); + + public LazyDelayerHolder(){ + super(1); + setThreadFactory(this); + setRemoveOnCancelPolicy(true); } - public static Thread newThread(Runnable r) { + @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setDaemon(true); t.setName("FailsafeDelayScheduler"); diff --git a/core/src/test/java/dev/failsafe/internal/util/DelegatingSchedulerTest.java b/core/src/test/java/dev/failsafe/internal/util/DelegatingSchedulerTest.java index 0a3f8559..875a9c6d 100644 --- a/core/src/test/java/dev/failsafe/internal/util/DelegatingSchedulerTest.java +++ b/core/src/test/java/dev/failsafe/internal/util/DelegatingSchedulerTest.java @@ -15,18 +15,26 @@ */ package dev.failsafe.internal.util; -import net.jodah.concurrentunit.Waiter; -import dev.failsafe.testing.Asserts; import dev.failsafe.spi.Scheduler; +import dev.failsafe.testing.Asserts; +import net.jodah.concurrentunit.Waiter; import org.testng.annotations.Test; import java.io.IOException; import java.time.Duration; -import java.util.concurrent.*; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; +import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; @Test public class DelegatingSchedulerTest { @@ -87,4 +95,78 @@ public void shouldClearInterruptFlagInForkJoinPoolThreads() throws Throwable { }, 0, TimeUnit.MILLISECONDS); waiter.await(1000); } -} + + + @Test + public void testInternalPool() throws TimeoutException, ExecutionException, InterruptedException{ + DelegatingScheduler ds = new DelegatingScheduler((byte) 8);// internal, not ForkJoin + + Waiter waiter = new Waiter(); + + ScheduledFuture sf = ds.schedule(()->{ + waiter.rethrow(new IOException("OK! testInternalPool")); + return 42; + }, 5, TimeUnit.MILLISECONDS); + + try { + waiter.await(1000); + fail(); + } catch (Throwable e) { + assertEquals(e.toString(), "java.io.IOException: OK! testInternalPool"); + } + assertTrue(sf.isDone()); + + try { + sf.get(); + fail(); + } catch (ExecutionException e) { + assertEquals(e.toString(), "java.util.concurrent.ExecutionException: java.io.IOException: OK! testInternalPool"); + } + } + + @Test + public void testExternalScheduler() throws TimeoutException, ExecutionException, InterruptedException{ + ScheduledThreadPoolExecutor stpe = new ScheduledThreadPoolExecutor(1); + DelegatingScheduler ds = new DelegatingScheduler(stpe, true); + + Waiter waiter = new Waiter(); + + ScheduledFuture sf1 = ds.schedule(()->{ + waiter.rethrow(new IOException("OK! fail 1")); + return 42; + }, 3, TimeUnit.SECONDS); + ScheduledFuture sf2 = ds.schedule(()->{ + waiter.rethrow(new IOException("OK! fail 2 fast")); + return 42; + }, 1, TimeUnit.SECONDS); + assertEquals(1, sf1.compareTo(sf2)); + assertEquals(0, sf1.compareTo(sf1)); + assertTrue(sf1.getDelay(TimeUnit.MILLISECONDS) > 2000); + + try { + waiter.await(3200); + fail(); + } catch (Throwable e) { + assertEquals(e.toString(), "java.io.IOException: OK! fail 2 fast"); + } + assertTrue(sf2.isDone()); + Thread.sleep(2500);//3-1 = 2 for slow sf1 + assertTrue(sf1.isDone()); + + try { + sf1.get(); + fail(); + } catch (ExecutionException e) { + assertEquals(e.toString(), "java.util.concurrent.ExecutionException: java.io.IOException: OK! fail 1"); + } + try { + sf2.get(); + fail(); + } catch (ExecutionException e) { + assertEquals(e.toString(), "java.util.concurrent.ExecutionException: java.io.IOException: OK! fail 2 fast"); + } + assertEquals(stpe.shutdownNow().size(), 0); + + assertEquals(-1, sf2.compareTo(sf1)); + } +} \ No newline at end of file From 9fe5b90adcd89c93fc9dbf9821bed141f65bd9b1 Mon Sep 17 00:00:00 2001 From: "a.fink" Date: Mon, 28 Nov 2022 23:11:33 +0300 Subject: [PATCH 5/6] Boss level: all fet is removed MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ScheduledCompletableFuture doesn't have to keep `time`: it can simply call delegate ThreadFactory.newThread: setName can be done in ctor if → ? : --- .../internal/util/DelegatingScheduler.java | 42 +++---- .../util/DelegatingSchedulerTest.java | 108 ++++++++++++++++-- 2 files changed, 115 insertions(+), 35 deletions(-) diff --git a/core/src/main/java/dev/failsafe/internal/util/DelegatingScheduler.java b/core/src/main/java/dev/failsafe/internal/util/DelegatingScheduler.java index 5ca1e69e..aa36d88d 100644 --- a/core/src/main/java/dev/failsafe/internal/util/DelegatingScheduler.java +++ b/core/src/main/java/dev/failsafe/internal/util/DelegatingScheduler.java @@ -100,9 +100,8 @@ public LazyDelayerHolder(){ } @Override public Thread newThread(Runnable r) { - Thread t = new Thread(r); + Thread t = new Thread(r, "FailsafeDelayScheduler"); t.setDaemon(true); - t.setName("FailsafeDelayScheduler"); return t; } } @@ -114,30 +113,23 @@ private static final class LazyForkJoinPoolHolder { null, true/*asyncMode*/); } - static final class ScheduledCompletableFuture extends CompletableFuture implements ScheduledFuture { // Guarded by this volatile Future delegate; // Guarded by this Thread forkJoinPoolThread; - private final long time; - - ScheduledCompletableFuture(long delay, TimeUnit unit) { - this.time = System.nanoTime() + unit.toNanos(delay); - } @Override - public long getDelay(TimeUnit unit) { - return unit.convert(time - System.nanoTime(), TimeUnit.NANOSECONDS); + public long getDelay(TimeUnit unit){ + Future f = delegate; + return f instanceof Delayed ? ((Delayed) f).getDelay(unit) + : 0; // we are executed now } @Override public int compareTo(Delayed other) { - if (other == this) { + if (other == this)// ScheduledFuture gives no extra info return 0; - } else if (other instanceof ScheduledCompletableFuture) { - return Long.compare(time, ((ScheduledCompletableFuture) other).time); - } return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS)); } @@ -152,11 +144,10 @@ public boolean cancel(boolean mayInterruptIfRunning) { } return result; } - } + }//ScheduledCompletableFuture private ScheduledExecutorService delayer() { - return ((executorType & EX_SCHEDULED) == EX_SCHEDULED) - ? (ScheduledExecutorService) executorService() + return ((executorType & EX_SCHEDULED) == EX_SCHEDULED) ? (ScheduledExecutorService) executorService() : LazyDelayerHolder.DELAYER; } @@ -165,13 +156,11 @@ private ExecutorService executorService() { : LazyForkJoinPoolHolder.FORK_JOIN_POOL; } - @Override - @SuppressWarnings({ "unchecked", "rawtypes" }) + @Override @SuppressWarnings({ "unchecked", "rawtypes" }) public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { - ScheduledCompletableFuture promise = new ScheduledCompletableFuture<>(delay, unit); - final Callable completingCallable; - if ((executorType & EX_FORK_JOIN) == EX_FORK_JOIN) {// but why? Other ExecutorServices also support cancellation - completingCallable = () -> { + ScheduledCompletableFuture promise = new ScheduledCompletableFuture<>(); + final Callable completingCallable = (executorType & EX_FORK_JOIN) == EX_FORK_JOIN + ? () -> { try { // Guard against race with promise.cancel synchronized (promise) { @@ -186,9 +175,8 @@ public ScheduledFuture schedule(Callable callable, long delay, TimeUnit un } } return null; - }; - } else {// not forkJoin - completingCallable = () ->{ + }// else not ForkJoin BTW: but why? Other ExecutorServices also support cancellation + : () ->{ try { promise.complete(callable.call()); } catch (Throwable t) { @@ -196,7 +184,6 @@ public ScheduledFuture schedule(Callable callable, long delay, TimeUnit un } return null; }; - } if (delay <= 0) { promise.delegate = executorService().submit(completingCallable); @@ -232,7 +219,6 @@ else if ((executorType & EX_INTERNAL) == EX_INTERNAL) promise.delegate = es.submit(completingCallable); } }; - promise.delegate = delayer().schedule(r, delay, unit); return promise; } diff --git a/core/src/test/java/dev/failsafe/internal/util/DelegatingSchedulerTest.java b/core/src/test/java/dev/failsafe/internal/util/DelegatingSchedulerTest.java index 875a9c6d..533a8b86 100644 --- a/core/src/test/java/dev/failsafe/internal/util/DelegatingSchedulerTest.java +++ b/core/src/test/java/dev/failsafe/internal/util/DelegatingSchedulerTest.java @@ -22,18 +22,23 @@ import java.io.IOException; import java.time.Duration; +import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ForkJoinTask; import java.util.concurrent.Future; +import java.util.concurrent.RunnableScheduledFuture; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; +import static org.testng.Assert.expectThrows; import static org.testng.Assert.fail; @Test @@ -50,7 +55,7 @@ public void shouldSchedule() throws Throwable { scheduler.schedule(() -> { waiter.resume(); return null; - }, delay.toMillis(), TimeUnit.MILLISECONDS); + }, delay.toMillis(), MILLISECONDS); // Then waiter.await(1000); @@ -60,11 +65,11 @@ public void shouldSchedule() throws Throwable { public void shouldWrapCheckedExceptions() { Asserts.assertThrows(() -> scheduler.schedule(() -> { throw new IOException(); - }, 1, TimeUnit.MILLISECONDS).get(), ExecutionException.class, IOException.class); + }, 1, MILLISECONDS).get(), ExecutionException.class, IOException.class); } public void shouldNotInterruptAlreadyDoneTask() throws Throwable { - Future future1 = scheduler.schedule(() -> null, 0, TimeUnit.MILLISECONDS); + Future future1 = scheduler.schedule(() -> null, 0, MILLISECONDS); Thread.sleep(100); assertFalse(future1.cancel(true)); } @@ -83,7 +88,7 @@ public void shouldClearInterruptFlagInForkJoinPoolThreads() throws Throwable { waiter.resume(); Thread.sleep(10000); return null; - }, 0, TimeUnit.MILLISECONDS); + }, 0, MILLISECONDS); waiter.await(1000); threadRef.get().interrupt(); @@ -92,7 +97,7 @@ public void shouldClearInterruptFlagInForkJoinPoolThreads() throws Throwable { waiter.assertFalse(Thread.currentThread().isInterrupted()); waiter.resume(); return null; - }, 0, TimeUnit.MILLISECONDS); + }, 0, MILLISECONDS); waiter.await(1000); } @@ -106,7 +111,7 @@ public void testInternalPool() throws TimeoutException, ExecutionException, Inte ScheduledFuture sf = ds.schedule(()->{ waiter.rethrow(new IOException("OK! testInternalPool")); return 42; - }, 5, TimeUnit.MILLISECONDS); + }, 5, MILLISECONDS); try { waiter.await(1000); @@ -141,7 +146,7 @@ public void testExternalScheduler() throws TimeoutException, ExecutionException, }, 1, TimeUnit.SECONDS); assertEquals(1, sf1.compareTo(sf2)); assertEquals(0, sf1.compareTo(sf1)); - assertTrue(sf1.getDelay(TimeUnit.MILLISECONDS) > 2000); + assertTrue(sf1.getDelay(MILLISECONDS) > 2000); try { waiter.await(3200); @@ -169,4 +174,93 @@ public void testExternalScheduler() throws TimeoutException, ExecutionException, assertEquals(-1, sf2.compareTo(sf1)); } + + + @Test + public void testScheduleAndWork() throws TimeoutException, ExecutionException, InterruptedException{ + Waiter w = new Waiter(); + ScheduledFuture sf1 = DelegatingScheduler.INSTANCE.schedule(()->{ + w.resume();// after ~ 1 sec + Thread.sleep(5000);//hard work + w.resume(); + return 42; + }, 1, TimeUnit.SECONDS); + + ScheduledFuture sf2 = DelegatingScheduler.INSTANCE.schedule(()->112, 3, TimeUnit.SECONDS); + + assertTrue(sf1.getDelay(MILLISECONDS) > 600); + assertTrue(sf2.getDelay(MILLISECONDS) > 2600); + assertEquals(-1, sf1.compareTo(sf2));// 1 sec < 3 sec + assertFalse(sf1.isDone()); + assertFalse(sf2.isDone()); + + w.await(1200, 1);// sf1 in normal executor + + assertEquals(-1, sf1.compareTo(sf2));// 1 sec < 3 sec + assertEquals(sf1.getDelay(MILLISECONDS), 0); + assertTrue(sf2.getDelay(MILLISECONDS) > 1600); + assertFalse(sf1.isDone()); + assertFalse(sf2.isDone()); + + w.await(5200, 1);// sf2 is done + + assertEquals(0, sf1.compareTo(sf2));// no more time info inside + assertEquals(sf1.getDelay(MILLISECONDS), 0); + assertEquals(sf2.getDelay(MILLISECONDS), 0); + assertTrue(sf1.isDone()); + assertTrue(sf2.isDone()); + assertEquals(42, sf1.get()); + assertEquals(112, sf2.get()); + } + + @Test + public void testScheduleAndCancel() throws TimeoutException, ExecutionException, InterruptedException{ + Waiter w = new Waiter(); + ScheduledFuture sf1 = DelegatingScheduler.INSTANCE.schedule(()->{ + w.resume();// after ~ 1 sec + Thread.sleep(5000);//hard work + w.resume(); + return 42; + }, 1, TimeUnit.SECONDS); + + ScheduledFuture sf2 = DelegatingScheduler.INSTANCE.schedule(()->112, 3, TimeUnit.SECONDS); + + assertTrue(sf1.getDelay(MILLISECONDS) > 600); + assertTrue(sf2.getDelay(MILLISECONDS) > 2600); + assertEquals(-1, sf1.compareTo(sf2));// 1 sec < 3 sec + assertFalse(sf1.isDone()); + assertFalse(sf2.isDone()); + + w.await(1200, 1);// sf1 in normal executor + + assertEquals(-1, sf1.compareTo(sf2));// 1 sec < 3 sec + assertEquals(sf1.getDelay(MILLISECONDS), 0); + assertTrue(sf2.getDelay(MILLISECONDS) > 1600); + assertFalse(sf1.isDone()); + assertFalse(sf2.isDone()); + + sf1.cancel(true); + sf2.cancel(true); + + assertEquals(-1, sf1.compareTo(sf2));// time info inside in sf2's delegate + assertEquals(sf1.getDelay(MILLISECONDS), 0); + assertTrue(sf2.getDelay(MILLISECONDS) > 1000); + assertTrue(sf1.isDone()); + assertTrue(sf2.isDone()); + assertTrue(sf1.isCancelled()); + assertTrue(sf2.isCancelled()); + CancellationException c1 = expectThrows(CancellationException.class, sf1::get); + CancellationException c2 = expectThrows(CancellationException.class, sf2::get); + DelegatingScheduler.ScheduledCompletableFuture scf1 = (DelegatingScheduler.ScheduledCompletableFuture) sf1; + DelegatingScheduler.ScheduledCompletableFuture scf2 = (DelegatingScheduler.ScheduledCompletableFuture) sf2; + + assertTrue(scf1.delegate instanceof ForkJoinTask);// was executing + ForkJoinTask task1 = (ForkJoinTask) scf1.delegate; + assertTrue(scf2.delegate instanceof RunnableScheduledFuture);// was in scheduler's delayQueue + RunnableScheduledFuture task2 = (RunnableScheduledFuture) scf2.delegate; + assertTrue(task1.isCompletedAbnormally()); + assertTrue(task1.isCancelled()); + assertTrue(task2.isCancelled()); + } + } \ No newline at end of file From 60f9b82392ca6148a4186977f1d0a67cd70a5f87 Mon Sep 17 00:00:00 2001 From: "a.fink" Date: Wed, 30 Nov 2022 20:20:46 +0300 Subject: [PATCH 6/6] -1 wrapper (Runnable is wrapped in Callable inside ExecutorService), so use Callable lambda instead of Runnable --- .../internal/util/DelegatingScheduler.java | 51 ++++++++----------- .../util/DelegatingSchedulerTest.java | 2 +- 2 files changed, 23 insertions(+), 30 deletions(-) diff --git a/core/src/main/java/dev/failsafe/internal/util/DelegatingScheduler.java b/core/src/main/java/dev/failsafe/internal/util/DelegatingScheduler.java index aa36d88d..a521b2d8 100644 --- a/core/src/main/java/dev/failsafe/internal/util/DelegatingScheduler.java +++ b/core/src/main/java/dev/failsafe/internal/util/DelegatingScheduler.java @@ -45,49 +45,41 @@ * @author Ben Manes */ public final class DelegatingScheduler implements Scheduler { - public static final DelegatingScheduler INSTANCE = new DelegatingScheduler(); + public static final DelegatingScheduler INSTANCE = new DelegatingScheduler(null,null); private final ExecutorService executorService; + private final ScheduledExecutorService scheduler; private final int executorType; private static final int EX_FORK_JOIN = 1; - private static final int EX_SCHEDULED = 2; private static final int EX_COMMON = 4; private static final int EX_INTERNAL = 8; - private DelegatingScheduler() { - this(null, false); - } - public DelegatingScheduler(ExecutorService executor) { - this(executor, false); + this(executor, null); } - public DelegatingScheduler(ExecutorService executor, boolean canUseScheduledExecutorService) { - final int type; + public DelegatingScheduler(ExecutorService executor, ScheduledExecutorService scheduler) { if (executor == null || executor == commonPool()) { if (ForkJoinPool.getCommonPoolParallelism() > 1) {// @see CompletableFuture#useCommonPool executorService = commonPool(); - type = EX_COMMON | EX_FORK_JOIN; + executorType = EX_COMMON | EX_FORK_JOIN; } else {// don't use commonPool(): cannot support parallelism executorService = null; - type = EX_INTERNAL | EX_FORK_JOIN; + executorType = EX_INTERNAL | EX_FORK_JOIN; } } else { executorService = executor; - type = executor instanceof ForkJoinPool ? EX_FORK_JOIN + executorType = executor instanceof ForkJoinPool ? EX_FORK_JOIN : 0; } - executorType = canUseScheduledExecutorService && executorService instanceof ScheduledExecutorService - ? type | EX_SCHEDULED - : type; + this.scheduler = scheduler; } DelegatingScheduler (byte flags) { - executorService = null; - executorType = flags; + executorService = null; executorType = flags; scheduler = null; }//new for tests private static final class LazyDelayerHolder extends ScheduledThreadPoolExecutor implements ThreadFactory { @@ -123,7 +115,7 @@ static final class ScheduledCompletableFuture extends CompletableFuture im public long getDelay(TimeUnit unit){ Future f = delegate; return f instanceof Delayed ? ((Delayed) f).getDelay(unit) - : 0; // we are executed now + : 0; // we are executing now } @Override @@ -147,7 +139,7 @@ public boolean cancel(boolean mayInterruptIfRunning) { }//ScheduledCompletableFuture private ScheduledExecutorService delayer() { - return ((executorType & EX_SCHEDULED) == EX_SCHEDULED) ? (ScheduledExecutorService) executorService() + return scheduler != null ? scheduler : LazyDelayerHolder.DELAYER; } @@ -190,35 +182,36 @@ public ScheduledFuture schedule(Callable callable, long delay, TimeUnit un return promise; } - final ExecutorService es = executorService(); - final Runnable r;// use less memory: don't capture variable with commonPool + final Callable r;// use less memory: don't capture variable with commonPool if ((executorType & EX_COMMON) == EX_COMMON) - r = ()->{ - // Guard against race with promise.cancel + r = ()->{ // Guard against race with promise.cancel synchronized(promise) { if (!promise.isCancelled()) promise.delegate = commonPool().submit(completingCallable); } + return null; }; else if ((executorType & EX_INTERNAL) == EX_INTERNAL) - r = ()->{ - // Guard against race with promise.cancel + r = ()->{// Guard against race with promise.cancel synchronized(promise) { if (!promise.isCancelled()) promise.delegate = LazyForkJoinPoolHolder.FORK_JOIN_POOL.submit(completingCallable); } + return null; }; - else - r = ()->{ - // Guard against race with promise.cancel - synchronized(promise) { + else { + final ExecutorService es = executorService(); + r = ()->{// Guard against race with promise.cancel + synchronized(promise){ if (!promise.isCancelled()) promise.delegate = es.submit(completingCallable); } + return null; }; + } promise.delegate = delayer().schedule(r, delay, unit); return promise; } diff --git a/core/src/test/java/dev/failsafe/internal/util/DelegatingSchedulerTest.java b/core/src/test/java/dev/failsafe/internal/util/DelegatingSchedulerTest.java index 533a8b86..795e450f 100644 --- a/core/src/test/java/dev/failsafe/internal/util/DelegatingSchedulerTest.java +++ b/core/src/test/java/dev/failsafe/internal/util/DelegatingSchedulerTest.java @@ -132,7 +132,7 @@ public void testInternalPool() throws TimeoutException, ExecutionException, Inte @Test public void testExternalScheduler() throws TimeoutException, ExecutionException, InterruptedException{ ScheduledThreadPoolExecutor stpe = new ScheduledThreadPoolExecutor(1); - DelegatingScheduler ds = new DelegatingScheduler(stpe, true); + DelegatingScheduler ds = new DelegatingScheduler(stpe, stpe); Waiter waiter = new Waiter();