Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix two flaky tests #30278

Merged
merged 5 commits into from
Feb 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Collections;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
Expand Down Expand Up @@ -146,10 +147,15 @@ public void testCloseVisibleToAwaitCompletionCallerAndProducer() throws Exceptio
Arrays.asList(DataEndpoint.create(TRANSFORM_ID, CODER, (value) -> {})),
Collections.emptyList());

AtomicBoolean isReady = new AtomicBoolean(false);
Future<?> future =
executor.submit(
() -> {
observer.accept(dataWith("ABC"));
synchronized (isReady) {
isReady.set(true);
isReady.notify();
}
assertThrows(
BeamFnDataInboundObserver.CloseException.class,
() -> {
Expand All @@ -165,6 +171,11 @@ public void testCloseVisibleToAwaitCompletionCallerAndProducer() throws Exceptio
Future<?> future2 =
executor.submit(
() -> {
synchronized (isReady) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry, actually this cannot be synchronized (either using this or isReady). Because synchronized block will block other thread's same synchronized block. Here it will wait for signal indefinitely and cause deadlock.

Copy link
Contributor Author

@shunping shunping Feb 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using synchronized block on isReady works here. I don't see there is a deadlock.

(1) If it runs the first synchronized block first, then it will call notify() and isReady is set to true. Then when the second synchronized block is executed, it will not reach the wait() call because isReady is true.

(2) If it runs the second synchronized block first, then it will wait() and give up the lock. Then in the first synchronized block, it will pick up the lock, set isReady to true, and then call notify().

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right, I confused myself, LGTM

while (!isReady.get()) {
isReady.wait();
}
}
observer.close();
return null;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,7 @@ public void testThreadsAreAddedOnlyAsNeededWithContention() throws Exception {
LOG.info("Created {} threads to execute at most 100 parallel tasks", largestPool);
// Ideally we would never create more than 100, however with contention it is still possible
// some extra threads will be created.
assertTrue(largestPool <= 104);
assertTrue(largestPool <= 110);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

out of curiosity, you mind explain briefly about the change

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As the comment above the change says, there could be more than 100 threads created in the UnboundedScheduledExecutorService. The previous threshold is set to 104 but it causes some flakiness recently. I attempt to increase to a little bit more to see if it can stop or at least reduce the chance of failure.

executorService.shutdown();
}
}
Loading