Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
adamw committed Nov 24, 2023
1 parent d6704ef commit 54a36a7
Show file tree
Hide file tree
Showing 5 changed files with 262 additions and 57 deletions.
168 changes: 124 additions & 44 deletions core/src/main/java/jox/Channel.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import java.util.function.BiConsumer;

import static jox.CellState.*;
import static jox.Segment.findAndMoveForward;

public class Channel<T> {
/*
Expand All @@ -23,32 +23,41 @@ the Kotlin implementation (https://github.com/Kotlin/kotlinx.coroutines/blob/mas
allocation when the sender suspends.
* as we don't directly store elements in the buffer, we don't need to clear them on interrupt etc. This is done
automatically when the cell's state is set to something else than a Continuation/Buffered.
Other notes:
* we need the previous pointers in segments to physically remove segments full of cells in the interrupted state.
Segments before such an interrupted segments might still hold awaiting continuations. When physically removing a
segment, we need to update the `next` pointer of the `previous` ("alive") segment. That way the memory usage is
bounded by the number of awaiting threads.
* after a `send`, if we know that r >= s, or after a `receive`, when we know that s >= r, we can set the `previous`
pointer in the segment to `null`, so that the previous segments can be GCd. Even if there are still ongoing
operations on these (previous) segments, and we'll end up wanting to remove such a segment, subsequent channel
operations won't use them, so the relinking won't be useful.
*/

/**
* The total number of `send` operations ever invoked. Each invocation gets a unique cell to process.
*/
private final AtomicLong senders = new AtomicLong(0L);
private final AtomicLong receivers = new AtomicLong(0L);

/**
* The buffer holding the state of each cell. State can be {@link CellState}, {@link Buffered}, or {@link Continuation}.
* Segments holding cell states. State can be {@link CellState}, {@link Buffered}, or {@link Continuation}.
*/
private final AtomicReferenceArray<Object> buffer = new AtomicReferenceArray<>(20_000_000); // TODO

private void setState(Object state, long index) {
buffer.set((int) index, state);
private final AtomicReference<Segment> sendSegment;
private final AtomicReference<Segment> receiveSegment;
private final AtomicReference<Segment> bufferEndSegment;

public Channel() {
var isRendezvous = true; // TODO: add capacity
var firstSegment = new Segment(0, null, isRendezvous ? 2 : 3);
sendSegment = new AtomicReference<>(firstSegment);
receiveSegment = new AtomicReference<>(firstSegment);
bufferEndSegment = new AtomicReference<>(isRendezvous ? Segment.NULL_SEGMENT : firstSegment);
}

// passed to continuation to set the interrupt state
private final BiConsumer<Object, Long> setStateMethod = this::setState;

private Object getState(long index) {
return buffer.get((int) index);
}

private boolean casState(long index, Object expected, Object newValue) {
return buffer.compareAndSet((int) index, expected, newValue);
}
private final TriConsumer<Segment, Integer, Object> setStateMethod = Segment::setCell;

//

Expand All @@ -73,19 +82,54 @@ public Object sendSafe(T value) throws InterruptedException {
throw new NullPointerException();
}
while (true) {
var s = senders.incrementAndGet(); // reserving the next cell
if (updateCellSend(s, value)) return null;
// reading the segment before the counter increment - this is needed to find the required segment later
var segment = sendSegment.get();
// reserving the next cell
var s = senders.incrementAndGet();

// calculating the segment id and the index within the segment
var id = s / Segment.SEGMENT_SIZE;
var i = (int) (s % Segment.SEGMENT_SIZE);

// check if `sendSegment` stores a previous segment, if so move the reference forward
if (segment.getId() != id) {
segment = findAndMoveForward(sendSegment, segment, id);

// if we still have another segment, the cell (as well as all other ones) must have been interrupted
if (segment.getId() != id) {
// skipping all interrupted cells, and trying with a new one
senders.compareAndSet(s, segment.getId() * Segment.SEGMENT_SIZE);
continue;
}
}

var sendResult = updateCellSend(segment, i, s, value);
/*
After `updateCellSend` completes, we can be sure that r >= s:
- if we stored and awaited a continuation, and it was resumed, then a receiver must have appeared
- if we buffered a value, then a receiver is in progress in that cell
- if a continuation was present, then the receiver must have been there
- if the cell was interrupted, that could have been only because of a receiver
- same, if the cell is broken
The only case when r < s is when awaiting on the continuation is interrupted, in which case the exception
propagates outside of this method.
*/
segment.cleanPrev();
if (sendResult) return null;
}
}

/**
* @param s Index of the reserved cell.
* @param value The value to send.
* @param segment The segment in which to store the cell's state.
* @param i The index within the {@code segment}.
* @param s Index of the reserved cell.
* @param value The value to send.
* @return {@code true}, if sending was successful; {@code false}, if it should be restarted.
*/
private boolean updateCellSend(long s, T value) throws InterruptedException {
private boolean updateCellSend(Segment segment, int i, long s, T value) throws InterruptedException {
while (true) {
var state = getState(s); // reading the current state of the cell; we'll try to update it atomically
var state = segment.getCell(i); // reading the current state of the cell; we'll try to update it atomically
var r = receivers.get(); // reading the receiver's counter

switch (state) {
Expand All @@ -94,14 +138,14 @@ private boolean updateCellSend(long s, T value) throws InterruptedException {
// cell is empty, and no receiver -> suspend
// storing the value to send as the continuation's payload, so that the receiver can use it
var c = new Continuation(value);
if (casState(s, null, c)) {
c.await(setStateMethod, s);
if (segment.casCell(i, null, c)) {
c.await(setStateMethod, segment, i);
return true;
}
// else: CAS unsuccessful, repeat
} else {
// cell is empty, but a receiver is in progress -> elimination
if (casState(s, null, new Buffered(value))) {
if (segment.casCell(i, null, new Buffered(value))) {
return true;
}
// else: CAS unsuccessful, repeat
Expand All @@ -110,7 +154,7 @@ private boolean updateCellSend(long s, T value) throws InterruptedException {
case Continuation c -> {
// a receiver is waiting -> trying to resume
if (c.tryResume(value)) {
setState(DONE, s);
segment.setCell(i, DONE);
return true;
} else {
// cell interrupted -> trying with a new one
Expand Down Expand Up @@ -144,8 +188,40 @@ public T receive() throws InterruptedException {
*/
public Object receiveSafe() throws InterruptedException {
while (true) {
var r = receivers.incrementAndGet(); // reserving the next cell
var result = updateCellReceive(r);
// reading the segment before the counter increment - this is needed to find the required segment later
var segment = receiveSegment.get();
// reserving the next cell
var r = receivers.incrementAndGet();

// calculating the segment id and the index within the segment
var id = r / Segment.SEGMENT_SIZE;
var i = (int) (r % Segment.SEGMENT_SIZE);

// check if `sendSegment` stores a previous segment, if so move the reference forward
if (segment.getId() != id) {
segment = findAndMoveForward(receiveSegment, segment, id);

// if we still have another segment, the cell (as well as all other ones) must have been interrupted
if (segment.getId() != id) {
// skipping all interrupted cells, and trying with a new one
senders.compareAndSet(r, segment.getId() * Segment.SEGMENT_SIZE);
continue;
}
}

var result = updateCellReceive(segment, i, r);
/*
After `updateCellReceive` completes, we can be sure that s >= r:
- if we stored and awaited a continuation, and it was resumed, then a sender must have appeared
- if we marked the cell as broken, then a sender is in progress in that cell
- if a continuation was present, then the sender must have been there
- if the cell was interrupted, that could have been only because of a sender
- if a value was buffered, that's because there's a matching sender
The only case when s < r is when awaiting on the continuation is interrupted, in which case the exception
propagates outside of this method.
*/
segment.cleanPrev();
if (result != UpdateCellReceiveResult.RESTART) {
return result;
}
Expand All @@ -156,9 +232,9 @@ public Object receiveSafe() throws InterruptedException {
* @param r Index of the reserved cell.
* @return Either a restart ({@link UpdateCellReceiveResult#RESTART}), or the received value.
*/
private Object updateCellReceive(long r) throws InterruptedException {
private Object updateCellReceive(Segment segment, int i, long r) throws InterruptedException {
while (true) {
var state = getState(r); // reading the current state of the cell; we'll try to update it atomically
var state = segment.getCell(i); // reading the current state of the cell; we'll try to update it atomically
var s = senders.get(); // reading the sender's counter

switch (state) {
Expand All @@ -167,13 +243,13 @@ private Object updateCellReceive(long r) throws InterruptedException {
// cell is empty, and no sender -> suspend
// not using any payload
var c = new Continuation(null);
if (casState(r, null, c)) {
return c.await(setStateMethod, r);
if (segment.casCell(i, null, c)) {
return c.await(setStateMethod, segment, i);
}
// else: CAS unsuccessful, repeat
} else {
// sender in progress, receiver changed state first -> restart
if (casState(r, null, BROKEN)) {
if (segment.casCell(i, null, BROKEN)) {
return UpdateCellReceiveResult.RESTART;
}
// else: CAS unsuccessful, repeat
Expand All @@ -182,7 +258,7 @@ private Object updateCellReceive(long r) throws InterruptedException {
case Continuation c -> {
// a sender is waiting -> trying to resume
if (c.tryResume(0)) {
setState(DONE, r);
segment.setCell(i, DONE);
return c.getPayload();
} else {
// cell interrupted -> trying with a new one
Expand All @@ -201,12 +277,13 @@ private Object updateCellReceive(long r) throws InterruptedException {
}
}
}
}

// possible return values of updateCellReceive: one of the enum constants below, or the received value

enum UpdateCellReceiveResult {
RESTART
/**
* Possible return values of {@link Channel#updateCellReceive(Segment, int, long)}: one of the enum constants below, or the received value.
*/
enum UpdateCellReceiveResult {
RESTART
}
}

// possible states of a cell: one of the enum constants below, Buffered, or Continuation
Expand All @@ -217,12 +294,12 @@ enum CellState {
BROKEN;
}

// a java record called Buffered with a single value field; the type should be T
record Buffered(Object value) {}

final class Continuation {
/**
* The number of busy-looping iterations before yielding, during {@link Continuation#await(Runnable)}. {@code 0}, if there's a single CPU.
* The number of busy-looping iterations before yielding, during {@link Continuation#await(TriConsumer, Segment, int)}.
* {@code 0}, if there's a single CPU.
*/
private static final int SPINS = Runtime.getRuntime().availableProcessors() == 1 ? 0 : 10000;

Expand Down Expand Up @@ -252,10 +329,11 @@ boolean tryResume(Object value) {
* Await for the continuation to be resumed.
*
* @param setStateMethod The method to call which will change the cell's state to interrupted, if interruption happens.
* @param segment The segment in which the cell is located.
* @param cellIndex The index of the cell for which to change the state to interrupted, if interruption happens.
* @return The value with which the continuation was resumed.
*/
Object await(BiConsumer<Object, Long> setStateMethod, long cellIndex) throws InterruptedException {
Object await(TriConsumer<Segment, Integer, Object> setStateMethod, Segment segment, int cellIndex) throws InterruptedException {
var spinIterations = SPINS;
while (data == null) {
if (spinIterations > 0) {
Expand All @@ -267,7 +345,9 @@ Object await(BiConsumer<Object, Long> setStateMethod, long cellIndex) throws Int
if (Thread.interrupted()) {
// potential race with `tryResume`
if (Continuation.DATA.compareAndSet(this, null, ContinuationMarker.INTERRUPTED)) {
setStateMethod.accept(INTERRUPTED, cellIndex);
setStateMethod.accept(segment, cellIndex, INTERRUPTED);
// notifying the segment - if all cells become interrupted, the segment can be removed
segment.cellInterrupted();
throw new InterruptedException();
} else {
// another thread already set the data; setting the interrupt status (so that the next blocking
Expand Down Expand Up @@ -302,4 +382,4 @@ Object getPayload() {
// the marker value is used only to mark in the continuation's `data` that interruption won the race with `tryResume`
enum ContinuationMarker {
INTERRUPTED
}
}
Loading

0 comments on commit 54a36a7

Please sign in to comment.