diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java index 9ee21395b25..fbd77d8617f 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java @@ -17,6 +17,8 @@ package org.apache.gobblin.service.modules.orchestration; +import java.util.Collections; +import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -39,6 +41,7 @@ import org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics; import org.apache.gobblin.service.modules.orchestration.task.DagTask; import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.util.ExceptionUtils; import org.apache.gobblin.util.ExecutorsUtils; @@ -67,6 +70,8 @@ public class DagProcessingEngine extends AbstractIdleService { public static final String DEFAULT_JOB_START_DEADLINE_TIME_MS = "defaultJobStartDeadlineTimeMillis"; @Getter static long defaultJobStartDeadlineTimeMillis; public static final String DEFAULT_FLOW_FAILURE_OPTION = FailureOption.FINISH_ALL_POSSIBLE.name(); + // TODO Update to fetch list from config once transient exception handling is implemented and retryable exceptions defined + public static final List> retryableExceptions = Collections.EMPTY_LIST; @Inject public DagProcessingEngine(Config config, DagTaskStream dagTaskStream, DagProcFactory dagProcFactory, @@ -85,6 +90,10 @@ private static void setDefaultJobStartDeadlineTimeMs(long deadlineTimeMs) { defaultJobStartDeadlineTimeMillis = deadlineTimeMs; } + public static boolean isTransientException(Exception e) { + return ExceptionUtils.isExceptionInstanceOf(e, retryableExceptions); + } + @Override protected void startUp() { Integer numThreads = ConfigUtils.getInt @@ -151,6 +160,12 @@ public void run() { } catch (Exception e) { log.error("DagProcEngineThread: " + dagProc.contextualizeStatus("error"), e); dagManagementStateStore.getDagManagerMetrics().dagProcessingExceptionMeter.mark(); + if (!DagProcessingEngine.isTransientException(e)) { + log.warn(dagProc.contextualizeStatus("ignoring non-transient exception by concluding so no retries")); + dagManagementStateStore.getDagManagerMetrics().dagProcessingNonRetryableExceptionMeter.mark(); + dagTask.conclude(); + } + // TODO add the else block for transient exceptions and add conclude task only if retry limit is not breached } } } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java index 6c9694c80f7..c84d0dca8bc 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java @@ -18,8 +18,6 @@ package org.apache.gobblin.service.modules.orchestration.proc; import java.io.IOException; -import java.util.List; -import java.util.stream.Collectors; import com.typesafe.config.Config; @@ -33,15 +31,12 @@ import org.apache.gobblin.metrics.MetricContext; import org.apache.gobblin.metrics.event.EventSubmitter; import org.apache.gobblin.service.modules.flowgraph.Dag; -import org.apache.gobblin.util.ExceptionUtils; -import org.apache.gobblin.service.ServiceConfigKeys; import org.apache.gobblin.service.modules.flowgraph.DagNodeId; import org.apache.gobblin.service.modules.orchestration.DagActionStore; import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore; import org.apache.gobblin.service.modules.orchestration.DagUtils; import org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics; import org.apache.gobblin.service.modules.orchestration.task.DagTask; -import org.apache.gobblin.util.ConfigUtils; /** @@ -62,7 +57,6 @@ public abstract class DagProc { @Getter protected final Dag.DagId dagId; @Getter protected final DagNodeId dagNodeId; protected static final MetricContext metricContext = Instrumented.getMetricContext(new State(), DagProc.class); - protected final List> nonRetryableExceptions; protected static final EventSubmitter eventSubmitter = new EventSubmitter.Builder( metricContext, "org.apache.gobblin.service").build(); @@ -71,14 +65,6 @@ public DagProc(DagTask dagTask, Config config) { this.dagId = DagUtils.generateDagId(this.dagTask.getDagAction().getFlowGroup(), this.dagTask.getDagAction().getFlowName(), this.dagTask.getDagAction().getFlowExecutionId()); this.dagNodeId = this.dagTask.getDagAction().getDagNodeId(); - this.nonRetryableExceptions = ConfigUtils.getStringList(config, ServiceConfigKeys.DAG_PROC_ENGINE_NON_RETRYABLE_EXCEPTIONS_KEY) - .stream().map(className -> { - try { - return (Class) Class.forName(className); - } catch (ClassNotFoundException e) { - throw new RuntimeException(e); - } - }).collect(Collectors.toList()); } public final void process(DagManagementStateStore dagManagementStateStore, @@ -92,20 +78,9 @@ public final void process(DagManagementStateStore dagManagementStateStore, dagProcEngineMetrics.markDagActionsInitialize(getDagActionType(), false); throw e; } - try { logContextualizedInfo("ready to process"); act(dagManagementStateStore, state, dagProcEngineMetrics); logContextualizedInfo("processed"); - } catch (Exception e) { - if (isNonTransientException(e)) { - log.error("Ignoring non transient exception. DagTask {} will conclude and will not be retried. Exception - {} ", - getDagTask(), e); - dagManagementStateStore.getDagManagerMetrics().dagProcessingNonRetryableExceptionMeter.mark(); - dagManagementStateStore.getDagManagerMetrics().dagProcessingExceptionMeter.mark(); - } else { - throw e; - } - } } protected abstract T initialize(DagManagementStateStore dagManagementStateStore) throws IOException; @@ -126,8 +101,4 @@ public String contextualizeStatus(String message) { public void logContextualizedInfo(String message) { log.info(contextualizeStatus(message)); } - - protected boolean isNonTransientException(Exception e) { - return ExceptionUtils.isExceptionInstanceOf(e, this.nonRetryableExceptions); - } } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java index 346037f6d2f..30bd89c98b5 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java @@ -74,7 +74,6 @@ public class DagProcUtils { public static void submitNextNodes(DagManagementStateStore dagManagementStateStore, Dag dag, Dag.DagId dagId) throws IOException { Set> nextNodes = DagUtils.getNext(dag); - if (nextNodes.size() == 1) { Dag.DagNode dagNode = nextNodes.iterator().next(); DagProcUtils.submitJobToExecutor(dagManagementStateStore, dagNode, dagId); @@ -139,12 +138,15 @@ public static void submitJobToExecutor(DagManagementStateStore dagManagementStat dagManagementStateStore.updateDagNode(dagNode); sendEnforceJobStartDeadlineDagAction(dagManagementStateStore, dagNode); } catch (Exception e) { - TimingEvent jobFailedTimer = DagProc.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_FAILED); String message = "Cannot submit job " + DagUtils.getFullyQualifiedJobName(dagNode) + " on executor " + specExecutorUri; log.error(message, e); - jobMetadata.put(TimingEvent.METADATA_MESSAGE, message + " due to " + e.getMessage()); - if (jobFailedTimer != null) { - jobFailedTimer.stop(jobMetadata); + // Only mark the job as failed in case of non transient exceptions + if (!DagProcessingEngine.isTransientException(e)) { + TimingEvent jobFailedTimer = DagProc.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_FAILED); + jobMetadata.put(TimingEvent.METADATA_MESSAGE, message + " due to " + e.getMessage()); + if (jobFailedTimer != null) { + jobFailedTimer.stop(jobMetadata); + } } try { // when there is no exception, quota will be released in job status monitor or re-evaluate dag proc diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java index 8fea5303aeb..6e065fc1a71 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java @@ -190,15 +190,24 @@ public void dagProcessingTest() // (MAX_NUM_OF_TASKS + 1) th call int expectedNumOfInvocations = MockedDagTaskStream.MAX_NUM_OF_TASKS + ServiceConfigKeys.DEFAULT_NUM_DAG_PROC_THREADS; int expectedExceptions = MockedDagTaskStream.MAX_NUM_OF_TASKS / MockedDagTaskStream.FAILING_DAGS_FREQUENCY; - int expectedNonRetryableExceptions = MockedDagTaskStream.MAX_NUM_OF_TASKS / MockedDagTaskStream.FAILING_DAGS_WITH_NON_RETRYABLE_EXCEPTIONS_FREQUENCY; AssertWithBackoff.assertTrue(input -> Mockito.mockingDetails(this.dagTaskStream).getInvocations().size() == expectedNumOfInvocations, 10000L, "dagTaskStream was not called " + expectedNumOfInvocations + " number of times. " + "Actual number of invocations " + Mockito.mockingDetails(this.dagTaskStream).getInvocations().size(), log, 1, 1000L); - + // Currently we are treating all exceptions as non retryable and totalExceptionCount will be equal to count of non retryable exceptions Assert.assertEquals(dagManagementStateStore.getDagManagerMetrics().dagProcessingExceptionMeter.getCount(), expectedExceptions); - Assert.assertEquals(dagManagementStateStore.getDagManagerMetrics().dagProcessingNonRetryableExceptionMeter.getCount(), expectedNonRetryableExceptions); + Assert.assertEquals(dagManagementStateStore.getDagManagerMetrics().dagProcessingNonRetryableExceptionMeter.getCount(), expectedExceptions); + } + + @Test + public void isNonTransientExceptionTest(){ + /* + These exceptions examples are solely for testing purpose, ultimately it would come down + to the config defined for the transient exceptions, when we implement retry logic + */ + Assert.assertTrue(!DagProcessingEngine.isTransientException(new RuntimeException("Simulating a non retryable exception!"))); + Assert.assertTrue(!DagProcessingEngine.isTransientException(new AzkabanClientException("Simulating a retryable exception!"))); } private enum ExceptionType { diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtilsTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtilsTest.java new file mode 100644 index 00000000000..7b128f120c9 --- /dev/null +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtilsTest.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.gobblin.service.modules.orchestration.proc; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigValueFactory; +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; +import org.apache.gobblin.config.ConfigBuilder; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.runtime.api.FlowSpec; +import org.apache.gobblin.runtime.api.JobSpec; +import org.apache.gobblin.runtime.api.Spec; +import org.apache.gobblin.runtime.api.SpecExecutor; +import org.apache.gobblin.runtime.api.SpecProducer; +import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor; +import org.apache.gobblin.runtime.spec_executorInstance.MockedSpecExecutor; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys; +import org.apache.gobblin.service.modules.orchestration.DagActionStore; +import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore; +import org.apache.gobblin.service.modules.orchestration.DagManagerMetrics; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; +import org.mockito.Mockito; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +public class DagProcUtilsTest { + + DagManagementStateStore dagManagementStateStore; + SpecExecutor mockSpecExecutor; + + @BeforeMethod + public void setUp() { + dagManagementStateStore = Mockito.mock(DagManagementStateStore.class); + mockSpecExecutor = new MockedSpecExecutor(Mockito.mock(Config.class)); + } + + @Test + public void testSubmitNextNodesSuccess() throws URISyntaxException, IOException { + Dag.DagId dagId = new Dag.DagId("testFlowGroup", "testFlowName", 2345678); + List jobExecutionPlans = getJobExecutionPlans(); + List> dagNodeList = jobExecutionPlans.stream() + .map(Dag.DagNode::new) + .collect(Collectors.toList()); + Dag dag = new Dag<>(dagNodeList); + Mockito.doNothing().when(dagManagementStateStore).addJobDagAction(Mockito.anyString(), Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(), Mockito.any()); + DagProcUtils.submitNextNodes(dagManagementStateStore, dag, dagId); + for (JobExecutionPlan jobExecutionPlan : jobExecutionPlans) { + Mockito.verify(dagManagementStateStore, Mockito.times(1)) + .addJobDagAction(jobExecutionPlan.getFlowGroup(), jobExecutionPlan.getFlowName(), + jobExecutionPlan.getFlowExecutionId(), jobExecutionPlan.getJobName(), + DagActionStore.DagActionType.REEVALUATE); + } + Mockito.verifyNoMoreInteractions(dagManagementStateStore); + } + + @Test + public void testWhenSubmitToExecutorSuccess() throws URISyntaxException, IOException { + Dag.DagId dagId = new Dag.DagId("flowGroup1", "flowName1", 2345680); + List> dagNodeList = new ArrayList<>(); + JobExecutionPlan jobExecutionPlan = getJobExecutionPlans().get(0); + Dag.DagNode dagNode = new Dag.DagNode<>(jobExecutionPlan); + dagNodeList.add(dagNode); + Dag dag = new Dag<>(dagNodeList); + DagManagerMetrics metrics = Mockito.mock(DagManagerMetrics.class); + Mockito.when(dagManagementStateStore.getDagManagerMetrics()).thenReturn(metrics); + Mockito.doNothing().when(metrics).incrementRunningJobMetrics(dagNode); + Mockito.doNothing().when(metrics).incrementJobsSentToExecutor(dagNode); + DagProcUtils.submitNextNodes(dagManagementStateStore, dag, dagId); + Mockito.verify(dagManagementStateStore, Mockito.times(2)).getDagManagerMetrics(); + Mockito.verify(dagManagementStateStore, Mockito.times(1)).tryAcquireQuota(Collections.singleton(dagNode)); + Mockito.verify(dagManagementStateStore, Mockito.times(1)).updateDagNode(dagNode); + Mockito.verify(dagManagementStateStore, Mockito.times(1)).addDagAction(Mockito.any(DagActionStore.DagAction.class)); + + Mockito.verify(metrics, Mockito.times(1)).incrementRunningJobMetrics(dagNode); + Mockito.verify(metrics, Mockito.times(1)).incrementJobsSentToExecutor(dagNode); + Mockito.verifyNoMoreInteractions(dagManagementStateStore); + } + + @Test(expectedExceptions = RuntimeException.class) + public void testWhenSubmitToExecutorGivesRuntimeException() throws URISyntaxException, IOException, ExecutionException, InterruptedException{ + Dag.DagId dagId = new Dag.DagId("flowGroup3", "flowName3", 2345678); + List> dagNodeList = new ArrayList<>(); + JobExecutionPlan jobExecutionPlan = getJobExecutionPlans().get(2); + Dag.DagNode dagNode = new Dag.DagNode<>(jobExecutionPlan); + dagNodeList.add(dagNode); + Dag dag = new Dag<>(dagNodeList); + SpecProducer mockedSpecProducer = mockSpecExecutor.getProducer().get(); + Mockito.doThrow(RuntimeException.class).when(mockedSpecProducer).addSpec(Mockito.any(JobSpec.class)); + DagManagerMetrics metrics = Mockito.mock(DagManagerMetrics.class); + Mockito.when(dagManagementStateStore.getDagManagerMetrics()).thenReturn(metrics); + Mockito.doNothing().when(metrics).incrementRunningJobMetrics(dagNode); + DagProcUtils.submitNextNodes(dagManagementStateStore, dag, dagId); + Mockito.verify(mockedSpecProducer, Mockito.times(1)).addSpec(Mockito.any(JobSpec.class)); + Mockito.verify(dagManagementStateStore, Mockito.times(1)).getDagManagerMetrics(); + Mockito.verify(metrics, Mockito.times(1)).incrementRunningJobMetrics(dagNode); + Mockito.verifyNoMoreInteractions(dagManagementStateStore); + } + + private List getJobExecutionPlans() throws URISyntaxException { + Config flowConfig1 = ConfigBuilder.create().addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "flowName1") + .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "flowGroup1").build(); + Config flowConfig2 = ConfigBuilder.create().addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "flowName2") + .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "flowGroup2").build(); + Config flowConfig3 = ConfigBuilder.create().addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "flowName3") + .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "flowGroup3").build(); + List flowConfigs = Arrays.asList(flowConfig1, flowConfig2, flowConfig3); + + Config jobConfig1 = ConfigBuilder.create().addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job1") + .addPrimitive(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, "source:destination:edgeName1").build(); + Config jobConfig2 = ConfigBuilder.create().addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job2") + .addPrimitive(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, "source:destination:edgeName2").build(); + Config jobConfig3 = ConfigBuilder.create().addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job1") + .addPrimitive(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, "source:destination:edgeName3").build(); + List jobConfigs = Arrays.asList(jobConfig1, jobConfig2, jobConfig3); + List jobExecutionPlans = new ArrayList<>(); + for (int i = 0; i < 3; i++) { + Config jobConfig = jobConfigs.get(i); + FlowSpec flowSpec = FlowSpec.builder("testFlowSpec").withConfig(flowConfigs.get(i)).build(); + if (i == 2) { + jobExecutionPlans.add(new JobExecutionPlan.Factory().createPlan(flowSpec, + jobConfig.withValue(ConfigurationKeys.JOB_TEMPLATE_PATH, ConfigValueFactory.fromAnyRef("testUri")), + mockSpecExecutor, 0L, ConfigFactory.empty())); + } else { + jobExecutionPlans.add(new JobExecutionPlan.Factory().createPlan(flowSpec, + jobConfig.withValue(ConfigurationKeys.JOB_TEMPLATE_PATH, ConfigValueFactory.fromAnyRef("testUri")), + new InMemorySpecExecutor(ConfigFactory.empty()), 0L, ConfigFactory.empty())); + } + } + return jobExecutionPlans; + } +} \ No newline at end of file