From 1c94b48b3a10ab83241a476313c5f80b8a09ca83 Mon Sep 17 00:00:00 2001 From: Idel Pivnitskiy Date: Fri, 30 Sep 2022 15:14:12 -0700 Subject: [PATCH 1/3] Make `Single.concat(Publisher)` behavior consistent on cancel Motivation: Behavior of `Single.concat(Publisher)` is different compare to all other `concat` variants: `Single.concat(Completable)`, `Single.concat(Single)`, `Completable.concat(Completable), `Completable.concat(Single)`, `Completable.concat(Publisher)`, `Publisher.concat(Completable)`, `Publisher.concat(Single)`, `Publisher.concat(Publisher)`. It does not subscribe to the next source to propagate cancellation if `onSuccess` is delivered after `cancel`. Modifications: - Make tests for all `concat` valiants consistent in regards to cancellation; - Modify `Single.concat(Publisher)` behavior to subscribe in case `onSuccess` is delivered after `cancel`; Result: Behavior of `Single.concat(Publisher)` is consistent with all other `concat` variants. --- .../api/SingleConcatWithPublisher.java | 6 ++- .../concurrent/api/ConcatPublisherTest.java | 22 +++++++++ .../CompletableConcatWithCompletableTest.java | 14 ++++-- .../CompletableConcatWithPublisherTest.java | 1 + .../CompletableConcatWithSingleTest.java | 14 ++++-- .../PublisherConcatWithCompletableTest.java | 1 + .../PublisherConcatWithSingleTest.java | 1 + .../SingleConcatWithCompletableTest.java | 14 ++++-- .../single/SingleConcatWithPublisherTest.java | 47 ++++++++++--------- 9 files changed, 86 insertions(+), 34 deletions(-) diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/SingleConcatWithPublisher.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/SingleConcatWithPublisher.java index df1e5a536a..dca14b1496 100644 --- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/SingleConcatWithPublisher.java +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/SingleConcatWithPublisher.java @@ -150,7 +150,11 @@ public void onSuccess(@Nullable final T result) { next.subscribeInternal(this); } break; - } else if (oldValue == CANCELLED || mayBeResultUpdater.compareAndSet(this, INITIAL, result)) { + } else if (oldValue == CANCELLED) { + // Subscribe to the next source to propagate cancellation + next.subscribeInternal(this); + break; + } else if (mayBeResultUpdater.compareAndSet(this, INITIAL, result)) { break; } } diff --git a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/ConcatPublisherTest.java b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/ConcatPublisherTest.java index 86492f81fd..99f6ae37b7 100644 --- a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/ConcatPublisherTest.java +++ b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/ConcatPublisherTest.java @@ -21,8 +21,11 @@ import static io.servicetalk.concurrent.api.SourceAdapters.toSource; import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.sameInstance; class ConcatPublisherTest { @@ -67,4 +70,23 @@ void testSecondEmitsError() { assertThat(subscriber.takeOnNext(2), contains("Hello1", "Hello2")); assertThat(subscriber.awaitOnError(), sameInstance(DELIBERATE_EXCEPTION)); } + + @Test + void sourceCancel() { + Publisher p = first.concat(second); + toSource(p).subscribe(subscriber); + assertThat(subscriber.pollTerminal(10, MILLISECONDS), is(nullValue())); + subscriber.awaitSubscription().cancel(); + + TestSubscription firstSubscription = new TestSubscription(); + first.onSubscribe(firstSubscription); + assertThat("Source subscription not cancelled.", firstSubscription.isCancelled(), is(true)); + assertThat("Next source subscribed on cancellation.", second.isSubscribed(), is(false)); + first.onComplete(); + + TestSubscription secondSubscription = new TestSubscription(); + second.onSubscribe(secondSubscription); + assertThat("Next source not subscribed.", second.isSubscribed(), is(true)); + assertThat("Next cancellable not cancelled.", secondSubscription.isCancelled(), is(true)); + } } diff --git a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/completable/CompletableConcatWithCompletableTest.java b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/completable/CompletableConcatWithCompletableTest.java index 8ed046be8f..223c25c500 100644 --- a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/completable/CompletableConcatWithCompletableTest.java +++ b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/completable/CompletableConcatWithCompletableTest.java @@ -75,10 +75,16 @@ void testCancelSource() { toSource(source.concat(next)).subscribe(subscriber); assertThat(subscriber.pollTerminal(10, MILLISECONDS), is(nullValue())); subscriber.awaitSubscription().cancel(); - TestCancellable cancellable = new TestCancellable(); - source.onSubscribe(cancellable); - assertTrue(cancellable.isCancelled()); - assertFalse(next.isSubscribed()); + TestCancellable sourceCancellable = new TestCancellable(); + source.onSubscribe(sourceCancellable); + assertTrue(sourceCancellable.isCancelled(), "Original completable not cancelled."); + assertFalse(next.isSubscribed(), "Next source subscribed unexpectedly."); + + source.onComplete(); + TestCancellable nextCancellable = new TestCancellable(); + next.onSubscribe(nextCancellable); + assertTrue(next.isSubscribed(), "Next source not subscribed."); + assertTrue(nextCancellable.isCancelled(), "Next source not cancelled."); } @Test diff --git a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/completable/CompletableConcatWithPublisherTest.java b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/completable/CompletableConcatWithPublisherTest.java index de83733231..2e82112812 100644 --- a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/completable/CompletableConcatWithPublisherTest.java +++ b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/completable/CompletableConcatWithPublisherTest.java @@ -146,6 +146,7 @@ void cancelSource() { assertTrue(cancellable.isCancelled(), "Original completable not cancelled."); assertFalse(next.isSubscribed(), "Next source subscribed unexpectedly."); triggerNextSubscribe(); + assertTrue(next.isSubscribed(), "Next source not subscribed."); assertTrue(subscription.isCancelled(), "Next source not cancelled."); } diff --git a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/completable/CompletableConcatWithSingleTest.java b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/completable/CompletableConcatWithSingleTest.java index 15ef15908e..4d9ff6ae04 100644 --- a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/completable/CompletableConcatWithSingleTest.java +++ b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/completable/CompletableConcatWithSingleTest.java @@ -74,10 +74,16 @@ void testSourceError() { void testCancelSource() { assertThat(subscriber.pollTerminal(10, MILLISECONDS), is(nullValue())); subscriber.awaitSubscription().cancel(); - TestCancellable cancellable = new TestCancellable(); - source.onSubscribe(cancellable); - assertTrue(cancellable.isCancelled()); - assertFalse(next.isSubscribed()); + TestCancellable sourceCancellable = new TestCancellable(); + source.onSubscribe(sourceCancellable); + assertTrue(sourceCancellable.isCancelled(), "Original completable not cancelled."); + assertFalse(next.isSubscribed(), "Next source subscribed unexpectedly."); + + source.onComplete(); + TestCancellable nextCancellable = new TestCancellable(); + next.onSubscribe(nextCancellable); + assertTrue(next.isSubscribed(), "Next source not subscribed."); + assertTrue(nextCancellable.isCancelled(), "Next source not cancelled."); } @Test diff --git a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/publisher/PublisherConcatWithCompletableTest.java b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/publisher/PublisherConcatWithCompletableTest.java index c945f742b9..ef5db65ce0 100644 --- a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/publisher/PublisherConcatWithCompletableTest.java +++ b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/publisher/PublisherConcatWithCompletableTest.java @@ -80,6 +80,7 @@ void nextError() { @Test void sourceCancel() { + assertThat(subscriber.pollTerminal(10, MILLISECONDS), is(nullValue())); subscriber.awaitSubscription().cancel(); assertThat("Source subscription not cancelled.", subscription.isCancelled(), is(true)); assertThat("Next source subscribed on cancellation.", completable.isSubscribed(), is(false)); diff --git a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/publisher/PublisherConcatWithSingleTest.java b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/publisher/PublisherConcatWithSingleTest.java index 845f326ace..cd5d0622d7 100644 --- a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/publisher/PublisherConcatWithSingleTest.java +++ b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/publisher/PublisherConcatWithSingleTest.java @@ -127,6 +127,7 @@ void nextError() { @Test void sourceCancel() { + assertThat(subscriber.pollTerminal(10, MILLISECONDS), is(nullValue())); subscriber.awaitSubscription().cancel(); assertThat("Source subscription not cancelled.", subscription.isCancelled(), is(true)); assertThat("Next source subscribed on cancellation.", single.isSubscribed(), is(false)); diff --git a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/single/SingleConcatWithCompletableTest.java b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/single/SingleConcatWithCompletableTest.java index 0c18226011..bd6a112af6 100644 --- a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/single/SingleConcatWithCompletableTest.java +++ b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/single/SingleConcatWithCompletableTest.java @@ -74,10 +74,16 @@ void testSourceError() { void testCancelSource() { assertThat(subscriber.pollTerminal(10, MILLISECONDS), is(nullValue())); subscriber.awaitSubscription().cancel(); - TestCancellable cancellable = new TestCancellable(); - source.onSubscribe(cancellable); - assertTrue(cancellable.isCancelled()); - assertFalse(next.isSubscribed()); + TestCancellable sourceCancellable = new TestCancellable(); + source.onSubscribe(sourceCancellable); + assertTrue(sourceCancellable.isCancelled(), "Original completable not cancelled."); + assertFalse(next.isSubscribed(), "Next source subscribed unexpectedly."); + + source.onSuccess(1); + TestCancellable nextCancellable = new TestCancellable(); + next.onSubscribe(nextCancellable); + assertTrue(next.isSubscribed(), "Next source not subscribed."); + assertTrue(nextCancellable.isCancelled(), "Next source not cancelled."); } @Test diff --git a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/single/SingleConcatWithPublisherTest.java b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/single/SingleConcatWithPublisherTest.java index cd5bab845b..450a223b3c 100644 --- a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/single/SingleConcatWithPublisherTest.java +++ b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/single/SingleConcatWithPublisherTest.java @@ -86,7 +86,7 @@ private static Collection onNextErrorPropagatedParams() { return args; } - @ParameterizedTest(name = "deferSubscribe={0} requestN={1} singleCompletesFirst={2}") + @ParameterizedTest(name = "{displayName} [{index}] deferSubscribe={0} requestN={1} singleCompletesFirst={2}") @MethodSource("onNextErrorPropagatedParams") void onNextErrorPropagated(boolean deferSubscribe, long n, boolean singleCompletesFirst) { toSource(source.concat(next, deferSubscribe).map(x -> { @@ -105,7 +105,7 @@ void onNextErrorPropagated(boolean deferSubscribe, long n, boolean singleComplet assertThat(next.isSubscribed(), is(false)); } - @ParameterizedTest(name = "deferSubscribe={0}") + @ParameterizedTest(name = "{displayName} [{index}] deferSubscribe={0}") @ValueSource(booleans = {false, true}) void bothCompletion(boolean deferSubscribe) { setUp(deferSubscribe); @@ -119,7 +119,7 @@ void bothCompletion(boolean deferSubscribe) { subscriber.awaitOnComplete(); } - @ParameterizedTest(name = "deferSubscribe={0}") + @ParameterizedTest(name = "{displayName} [{index}] deferSubscribe={0}") @ValueSource(booleans = {false, true}) void sourceCompletionNextError(boolean deferSubscribe) { setUp(deferSubscribe); @@ -130,7 +130,7 @@ void sourceCompletionNextError(boolean deferSubscribe) { assertThat(subscriber.awaitOnError(), sameInstance(DELIBERATE_EXCEPTION)); } - @ParameterizedTest(name = "deferSubscribe={0} invalidRequestN={1}") + @ParameterizedTest(name = "{displayName} [{index}] deferSubscribe={0} invalidRequestN={1}") @MethodSource("invalidRequestN") void invalidRequestNBeforeNextSubscribe(boolean deferSubscribe, long invalidRequestN) { setUp(deferSubscribe); @@ -139,7 +139,7 @@ void invalidRequestNBeforeNextSubscribe(boolean deferSubscribe, long invalidRequ assertThat(subscriber.awaitOnError(), instanceOf(IllegalArgumentException.class)); } - @ParameterizedTest(name = "deferSubscribe={0}") + @ParameterizedTest(name = "{displayName} [{index}] deferSubscribe={0}") @ValueSource(booleans = {false, true}) void invalidRequestNWithInlineSourceCompletion(boolean deferSubscribe) { toSource(succeeded(1).concat(empty(), deferSubscribe)).subscribe(subscriber); @@ -147,7 +147,7 @@ void invalidRequestNWithInlineSourceCompletion(boolean deferSubscribe) { assertThat(subscriber.awaitOnError(), instanceOf(IllegalArgumentException.class)); } - @ParameterizedTest(name = "deferSubscribe={0}") + @ParameterizedTest(name = "{displayName} [{index}] deferSubscribe={0}") @ValueSource(booleans = {false, true}) void invalidRequestAfterNextSubscribe(boolean deferSubscribe) { setUp(deferSubscribe); @@ -156,7 +156,7 @@ void invalidRequestAfterNextSubscribe(boolean deferSubscribe) { assertThat("Invalid request-n not propagated.", subscription.requested(), is(lessThan(0L))); } - @ParameterizedTest(name = "deferSubscribe={0}") + @ParameterizedTest(name = "{displayName} [{index}] deferSubscribe={0}") @ValueSource(booleans = {false, true}) void multipleInvalidRequest(boolean deferSubscribe) { setUp(deferSubscribe); @@ -165,7 +165,7 @@ void multipleInvalidRequest(boolean deferSubscribe) { assertThat(subscriber.awaitOnError(), instanceOf(IllegalArgumentException.class)); } - @ParameterizedTest(name = "deferSubscribe={0} invalidRequestN={1}") + @ParameterizedTest(name = "{displayName} [{index}] deferSubscribe={0} invalidRequestN={1}") @MethodSource("invalidRequestN") void invalidThenValidRequest(boolean deferSubscribe, long invalidRequestN) { setUp(deferSubscribe); @@ -175,7 +175,7 @@ void invalidThenValidRequest(boolean deferSubscribe, long invalidRequestN) { assertThat(cancellable.isCancelled(), is(true)); } - @ParameterizedTest(name = "deferSubscribe={0}") + @ParameterizedTest(name = "{displayName} [{index}] deferSubscribe={0}") @ValueSource(booleans = {false, true}) void request0PropagatedAfterSuccess(boolean deferSubscribe) { setUp(deferSubscribe); @@ -189,7 +189,7 @@ void request0PropagatedAfterSuccess(boolean deferSubscribe) { is(true)); } - @ParameterizedTest(name = "deferSubscribe={0}") + @ParameterizedTest(name = "{displayName} [{index}] deferSubscribe={0}") @ValueSource(booleans = {false, true}) void sourceError(boolean deferSubscribe) { setUp(deferSubscribe); @@ -198,7 +198,7 @@ void sourceError(boolean deferSubscribe) { assertThat("Next source subscribed unexpectedly.", next.isSubscribed(), is(false)); } - @ParameterizedTest(name = "deferSubscribe={0}") + @ParameterizedTest(name = "{displayName} [{index}] deferSubscribe={0}") @ValueSource(booleans = {false, true}) void cancelSource(boolean deferSubscribe) { setUp(deferSubscribe); @@ -206,9 +206,14 @@ void cancelSource(boolean deferSubscribe) { subscriber.awaitSubscription().cancel(); assertThat("Original single not cancelled.", cancellable.isCancelled(), is(true)); assertThat("Next source subscribed unexpectedly.", next.isSubscribed(), is(false)); + if (!deferSubscribe) { + triggerNextSubscribe(false); + assertThat("Next source not subscribed.", next.isSubscribed(), is(true)); + assertThat("Next source not cancelled.", cancellable.isCancelled(), is(true)); + } } - @ParameterizedTest(name = "deferSubscribe={0}") + @ParameterizedTest(name = "{displayName} [{index}] deferSubscribe={0}") @ValueSource(booleans = {false, true}) void cancelSourcePostRequest(boolean deferSubscribe) { setUp(deferSubscribe); @@ -219,7 +224,7 @@ void cancelSourcePostRequest(boolean deferSubscribe) { assertThat("Next source subscribed unexpectedly.", next.isSubscribed(), is(false)); } - @ParameterizedTest(name = "deferSubscribe={0}") + @ParameterizedTest(name = "{displayName} [{index}] deferSubscribe={0}") @ValueSource(booleans = {false, true}) void cancelNext(boolean deferSubscribe) { setUp(deferSubscribe); @@ -230,7 +235,7 @@ void cancelNext(boolean deferSubscribe) { assertThat("Next source not cancelled.", subscription.isCancelled(), is(true)); } - @ParameterizedTest(name = "deferSubscribe={0}") + @ParameterizedTest(name = "{displayName} [{index}] deferSubscribe={0}") @ValueSource(booleans = {false, true}) void zeroIsNotRequestedOnTransitionToSubscription(boolean deferSubscribe) { setUp(deferSubscribe); @@ -249,7 +254,7 @@ void zeroIsNotRequestedOnTransitionToSubscription(boolean deferSubscribe) { } } - @ParameterizedTest(name = "deferSubscribe={0}") + @ParameterizedTest(name = "{displayName} [{index}] deferSubscribe={0}") @ValueSource(booleans = {false, true}) void publisherSubscribeBlockDemandMakesProgress(boolean deferSubscribe) { source = new TestSingle<>(); @@ -276,7 +281,7 @@ void publisherSubscribeBlockDemandMakesProgress(boolean deferSubscribe) { subscriber.awaitOnComplete(); } - @ParameterizedTest(name = "deferSubscribe={0}") + @ParameterizedTest(name = "{displayName} [{index}] deferSubscribe={0}") @ValueSource(booleans = {false, true}) void onErrorAfterInvalidRequestN(boolean deferSubscribe) { setUp(deferSubscribe); @@ -297,7 +302,7 @@ void onErrorAfterInvalidRequestN(boolean deferSubscribe) { subscriber.awaitOnError(), instanceOf(IllegalArgumentException.class)); } - @ParameterizedTest(name = "deferSubscribe={0}") + @ParameterizedTest(name = "{displayName} [{index}] deferSubscribe={0}") @ValueSource(booleans = {false, true}) void singleCompletesWithNull(boolean deferSubscribe) { setUp(deferSubscribe); @@ -310,7 +315,7 @@ void singleCompletesWithNull(boolean deferSubscribe) { subscriber.awaitOnComplete(); } - @ParameterizedTest(name = "deferSubscribe={0}") + @ParameterizedTest(name = "{displayName} [{index}] deferSubscribe={0}") @ValueSource(booleans = {false, true}) void demandAccumulatedBeforeSingleCompletes(boolean deferSubscribe) { setUp(deferSubscribe); @@ -330,7 +335,7 @@ void demandAccumulatedBeforeSingleCompletes(boolean deferSubscribe) { subscriber.awaitOnComplete(); } - @ParameterizedTest(name = "deferSubscribe={0}") + @ParameterizedTest(name = "{displayName} [{index}] deferSubscribe={0}") @ValueSource(booleans = {false, true}) void requestOneThenMore(boolean deferSubscribe) { setUp(deferSubscribe); @@ -348,7 +353,7 @@ void requestOneThenMore(boolean deferSubscribe) { subscriber.awaitOnComplete(); } - @ParameterizedTest(name = "deferSubscribe={0}") + @ParameterizedTest(name = "{displayName} [{index}] deferSubscribe={0}") @ValueSource(booleans = {false, true}) void reentryWithMoreDemand(boolean deferSubscribe) { List emitted = new ArrayList<>(); @@ -385,7 +390,7 @@ public void onComplete() { assertThat(completed[0], is(true)); } - @ParameterizedTest(name = "deferSubscribe={0}") + @ParameterizedTest(name = "{displayName} [{index}] deferSubscribe={0}") @ValueSource(booleans = {false, true}) void cancelledDuringFirstOnNext(boolean deferSubscribe) { List emitted = new ArrayList<>(); From 4a5a7e08a2deaa15bcd23f2160b25aa4182e5a12 Mon Sep 17 00:00:00 2001 From: Idel Pivnitskiy Date: Mon, 3 Oct 2022 14:54:43 -0700 Subject: [PATCH 2/3] Improve ParameterizedTest names --- .../single/SingleConcatWithPublisherTest.java | 45 +++++++++---------- 1 file changed, 22 insertions(+), 23 deletions(-) diff --git a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/single/SingleConcatWithPublisherTest.java b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/single/SingleConcatWithPublisherTest.java index c584f16718..fd7cbb1480 100644 --- a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/single/SingleConcatWithPublisherTest.java +++ b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/single/SingleConcatWithPublisherTest.java @@ -112,10 +112,9 @@ private static Collection onNextErrorPropagatedParams() { return args; } - @ParameterizedTest(name = "mode={0} requestN={2} singleCompletesFirst={3}") + @ParameterizedTest(name = "{displayName} [{index}] mode={0} requestN={1} singleCompletesFirst={2}") @MethodSource("onNextErrorPropagatedParams") - void onNextErrorPropagated(ConcatMode mode, long n, boolean singleCompletesFirst) - throws Exception { + void onNextErrorPropagated(ConcatMode mode, long n, boolean singleCompletesFirst) throws Exception { toSource((mode == PROPAGATE_CANCEL ? source.concatPropagateCancel(next) : source.concat(next, mode == DEFER_SUBSCRIBE)) @@ -141,7 +140,7 @@ void onNextErrorPropagated(ConcatMode mode, long n, boolean singleCompletesFirst } } - @ParameterizedTest(name = "mode={0}") + @ParameterizedTest(name = "{displayName} [{index}] mode={0}") @EnumSource(ConcatMode.class) void bothCompletion(ConcatMode mode) { setUp(mode); @@ -155,7 +154,7 @@ void bothCompletion(ConcatMode mode) { subscriber.awaitOnComplete(); } - @ParameterizedTest(name = "mode={0}") + @ParameterizedTest(name = "{displayName} [{index}] mode={0}") @EnumSource(ConcatMode.class) void sourceCompletionNextError(ConcatMode mode) { setUp(mode); @@ -166,7 +165,7 @@ void sourceCompletionNextError(ConcatMode mode) { assertThat(subscriber.awaitOnError(), sameInstance(DELIBERATE_EXCEPTION)); } - @ParameterizedTest(name = "mode={0} invalidRequestN={1}") + @ParameterizedTest(name = "{displayName} [{index}] mode={0} invalidRequestN={1}") @MethodSource("invalidRequestN") void invalidRequestNBeforeNextSubscribe(ConcatMode mode, long invalidRequestN) { setUp(mode); @@ -175,7 +174,7 @@ void invalidRequestNBeforeNextSubscribe(ConcatMode mode, long invalidRequestN) { assertThat(subscriber.awaitOnError(), instanceOf(IllegalArgumentException.class)); } - @ParameterizedTest(name = "mode={0}") + @ParameterizedTest(name = "{displayName} [{index}] mode={0}") @EnumSource(ConcatMode.class) void invalidRequestNWithInlineSourceCompletion(ConcatMode mode) { toSource(mode == PROPAGATE_CANCEL ? @@ -185,7 +184,7 @@ void invalidRequestNWithInlineSourceCompletion(ConcatMode mode) { assertThat(subscriber.awaitOnError(), instanceOf(IllegalArgumentException.class)); } - @ParameterizedTest(name = "mode={0}") + @ParameterizedTest(name = "{displayName} [{index}] mode={0}") @EnumSource(ConcatMode.class) void invalidRequestAfterNextSubscribe(ConcatMode mode) { setUp(mode); @@ -194,7 +193,7 @@ void invalidRequestAfterNextSubscribe(ConcatMode mode) { assertThat("Invalid request-n not propagated.", subscription.requested(), is(lessThan(0L))); } - @ParameterizedTest(name = "mode={0}") + @ParameterizedTest(name = "{displayName} [{index}] mode={0}") @EnumSource(ConcatMode.class) void multipleInvalidRequest(ConcatMode mode) { setUp(mode); @@ -203,7 +202,7 @@ void multipleInvalidRequest(ConcatMode mode) { assertThat(subscriber.awaitOnError(), instanceOf(IllegalArgumentException.class)); } - @ParameterizedTest(name = "mode={0} invalidRequestN={1}") + @ParameterizedTest(name = "{displayName} [{index}] mode={0} invalidRequestN={1}") @MethodSource("invalidRequestN") void invalidThenValidRequest(ConcatMode mode, long invalidRequestN) { setUp(mode); @@ -213,7 +212,7 @@ void invalidThenValidRequest(ConcatMode mode, long invalidRequestN) { assertThat(cancellable.isCancelled(), is(true)); } - @ParameterizedTest(name = "mode={0}") + @ParameterizedTest(name = "{displayName} [{index}] mode={0}") @EnumSource(ConcatMode.class) void request0PropagatedAfterSuccess(ConcatMode mode) { setUp(mode); @@ -227,7 +226,7 @@ void request0PropagatedAfterSuccess(ConcatMode mode) { is(true)); } - @ParameterizedTest(name = "mode={0} error={1}") + @ParameterizedTest(name = "{displayName} [{index}] mode={0} error={1}") @MethodSource("modeAndError") void sourceError(ConcatMode mode, boolean error) throws InterruptedException { setUp(mode); @@ -249,7 +248,7 @@ void sourceError(ConcatMode mode, boolean error) throws InterruptedException { } } - @ParameterizedTest(name = "mode={0} error={1}") + @ParameterizedTest(name = "{displayName} [{index}] mode={0} error={1}") @MethodSource("modeAndError") void cancelSource(ConcatMode mode, boolean error) throws InterruptedException { setUp(mode); @@ -299,7 +298,7 @@ void cancelSource(ConcatMode mode, boolean error) throws InterruptedException { } } - @ParameterizedTest(name = "mode={0}") + @ParameterizedTest(name = "{displayName} [{index}] mode={0}") @EnumSource(ConcatMode.class) void cancelSourcePostRequest(ConcatMode mode) { setUp(mode); @@ -310,7 +309,7 @@ void cancelSourcePostRequest(ConcatMode mode) { assertThat("Next source subscribed unexpectedly.", next.isSubscribed(), is(mode == PROPAGATE_CANCEL)); } - @ParameterizedTest(name = "mode={0} error={1}") + @ParameterizedTest(name = "{displayName} [{index}] mode={0} error={1}") @MethodSource("modeAndError") void cancelNext(ConcatMode mode, boolean error) { setUp(mode); @@ -330,7 +329,7 @@ void cancelNext(ConcatMode mode, boolean error) { } } - @ParameterizedTest(name = "mode={0}") + @ParameterizedTest(name = "{displayName} [{index}] mode={0}") @EnumSource(ConcatMode.class) void zeroIsNotRequestedOnTransitionToSubscription(ConcatMode mode) { setUp(mode); @@ -349,7 +348,7 @@ void zeroIsNotRequestedOnTransitionToSubscription(ConcatMode mode) { } } - @ParameterizedTest(name = "mode={0}") + @ParameterizedTest(name = "{displayName} [{index}] mode={0}") @EnumSource(ConcatMode.class) void publisherSubscribeBlockDemandMakesProgress(ConcatMode mode) { source = new TestSingle<>(); @@ -376,7 +375,7 @@ void publisherSubscribeBlockDemandMakesProgress(ConcatMode mode) { subscriber.awaitOnComplete(); } - @ParameterizedTest(name = "mode={0}") + @ParameterizedTest(name = "{displayName} [{index}] mode={0}") @EnumSource(ConcatMode.class) void onErrorAfterInvalidRequestN(ConcatMode mode) { setUp(mode); @@ -397,7 +396,7 @@ void onErrorAfterInvalidRequestN(ConcatMode mode) { subscriber.awaitOnError(), instanceOf(IllegalArgumentException.class)); } - @ParameterizedTest(name = "mode={0}") + @ParameterizedTest(name = "{displayName} [{index}] mode={0}") @EnumSource(ConcatMode.class) void singleCompletesWithNull(ConcatMode mode) { setUp(mode); @@ -410,7 +409,7 @@ void singleCompletesWithNull(ConcatMode mode) { subscriber.awaitOnComplete(); } - @ParameterizedTest(name = "mode={0}") + @ParameterizedTest(name = "{displayName} [{index}] mode={0}") @EnumSource(ConcatMode.class) void demandAccumulatedBeforeSingleCompletes(ConcatMode mode) { setUp(mode); @@ -430,7 +429,7 @@ void demandAccumulatedBeforeSingleCompletes(ConcatMode mode) { subscriber.awaitOnComplete(); } - @ParameterizedTest(name = "mode={0}") + @ParameterizedTest(name = "{displayName} [{index}] mode={0}") @EnumSource(ConcatMode.class) void requestOneThenMore(ConcatMode mode) { setUp(mode); @@ -448,7 +447,7 @@ void requestOneThenMore(ConcatMode mode) { subscriber.awaitOnComplete(); } - @ParameterizedTest(name = "mode={0}") + @ParameterizedTest(name = "{displayName} [{index}] mode={0}") @EnumSource(ConcatMode.class) void reentryWithMoreDemand(ConcatMode mode) { List emitted = new ArrayList<>(); @@ -487,7 +486,7 @@ public void onComplete() { assertThat(completed[0], is(true)); } - @ParameterizedTest(name = "mode={0}") + @ParameterizedTest(name = "{displayName} [{index}] mode={0}") @EnumSource(ConcatMode.class) void cancelledDuringFirstOnNext(ConcatMode mode) { List emitted = new ArrayList<>(); From 3eed6c7cd38737b5ab8f4bea508dd73f12bfce85 Mon Sep 17 00:00:00 2001 From: Idel Pivnitskiy Date: Mon, 3 Oct 2022 16:59:24 -0700 Subject: [PATCH 3/3] Improve cancelSource test --- .../api/SingleConcatWithPublisher.java | 31 ++++++++++++++----- .../single/SingleConcatWithPublisherTest.java | 29 ++++++++++------- 2 files changed, 40 insertions(+), 20 deletions(-) diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/SingleConcatWithPublisher.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/SingleConcatWithPublisher.java index 3612973563..e65e18da80 100644 --- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/SingleConcatWithPublisher.java +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/SingleConcatWithPublisher.java @@ -56,13 +56,12 @@ private abstract static class AbstractConcatSubscriber extends DelayedCancell */ static final Object INITIAL = new Object(); /** - * If {@link #cancel()} is called, or the terminal signal was already - * delivered. + * If {@link #cancel()} is called after subscrubed to the next Publisher. */ static final Object CANCELLED = new Object(); /** - * Cancelled after {@link #onSuccess(Object)}, the first call to request(n) is invalid, or terminal signal - * received (prevents duplicate terminals). + * If {@link #cancel()} is called before subscribed to the next Publisher, the first call to request(n) is + * invalid, or terminal signal received (prevents duplicate terminals). */ static final Object TERMINAL = new Object(); /** @@ -91,6 +90,8 @@ private abstract static class AbstractConcatSubscriber extends DelayedCancell this.propagateCancel = propagateCancel; } + abstract boolean deferSubscribe(); + @Override public final void onSubscribe(final Cancellable cancellable) { delayedCancellable(cancellable); @@ -143,7 +144,7 @@ private void onErrorPropagateCancel(Throwable t) { @Override public final void onComplete() { - if (propagateCancel) { + if (propagateCancel || !deferSubscribe()) { onCompletePropagateCancel(); } else { target.onComplete(); @@ -208,7 +209,7 @@ final boolean tryEmitSingleSuccessToTarget(@Nullable final T result) { } } - private boolean finallyShouldSubscribeToNext(@Nullable Object oldState) { + private static boolean finallyShouldSubscribeToNext(@Nullable Object oldState) { return oldState != PUBLISHER_SUBSCRIBED; } @@ -243,6 +244,11 @@ private static final class ConcatSubscriber extends AbstractConcatSubscriber< super(target, next, propagateCancel); } + @Override + boolean deferSubscribe() { + return false; + } + @Override public void onSuccess(@Nullable final T result) { for (;;) { @@ -255,8 +261,12 @@ public void onSuccess(@Nullable final T result) { } break; } - } else if (oldValue == CANCELLED || oldValue == TERMINAL || - mayBeResultUpdater.compareAndSet(this, INITIAL, result)) { + } else if (oldValue == TERMINAL) { + // This may happen only if returned Publisher was cancelled before Single terminates, subscribe to + // the next source to propagate cancellation. + next.subscribeInternal(this); + break; + } else if (oldValue == CANCELLED || mayBeResultUpdater.compareAndSet(this, INITIAL, result)) { break; } } @@ -328,6 +338,11 @@ private static final class ConcatDeferNextSubscriber extends AbstractConcatSu super(target, next, propagateCancel); } + @Override + boolean deferSubscribe() { + return true; + } + @Override public void onSuccess(@Nullable final T result) { for (;;) { diff --git a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/single/SingleConcatWithPublisherTest.java b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/single/SingleConcatWithPublisherTest.java index fd7cbb1480..33658e5a65 100644 --- a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/single/SingleConcatWithPublisherTest.java +++ b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/single/SingleConcatWithPublisherTest.java @@ -253,25 +253,20 @@ void sourceError(ConcatMode mode, boolean error) throws InterruptedException { void cancelSource(ConcatMode mode, boolean error) throws InterruptedException { setUp(mode); assertThat(subscriber.pollTerminal(10, MILLISECONDS), is(nullValue())); - Subscription subscription1 = subscriber.awaitSubscription(); - subscription1.request(2); - subscription1.cancel(); + subscriber.awaitSubscription().cancel(); assertThat("Original single not cancelled.", cancellable.isCancelled(), is(true)); - if (error) { - source.onError(DELIBERATE_EXCEPTION); - } else { - source.onSuccess(1); - } - if (mode == PROPAGATE_CANCEL) { - next.awaitSubscribed(); + assertThat("Next source not subscribed.", next.isSubscribed(), is(true)); next.onSubscribe(subscription); - subscription.awaitCancelled(); + assertThat("Next source not cancelled.", subscription.isCancelled(), is(true)); if (error) { + source.onError(DELIBERATE_EXCEPTION); next.onError(new DeliberateException()); } else { + subscriber.awaitSubscription().request(1); + source.onSuccess(1); next.onComplete(); } @@ -281,16 +276,26 @@ void cancelSource(ConcatMode mode, boolean error) throws InterruptedException { } else { assertThat("Next source subscribed unexpectedly.", next.isSubscribed(), is(false)); assertThat("Next source cancelled unexpectedly.", subscription.isCancelled(), is(false)); + assertThat(subscriber.pollTerminal(10, MILLISECONDS), is(nullValue())); if (error) { + source.onError(DELIBERATE_EXCEPTION); + assertThat("Next source subscribed unexpectedly.", next.isSubscribed(), is(false)); assertThat(subscriber.awaitOnError(), sameInstance(DELIBERATE_EXCEPTION)); } else { + subscriber.awaitSubscription().request(mode == DEFER_SUBSCRIBE ? 2 : 1); + source.onSuccess(1); if (mode == CONCAT) { - triggerNextSubscribe(mode); assertThat("Next source not subscribed.", next.isSubscribed(), is(true)); + next.onSubscribe(subscription); assertThat("Next source not cancelled.", subscription.isCancelled(), is(true)); next.onComplete(); + } else { + assertThat("Next source not subscribed.", next.isSubscribed(), is(false)); + assertThat("Next source not cancelled.", subscription.isCancelled(), is(false)); } + // Demand is not expected to propagate after cancel. + assertThat("Unexpected next items.", subscriber.pollAllOnNext(), Matchers.empty()); // It is not required that no terminal is delivered after cancel but verifies the current implementation // for thread safety on the subscriber and to avoid duplicate terminals. assertThat(subscriber.pollTerminal(10, MILLISECONDS), is(nullValue()));