From 06490efc81f288ee051d0e938b8772c6d3e8285b Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Fri, 9 Feb 2024 10:30:21 -0500 Subject: [PATCH 1/5] Fix a flaky test caused by race condition --- .../apache/beam/sdk/fn/data/BeamFnDataInboundObserverTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserverTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserverTest.java index bd279cee208a..ee7a2d9aebaa 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserverTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserverTest.java @@ -165,6 +165,7 @@ public void testCloseVisibleToAwaitCompletionCallerAndProducer() throws Exceptio Future future2 = executor.submit( () -> { + Thread.sleep(500); // give the previous future some time to reach the inf while loop observer.close(); return null; }); From 3d72cbfb8d15c081ded221b94b9d8327ac208b1e Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Fri, 9 Feb 2024 13:37:57 -0500 Subject: [PATCH 2/5] Use notify and wait per reviewer request --- .../data/BeamFnDataInboundObserverTest.java | 24 ++++++++++++++----- 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserverTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserverTest.java index ee7a2d9aebaa..c5b4cbe5c6b5 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserverTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserverTest.java @@ -28,6 +28,7 @@ import java.util.Collections; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.beam.model.fnexecution.v1.BeamFnApi; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.StringUtf8Coder; @@ -146,6 +147,7 @@ public void testCloseVisibleToAwaitCompletionCallerAndProducer() throws Exceptio Arrays.asList(DataEndpoint.create(TRANSFORM_ID, CODER, (value) -> {})), Collections.emptyList()); + AtomicBoolean isReady = new AtomicBoolean(false); Future future = executor.submit( () -> { @@ -153,11 +155,17 @@ public void testCloseVisibleToAwaitCompletionCallerAndProducer() throws Exceptio assertThrows( BeamFnDataInboundObserver.CloseException.class, () -> { - while (true) { - // keep trying to send messages since the queue buffers messages and the - // consumer - // may have not yet noticed the bad state. - observer.accept(dataWith("ABC")); + { + synchronized (this) { + isReady.set(true); + notify(); + } + while (true) { + // keep trying to send messages since the queue buffers messages and the + // consumer + // may have not yet noticed the bad state. + observer.accept(dataWith("ABC")); + } } }); return null; @@ -165,7 +173,11 @@ public void testCloseVisibleToAwaitCompletionCallerAndProducer() throws Exceptio Future future2 = executor.submit( () -> { - Thread.sleep(500); // give the previous future some time to reach the inf while loop + synchronized (this) { + while (!isReady.get()) { + wait(); + } + } observer.close(); return null; }); From d437cab032beebdeb372ed3af7fecc08f986da1d Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Fri, 9 Feb 2024 14:01:55 -0500 Subject: [PATCH 3/5] Increase assert threshold to allow more room for extra threads --- .../beam/sdk/util/UnboundedScheduledExecutorServiceTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorServiceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorServiceTest.java index b8efa292bd2f..4b5bb55439a8 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorServiceTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorServiceTest.java @@ -553,7 +553,7 @@ public void testThreadsAreAddedOnlyAsNeededWithContention() throws Exception { LOG.info("Created {} threads to execute at most 100 parallel tasks", largestPool); // Ideally we would never create more than 100, however with contention it is still possible // some extra threads will be created. - assertTrue(largestPool <= 104); + assertTrue(largestPool <= 110); executorService.shutdown(); } } From 9ffd54bd474e4c910922f58fd0e37ac134c9de92 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Fri, 9 Feb 2024 14:38:52 -0500 Subject: [PATCH 4/5] Move synchronized block outside of assertThrows --- .../data/BeamFnDataInboundObserverTest.java | 20 +++++++++---------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserverTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserverTest.java index c5b4cbe5c6b5..affc7eded24f 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserverTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserverTest.java @@ -152,20 +152,18 @@ public void testCloseVisibleToAwaitCompletionCallerAndProducer() throws Exceptio executor.submit( () -> { observer.accept(dataWith("ABC")); + synchronized (this) { + isReady.set(true); + notify(); + } assertThrows( BeamFnDataInboundObserver.CloseException.class, () -> { - { - synchronized (this) { - isReady.set(true); - notify(); - } - while (true) { - // keep trying to send messages since the queue buffers messages and the - // consumer - // may have not yet noticed the bad state. - observer.accept(dataWith("ABC")); - } + while (true) { + // keep trying to send messages since the queue buffers messages and the + // consumer + // may have not yet noticed the bad state. + observer.accept(dataWith("ABC")); } }); return null; From 4a004f5b3f349fd0ddd618ee36a2b64676416af9 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Sat, 10 Feb 2024 21:45:24 -0500 Subject: [PATCH 5/5] Replace this with isReady when defining synchronized block --- .../beam/sdk/fn/data/BeamFnDataInboundObserverTest.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserverTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserverTest.java index affc7eded24f..91556cf2ac19 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserverTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserverTest.java @@ -152,9 +152,9 @@ public void testCloseVisibleToAwaitCompletionCallerAndProducer() throws Exceptio executor.submit( () -> { observer.accept(dataWith("ABC")); - synchronized (this) { + synchronized (isReady) { isReady.set(true); - notify(); + isReady.notify(); } assertThrows( BeamFnDataInboundObserver.CloseException.class, @@ -171,9 +171,9 @@ public void testCloseVisibleToAwaitCompletionCallerAndProducer() throws Exceptio Future future2 = executor.submit( () -> { - synchronized (this) { + synchronized (isReady) { while (!isReady.get()) { - wait(); + isReady.wait(); } } observer.close();