Skip to content

Commit

Permalink
Implement isClosedForSend / isClosedForReceive (#42)
Browse files Browse the repository at this point in the history
  • Loading branch information
adamw authored Feb 9, 2024
1 parent dbdb5a0 commit 782ddb9
Show file tree
Hide file tree
Showing 8 changed files with 221 additions and 65 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ the sink processes the received values.
use the `sendSafe()` and `receiveSafe()` methods, which return either a `ChannelClosed` value (reason of closure),
or `null` / the received value.

Channels can also be inspected whether they are closed, using the `isClosed()`, `isDone()` and `isError()` methods.
Channels can also be inspected whether they are closed, using the `isClosedForReceive()` and `isClosedForSend()`.

```java
import com.softwaremill.jox.Channel;
Expand Down
101 changes: 89 additions & 12 deletions core/src/main/java/com/softwaremill/jox/Channel.java
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ private Object updateCellReceive(Segment segment, int i, long r, SelectInstance
while (true) {
var state = segment.getCell(i); // reading the current state of the cell; we'll try to update it atomically

if (state == null || state == IN_BUFFER) { // means that state == null || state == IN_BUFFER
if (state == null || state == IN_BUFFER) {
if (r >= getSendersCounter(sendersAndClosedFlag.get())) { // reading the sender's counter
if (select != null) {
// cell is empty, no sender, and we are in a select -> store the select instance
Expand Down Expand Up @@ -763,25 +763,102 @@ private void updateCellClose(Segment segment, int i) {
}

@Override
public boolean isClosed() {
return closedReason.get() != null;
public ChannelClosed closedForSend() {
return isClosed(sendersAndClosedFlag.get()) ? closedReason.get() : null;
}

@Override
public boolean isDone() {
return closedReason.get() instanceof ChannelDone;
}

@Override
public Throwable isError() {
var reason = closedReason.get();
if (reason instanceof ChannelError e) {
return e.cause();
public ChannelClosed closedForReceive() {
if (isClosed(sendersAndClosedFlag.get())) {
var cr = closedReason.get(); // cannot be null
if (cr instanceof ChannelError) {
return cr;
} else {
// channel is done, checking if there's anything left to receive
return hasValuesToReceive() ? null : cr;
}
} else {
return null;
}
}

private boolean hasValuesToReceive() {
while (true) {
// reading the segment before the counter - this is needed to find the required segment later
var segment = receiveSegment.get();
// r is the cell which will be used by the next receive
var r = receivers.get();
var s = getSendersCounter(sendersAndClosedFlag.get());

if (s <= r) {
// for sure, nothing is buffered / no senders are waiting
return false;
}

// calculating the segment id and the index within the segment
var id = r / Segment.SEGMENT_SIZE;
var i = (int) (r % Segment.SEGMENT_SIZE);

// check if `receiveSegment` stores a previous segment, if so move the reference forward
if (segment.getId() != id) {
segment = findAndMoveForward(receiveSegment, segment, id);
if (segment == null) {
// the channel has been closed, r points to a segment which doesn't exist
return false;
}

// if we still have another segment, the segment must have been removed
if (segment.getId() != id) {
// skipping all interrupted cells, and trying with a new one
receivers.compareAndSet(r, segment.getId() * Segment.SEGMENT_SIZE);
continue;
}
}

// we know that s > r
segment.cleanPrev();

if (hasValueToReceive(segment, i, r)) {
return true;
} else {
// nothing to receive, we can (try to, if not already done) bump the counter and try again
receivers.compareAndSet(r, r + 1);
}
}
}

private boolean hasValueToReceive(Segment segment, int i, long r) {
while (true) {
var state = segment.getCell(i); // reading the current state of the cell

if (state == null || state == IN_BUFFER) {
// this can only happen if a sender is in progress (we checked before that s > r)
// waiting what the sender is going to do -> repeat
Thread.onSpinWait();
} else if (state instanceof Continuation c) {
// a receiver might have gotten suspended while hasValuesToReceive() is running - then, no value to receive here & the r counter is updated.
return c.isSender();
} else if (state instanceof StoredSelectClause ss) {
return ss.isSender(); // as above
} else if (state instanceof Buffered) {
return true;
} else if (state == INTERRUPTED_SEND || state == INTERRUPTED_RECEIVE) {
// cell interrupted -> nothing to receive; in case of an interrupted receiver, the counter is already updated
return false;
} else if (state == RESUMING) {
// receive() or expandBuffer() is resuming the sender -> repeat
Thread.onSpinWait();
} else if (state == CLOSED) {
return false;
} else if (state == DONE || state == BROKEN) {
// a concurrent receiver already finished / poisoned the cell
return false;
} else {
throw new IllegalStateException("Unexpected state: " + state);
}
}
}

// **************
// Select clauses
// **************
Expand Down
43 changes: 32 additions & 11 deletions core/src/main/java/com/softwaremill/jox/CloseableChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,23 +70,44 @@ public interface CloseableChannel {
//

/**
* @return {@code true} if the channel is closed using {@link #done()} or {@link #error(Throwable)}.
* When closed, {@link Sink#send(Object)} will throw {@link ChannelClosedException} or return {@link ChannelClosed} (in the safe variant),
* while {@link Source#receive()} might return values, if some are still not received (if the channel is done, not in an error).
* @return {@code true} if no more values can be sent to this channel; {@link Sink#send(Object)} will throw
* {@link ChannelClosedException} or return {@link ChannelClosed} (in the safe variant).
* <p>
* When closed for send, receiving using {@link Source#receive()} might still be possible, if the channel is done,
* and not in an error. This can be verified using {@link #isClosedForReceive()}.
*/
default boolean isClosedForSend() {
return closedForSend() != null;
}

/**
* @return {@code true} if no more values can be received from this channel; {@link Source#receive()} will throw
* {@link ChannelClosedException} or return {@link ChannelClosed} (in the safe variant).
* <p>
* When closed for receive, sending values is also not possible, {@link #isClosedForSend()} will return {@code true}.
*/
boolean isClosed();
default boolean isClosedForReceive() {
return closedForReceive() != null;
}

/**
* @return {@code true} if the channel is closed using {@link #done()}. {@code false} if it's not closed, or closed with an error.
* When done, {@link Sink#send(Object)} will throw {@link ChannelClosedException} or return {@link ChannelClosed} (in the safe variant),
* while {@link Source#receive()} might return values, if some are still not received.
* @return Non-{@code null} if no more values can be sent to this channel; {@link Sink#send(Object)} will throw
* {@link ChannelClosedException} or return {@link ChannelClosed} (in the safe variant).
* <p>
* {@code null} if the channel is not closed, and values can be sent.
* <p>
* When closed for send, receiving using {@link Source#receive()} might still be possible, if the channel is done,
* and not in an error. This can be verified using {@link #isClosedForReceive()}.
*/
boolean isDone();
ChannelClosed closedForSend();

/**
* @return {@code null} if the channel is not closed, or if it's closed with {@link ChannelDone}.
* When the channel is in an error, {@link Sink#send(Object)} and {@link Source#receive()} will always throw
* @return Non-{@code null} if no more values can be received from this channel; {@link Source#receive()} will throw
* {@link ChannelClosedException} or return {@link ChannelClosed} (in the safe variant).
* <p>
* {@code null} if the channel is not closed, and values can be received.
* <p>
* When closed for receive, sending values is also not possible, {@link #isClosedForSend()} will return {@code true}.
*/
Throwable isError();
ChannelClosed closedForReceive();
}
13 changes: 4 additions & 9 deletions core/src/main/java/com/softwaremill/jox/CollectSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,17 +102,12 @@ public Object errorSafe(Throwable reason) {
}

@Override
public boolean isClosed() {
return original.isClosed();
public ChannelClosed closedForSend() {
return original.closedForSend();
}

@Override
public boolean isDone() {
return original.isDone();
}

@Override
public Throwable isError() {
return original.isError();
public ChannelClosed closedForReceive() {
return original.closedForReceive();
}
}
92 changes: 92 additions & 0 deletions core/src/test/java/com/softwaremill/jox/ChannelClosedTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package com.softwaremill.jox;

import org.junit.jupiter.api.Test;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;

import static com.softwaremill.jox.TestUtil.*;
import static org.junit.jupiter.api.Assertions.*;

public class ChannelClosedTest {
@Test
void testClosed_noValues_whenError() {
// given
Channel<Integer> c = new Channel<>();

// when
c.error(new RuntimeException());

// then
assertTrue(c.isClosedForReceive());
assertTrue(c.isClosedForSend());
}

@Test
void testClosed_noValues_whenDone() {
// given
Channel<Integer> c = new Channel<>();

// when
c.done();

// then
assertTrue(c.isClosedForReceive());
assertTrue(c.isClosedForSend());
}

@Test
void testClosed_hasSuspendedValues_whenDone() throws InterruptedException, ExecutionException {
// given
Channel<Integer> c = new Channel<>();

// when
scoped(scope -> {
var f = forkCancelable(scope, () -> {
c.send(1);
});

try {
Thread.sleep(100); // let the send suspend
c.done();

// then
assertFalse(c.isClosedForReceive());
assertTrue(c.isClosedForSend());
} finally {
f.cancel();
}
});
}

@Test
void testClosed_hasBufferedValues_whenDone() throws InterruptedException {
// given
Channel<Integer> c = new Channel<>(5);

// when
c.send(1);
c.send(2);
c.done();

// then
assertFalse(c.isClosedForReceive());
assertTrue(c.isClosedForSend());
}

@Test
void testClosed_hasValues_whenError() throws InterruptedException {
// given
Channel<Integer> c = new Channel<>(5);

// when
c.send(1);
c.send(2);
c.error(new RuntimeException());

// then
assertTrue(c.isClosedForReceive());
assertTrue(c.isClosedForSend());
}
}
25 changes: 0 additions & 25 deletions core/src/test/java/com/softwaremill/jox/ChannelRendezvousTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,31 +109,6 @@ void testSendWaitsForRendezvous() throws ExecutionException, InterruptedExceptio
});
}

@Test
void shouldProperlyReportChannelState() {
// given
Channel<Integer> c1 = new Channel<>();
Channel<Integer> c2 = new Channel<>();
Channel<Integer> c3 = new Channel<>();

// when
c1.done();
c2.error(new RuntimeException());

// then
assertTrue(c1.isDone());
assertFalse(c2.isDone());
assertFalse(c3.isDone());

assertNull(c1.isError());
assertNotNull(c2.isError());
assertNull(c3.isError());

assertTrue(c1.isClosed());
assertTrue(c2.isClosed());
assertFalse(c3.isClosed());
}

@Test
void pendingReceivesShouldGetNotifiedThatChannelIsDone() throws InterruptedException, ExecutionException {
// given
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,6 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

import java.util.concurrent.ExecutionException;

import static com.softwaremill.jox.TestUtil.forkVoid;
import static com.softwaremill.jox.TestUtil.scoped;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;

Expand Down
6 changes: 3 additions & 3 deletions core/src/test/java/com/softwaremill/jox/StressTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@

import static com.softwaremill.jox.Select.selectSafe;
import static com.softwaremill.jox.TestUtil.*;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.*;

public class StressTest {
@TestWithCapacities
Expand Down Expand Up @@ -53,7 +52,7 @@ private void testAndVerify(int capacity, boolean direct) throws Exception {
for (int j = 0; j < numberOfIterations; j++) {
stressTestIteration(data, j);
}
// if this is the first fork, after all the iterations complete, close the channel
// if this is the first fork, after all the iterations complete, close the channels
if (finalI == 0) {
for (var ch : chs) {
ch.done();
Expand Down Expand Up @@ -93,6 +92,7 @@ private void testAndVerify(int capacity, boolean direct) throws Exception {
for (var ch : chs) {
List<String> remainingInChannel = drainChannel(ch);
allReceived.addAll(remainingInChannel);
assertTrue(ch.isClosedForReceive());
}
assertEquals(allSent, allReceived, "each sent message should have been received, or drained");

Expand Down

0 comments on commit 782ddb9

Please sign in to comment.