diff --git a/README.md b/README.md index 8aa0587..ac09554 100644 --- a/README.md +++ b/README.md @@ -222,13 +222,13 @@ Benchmark (capacity) (cha // jox - multi channel -ChainedBenchmark.channelChain 0 10000 N/A avgt 20 156.385 ± 1.428 ns/op -ChainedBenchmark.channelChain 16 10000 N/A avgt 20 15.478 ± 0.109 ns/op -ChainedBenchmark.channelChain 100 10000 N/A avgt 20 7.502 ± 0.141 ns/op +ChainedBenchmark.channelChain 0 10000 N/A avgt 10 171.100 ± 3.122 ns/op +ChainedBenchmark.channelChain 16 10000 N/A avgt 10 12.697 ± 0.340 ns/op +ChainedBenchmark.channelChain 100 10000 N/A avgt 10 6.468 ± 0.565 ns/op -ParallelBenchmark.parallelChannels 0 N/A 10000 avgt 20 155.535 ± 4.153 ns/op -ParallelBenchmark.parallelChannels 16 N/A 10000 avgt 20 23.127 ± 0.188 ns/op -ParallelBenchmark.parallelChannels 100 N/A 10000 avgt 20 9.193 ± 0.111 ns/op +ParallelBenchmark.parallelChannels 0 N/A 10000 avgt 10 146.830 ± 10.941 ns/op +ParallelBenchmark.parallelChannels 16 N/A 10000 avgt 10 14.863 ± 2.556 ns/op +ParallelBenchmark.parallelChannels 100 N/A 10000 avgt 10 8.582 ± 0.523 ns/op // kotlin - multi channel @@ -246,20 +246,20 @@ ParallelKotlinBenchmark.parallelChannels_defaultDispatcher 100 // java built-in - multi queues -ChainedBenchmark.queueChain 0 10000 N/A avgt 20 94.836 ± 14.374 ns/op -ChainedBenchmark.queueChain 16 10000 N/A avgt 20 8.534 ± 0.119 ns/op -ChainedBenchmark.queueChain 100 10000 N/A avgt 20 4.215 ± 0.042 ns/op +ChainedBenchmark.queueChain 0 10000 N/A avgt 10 79.284 ± 5.376 ns/op +ChainedBenchmark.queueChain 16 10000 N/A avgt 10 8.772 ± 0.152 ns/op +ChainedBenchmark.queueChain 100 10000 N/A avgt 10 4.268 ± 0.231 ns/op -ParallelBenchmark.parallelQueues 0 N/A 10000 avgt 20 98.573 ± 12.233 ns/op -ParallelBenchmark.parallelQueues 16 N/A 10000 avgt 20 24.144 ± 0.957 ns/op -ParallelBenchmark.parallelQueues 100 N/A 10000 avgt 20 15.537 ± 0.112 ns/op +ParallelBenchmark.parallelQueues 0 N/A 10000 avgt 10 84.382 ± 20.473 ns/op +ParallelBenchmark.parallelQueues 16 N/A 10000 avgt 
10 15.043 ± 2.096 ns/op +ParallelBenchmark.parallelQueues 100 N/A 10000 avgt 10 6.182 ± 0.685 ns/op // jox - single channel -RendezvousBenchmark.channel N/A N/A N/A avgt 20 173.645 ± 4.181 ns/op +RendezvousBenchmark.channel N/A N/A N/A avgt 10 199.199 ± 11.493 ns/op -BufferedBenchmark.channel 16 N/A N/A avgt 20 178.973 ± 45.096 ns/op -BufferedBenchmark.channel 100 N/A N/A avgt 20 144.355 ± 28.172 ns/op +BufferedBenchmark.channel 16 N/A N/A avgt 10 201.319 ± 18.463 ns/op +BufferedBenchmark.channel 100 N/A N/A avgt 10 102.972 ± 9.247 ns/op // kotlin - single channel @@ -270,8 +270,8 @@ BufferedKotlinBenchmark.channel_defaultDispatcher 100 // jox - selects -SelectBenchmark.selectWithSingleClause N/A N/A N/A avgt 20 190.910 ± 2.997 ns/op -SelectBenchmark.selectWithTwoClauses N/A N/A N/A avgt 20 812.192 ± 36.830 ns/op +SelectBenchmark.selectWithSingleClause N/A N/A N/A avgt 10 229.320 ± 23.705 ns/op +SelectBenchmark.selectWithTwoClauses N/A N/A N/A avgt 10 761.067 ± 30.963 ns/op // kotlin - selects @@ -280,11 +280,11 @@ SelectKotlinBenchmark.selectWithTwoClauses_defaultDispatcher N/A // java built-in - single queue -BufferedBenchmark.arrayBlockingQueue 16 N/A N/A avgt 20 366.444 ± 67.573 ns/op -BufferedBenchmark.arrayBlockingQueue 100 N/A N/A avgt 20 110.189 ± 3.494 ns/op +BufferedBenchmark.arrayBlockingQueue 16 N/A N/A avgt 10 264.974 ± 61.166 ns/op +BufferedBenchmark.arrayBlockingQueue 100 N/A N/A avgt 10 108.087 ± 4.545 ns/op -RendezvousBenchmark.exchanger N/A N/A N/A avgt 20 90.830 ± 0.610 ns/op -RendezvousBenchmark.synchronousQueue N/A N/A N/A avgt 20 1501.291 ± 253.663 ns/op +RendezvousBenchmark.exchanger N/A N/A N/A avgt 10 93.386 ± 1.421 ns/op +RendezvousBenchmark.synchronousQueue N/A N/A N/A avgt 10 1714.025 ± 671.140 ns/op // multi queue/channel tests with a larger number of elements diff --git a/core/src/main/java/com/softwaremill/jox/Channel.java b/core/src/main/java/com/softwaremill/jox/Channel.java index 5cf0212..cfb00e9 100644 --- 
a/core/src/main/java/com/softwaremill/jox/Channel.java +++ b/core/src/main/java/com/softwaremill/jox/Channel.java @@ -90,7 +90,7 @@ operations on these (previous) segments, and we'll end up wanting to remove such // immutable state private final int capacity; - private final boolean isRendezvous; + final boolean isRendezvous; private final boolean isUnlimited; // mutable state @@ -320,7 +320,7 @@ private Object updateCellSend(Segment segment, int i, long s, T value, SelectIns // storing the value to send as the continuation's payload, so that the receiver can use it var c = new Continuation(value); if (segment.casCell(i, null, c)) { - if (c.await(segment, i) == ChannelClosedMarker.CLOSED) { + if (c.await(segment, i, isRendezvous) == ChannelClosedMarker.CLOSED) { return SendResult.CLOSED; } else { return SendResult.AWAITED; @@ -488,7 +488,7 @@ private Object updateCellReceive(Segment segment, int i, long r, SelectInstance var c = new Continuation(null); if (segment.casCell(i, state, c)) { expandBuffer(); - var result = c.await(segment, i); + var result = c.await(segment, i, isRendezvous); if (result == ChannelClosedMarker.CLOSED) { return ReceiveResult.CLOSED; } else { @@ -1141,16 +1141,20 @@ enum CellState { final class Continuation { /** - * The number of busy-looping iterations before yielding, during {@link Continuation#await(Segment, int)}. - * {@code 0}, if there's a single CPU. When there's no more than 4 CPUs, we use {@code 128} iterations: this is - * based on the (limited) testing that we've done with various systems. Otherwise, we use 1024 iterations. + * For rendezvous channels, the number of busy-looping iterations before yielding, during + * {@link Continuation#await(Segment, int, boolean)}. {@code 0}, if there's a single CPU. When there's no more than + * 4 CPUs, we use {@code 128} iterations: this is based on the (limited) testing that we've done with various + * systems. Otherwise, we use 1024 iterations. + *

<p> + * For buffered channels, busy-looping is not used, as this negatively affects the performance. + * <p>
* This might need revisiting when more testing & more benchmarks are available. */ - static final int SPINS; + static final int RENDEZVOUS_SPINS; static { var nproc = Runtime.getRuntime().availableProcessors(); - SPINS = (nproc == 1) ? 0 : ((nproc <= 4) ? (1 << 7) : (1 << 10)); + RENDEZVOUS_SPINS = (nproc == 1) ? 0 : ((nproc <= 4) ? (1 << 7) : (1 << 10)); } private final Thread creatingThread; @@ -1190,8 +1194,8 @@ boolean tryResume(Object value) { * @param cellIndex The index of the cell for which to change the state to interrupted, if interruption happens. * @return The value with which the continuation was resumed. */ - Object await(Segment segment, int cellIndex) throws InterruptedException { - var spinIterations = SPINS; + Object await(Segment segment, int cellIndex, boolean isRendezvous) throws InterruptedException { + var spinIterations = isRendezvous ? RENDEZVOUS_SPINS : 0; while (data == null) { if (spinIterations > 0) { Thread.onSpinWait(); diff --git a/core/src/main/java/com/softwaremill/jox/Select.java b/core/src/main/java/com/softwaremill/jox/Select.java index 0e16179..701d19c 100644 --- a/core/src/main/java/com/softwaremill/jox/Select.java +++ b/core/src/main/java/com/softwaremill/jox/Select.java @@ -101,7 +101,7 @@ public static Object selectSafe(SelectClause... clauses) throws Interrupt @SafeVarargs private static Object doSelectSafe(SelectClause... clauses) throws InterruptedException { // check that the clause doesn't refer to a channel that is already used in a different clause - verifyChannelsUnique(clauses); + var allRendezvous = verifyChannelsUnique_getAreAllRendezvous(clauses); var si = new SelectInstance(clauses.length); for (int i = 0; i < clauses.length; i++) { @@ -114,18 +114,22 @@ private static Object doSelectSafe(SelectClause... 
clauses) throws Interr } } - return si.checkStateAndWait(); + return si.checkStateAndWait(allRendezvous); } - private static void verifyChannelsUnique(SelectClause[] clauses) { + private static boolean verifyChannelsUnique_getAreAllRendezvous(SelectClause[] clauses) { + var allRendezvous = true; // we expect the number of clauses to be small, so that this n^2 double-loop is faster than allocating a set for (int i = 0; i < clauses.length; i++) { + var chi = clauses[i].getChannel(); for (int j = i + 1; j < clauses.length; j++) { - if (clauses[i].getChannel() == clauses[j].getChannel()) { - throw new IllegalArgumentException("Channel " + clauses[i].getChannel() + " is used in multiple clauses"); + if (chi == clauses[j].getChannel()) { + throw new IllegalArgumentException("Channel " + chi + " is used in multiple clauses"); } } + allRendezvous = allRendezvous && (chi == null || chi.isRendezvous); } + return allRendezvous; } public static SelectClause defaultClause(T value) { @@ -210,10 +214,12 @@ boolean register(SelectClause clause) { // main loop /** + * @param allRendezvous If channels for all clauses are rendezvous channels. In such a case, busy-looping is + * initially used, instead of blocking. * @return Either the value returned by the selected clause (which can include {@link RestartSelectMarker#RESTART}), * or {@link ChannelClosed}, when any of the channels is closed. */ - Object checkStateAndWait() throws InterruptedException { + Object checkStateAndWait(boolean allRendezvous) throws InterruptedException { while (true) { var currentState = state; if (currentState == SelectState.REGISTERING) { @@ -221,7 +227,7 @@ Object checkStateAndWait() throws InterruptedException { // we won't leave this case until the state is changed from Thread var currentThread = Thread.currentThread(); if (STATE.compareAndSet(this, SelectState.REGISTERING, currentThread)) { - var spinIterations = Continuation.SPINS; + var spinIterations = allRendezvous ? 
Continuation.RENDEZVOUS_SPINS : 0; while (state == currentThread) { // same logic as in Continuation if (spinIterations > 0) {