Commit: addressed PR comments

pratapaditya04 committed Dec 11, 2024
2 parents 90d2af9 + 88a9986 commit dff7981
Showing 6 changed files with 72 additions and 28 deletions.
---- File 1 of 6: JobCommitPolicy ----
@@ -19,6 +19,8 @@

import java.util.Properties;

import lombok.Getter;

import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;

@@ -33,7 +35,7 @@ public enum JobCommitPolicy {
/**
* Commit output data of a job if and only if all of its tasks successfully complete.
*/
COMMIT_ON_FULL_SUCCESS("full"),
COMMIT_ON_FULL_SUCCESS("full", false),

/**
* Commit a job even if some of its tasks fail. It's up to the {@link org.apache.gobblin.publisher.DataPublisher} to
@@ -43,20 +45,22 @@ public enum JobCommitPolicy {
* and should cover most use cases when {@link #COMMIT_ON_FULL_SUCCESS} is not appropriate.
*/
@Deprecated
COMMIT_ON_PARTIAL_SUCCESS("partial"),
COMMIT_ON_PARTIAL_SUCCESS("partial", true),

/**
* Commit output data of tasks that successfully complete.
*
* It is recommended to use this commit policy in conjunction with task-level data publishing (i.e., when
* {@link ConfigurationKeys#PUBLISH_DATA_AT_JOB_LEVEL} is set to {@code false}).
*/
COMMIT_SUCCESSFUL_TASKS("successful");
COMMIT_SUCCESSFUL_TASKS("successful", true);

private final String name;
@Getter private final boolean allowPartialCommit; // Indicates if the commit policy allows partial task commits

JobCommitPolicy(String name) {
JobCommitPolicy(String name, boolean allowPartialCommit) {
this.name = name;
this.allowPartialCommit = allowPartialCommit;
}

/**
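As an aside, here is a minimal sketch of what the new flag buys at call sites, mirroring the condition change in summarizeDatasetOutcomes further down this diff. The class below is made up, and JobCommitPolicy's package is assumed from the Gobblin codebase.

import org.apache.gobblin.source.extractor.JobCommitPolicy;  // package assumed, not shown in this diff

public class CommitPolicyCheckSketch {

  // Before this commit: call sites had to enumerate every policy that tolerates partial commits.
  static boolean allowsPartialCommitBefore(JobCommitPolicy policy) {
    return policy == JobCommitPolicy.COMMIT_SUCCESSFUL_TASKS
        || policy == JobCommitPolicy.COMMIT_ON_PARTIAL_SUCCESS;
  }

  // After this commit: the policy itself carries the answer via the Lombok-generated boolean getter.
  static boolean allowsPartialCommitAfter(JobCommitPolicy policy) {
    return policy.isAllowPartialCommit();
  }
}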
---- File 2 of 6 ----
@@ -63,11 +63,11 @@
import org.apache.gobblin.temporal.ddm.work.DatasetStats;
import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
import org.apache.gobblin.temporal.ddm.work.assistance.Help;
import org.apache.gobblin.temporal.exception.FailedDatasetUrnsException;
import org.apache.gobblin.util.Either;
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.util.PropertiesUtils;
import org.apache.gobblin.util.executors.IteratorExecutor;
import org.apache.gobblin.temporal.exception.FailedDatasetUrnsException;


@Slf4j
@@ -99,20 +99,22 @@ public CommitStats commit(WUProcessingSpec workSpec) {
Map<String, JobState.DatasetState> datasetStatesByUrns = jobState.calculateDatasetStatesByUrns(ImmutableList.copyOf(taskStates), Lists.newArrayList());
TaskState firstTaskState = taskStates.get(0);
log.info("TaskState (commit) [{}] (**first of {}**): {}", firstTaskState.getTaskId(), taskStates.size(), firstTaskState.toJsonString(true));
CommitStats commitStats = CommitStats.createEmpty();
Optional<FailedDatasetUrnsException> optFailure = Optional.empty();
try {
commitTaskStates(jobState, datasetStatesByUrns, jobContext);
} catch (FailedDatasetUrnsException exception) {
log.info("Some datasets failed to be committed, proceeding with publishing commit step");
commitStats.setOptFailure(Optional.of(exception));
log.warn("Some datasets failed to be committed, proceeding with publishing commit step", exception);
optFailure = Optional.of(exception);
}

boolean shouldIncludeFailedTasks = PropertiesUtils.getPropAsBoolean(jobState.getProperties(), ConfigurationKeys.WRITER_COUNT_METRICS_FROM_FAILED_TASKS, "false");

Map<String, DatasetStats> datasetTaskSummaries = summarizeDatasetOutcomes(datasetStatesByUrns, jobContext.getJobCommitPolicy(), shouldIncludeFailedTasks);
return commitStats.setDatasetStats(datasetTaskSummaries)
.setNumCommittedWorkUnits(
datasetTaskSummaries.values().stream().mapToInt(DatasetStats::getNumCommittedWorkunits).sum());
return new CommitStats(
datasetTaskSummaries,
datasetTaskSummaries.values().stream().mapToInt(DatasetStats::getNumCommittedWorkunits).sum(),
optFailure
);
} catch (Exception e) {
//TODO: IMPROVE GRANULARITY OF RETRIES
throw ApplicationFailure.newNonRetryableFailureWithCause(
@@ -175,7 +177,7 @@ public Callable<Void> apply(final Map.Entry<String, JobState.DatasetState> entry
if (!failedDatasetUrns.isEmpty()) {
String allFailedDatasets = String.join(", ", failedDatasetUrns);
log.error("Failed to commit dataset state for dataset(s) {}", allFailedDatasets);
throw new FailedDatasetUrnsException(allFailedDatasets);
throw new FailedDatasetUrnsException(failedDatasetUrns);
}
if (!IteratorExecutor.verifyAllSuccessful(result)) {
// TODO: propagate cause of failure and determine whether or not this is retryable to throw a non-retryable failure exception
@@ -215,7 +217,7 @@ private Map<String, DatasetStats> summarizeDatasetOutcomes(Map<String, JobState.
// Only process successful datasets unless configuration to process failed datasets is set
for (JobState.DatasetState datasetState : datasetStatesByUrns.values()) {
if (datasetState.getState() == JobState.RunningState.COMMITTED || (datasetState.getState() == JobState.RunningState.FAILED
&& (commitPolicy == JobCommitPolicy.COMMIT_SUCCESSFUL_TASKS || commitPolicy == JobCommitPolicy.COMMIT_ON_PARTIAL_SUCCESS))) {
&& commitPolicy.isAllowPartialCommit())) {
long totalBytesWritten = 0;
long totalRecordsWritten = 0;
int totalCommittedTasks = 0;
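The reworked commit() above captures the dataset-commit failure into a local Optional and returns it inside a fully-constructed CommitStats instead of mutating the result through chained setters. A stripped-down sketch of that catch-and-carry shape follows; all names are placeholders, not the activity's real helpers.

import java.util.Collections;
import java.util.Map;
import java.util.Optional;

class CatchAndCarrySketch {

  static Result run() {
    Optional<Exception> optFailure = Optional.empty();
    try {
      riskyCommitStep();                        // may fail for a subset of datasets
    } catch (Exception e) {
      optFailure = Optional.of(e);              // remember the failure, but keep going
    }
    Map<String, Long> summaries = summarizeWhatSucceeded();
    return new Result(summaries, optFailure);   // caller decides how (and whether) to surface the failure
  }

  static void riskyCommitStep() throws Exception { }
  static Map<String, Long> summarizeWhatSucceeded() { return Collections.emptyMap(); }

  static final class Result {
    final Map<String, Long> summaries;
    final Optional<Exception> optFailure;
    Result(Map<String, Long> summaries, Optional<Exception> optFailure) {
      this.summaries = summaries;
      this.optFailure = optFailure;
    }
  }
}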
---- File 3 of 6: CommitStats ----
@@ -25,7 +25,8 @@
import lombok.NoArgsConstructor;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.experimental.Accessors;

import org.apache.gobblin.temporal.exception.FailedDatasetUrnsException;


/**
@@ -36,11 +37,10 @@
@Data
@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
@RequiredArgsConstructor
@Accessors(chain = true)
public class CommitStats {
@NonNull private Map<String, DatasetStats> datasetStats;
@NonNull private int numCommittedWorkUnits;
@NonNull private Optional<Exception> optFailure;
@NonNull private Optional<FailedDatasetUrnsException> optFailure;

public static CommitStats createEmpty() {
return new CommitStats(new HashMap<>(), 0, Optional.empty());
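With @Accessors(chain = true) removed, the chained setDatasetStats(...).setNumCommittedWorkUnits(...) style used by the old activity code is gone and instances are built in one shot. A sketch of the two remaining construction paths; the CommitStats package is assumed to match DatasetStats/WUProcessingSpec above, and the failure value is a placeholder.

import java.util.HashMap;
import java.util.Optional;

import org.apache.gobblin.temporal.ddm.work.CommitStats;                  // package assumed
import org.apache.gobblin.temporal.exception.FailedDatasetUrnsException;

class CommitStatsConstructionSketch {

  // Nothing committed, no failure recorded.
  static CommitStats empty() {
    return CommitStats.createEmpty();
  }

  // Fully populated via the Lombok @RequiredArgsConstructor: (datasetStats, numCommittedWorkUnits, optFailure).
  static CommitStats failedWithNothingCommitted(FailedDatasetUrnsException failure) {
    return new CommitStats(new HashMap<>(), 0, Optional.of(failure));
  }
}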
---- File 4 of 6: CommitStepWorkflowImpl ----
@@ -37,7 +37,6 @@
import org.apache.gobblin.temporal.ddm.work.DatasetStats;
import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
import org.apache.gobblin.temporal.ddm.workflow.CommitStepWorkflow;
import org.apache.gobblin.temporal.exception.FailedDatasetUrnsException;
import org.apache.gobblin.temporal.workflows.metrics.TemporalEventTimer;


@@ -62,21 +61,20 @@ public class CommitStepWorkflowImpl implements CommitStepWorkflow {
public CommitStats commit(WUProcessingSpec workSpec) {
CommitStats commitGobblinStats = activityStub.commit(workSpec);

if(!commitGobblinStats.getOptFailure().isPresent() || commitGobblinStats.getNumCommittedWorkUnits() > 0) {
if (!commitGobblinStats.getOptFailure().isPresent() || commitGobblinStats.getNumCommittedWorkUnits() > 0) {
TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.Factory(workSpec.getEventSubmitterContext());
timerFactory.create(TimingEvent.LauncherTimings.JOB_SUMMARY)
.withMetadata(TimingEvent.DATASET_TASK_SUMMARIES, GsonUtils.GSON_WITH_DATE_HANDLING.toJson(
convertDatasetStatsToTaskSummaries(commitGobblinStats.getDatasetStats())))
.submit();
.submit(); // emit job summary info on both full and partial commit (ultimately for `GaaSJobObservabilityEvent.datasetsMetrics`)
}
if(commitGobblinStats.getOptFailure().isPresent()){
if (commitGobblinStats.getOptFailure().isPresent()) {
throw ApplicationFailure.newNonRetryableFailureWithCause(
String.format("Failed to commit dataset state for some dataset(s)"), FailedDatasetUrnsException.class.toString(),
String.format("Failed to commit dataset state for some dataset(s)"), commitGobblinStats.getOptFailure().get().getClass().toString(),
commitGobblinStats.getOptFailure().get());
}
return commitGobblinStats;
}

private List<DatasetTaskSummary> convertDatasetStatsToTaskSummaries(Map<String, DatasetStats> datasetStats) {
List<DatasetTaskSummary> datasetTaskSummaries = new ArrayList<>();
for (Map.Entry<String, DatasetStats> entry : datasetStats.entrySet()) {
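For context on how the re-thrown failure looks to a caller: ApplicationFailure carries, as its type, whatever string was passed as the second argument to newNonRetryableFailureWithCause (here the concrete class of the stored failure). The sketch below is illustrative only; in real Temporal code the ApplicationFailure usually arrives wrapped in a WorkflowFailedException or ChildWorkflowFailure rather than being caught directly.

import io.temporal.failure.ApplicationFailure;

class CommitFailureInspectionSketch {

  static void invokeCommit(Runnable commitCall) {
    try {
      commitCall.run();  // stand-in for calling the commit workflow/activity stub
    } catch (ApplicationFailure failure) {
      // Class.toString() yields e.g. "class org.apache.gobblin.temporal.exception.FailedDatasetUrnsException"
      if (failure.getType().contains("FailedDatasetUrnsException")) {
        // some datasets failed to commit; failure.getCause() holds the original exception
      }
      throw failure;  // marked non-retryable, so propagate rather than retry
    }
  }
}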
---- File 5 of 6 ----
@@ -16,12 +16,14 @@
*/
package org.apache.gobblin.temporal.ddm.workflow.impl;

import java.io.IOException;
import java.util.Map;
import java.util.Optional;

import com.typesafe.config.ConfigFactory;

import io.temporal.api.enums.v1.ParentClosePolicy;
import io.temporal.failure.ApplicationFailure;
import io.temporal.workflow.ChildWorkflowOptions;
import io.temporal.workflow.Workflow;
import lombok.extern.slf4j.Slf4j;
@@ -36,6 +38,7 @@
import org.apache.gobblin.temporal.ddm.work.styles.FileSystemJobStateful;
import org.apache.gobblin.temporal.ddm.workflow.CommitStepWorkflow;
import org.apache.gobblin.temporal.ddm.workflow.ProcessWorkUnitsWorkflow;
import org.apache.gobblin.temporal.exception.FailedDatasetUrnsException;
import org.apache.gobblin.temporal.util.nesting.work.WorkflowAddr;
import org.apache.gobblin.temporal.util.nesting.work.Workload;
import org.apache.gobblin.temporal.util.nesting.workflow.NestingExecWorkflow;
@@ -79,9 +82,24 @@ private CommitStats performWork(WUProcessingSpec workSpec) {
workSpec.getTuning().getMaxBranchesPerTree(), workSpec.getTuning().getMaxSubTreesPerTree(),
Optional.empty()));
} catch (Exception e) {
log.error("ProcessWorkUnits failure - will attempt partial commit before announcing error", e);
performCommitIfAnyWorkUnitsProcessed(workSpec, searchAttributes, workunitsProcessed);
throw e; //We want to proceed with partial commit and throw exception so that the parent workflow ExecuteGobblinWorkflowImpl can throw the failure event
log.error("ProcessWorkUnits failure - attempting partial commit before re-throwing exception", e);

try {
performCommitIfAnyWorkUnitsProcessed(workSpec, searchAttributes, workunitsProcessed); // Attempt partial commit before surfacing the failure
} catch (Exception commitException) {
// Combine current and commit exception messages for a more complete context
String combinedMessage = String.format(
"Processing failure: %s. Commit workflow failure: %s",
e.getMessage(),
commitException.getMessage()
);
log.error(combinedMessage);
throw ApplicationFailure.newNonRetryableFailureWithCause(
String.format("Processing failure: %s. Partial commit failure: %s", combinedMessage, commitException),
Exception.class.toString(),
new Exception(e)); // Wrap the original exception for stack trace preservation
}
throw e; // Re-throw after any partial commit, to fail the parent workflow in case commitWorkflow didn't flow (unlikely)
}
return performCommitIfAnyWorkUnitsProcessed(workSpec, searchAttributes, workunitsProcessed);
}
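A stripped-down sketch of the "partial commit, then re-throw" shape added above; method names are placeholders, and the addSuppressed call is an extra JDK idiom shown for illustration, not part of this commit.

class PartialCommitOnFailureSketch {

  static void process() {
    try {
      doProcessing();
    } catch (RuntimeException processingFailure) {
      try {
        commitWhateverSucceeded();               // best-effort partial commit
      } catch (RuntimeException commitFailure) {
        RuntimeException combined = new RuntimeException(
            "Processing failure: " + processingFailure.getMessage()
                + ". Partial commit failure: " + commitFailure.getMessage(),
            processingFailure);                  // keep the original failure (and its stack trace) as the cause
        combined.addSuppressed(commitFailure);   // JDK idiom for carrying the secondary failure as well
        throw combined;
      }
      throw processingFailure;                   // commit succeeded; still surface the processing failure
    }
  }

  static void doProcessing() { }
  static void commitWhateverSucceeded() { }
}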
---- File 6 of 6: FailedDatasetUrnsException ----
@@ -16,21 +16,43 @@
*/
package org.apache.gobblin.temporal.exception;


import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

import lombok.Getter;


/**
* An exception thrown when a set of dataset URNs fail to be processed.
*/

public class FailedDatasetUrnsException extends IOException {

@Getter
private final Set<String> failedDatasetUrns;

/**
* Creates a new instance of this exception with the failed dataset URNs.
*
* @param failedDatasetUrns the String of failed dataset URNs joined by comma
* @param failedDatasetUrns a set containing the URNs of the datasets that failed to process
*/
public FailedDatasetUrnsException(String failedDatasetUrns) {
super("Failed to process the following dataset URNs: " + failedDatasetUrns);
public FailedDatasetUrnsException(Set<String> failedDatasetUrns) {
super("Failed to process the following dataset URNs: " + String.join(",", failedDatasetUrns));
this.failedDatasetUrns = failedDatasetUrns;
}

/**
* Default constructor for {@code FailedDatasetUrnsException}.
* <p>
* This constructor initializes an empty {@link HashSet} for {@code failedDatasetUrns}.
* It is provided to support frameworks like Jackson that require a no-argument constructor
* for deserialization purposes.
* </p>
* */
public FailedDatasetUrnsException() {
super();
this.failedDatasetUrns = new HashSet<>();
}
}
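A short usage sketch of the reworked exception (the dataset URNs and helper below are made up): callers now get the failed URNs back as a structured Set via the Lombok getter instead of re-parsing the message string.

import java.util.HashSet;
import java.util.Set;

import org.apache.gobblin.temporal.exception.FailedDatasetUrnsException;

class FailedUrnsSketch {

  static void commitAll(Set<String> datasetUrns) throws FailedDatasetUrnsException {
    Set<String> failed = new HashSet<>();
    for (String urn : datasetUrns) {
      if (!tryCommit(urn)) {
        failed.add(urn);
      }
    }
    if (!failed.isEmpty()) {
      throw new FailedDatasetUrnsException(failed);  // carries the full set, not just a joined string
    }
  }

  static boolean tryCommit(String urn) { return true; }  // placeholder

  public static void main(String[] args) throws FailedDatasetUrnsException {
    try {
      commitAll(new HashSet<>());
    } catch (FailedDatasetUrnsException e) {
      System.err.println("Failed dataset URNs: " + e.getFailedDatasetUrns());
      throw e;
    }
  }
}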
