diff --git a/bench/bench-go/go.mod b/bench/bench-go/go.mod new file mode 100644 index 0000000..c61b4f7 --- /dev/null +++ b/bench/bench-go/go.mod @@ -0,0 +1,3 @@ +module bench + +go 1.22.0 diff --git a/bench/bench-go/parallel.go b/bench/bench-go/parallel.go new file mode 100644 index 0000000..52a7925 --- /dev/null +++ b/bench/bench-go/parallel.go @@ -0,0 +1,9 @@ +package main + +import ( + "fmt" +) + +func main() { + fmt.Println("hello world") +} diff --git a/bench/bench-go/parallel_test.go b/bench/bench-go/parallel_test.go new file mode 100644 index 0000000..8bd7eb1 --- /dev/null +++ b/bench/bench-go/parallel_test.go @@ -0,0 +1,38 @@ +package main + +import ( + "sync" + "testing" +) + +// go test -bench=. -benchtime=1000000000x -count 5 +func BenchmarkParallel(b *testing.B) { + const capacity = 16 + const parallelism = 10000 + + elementsPerChannel := b.N / parallelism + + var wg sync.WaitGroup + + for i := 0; i < parallelism; i++ { + c := make(chan int, capacity) + + wg.Add(1) + go func() { + defer wg.Done() + for j := 0; j < elementsPerChannel; j++ { + c <- 91 + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + for j := 0; j < elementsPerChannel; j++ { + <-c + } + }() + } + + wg.Wait() +} diff --git a/bench/bench-java/src/main/java/com/softwaremill/jox/ParallelBenchmark.java b/bench/bench-java/src/main/java/com/softwaremill/jox/ParallelBenchmark.java index 09e7c04..21ea030 100644 --- a/bench/bench-java/src/main/java/com/softwaremill/jox/ParallelBenchmark.java +++ b/bench/bench-java/src/main/java/com/softwaremill/jox/ParallelBenchmark.java @@ -2,10 +2,7 @@ import org.openjdk.jmh.annotations.*; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; /** * Send-receive test for {@link Channel} and {@link BlockingQueue} - a number of (send, receive) thread pairs, @@ -30,20 +27,15 @@ public class ParallelBenchmark { @OperationsPerInvocation(OPERATIONS_PER_INVOCATION) public void parallelChannels() throws InterruptedException { // we want to measure the amount of time a send-receive pair takes - int elements = OPERATIONS_PER_INVOCATION / parallelism; - Channel[] channels = new Channel[parallelism]; - for (int i = 0; i < parallelism; i++) { - channels[i] = new Channel<>(capacity); - } + int elementsPerChannel = OPERATIONS_PER_INVOCATION / parallelism; - Thread[] threads = new Thread[parallelism * 2]; + var latch = new CountDownLatch(parallelism); - // senders for (int t = 0; t < parallelism; t++) { - int finalT = t; - threads[t] = Thread.startVirtualThread(() -> { - var ch = channels[finalT]; - for (int i = 0; i < elements; i++) { + var ch = new Channel(capacity); + // sender + Thread.startVirtualThread(() -> { + for (int i = 0; i < elementsPerChannel; i++) { try { ch.send(91); } catch (InterruptedException e) { @@ -51,52 +43,42 @@ public void parallelChannels() throws InterruptedException { } } }); - } - // receivers - for (int t = 0; t < parallelism; t++) { - int finalT = t; - threads[t + parallelism] = Thread.startVirtualThread(() -> { - var ch = channels[finalT]; - for (int i = 0; i < elements; i++) { + // receiver + Thread.startVirtualThread(() -> { + for (int i = 0; i < elementsPerChannel; i++) { try { ch.receive(); } catch (InterruptedException e) { throw new RuntimeException(e); } } + latch.countDown(); }); } - for (Thread thread : threads) { - thread.join(); - } + latch.await(); } @Benchmark @OperationsPerInvocation(OPERATIONS_PER_INVOCATION) public void parallelQueues() throws InterruptedException { // we want to measure the amount of time a send-receive pair takes - int elements = OPERATIONS_PER_INVOCATION / parallelism; - BlockingQueue[] queues = new BlockingQueue[parallelism]; - if (capacity == 0) { - for (int i = 0; i < parallelism; i++) { - queues[i] = new SynchronousQueue<>(); - } - } else { - for (int i = 0; i < parallelism; i++) { - queues[i] = new ArrayBlockingQueue<>(capacity); - } - } + int elementsPerChannel = OPERATIONS_PER_INVOCATION / parallelism; - Thread[] threads = new Thread[parallelism * 2]; + var latch = new CountDownLatch(parallelism); - // senders for (int t = 0; t < parallelism; t++) { - int finalT = t; - threads[t] = Thread.startVirtualThread(() -> { - var q = queues[finalT]; - for (int i = 0; i < elements; i++) { + BlockingQueue q; + if (capacity == 0) { + q = new SynchronousQueue<>(); + } else { + q = new ArrayBlockingQueue<>(capacity); + } + + // sender + Thread.startVirtualThread(() -> { + for (int i = 0; i < elementsPerChannel; i++) { try { q.put(91); } catch (InterruptedException e) { @@ -104,25 +86,20 @@ public void parallelQueues() throws InterruptedException { } } }); - } - // receivers - for (int t = 0; t < parallelism; t++) { - int finalT = t; - threads[t + parallelism] = Thread.startVirtualThread(() -> { - var q = queues[finalT]; - for (int i = 0; i < elements; i++) { + // receiver + Thread.startVirtualThread(() -> { + for (int i = 0; i < elementsPerChannel; i++) { try { q.take(); } catch (InterruptedException e) { throw new RuntimeException(e); } } + latch.countDown(); }); } - for (Thread thread : threads) { - thread.join(); - } + latch.await(); } } diff --git a/bench/bench-kotlin/src/com/softwaremill/jox/ParallelKotlinBenchmark.kt b/bench/bench-kotlin/src/com/softwaremill/jox/ParallelKotlinBenchmark.kt index 0df189c..04f2185 100644 --- a/bench/bench-kotlin/src/com/softwaremill/jox/ParallelKotlinBenchmark.kt +++ b/bench/bench-kotlin/src/com/softwaremill/jox/ParallelKotlinBenchmark.kt @@ -26,23 +26,18 @@ open class ParallelKotlinBenchmark { fun parallelChannels_defaultDispatcher() { runBlocking { // we want to measure the amount of time a send-receive pair takes - var elements = OPERATIONS_PER_INVOCATION_PARALLEL / parallelism + val elements = OPERATIONS_PER_INVOCATION_PARALLEL / parallelism - // create an array of channelCount channels - val channels = Array(parallelism) { Channel(capacity) } - - // senders for (t in 0 until parallelism) { + val ch = Channel(capacity) + + // sender launch(Dispatchers.Default) { - val ch = channels[t] for (x in 1..elements) ch.send(91) } - } - // receivers - for (t in 0 until parallelism) { + // receiver launch(Dispatchers.Default) { - val ch = channels[t] for (x in 1..elements) ch.receive() } }