diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java index e4d34aee60f..e211906a60c 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java @@ -43,6 +43,7 @@ import org.apache.gobblin.service.modules.orchestration.task.KillDagTask; import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask; import org.apache.gobblin.service.modules.orchestration.task.ReevaluateDagTask; +import org.apache.gobblin.service.modules.orchestration.task.ResumeDagTask; import org.apache.gobblin.util.ConfigUtils; @@ -167,6 +168,8 @@ private DagTask createDagTask(DagActionStore.DagAction dagAction, LeaseAttemptSt return new ReevaluateDagTask(dagAction, leaseObtainedStatus, dagActionStore.get()); case KILL: return new KillDagTask(dagAction, leaseObtainedStatus, dagActionStore.get()); + case RESUME: + return new ResumeDagTask(dagAction, leaseObtainedStatus, dagActionStore.get()); default: throw new UnsupportedOperationException(dagActionType + " not yet implemented"); } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcFactory.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcFactory.java index cf48b3d1cf7..0cc55954195 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcFactory.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcFactory.java @@ -25,10 +25,12 @@ import org.apache.gobblin.service.modules.orchestration.proc.KillDagProc; import org.apache.gobblin.service.modules.orchestration.proc.LaunchDagProc; import org.apache.gobblin.service.modules.orchestration.proc.ReevaluateDagProc; +import org.apache.gobblin.service.modules.orchestration.proc.ResumeDagProc; import org.apache.gobblin.service.modules.orchestration.task.DagTask; import org.apache.gobblin.service.modules.orchestration.task.KillDagTask; import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask; import org.apache.gobblin.service.modules.orchestration.task.ReevaluateDagTask; +import org.apache.gobblin.service.modules.orchestration.task.ResumeDagTask; import org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper; @@ -59,9 +61,15 @@ public ReevaluateDagProc meet(ReevaluateDagTask reEvaluateDagTask) { return new ReevaluateDagProc(reEvaluateDagTask); } + @Override public KillDagProc meet(KillDagTask killDagTask) { return new KillDagProc(killDagTask); } + + @Override + public ResumeDagProc meet(ResumeDagTask resumeDagTask) { + return new ResumeDagProc(resumeDagTask); + } //todo - overload meet method for other dag tasks } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskVisitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskVisitor.java index bbba21037f5..9c36c8c1172 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskVisitor.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskVisitor.java @@ -20,10 +20,12 @@ import org.apache.gobblin.service.modules.orchestration.task.KillDagTask; import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask; import org.apache.gobblin.service.modules.orchestration.task.ReevaluateDagTask; +import org.apache.gobblin.service.modules.orchestration.task.ResumeDagTask; public interface DagTaskVisitor { T meet(LaunchDagTask launchDagTask); T meet(ReevaluateDagTask reevaluateDagTask); T meet(KillDagTask killDagTask); + T meet(ResumeDagTask resumeDagTask); } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProc.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProc.java new file mode 100644 index 00000000000..00e8809ebb3 --- /dev/null +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProc.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.orchestration.proc; + +import java.io.IOException; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import com.google.common.collect.Maps; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.metrics.event.TimingEvent; +import org.apache.gobblin.service.ExecutionStatus; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore; +import org.apache.gobblin.service.modules.orchestration.DagManagerUtils; +import org.apache.gobblin.service.modules.orchestration.TimingEventUtils; +import org.apache.gobblin.service.modules.orchestration.task.ResumeDagTask; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; + +import static org.apache.gobblin.service.ExecutionStatus.CANCELLED; +import static org.apache.gobblin.service.ExecutionStatus.FAILED; +import static org.apache.gobblin.service.ExecutionStatus.PENDING_RESUME; + + +/** + * An implementation for {@link DagProc} that resumes a dag and submits the job that previously failed or was killed. + */ +@Slf4j +public class ResumeDagProc extends DagProc>> { + + public ResumeDagProc(ResumeDagTask resumeDagTask) { + super(resumeDagTask); + } + + @Override + protected Optional> initialize(DagManagementStateStore dagManagementStateStore) + throws IOException { + return dagManagementStateStore.getFailedDag(getDagId()); + } + + @Override + protected void act(DagManagementStateStore dagManagementStateStore, Optional> failedDag) + throws IOException { + log.info("Request to resume dag {}", getDagId()); + + if (!failedDag.isPresent()) { + // todo - add a metric here + log.error("Dag " + dagId + " was not found in dag state store"); + return; + } + + long flowResumeTime = System.currentTimeMillis(); + + // Set the flow and its failed or cancelled nodes to PENDING_RESUME so that the flow will be resumed from the point before it failed + DagManagerUtils.emitFlowEvent(eventSubmitter, failedDag.get(), TimingEvent.FlowTimings.FLOW_PENDING_RESUME); + + for (Dag.DagNode node : failedDag.get().getNodes()) { + ExecutionStatus executionStatus = node.getValue().getExecutionStatus(); + if (executionStatus.equals(FAILED) || executionStatus.equals(CANCELLED)) { + node.getValue().setExecutionStatus(PENDING_RESUME); + // reset currentAttempts because we do not want to count previous execution's attempts in deciding whether to retry a job + node.getValue().setCurrentAttempts(0); + DagManagerUtils.incrementJobGeneration(node); + Map jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), node.getValue()); + eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_PENDING_RESUME).stop(jobMetadata); + } + // Set flowStartTime so that flow start deadline and flow completion deadline will be based on current time instead of original flow + node.getValue().setFlowStartTime(flowResumeTime); + } + + // these two statements effectively move the dag from failed dag store to (running) dag store. + // to prevent loss in the unlikely event of failure between the two, we add first. + dagManagementStateStore.checkpointDag(failedDag.get()); + // if it fails here, it will check point the failed dag in the (running) dag store again, which is idempotent + dagManagementStateStore.deleteFailedDag(failedDag.get()); + + resumeDag(dagManagementStateStore, failedDag.get()); + } + + private void resumeDag(DagManagementStateStore dagManagementStateStore, Dag dag) { + Set> nextNodes = DagManagerUtils.getNext(dag); + + if (nextNodes.size() > 1) { + handleMultipleJobs(nextNodes); + } + + //Submit jobs from the dag ready for execution. + for (Dag.DagNode dagNode : nextNodes) { + DagProcUtils.submitJobToExecutor(dagManagementStateStore, dagNode, getDagId()); + log.info("Submitted job {} for dagId {}", DagManagerUtils.getJobName(dagNode), getDagId()); + } + } + + private void handleMultipleJobs(Set> nextNodes) { + throw new UnsupportedOperationException("More than one start job is not allowed"); + } +} diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/ResumeDagTask.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/ResumeDagTask.java new file mode 100644 index 00000000000..b9ff4fb8abc --- /dev/null +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/ResumeDagTask.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.orchestration.task; + +import org.apache.gobblin.service.modules.orchestration.DagActionStore; +import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor; +import org.apache.gobblin.service.modules.orchestration.LeaseAttemptStatus; + + +/** + * A {@link DagTask} responsible for resuming previously failed or killed dags. + */ +public class ResumeDagTask extends DagTask { + public ResumeDagTask(DagActionStore.DagAction dagAction, LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus, + DagActionStore dagActionStore) { + super(dagAction, leaseObtainedStatus, dagActionStore); + } + + public T host(DagTaskVisitor visitor) { + return visitor.meet(this); + } +} diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java index 6582f03f1a8..c2065b7d591 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java @@ -60,13 +60,14 @@ protected void handleDagAction(DagActionStore.DagAction dagAction, boolean isSta try { // todo - add actions for other other type of dag actions switch (dagAction.getDagActionType()) { + case KILL : case LAUNCH : case REEVALUATE : - case KILL : + case RESUME: dagManagement.addDagAction(dagAction); break; default: - log.warn("Received unsupported dagAction {}. Expected to be a KILL, REEVALUATE or LAUNCH", dagAction.getDagActionType()); + log.warn("Received unsupported dagAction {}. Expected to be a RESUME, KILL, REEVALUATE or LAUNCH", dagAction.getDagActionType()); this.unexpectedErrors.mark(); } } catch (IOException e) { diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java new file mode 100644 index 00000000000..993fba8c02f --- /dev/null +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.orchestration.proc; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +import org.mockito.Mockito; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigValueFactory; + +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase; +import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory; +import org.apache.gobblin.runtime.api.FlowSpec; +import org.apache.gobblin.runtime.api.Spec; +import org.apache.gobblin.runtime.api.SpecProducer; +import org.apache.gobblin.service.ExecutionStatus; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.orchestration.DagActionStore; +import org.apache.gobblin.service.modules.orchestration.DagManager; +import org.apache.gobblin.service.modules.orchestration.DagManagerTest; +import org.apache.gobblin.service.modules.orchestration.DagManagerUtils; +import org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStore; +import org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStoreTest; +import org.apache.gobblin.service.modules.orchestration.MysqlDagActionStore; +import org.apache.gobblin.service.modules.orchestration.task.ResumeDagTask; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; + + +public class ResumeDagProcTest { + private MostlyMySqlDagManagementStateStore dagManagementStateStore; + private ITestMetastoreDatabase testDb; + + @BeforeClass + public void setUp() throws Exception { + testDb = TestMetastoreDatabaseFactory.get(); + this.dagManagementStateStore = spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(testDb)); + doReturn(FlowSpec.builder().build()).when(this.dagManagementStateStore).getFlowSpec(any()); + doNothing().when(this.dagManagementStateStore).tryAcquireQuota(any()); + doNothing().when(this.dagManagementStateStore).addDagNodeState(any(), any()); + } + + @AfterClass(alwaysRun = true) + public void tearDown() throws Exception { + if (testDb != null) { + testDb.close(); + } + } + + /* + This test creates a failed dag and launches a resume dag proc for it. It then verifies that the next jobs are set to run. + */ + @Test + public void resumeDag() + throws IOException, URISyntaxException, ExecutionException, InterruptedException { + long flowExecutionId = 12345L; + String flowGroup = "fg"; + String flowName = "fn"; + Dag dag = DagManagerTest.buildDag("1", flowExecutionId, DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(), + 5, "user5", ConfigFactory.empty().withValue(ConfigurationKeys.FLOW_GROUP_KEY, ConfigValueFactory.fromAnyRef(flowGroup)) + .withValue(ConfigurationKeys.FLOW_NAME_KEY, ConfigValueFactory.fromAnyRef(flowName))); + // simulate a failed dag in store + dag.getNodes().get(0).getValue().setExecutionStatus(ExecutionStatus.COMPLETE); + dag.getNodes().get(1).getValue().setExecutionStatus(ExecutionStatus.FAILED); + dag.getNodes().get(2).getValue().setExecutionStatus(ExecutionStatus.COMPLETE); + dag.getNodes().get(4).getValue().setExecutionStatus(ExecutionStatus.COMPLETE); + doReturn(Optional.of(dag)).when(dagManagementStateStore).getFailedDag(any()); + + ResumeDagProc resumeDagProc = new ResumeDagProc(new ResumeDagTask(new DagActionStore.DagAction(flowGroup, flowName, + String.valueOf(flowExecutionId), MysqlDagActionStore.NO_JOB_NAME_DEFAULT, DagActionStore.DagActionType.RESUME), + null, mock(DagActionStore.class))); + resumeDagProc.process(this.dagManagementStateStore); + + SpecProducer specProducer = DagManagerUtils.getSpecProducer(dag.getNodes().get(1)); + List> otherSpecProducers = dag.getNodes().stream().map(node -> { + try { + return DagManagerUtils.getSpecProducer(node); + } catch (ExecutionException | InterruptedException e) { + throw new RuntimeException(e); + } + }).filter(sp -> specProducer != sp).collect(Collectors.toList()); + int expectedNumOfResumedJobs = 1; // = number of resumed nodes + + Mockito.verify(specProducer, Mockito.times(expectedNumOfResumedJobs)).addSpec(any()); + Mockito.verify(this.dagManagementStateStore, Mockito.times(expectedNumOfResumedJobs)).addDagNodeState(any(), any()); + otherSpecProducers.forEach(sp -> Mockito.verify(sp, Mockito.never()).addSpec(any())); + } +}