Skip to content

Commit

Permalink
Add more tests plus javadoc
Browse files Browse the repository at this point in the history
  • Loading branch information
phet committed Dec 23, 2024
1 parent 2562a5e commit d037588
Show file tree
Hide file tree
Showing 12 changed files with 335 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,8 @@ public class ConfigurationKeys {
public static final String DEFAULT_FORK_OPERATOR_CLASS = "org.apache.gobblin.fork.IdentityForkOperator";
public static final String JOB_COMMIT_POLICY_KEY = "job.commit.policy";
public static final String DEFAULT_JOB_COMMIT_POLICY = "full";
public static final String JOB_TARGET_COMPLETION_DURATION_IN_MINUTES_KEY = "job.duration.target.completion.in.minutes";
public static final long DEFAULT_JOB_TARGET_COMPLETION_DURATION_IN_MINUTES = 360;

public static final String PARTIAL_FAIL_TASK_FAILS_JOB_COMMIT = "job.commit.partial.fail.task.fails.job.commit";
// If true, commit of different datasets will be performed in parallel
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1060,7 +1060,7 @@ private void cleanLeftoverStagingData(WorkUnitStream workUnits, JobState jobStat

try {
if (!canCleanStagingData(jobState)) {
// TODO: decide whether should be `.warn`, stay as `.info`, or change back to `.error`
LOG.info("Job " + jobState.getJobName() + " has unfinished commit sequences. Will not clean up staging data.");
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public interface RecommendScalingForWorkUnits {
/**
* Recommend the {@link ScalingDirective}s to process the {@link WorkUnit}s of {@link WorkUnitsSizeSummary} within {@link TimeBudget}.
*
* @param remainingWork may characterize a newly-generated batch of `WorkUnit`s for which no processing has yet begun - or be the sub-portion
* of an in-progress job that still awaits processing
* @param sourceClass contextualizes the `WorkUnitsSizeSummary` and should name a {@link org.apache.gobblin.source.Source}
* @param timeBudget the remaining target duration for processing the summarized `WorkUnit`s
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,22 @@


/**
 * Simple config-driven linear recommendation for how many containers to use to complete the "remaining work" within a given {@link TimeBudget}, per:
 *
 * a. from {@link WorkUnitsSizeSummary}, find how many (remaining) "top-level" {@link org.apache.gobblin.source.workunit.MultiWorkUnit}s of some mean size
 * b. from the configured {@link #AMORTIZED_NUM_BYTES_PER_MINUTE}, find the expected "processing rate" in bytes / minute
 *    1. estimate the time required for processing a mean-sized `MultiWorkUnit` (MWU)
 * c. from {@link JobState}, find per-container `MultiWorkUnit` parallelism capacity (aka. "worker-slots") to base the recommendation upon
 *    2. calculate the per-container throughput of MWUs per minute
 *    3. estimate the total per-container-minutes required to process all MWUs
 * d. from the {@link TimeBudget}, find the target number of minutes in which to complete processing of all MWUs
 *    4. recommend the number of containers so all MWU processing should finish within the target number of minutes
 */
@Slf4j
public class RecommendScalingForWorkUnitsLinearHeuristicImpl extends AbstractRecommendScalingForWorkUnitsImpl {

public static final String AMORTIZED_NUM_BYTES_PER_MINUTE = GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_PREFIX + "heuristic.params.numBytesPerMinute";
public static final long DEFAULT_AMORTIZED_NUM_BYTES_PER_MINUTE = 10 * 1000L * 1000L * 60L; // 10MB/sec
public static final long DEFAULT_AMORTIZED_NUM_BYTES_PER_MINUTE = 80 * 1000L * 1000L * 60L; // 80MB/sec

@Override
protected int calcDerivationSetPoint(WorkUnitsSizeSummary remainingWork, String sourceClass, TimeBudget jobTimeBudget, JobState jobState) {
Expand All @@ -53,7 +59,7 @@ protected int calcDerivationSetPoint(WorkUnitsSizeSummary remainingWork, String
double meanMWUsThroughputPerContainerMinute = numSimultaneousMWUsPerContainer / minutesProcTimeForMeanMWU;
double estContainerMinutesForAllMWUs = numMWUs / meanMWUsThroughputPerContainerMinute;

long targetNumMinutesForAllMWUs = jobTimeBudget.getMaxDurationDesiredMinutes();
long targetNumMinutesForAllMWUs = jobTimeBudget.getMaxTargetDurationMinutes();
// TODO: take into account `jobTimeBudget.getPermittedOverageMinutes()` - e.g. to decide whether to use `Math.ceil` vs. `Math.floor`

// TODO: decide how to account for container startup; working est. for GoT-on-YARN ~ 3 mins (req to alloc ~ 30s; alloc to workers ready ~ 2.5m)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@
@RequiredArgsConstructor
public class TimeBudget {
// NOTE: `@NonNull` to include field in `@RequiredArgsConstructor`, despite - "warning: @NonNull is meaningless on a primitive... @RequiredArgsConstructor"
@NonNull private long maxDurationDesiredMinutes;
@NonNull private long maxTargetDurationMinutes;
@NonNull private long permittedOverageMinutes;

/** construct w/ {@link #permittedOverageMinutes} expressed as a percentage of {@link #maxTargetDurationMinutes} */
public static TimeBudget withOveragePercentage(long maxTargetDurationMinutes, double permittedOveragePercentage) {
  // param renamed to match the renamed field `maxTargetDurationMinutes` (Java callers are positional, so this is caller-compatible)
  // overage truncates toward zero: e.g. percentage .2 of 45 mins => 9 permitted overage minutes
  return new TimeBudget(maxTargetDurationMinutes, (long) (maxTargetDurationMinutes * permittedOveragePercentage));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.apache.hadoop.fs.FileSystem;
Expand Down Expand Up @@ -199,16 +200,15 @@ protected ProcessWorkUnitsWorkflow createProcessWorkUnitsWorkflow(Properties job
}

/**
 * Derive the {@link TimeBudget} remaining for WorkUnit processing: the configured total target job duration,
 * minus the time already spent generating WorkUnits (capped), minus a reservation for the `CommitStepWorkflow`.
 *
 * @param jobStartTime when the job began (basis for measuring elapsed WU-generation time)
 * @param wuSizeSummary currently unused here - presumably reserved for size-aware budgeting; TODO confirm
 * @param jobProps source of {@link ConfigurationKeys#JOB_TARGET_COMPLETION_DURATION_IN_MINUTES_KEY}
 */
protected TimeBudget calcWUProcTimeBudget(Instant jobStartTime, WorkUnitsSizeSummary wuSizeSummary, Properties jobProps) {
  // TODO: make fully configurable! for now, cap Work Discovery at 45 mins and set aside 10 mins for the `CommitStepWorkflow`
  long maxGenWUsMins = 45;
  long commitStepMins = 10;
  // the config value is already expressed in minutes, so no unit conversion is needed
  // (previously wrapped in `TimeUnit.MINUTES.toMinutes(...)`, which is an identity no-op - removed)
  long totalTargetTimeMins = PropertiesUtils.getPropAsLong(jobProps,
      ConfigurationKeys.JOB_TARGET_COMPLETION_DURATION_IN_MINUTES_KEY,
      ConfigurationKeys.DEFAULT_JOB_TARGET_COMPLETION_DURATION_IN_MINUTES);
  double permittedOveragePercentage = .2;
  // elapsed WU-generation time counts against the budget, but only up to `maxGenWUsMins`
  Duration genWUsDuration = Duration.between(jobStartTime, TemporalEventTimer.getCurrentTime());
  long remainingMins = totalTargetTimeMins - Math.min(genWUsDuration.toMinutes(), maxGenWUsMins) - commitStepMins;
  // NOTE(review): `remainingMins` could be non-positive when generation ran long against a small target - confirm downstream handling
  return TimeBudget.withOveragePercentage(remainingMins, permittedOveragePercentage);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import lombok.extern.slf4j.Slf4j;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

Expand All @@ -40,6 +41,7 @@
*/
@Slf4j
public class FsScalingDirectivesRecipient implements ScalingDirectivesRecipient {
public static final int MAX_STRINGIFIED_DIRECTIVE_LENGTH = 255;
private final FileSystem fileSystem;
private final Path dirPath;

Expand All @@ -59,10 +61,19 @@ public FsScalingDirectivesRecipient(FileSystem fileSystem, String directivesDirP
/**
 * Persist each {@link ScalingDirective} as a file under {@link #dirPath}: the stringified directive IS the file name.
 * When that stringified form would exceed {@link #MAX_STRINGIFIED_DIRECTIVE_LENGTH}, fall back to the placeholder form:
 * the file name carries the directive with a placeholder standing in for the overlay, and the overlay definition
 * becomes the file's contents.
 *
 * @throws IOException on failure to create a directive file (incl. when it already exists, since `create(path, false)`)
 */
public void receive(List<ScalingDirective> directives) throws IOException {
  for (ScalingDirective directive : directives) {
    String directiveAsString = ScalingDirectiveParser.asString(directive);
    if (directiveAsString.length() <= MAX_STRINGIFIED_DIRECTIVE_LENGTH) {
      // short enough: encode the entire directive in the name of an empty file
      Path directivePath = new Path(dirPath, directiveAsString);
      log.info("Adding ScalingDirective: {} at '{}' - {}", directiveAsString, directivePath, directive);
      fileSystem.create(directivePath, false).close();
    } else {
      // too long for a file name: use the placeholder form and write the overlay definition as the file's contents
      ScalingDirectiveParser.StringWithPlaceholderPlusOverlay placeholderForm = ScalingDirectiveParser.asStringWithPlaceholderPlusOverlay(directive);
      Path directivePath = new Path(dirPath, placeholderForm.getDirectiveStringWithPlaceholder());
      log.info("Adding ScalingDirective with overlay: {} at '{}' - {}", directiveAsString, directivePath, directive);
      // NOTE(review): `writeUTF` prefixes a 2-byte length and caps contents at 64KB in modified UTF-8 -
      // the reader must use `readUTF`, and very large overlays would throw; confirm both against the consumer
      try (FSDataOutputStream out = fileSystem.create(directivePath, false)) {
        out.writeUTF(placeholderForm.getOverlayDefinitionString());
      }
    }
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.stream.Collectors;

import com.google.common.annotations.VisibleForTesting;
import lombok.Data;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -174,6 +175,25 @@ protected static String definePlaceholder(String directiveWithPlaceholder, Strin
}
}


/**
 * A two-part stringified form of a `ScalingDirective`, comprised of:
 *   - the "core" directive, but using {@link #OVERLAY_DEFINITION_PLACEHOLDER} in place of any overlay definition
 *   - the separately stringified {@link ProfileOverlay} (the empty string, when the directive has no overlay)
 *
 * This facilitates writing the directive as a (size-constrained) file name, with the overlay definition written as the file's contents.
 *
 * NOTE: the overlay is ALWAYS rendered separately in this form, regardless of how long the single-`String`
 * rendering of the directive would have been.
 *
 * @see #asStringWithPlaceholderPlusOverlay(ScalingDirective)
 */
@Data
public static class StringWithPlaceholderPlusOverlay {
  // the "core" directive string, with the placeholder token where the overlay definition would otherwise appear
  private final String directiveStringWithPlaceholder;
  // the stringified `ProfileOverlay`; "" when there is no overlay
  private final String overlayDefinitionString;
}


// TODO: syntax to remove an attr while ALSO "adding" (so not simply setting to the empty string) - [proposal: alt. form for KV_PAIR ::= ( KEY '|=|' )]

// syntax (described in class-level javadoc):
Expand Down Expand Up @@ -263,29 +283,50 @@ public static String asString(ScalingDirective directive) {
directive.getOptDerivedFrom().ifPresent(derivedFrom -> {
sb.append(',').append(derivedFrom.getBasisProfileName());
sb.append(derivedFrom.getOverlay() instanceof ProfileOverlay.Adding ? "+(" : "-(");
ProfileOverlay overlay = derivedFrom.getOverlay();
if (overlay instanceof ProfileOverlay.Adding) {
ProfileOverlay.Adding adding = (ProfileOverlay.Adding) overlay;
for (ProfileOverlay.KVPair kv : adding.getAdditionPairs()) {
sb.append(kv.getKey()).append('=').append(urlEncode(kv.getValue())).append(", ");
}
if (adding.getAdditionPairs().size() > 0) {
sb.setLength(sb.length() - 2); // remove trailing ", "
}
} else {
ProfileOverlay.Removing removing = (ProfileOverlay.Removing) overlay;
for (String key : removing.getRemovalKeys()) {
sb.append(key).append(", ");
}
if (removing.getRemovalKeys().size() > 0) {
sb.setLength(sb.length() - 2); // remove trailing ", "
}
}
sb.append(stringifyProfileOverlay(derivedFrom.getOverlay()));
sb.append(')');
});
return sb.toString();
}

/** @return the `scalingDirective` invariably stringified as two parts, a {@link StringWithPlaceholderPlusOverlay} - regardless of stringified length */
public static StringWithPlaceholderPlusOverlay asStringWithPlaceholderPlusOverlay(ScalingDirective directive) {
  // render the "core" prefix: "<timestampEpochMillis>.<profileName>=<setPoint>"
  StringBuilder core = new StringBuilder()
      .append(directive.getTimestampEpochMillis()).append('.')
      .append(directive.getProfileName()).append('=')
      .append(directive.getSetPoint());
  // the overlay definition is rendered separately (empty string when there's no `derivedFrom`) ...
  String overlayDefinition = directive.getOptDerivedFrom()
      .map(derivation -> stringifyProfileOverlay(derivation.getOverlay()))
      .orElse("");
  // ... while the core gets ",<basisProfile>+(<placeholder>)" (Adding) or ",<basisProfile>-(<placeholder>)" (Removing)
  directive.getOptDerivedFrom().ifPresent(derivation -> {
    core.append(',').append(derivation.getBasisProfileName())
        .append(derivation.getOverlay() instanceof ProfileOverlay.Adding ? "+(" : "-(")
        .append(OVERLAY_DEFINITION_PLACEHOLDER)
        .append(')');
  });
  return new StringWithPlaceholderPlusOverlay(core.toString(), overlayDefinition);
}

/** @return `overlay` rendered as a comma-separated list: "key=urlEncodedValue" pairs for {@link ProfileOverlay.Adding}, bare keys for {@link ProfileOverlay.Removing} */
private static String stringifyProfileOverlay(ProfileOverlay overlay) {
  if (overlay instanceof ProfileOverlay.Adding) {
    // each addition pair renders as "key=value", with the value URL-encoded; joining handles the empty case as ""
    return ((ProfileOverlay.Adding) overlay).getAdditionPairs().stream()
        .map(kv -> kv.getKey() + "=" + urlEncode(kv.getValue()))
        .collect(Collectors.joining(", "));
  } else {
    // removal renders just the keys
    return String.join(", ", ((ProfileOverlay.Removing) overlay).getRemovalKeys());
  }
}

/** handle special naming of {@link #BASELINE_ID} and enforce {@link #MAX_PROFILE_IDENTIFIER_LENGTH} */
private static String identifyProfileName(String profileId, String directive) throws InvalidSyntaxException {
if (profileId.length() > MAX_PROFILE_IDENTIFIER_LENGTH) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ public FsSourceDynamicScalingYarnServiceManager(GobblinTemporalApplicationMaster
@Override
protected void startUp() throws IOException {
  JobState jobState = new JobState(ConfigUtils.configToProperties(this.config));
  // since `super.startUp()` will invoke `createScalingDirectiveSource()`, which needs `this.fs`, create that before deferring
  this.fs = JobStateUtils.openFileSystem(jobState);
  super.startUp();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.temporal.ddm.activity.impl;

import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
import org.apache.gobblin.temporal.ddm.work.TimeBudget;
import org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary;


/**
 * Test for {@link RecommendScalingForWorkUnitsLinearHeuristicImpl}: exercises `calcDerivationSetPoint` against
 * mocked {@link JobState}, {@link WorkUnitsSizeSummary}, and {@link TimeBudget}, then checks the recommendation
 * scales linearly with MWU count and inversely with the target duration.
 */
public class RecommendScalingForWorkUnitsLinearHeuristicImplTest {

  private RecommendScalingForWorkUnitsLinearHeuristicImpl scalingHeuristic;
  @Mock private JobState jobState;
  @Mock private WorkUnitsSizeSummary workUnitsSizeSummary;
  @Mock private TimeBudget timeBudget;

  @BeforeMethod
  public void setUp() {
    // fresh heuristic and fresh mocks per test method
    scalingHeuristic = new RecommendScalingForWorkUnitsLinearHeuristicImpl();
    MockitoAnnotations.openMocks(this);
  }

  @Test
  public void testCalcDerivationSetPoint() {
    // stub the two config reads the heuristic performs against `jobState`
    Mockito.when(jobState.getPropAsInt(Mockito.eq(GobblinTemporalConfigurationKeys.TEMPORAL_NUM_WORKERS_PER_CONTAINER), Mockito.anyInt()))
        .thenReturn(4); // 4 workers per container
    Mockito.when(jobState.getPropAsLong(Mockito.eq(RecommendScalingForWorkUnitsLinearHeuristicImpl.AMORTIZED_NUM_BYTES_PER_MINUTE), Mockito.anyLong()))
        .thenReturn(100L * 1000 * 1000); // 100MB/minute
    long targetTimeBudgetMinutes = 75L;
    Mockito.when(timeBudget.getMaxTargetDurationMinutes()).thenReturn(targetTimeBudgetMinutes);

    long totalNumMWUs = 3000L;
    Mockito.when(workUnitsSizeSummary.getTopLevelWorkUnitsCount()).thenReturn(totalNumMWUs);
    Mockito.when(workUnitsSizeSummary.getTopLevelWorkUnitsMeanSize()).thenReturn(500e6); // 500MB
    // (500MB mean size / 100MB-per-minute rate => 5 mins per mean MWU, per the comment below)
    // NOTE(review): mocks stub 4 workers/container, yet the next comment cites 20 container-slots;
    // the workers->slots derivation isn't visible in this test - confirm against the heuristic impl
    // parallelization capacity = 20 container-slots
    // per-container-slot rate = 5 mins / MWU
    long numMWUsPerMinutePerContainer = 4; // (amortized) per-container rate = 4 MWU / minute
    long totalNumContainerMinutesAllMWUs = totalNumMWUs / numMWUsPerMinutePerContainer; // 750 minutes
    long expectedSetPoint = totalNumContainerMinutesAllMWUs / targetTimeBudgetMinutes;

    int resultA = scalingHeuristic.calcDerivationSetPoint(workUnitsSizeSummary, "sourceClass", timeBudget, jobState);
    Assert.assertEquals(resultA, expectedSetPoint);

    // verify: 3x MWUs ==> 3x the recommended set point
    Mockito.when(workUnitsSizeSummary.getTopLevelWorkUnitsCount()).thenReturn(totalNumMWUs * 3);
    int tripledResult = scalingHeuristic.calcDerivationSetPoint(workUnitsSizeSummary, "sourceClass", timeBudget, jobState);
    Assert.assertEquals(tripledResult, resultA * 3);

    // reduce the target duration by a third, and verify: 3/2 the recommended set point
    // (the 3x MWU-count stubbing above is still in effect, hence expected = baseline * 3 * 3/2)
    Mockito.when(timeBudget.getMaxTargetDurationMinutes()).thenReturn(2 * (targetTimeBudgetMinutes / 3));
    int reducedTimeBudgetResult = scalingHeuristic.calcDerivationSetPoint(workUnitsSizeSummary, "sourceClass", timeBudget, jobState);
    Assert.assertEquals(reducedTimeBudgetResult, (long) Math.round(expectedSetPoint * 3 * (3.0 / 2.0)));
  }
}
Loading

0 comments on commit d037588

Please sign in to comment.