From 6d10e0f40fd2411b9c4102a7c219535fe7a46066 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Sun, 11 Feb 2024 09:37:40 -0500 Subject: [PATCH] Fix two flaky tests (#30278) * Fix a flaky test caused by race condition * Use notify and wait per reviewer request * Increase assert threshold to allow more room for extra threads * Move synchronized block outside of assertThrows * Replace this with isReady when defining synchronized block --- .../sdk/fn/data/BeamFnDataInboundObserverTest.java | 11 +++++++++++ .../util/UnboundedScheduledExecutorServiceTest.java | 2 +- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserverTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserverTest.java index bd279cee208a..91556cf2ac19 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserverTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserverTest.java @@ -28,6 +28,7 @@ import java.util.Collections; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.beam.model.fnexecution.v1.BeamFnApi; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.StringUtf8Coder; @@ -146,10 +147,15 @@ public void testCloseVisibleToAwaitCompletionCallerAndProducer() throws Exceptio Arrays.asList(DataEndpoint.create(TRANSFORM_ID, CODER, (value) -> {})), Collections.emptyList()); + AtomicBoolean isReady = new AtomicBoolean(false); Future future = executor.submit( () -> { observer.accept(dataWith("ABC")); + synchronized (isReady) { + isReady.set(true); + isReady.notify(); + } assertThrows( BeamFnDataInboundObserver.CloseException.class, () -> { @@ -165,6 +171,11 @@ public void testCloseVisibleToAwaitCompletionCallerAndProducer() throws Exceptio Future future2 = executor.submit( () -> { + synchronized (isReady) { + while (!isReady.get()) { + isReady.wait(); + } + } observer.close(); return null; }); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorServiceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorServiceTest.java index b8efa292bd2f..4b5bb55439a8 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorServiceTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorServiceTest.java @@ -553,7 +553,7 @@ public void testThreadsAreAddedOnlyAsNeededWithContention() throws Exception { LOG.info("Created {} threads to execute at most 100 parallel tasks", largestPool); // Ideally we would never create more than 100, however with contention it is still possible // some extra threads will be created. - assertTrue(largestPool <= 104); + assertTrue(largestPool <= 110); executorService.shutdown(); } }