Skip to content

Commit

Permalink
[GOBBLIN-2057]add resume dag proc (#3938)
Browse files Browse the repository at this point in the history
* add resume dag proc
* address review comments
  • Loading branch information
arjun4084346 authored May 10, 2024
1 parent c823e98 commit e40483d
Show file tree
Hide file tree
Showing 7 changed files with 286 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
import org.apache.gobblin.service.modules.orchestration.task.ReevaluateDagTask;
import org.apache.gobblin.service.modules.orchestration.task.ResumeDagTask;
import org.apache.gobblin.util.ConfigUtils;


Expand Down Expand Up @@ -167,6 +168,8 @@ private DagTask createDagTask(DagActionStore.DagAction dagAction, LeaseAttemptSt
return new ReevaluateDagTask(dagAction, leaseObtainedStatus, dagActionStore.get());
case KILL:
return new KillDagTask(dagAction, leaseObtainedStatus, dagActionStore.get());
case RESUME:
return new ResumeDagTask(dagAction, leaseObtainedStatus, dagActionStore.get());
default:
throw new UnsupportedOperationException(dagActionType + " not yet implemented");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@
import org.apache.gobblin.service.modules.orchestration.proc.KillDagProc;
import org.apache.gobblin.service.modules.orchestration.proc.LaunchDagProc;
import org.apache.gobblin.service.modules.orchestration.proc.ReevaluateDagProc;
import org.apache.gobblin.service.modules.orchestration.proc.ResumeDagProc;
import org.apache.gobblin.service.modules.orchestration.task.DagTask;
import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
import org.apache.gobblin.service.modules.orchestration.task.ReevaluateDagTask;
import org.apache.gobblin.service.modules.orchestration.task.ResumeDagTask;
import org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;


Expand Down Expand Up @@ -59,9 +61,15 @@ public ReevaluateDagProc meet(ReevaluateDagTask reEvaluateDagTask) {
return new ReevaluateDagProc(reEvaluateDagTask);
}

/**
 * Builds the proc for a kill task.
 *
 * @param killDagTask the kill task handed to the new proc's constructor
 * @return a new {@link KillDagProc} constructed from {@code killDagTask}
 */
@Override
public KillDagProc meet(KillDagTask killDagTask) {
return new KillDagProc(killDagTask);
}

/**
 * Builds the proc for a resume task.
 *
 * @param resumeDagTask the resume task handed to the new proc's constructor
 * @return a new {@link ResumeDagProc} constructed from {@code resumeDagTask}
 */
@Override
public ResumeDagProc meet(ResumeDagTask resumeDagTask) {
return new ResumeDagProc(resumeDagTask);
}
//todo - overload meet method for other dag tasks
}

Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@
import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
import org.apache.gobblin.service.modules.orchestration.task.ReevaluateDagTask;
import org.apache.gobblin.service.modules.orchestration.task.ResumeDagTask;


/**
 * Visitor over the concrete dag task types. It declares one {@code meet} overload per task type, each
 * returning a value of the visitor-chosen type {@code T}.
 *
 * @param <T> the type produced when a task is visited
 */
public interface DagTaskVisitor<T> {
/** Visits a {@link LaunchDagTask}. */
T meet(LaunchDagTask launchDagTask);
/** Visits a {@link ReevaluateDagTask}. */
T meet(ReevaluateDagTask reevaluateDagTask);
/** Visits a {@link KillDagTask}. */
T meet(KillDagTask killDagTask);
/** Visits a {@link ResumeDagTask}. */
T meet(ResumeDagTask resumeDagTask);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.service.modules.orchestration.proc;

import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import com.google.common.collect.Maps;

import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
import org.apache.gobblin.service.modules.orchestration.task.ResumeDagTask;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;

import static org.apache.gobblin.service.ExecutionStatus.CANCELLED;
import static org.apache.gobblin.service.ExecutionStatus.FAILED;
import static org.apache.gobblin.service.ExecutionStatus.PENDING_RESUME;


/**
* An implementation for {@link DagProc} that resumes a dag and submits the job that previously failed or was killed.
*/
@Slf4j
public class ResumeDagProc extends DagProc<Optional<Dag<JobExecutionPlan>>> {

public ResumeDagProc(ResumeDagTask resumeDagTask) {
super(resumeDagTask);
}

@Override
protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore dagManagementStateStore)
throws IOException {
return dagManagementStateStore.getFailedDag(getDagId());
}

@Override
protected void act(DagManagementStateStore dagManagementStateStore, Optional<Dag<JobExecutionPlan>> failedDag)
throws IOException {
log.info("Request to resume dag {}", getDagId());

if (!failedDag.isPresent()) {
// todo - add a metric here
log.error("Dag " + dagId + " was not found in dag state store");
return;
}

long flowResumeTime = System.currentTimeMillis();

// Set the flow and its failed or cancelled nodes to PENDING_RESUME so that the flow will be resumed from the point before it failed
DagManagerUtils.emitFlowEvent(eventSubmitter, failedDag.get(), TimingEvent.FlowTimings.FLOW_PENDING_RESUME);

for (Dag.DagNode<JobExecutionPlan> node : failedDag.get().getNodes()) {
ExecutionStatus executionStatus = node.getValue().getExecutionStatus();
if (executionStatus.equals(FAILED) || executionStatus.equals(CANCELLED)) {
node.getValue().setExecutionStatus(PENDING_RESUME);
// reset currentAttempts because we do not want to count previous execution's attempts in deciding whether to retry a job
node.getValue().setCurrentAttempts(0);
DagManagerUtils.incrementJobGeneration(node);
Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), node.getValue());
eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_PENDING_RESUME).stop(jobMetadata);
}
// Set flowStartTime so that flow start deadline and flow completion deadline will be based on current time instead of original flow
node.getValue().setFlowStartTime(flowResumeTime);
}

