Enhance GoT observability with `WorkUnitsSizeSummary` and `WorkUnitSizeInfo`
phet committed Dec 10, 2024
1 parent 7f2a447 commit 3495e2f
Showing 18 changed files with 764 additions and 44 deletions.
1 change: 1 addition & 0 deletions gobblin-temporal/build.gradle
@@ -62,6 +62,7 @@ dependencies {
compile (externalDependency.helix) {
exclude group: 'io.dropwizard.metrics', module: 'metrics-core'
}
compile externalDependency.tdigest
compile externalDependency."temporal-sdk"
testCompile project(path: ':gobblin-cluster', configuration: 'tests')
testCompile project(":gobblin-example")
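The lone build change pulls in Ted Dunning's t-digest library, the streaming quantile sketch behind the new size summaries. A minimal sketch of the API the new code relies on (standalone illustration with hypothetical values, not part of the diff):

import com.tdunning.math.stats.TDigest;

TDigest digest = TDigest.createDigest(100);  // 100 = compression: higher is more accurate, but a larger sketch
for (long sizeBytes : new long[] { 10L, 200L, 3_000L, 40_000L }) {
  digest.add(sizeBytes);  // record one observed work-unit size
}
double median = digest.quantile(0.5);  // estimated 50th-percentile size
digest.compress();  // optionally consolidate centroids before reporting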
GenerateWorkUnits.java
@@ -30,6 +30,11 @@
/** Activity for generating {@link WorkUnit}s and persisting them to the {@link org.apache.hadoop.fs.FileSystem}, per "job properties" */
@ActivityInterface
public interface GenerateWorkUnits {

public static final String NUM_WORK_UNITS_SIZE_SUMMARY_QUANTILES = GenerateWorkUnits.class.getName() + ".numWorkUnitsSizeInfoQuantiles";
public static final int DEFAULT_NUM_WORK_UNITS_SIZE_SUMMARY_QUANTILES = 10;


/** @return a {@link GenerateWorkUnitsResult} describing the {@link WorkUnit}s generated and persisted */
@ActivityMethod
GenerateWorkUnitsResult generateWorkUnits(Properties jobProps, EventSubmitterContext eventSubmitterContext);
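Because the config key is derived from the interface's fully-qualified class name, a job may override the default of 10 quantiles (deciles) through its job properties. A hedged sketch (assuming the package `org.apache.gobblin.temporal.ddm.activity`, per the imports in the next file):

// key: 'org.apache.gobblin.temporal.ddm.activity.GenerateWorkUnits.numWorkUnitsSizeInfoQuantiles'
Properties jobProps = new Properties();
jobProps.setProperty(GenerateWorkUnits.NUM_WORK_UNITS_SIZE_SUMMARY_QUANTILES, "20");  // ventiles instead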
GenerateWorkUnitsImpl.java
@@ -23,24 +23,31 @@
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

- import com.google.api.client.util.Lists;
- import com.google.common.io.Closer;

import io.temporal.failure.ApplicationFailure;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;

+ import com.google.api.client.util.Lists;
+ import com.google.common.annotations.VisibleForTesting;
+ import com.google.common.base.Preconditions;
+ import com.google.common.io.Closer;
+ import com.tdunning.math.stats.TDigest;

import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.converter.initializer.ConverterInitializerFactory;
import org.apache.gobblin.destination.DestinationDatasetHandlerService;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.runtime.AbstractJobLauncher;
import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooter;
import org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooterFactory;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.source.Source;
import org.apache.gobblin.source.WorkUnitStreamSource;
import org.apache.gobblin.source.workunit.BasicWorkUnitStream;
@@ -50,6 +57,7 @@
import org.apache.gobblin.temporal.ddm.activity.GenerateWorkUnits;
import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
import org.apache.gobblin.temporal.ddm.work.GenerateWorkUnitsResult;
import org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary;
import org.apache.gobblin.temporal.ddm.work.assistance.Help;
import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
import org.apache.gobblin.writer.initializer.WriterInitializerFactory;
@@ -58,6 +66,39 @@
@Slf4j
public class GenerateWorkUnitsImpl implements GenerateWorkUnits {

/** [Internal, implementation class] Size sketch/digest of a collection of {@link MultiWorkUnit}s */
@Data
@VisibleForTesting
protected static class WorkUnitsSizeDigest {
private final long totalSize;
/** a top-level work unit has no parent - a root */
private final TDigest topLevelWorkUnitsSizeDigest;
/** a constituent work unit has no children - a leaf */
private final TDigest constituentWorkUnitsSizeDigest;

public WorkUnitsSizeSummary asSizeSummary(int numQuantiles) {
Preconditions.checkArgument(numQuantiles > 0, "numQuantiles must be > 0");
final double quantilesWidth = 1.0 / numQuantiles;

List<Double> topLevelQuantileValues = getQuantiles(topLevelWorkUnitsSizeDigest, numQuantiles);
List<Double> constituentQuantileValues = getQuantiles(constituentWorkUnitsSizeDigest, numQuantiles);
return new WorkUnitsSizeSummary(
totalSize,
topLevelWorkUnitsSizeDigest.size(), constituentWorkUnitsSizeDigest.size(),
numQuantiles, quantilesWidth,
topLevelQuantileValues, constituentQuantileValues);
}

private static List<Double> getQuantiles(TDigest digest, int numQuantiles) {
List<Double> quantileMinSizes = Lists.newArrayList();
for (int i = 1; i <= numQuantiles; i++) {
quantileMinSizes.add(digest.quantile((i * 1.0) / numQuantiles));
}
return quantileMinSizes;
}
}
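
  // Illustration (hypothetical values, not part of the diff): with numQuantiles = 4, `getQuantiles`
  // probes the digest at fractions 0.25, 0.5, 0.75, and 1.0, e.g. for ~uniform sizes 1..100:
  //   TDigest d = TDigest.createDigest(100);
  //   for (int sz = 1; sz <= 100; sz++) { d.add(sz); }
  //   getQuantiles(d, 4);  // ~[25.0, 50.0, 75.0, 100.0]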


@Override
public GenerateWorkUnitsResult generateWorkUnits(Properties jobProps, EventSubmitterContext eventSubmitterContext) {
// TODO: decide whether to acquire a job lock (as MR did)!
@@ -80,12 +121,18 @@ public GenerateWorkUnitsResult generateWorkUnits(Properties jobProps, EventSubmi
FileSystem fs = JobStateUtils.openFileSystem(jobState);
fs.mkdirs(workDirRoot);

- Set<String> resourcesToCleanUp = new HashSet<>();
- List<WorkUnit> workUnits = generateWorkUnitsForJobStateAndCollectCleanupPaths(jobState, eventSubmitterContext, closer, resourcesToCleanUp);
+ Set<String> pathsToCleanUp = new HashSet<>();
+ List<WorkUnit> workUnits = generateWorkUnitsForJobStateAndCollectCleanupPaths(jobState, eventSubmitterContext, closer, pathsToCleanUp);

int numSizeSummaryQuantiles = getConfiguredNumSizeSummaryQuantiles(jobState);
WorkUnitsSizeSummary wuSizeSummary = digestWorkUnitsSize(workUnits).asSizeSummary(numSizeSummaryQuantiles);
log.info("Discovered WorkUnits: {}", wuSizeSummary);

JobStateUtils.writeWorkUnits(workUnits, workDirRoot, jobState, fs);
- JobStateUtils.writeJobState(jobState, workDirRoot, fs);
+ JobStateUtils.writeJobState(jobState, workDirRoot, fs); // ATTENTION: the writing of `JobState` after all WUs signifies WU gen+serialization now complete

- return new GenerateWorkUnitsResult(jobState.getTaskCount(), resourcesToCleanUp);
+ String sourceClassName = JobStateUtils.getSourceClassName(jobState);
+ return new GenerateWorkUnitsResult(jobState.getTaskCount(), sourceClassName, wuSizeSummary, pathsToCleanUp);
} catch (ReflectiveOperationException roe) {
String errMsg = "Unable to construct a source for generating workunits for job " + jobState.getJobId();
log.error(errMsg, roe);
@@ -101,7 +148,7 @@ public GenerateWorkUnitsResult generateWorkUnits(Properties jobProps, EventSubmi
}

protected List<WorkUnit> generateWorkUnitsForJobStateAndCollectCleanupPaths(JobState jobState, EventSubmitterContext eventSubmitterContext, Closer closer,
- Set<String> resourcesToCleanUp)
+ Set<String> pathsToCleanUp)
throws ReflectiveOperationException {
Source<?, ?> source = JobStateUtils.createSource(jobState);
WorkUnitStream workUnitStream = source instanceof WorkUnitStreamSource
@@ -127,7 +174,7 @@ protected List<WorkUnit> generateWorkUnitsForJobStateAndCollectCleanupPaths(JobS
DestinationDatasetHandlerService datasetHandlerService = closer.register(
new DestinationDatasetHandlerService(jobState, canCleanUpTempDirs, eventSubmitterContext.create()));
WorkUnitStream handledWorkUnitStream = datasetHandlerService.executeHandlers(workUnitStream);
- resourcesToCleanUp.addAll(calculateWorkDirsToCleanup(handledWorkUnitStream));
+ pathsToCleanUp.addAll(calculateWorkDirsToCleanup(handledWorkUnitStream));
// initialize writer and converter(s)
// TODO: determine whether registration here is effective, or the lifecycle of this activity is too brief (as is likely!)
closer.register(WriterInitializerFactory.newInstace(jobState, handledWorkUnitStream)).initialize();
@@ -151,24 +198,24 @@ protected List<WorkUnit> generateWorkUnitsForJobStateAndCollectCleanupPaths(JobS
}

protected static Set<String> calculateWorkDirsToCleanup(WorkUnitStream workUnitStream) {
- Set<String> resourcesToCleanUp = new HashSet<>();
+ Set<String> workDirPaths = new HashSet<>();
// Check whether each workunit has the temp dir props, since some workunits may be commit steps
Iterator<WorkUnit> workUnitIterator = workUnitStream.getWorkUnits();
while (workUnitIterator.hasNext()) {
WorkUnit workUnit = workUnitIterator.next();
if (workUnit.isMultiWorkUnit()) {
List<WorkUnit> workUnitList = ((MultiWorkUnit) workUnit).getWorkUnits();
for (WorkUnit wu : workUnitList) {
- resourcesToCleanUp.addAll(collectTaskStagingAndOutputDirsFromWorkUnit(wu));
+ // WARNING/TODO: NOT resilient to nested multi-workunits... should it be?
+ workDirPaths.addAll(collectTaskStagingAndOutputDirsFromWorkUnit(wu));
}
} else {
- resourcesToCleanUp.addAll(collectTaskStagingAndOutputDirsFromWorkUnit(workUnit));
+ workDirPaths.addAll(collectTaskStagingAndOutputDirsFromWorkUnit(workUnit));
}
}
- return resourcesToCleanUp;
+ return workDirPaths;
}


private static Set<String> collectTaskStagingAndOutputDirsFromWorkUnit(WorkUnit workUnit) {
Set<String> resourcesToCleanUp = new HashSet<>();
if (workUnit.contains(ConfigurationKeys.WRITER_STAGING_DIR)) {
@@ -183,4 +230,41 @@ private static Set<String> collectTaskStagingAndOutputDirsFromWorkUnit(WorkUnit
}
return resourcesToCleanUp;
}

/** @return the {@link WorkUnitsSizeDigest} for `workUnits` */
protected static WorkUnitsSizeDigest digestWorkUnitsSize(List<WorkUnit> workUnits) {
AtomicLong totalSize = new AtomicLong(0L);
TDigest topLevelWorkUnitsDigest = TDigest.createDigest(100);
TDigest constituentWorkUnitsDigest = TDigest.createDigest(100);

Iterator<WorkUnit> workUnitIterator = workUnits.iterator();
while (workUnitIterator.hasNext()) {
WorkUnit workUnit = workUnitIterator.next();
if (workUnit.isMultiWorkUnit()) {
List<WorkUnit> subWorkUnitsList = ((MultiWorkUnit) workUnit).getWorkUnits();
AtomicLong mwuAggSize = new AtomicLong(0L);
// WARNING/TODO: NOT resilient to nested multi-workunits... should it be?
subWorkUnitsList.stream().mapToLong(wu -> wu.getPropAsLong(ServiceConfigKeys.WORK_UNIT_SIZE, 0)).forEach(wuSize -> {
constituentWorkUnitsDigest.add(wuSize);
mwuAggSize.addAndGet(wuSize);
});
totalSize.addAndGet(mwuAggSize.get());
topLevelWorkUnitsDigest.add(mwuAggSize.get());
} else {
long wuSize = workUnit.getPropAsLong(ServiceConfigKeys.WORK_UNIT_SIZE, 0);
totalSize.addAndGet(wuSize);
constituentWorkUnitsDigest.add(wuSize);
topLevelWorkUnitsDigest.add(wuSize);
}
}

// TODO - decide whether helpful/necessary to `.compress()`
topLevelWorkUnitsDigest.compress();
constituentWorkUnitsDigest.compress();
return new WorkUnitsSizeDigest(totalSize.get(), topLevelWorkUnitsDigest, constituentWorkUnitsDigest);
}
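
  // Worked illustration (hypothetical sizes, not part of the diff):
  //   workUnits = [ MultiWorkUnit{5, 15}, WorkUnit{30} ]
  //     => totalSize = 50
  //     => topLevelWorkUnitsDigest gets {20, 30} (each MWU collapses to its aggregate - the "roots")
  //     => constituentWorkUnitsDigest gets {5, 15, 30} (every "leaf" counted individually)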

public static int getConfiguredNumSizeSummaryQuantiles(State state) {
return state.getPropAsInt(GenerateWorkUnits.NUM_WORK_UNITS_SIZE_SUMMARY_QUANTILES, GenerateWorkUnits.DEFAULT_NUM_WORK_UNITS_SIZE_SUMMARY_QUANTILES);
}
}
JobStateUtils.java
@@ -46,6 +46,7 @@
import org.apache.gobblin.runtime.TaskState;
import org.apache.gobblin.source.Source;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.temporal.ddm.work.EagerFsDirBackedWorkUnitClaimCheckWorkload;
import org.apache.gobblin.temporal.ddm.work.assistance.Help;
import org.apache.gobblin.util.JobLauncherUtils;
import org.apache.gobblin.util.ParallelRunner;
@@ -76,9 +77,14 @@ public static FileSystem openFileSystem(JobState jobState) throws IOException {
return Help.loadFileSystemForUriForce(getFileSystemUri(jobState), jobState);
}

- /** @return a new instance of {@link Source} identified by {@link ConfigurationKeys#SOURCE_CLASS_KEY} */
+ /** @return the FQ class name, presumed configured as {@link ConfigurationKeys#SOURCE_CLASS_KEY} */
public static String getSourceClassName(JobState jobState) {
return jobState.getProp(ConfigurationKeys.SOURCE_CLASS_KEY);
}

/** @return a new instance of {@link Source}, identified by {@link ConfigurationKeys#SOURCE_CLASS_KEY} */
public static Source<?, ?> createSource(JobState jobState) throws ReflectiveOperationException {
- Class<?> sourceClass = Class.forName(jobState.getProp(ConfigurationKeys.SOURCE_CLASS_KEY));
+ Class<?> sourceClass = Class.forName(getSourceClassName(jobState));
log.info("Creating source: '{}'", sourceClass.getName());
Source<?, ?> source = new SourceDecorator<>(
Source.class.cast(sourceClass.newInstance()),
@@ -145,7 +151,10 @@ public static Path getTaskStateStorePath(JobState jobState, FileSystem fs) {
return fs.makeQualified(jobOutputPath);
}

- /** write serialized {@link WorkUnit}s in parallel into files named after the jobID and task IDs */
+ /**
+  * write serialized {@link WorkUnit}s in parallel into files named to tunnel {@link org.apache.gobblin.util.WorkUnitSizeInfo}.
+  * Size info is to be later recovered by {@link EagerFsDirBackedWorkUnitClaimCheckWorkload}
+  */
public static void writeWorkUnits(List<WorkUnit> workUnits, Path workDirRootPath, JobState jobState, FileSystem fs)
throws IOException {
String jobId = jobState.getJobId();
@@ -159,7 +168,8 @@ public static void writeWorkUnits(List<WorkUnit> workUnits, Path workDirRootPath
JobLauncherUtils.WorkUnitPathCalculator pathCalculator = new JobLauncherUtils.WorkUnitPathCalculator();
int i = 0;
for (WorkUnit workUnit : workUnits) {
- Path workUnitFile = pathCalculator.calcNextPath(workUnit, jobId, targetDirPath);
+ // tunnel each WU's size info via its filename, for `EagerFsDirBackedWorkUnitClaimCheckWorkload#extractTunneledWorkUnitSizeInfo`
+ Path workUnitFile = pathCalculator.calcNextPathWithTunneledSizeInfo(workUnit, jobId, targetDirPath);
if (i++ == 0) {
log.info("Writing work unit file [first of {}]: '{}'", workUnits.size(), workUnitFile);
}
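Together with `EagerFsDirBackedWorkUnitClaimCheckWorkload#extractTunneledWorkUnitSizeInfo` (next file), this forms a filename-based side channel: later activities learn each work unit's size without deserializing its file. A hedged sketch of the round trip (the exact filename shape is an assumption; the real scheme lives in `JobLauncherUtils.WorkUnitPathCalculator`):

// write side - size info is encoded into the name, hypothetically along the lines of:
//   <targetDirPath>/task_<jobId>_<seq>_<encodedWorkUnitSizeInfo>.wu
Path workUnitFile = pathCalculator.calcNextPathWithTunneledSizeInfo(workUnit, jobId, targetDirPath);
// read side - recovered from the `Path` alone, via `Id.parse` plus `WorkUnitSizeInfo.decode`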
EagerFsDirBackedWorkUnitClaimCheckWorkload.java
@@ -19,12 +19,19 @@

import java.net.URI;
import java.util.Comparator;
import java.util.Optional;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;

import lombok.extern.slf4j.Slf4j;

import com.fasterxml.jackson.annotation.JsonIgnore;

import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
import org.apache.gobblin.util.Id;
import org.apache.gobblin.util.WorkUnitSizeInfo;


/**
Expand All @@ -33,6 +40,7 @@
*/
@lombok.NoArgsConstructor // IMPORTANT: for jackson (de)serialization
@lombok.ToString(callSuper = true)
@Slf4j
public class EagerFsDirBackedWorkUnitClaimCheckWorkload extends AbstractEagerFsDirBackedWorkload<WorkUnitClaimCheck> {
private EventSubmitterContext eventSubmitterContext;

@@ -43,8 +51,9 @@ public EagerFsDirBackedWorkUnitClaimCheckWorkload(URI fileSystemUri, String hdfs

@Override
protected WorkUnitClaimCheck fromFileStatus(FileStatus fileStatus) {
- // begin by setting all correlators to empty
- return new WorkUnitClaimCheck("", this.getFileSystemUri(), fileStatus.getPath().toString(), this.eventSubmitterContext);
+ // begin by setting all correlators to empty string - later we'll `acknowledgeOrdering()`
+ Path filePath = fileStatus.getPath();
+ return new WorkUnitClaimCheck("", this.getFileSystemUri(), filePath.toString(), extractTunneledWorkUnitSizeInfo(filePath), this.eventSubmitterContext);
}

@Override
@@ -58,4 +67,20 @@ protected void acknowledgeOrdering(int index, WorkUnitClaimCheck item) {
// later, after the post-total-ordering indices are known, use each item's index as its correlator
item.setCorrelator(Integer.toString(index));
}

/**
* @return the {@link WorkUnitSizeInfo} when encoded in the filename; otherwise {@link WorkUnitSizeInfo#empty()} when the filename carries no {@link WorkUnit} size info
* @see org.apache.gobblin.util.JobLauncherUtils.WorkUnitPathCalculator#calcNextPathWithTunneledSizeInfo(WorkUnit, String, Path)
*/
protected static WorkUnitSizeInfo extractTunneledWorkUnitSizeInfo(Path filePath) {
String fileName = filePath.getName();
Optional<WorkUnitSizeInfo> optSizeInfo = Optional.empty();
try {
String maybeEncodedSizeInfo = Id.parse(fileName.substring(0, fileName.lastIndexOf('.'))).getName(); // strip extension
optSizeInfo = WorkUnitSizeInfo.decode(maybeEncodedSizeInfo);
} catch (Exception e) { // log, but swallow any `Id.parse` error
log.warn("Filename NOT `Id.parse`able: '" + filePath + "' - " + e.getMessage());
}
return optSizeInfo.orElse(WorkUnitSizeInfo.empty());
}
}
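
The recovery is deliberately forgiving: a filename `Id.parse` rejects merely logs a warning and degrades to the empty size info. A minimal behavioral sketch (hypothetical path, not part of the diff):

WorkUnitSizeInfo info = extractTunneledWorkUnitSizeInfo(new Path("/tmp/wus/not-an-id.wu"));
// warns "Filename NOT `Id.parse`able: ..." and yields WorkUnitSizeInfo.empty()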
GenerateWorkUnitsResult.java
@@ -33,7 +33,10 @@
@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
@RequiredArgsConstructor
public class GenerateWorkUnitsResult {
// NOTE: `@NonNull` to include field in `@RequiredArgsConstructor`, despite - "warning: @NonNull is meaningless on a primitive... @RequiredArgsConstructor"
@NonNull private int generatedWuCount;
@NonNull private String sourceClass;
@NonNull private WorkUnitsSizeSummary workUnitsSizeSummary;
// Gobblin temporary work-directory paths (writer staging/output) that the Temporal job launcher should clean up
@NonNull private Set<String> workDirPathsToDelete;
}
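
Both constructors earn their keep: jackson deserialization uses the no-arg form, while `@RequiredArgsConstructor` serves the activity implementation. A hypothetical round trip, assuming the class's (elided) lombok annotations generate the usual accessors:

import com.fasterxml.jackson.databind.ObjectMapper;

ObjectMapper mapper = new ObjectMapper();
String json = mapper.writeValueAsString(result);  // `result` as built in GenerateWorkUnitsImpl
GenerateWorkUnitsResult back = mapper.readValue(json, GenerateWorkUnitsResult.class);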
WorkUnitClaimCheck.java
@@ -21,18 +21,19 @@

import org.apache.hadoop.fs.Path;

- import com.fasterxml.jackson.annotation.JsonIgnore;

import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;

+ import com.fasterxml.jackson.annotation.JsonIgnore;

import org.apache.gobblin.configuration.State;
import org.apache.gobblin.runtime.AbstractJobLauncher;
import org.apache.gobblin.temporal.ddm.work.styles.FileSystemApt;
import org.apache.gobblin.temporal.ddm.work.styles.FileSystemJobStateful;
import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
import org.apache.gobblin.util.WorkUnitSizeInfo;


/**
Expand All @@ -47,6 +48,7 @@ public class WorkUnitClaimCheck implements FileSystemApt, FileSystemJobStateful
@NonNull private String correlator;
@NonNull private URI fileSystemUri;
@NonNull private String workUnitPath;
@NonNull private WorkUnitSizeInfo workUnitSizeInfo;
@NonNull private EventSubmitterContext eventSubmitterContext;

@JsonIgnore // (because no-arg method resembles 'java bean property')