-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
extract semaphore logic out of WeightBoundedQueue to allow for sharing the weigher #32905
base: master
Are you sure you want to change the base?
Conversation
94041c4
to
881d43f
Compare
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment |
@scwhittle ready to for review, the failure to unrelated thanks! |
Run Java Precommit |
@@ -118,19 +118,17 @@ public final class StreamingDataflowWorker { | |||
*/ | |||
public static final int MAX_SINK_BYTES = 10_000_000; | |||
|
|||
public static final String STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_HEARTBEAT_POOL = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
revert this file
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
return new WeightedSemaphore<>(maxWeight, new Semaphore(maxWeight, true), weigherFn); | ||
} | ||
|
||
void acquire(V value) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
public acquire and release? seems like basic functionality
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
return new WeightedSemaphore<>(maxWeight, new Semaphore(maxWeight, true), weigherFn); | ||
} | ||
|
||
void acquire(V value) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
name acquireUninterruptibly?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@@ -61,11 +62,10 @@ public final class StreamingEngineWorkCommitter implements WorkCommitter { | |||
Supplier<CloseableStream<CommitWorkStream>> commitWorkStreamFactory, | |||
int numCommitSenders, | |||
Consumer<CompleteCommit> onCommitComplete, | |||
String backendWorkerToken) { | |||
String backendWorkerToken, | |||
WeightedSemaphore<Commit> weigher) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rename weigher throughout to commitByteSemaphore or something capturing it is the limiter not just a weighing function?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
Thread putThread = | ||
new Thread( | ||
() -> { | ||
try { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove the sleep? the element is already added by queue1 above so the sleep doesn't seem necessary
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
} | ||
return result; | ||
} | ||
|
||
/** Returns and removes the next value, or blocks until one is available. */ | ||
public @Nullable V take() throws InterruptedException { | ||
V result = queue.take(); | ||
limit.release(weigher.apply(result)); | ||
weigher.release(result); | ||
return result; | ||
} | ||
|
||
/** Returns the current weight of the queue. */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can this be removed and just look at the weighted semaphore? It is confusing since it doesn't count just bytes in this queue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
private final int maxWeight; | ||
private final Semaphore limit; | ||
private final Function<V, Integer> weigher; | ||
private final WeightedSemaphore<V> weigher; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
name weightedSemaphore? weigher just sounds like it is the weigh function
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@@ -86,16 +86,19 @@ public final class StreamingEngineWorkCommitter implements WorkCommitter { | |||
public static Builder builder() { | |||
return new AutoBuilder_StreamingEngineWorkCommitter_Builder() | |||
.setBackendWorkerToken(NO_BACKEND_WORKER_TOKEN) | |||
.setWeigher( | |||
WeightedSemaphore.create( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe require this to be set by caller instead of hiding it? It would be nice for the callers to know about it because then they could display it on their status page
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
}); | ||
putThread.start(); | ||
|
||
// Should only see the first value in the queue, since the queue is at capacity. putThread |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could put the sleep here if you want to give a little more time for putThread to run and become blocked (doesn't matter if it doesn't or not so not racy but better test if it does block).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
…to be injected and shared
881d43f
to
27a9758
Compare
back to you @scwhittle thank you |
Assigning reviewers. If you would like to opt out of this review, comment R: @Abacn added as fallback since no labels match configuration Available commands:
The PR bot will only process comments in the main thread (not review comments). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
minor cleanups
public static <V> WeightedBoundedQueue<V> create(int maxWeight, Function<V, Integer> weigherFn) { | ||
return new WeightedBoundedQueue<>( | ||
new LinkedBlockingQueue<>(), maxWeight, new Semaphore(maxWeight, true), weigherFn); | ||
public static <V> WeightedBoundedQueue<V> create(WeightedSemaphore<V> weigher) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: weightedSemaphore
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
} | ||
|
||
public void acquireUninterruptibly(V value) { | ||
limit.acquireUninterruptibly(weigher.apply(value)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about capping the calculated weight to not be more than the max so it will be the single item available to process?
Otherwise it seems that we coudl block forever. this is currently done in the specific weigher logic but seems safer to do here and in release.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
|
||
public static WeightedSemaphore<Commit> maxCommitByteSemaphore() { | ||
return WeightedSemaphore.create( | ||
MAX_QUEUED_COMMITS_BYTES, commit -> Math.min(MAX_QUEUED_COMMITS_BYTES, commit.getSize())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
see other comment, remove the min here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
MAX_COMMIT_QUEUE_BYTES, commit -> Math.min(MAX_COMMIT_QUEUE_BYTES, commit.getSize())); | ||
WeightedSemaphore.create( | ||
MAX_COMMIT_QUEUE_BYTES, | ||
commit -> Math.min(MAX_COMMIT_QUEUE_BYTES, commit.getSize()))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove min.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
Thread.sleep(100); | ||
assertEquals(MAX_WEIGHT, weigher.currentWeight()); | ||
assertEquals(1, queue1.size()); | ||
assertEquals(MAX_WEIGHT, weigher.currentWeight()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rm, doing this twice.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
putThread.join(); | ||
|
||
assertEquals(MAX_WEIGHT, weigher.currentWeight()); | ||
assertEquals(1, queue2.size()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
poll from queue2 and verify the weight goes to zero.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
back to you thanks! @scwhittle |
Run Java PreCommit |
Factor out semaphore logic out of WeightBoundedQueue to allow for sharing the weigher
R: @scwhittle
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
addresses #123
), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, commentfixes #<ISSUE NUMBER>
instead.CHANGES.md
with noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.