Skip to content

Commit

Permalink
Remove the skip-when-done in selects (#41)
Browse files Browse the repository at this point in the history
  • Loading branch information
adamw authored Feb 7, 2024
1 parent 8108958 commit 934b312
Show file tree
Hide file tree
Showing 8 changed files with 26 additions and 296 deletions.
9 changes: 0 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -210,15 +210,6 @@ class Demo6 {
}
```

### Selecting from "done" channels

Receive clauses, for which channels are "done", will be skipped, and `select` with restart (as long as there are any
clauses left). This is motivated by the fact that a "done" channel is not in an error state, but signals that there are
no more values; while there might be more values available from other clauses.

Optionally, clauses created with `Channel.receiveOrDoneClause`, will cause `select` to throw/ return when the associated
channel is done, bypassing the behavior described above.

## Performance

The project includes benchmarks implemented using JMH - both for the `Channel`, as well as for some built-in Java
Expand Down
26 changes: 0 additions & 26 deletions core/src/main/java/com/softwaremill/jox/Channel.java
Original file line number Diff line number Diff line change
Expand Up @@ -796,21 +796,6 @@ public SelectClause<T> receiveClause() {

@Override
public <U> SelectClause<U> receiveClause(Function<T, U> callback) {
return receiveClause(callback, true);
}

@Override
public SelectClause<T> receiveOrDoneClause() {
//noinspection unchecked
return receiveOrDoneClause((Function<T, T>) IDENTITY);
}

@Override
public <U> SelectClause<U> receiveOrDoneClause(Function<T, U> callback) {
return receiveClause(callback, false);
}

private <U> SelectClause<U> receiveClause(Function<T, U> callback, boolean skipWhenDone) {
return new SelectClause<>() {
@Override
Channel<?> getChannel() {
Expand All @@ -832,11 +817,6 @@ U transformedRawValue(Object rawValue) {
//noinspection unchecked
return callback.apply((T) rawValue);
}

@Override
boolean skipWhenDone() {
return skipWhenDone;
}
};
}

Expand Down Expand Up @@ -869,12 +849,6 @@ Object register(SelectInstance select) {
U transformedRawValue(Object rawValue) {
return callback.get();
}

@Override
boolean skipWhenDone() {
// sending to a done channel is probably wrong, skipping such channels is not allowed
return false;
}
};
}

Expand Down
26 changes: 0 additions & 26 deletions core/src/main/java/com/softwaremill/jox/CollectSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,32 +79,6 @@ public <U> SelectClause<U> receiveClause(Function<T, U> callback) {
});
}

@Override
public SelectClause<T> receiveOrDoneClause() {
return original.receiveOrDoneClause(v -> {
var t = f.apply(v);
if (t != null) {
return t;
} else {
//noinspection unchecked
return (T) RestartSelectMarker.RESTART;
}
});
}

@Override
public <U> SelectClause<U> receiveOrDoneClause(Function<T, U> callback) {
return original.receiveOrDoneClause(v -> {
var t = f.apply(v);
if (t != null) {
return callback.apply(t);
} else {
//noinspection unchecked
return (U) RestartSelectMarker.RESTART;
}
});
}

// delegates for closeable channel

@Override
Expand Down
69 changes: 8 additions & 61 deletions core/src/main/java/com/softwaremill/jox/Select.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,10 @@ public class Select {
* first.
* <p>
* If no clauses are given, or all clauses become filtered out, throws {@link ChannelDoneException}.
* <p>
* If a receive clause is selected for a channel that is done, select restarts, unless the clause is created with
* {@link Source#receiveOrDoneClause()}.
*
* @param clauses The clauses, from which one will be selected. Not {@code null}.
* @return The value returned by the selected clause.
* @throws ChannelClosedException When any of the channels is closed (done or in error), and the select doesn't restart.
* @throws ChannelClosedException When any of the channels is closed (done or in error).
*/
@SafeVarargs
public static <U> U select(SelectClause<U>... clauses) throws InterruptedException {
Expand All @@ -73,51 +70,27 @@ public static <U> U select(SelectClause<U>... clauses) throws InterruptedExcepti
* first.
* <p>
* If no clauses are given, or all clauses become filtered out, returns {@link ChannelDone}.
* <p>
* If a receive clause is selected for a channel that is done, select restarts, unless the clause is created with
* {@link Source#receiveOrDoneClause()}.
*
* @param clauses The clauses, from which one will be selected. Not {@code null}.
* @return Either the value returned by the selected clause, or {@link ChannelClosed}, when any of the channels
* is closed (done or in error), and the select doesn't restart.
* is closed (done or in error).
*/
@SafeVarargs
public static <U> Object selectSafe(SelectClause<U>... clauses) throws InterruptedException {
var currentClauses = clauses;
while (true) {
if (currentClauses.length == 0) {
if (clauses.length == 0) {
// no clauses given, or all clauses were filtered out
return new ChannelDone();
}

var r = doSelectSafe(currentClauses);
var r = doSelectSafe(clauses);
//noinspection StatementWithEmptyBody
if (r == RestartSelectMarker.RESTART) {
// in case a `CollectSource` function filters out the element (the transformation function returns `null`,
// which is represented as a marker because `null` is a valid result of `doSelectSafe`, e.g. for send clauses),
// we need to restart the selection process

// next loop
} else if (r instanceof RestartSelectWithout rsw) {
// a channel is closed, for which there is a receive clause with skipWhenDone = true
// filtering out the done channel, and restarting the selection process

// as the channel reported that it's done, we know that there are no more pending elements to receive

// there will be one channel less
var newClauses = new SelectClause[currentClauses.length - 1];
var j = 0;
for (int i = 0; i < currentClauses.length; i++) {
SelectClause<U> currentClause = currentClauses[i];
if (currentClause.getChannel() != rsw.ch()) {
newClauses[j] = currentClause;
j += 1;
}
}
//noinspection unchecked
currentClauses = newClauses;

// next loop, with filtered clauses
} else {
return r;
}
Expand Down Expand Up @@ -171,7 +144,6 @@ class SelectInstance {
* - {@link ChannelClosed}
* - a {@link List} of clauses to re-register
* - when selected, {@link SelectClause} (during registration) or {@link StoredSelectClause} (with suspension)
* - {@link SkipBecauseDone}, to skip channels which are done, and are referenced through a receive clause (skipWhenDone=true)
*/
private final AtomicReference<Object> state = new AtomicReference<>(SelectState.REGISTERING);

Expand Down Expand Up @@ -206,9 +178,6 @@ <U> boolean register(SelectClause<U> clause) {
// keeping the stored select to later call cleanup()
storedClauses.add(ss);
return true;
} else if (result instanceof ChannelDone && clause.skipWhenDone()) {
state.set(new SkipBecauseDone(clause.getChannel()));
return false;
} else if (result instanceof ChannelClosed cc) {
// when setting the state, we might override another state:
// - a list of clauses to re-register - there's no point in doing that anyway (since the channel is closed)
Expand All @@ -230,7 +199,7 @@ <U> boolean register(SelectClause<U> clause) {

/**
* @return Either the value returned by the selected clause (which can include {@link RestartSelectMarker#RESTART}),
* {@link RestartSelectWithout}, or {@link ChannelClosed}, when any of the channels is closed.
* or {@link ChannelClosed}, when any of the channels is closed.
*/
Object checkStateAndWait() throws InterruptedException {
while (true) {
Expand Down Expand Up @@ -305,11 +274,6 @@ Object checkStateAndWait() throws InterruptedException {

// running the transformation at the end, after the cleanup is done, in case this throws any exceptions
return selectedClause.transformedRawValue(ss.getPayload());
} else if (currentState instanceof SkipBecauseDone s) {
// a channel which was referenced through a receive clause (skipWhenDone=true) became done - restarting without that clause

cleanup(null);
return new RestartSelectWithout(s.ch());
} else if (currentState instanceof ChannelClosed cc) {
cleanup(null);
return cc;
Expand Down Expand Up @@ -368,9 +332,6 @@ boolean trySelect(StoredSelectClause storedSelectClause) {
} else if (currentState == SelectState.INTERRUPTED) {
// already interrupted, will be cleaned up soon
return false;
} else if (currentState instanceof SkipBecauseDone) {
// closed, will be cleaned up soon (& restarted)
return false;
} else if (currentState instanceof ChannelClosed) {
// closed, will be cleaned up soon
return false;
Expand All @@ -381,24 +342,17 @@ boolean trySelect(StoredSelectClause storedSelectClause) {
}

void channelClosed(StoredSelectClause storedSelectClause, ChannelClosed channelClosed) {
Object targetState;
if (channelClosed instanceof ChannelDone && storedSelectClause.getClause().skipWhenDone()) {
targetState = new SkipBecauseDone(storedSelectClause.getClause().getChannel());
} else {
targetState = channelClosed;
}

while (true) {
var currentState = state.get();
if (currentState == SelectState.REGISTERING) {
// the channel closed state will be discovered when there's a call to `checkStateAndWait` after registration completes
if (state.compareAndSet(currentState, targetState)) {
if (state.compareAndSet(currentState, channelClosed)) {
return;
}
// else: CAS unsuccessful, retry
} else if (currentState instanceof List) {
// same as above
if (state.compareAndSet(currentState, targetState)) {
if (state.compareAndSet(currentState, channelClosed)) {
return;
}
// else: CAS unsuccessful, retry
Expand All @@ -409,17 +363,14 @@ void channelClosed(StoredSelectClause storedSelectClause, ChannelClosed channelC
// already selected
return;
} else if (currentState instanceof Thread t) {
if (state.compareAndSet(currentState, targetState)) {
if (state.compareAndSet(currentState, channelClosed)) {
LockSupport.unpark(t);
return;
}
// else: CAS unsuccessful, retry
} else if (currentState == SelectState.INTERRUPTED) {
// already interrupted
return;
} else if (currentState instanceof SkipBecauseDone) {
// already closed
return;
} else if (currentState instanceof ChannelClosed) {
// already closed
return;
Expand All @@ -435,8 +386,6 @@ enum SelectState {
INTERRUPTED
}

record SkipBecauseDone(Channel<?> ch) {}

//

/**
Expand Down Expand Up @@ -487,5 +436,3 @@ public void setPayload(Object payload) {
enum RestartSelectMarker {
RESTART
}

record RestartSelectWithout(Channel<?> ch) {}
7 changes: 0 additions & 7 deletions core/src/main/java/com/softwaremill/jox/SelectClause.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ public abstract class SelectClause<T> {
* Might throw any exceptions that the provided transformation function throws.
*/
abstract T transformedRawValue(Object rawValue);

abstract boolean skipWhenDone();
}

class DefaultClause<T> extends SelectClause<T> {
Expand All @@ -48,11 +46,6 @@ Object register(SelectInstance select) {
T transformedRawValue(Object rawValue) {
return callback.get();
}

@Override
boolean skipWhenDone() {
return false; // no associated channel, this value is never used
}
}

/**
Expand Down
22 changes: 0 additions & 22 deletions core/src/main/java/com/softwaremill/jox/Source.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,37 +24,15 @@ public interface Source<T> extends CloseableChannel {
/**
* Create a clause which can be used in {@link Select#select(SelectClause[])}. The clause will receive a value from
* the current channel.
* <p>
* If the source is/becomes done, {@link Select#select(SelectClause[])} will restart with channels that are not done yet.
*/
SelectClause<T> receiveClause();

/**
* Create a clause which can be used in {@link Select#select(SelectClause[])}. The clause will receive a value from
* the current channel, and transform it using the provided {@code callback}.
* <p>
* If the source is/becomes done, {@link Select#select(SelectClause[])} will restart with channels that are not done yet.
*/
<U> SelectClause<U> receiveClause(Function<T, U> callback);

/**
* Create a clause which can be used in {@link Select#select(SelectClause[])}. The clause will receive a value from
* the current channel.
* <p>
* If the source is/becomes done, {@link Select#select(SelectClause[])} will stop and throw {@link ChannelDoneException}
* or return a {@link ChannelDone} value (in the {@code safe} variant).
*/
SelectClause<T> receiveOrDoneClause();

/**
* Create a clause which can be used in {@link Select#select(SelectClause[])}. The clause will receive a value from
* the current channel, and transform it using the provided {@code callback}.
* <p>
* If the source is/becomes done, {@link Select#select(SelectClause[])} will stop and throw {@link ChannelDoneException}
* or return a {@link ChannelDone} value (in the {@code safe} variant).
*/
<U> SelectClause<U> receiveOrDoneClause(Function<T, U> callback);

//

/**
Expand Down
Loading

0 comments on commit 934b312

Please sign in to comment.