Skip to content

Commit

Permalink
test(web): try to make integration tests less flaky
Browse files Browse the repository at this point in the history
  • Loading branch information
kris7t committed Jul 10, 2024
1 parent 3943d64 commit 83c1598
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@
public class AwaitTerminationExecutorServiceProvider extends ExecutorServiceProvider {
private final List<RestartableCachedThreadPool> servicesToShutDown = new ArrayList<>();

@Override
public ExecutorService get(String key) {
synchronized (servicesToShutDown) {
return super.get(key);
}
}

@Override
protected ExecutorService createInstance(String key) {
var instance = new RestartableCachedThreadPool(() -> super.createInstance(key));
Expand All @@ -35,8 +42,8 @@ public void waitForAllTasksToFinish() {

@Override
public void dispose() {
super.dispose();
synchronized (servicesToShutDown) {
super.dispose();
for (var executorService : servicesToShutDown) {
executorService.waitForTermination();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,29 +13,34 @@
import java.util.Collection;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;

public class RestartableCachedThreadPool implements ExecutorService {
private static final Logger LOG = LoggerFactory.getLogger(RestartableCachedThreadPool.class);

private ExecutorService delegate;
private final AtomicReference<ExecutorService> delegate = new AtomicReference<>();

private final Provider<ExecutorService> executorServiceProvider;

public RestartableCachedThreadPool(Provider<ExecutorService> executorServiceProvider) {
this.executorServiceProvider = executorServiceProvider;
delegate = executorServiceProvider.get();
delegate.set(executorServiceProvider.get());
}

public void waitForAllTasksToFinish() {
delegate.shutdown();
waitForTermination();
delegate = executorServiceProvider.get();
var oldDelegate = delegate.getAndSet(executorServiceProvider.get());
oldDelegate.shutdown();
waitForTermination(oldDelegate);
}

public void waitForTermination() {
waitForTermination(delegate.get());
}

private static void waitForTermination(ExecutorService executorService) {
boolean result = false;
try {
result = delegate.awaitTermination(10, TimeUnit.SECONDS);
result = executorService.awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
LOG.warn("Interrupted while waiting for delegate executor to stop", e);
}
Expand All @@ -46,70 +51,71 @@ public void waitForTermination() {

@Override
public boolean awaitTermination(long arg0, @NotNull TimeUnit arg1) throws InterruptedException {
return delegate.awaitTermination(arg0, arg1);
return delegate.get().awaitTermination(arg0, arg1);
}

@Override
public void execute(@NotNull Runnable arg0) {
delegate.execute(arg0);
delegate.get().execute(arg0);
}

@Override
public <T> List<Future<T>> invokeAll(@NotNull Collection<? extends Callable<T>> arg0, long arg1,
@NotNull TimeUnit arg2)
public <T> @NotNull List<Future<T>> invokeAll(@NotNull Collection<? extends Callable<T>> arg0, long arg1,
@NotNull TimeUnit arg2)
throws InterruptedException {
return delegate.invokeAll(arg0, arg1, arg2);
return delegate.get().invokeAll(arg0, arg1, arg2);
}

@Override
public <T> List<Future<T>> invokeAll(@NotNull Collection<? extends Callable<T>> arg0) throws InterruptedException {
return delegate.invokeAll(arg0);
public <T> @NotNull List<Future<T>> invokeAll(@NotNull Collection<? extends Callable<T>> arg0)
throws InterruptedException {
return delegate.get().invokeAll(arg0);
}

@Override
public <T> T invokeAny(@NotNull Collection<? extends Callable<T>> arg0, long arg1, @NotNull TimeUnit arg2)
throws InterruptedException, ExecutionException, TimeoutException {
return delegate.invokeAny(arg0, arg1, arg2);
return delegate.get().invokeAny(arg0, arg1, arg2);
}

@Override
public <T> T invokeAny(@NotNull Collection<? extends Callable<T>> arg0) throws InterruptedException,
public <T> @NotNull T invokeAny(@NotNull Collection<? extends Callable<T>> arg0) throws InterruptedException,
ExecutionException {
return delegate.invokeAny(arg0);
return delegate.get().invokeAny(arg0);
}

@Override
public boolean isShutdown() {
return delegate.isShutdown();
return delegate.get().isShutdown();
}

@Override
public boolean isTerminated() {
return delegate.isTerminated();
return delegate.get().isTerminated();
}

@Override
public void shutdown() {
delegate.shutdown();
delegate.get().shutdown();
}

@Override
public List<Runnable> shutdownNow() {
return delegate.shutdownNow();
public @NotNull List<Runnable> shutdownNow() {
return delegate.get().shutdownNow();
}

@Override
public <T> Future<T> submit(@NotNull Callable<T> arg0) {
return delegate.submit(arg0);
public <T> @NotNull Future<T> submit(@NotNull Callable<T> arg0) {
return delegate.get().submit(arg0);
}

@Override
public <T> Future<T> submit(@NotNull Runnable arg0, T arg1) {
return delegate.submit(arg0, arg1);
public <T> @NotNull Future<T> submit(@NotNull Runnable arg0, T arg1) {
return delegate.get().submit(arg0, arg1);
}

@Override
public Future<?> submit(@NotNull Runnable arg0) {
return delegate.submit(arg0);
public @NotNull Future<?> submit(@NotNull Runnable arg0) {
return delegate.get().submit(arg0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import static org.junit.jupiter.api.Assertions.fail;

public abstract class WebSocketIntegrationTestClient {
private static final long TIMEOUT_MILLIS = Duration.ofSeconds(30).toMillis();
private static final long TIMEOUT_MILLIS = Duration.ofSeconds(10).toMillis();

private boolean finished = false;

Expand Down

0 comments on commit 83c1598

Please sign in to comment.