From 0bf770c2cde24c98c30ee6c9af18f58a9589f585 Mon Sep 17 00:00:00 2001 From: pratapaditya04 Date: Fri, 13 Dec 2024 12:13:14 +0530 Subject: [PATCH] [GOBBLIN-2175] Support partial commit with Gobblin-on-Temporal execution (#4078) --------- Co-authored-by: Aditya Pratap Singh --- .../source/extractor/JobCommitPolicy.java | 12 ++-- .../ddm/activity/impl/CommitActivityImpl.java | 22 +++++-- .../temporal/ddm/work/CommitStats.java | 6 +- .../workflow/impl/CommitStepWorkflowImpl.java | 19 ++++-- .../impl/ProcessWorkUnitsWorkflowImpl.java | 49 +++++++++++---- .../exception/FailedDatasetUrnsException.java | 60 +++++++++++++++++++ 6 files changed, 143 insertions(+), 25 deletions(-) create mode 100644 gobblin-temporal/src/main/java/org/apache/gobblin/temporal/exception/FailedDatasetUrnsException.java diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/source/extractor/JobCommitPolicy.java b/gobblin-core-base/src/main/java/org/apache/gobblin/source/extractor/JobCommitPolicy.java index f60acb60309..24a5323d27e 100644 --- a/gobblin-core-base/src/main/java/org/apache/gobblin/source/extractor/JobCommitPolicy.java +++ b/gobblin-core-base/src/main/java/org/apache/gobblin/source/extractor/JobCommitPolicy.java @@ -19,6 +19,8 @@ import java.util.Properties; +import lombok.Getter; + import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.State; @@ -33,7 +35,7 @@ public enum JobCommitPolicy { /** * Commit output data of a job if and only if all of its tasks successfully complete. */ - COMMIT_ON_FULL_SUCCESS("full"), + COMMIT_ON_FULL_SUCCESS("full", false), /** * Commit a job even if some of its tasks fail. It's up to the {@link org.apache.gobblin.publisher.DataPublisher} to @@ -43,7 +45,7 @@ public enum JobCommitPolicy { * and should cover most use cases when {@link #COMMIT_ON_FULL_SUCCESS} is not appropriate. 
*/ @Deprecated - COMMIT_ON_PARTIAL_SUCCESS("partial"), + COMMIT_ON_PARTIAL_SUCCESS("partial", true), /** * Commit output data of tasks that successfully complete. @@ -51,12 +53,14 @@ public enum JobCommitPolicy { * It is recommended to use this commit policy in conjunction with task-level data publishing (i.e., when * {@link ConfigurationKeys#PUBLISH_DATA_AT_JOB_LEVEL} is set to {@code false}). */ - COMMIT_SUCCESSFUL_TASKS("successful"); + COMMIT_SUCCESSFUL_TASKS("successful", true); private final String name; + @Getter private final boolean allowPartialCommit;// Indicates if the commit policy allows partial task commits - JobCommitPolicy(String name) { + JobCommitPolicy(String name, boolean allowPartialCommit) { this.name = name; + this.allowPartialCommit = allowPartialCommit; } /** diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java index 97699f2ce23..ae85a6a083c 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java @@ -63,11 +63,13 @@ import org.apache.gobblin.temporal.ddm.work.DatasetStats; import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec; import org.apache.gobblin.temporal.ddm.work.assistance.Help; +import org.apache.gobblin.temporal.exception.FailedDatasetUrnsException; import org.apache.gobblin.util.Either; import org.apache.gobblin.util.ExecutorsUtils; import org.apache.gobblin.util.PropertiesUtils; import org.apache.gobblin.util.executors.IteratorExecutor; + @Slf4j public class CommitActivityImpl implements CommitActivity { @@ -97,12 +99,22 @@ public CommitStats commit(WUProcessingSpec workSpec) { Map datasetStatesByUrns = jobState.calculateDatasetStatesByUrns(ImmutableList.copyOf(taskStates), Lists.newArrayList()); 
TaskState firstTaskState = taskStates.get(0); log.info("TaskState (commit) [{}] (**first of {}**): {}", firstTaskState.getTaskId(), taskStates.size(), firstTaskState.toJsonString(true)); - commitTaskStates(jobState, datasetStatesByUrns, jobContext); + Optional optFailure = Optional.empty(); + try { + commitTaskStates(jobState, datasetStatesByUrns, jobContext); + } catch (FailedDatasetUrnsException exception) { + log.warn("Some datasets failed to be committed, proceeding with publishing commit step", exception); + optFailure = Optional.of(exception); + } boolean shouldIncludeFailedTasks = PropertiesUtils.getPropAsBoolean(jobState.getProperties(), ConfigurationKeys.WRITER_COUNT_METRICS_FROM_FAILED_TASKS, "false"); Map datasetTaskSummaries = summarizeDatasetOutcomes(datasetStatesByUrns, jobContext.getJobCommitPolicy(), shouldIncludeFailedTasks); - return new CommitStats(datasetTaskSummaries, datasetTaskSummaries.values().stream().mapToInt(DatasetStats::getNumCommittedWorkunits).sum()); + return new CommitStats( + datasetTaskSummaries, + datasetTaskSummaries.values().stream().mapToInt(DatasetStats::getNumCommittedWorkunits).sum(), + optFailure + ); } catch (Exception e) { //TODO: IMPROVE GRANULARITY OF RETRIES throw ApplicationFailure.newNonRetryableFailureWithCause( @@ -164,8 +176,8 @@ public Callable apply(final Map.Entry entry if (!failedDatasetUrns.isEmpty()) { String allFailedDatasets = String.join(", ", failedDatasetUrns); - log.error("Failed to commit dataset state for dataset(s) {}" + allFailedDatasets); - throw new IOException("Failed to commit dataset state for " + allFailedDatasets); + log.error("Failed to commit dataset state for dataset(s) {}", allFailedDatasets); + throw new FailedDatasetUrnsException(failedDatasetUrns); } if (!IteratorExecutor.verifyAllSuccessful(result)) { // TODO: propagate cause of failure and determine whether or not this is retryable to throw a non-retryable failure exception @@ -205,7 +217,7 @@ private Map 
summarizeDatasetOutcomes(Map datasetStats; @NonNull private int numCommittedWorkUnits; + @NonNull private Optional optFailure; public static CommitStats createEmpty() { - return new CommitStats(new HashMap<>(), 0); + return new CommitStats(new HashMap<>(), 0, Optional.empty()); } } diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java index d568ea4e5b3..c2c21d28252 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java @@ -19,6 +19,7 @@ import io.temporal.activity.ActivityOptions; import io.temporal.common.RetryOptions; +import io.temporal.failure.ApplicationFailure; import io.temporal.workflow.Workflow; import java.time.Duration; @@ -59,13 +60,21 @@ public class CommitStepWorkflowImpl implements CommitStepWorkflow { @Override public CommitStats commit(WUProcessingSpec workSpec) { CommitStats commitGobblinStats = activityStub.commit(workSpec); - TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.Factory(workSpec.getEventSubmitterContext()); - timerFactory.create(TimingEvent.LauncherTimings.JOB_SUMMARY) - .withMetadata(TimingEvent.DATASET_TASK_SUMMARIES, GsonUtils.GSON_WITH_DATE_HANDLING.toJson(convertDatasetStatsToTaskSummaries(commitGobblinStats.getDatasetStats()))) - .submit(); + + if (!commitGobblinStats.getOptFailure().isPresent() || commitGobblinStats.getNumCommittedWorkUnits() > 0) { + TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.Factory(workSpec.getEventSubmitterContext()); + timerFactory.create(TimingEvent.LauncherTimings.JOB_SUMMARY) + .withMetadata(TimingEvent.DATASET_TASK_SUMMARIES, GsonUtils.GSON_WITH_DATE_HANDLING.toJson( + 
convertDatasetStatsToTaskSummaries(commitGobblinStats.getDatasetStats()))) + .submit();// emit job summary info on both full and partial commit (ultimately for `GaaSJobObservabilityEvent.datasetsMetrics`) + } + if (commitGobblinStats.getOptFailure().isPresent()) { + throw ApplicationFailure.newNonRetryableFailureWithCause( + String.format("Failed to commit dataset state for some dataset(s)"), commitGobblinStats.getOptFailure().get().getClass().toString(), + commitGobblinStats.getOptFailure().get()); + } return commitGobblinStats; } - private List convertDatasetStatsToTaskSummaries(Map datasetStats) { List datasetTaskSummaries = new ArrayList<>(); for (Map.Entry entry : datasetStats.entrySet()) { diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java index 6402e473bf7..97c1c0a767e 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java @@ -22,6 +22,7 @@ import com.typesafe.config.ConfigFactory; import io.temporal.api.enums.v1.ParentClosePolicy; +import io.temporal.failure.ApplicationFailure; import io.temporal.workflow.ChildWorkflowOptions; import io.temporal.workflow.Workflow; import lombok.extern.slf4j.Slf4j; @@ -72,20 +73,48 @@ private CommitStats performWork(WUProcessingSpec workSpec) { searchAttributes = TemporalWorkFlowUtils.generateGaasSearchAttributes(jobState.getProperties()); NestingExecWorkflow processingWorkflow = createProcessingWorkflow(workSpec, searchAttributes); - int workunitsProcessed = - processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0, workSpec.getTuning().getMaxBranchesPerTree(), - workSpec.getTuning().getMaxSubTreesPerTree(), Optional.empty()); - if 
(workunitsProcessed > 0) { - CommitStepWorkflow commitWorkflow = createCommitStepWorkflow(searchAttributes); - CommitStats result = commitWorkflow.commit(workSpec); - if (result.getNumCommittedWorkUnits() == 0) { - log.warn("No work units committed at the job level. They could have been committed at the task level."); + + Optional workunitsProcessed = Optional.empty(); + try { + workunitsProcessed = Optional.of(processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0, + workSpec.getTuning().getMaxBranchesPerTree(), workSpec.getTuning().getMaxSubTreesPerTree(), + Optional.empty())); + } catch (Exception e) { + log.error("ProcessWorkUnits failure - attempting partial commit before re-throwing exception", e); + + try { + performCommitIfAnyWorkUnitsProcessed(workSpec, searchAttributes, workunitsProcessed);// Attempt partial commit before surfacing the failure + } catch (Exception commitException) { + // Combine current and commit exception messages for a more complete context + String combinedMessage = String.format( + "Processing failure: %s. Commit workflow failure: %s", + e.getMessage(), + commitException.getMessage() + ); + log.error(combinedMessage); + throw ApplicationFailure.newNonRetryableFailureWithCause( + String.format("Processing failure: %s. 
Partial commit failure: %s", combinedMessage, commitException), + Exception.class.toString(), + new Exception(e)); // Wrap the original exception for stack trace preservation } - return result; - } else { + throw e;// Re-throw after any partial commit, to fail the parent workflow in case commitWorkflow didn't flow (unlikely) + } + return performCommitIfAnyWorkUnitsProcessed(workSpec, searchAttributes, workunitsProcessed); + } + + private CommitStats performCommitIfAnyWorkUnitsProcessed(WUProcessingSpec workSpec, + Map searchAttributes, Optional workunitsProcessed) { + // we are only inhibiting commit when workunitsProcessed is actually known to be zero + if (workunitsProcessed.filter(n -> n == 0).isPresent()) { log.error("No work units processed, so no commit attempted."); return CommitStats.createEmpty(); } + CommitStepWorkflow commitWorkflow = createCommitStepWorkflow(searchAttributes); + CommitStats result = commitWorkflow.commit(workSpec); + if (result.getNumCommittedWorkUnits() == 0) { + log.warn("No work units committed at the job level. They could have been committed at the task level."); + } + return result; } private Optional createOptJobEventTimer(WUProcessingSpec workSpec) { diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/exception/FailedDatasetUrnsException.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/exception/FailedDatasetUrnsException.java new file mode 100644 index 00000000000..ea8e1def41d --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/exception/FailedDatasetUrnsException.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.temporal.exception; + + +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.Getter; + + +/** + * An exception thrown when a set of dataset URNs fail to be processed. + */ + +@JsonIgnoreProperties(ignoreUnknown = true) +public class FailedDatasetUrnsException extends IOException { + + @Getter + private final Set failedDatasetUrns; + + /** + * Creates a new instance of this exception with the failed dataset URNs. + * + * @param failedDatasetUrns a set containing the URNs of the datasets that failed to process + */ + public FailedDatasetUrnsException(Set failedDatasetUrns) { + super("Failed to process the following dataset URNs: " + String.join(",", failedDatasetUrns)); + this.failedDatasetUrns = failedDatasetUrns; + } + + /** + * Default constructor for {@code FailedDatasetUrnsException}. + *

<p> + * This constructor initializes an empty {@link HashSet} for {@code failedDatasetUrns}. + * It is provided to support frameworks like Jackson that require a no-argument constructor + * for deserialization purposes. + * </p>
+ * */ + public FailedDatasetUrnsException() { + super(); + this.failedDatasetUrns = new HashSet<>(); + } +}