Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
adamw committed Nov 20, 2023
1 parent 3ca06ae commit 09dfde1
Showing 1 changed file with 92 additions and 92 deletions.
184 changes: 92 additions & 92 deletions core/src/main/java/jox/Channel.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.concurrent.locks.LockSupport;
import java.util.function.BiConsumer;

import static jox.CellState.*;

public class Channel<T> {
/*
Expand Down Expand Up @@ -34,6 +37,9 @@ private void setState(Object state, long index) {
buffer.set((int) index, state);
}

// passed to continuation to set the interrupt state
private final BiConsumer<Object, Long> setStateMethod = this::setState;

private Object getState(long index) {
return buffer.get((int) index);
}
Expand Down Expand Up @@ -87,7 +93,7 @@ private boolean updateCellSend(long s, T value) throws InterruptedException {
// storing the value to send as the continuation's payload, so that the receiver can use it
var c = new Continuation(value);
if (casState(s, null, c)) {
c.await(() -> setState(CellState.INTERRUPTED, s));
c.await(setStateMethod, s);
return true;
}
// else: CAS unsuccessful, repeat
Expand All @@ -102,14 +108,14 @@ private boolean updateCellSend(long s, T value) throws InterruptedException {
case Continuation c -> {
// a receiver is waiting -> trying to resume
if (c.tryResume(value)) {
setState(CellState.DONE, s);
setState(DONE, s);
return true;
} else {
// cell interrupted -> trying with a new one
return false;
}
}
case CellState.INTERRUPTED, CellState.BROKEN -> {
case INTERRUPTED, BROKEN -> {
// cell interrupted or poisoned -> trying with a new one
return false;
}
Expand Down Expand Up @@ -160,11 +166,11 @@ private Object updateCellReceive(long r) throws InterruptedException {
// not using any payload
var c = new Continuation(null);
if (casState(r, null, c)) {
return c.await(() -> setState(CellState.INTERRUPTED, r));
return c.await(setStateMethod, r);
}
// else: CAS unsuccessful, repeat
} else {
if (casState(r, null, CellState.BROKEN)) {
if (casState(r, null, BROKEN)) {
return UpdateCellReceiveResult.RESTART;
}
// else: CAS unsuccessful, repeat
Expand All @@ -173,7 +179,7 @@ private Object updateCellReceive(long r) throws InterruptedException {
case Continuation c -> {
// a sender is waiting -> trying to resume
if (c.tryResume(0)) {
setState(CellState.DONE, r);
setState(DONE, r);
return c.getPayload();
} else {
// cell interrupted -> trying with a new one
Expand All @@ -184,119 +190,113 @@ private Object updateCellReceive(long r) throws InterruptedException {
// an elimination has happened -> finish
return b.value();
}
case CellState.INTERRUPTED -> {
case INTERRUPTED -> {
// cell interrupted -> trying with a new one
return UpdateCellReceiveResult.RESTART;
}
default -> throw new IllegalStateException("Unexpected state: " + state);
}
}
}
}

// possible return values of updateCellReceive: one of the enum constants below, or the received value
// possible return values of updateCellReceive: one of the enum constants below, or the received value

private enum UpdateCellReceiveResult {
RESTART
}
enum UpdateCellReceiveResult {
RESTART
}

// possible states of a cell: one of the enum constants below, Buffered, or Continuation
// possible states of a cell: one of the enum constants below, Buffered, or Continuation

private enum CellState {
DONE,
INTERRUPTED,
BROKEN;
}
enum CellState {
DONE,
INTERRUPTED,
BROKEN;
}

// a java record called Buffered with a single value field; the type should be T
private record Buffered(Object value) {}
// a java record called Buffered with a single value field; the type should be T
record Buffered(Object value) {}

private static final class Continuation {
/**
* The number of busy-looping iterations before yielding, during {@link Continuation#await(Runnable)}. {@code 0}, if there's a single CPU.
*/
private static final int SPINS = Runtime.getRuntime().availableProcessors() == 1 ? 0 : 10000;
final class Continuation {
/**
* The number of busy-looping iterations before yielding, during {@link Continuation#await(Runnable)}. {@code 0}, if there's a single CPU.
*/
private static final int SPINS = Runtime.getRuntime().availableProcessors() == 1 ? 0 : 10000;

private final Thread creatingThread;
private volatile Object data; // set using DATA var handle
private final Thread creatingThread;
private volatile Object data; // set using DATA var handle

private final Object payload;
private final Object payload;

Continuation(Object payload) {
this.payload = payload;
this.creatingThread = Thread.currentThread();
}
Continuation(Object payload) {
this.payload = payload;
this.creatingThread = Thread.currentThread();
}

/**
* Resume the continuation with the given value.
*
* @param value Should not be {@code null}.
* @return {@code true} tf the continuation was resumed successfully. {@code false} if it was interrupted.
*/
boolean tryResume(Object value) {
var result = Continuation.DATA.compareAndSet(this, null, value);
LockSupport.unpark(creatingThread);
return result;
}
/**
* Resume the continuation with the given value.
*
* @param value Should not be {@code null}.
* @return {@code true} tf the continuation was resumed successfully. {@code false} if it was interrupted.
*/
boolean tryResume(Object value) {
var result = Continuation.DATA.compareAndSet(this, null, value);
LockSupport.unpark(creatingThread);
return result;
}

/**
* Await for the continuation to be resumed.
*
* @param onInterrupt
* @return The value with which the continuation was resumed.
*/
Object await(Runnable onInterrupt) throws InterruptedException {
var spinIterations = SPINS;
while (data == null) {
if (spinIterations > 0) {
Thread.onSpinWait();
spinIterations -= 1;
} else {
LockSupport.park();

if (Thread.interrupted()) {
// potential race with `tryResume`
if (Continuation.DATA.compareAndSet(this, null, ContinuationMarker.INTERRUPTED)) {
var e = new InterruptedException();

try {
onInterrupt.run();
} catch (Throwable ee) {
e.addSuppressed(ee);
}

throw e;
} else {
// another thread already set the data; setting the interrupt status (so that the next blocking
// operation throws), and continuing
Thread.currentThread().interrupt();
}
/**
* Await for the continuation to be resumed.
*
* @param setStateMethod The method to call which will change the cell's state to interrupted, if interruption happens.
* @param cellIndex The index of the cell for which to change the state to interrupted, if interruption happens.
* @return The value with which the continuation was resumed.
*/
Object await(BiConsumer<Object, Long> setStateMethod, long cellIndex) throws InterruptedException {
var spinIterations = SPINS;
while (data == null) {
if (spinIterations > 0) {
Thread.onSpinWait();
spinIterations -= 1;
} else {
LockSupport.park();

if (Thread.interrupted()) {
// potential race with `tryResume`
if (Continuation.DATA.compareAndSet(this, null, ContinuationMarker.INTERRUPTED)) {
setStateMethod.accept(INTERRUPTED, cellIndex);
throw new InterruptedException();
} else {
// another thread already set the data; setting the interrupt status (so that the next blocking
// operation throws), and continuing
Thread.currentThread().interrupt();
}
}
}

return data;
}

Object getPayload() {
return payload;
}
return data;
}

Object getPayload() {
return payload;
}

//
//

private static final VarHandle DATA;
private static final VarHandle DATA;

static {
var l = MethodHandles.lookup();
try {
DATA = l.findVarHandle(Continuation.class, "data", Object.class);
} catch (ReflectiveOperationException e) {
throw new ExceptionInInitializerError(e);
}
static {
var l = MethodHandles.lookup();
try {
DATA = l.findVarHandle(Continuation.class, "data", Object.class);
} catch (ReflectiveOperationException e) {
throw new ExceptionInInitializerError(e);
}
}
}

// the marker value is used only to mark in the continuation's `data` that interruption won the race with `tryResume`
private enum ContinuationMarker {
INTERRUPTED
}
// the marker value is used only to mark in the continuation's `data` that interruption won the race with `tryResume`
enum ContinuationMarker {
INTERRUPTED
}

0 comments on commit 09dfde1

Please sign in to comment.