From 269f020669bbd60ce0a2698f887ab8c0b8c4d12a Mon Sep 17 00:00:00 2001 From: Romain Manni-Bucau Date: Tue, 20 Feb 2024 09:26:23 +0100 Subject: [PATCH] [virtual thread] drop some synchronized runtime blocks to avoid pinning using virtual threads --- .../api/container/DefaultInstance.java | 7 ++- .../impl/flow/IteratorSubscription.java | 10 +++- .../flow/ServletInputStreamSubscription.java | 55 +++++++++++++------ .../server/impl/servlet/FusionServlet.java | 6 +- 4 files changed, 56 insertions(+), 22 deletions(-) diff --git a/fusion-api/src/main/java/io/yupiik/fusion/framework/api/container/DefaultInstance.java b/fusion-api/src/main/java/io/yupiik/fusion/framework/api/container/DefaultInstance.java index b96f57c4..5ed838c3 100644 --- a/fusion-api/src/main/java/io/yupiik/fusion/framework/api/container/DefaultInstance.java +++ b/fusion-api/src/main/java/io/yupiik/fusion/framework/api/container/DefaultInstance.java @@ -19,11 +19,13 @@ import io.yupiik.fusion.framework.api.RuntimeContainer; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; public class DefaultInstance implements Instance { private final FusionBean bean; private final RuntimeContainer container; private final T instance; + private final AtomicBoolean closed = new AtomicBoolean(); private final List> dependencies; public DefaultInstance(final FusionBean bean, final RuntimeContainer container, @@ -45,7 +47,10 @@ public T instance() { } @Override - public synchronized void close() { + public void close() { + if (!closed.compareAndSet(false, true)) { + return; + } if (bean != null) { bean.destroy(container, instance); } diff --git a/fusion-http-server/src/main/java/io/yupiik/fusion/http/server/impl/flow/IteratorSubscription.java b/fusion-http-server/src/main/java/io/yupiik/fusion/http/server/impl/flow/IteratorSubscription.java index 770fd3eb..598a7113 100644 --- a/fusion-http-server/src/main/java/io/yupiik/fusion/http/server/impl/flow/IteratorSubscription.java +++ b/fusion-http-server/src/main/java/io/yupiik/fusion/http/server/impl/flow/IteratorSubscription.java @@ -17,10 +17,13 @@ import java.util.Iterator; import java.util.concurrent.Flow; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; public class IteratorSubscription implements Flow.Subscription { private final Flow.Subscriber subscriber; private final Iterator remaining; + private final Lock lock = new ReentrantLock(); private volatile boolean cancelled; @@ -30,7 +33,7 @@ public IteratorSubscription(final Flow.Subscriber subscriber, final I } @Override - public synchronized void request(final long n) { + public void request(final long n) { if (n <= 0) { throw new IllegalArgumentException("Invalid request: " + n + ", should be > 0"); } @@ -38,6 +41,7 @@ public synchronized void request(final long n) { return; } + lock.lock(); try { long loop = n; while (loop > 0 && remaining.hasNext()) { @@ -49,11 +53,13 @@ public synchronized void request(final long n) { } } catch (final RuntimeException re) { subscriber.onError(re); + } finally { + lock.unlock(); } } @Override - public synchronized void cancel() { + public void cancel() { cancelled = true; } } diff --git a/fusion-http-server/src/main/java/io/yupiik/fusion/http/server/impl/flow/ServletInputStreamSubscription.java b/fusion-http-server/src/main/java/io/yupiik/fusion/http/server/impl/flow/ServletInputStreamSubscription.java index 5ade1e18..e9990a31 100644 --- a/fusion-http-server/src/main/java/io/yupiik/fusion/http/server/impl/flow/ServletInputStreamSubscription.java +++ b/fusion-http-server/src/main/java/io/yupiik/fusion/http/server/impl/flow/ServletInputStreamSubscription.java @@ -23,6 +23,8 @@ import java.nio.channels.Channels; import java.nio.channels.ReadableByteChannel; import java.util.concurrent.Flow; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Level; import java.util.logging.Logger; @@ -34,6 +36,7 @@ public class ServletInputStreamSubscription implements Flow.Subscription, ReadLi private final ServletInputStream inputStream; private final ReadableByteChannel channel; private final Flow.Subscriber downstream; + private final Lock lock = new ReentrantLock(); private volatile boolean cancelled = false; private volatile long requested = 0; @@ -73,44 +76,64 @@ private void readIfPossible() { } @Override - public synchronized void onDataAvailable() { - readIfPossible(); + public void onDataAvailable() { + lock.lock(); + try { + readIfPossible(); + } finally { + lock.unlock(); + } } @Override - public synchronized void onAllDataRead() { + public void onAllDataRead() { if (cancelled) { return; } - - readIfPossible(); - if (!cancelled) { - downstream.onComplete(); - doClose(); + lock.lock(); + try { + readIfPossible(); + if (!cancelled) { + downstream.onComplete(); + doClose(); + } + } finally { + lock.unlock(); } } @Override - public synchronized void request(final long n) { + public void request(final long n) { if (n <= 0) { throw new IllegalArgumentException("Invalid request: " + n + ", should be > 0"); } if (cancelled) { return; } - requested += n; - readIfPossible(); + lock.lock(); + try { + requested += n; + readIfPossible(); + } finally { + lock.unlock(); + } } @Override - public synchronized void onError(final Throwable throwable) { + public void onError(final Throwable throwable) { LOGGER.log(Level.SEVERE, throwable, throwable::getMessage); if (cancelled) { return; } - doClose(); - downstream.onError(throwable); - cancelled = true; + + lock.lock(); + try { + doClose(); + downstream.onError(throwable); + cancelled = true; + } finally { + lock.unlock(); + } } private void doClose() { @@ -124,7 +147,7 @@ private void doClose() { } @Override - public synchronized void cancel() { + public void cancel() { cancelled = true; } } diff --git a/fusion-http-server/src/main/java/io/yupiik/fusion/http/server/impl/servlet/FusionServlet.java b/fusion-http-server/src/main/java/io/yupiik/fusion/http/server/impl/servlet/FusionServlet.java index ff627d70..40941501 100644 --- a/fusion-http-server/src/main/java/io/yupiik/fusion/http/server/impl/servlet/FusionServlet.java +++ b/fusion-http-server/src/main/java/io/yupiik/fusion/http/server/impl/servlet/FusionServlet.java @@ -164,7 +164,7 @@ protected void writeResponse(final HttpServletResponse resp, final Response resp final var channel = Channels.newChannel(stream); body.subscribe(new Flow.Subscriber<>() { private Flow.Subscription subscription; - private boolean closed = false; + private volatile boolean closed = false; @Override public void onSubscribe(final Flow.Subscription subscription) { @@ -199,16 +199,16 @@ public void onComplete() { doClose(); } - private synchronized void doClose() { + private void doClose() { if (closed) { return; } + closed = true; try { channel.close(); } catch (final IOException e) { logger.log(SEVERE, e, e::getMessage); } - closed = true; } }); }