From 7caf9ee2319f7d5f12f41477aa0ea5c1575a2bdc Mon Sep 17 00:00:00 2001 From: Kip Kohn Date: Thu, 12 Dec 2024 20:39:37 -0800 Subject: [PATCH] Implement GoT Dynamic auto-scaling using `WorkUnitsSizeSummary`-driven linear heuristic --- .../RecommendScalingForWorkUnits.java | 56 +++++++++++ ...tractRecommendScalingForWorkUnitsImpl.java | 77 +++++++++++++++ ...calingForWorkUnitsLinearHeuristicImpl.java | 81 ++++++++++++++++ .../temporal/ddm/util/JobStateUtils.java | 34 +++++++ .../ddm/work/GenerateWorkUnitsResult.java | 1 + .../gobblin/temporal/ddm/work/TimeBudget.java | 47 ++++++++++ .../temporal/ddm/work/WorkUnitClaimCheck.java | 6 ++ .../ddm/work/WorkUnitsSizeSummary.java | 22 +++++ .../ddm/worker/WorkFulfillmentWorker.java | 9 +- .../impl/ExecuteGobblinWorkflowImpl.java | 93 +++++++++++++++++-- .../dynamic/FsScalingDirectiveSource.java | 13 ++- .../dynamic/FsScalingDirectivesRecipient.java | 68 ++++++++++++++ .../temporal/dynamic/ProfileDerivation.java | 14 ++- .../temporal/dynamic/ProfileOverlay.java | 45 +++++++-- .../temporal/dynamic/ScalingDirective.java | 44 ++++++++- .../dynamic/ScalingDirectivesRecipient.java | 30 ++++++ .../workflows/metrics/EventTimer.java | 3 + .../workflows/metrics/TemporalEventTimer.java | 12 ++- ...tractDynamicScalingYarnServiceManager.java | 4 +- ...ourceDynamicScalingYarnServiceManager.java | 21 +++-- .../dynamic/ProfileDerivationTest.java | 6 +- .../DynamicScalingYarnServiceManagerTest.java | 2 +- 22 files changed, 636 insertions(+), 52 deletions(-) create mode 100644 gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/RecommendScalingForWorkUnits.java create mode 100644 gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/AbstractRecommendScalingForWorkUnitsImpl.java create mode 100644 gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/RecommendScalingForWorkUnitsLinearHeuristicImpl.java create mode 100644 gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/TimeBudget.java create mode 100644 gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/FsScalingDirectivesRecipient.java create mode 100644 gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirectivesRecipient.java diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/RecommendScalingForWorkUnits.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/RecommendScalingForWorkUnits.java new file mode 100644 index 00000000000..a31fe3bfabd --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/RecommendScalingForWorkUnits.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.ddm.activity; + +import java.util.List; +import java.util.Properties; + +import io.temporal.activity.ActivityInterface; +import io.temporal.activity.ActivityMethod; + +import org.apache.gobblin.source.workunit.WorkUnit; +import org.apache.gobblin.temporal.ddm.work.TimeBudget; +import org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary; +import org.apache.gobblin.temporal.dynamic.ScalingDirective; + + + +/** + * Activity to suggest the Dynamic Scaling warranted to complete processing of some amount of {@link org.apache.gobblin.source.workunit.WorkUnit}s + * within {@link TimeBudget}, through a combination of Workforce auto-scaling and Worker right-sizing. + * + * As with all {@link ActivityInterface}s, this is stateless, so the {@link ScalingDirective}(s) returned "stand alone", presuming nothing of current + * {@link org.apache.gobblin.temporal.dynamic.WorkforceStaffing}. It thus falls to the caller to coordinate whether to apply the directive(s) as-is, + * or first to adjust in light of scaling levels already in the current {@link org.apache.gobblin.temporal.dynamic.WorkforcePlan}. + */ +@ActivityInterface +public interface RecommendScalingForWorkUnits { + + /** + * Recommend the {@link ScalingDirective}s to process the {@link WorkUnit}s of {@link WorkUnitsSizeSummary} within {@link TimeBudget}. + * + * @param remainingWork may characterize a newly-generated batch of `WorkUnit` for which no processing has yet begun - or be the sub-portion + * of an in-progress job that still awaits processing + * @param sourceClass contextualizes the `WorkUnitsSizeSummary` and should name a {@link org.apache.gobblin.source.Source} + * @param timeBudget the remaining target duration for processing the summarized `WorkUnit`s + * @param jobProps all job props, to either guide the recommendation or better contextualize the nature of the `remainingWork` + * @return the {@link ScalingDirective}s to process the summarized {@link WorkUnit}s within {@link TimeBudget} + */ + @ActivityMethod + List recommendScaling(WorkUnitsSizeSummary remainingWork, String sourceClass, TimeBudget timeBudget, Properties jobProps); +} diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/AbstractRecommendScalingForWorkUnitsImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/AbstractRecommendScalingForWorkUnitsImpl.java new file mode 100644 index 00000000000..a0d3fd11e55 --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/AbstractRecommendScalingForWorkUnitsImpl.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.ddm.activity.impl; + +import java.util.Arrays; +import java.util.List; +import java.util.Optional; +import java.util.Properties; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.runtime.JobState; +import org.apache.gobblin.temporal.ddm.activity.RecommendScalingForWorkUnits; +import org.apache.gobblin.temporal.ddm.work.TimeBudget; +import org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary; +import org.apache.gobblin.temporal.dynamic.ProfileDerivation; +import org.apache.gobblin.temporal.dynamic.ProfileOverlay; +import org.apache.gobblin.temporal.dynamic.ScalingDirective; +import org.apache.gobblin.temporal.dynamic.WorkforceProfiles; + + +/** + * Skeletal impl handling all foundational concerns, but leaving it to a concrete impl to actually choose the auto-scaling + * {@link ScalingDirective#getSetPoint()} for the exactly one {@link ScalingDirective} being recommended. + */ +@Slf4j +public abstract class AbstractRecommendScalingForWorkUnitsImpl implements RecommendScalingForWorkUnits { + + // TODO: decide whether this name ought to be configurable - or instead a predictable name that callers may expect (and possibly adjust) + public static final String DEFAULT_PROFILE_DERIVATION_NAME = "workUnitsProc"; + + @Override + public List recommendScaling(WorkUnitsSizeSummary remainingWork, String sourceClass, TimeBudget timeBudget, Properties jobProps) { + // NOTE: no attempt to determine the current scaling - per `RecommendScalingForWorkUnits` javadoc, the `ScalingDirective`(s) returned must "stand alone", + // presuming nothing of the current `WorkforcePlan`'s `WorkforceStaffing` + JobState jobState = new JobState(jobProps); + ScalingDirective procWUsWorkerScaling = new ScalingDirective( + calcProfileDerivationName(jobState), + calcDerivationSetPoint(remainingWork, sourceClass, timeBudget, jobState), + System.currentTimeMillis(), + Optional.of(calcProfileDerivation(calcBasisProfileName(jobState), remainingWork, sourceClass, jobState)) + ); + log.info("Recommended re-scaling to process work units: {}", procWUsWorkerScaling); + return Arrays.asList(procWUsWorkerScaling); + } + + protected abstract int calcDerivationSetPoint(WorkUnitsSizeSummary remainingWork, String sourceClass, TimeBudget timeBudget, JobState jobState); + + protected ProfileDerivation calcProfileDerivation(String basisProfileName, WorkUnitsSizeSummary remainingWork, String sourceClass, JobState jobState) { + // TODO: implement right-sizing!!! (for now just return unchanged) + return new ProfileDerivation(basisProfileName, ProfileOverlay.unchanged()); + } + + protected String calcProfileDerivationName(JobState jobState) { + // TODO: if we ever return > 1 directive, append a monotonically increasing number to avoid collisions + return DEFAULT_PROFILE_DERIVATION_NAME; + } + + protected String calcBasisProfileName(JobState jobState) { + return WorkforceProfiles.BASELINE_NAME; // always build upon baseline + } +} diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/RecommendScalingForWorkUnitsLinearHeuristicImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/RecommendScalingForWorkUnitsLinearHeuristicImpl.java new file mode 100644 index 00000000000..909eaa929a1 --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/RecommendScalingForWorkUnitsLinearHeuristicImpl.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.ddm.activity.impl; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.runtime.JobState; +import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys; +import org.apache.gobblin.temporal.ddm.work.TimeBudget; +import org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary; +import org.apache.gobblin.temporal.ddm.worker.WorkFulfillmentWorker; + + +/** + * Simple config-driven linear relationship between `remainingWork` and the resulting `setPoint` + * + * + * TODO: describe algo!!!!! + */ +@Slf4j +public class RecommendScalingForWorkUnitsLinearHeuristicImpl extends AbstractRecommendScalingForWorkUnitsImpl { + + public static final String AMORTIZED_NUM_BYTES_PER_MINUTE = GobblinTemporalConfigurationKeys.PREFIX + "dynamic.scaling.reference.numBytesPerMinute"; + public static final long DEFAULT_AMORTIZED_NUM_BYTES_PER_MINUTE = 10 * 1000L * 1000L * 60L; // 10MB/sec + + @Override + protected int calcDerivationSetPoint(WorkUnitsSizeSummary remainingWork, String sourceClass, TimeBudget timeBudget, JobState jobState) { + // for simplicity, for now, consider only top-level work units (aka. `MultiWorkUnit`s - MWUs) + long numMWUs = remainingWork.getTopLevelWorkUnitsCount(); + double meanBytesPerMWU = remainingWork.getTopLevelWorkUnitsMeanSize(); + int numSimultaneousMWUsPerContainer = calcPerContainerWUCapacity(jobState); // (a worker-thread is a slot for top-level (MWUs) - not constituent sub-WUs) + long bytesPerMinuteProcRate = calcAmortizedBytesPerMinute(jobState); + log.info("Calculating auto-scaling (for {} remaining work units within {}) using: bytesPerMinuteProcRate = {}; meanBytesPerMWU = {}", + numMWUs, timeBudget, bytesPerMinuteProcRate, meanBytesPerMWU); + + // calc how many container*minutes to process all MWUs, based on mean MWU size + double minutesProcTimeForMeanMWU = meanBytesPerMWU / bytesPerMinuteProcRate; + double meanMWUsThroughputPerContainerMinute = numSimultaneousMWUsPerContainer / minutesProcTimeForMeanMWU; + double estContainerMinutesForAllMWUs = numMWUs / meanMWUsThroughputPerContainerMinute; + + long targetNumMinutes = timeBudget.getMaxDurationDesiredMinutes(); + // TODO: take into account `timeBudget.getPermittedOverageMinutes()` - e.g. to decide whether to use `Math.ceil` vs. `Math.floor` + + // TODO: decide how to account for container startup; working est. for GoT-on-YARN ~ 3 mins (req to alloc ~ 30s; alloc to workers ready ~ 2.5m) + // e.g. can we amortize away / ignore when `targetNumMinutes >> workerRequestToReadyNumMinutes`? + // TODO take into account that MWUs are quantized into discrete chunks; this est. uses avg and presumes to divide partial MWUs amongst workers + // can we we mostly ignore if we keep MWU "chunk size" "small-ish", like maybe even just `duration(max(MWU)) <= targetNumMinutes/2)`? + + int recommendedNumContainers = (int) Math.floor(estContainerMinutesForAllMWUs / targetNumMinutes); + log.info("Recommended auto-scaling: {} containers, given: minutesToProc(mean(MWUs)) = {}; throughput = {} (MWUs / container*minute); " + + "est. container*minutes to complete ALL ({}) MWUs = {}", + recommendedNumContainers, minutesProcTimeForMeanMWU, meanMWUsThroughputPerContainerMinute, numMWUs, estContainerMinutesForAllMWUs); + return recommendedNumContainers; + } + + protected int calcPerContainerWUCapacity(JobState jobState) { + int numWorkersPerContainer = jobState.getPropAsInt(GobblinTemporalConfigurationKeys.TEMPORAL_NUM_WORKERS_PER_CONTAINER, + GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_NUM_WORKERS_PER_CONTAINERS); + int numThreadsPerWorker = WorkFulfillmentWorker.MAX_EXECUTION_CONCURRENCY; // TODO: get from config, once that's implemented + return numWorkersPerContainer * numThreadsPerWorker; + } + + protected long calcAmortizedBytesPerMinute(JobState jobState) { + return jobState.getPropAsLong(AMORTIZED_NUM_BYTES_PER_MINUTE, DEFAULT_AMORTIZED_NUM_BYTES_PER_MINUTE); + } +} diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/JobStateUtils.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/JobStateUtils.java index 52da7b5299b..e6066019562 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/JobStateUtils.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/JobStateUtils.java @@ -65,6 +65,8 @@ public class JobStateUtils { public static final String INPUT_DIR_NAME = "input"; // following MRJobLauncher.INPUT_DIR_NAME public static final String OUTPUT_DIR_NAME = "output"; // following MRJobLauncher.OUTPUT_DIR_NAME + public static final String DYNAMIC_SCALING_RELATIVE_DIR_PATH = "dynamic-scaling/directives"; + public static final String DYNAMIC_SCALING_ERRORS_RELATIVE_DIR_PATH = "dynamic-scaling/dropped-directives"; public static final boolean DEFAULT_WRITE_PREVIOUS_WORKUNIT_STATES = true; // reuse same handle among activities executed by the same worker @@ -141,6 +143,38 @@ public static Path getWorkUnitsPath(Path workDirRoot) { return new Path(workDirRoot, INPUT_DIR_NAME); } + /** + * ATTENTION: derives path according to {@link org.apache.gobblin.runtime.mapreduce.MRJobLauncher} conventions, using same + * {@link ConfigurationKeys#MR_JOB_ROOT_DIR_KEY} + * @return {@link Path} where {@link org.apache.gobblin.temporal.dynamic.ScalingDirective}s should reside + */ + public static Path getDynamicScalingPath(JobState jobState) { + return getDynamicScalingPath(getWorkDirRoot(jobState)); + } + + /** + * @return {@link Path} where {@link org.apache.gobblin.temporal.dynamic.ScalingDirective}s should reside + */ + public static Path getDynamicScalingPath(Path workDirRoot) { + return new Path(workDirRoot, DYNAMIC_SCALING_RELATIVE_DIR_PATH); + } + + /** + * ATTENTION: derives path according to {@link org.apache.gobblin.runtime.mapreduce.MRJobLauncher} conventions, using same + * {@link ConfigurationKeys#MR_JOB_ROOT_DIR_KEY} + * @return {@link Path} where any {@link org.apache.gobblin.temporal.dynamic.ScalingDirective} errors should be placed + */ + public static Path getDynamicScalingErrorsPath(JobState jobState) { + return getDynamicScalingErrorsPath(getWorkDirRoot(jobState)); + } + + /** + * @return {@link Path} where any {@link org.apache.gobblin.temporal.dynamic.ScalingDirective} errors should be placed + */ + public static Path getDynamicScalingErrorsPath(Path workDirRoot) { + return new Path(workDirRoot, DYNAMIC_SCALING_ERRORS_RELATIVE_DIR_PATH); + } + /** * ATTENTION: derives path according to {@link org.apache.gobblin.runtime.mapreduce.MRJobLauncher} conventions, using same * {@link ConfigurationKeys#MR_JOB_ROOT_DIR_KEY} diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/GenerateWorkUnitsResult.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/GenerateWorkUnitsResult.java index fee9d5ddd7a..6d20d9fc152 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/GenerateWorkUnitsResult.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/GenerateWorkUnitsResult.java @@ -38,6 +38,7 @@ public class GenerateWorkUnitsResult { // NOTE: `@NonNull` to include field in `@RequiredArgsConstructor`, despite - "warning: @NonNull is meaningless on a primitive... @RequiredArgsConstructor" @NonNull private int generatedWuCount; + // TODO: characterize the WUs more thoroughly, by also including destination info, and with more specifics, like src+dest location, I/O config, throttling... @NonNull private String sourceClass; @NonNull private WorkUnitsSizeSummary workUnitsSizeSummary; // Resources that the Temporal Job Launcher should clean up for Gobblin temporary work directory paths in writers diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/TimeBudget.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/TimeBudget.java new file mode 100644 index 00000000000..8800ab9e4b8 --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/TimeBudget.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.ddm.work; + +import lombok.AccessLevel; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; +import lombok.Setter; + + +/** + * Duration for whatever work to complete, with a permitted overage to indicate firm-ness/soft-ness. + * Values are in minutes, befitting the granularity of inevitable companion activities, like: + * - network operations - opening connections, I/O, retries + * - starting/scaling workers + */ +@Data +@Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable deserialization +@NoArgsConstructor // IMPORTANT: for jackson (de)serialization +@RequiredArgsConstructor +public class TimeBudget { + // NOTE: `@NonNull` to include field in `@RequiredArgsConstructor`, despite - "warning: @NonNull is meaningless on a primitive... @RequiredArgsConstructor" + @NonNull private long maxDurationDesiredMinutes; + @NonNull private long permittedOverageMinutes; + + /** construct w/ {@link #permittedOverageMinutes} expressed as a percentage of {@link #maxDurationDesiredMinutes} */ + public static TimeBudget withOveragePercentage(long maxDurationDesiredMinutes, double permittedOveragePercentage) { + return new TimeBudget(maxDurationDesiredMinutes, (long) (maxDurationDesiredMinutes * permittedOveragePercentage)); + } +} diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitClaimCheck.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitClaimCheck.java index 71d44094c5b..0c068cb3ed7 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitClaimCheck.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitClaimCheck.java @@ -42,6 +42,12 @@ * Conveys a {@link org.apache.gobblin.source.workunit.WorkUnit} by claim-check, where the `workUnitPath` is resolved * against the {@link org.apache.hadoop.fs.FileSystem} given by `nameNodeUri`. see: * @see Claim-Check Pattern + * + * TODO: if we're to generalize Work Prediction+Prioritization across multiplexed jobs, each having its own separate time budget, every WU claim-check + * standing on its own would allow an external observer to inspect only the task queue w/o correlation between workflow histories. For that, decide whether + * to add job-identifying metadata here or just tack on time budget (aka. SLA deadline) info. Either could be tunneled within the filename in the manner + * of `JobLauncherUtils.WorkUnitPathCalculator.calcNextPathWithTunneledSizeInfo` - in fact, by convention, the job ID / flow ID already is... we just don't + * recover it herein. */ @Data @Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable deserialization diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitsSizeSummary.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitsSizeSummary.java index d6bccdd537d..37b18493046 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitsSizeSummary.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitsSizeSummary.java @@ -26,6 +26,8 @@ import lombok.RequiredArgsConstructor; import lombok.Setter; +import com.fasterxml.jackson.annotation.JsonIgnore; + import org.apache.gobblin.source.workunit.MultiWorkUnit; import org.apache.gobblin.source.workunit.WorkUnit; @@ -50,4 +52,24 @@ public class WorkUnitsSizeSummary { @NonNull private double quantilesWidth; @NonNull private List topLevelQuantilesMinSizes; @NonNull private List constituentQuantilesMinSizes; + + @JsonIgnore // (because no-arg method resembles 'java bean property') + public double getTopLevelWorkUnitsMeanSize() { + return this.totalSize / this.topLevelWorkUnitsCount; + } + + @JsonIgnore // (because no-arg method resembles 'java bean property') + public double getConstituentWorkUnitsMeanSize() { + return this.totalSize / this.constituentWorkUnitsCount; + } + + @JsonIgnore // (because no-arg method resembles 'java bean property') + public double getTopLevelWorkUnitsMedianSize() { + return this.topLevelQuantilesMinSizes.get(this.quantilesCount / 2); + } + + @JsonIgnore // (because no-arg method resembles 'java bean property') + public double getConstituentWorkUnitsMedianSize() { + return this.topLevelQuantilesMinSizes.get(this.quantilesCount / 2); + } } diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java index 74737af5988..27348858e7c 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java @@ -29,6 +29,7 @@ import org.apache.gobblin.temporal.ddm.activity.impl.DeleteWorkDirsActivityImpl; import org.apache.gobblin.temporal.ddm.activity.impl.GenerateWorkUnitsImpl; import org.apache.gobblin.temporal.ddm.activity.impl.ProcessWorkUnitImpl; +import org.apache.gobblin.temporal.ddm.activity.impl.RecommendScalingForWorkUnitsLinearHeuristicImpl; import org.apache.gobblin.temporal.ddm.workflow.impl.CommitStepWorkflowImpl; import org.apache.gobblin.temporal.ddm.workflow.impl.ExecuteGobblinWorkflowImpl; import org.apache.gobblin.temporal.ddm.workflow.impl.GenerateWorkUnitsWorkflowImpl; @@ -48,14 +49,14 @@ public WorkFulfillmentWorker(Config config, WorkflowClient workflowClient) { @Override protected Class[] getWorkflowImplClasses() { - return new Class[] { CommitStepWorkflowImpl.class, ExecuteGobblinWorkflowImpl.class, GenerateWorkUnitsWorkflowImpl.class, - NestingExecOfProcessWorkUnitWorkflowImpl.class, ProcessWorkUnitsWorkflowImpl.class }; + return new Class[] { ExecuteGobblinWorkflowImpl.class, ProcessWorkUnitsWorkflowImpl.class, NestingExecOfProcessWorkUnitWorkflowImpl.class, + CommitStepWorkflowImpl.class, GenerateWorkUnitsWorkflowImpl.class }; } @Override protected Object[] getActivityImplInstances() { - return new Object[] { new CommitActivityImpl(), new DeleteWorkDirsActivityImpl(),new GenerateWorkUnitsImpl(), - new ProcessWorkUnitImpl(), new SubmitGTEActivityImpl()}; + return new Object[] { new SubmitGTEActivityImpl(), new GenerateWorkUnitsImpl(), new RecommendScalingForWorkUnitsLinearHeuristicImpl(), new ProcessWorkUnitImpl(), + new CommitActivityImpl(), new DeleteWorkDirsActivityImpl() }; } @Override diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java index 1d5a63b7366..819fc421cb0 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java @@ -20,14 +20,20 @@ import java.io.IOException; import java.net.URI; import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; import java.util.HashSet; +import java.util.List; import java.util.Optional; import java.util.Properties; import java.util.Set; +import java.util.stream.Collectors; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import lombok.extern.slf4j.Slf4j; + import com.typesafe.config.ConfigFactory; import io.temporal.activity.ActivityOptions; @@ -36,28 +42,32 @@ import io.temporal.failure.ApplicationFailure; import io.temporal.workflow.ChildWorkflowOptions; import io.temporal.workflow.Workflow; -import lombok.extern.slf4j.Slf4j; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.metrics.event.TimingEvent; import org.apache.gobblin.runtime.JobState; import org.apache.gobblin.temporal.ddm.activity.DeleteWorkDirsActivity; import org.apache.gobblin.temporal.ddm.activity.GenerateWorkUnits; +import org.apache.gobblin.temporal.ddm.activity.RecommendScalingForWorkUnits; import org.apache.gobblin.temporal.ddm.launcher.ProcessWorkUnitsJobLauncher; import org.apache.gobblin.temporal.ddm.util.JobStateUtils; +import org.apache.gobblin.temporal.ddm.util.TemporalWorkFlowUtils; import org.apache.gobblin.temporal.ddm.work.CommitStats; import org.apache.gobblin.temporal.ddm.work.DirDeletionResult; import org.apache.gobblin.temporal.ddm.work.ExecGobblinStats; import org.apache.gobblin.temporal.ddm.work.GenerateWorkUnitsResult; +import org.apache.gobblin.temporal.ddm.work.TimeBudget; import org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary; import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec; import org.apache.gobblin.temporal.ddm.work.assistance.Help; import org.apache.gobblin.temporal.ddm.workflow.ExecuteGobblinWorkflow; import org.apache.gobblin.temporal.ddm.workflow.ProcessWorkUnitsWorkflow; +import org.apache.gobblin.temporal.dynamic.FsScalingDirectivesRecipient; +import org.apache.gobblin.temporal.dynamic.ScalingDirective; +import org.apache.gobblin.temporal.dynamic.ScalingDirectivesRecipient; import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext; import org.apache.gobblin.temporal.workflows.metrics.EventTimer; import org.apache.gobblin.temporal.workflows.metrics.TemporalEventTimer; -import org.apache.gobblin.temporal.ddm.util.TemporalWorkFlowUtils; import org.apache.gobblin.util.PropertiesUtils; @@ -79,8 +89,21 @@ public class ExecuteGobblinWorkflowImpl implements ExecuteGobblinWorkflow { .setRetryOptions(GEN_WUS_ACTIVITY_RETRY_OPTS) .build(); - private final GenerateWorkUnits genWUsActivityStub = Workflow.newActivityStub(GenerateWorkUnits.class, - GEN_WUS_ACTIVITY_OPTS); + private final GenerateWorkUnits genWUsActivityStub = Workflow.newActivityStub(GenerateWorkUnits.class, GEN_WUS_ACTIVITY_OPTS); + + private static final RetryOptions RECOMMEND_SCALING_RETRY_OPTS = RetryOptions.newBuilder() + .setInitialInterval(Duration.ofSeconds(3)) + .setMaximumInterval(Duration.ofSeconds(100)) + .setBackoffCoefficient(2) + .setMaximumAttempts(4) + .build(); + + private static final ActivityOptions RECOMMEND_SCALING_ACTIVITY_OPTS = ActivityOptions.newBuilder() + .setStartToCloseTimeout(Duration.ofMinutes(5)) + .setRetryOptions(RECOMMEND_SCALING_RETRY_OPTS) + .build(); + private final RecommendScalingForWorkUnits recommendScalingStub = Workflow.newActivityStub(RecommendScalingForWorkUnits.class, + RECOMMEND_SCALING_ACTIVITY_OPTS); private static final RetryOptions DELETE_WORK_DIRS_RETRY_OPTS = RetryOptions.newBuilder() .setInitialInterval(Duration.ofSeconds(3)) @@ -100,16 +123,32 @@ public ExecGobblinStats execute(Properties jobProps, EventSubmitterContext event TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.Factory(eventSubmitterContext); timerFactory.create(TimingEvent.LauncherTimings.JOB_PREPARE).submit(); // update GaaS: `TimingEvent.JOB_START_TIME` EventTimer jobSuccessTimer = timerFactory.createJobTimer(); - Optional generateWorkUnitResultsOpt = Optional.empty(); + Optional optGenerateWorkUnitResult = Optional.empty(); WUProcessingSpec wuSpec = createProcessingSpec(jobProps, eventSubmitterContext); boolean isSuccessful = false; try { - generateWorkUnitResultsOpt = Optional.of(genWUsActivityStub.generateWorkUnits(jobProps, eventSubmitterContext)); - WorkUnitsSizeSummary wuSizeSummary = generateWorkUnitResultsOpt.get().getWorkUnitsSizeSummary(); + GenerateWorkUnitsResult generateWorkUnitResult = genWUsActivityStub.generateWorkUnits(jobProps, eventSubmitterContext); + optGenerateWorkUnitResult = Optional.of(generateWorkUnitResult); + WorkUnitsSizeSummary wuSizeSummary = generateWorkUnitResult.getWorkUnitsSizeSummary(); int numWUsGenerated = safelyCastNumConstituentWorkUnitsOrThrow(wuSizeSummary); int numWUsCommitted = 0; CommitStats commitStats = CommitStats.createEmpty(); if (numWUsGenerated > 0) { + TimeBudget timeBudget = calcWUProcTimeBudget(jobSuccessTimer.getStartTime(), wuSizeSummary, jobProps); + List scalingDirectives = + recommendScalingStub.recommendScaling(wuSizeSummary, generateWorkUnitResult.getSourceClass(), timeBudget, jobProps); + log.info("Recommended scaling to process WUs within {}: {}", timeBudget, scalingDirectives); + try { + ScalingDirectivesRecipient recipient = createScalingDirectivesRecipient(jobProps); + List adjustedScalingDirectives = adjustRecommendedScaling(scalingDirectives); + log.info("Submitting (adjusted) scaling directives: {}", adjustedScalingDirectives); + recipient.receive(adjustedScalingDirectives); + // TODO: when eliminating the "GenWUs Worker", pause/block until scaling is complete + } catch (IOException e) { + // TODO: decide whether this should be a hard failure; for now, "gracefully degrade" by continuing processing + log.error("Failed to send re-scaling directive", e); + } + ProcessWorkUnitsWorkflow processWUsWorkflow = createProcessWorkUnitsWorkflow(jobProps); commitStats = processWUsWorkflow.process(wuSpec); numWUsCommitted = commitStats.getNumCommittedWorkUnits(); @@ -128,8 +167,8 @@ public ExecGobblinStats execute(Properties jobProps, EventSubmitterContext event // TODO: Cleanup WorkUnit/Taskstate Directory for jobs cancelled mid flight try { log.info("Cleaning up work dirs for job {}", jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY)); - if (generateWorkUnitResultsOpt.isPresent()) { - cleanupWorkDirs(wuSpec, eventSubmitterContext, generateWorkUnitResultsOpt.get().getWorkDirPathsToDelete()); + if (optGenerateWorkUnitResult.isPresent()) { + cleanupWorkDirs(wuSpec, eventSubmitterContext, optGenerateWorkUnitResult.get().getWorkDirPathsToDelete()); } else { log.warn("Skipping cleanup of work dirs for job due to no output from GenerateWorkUnits"); } @@ -154,6 +193,35 @@ protected ProcessWorkUnitsWorkflow createProcessWorkUnitsWorkflow(Properties job return Workflow.newChildWorkflowStub(ProcessWorkUnitsWorkflow.class, childOpts); } + protected TimeBudget calcWUProcTimeBudget(Instant jobStartTime, WorkUnitsSizeSummary wuSizeSummary, Properties jobProps) { + // TODO: make configurable! for now, aim for: + // - total job runtime of 2 hours + // - at least 15 minutes for the `CommitStepWorkflow` + // - leave at least 1 hour for the `ProcessWorkUnitsWorkflow` (so deduct at most 45 minutes for WU generation thus far) + long totalTimeMins = 120; + long maxGenWUsMins = 45; + long commitStepMins = 15; + double permittedOveragePercentage = .2; + Duration genWUsDuration = Duration.between(jobStartTime, TemporalEventTimer.getCurrentTime()); + long remainingMins = totalTimeMins - Math.min(genWUsDuration.toMinutes(), maxGenWUsMins) - commitStepMins; + return TimeBudget.withOveragePercentage(remainingMins, permittedOveragePercentage); + } + + protected List adjustRecommendedScaling(List recommendedScalingDirectives) { + // TODO: make any adjustments - e.g. decide whether to shutdown the (often oversize) `GenerateWorkUnits` worker or alternatively to deduct one to count it + if (recommendedScalingDirectives.size() == 0) { + return recommendedScalingDirectives; + } + // TODO: be more robust and code more defensively, rather than presuming the impl of `RecommendScalingForWorkUnitsLinearHeuristicImpl` + ArrayList adjustedScaling = new ArrayList<>(recommendedScalingDirectives); + ScalingDirective firstDirective = adjustedScaling.get(0); + // deduct one for the (already existing) `GenerateWorkUnits` worker + adjustedScaling.set(0, firstDirective.updateSetPoint(firstDirective.getSetPoint() - 1)); + // CAUTION: filter out set point zero, which (depending upon `.getProfileName()`) *could* down-scale away our only current worker + // TODO: consider whether to allow either a) "pre-defining" a profile w/ set point zero, available for later use OR b) down-scaling to zero to pause worker + return adjustedScaling.stream().filter(sd -> sd.getSetPoint() > 0).collect(Collectors.toList()); + } + protected static WUProcessingSpec createProcessingSpec(Properties jobProps, EventSubmitterContext eventSubmitterContext) { JobState jobState = new JobState(jobProps); URI fileSystemUri = JobStateUtils.getFileSystemUri(jobState); @@ -169,6 +237,13 @@ protected static WUProcessingSpec createProcessingSpec(Properties jobProps, Even return wuSpec; } + protected ScalingDirectivesRecipient createScalingDirectivesRecipient(Properties jobProps) throws IOException { + JobState jobState = new JobState(jobProps); + FileSystem fs = JobStateUtils.openFileSystem(jobState); + Path directivesDirPath = JobStateUtils.getDynamicScalingPath(JobStateUtils.getWorkDirRoot(jobState)); + return new FsScalingDirectivesRecipient(fs, directivesDirPath); + } + private void cleanupWorkDirs(WUProcessingSpec workSpec, EventSubmitterContext eventSubmitterContext, Set directoriesToClean) throws IOException { // TODO: Add configuration to support cleaning up historical work dirs from same job name diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/FsScalingDirectiveSource.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/FsScalingDirectiveSource.java index 6725c58b6e3..a7d38256b2b 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/FsScalingDirectiveSource.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/FsScalingDirectiveSource.java @@ -40,6 +40,8 @@ * {@link ScalingDirectiveParser#OVERLAY_DEFINITION_PLACEHOLDER} syntax and write their {@link ProfileDerivation} overlay as the file's data/content. * Within-length scaling directives are no-data, zero-length files. When backed by HDFS, reading such zero-length scaling directive filenames is a * NameNode-only operation, with their metadata-only nature conserving NN object count/quota. + * + * @see FsScalingDirectivesRecipient */ @Slf4j public class FsScalingDirectiveSource implements ScalingDirectiveSource { @@ -49,10 +51,15 @@ public class FsScalingDirectiveSource implements ScalingDirectiveSource { private final ScalingDirectiveParser parser = new ScalingDirectiveParser(); /** Read from `directivesDirPath` of `fileSystem`, and optionally move invalid/rejected directives to `optErrorsDirPath` */ - public FsScalingDirectiveSource(FileSystem fileSystem, String directivesDirPath, Optional optErrorsDirPath) { + public FsScalingDirectiveSource(FileSystem fileSystem, Path directivesDirPath, Optional optErrorsDirPath) { this.fileSystem = fileSystem; - this.dirPath = new Path(directivesDirPath); - this.optErrorsPath = optErrorsDirPath.map(Path::new); + this.dirPath = directivesDirPath; + this.optErrorsPath = optErrorsDirPath; + } + + /** Read from `directivesDirPath` of `fileSystem`, and optionally move invalid/rejected directives to `optErrorsDirPath` */ + public FsScalingDirectiveSource(FileSystem fileSystem, String directivesDirPath, Optional optErrorsDirPath) { + this(fileSystem, new Path(directivesDirPath), optErrorsDirPath.map(Path::new)); } /** diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/FsScalingDirectivesRecipient.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/FsScalingDirectivesRecipient.java new file mode 100644 index 00000000000..7feeb8a1ece --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/FsScalingDirectivesRecipient.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.dynamic; + +import java.io.IOException; +import java.util.List; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + + +/** + * A {@link ScalingDirectivesRecipient} that writes {@link ScalingDirective}s to a {@link FileSystem} directory, where each directive is the name + * of a single file inside the directory. + * + * TODO: per {@link FsScalingDirectiveSource} - directives too long for one filename path component MUST (but currently do NOT!) use the + * {@link ScalingDirectiveParser#OVERLAY_DEFINITION_PLACEHOLDER} syntax and write their {@link ProfileDerivation} overlay as the file's data/content. + * + * Within-length scaling directives are no-data, zero-length files. When backed by HDFS, writing such zero-length scaling directive filenames is a + * NameNode-only operation, with their metadata-only nature conserving NN object count/quota. + * + * @see FsScalingDirectiveSource + */ +@Slf4j +public class FsScalingDirectivesRecipient implements ScalingDirectivesRecipient { + private final FileSystem fileSystem; + private final Path dirPath; + + /** Write to `directivesDirPath` of `fileSystem` */ + public FsScalingDirectivesRecipient(FileSystem fileSystem, Path directivesDirPath) throws IOException { + this.fileSystem = fileSystem; + this.dirPath = directivesDirPath; + this.fileSystem.mkdirs(this.dirPath); + } + + /** Write to `directivesDirPath` of `fileSystem` */ + public FsScalingDirectivesRecipient(FileSystem fileSystem, String directivesDirPath) throws IOException { + this(fileSystem, new Path(directivesDirPath)); + } + + @Override + public void receive(List directives) throws IOException { + for (ScalingDirective directive : directives) { + String directiveAsString = ScalingDirectiveParser.asString(directive); + // TODO: handle directivePaths in excess of length limit + Path directivePath = new Path(dirPath, directiveAsString); + log.info("Adding ScalingDirective: {} at '{}' - {}", directiveAsString, directivePath, directive); + fileSystem.create(directivePath, false).close(); + } + } +} diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ProfileDerivation.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ProfileDerivation.java index 1001150df3e..e17bc29e21f 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ProfileDerivation.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ProfileDerivation.java @@ -20,16 +20,24 @@ import java.util.Optional; import java.util.function.Function; -import com.typesafe.config.Config; +import lombok.AccessLevel; import lombok.Data; import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; +import lombok.Setter; +import com.typesafe.config.Config; /** * Defines a new {@link WorkerProfile} by evolving from another profile, the basis. Such evolution creates a new immutable profile through * {@link ProfileOverlay}, which either adds or removes properties from the basis profile's definition. That basis profile must already exist. */ @Data +@Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable deserialization +@NoArgsConstructor // IMPORTANT: for jackson (de)serialization +@RequiredArgsConstructor public class ProfileDerivation { /** Flags when the basis profile was NOT found */ @@ -41,8 +49,8 @@ public UnknownBasisException(String basisName) { } } - private final String basisProfileName; - private final ProfileOverlay overlay; + @NonNull private String basisProfileName; + @NonNull private ProfileOverlay overlay; /** @return a new profile definition through evolution from the basis profile, which is to be obtained via `basisResolver` */ public Config formulateConfig(Function> basisResolver) throws UnknownBasisException { diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ProfileOverlay.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ProfileOverlay.java index 64b5d8ec30b..2e2ffc76046 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ProfileOverlay.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ProfileOverlay.java @@ -25,13 +25,20 @@ import java.util.Set; import java.util.stream.Collectors; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigValueFactory; +import lombok.AccessLevel; import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.NonNull; import lombok.RequiredArgsConstructor; +import lombok.Setter; + +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigValueFactory; /** Alt. forms of profile overlay to evolve one profile {@link Config} into another. Two overlays may be combined hierarchically into a new overlay. */ +@JsonTypeInfo(use = JsonTypeInfo.Id.MINIMAL_CLASS, include = JsonTypeInfo.As.PROPERTY, property = "@class") // to handle impls (`MINIMAL..`, as all defs below) public interface ProfileOverlay { /** @return a new, evolved {@link Config}, by application of this overlay */ @@ -40,21 +47,36 @@ public interface ProfileOverlay { /** @return a new overlay, by combining this overlay *over* another */ ProfileOverlay over(ProfileOverlay other); + /** @return a new overlay that would change nothing when used in a {@link ProfileDerivation} (beyond introducing a distinct name) */ + static ProfileOverlay unchanged() { + return new Adding(); + } + /** A key-value pair/duple */ @Data + @Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable deserialization + @NoArgsConstructor // IMPORTANT: for jackson (de)serialization + @RequiredArgsConstructor class KVPair { - private final String key; - private final String value; + @NonNull private String key; + @NonNull private String value; } /** An overlay to evolve any profile by adding key-value pairs */ @Data - @RequiredArgsConstructor // explicit, due to second, variadic ctor + @Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable deserialization + @RequiredArgsConstructor // explicit, due to other ctors class Adding implements ProfileOverlay { - private final List additionPairs; + @NonNull private List additionPairs; + + // IMPORTANT: for jackson (de)serialization + public Adding() { + this(new ArrayList<>()); + } + /** variadic, for convenience */ public Adding(KVPair... kvPairs) { this(Arrays.asList(kvPairs)); } @@ -90,10 +112,13 @@ public ProfileOverlay over(ProfileOverlay other) { /** An overlay to evolve any profile by removing named keys */ @Data + @Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable deserialization + @NoArgsConstructor // IMPORTANT: for jackson (de)serialization @RequiredArgsConstructor // explicit, due to second, variadic ctor class Removing implements ProfileOverlay { - private final List removalKeys; + @NonNull private List removalKeys; + /** variadic, for convenience */ public Removing(String... keys) { this(Arrays.asList(keys)); } @@ -128,9 +153,11 @@ public ProfileOverlay over(ProfileOverlay other) { /** An overlay to evolve any profile by adding key-value pairs while also removing named keys */ @Data + @Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable deserialization + @NoArgsConstructor // IMPORTANT: for jackson (de)serialization class Combo implements ProfileOverlay { - private final Adding adding; - private final Removing removing; + @NonNull private Adding adding; + @NonNull private Removing removing; /** restricted-access ctor: instead use {@link Combo#normalize(Adding, Removing)} */ private Combo(Adding adding, Removing removing) { diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirective.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirective.java index 8af9e95249a..5cfda7e205b 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirective.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirective.java @@ -18,9 +18,17 @@ package org.apache.gobblin.temporal.dynamic; import java.util.Optional; + +import lombok.AccessLevel; import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.NonNull; import lombok.RequiredArgsConstructor; +import lombok.Setter; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyOrder; /** * Core abstraction to model scaling adjustment: a directive originates at a specific moment in time to provide a set point for a given worker profile. @@ -28,12 +36,33 @@ * define that new profile through a {@link ProfileDerivation} referencing a known profile. Once defined, a worker profile MUST NOT be redefined. */ @Data +@Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable deserialization +@NoArgsConstructor // IMPORTANT: for jackson (de)serialization @RequiredArgsConstructor +/* + * NOTE: due to type erasure, neither alternative approach works when returning a collection of `ScalingDirective`s (only when a direct `ScalingDirective`) + * see: https://github.com/FasterXML/jackson-databind/issues/336 + * instead, `@JsonProperty("this")` clarifies the class name in serialized form + * @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "$this") + * @JsonTypeInfo(include=JsonTypeInfo.As.WRAPPER_OBJECT, use=JsonTypeInfo.Id.NAME) + */ +@JsonPropertyOrder({ "this", "profileName", "setPoint", "optDerivedFrom", "timestampEpochMillis" }) // improve readability (e.g. in the temporal UI) +@JsonIgnoreProperties(ignoreUnknown = true) /* necessary since no `setThis` setter (to act as inverse of `supplyJsonClassSimpleName`), so to avoid: + * com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field \"this\" (class ...dynamic.ScalingDirective), not marked as ignorable + */ public class ScalingDirective { - private final String profileName; - private final int setPoint; - private final long timestampEpochMillis; - private final Optional optDerivedFrom; + @NonNull private String profileName; + // NOTE: `@NonNull` to include field in `@RequiredArgsConstructor`, despite - "warning: @NonNull is meaningless on a primitive... @RequiredArgsConstructor" + @NonNull private int setPoint; + @NonNull private long timestampEpochMillis; + @NonNull private Optional optDerivedFrom; + + /** purely for observability: announce class to clarify serialized form */ + @JsonProperty("this") + public String supplyJsonClassSimpleName() { + return this.getClass().getSimpleName(); + } + /** Create a set-point-only directive (for a known profile, with no {@link ProfileDerivation}) */ public ScalingDirective(String profileName, int setPoint, long timestampEpochMillis) { @@ -44,7 +73,12 @@ public ScalingDirective(String profileName, int setPoint, long timestampEpochMil this(profileName, setPoint, timestampEpochMillis, Optional.of(new ProfileDerivation(basisProfileName, overlay))); } - /** @return the canonical display name (of {@link #getProfileName()}) for tracing/debugging */ + /** @return a new `ScalingDirective`, otherwise unchanged, but with {@link ScalingDirective#setPoint} replaced by `newSetPoint` */ + public ScalingDirective updateSetPoint(int newSetPoint) { + return new ScalingDirective(this.profileName, newSetPoint, this.timestampEpochMillis, this.optDerivedFrom); + } + + /** @return the canonical *display* name (for {@link #getProfileName()}) for tracing/debugging */ public String renderName() { return WorkforceProfiles.renderName(this.profileName); } diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirectivesRecipient.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirectivesRecipient.java new file mode 100644 index 00000000000..0a361eed5ea --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirectivesRecipient.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.dynamic; + +import java.io.IOException; +import java.util.List; + + +/** An opaque sink for {@link org.apache.gobblin.temporal.dynamic.ScalingDirective}s - typically either to process or proxy them */ +public interface ScalingDirectivesRecipient { + /** @param directives the {@link ScalingDirective}s to receive */ + void receive(List directives) throws IOException; +} + + diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventTimer.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventTimer.java index 003a05907e4..ac0f24bf816 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventTimer.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventTimer.java @@ -17,6 +17,7 @@ package org.apache.gobblin.temporal.workflows.metrics; import java.io.Closeable; +import java.time.Instant; import org.apache.gobblin.metrics.MetricContext; import org.apache.gobblin.metrics.event.EventSubmitter; @@ -55,4 +56,6 @@ public interface EventTimer extends Closeable { default void close() { stop(); } + + Instant getStartTime(); } diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java index e1ac601986b..93beaadd033 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java @@ -20,9 +20,11 @@ import java.time.Duration; import java.time.Instant; +import lombok.Getter; +import lombok.RequiredArgsConstructor; + import io.temporal.activity.ActivityOptions; import io.temporal.workflow.Workflow; -import lombok.RequiredArgsConstructor; import org.apache.gobblin.metrics.event.EventSubmitter; import org.apache.gobblin.metrics.event.GobblinEventBuilder; @@ -42,7 +44,7 @@ public class TemporalEventTimer implements EventTimer { private final SubmitGTEActivity trackingEventActivity; private final GobblinEventBuilder eventBuilder; private final EventSubmitterContext eventSubmitterContext; - private final Instant startTime; + @Getter private final Instant startTime; // Alias to stop() public void submit() { @@ -69,7 +71,11 @@ private void stop(Instant endTime) { trackingEventActivity.submitGTE(this.eventBuilder, eventSubmitterContext); } - private static Instant getCurrentTime() { + /** + * {@link Workflow}-safe (i.e. deterministic) way for equivalent of {@link System#currentTimeMillis()} + * WARNING: DO NOT use from an {@link io.temporal.activity.Activity} + */ + public static Instant getCurrentTime() { return Instant.ofEpochMilli(Workflow.currentTimeMillis()); } diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/AbstractDynamicScalingYarnServiceManager.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/AbstractDynamicScalingYarnServiceManager.java index 3022d5c9ecc..812667e3c82 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/AbstractDynamicScalingYarnServiceManager.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/AbstractDynamicScalingYarnServiceManager.java @@ -72,7 +72,7 @@ public AbstractDynamicScalingYarnServiceManager(GobblinTemporalApplicationMaster } @Override - protected void startUp() { + protected void startUp() throws IOException{ int scheduleInterval = ConfigUtils.getInt(this.config, DYNAMIC_SCALING_POLLING_INTERVAL, DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS); log.info("Starting the {} with re-scaling interval of {} seconds", this.getClass().getSimpleName(), scheduleInterval); @@ -92,7 +92,7 @@ protected void shutDown() { /** * Create a {@link ScalingDirectiveSource} to use for getting scaling directives. */ - protected abstract ScalingDirectiveSource createScalingDirectiveSource(); + protected abstract ScalingDirectiveSource createScalingDirectiveSource() throws IOException; /** * A {@link Runnable} that gets the scaling directives from the {@link ScalingDirectiveSource} and passes them to the diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/FsSourceDynamicScalingYarnServiceManager.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/FsSourceDynamicScalingYarnServiceManager.java index f6b65bbd2db..9c5a6430e05 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/FsSourceDynamicScalingYarnServiceManager.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/FsSourceDynamicScalingYarnServiceManager.java @@ -17,34 +17,35 @@ package org.apache.gobblin.temporal.yarn; +import java.io.IOException; import java.util.Optional; +import org.apache.gobblin.runtime.JobState; +import org.apache.gobblin.temporal.ddm.util.JobStateUtils; +import org.apache.gobblin.util.ConfigUtils; import org.apache.hadoop.fs.FileSystem; -import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys; import org.apache.gobblin.temporal.dynamic.FsScalingDirectiveSource; import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource; + /** * {@link FsScalingDirectiveSource} based implementation of {@link AbstractDynamicScalingYarnServiceManager}. */ public class FsSourceDynamicScalingYarnServiceManager extends AbstractDynamicScalingYarnServiceManager { - // TODO: replace fetching of these configs using a new method similar to JobStateUtils::getWorkDirRoot - public final static String DYNAMIC_SCALING_DIRECTIVES_DIR = GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_PREFIX + "directives.dir"; - public final static String DYNAMIC_SCALING_ERRORS_DIR = GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_PREFIX + "errors.dir"; - private final FileSystem fs; public FsSourceDynamicScalingYarnServiceManager(GobblinTemporalApplicationMaster appMaster) { super(appMaster); - this.fs = appMaster.getFs(); } @Override - protected ScalingDirectiveSource createScalingDirectiveSource() { + protected ScalingDirectiveSource createScalingDirectiveSource() throws IOException { + JobState jobState = new JobState(ConfigUtils.configToProperties(this.config)); + FileSystem fs = JobStateUtils.openFileSystem(jobState); return new FsScalingDirectiveSource( - this.fs, - this.config.getString(DYNAMIC_SCALING_DIRECTIVES_DIR), - Optional.ofNullable(this.config.getString(DYNAMIC_SCALING_ERRORS_DIR)) + fs, + JobStateUtils.getDynamicScalingPath(JobStateUtils.getWorkDirRoot(jobState)), + Optional.of(JobStateUtils.getDynamicScalingErrorsPath(JobStateUtils.getWorkDirRoot(jobState))) ); } } diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/ProfileDerivationTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/ProfileDerivationTest.java index e953298c66f..4adc604b3dc 100644 --- a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/ProfileDerivationTest.java +++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/ProfileDerivationTest.java @@ -54,7 +54,7 @@ public void testFormulateConfigWithSuccessfulBasisResolution() throws ProfileDer public void testFormulateConfigUnknownBasis() { String basisProfileName = "foo"; try { - ProfileDerivation derivation = new ProfileDerivation(basisProfileName, null); + ProfileDerivation derivation = new ProfileDerivation(basisProfileName, ProfileOverlay.unchanged()); derivation.formulateConfig(ignore -> Optional.empty()); Assert.fail("Expected instead: UnknownBasisException"); } catch (ProfileDerivation.UnknownBasisException ube) { @@ -65,14 +65,14 @@ public void testFormulateConfigUnknownBasis() { @Test public void testRenderNameNonBaseline() { String name = "testProfile"; - ProfileDerivation profileDerivation = new ProfileDerivation(name, null); + ProfileDerivation profileDerivation = new ProfileDerivation(name, ProfileOverlay.unchanged()); String renderedName = profileDerivation.renderName(); Assert.assertEquals(renderedName, name); } @Test public void testRenderNameBaseline() { - ProfileDerivation profileDerivation = new ProfileDerivation(WorkforceProfiles.BASELINE_NAME, null); + ProfileDerivation profileDerivation = new ProfileDerivation(WorkforceProfiles.BASELINE_NAME, ProfileOverlay.unchanged()); String renderedName = profileDerivation.renderName(); Assert.assertEquals(renderedName, WorkforceProfiles.BASELINE_NAME_RENDERING); } diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManagerTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManagerTest.java index dd4243d3fa5..5a498168157 100644 --- a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManagerTest.java +++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManagerTest.java @@ -69,7 +69,7 @@ public void testWhenScalingDirectivesIsNulOrEmpty() throws IOException, Interrup /** Note : this test uses {@link DummyScalingDirectiveSource}*/ @Test - public void testWithDummyScalingDirectiveSource() throws InterruptedException { + public void testWithDummyScalingDirectiveSource() throws IOException, InterruptedException { // DummyScalingDirectiveSource returns 2 scaling directives in first 3 invocations and after that it returns empty list // so the total number of invocations after three invocations should always be 3 TestDynamicScalingYarnServiceManager testDynamicScalingYarnServiceManager = new TestDynamicScalingYarnServiceManager(