Change FnApiDoFnRunner to skip trySplit checkpoint requests if not draining and nothing has yet been claimed by the tracker. #32044

Merged 4 commits on Aug 14, 2024
@@ -34,6 +34,7 @@
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
@@ -118,6 +119,7 @@
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.util.Durations;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
@@ -327,6 +329,11 @@ static class Factory<InputT, RestrictionT, PositionT, WatermarkEstimatorStateT,
* otherwise.
*/
private RestrictionTracker<RestrictionT, PositionT> currentTracker;
/**
* If non-null, set to true after currentTracker has had a tryClaim issued on it. Used to ignore
Member:

If it can be null, mark @Nullable and document the meaning of a null value. (It looks to me like it is just "in between calls or during calls where it is not used", but it is hard to ascertain the design intent of this class, TBH.)

Contributor Author:

Done. I was being consistent with the existing currentTracker, but I agree it is confusing and we might as well improve it a little.

Member:

Totally makes sense. I might do a pass on this file to narrow the scope of the suppressions. It seems like an important file to leave vulnerable. Thanks!

* checkpoint split requests if no progress was made.
*/
private @Nullable AtomicBoolean currentTrackerClaimed;

/**
* Only valid during {@link #processTimer} and {@link #processOnWindowExpiration}, null otherwise.
@@ -877,12 +884,18 @@ private void processElementForSplitRestriction(
currentElement = elem.withValue(elem.getValue().getKey());
currentRestriction = elem.getValue().getValue().getKey();
currentWatermarkEstimatorState = elem.getValue().getValue().getValue();
currentTrackerClaimed = new AtomicBoolean(false);
currentTracker =
RestrictionTrackers.observe(
doFnInvoker.invokeNewTracker(processContext),
new ClaimObserver<PositionT>() {
private final AtomicBoolean claimed =
Preconditions.checkNotNull(currentTrackerClaimed);

@Override
- public void onClaimed(PositionT position) {}
+ public void onClaimed(PositionT position) {
+ claimed.lazySet(true);
+ }

@Override
public void onClaimFailed(PositionT position) {}
@@ -894,6 +907,7 @@ public void onClaimFailed(PositionT position) {}
currentRestriction = null;
currentWatermarkEstimatorState = null;
currentTracker = null;
currentTrackerClaimed = null;
}

this.stateAccessor.finalizeState();
@@ -909,12 +923,18 @@ private void processElementForWindowObservingSplitRestriction(
(Iterator<BoundedWindow>) elem.getWindows().iterator();
while (windowIterator.hasNext()) {
currentWindow = windowIterator.next();
currentTrackerClaimed = new AtomicBoolean(false);
currentTracker =
RestrictionTrackers.observe(
doFnInvoker.invokeNewTracker(processContext),
new ClaimObserver<PositionT>() {
private final AtomicBoolean claimed =
Preconditions.checkNotNull(currentTrackerClaimed);

@Override
- public void onClaimed(PositionT position) {}
+ public void onClaimed(PositionT position) {
+ claimed.lazySet(true);
+ }

@Override
public void onClaimFailed(PositionT position) {}
@@ -927,6 +947,7 @@ public void onClaimFailed(PositionT position) {}
currentWatermarkEstimatorState = null;
currentWindow = null;
currentTracker = null;
currentTrackerClaimed = null;
}

this.stateAccessor.finalizeState();
@@ -937,6 +958,8 @@ private void processElementForTruncateRestriction(
currentElement = elem.withValue(elem.getValue().getKey().getKey());
currentRestriction = elem.getValue().getKey().getValue().getKey();
currentWatermarkEstimatorState = elem.getValue().getKey().getValue().getValue();
// For truncation, we don't set currentTrackerClaimed so that we enable checkpointing even if no
// progress is made.
currentTracker =
RestrictionTrackers.observe(
doFnInvoker.invokeNewTracker(processContext),
@@ -989,6 +1012,8 @@ private void processElementForWindowObservingTruncateRestriction(
currentRestriction = elem.getValue().getKey().getValue().getKey();
currentWatermarkEstimatorState = elem.getValue().getKey().getValue().getValue();
currentWindow = currentWindows.get(windowCurrentIndex);
// We leave currentTrackerClaimed unset as we want to split regardless of if tryClaim is
// called.
currentTracker =
RestrictionTrackers.observe(
doFnInvoker.invokeNewTracker(processContext),
@@ -1081,12 +1106,18 @@ private void processElementForWindowObservingSizedElementAndRestriction(
currentRestriction = elem.getValue().getKey().getValue().getKey();
currentWatermarkEstimatorState = elem.getValue().getKey().getValue().getValue();
currentWindow = currentWindows.get(windowCurrentIndex);
currentTrackerClaimed = new AtomicBoolean(false);
currentTracker =
RestrictionTrackers.observe(
doFnInvoker.invokeNewTracker(processContext),
new ClaimObserver<PositionT>() {
private final AtomicBoolean claimed =
Preconditions.checkNotNull(currentTrackerClaimed);

@Override
- public void onClaimed(PositionT position) {}
+ public void onClaimed(PositionT position) {
+ claimed.lazySet(true);
+ }

@Override
public void onClaimFailed(PositionT position) {}
@@ -1107,7 +1138,7 @@ public void onClaimFailed(PositionT position) {}

// Attempt to checkpoint the current restriction.
HandlesSplits.SplitResult splitResult =
- trySplitForElementAndRestriction(0, continuation.resumeDelay());
+ trySplitForElementAndRestriction(0, continuation.resumeDelay(), false);

/**
* After the user has chosen to resume processing later, either the restriction is already
@@ -1132,7 +1163,7 @@ private abstract class SplittableFnDataReceiver
implements HandlesSplits, FnDataReceiver<WindowedValue> {
@Override
public HandlesSplits.SplitResult trySplit(double fractionOfRemainder) {
- return trySplitForElementAndRestriction(fractionOfRemainder, Duration.ZERO);
+ return trySplitForElementAndRestriction(fractionOfRemainder, Duration.ZERO, true);
}

@Override
@@ -1278,6 +1309,13 @@ private HandlesSplits.SplitResult trySplitForWindowObservingTruncateRestriction(
if (currentWindow == null) {
return null;
}
// We are requesting a checkpoint but have not yet progressed on the restriction, skip
// request.
if (fractionOfRemainder == 0
&& currentTrackerClaimed != null
Member:

Given that it is not marked volatile, those could be out of date, right? Based on this and your use of lazySet, that seems to align with your intent, but I don't have a holistic understanding of this code.

Contributor Author:

I think that the assignment of the AtomicBoolean object being consulted is protected by the happens-after ordering of splitLock. This is the same synchronization that currentTracker uses.

AtomicBoolean is used because the restriction tracker observation can happen in the background, but the laziness is OK: the split would be retried, and the real value should eventually be observed.
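
The visibility argument in this reply can be illustrated with a minimal, self-contained sketch. It assumes, as the reply states, that the field holding the AtomicBoolean is only assigned and read while holding splitLock, so the reference itself is safely published, while the flag inside it is flipped with lazySet. The class name ClaimFlagSketch, the nested ClaimObserver interface, and the helper method names are hypothetical stand-ins for illustration, not the actual FnApiDoFnRunner code.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class ClaimFlagSketch {
  /** Hypothetical stand-in for the claim observer interface used in the diff. */
  public interface ClaimObserver<PositionT> {
    void onClaimed(PositionT position);

    void onClaimFailed(PositionT position);
  }

  private final Object splitLock = new Object();

  // Assumption: only assigned and read under splitLock, so the reference is
  // visible to a splitting thread without being declared volatile.
  private AtomicBoolean currentTrackerClaimed;

  /** Begins a restriction and returns the observer that records the first claim. */
  public ClaimObserver<Long> startRestriction() {
    final AtomicBoolean claimed = new AtomicBoolean(false);
    synchronized (splitLock) {
      currentTrackerClaimed = claimed;
    }
    return new ClaimObserver<Long>() {
      @Override
      public void onClaimed(Long position) {
        // lazySet may become visible to other threads late; a skipped
        // checkpoint is expected to be retried, so a stale read is tolerable.
        claimed.lazySet(true);
      }

      @Override
      public void onClaimFailed(Long position) {}
    };
  }

  /** Ends the restriction; a null flag means "no claim tracking in effect". */
  public void finishRestriction() {
    synchronized (splitLock) {
      currentTrackerClaimed = null;
    }
  }

  /** True when a checkpoint request would be skipped for lack of progress. */
  public boolean checkpointWouldBeSkipped() {
    synchronized (splitLock) {
      return currentTrackerClaimed != null && !currentTrackerClaimed.get();
    }
  }
}
```

In this sketch a stale read can only make the check skip a checkpoint that would have been allowed, which matches the retry argument made in the reply.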

&& !currentTrackerClaimed.get()) {
return null;
}

SplitResultsWithStopIndex splitResult =
computeSplitForProcessOrTruncate(
@@ -1620,14 +1658,21 @@ static <WatermarkEstimatorStateT> HandlesSplits.SplitResult constructSplitResult
}

private HandlesSplits.SplitResult trySplitForElementAndRestriction(
- double fractionOfRemainder, Duration resumeDelay) {
+ double fractionOfRemainder, Duration resumeDelay, boolean requireClaimForCheckpoint) {
KV<Instant, WatermarkEstimatorStateT> watermarkAndState;
WindowedSplitResult windowedSplitResult = null;
synchronized (splitLock) {
// There is nothing to split if we are between element and restriction processing calls.
if (currentTracker == null) {
return null;
}
// The tracker has not yet been claimed meaning that a checkpoint won't meaningfully advance.
if (fractionOfRemainder == 0
&& requireClaimForCheckpoint
&& currentTrackerClaimed != null
&& !currentTrackerClaimed.get()) {
return null;
}
// Make sure to get the output watermark before we split to ensure that the lower bound
// applies to the residual.
watermarkAndState = currentWatermarkEstimator.getWatermarkAndState();
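
The guard added in this hunk reduces to a small predicate over three inputs. The sketch below restates it outside of Beam so the cases are easy to enumerate; the class name CheckpointSkipSketch and the method name shouldSkipCheckpoint are hypothetical and do not appear in the PR.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class CheckpointSkipSketch {
  /**
   * Returns true when a split request should be ignored because it is a
   * checkpoint (fraction 0), the caller requires a prior claim, claim tracking
   * is in effect (non-null flag), and nothing has been claimed yet.
   */
  public static boolean shouldSkipCheckpoint(
      double fractionOfRemainder,
      boolean requireClaimForCheckpoint,
      AtomicBoolean trackerClaimed) {
    return fractionOfRemainder == 0
        && requireClaimForCheckpoint
        && trackerClaimed != null
        && !trackerClaimed.get();
  }

  public static void main(String[] args) {
    AtomicBoolean unclaimed = new AtomicBoolean(false);
    AtomicBoolean claimed = new AtomicBoolean(true);

    // Runner-initiated checkpoint before any claim: skipped.
    System.out.println(shouldSkipCheckpoint(0, true, unclaimed));
    // Self-checkpoint after the user asked to resume later: never skipped.
    System.out.println(shouldSkipCheckpoint(0, false, unclaimed));
    // Non-zero fraction is an ordinary split, not a checkpoint: never skipped.
    System.out.println(shouldSkipCheckpoint(0.5, true, unclaimed));
    // A claim has been made: checkpoint proceeds.
    System.out.println(shouldSkipCheckpoint(0, true, claimed));
    // Null flag (as in the truncate paths): checkpoint proceeds.
    System.out.println(shouldSkipCheckpoint(0, true, null));
  }
}
```

Passing false for requireClaimForCheckpoint mirrors the call made with continuation.resumeDelay() in the diff, while the trySplit entry point passes true.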