Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make Single.concat(Publisher) behavior consistent on cancel #2383

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,12 @@ private abstract static class AbstractConcatSubscriber<T> extends DelayedCancell
*/
static final Object INITIAL = new Object();
/**
* If {@link #cancel()} is called, the first call to request(n) is invalid, or the terminal signal was already
* delivered.
* If {@link #cancel()} is called after subscrubed to the next Publisher.
*/
static final Object CANCELLED = new Object();
/**
* Cancelled after {@link #onSuccess(Object)} or terminal signal received (prevents duplicate terminals).
* If {@link #cancel()} is called before subscribed to the next Publisher, the first call to request(n) is
* invalid, or terminal signal received (prevents duplicate terminals).
*/
static final Object TERMINAL = new Object();
/**
Expand Down Expand Up @@ -90,6 +90,8 @@ private abstract static class AbstractConcatSubscriber<T> extends DelayedCancell
this.propagateCancel = propagateCancel;
}

abstract boolean deferSubscribe();

@Override
public final void onSubscribe(final Cancellable cancellable) {
delayedCancellable(cancellable);
Expand Down Expand Up @@ -142,7 +144,7 @@ private void onErrorPropagateCancel(Throwable t) {

@Override
public final void onComplete() {
if (propagateCancel) {
if (propagateCancel || !deferSubscribe()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wasn't expecting the changes described in this PR to be made here. for example I was expecting the SingleSubscriber.onSuccess(..) methods such that we subscribe eve when completed if CANCELLED (perhaps conditionally?)

thing to watch out for is if propagateCancel we may subscribe to both sources concurrently, and mayBeResultUpdater is used to ensure we only terminate a single time.

can we make the termination probation (on error / on complete) is consistent.

onCompletePropagateCancel();
} else {
target.onComplete();
Expand Down Expand Up @@ -207,7 +209,7 @@ final boolean tryEmitSingleSuccessToTarget(@Nullable final T result) {
}
}

private boolean finallyShouldSubscribeToNext(@Nullable Object oldState) {
private static boolean finallyShouldSubscribeToNext(@Nullable Object oldState) {
return oldState != PUBLISHER_SUBSCRIBED;
}

Expand Down Expand Up @@ -242,6 +244,11 @@ private static final class ConcatSubscriber<T> extends AbstractConcatSubscriber<
super(target, next, propagateCancel);
}

@Override
boolean deferSubscribe() {
return false;
}

@Override
public void onSuccess(@Nullable final T result) {
for (;;) {
Expand All @@ -254,8 +261,12 @@ public void onSuccess(@Nullable final T result) {
}
break;
}
} else if (oldValue == CANCELLED || oldValue == TERMINAL ||
mayBeResultUpdater.compareAndSet(this, INITIAL, result)) {
} else if (oldValue == TERMINAL) {
// This may happen only if returned Publisher was cancelled before Single terminates, subscribe to
// the next source to propagate cancellation.
next.subscribeInternal(this);
break;
} else if (oldValue == CANCELLED || mayBeResultUpdater.compareAndSet(this, INITIAL, result)) {
break;
}
}
Expand Down Expand Up @@ -327,6 +338,11 @@ private static final class ConcatDeferNextSubscriber<T> extends AbstractConcatSu
super(target, next, propagateCancel);
}

@Override
boolean deferSubscribe() {
return true;
}

@Override
public void onSuccess(@Nullable final T result) {
for (;;) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@

import static io.servicetalk.concurrent.api.SourceAdapters.toSource;
import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;

class ConcatPublisherTest {
Expand Down Expand Up @@ -67,4 +70,23 @@ void testSecondEmitsError() {
assertThat(subscriber.takeOnNext(2), contains("Hello1", "Hello2"));
assertThat(subscriber.awaitOnError(), sameInstance(DELIBERATE_EXCEPTION));
}

@Test
void sourceCancel() {
Publisher<String> p = first.concat(second);
toSource(p).subscribe(subscriber);
assertThat(subscriber.pollTerminal(10, MILLISECONDS), is(nullValue()));
subscriber.awaitSubscription().cancel();

TestSubscription firstSubscription = new TestSubscription();
first.onSubscribe(firstSubscription);
assertThat("Source subscription not cancelled.", firstSubscription.isCancelled(), is(true));
assertThat("Next source subscribed on cancellation.", second.isSubscribed(), is(false));
first.onComplete();

TestSubscription secondSubscription = new TestSubscription();
second.onSubscribe(secondSubscription);
assertThat("Next source not subscribed.", second.isSubscribed(), is(true));
assertThat("Next cancellable not cancelled.", secondSubscription.isCancelled(), is(true));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,16 @@ void testCancelSource() {
toSource(source.concat(next)).subscribe(subscriber);
assertThat(subscriber.pollTerminal(10, MILLISECONDS), is(nullValue()));
subscriber.awaitSubscription().cancel();
TestCancellable cancellable = new TestCancellable();
source.onSubscribe(cancellable);
assertTrue(cancellable.isCancelled());
assertFalse(next.isSubscribed());
TestCancellable sourceCancellable = new TestCancellable();
source.onSubscribe(sourceCancellable);
assertTrue(sourceCancellable.isCancelled(), "Original completable not cancelled.");
assertFalse(next.isSubscribed(), "Next source subscribed unexpectedly.");

source.onComplete();
TestCancellable nextCancellable = new TestCancellable();
next.onSubscribe(nextCancellable);
assertTrue(next.isSubscribed(), "Next source not subscribed.");
assertTrue(nextCancellable.isCancelled(), "Next source not cancelled.");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ void cancelSource() {
assertTrue(cancellable.isCancelled(), "Original completable not cancelled.");
assertFalse(next.isSubscribed(), "Next source subscribed unexpectedly.");
triggerNextSubscribe();
assertTrue(next.isSubscribed(), "Next source not subscribed.");
assertTrue(subscription.isCancelled(), "Next source not cancelled.");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,16 @@ void testSourceError() {
void testCancelSource() {
assertThat(subscriber.pollTerminal(10, MILLISECONDS), is(nullValue()));
subscriber.awaitSubscription().cancel();
TestCancellable cancellable = new TestCancellable();
source.onSubscribe(cancellable);
assertTrue(cancellable.isCancelled());
assertFalse(next.isSubscribed());
TestCancellable sourceCancellable = new TestCancellable();
source.onSubscribe(sourceCancellable);
assertTrue(sourceCancellable.isCancelled(), "Original completable not cancelled.");
assertFalse(next.isSubscribed(), "Next source subscribed unexpectedly.");

source.onComplete();
TestCancellable nextCancellable = new TestCancellable();
next.onSubscribe(nextCancellable);
assertTrue(next.isSubscribed(), "Next source not subscribed.");
assertTrue(nextCancellable.isCancelled(), "Next source not cancelled.");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ void nextError(boolean propagateCancel) {
@CsvSource(value = {"false,false", "false,true", "true,false", "true,true"})
void sourceCancel(boolean propagateCancel, boolean onError) {
setup(propagateCancel);
assertThat(subscriber.pollTerminal(10, MILLISECONDS), is(nullValue()));
subscriber.awaitSubscription().cancel();
assertThat("Source subscription not cancelled.", subscription.isCancelled(), is(true));
assertThat(completable.isSubscribed(), is(propagateCancel));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ void nextError() {

@Test
void sourceCancel() {
assertThat(subscriber.pollTerminal(10, MILLISECONDS), is(nullValue()));
subscriber.awaitSubscription().cancel();
assertThat("Source subscription not cancelled.", subscription.isCancelled(), is(true));
assertThat("Next source subscribed on cancellation.", single.isSubscribed(), is(false));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,16 @@ void testSourceError() {
void testCancelSource() {
assertThat(subscriber.pollTerminal(10, MILLISECONDS), is(nullValue()));
subscriber.awaitSubscription().cancel();
TestCancellable cancellable = new TestCancellable();
source.onSubscribe(cancellable);
assertTrue(cancellable.isCancelled());
assertFalse(next.isSubscribed());
TestCancellable sourceCancellable = new TestCancellable();
source.onSubscribe(sourceCancellable);
assertTrue(sourceCancellable.isCancelled(), "Original completable not cancelled.");
assertFalse(next.isSubscribed(), "Next source subscribed unexpectedly.");

source.onSuccess(1);
TestCancellable nextCancellable = new TestCancellable();
next.onSubscribe(nextCancellable);
assertTrue(next.isSubscribed(), "Next source not subscribed.");
assertTrue(nextCancellable.isCancelled(), "Next source not cancelled.");
}

@Test
Expand Down
Loading