Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Only use busy-looping for rendezvous channels #50

Merged
merged 5 commits into from
Feb 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 21 additions & 21 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -222,13 +222,13 @@ Benchmark (capacity) (cha

// jox - multi channel

ChainedBenchmark.channelChain 0 10000 N/A avgt 20 156.385 ± 1.428 ns/op
ChainedBenchmark.channelChain 16 10000 N/A avgt 20 15.478 ± 0.109 ns/op
ChainedBenchmark.channelChain 100 10000 N/A avgt 20 7.502 ± 0.141 ns/op
ChainedBenchmark.channelChain 0 10000 N/A avgt 10 171.100 ± 3.122 ns/op
ChainedBenchmark.channelChain 16 10000 N/A avgt 10 12.697 ± 0.340 ns/op
ChainedBenchmark.channelChain 100 10000 N/A avgt 10 6.468 ± 0.565 ns/op

ParallelBenchmark.parallelChannels 0 N/A 10000 avgt 20 155.535 ± 4.153 ns/op
ParallelBenchmark.parallelChannels 16 N/A 10000 avgt 20 23.127 ± 0.188 ns/op
ParallelBenchmark.parallelChannels 100 N/A 10000 avgt 20 9.193 ± 0.111 ns/op
ParallelBenchmark.parallelChannels 0 N/A 10000 avgt 10 146.830 ± 10.941 ns/op
ParallelBenchmark.parallelChannels 16 N/A 10000 avgt 10 14.863 ± 2.556 ns/op
ParallelBenchmark.parallelChannels 100 N/A 10000 avgt 10 8.582 ± 0.523 ns/op

// kotlin - multi channel

Expand All @@ -246,20 +246,20 @@ ParallelKotlinBenchmark.parallelChannels_defaultDispatcher 100

// java built-in - multi queues

ChainedBenchmark.queueChain 0 10000 N/A avgt 20 94.836 ± 14.374 ns/op
ChainedBenchmark.queueChain 16 10000 N/A avgt 20 8.534 ± 0.119 ns/op
ChainedBenchmark.queueChain 100 10000 N/A avgt 20 4.215 ± 0.042 ns/op
ChainedBenchmark.queueChain 0 10000 N/A avgt 10 79.284 ± 5.376 ns/op
ChainedBenchmark.queueChain 16 10000 N/A avgt 10 8.772 ± 0.152 ns/op
ChainedBenchmark.queueChain 100 10000 N/A avgt 10 4.268 ± 0.231 ns/op

ParallelBenchmark.parallelQueues 0 N/A 10000 avgt 20 98.573 ± 12.233 ns/op
ParallelBenchmark.parallelQueues 16 N/A 10000 avgt 20 24.144 ± 0.957 ns/op
ParallelBenchmark.parallelQueues 100 N/A 10000 avgt 20 15.537 ± 0.112 ns/op
ParallelBenchmark.parallelQueues 0 N/A 10000 avgt 10 84.382 ± 20.473 ns/op
ParallelBenchmark.parallelQueues 16 N/A 10000 avgt 10 15.043 ± 2.096 ns/op
ParallelBenchmark.parallelQueues 100 N/A 10000 avgt 10 6.182 ± 0.685 ns/op

// jox - single channel

RendezvousBenchmark.channel N/A N/A N/A avgt 20 173.645 ± 4.181 ns/op
RendezvousBenchmark.channel N/A N/A N/A avgt 10 199.199 ± 11.493 ns/op

BufferedBenchmark.channel 16 N/A N/A avgt 20 178.973 ± 45.096 ns/op
BufferedBenchmark.channel 100 N/A N/A avgt 20 144.355 ± 28.172 ns/op
BufferedBenchmark.channel 16 N/A N/A avgt 10 201.319 ± 18.463 ns/op
BufferedBenchmark.channel 100 N/A N/A avgt 10 102.972 ± 9.247 ns/op

// kotlin - single channel

Expand All @@ -270,8 +270,8 @@ BufferedKotlinBenchmark.channel_defaultDispatcher 100

// jox - selects

SelectBenchmark.selectWithSingleClause N/A N/A N/A avgt 20 190.910 ± 2.997 ns/op
SelectBenchmark.selectWithTwoClauses N/A N/A N/A avgt 20 812.192 ± 36.830 ns/op
SelectBenchmark.selectWithSingleClause N/A N/A N/A avgt 10 229.320 ± 23.705 ns/op
SelectBenchmark.selectWithTwoClauses N/A N/A N/A avgt 10 761.067 ± 30.963 ns/op

// kotlin - selects

Expand All @@ -280,11 +280,11 @@ SelectKotlinBenchmark.selectWithTwoClauses_defaultDispatcher N/A

// java built-in - single queue

BufferedBenchmark.arrayBlockingQueue 16 N/A N/A avgt 20 366.444 ± 67.573 ns/op
BufferedBenchmark.arrayBlockingQueue 100 N/A N/A avgt 20 110.189 ± 3.494 ns/op
BufferedBenchmark.arrayBlockingQueue 16 N/A N/A avgt 10 264.974 ± 61.166 ns/op
BufferedBenchmark.arrayBlockingQueue 100 N/A N/A avgt 10 108.087 ± 4.545 ns/op

RendezvousBenchmark.exchanger N/A N/A N/A avgt 20 90.830 ± 0.610 ns/op
RendezvousBenchmark.synchronousQueue N/A N/A N/A avgt 20 1501.291 ± 253.663 ns/op
RendezvousBenchmark.exchanger N/A N/A N/A avgt 10 93.386 ± 1.421 ns/op
RendezvousBenchmark.synchronousQueue N/A N/A N/A avgt 10 1714.025 ± 671.140 ns/op

// multi queue/channel tests with a larger number of elements

Expand Down
24 changes: 14 additions & 10 deletions core/src/main/java/com/softwaremill/jox/Channel.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ operations on these (previous) segments, and we'll end up wanting to remove such
// immutable state

private final int capacity;
private final boolean isRendezvous;
final boolean isRendezvous;
private final boolean isUnlimited;

// mutable state
Expand Down Expand Up @@ -320,7 +320,7 @@ private Object updateCellSend(Segment segment, int i, long s, T value, SelectIns
// storing the value to send as the continuation's payload, so that the receiver can use it
var c = new Continuation(value);
if (segment.casCell(i, null, c)) {
if (c.await(segment, i) == ChannelClosedMarker.CLOSED) {
if (c.await(segment, i, isRendezvous) == ChannelClosedMarker.CLOSED) {
return SendResult.CLOSED;
} else {
return SendResult.AWAITED;
Expand Down Expand Up @@ -488,7 +488,7 @@ private Object updateCellReceive(Segment segment, int i, long r, SelectInstance
var c = new Continuation(null);
if (segment.casCell(i, state, c)) {
expandBuffer();
var result = c.await(segment, i);
var result = c.await(segment, i, isRendezvous);
if (result == ChannelClosedMarker.CLOSED) {
return ReceiveResult.CLOSED;
} else {
Expand Down Expand Up @@ -1141,16 +1141,20 @@ enum CellState {

final class Continuation {
/**
* The number of busy-looping iterations before yielding, during {@link Continuation#await(Segment, int)}.
* {@code 0}, if there's a single CPU. When there's no more than 4 CPUs, we use {@code 128} iterations: this is
* based on the (limited) testing that we've done with various systems. Otherwise, we use 1024 iterations.
* For rendezvous channels, the number of busy-looping iterations before yielding, during
* {@link Continuation#await(Segment, int, boolean)}. {@code 0}, if there's a single CPU. When there's no more than
* 4 CPUs, we use {@code 128} iterations: this is based on the (limited) testing that we've done with various
* systems. Otherwise, we use 1024 iterations.
* <p>
* For buffered channels, busy-looping is not used, as this negatively affects the performance.
* <p>
* This might need revisiting when more testing & more benchmarks are available.
*/
static final int SPINS;
static final int RENDEZVOUS_SPINS;

static {
var nproc = Runtime.getRuntime().availableProcessors();
SPINS = (nproc == 1) ? 0 : ((nproc <= 4) ? (1 << 7) : (1 << 10));
RENDEZVOUS_SPINS = (nproc == 1) ? 0 : ((nproc <= 4) ? (1 << 7) : (1 << 10));
}

private final Thread creatingThread;
Expand Down Expand Up @@ -1190,8 +1194,8 @@ boolean tryResume(Object value) {
* @param cellIndex The index of the cell for which to change the state to interrupted, if interruption happens.
* @return The value with which the continuation was resumed.
*/
Object await(Segment segment, int cellIndex) throws InterruptedException {
var spinIterations = SPINS;
Object await(Segment segment, int cellIndex, boolean isRendezvous) throws InterruptedException {
var spinIterations = isRendezvous ? RENDEZVOUS_SPINS : 0;
while (data == null) {
if (spinIterations > 0) {
Thread.onSpinWait();
Expand Down
20 changes: 13 additions & 7 deletions core/src/main/java/com/softwaremill/jox/Select.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public static <U> Object selectSafe(SelectClause<U>... clauses) throws Interrupt
@SafeVarargs
private static <U> Object doSelectSafe(SelectClause<U>... clauses) throws InterruptedException {
// check that the clause doesn't refer to a channel that is already used in a different clause
verifyChannelsUnique(clauses);
var allRendezvous = verifyChannelsUnique_getAreAllRendezvous(clauses);

var si = new SelectInstance(clauses.length);
for (int i = 0; i < clauses.length; i++) {
Expand All @@ -114,18 +114,22 @@ private static <U> Object doSelectSafe(SelectClause<U>... clauses) throws Interr
}
}

return si.checkStateAndWait();
return si.checkStateAndWait(allRendezvous);
}

private static void verifyChannelsUnique(SelectClause<?>[] clauses) {
private static boolean verifyChannelsUnique_getAreAllRendezvous(SelectClause<?>[] clauses) {
var allRendezvous = true;
// we expect the number of clauses to be small, so that this n^2 double-loop is faster than allocating a set
for (int i = 0; i < clauses.length; i++) {
var chi = clauses[i].getChannel();
for (int j = i + 1; j < clauses.length; j++) {
if (clauses[i].getChannel() == clauses[j].getChannel()) {
throw new IllegalArgumentException("Channel " + clauses[i].getChannel() + " is used in multiple clauses");
if (chi == clauses[j].getChannel()) {
throw new IllegalArgumentException("Channel " + chi + " is used in multiple clauses");
}
}
allRendezvous = allRendezvous && (chi == null || chi.isRendezvous);
}
return allRendezvous;
}

public static <T> SelectClause<T> defaultClause(T value) {
Expand Down Expand Up @@ -210,18 +214,20 @@ <U> boolean register(SelectClause<U> clause) {
// main loop

/**
* @param allRendezvous If channels for all clauses are rendezvous channels. In such a case, busy-looping is
* initially used, instead of blocking.
* @return Either the value returned by the selected clause (which can include {@link RestartSelectMarker#RESTART}),
* or {@link ChannelClosed}, when any of the channels is closed.
*/
Object checkStateAndWait() throws InterruptedException {
Object checkStateAndWait(boolean allRendezvous) throws InterruptedException {
while (true) {
var currentState = state;
if (currentState == SelectState.REGISTERING) {
// registering done, waiting until a clause is selected - setting the thread to wake up as the state
// we won't leave this case until the state is changed from Thread
var currentThread = Thread.currentThread();
if (STATE.compareAndSet(this, SelectState.REGISTERING, currentThread)) {
var spinIterations = Continuation.SPINS;
var spinIterations = allRendezvous ? Continuation.RENDEZVOUS_SPINS : 0;
while (state == currentThread) {
// same logic as in Continuation
if (spinIterations > 0) {
Expand Down
Loading