Skip to content

Commit

Permalink
[virtual thread] drop some synchronized runtime blocks to avoid pinni…
Browse files Browse the repository at this point in the history
…ng using virtual threads
  • Loading branch information
rmannibucau committed Feb 20, 2024
1 parent 8217163 commit 269f020
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@
import io.yupiik.fusion.framework.api.RuntimeContainer;

import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

public class DefaultInstance<T> implements Instance<T> {
private final FusionBean<T> bean;
private final RuntimeContainer container;
private final T instance;
private final AtomicBoolean closed = new AtomicBoolean();
private final List<io.yupiik.fusion.framework.api.Instance<?>> dependencies;

public DefaultInstance(final FusionBean<T> bean, final RuntimeContainer container,
Expand All @@ -45,7 +47,10 @@ public T instance() {
}

@Override
public synchronized void close() {
public void close() {
if (!closed.compareAndSet(false, true)) {
return;
}
if (bean != null) {
bean.destroy(container, instance);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@

import java.util.Iterator;
import java.util.concurrent.Flow;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class IteratorSubscription<S> implements Flow.Subscription {
private final Flow.Subscriber<? super S> subscriber;
private final Iterator<S> remaining;
private final Lock lock = new ReentrantLock();

private volatile boolean cancelled;

Expand All @@ -30,14 +33,15 @@ public IteratorSubscription(final Flow.Subscriber<? super S> subscriber, final I
}

@Override
public synchronized void request(final long n) {
public void request(final long n) {
if (n <= 0) {
throw new IllegalArgumentException("Invalid request: " + n + ", should be > 0");
}
if (cancelled) {
return;
}

lock.lock();
try {
long loop = n;
while (loop > 0 && remaining.hasNext()) {
Expand All @@ -49,11 +53,13 @@ public synchronized void request(final long n) {
}
} catch (final RuntimeException re) {
subscriber.onError(re);
} finally {
lock.unlock();
}
}

@Override
public synchronized void cancel() {
public void cancel() {
cancelled = true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.util.concurrent.Flow;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand All @@ -34,6 +36,7 @@ public class ServletInputStreamSubscription implements Flow.Subscription, ReadLi
private final ServletInputStream inputStream;
private final ReadableByteChannel channel;
private final Flow.Subscriber<? super ByteBuffer> downstream;
private final Lock lock = new ReentrantLock();

private volatile boolean cancelled = false;
private volatile long requested = 0;
Expand Down Expand Up @@ -73,44 +76,64 @@ private void readIfPossible() {
}

@Override
public synchronized void onDataAvailable() {
readIfPossible();
public void onDataAvailable() {
lock.lock();
try {
readIfPossible();
} finally {
lock.unlock();
}
}

@Override
public synchronized void onAllDataRead() {
public void onAllDataRead() {
if (cancelled) {
return;
}

readIfPossible();
if (!cancelled) {
downstream.onComplete();
doClose();
lock.lock();
try {
readIfPossible();
if (!cancelled) {
downstream.onComplete();
doClose();
}
} finally {
lock.unlock();
}
}

@Override
public synchronized void request(final long n) {
public void request(final long n) {
if (n <= 0) {
throw new IllegalArgumentException("Invalid request: " + n + ", should be > 0");
}
if (cancelled) {
return;
}
requested += n;
readIfPossible();
lock.lock();
try {
requested += n;
readIfPossible();
} finally {
lock.unlock();
}
}

@Override
public synchronized void onError(final Throwable throwable) {
public void onError(final Throwable throwable) {
LOGGER.log(Level.SEVERE, throwable, throwable::getMessage);
if (cancelled) {
return;
}
doClose();
downstream.onError(throwable);
cancelled = true;

lock.lock();
try {
doClose();
downstream.onError(throwable);
cancelled = true;
} finally {
lock.unlock();
}
}

private void doClose() {
Expand All @@ -124,7 +147,7 @@ private void doClose() {
}

@Override
public synchronized void cancel() {
public void cancel() {
cancelled = true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ protected void writeResponse(final HttpServletResponse resp, final Response resp
final var channel = Channels.newChannel(stream);
body.subscribe(new Flow.Subscriber<>() {
private Flow.Subscription subscription;
private boolean closed = false;
private volatile boolean closed = false;

@Override
public void onSubscribe(final Flow.Subscription subscription) {
Expand Down Expand Up @@ -199,16 +199,16 @@ public void onComplete() {
doClose();
}

private synchronized void doClose() {
private void doClose() {
if (closed) {
return;
}
closed = true;
try {
channel.close();
} catch (final IOException e) {
logger.log(SEVERE, e, e::getMessage);
}
closed = true;
}
});
}
Expand Down

0 comments on commit 269f020

Please sign in to comment.