// these two statements effectively move the dag from failed dag store to (running) dag store.
// to prevent loss in the unlikely event of failure between the two, we add first.
dagManagementStateStore.checkpointDag(failedDag.get());
// if it fails here, it will check point the failed dag in the (running) dag store again, which is idempotent
dagManagementStateStore.deleteFailedDag(failedDag.get());

resumeDag(dagManagementStateStore, failedDag.get());
}

private void resumeDag(DagManagementStateStore dagManagementStateStore, Dag<JobExecutionPlan> dag) {
Set<Dag.DagNode<JobExecutionPlan>> nextNodes = DagManagerUtils.getNext(dag);

if (nextNodes.size() > 1) {
handleMultipleJobs(nextNodes);
}

//Submit jobs from the dag ready for execution.
for (Dag.DagNode<JobExecutionPlan> dagNode : nextNodes) {
DagProcUtils.submitJobToExecutor(dagManagementStateStore, dagNode, getDagId());
log.info("Submitted job {} for dagId {}", DagManagerUtils.getJobName(dagNode), getDagId());
}
}

private void handleMultipleJobs(Set<Dag.DagNode<JobExecutionPlan>> nextNodes) {
throw new UnsupportedOperationException("More than one start job is not allowed");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.service.modules.orchestration.task;

import org.apache.gobblin.service.modules.orchestration.DagActionStore;
import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor;
import org.apache.gobblin.service.modules.orchestration.LeaseAttemptStatus;


/**
 * A {@link DagTask} responsible for resuming previously failed or killed dags.
 */
public class ResumeDagTask extends DagTask {
/**
 * Creates a resume task; all three arguments are forwarded unchanged to the {@link DagTask} constructor.
 */
public ResumeDagTask(DagActionStore.DagAction dagAction, LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus,
DagActionStore dagActionStore) {
super(dagAction, leaseObtainedStatus, dagActionStore);
}

/**
 * Dispatches this task to the visitor's {@code meet(ResumeDagTask)} overload.
 *
 * @param visitor the visitor to invoke
 * @param <T> the visitor's result type
 * @return the value returned by {@code visitor.meet(this)}
 */
public <T> T host(DagTaskVisitor<T> visitor) {
return visitor.meet(this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,14 @@ protected void handleDagAction(DagActionStore.DagAction dagAction, boolean isSta
try {
// todo - add actions for other types of dag actions
switch (dagAction.getDagActionType()) {
case KILL :
case LAUNCH :
case REEVALUATE :
case KILL :
case RESUME:
dagManagement.addDagAction(dagAction);
break;
default:
log.warn("Received unsupported dagAction {}. Expected to be a KILL, REEVALUATE or LAUNCH", dagAction.getDagActionType());
log.warn("Received unsupported dagAction {}. Expected to be a RESUME, KILL, REEVALUATE or LAUNCH", dagAction.getDagActionType());
this.unexpectedErrors.mark();
}
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.service.modules.orchestration.proc;

import java.io.IOException;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

import org.mockito.Mockito;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;

import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecProducer;
import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.orchestration.DagActionStore;
import org.apache.gobblin.service.modules.orchestration.DagManager;
import org.apache.gobblin.service.modules.orchestration.DagManagerTest;
import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
import org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStore;
import org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStoreTest;
import org.apache.gobblin.service.modules.orchestration.MysqlDagActionStore;
import org.apache.gobblin.service.modules.orchestration.task.ResumeDagTask;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;


public class ResumeDagProcTest {
private MostlyMySqlDagManagementStateStore dagManagementStateStore;
private ITestMetastoreDatabase testDb;

@BeforeClass
public void setUp() throws Exception {
testDb = TestMetastoreDatabaseFactory.get();
this.dagManagementStateStore = spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(testDb));
doReturn(FlowSpec.builder().build()).when(this.dagManagementStateStore).getFlowSpec(any());
doNothing().when(this.dagManagementStateStore).tryAcquireQuota(any());
doNothing().when(this.dagManagementStateStore).addDagNodeState(any(), any());
}

@AfterClass(alwaysRun = true)
public void tearDown() throws Exception {
if (testDb != null) {
testDb.close();
}
}

/*
This test creates a failed dag and launches a resume dag proc for it. It then verifies that the next jobs are set to run.
*/
@Test
public void resumeDag()
throws IOException, URISyntaxException, ExecutionException, InterruptedException {
long flowExecutionId = 12345L;
String flowGroup = "fg";
String flowName = "fn";
Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", flowExecutionId, DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
5, "user5", ConfigFactory.empty().withValue(ConfigurationKeys.FLOW_GROUP_KEY, ConfigValueFactory.fromAnyRef(flowGroup))
.withValue(ConfigurationKeys.FLOW_NAME_KEY, ConfigValueFactory.fromAnyRef(flowName)));
// simulate a failed dag in store
dag.getNodes().get(0).getValue().setExecutionStatus(ExecutionStatus.COMPLETE);
dag.getNodes().get(1).getValue().setExecutionStatus(ExecutionStatus.FAILED);
dag.getNodes().get(2).getValue().setExecutionStatus(ExecutionStatus.COMPLETE);
dag.getNodes().get(4).getValue().setExecutionStatus(ExecutionStatus.COMPLETE);
doReturn(Optional.of(dag)).when(dagManagementStateStore).getFailedDag(any());

ResumeDagProc resumeDagProc = new ResumeDagProc(new ResumeDagTask(new DagActionStore.DagAction(flowGroup, flowName,
String.valueOf(flowExecutionId), MysqlDagActionStore.NO_JOB_NAME_DEFAULT, DagActionStore.DagActionType.RESUME),
null, mock(DagActionStore.class)));
resumeDagProc.process(this.dagManagementStateStore);

SpecProducer<Spec> specProducer = DagManagerUtils.getSpecProducer(dag.getNodes().get(1));
List<SpecProducer<Spec>> otherSpecProducers = dag.getNodes().stream().map(node -> {
try {
return DagManagerUtils.getSpecProducer(node);
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
}
}).filter(sp -> specProducer != sp).collect(Collectors.toList());
int expectedNumOfResumedJobs = 1; // = number of resumed nodes

Mockito.verify(specProducer, Mockito.times(expectedNumOfResumedJobs)).addSpec(any());
Mockito.verify(this.dagManagementStateStore, Mockito.times(expectedNumOfResumedJobs)).addDagNodeState(any(), any());
otherSpecProducers.forEach(sp -> Mockito.verify(sp, Mockito.never()).addSpec(any()));
}
}

0 comments on commit e40483d

Please sign in to comment.