Skip to content

Commit

Permalink
[GOBBLIN-2175] Support partial commit with Gobblin-on-Temporal execut…
Browse files Browse the repository at this point in the history
…ion (#4078)

---------

Co-authored-by: Aditya Pratap Singh <[email protected]>
  • Loading branch information
pratapaditya04 and Aditya Pratap Singh authored Dec 13, 2024
1 parent f4728fb commit 0bf770c
Show file tree
Hide file tree
Showing 6 changed files with 143 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import java.util.Properties;

import lombok.Getter;

import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;

Expand All @@ -33,7 +35,7 @@ public enum JobCommitPolicy {
/**
* Commit output data of a job if and only if all of its tasks successfully complete.
*/
COMMIT_ON_FULL_SUCCESS("full"),
COMMIT_ON_FULL_SUCCESS("full", false),

/**
* Commit a job even if some of its tasks fail. It's up to the {@link org.apache.gobblin.publisher.DataPublisher} to
Expand All @@ -43,20 +45,22 @@ public enum JobCommitPolicy {
* and should cover most use cases when {@link #COMMIT_ON_FULL_SUCCESS} is not appropriate.
*/
@Deprecated
COMMIT_ON_PARTIAL_SUCCESS("partial"),
COMMIT_ON_PARTIAL_SUCCESS("partial", true),

/**
* Commit output data of tasks that successfully complete.
*
* It is recommended to use this commit policy in conjunction with task-level data publishing (i.e., when
* {@link ConfigurationKeys#PUBLISH_DATA_AT_JOB_LEVEL} is set to {@code false}).
*/
COMMIT_SUCCESSFUL_TASKS("successful");
COMMIT_SUCCESSFUL_TASKS("successful", true);

private final String name;
@Getter private final boolean allowPartialCommit;// Indicates if the commit policy allows partial task commits

JobCommitPolicy(String name) {
JobCommitPolicy(String name, boolean allowPartialCommit) {
this.name = name;
this.allowPartialCommit = allowPartialCommit;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,13 @@
import org.apache.gobblin.temporal.ddm.work.DatasetStats;
import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
import org.apache.gobblin.temporal.ddm.work.assistance.Help;
import org.apache.gobblin.temporal.exception.FailedDatasetUrnsException;
import org.apache.gobblin.util.Either;
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.util.PropertiesUtils;
import org.apache.gobblin.util.executors.IteratorExecutor;


@Slf4j
public class CommitActivityImpl implements CommitActivity {

Expand Down Expand Up @@ -97,12 +99,22 @@ public CommitStats commit(WUProcessingSpec workSpec) {
Map<String, JobState.DatasetState> datasetStatesByUrns = jobState.calculateDatasetStatesByUrns(ImmutableList.copyOf(taskStates), Lists.newArrayList());
TaskState firstTaskState = taskStates.get(0);
log.info("TaskState (commit) [{}] (**first of {}**): {}", firstTaskState.getTaskId(), taskStates.size(), firstTaskState.toJsonString(true));
commitTaskStates(jobState, datasetStatesByUrns, jobContext);
Optional<FailedDatasetUrnsException> optFailure = Optional.empty();
try {
commitTaskStates(jobState, datasetStatesByUrns, jobContext);
} catch (FailedDatasetUrnsException exception) {
log.warn("Some datasets failed to be committed, proceeding with publishing commit step", exception);
optFailure = Optional.of(exception);
}

boolean shouldIncludeFailedTasks = PropertiesUtils.getPropAsBoolean(jobState.getProperties(), ConfigurationKeys.WRITER_COUNT_METRICS_FROM_FAILED_TASKS, "false");

Map<String, DatasetStats> datasetTaskSummaries = summarizeDatasetOutcomes(datasetStatesByUrns, jobContext.getJobCommitPolicy(), shouldIncludeFailedTasks);
return new CommitStats(datasetTaskSummaries, datasetTaskSummaries.values().stream().mapToInt(DatasetStats::getNumCommittedWorkunits).sum());
return new CommitStats(
datasetTaskSummaries,
datasetTaskSummaries.values().stream().mapToInt(DatasetStats::getNumCommittedWorkunits).sum(),
optFailure
);
} catch (Exception e) {
//TODO: IMPROVE GRANULARITY OF RETRIES
throw ApplicationFailure.newNonRetryableFailureWithCause(
Expand Down Expand Up @@ -164,8 +176,8 @@ public Callable<Void> apply(final Map.Entry<String, JobState.DatasetState> entry

if (!failedDatasetUrns.isEmpty()) {
String allFailedDatasets = String.join(", ", failedDatasetUrns);
log.error("Failed to commit dataset state for dataset(s) {}" + allFailedDatasets);
throw new IOException("Failed to commit dataset state for " + allFailedDatasets);
log.error("Failed to commit dataset state for dataset(s) {}", allFailedDatasets);
throw new FailedDatasetUrnsException(failedDatasetUrns);
}
if (!IteratorExecutor.verifyAllSuccessful(result)) {
// TODO: propagate cause of failure and determine whether or not this is retryable to throw a non-retryable failure exception
Expand Down Expand Up @@ -205,7 +217,7 @@ private Map<String, DatasetStats> summarizeDatasetOutcomes(Map<String, JobState.
// Only process successful datasets unless configuration to process failed datasets is set
for (JobState.DatasetState datasetState : datasetStatesByUrns.values()) {
if (datasetState.getState() == JobState.RunningState.COMMITTED || (datasetState.getState() == JobState.RunningState.FAILED
&& commitPolicy == JobCommitPolicy.COMMIT_SUCCESSFUL_TASKS)) {
&& commitPolicy.isAllowPartialCommit())) {
long totalBytesWritten = 0;
long totalRecordsWritten = 0;
int totalCommittedTasks = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,15 @@

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;

import org.apache.gobblin.temporal.exception.FailedDatasetUrnsException;


/**
* Data structure representing the stats for a committed dataset, and the total number of committed workunits in the Gobblin Temporal job
Expand All @@ -37,8 +40,9 @@
public class CommitStats {
@NonNull private Map<String, DatasetStats> datasetStats;
@NonNull private int numCommittedWorkUnits;
@NonNull private Optional<FailedDatasetUrnsException> optFailure;

public static CommitStats createEmpty() {
return new CommitStats(new HashMap<>(), 0);
return new CommitStats(new HashMap<>(), 0, Optional.empty());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import io.temporal.activity.ActivityOptions;
import io.temporal.common.RetryOptions;
import io.temporal.failure.ApplicationFailure;
import io.temporal.workflow.Workflow;

import java.time.Duration;
Expand Down Expand Up @@ -59,13 +60,21 @@ public class CommitStepWorkflowImpl implements CommitStepWorkflow {
@Override
public CommitStats commit(WUProcessingSpec workSpec) {
CommitStats commitGobblinStats = activityStub.commit(workSpec);
TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.Factory(workSpec.getEventSubmitterContext());
timerFactory.create(TimingEvent.LauncherTimings.JOB_SUMMARY)
.withMetadata(TimingEvent.DATASET_TASK_SUMMARIES, GsonUtils.GSON_WITH_DATE_HANDLING.toJson(convertDatasetStatsToTaskSummaries(commitGobblinStats.getDatasetStats())))
.submit();

if (!commitGobblinStats.getOptFailure().isPresent() || commitGobblinStats.getNumCommittedWorkUnits() > 0) {
TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.Factory(workSpec.getEventSubmitterContext());
timerFactory.create(TimingEvent.LauncherTimings.JOB_SUMMARY)
.withMetadata(TimingEvent.DATASET_TASK_SUMMARIES, GsonUtils.GSON_WITH_DATE_HANDLING.toJson(
convertDatasetStatsToTaskSummaries(commitGobblinStats.getDatasetStats())))
.submit();// emit job summary info on both full and partial commit (ultimately for `GaaSJobObservabilityEvent.datasetsMetrics`)
}
if (commitGobblinStats.getOptFailure().isPresent()) {
throw ApplicationFailure.newNonRetryableFailureWithCause(
String.format("Failed to commit dataset state for some dataset(s)"), commitGobblinStats.getOptFailure().get().getClass().toString(),
commitGobblinStats.getOptFailure().get());
}
return commitGobblinStats;
}

private List<DatasetTaskSummary> convertDatasetStatsToTaskSummaries(Map<String, DatasetStats> datasetStats) {
List<DatasetTaskSummary> datasetTaskSummaries = new ArrayList<>();
for (Map.Entry<String, DatasetStats> entry : datasetStats.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.typesafe.config.ConfigFactory;

import io.temporal.api.enums.v1.ParentClosePolicy;
import io.temporal.failure.ApplicationFailure;
import io.temporal.workflow.ChildWorkflowOptions;
import io.temporal.workflow.Workflow;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -72,20 +73,48 @@ private CommitStats performWork(WUProcessingSpec workSpec) {
searchAttributes = TemporalWorkFlowUtils.generateGaasSearchAttributes(jobState.getProperties());

NestingExecWorkflow<WorkUnitClaimCheck> processingWorkflow = createProcessingWorkflow(workSpec, searchAttributes);
int workunitsProcessed =
processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0, workSpec.getTuning().getMaxBranchesPerTree(),
workSpec.getTuning().getMaxSubTreesPerTree(), Optional.empty());
if (workunitsProcessed > 0) {
CommitStepWorkflow commitWorkflow = createCommitStepWorkflow(searchAttributes);
CommitStats result = commitWorkflow.commit(workSpec);
if (result.getNumCommittedWorkUnits() == 0) {
log.warn("No work units committed at the job level. They could have been committed at the task level.");

Optional<Integer> workunitsProcessed = Optional.empty();
try {
workunitsProcessed = Optional.of(processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0,
workSpec.getTuning().getMaxBranchesPerTree(), workSpec.getTuning().getMaxSubTreesPerTree(),
Optional.empty()));
} catch (Exception e) {
log.error("ProcessWorkUnits failure - attempting partial commit before re-throwing exception", e);

try {
performCommitIfAnyWorkUnitsProcessed(workSpec, searchAttributes, workunitsProcessed);// Attempt partial commit before surfacing the failure
} catch (Exception commitException) {
// Combine current and commit exception messages for a more complete context
String combinedMessage = String.format(
"Processing failure: %s. Commit workflow failure: %s",
e.getMessage(),
commitException.getMessage()
);
log.error(combinedMessage);
throw ApplicationFailure.newNonRetryableFailureWithCause(
String.format("Processing failure: %s. Partial commit failure: %s", combinedMessage, commitException),
Exception.class.toString(),
new Exception(e)); // Wrap the original exception for stack trace preservation
}
return result;
} else {
throw e;// Re-throw after any partial commit, to fail the parent workflow in case commitWorkflow didn't flow (unlikely)
}
return performCommitIfAnyWorkUnitsProcessed(workSpec, searchAttributes, workunitsProcessed);
}

private CommitStats performCommitIfAnyWorkUnitsProcessed(WUProcessingSpec workSpec,
Map<String, Object> searchAttributes, Optional<Integer> workunitsProcessed) {
// we are only inhibiting commit when workunitsProcessed is actually known to be zero
if (workunitsProcessed.filter(n -> n == 0).isPresent()) {
log.error("No work units processed, so no commit attempted.");
return CommitStats.createEmpty();
}
CommitStepWorkflow commitWorkflow = createCommitStepWorkflow(searchAttributes);
CommitStats result = commitWorkflow.commit(workSpec);
if (result.getNumCommittedWorkUnits() == 0) {
log.warn("No work units committed at the job level. They could have been committed at the task level.");
}
return result;
}

private Optional<EventTimer> createOptJobEventTimer(WUProcessingSpec workSpec) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gobblin.temporal.exception;


import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.Getter;


/**
* An exception thrown when a set of dataset URNs fail to be processed.
*/

@JsonIgnoreProperties(ignoreUnknown = true)
public class FailedDatasetUrnsException extends IOException {

@Getter
private final Set<String> failedDatasetUrns;

/**
* Creates a new instance of this exception with the failed dataset URNs.
*
* @param failedDatasetUrns a set containing the URNs of the datasets that failed to process
*/
public FailedDatasetUrnsException(Set<String> failedDatasetUrns) {
super("Failed to process the following dataset URNs: " + String.join(",", failedDatasetUrns));
this.failedDatasetUrns = failedDatasetUrns;
}

/**
* Default constructor for {@code FailedDatasetUrnsException}.
* <p>
* This constructor initializes an empty {@link HashSet} for {@code failedDatasetUrns}.
* It is provided to support frameworks like Jackson that require a no-argument constructor
* for deserialization purposes.
* </p>
* */
public FailedDatasetUrnsException() {
super();
this.failedDatasetUrns = new HashSet<>();
}
}

0 comments on commit 0bf770c

Please sign in to comment.