Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GOBBLIN-2181] ]Handling for Non transient exceptions #4084

Merged
merged 6 commits into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.gobblin.service.modules.orchestration;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand All @@ -39,6 +41,7 @@
import org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
import org.apache.gobblin.service.modules.orchestration.task.DagTask;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.ExceptionUtils;
import org.apache.gobblin.util.ExecutorsUtils;


Expand Down Expand Up @@ -67,6 +70,8 @@ public class DagProcessingEngine extends AbstractIdleService {
public static final String DEFAULT_JOB_START_DEADLINE_TIME_MS = "defaultJobStartDeadlineTimeMillis";
@Getter static long defaultJobStartDeadlineTimeMillis;
public static final String DEFAULT_FLOW_FAILURE_OPTION = FailureOption.FINISH_ALL_POSSIBLE.name();
// Todo Update to fetch list from config once transient exception handling is implemented and retryable exceptions defined
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: the convention is to use TODO: , please update from Todo to TODO: for consistency

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated

public static List<Class<? extends Exception>> retryableExceptions = Collections.EMPTY_LIST;

@Inject
public DagProcessingEngine(Config config, DagTaskStream dagTaskStream, DagProcFactory dagProcFactory,
Expand All @@ -85,6 +90,10 @@ private static void setDefaultJobStartDeadlineTimeMs(long deadlineTimeMs) {
defaultJobStartDeadlineTimeMillis = deadlineTimeMs;
}

public static boolean isTransientException(Exception e) {
return ExceptionUtils.isExceptionInstanceOf(e, retryableExceptions);
}

@Override
protected void startUp() {
Integer numThreads = ConfigUtils.getInt
Expand Down Expand Up @@ -149,6 +158,13 @@ public void run() {
dagTask.conclude();
log.info(dagProc.contextualizeStatus("concluded dagTask"));
} catch (Exception e) {
if(!DagProcessingEngine.isTransientException(e)){
log.error("Ignoring non transient exception. DagTask {} will conclude and will not be retried. Exception - {} ",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't see toString() for dagTask, what does dagTask log? It would be useful to have dagId & dagAction in the log.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for pointing out, I had moved the existing log as it is, updated it now.

dagTask, e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the exception is included as part of the message string instead of passed as a separate parameter, this would log only the exception's toString(), without the full stack trace.

Please update the log statement to pass the exception as a separate parameter to log the stack trace

log.error("Ignoring non-transient exception. DagTask {} will conclude and will not be retried.", dagTask, e);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated

dagManagementStateStore.getDagManagerMetrics().dagProcessingNonRetryableExceptionMeter.mark();
dagTask.conclude();
}
// Todo add the else block for transient exceptions and add conclude task only if retry limit is not breached
log.error("DagProcEngineThread: " + dagProc.contextualizeStatus("error"), e);
dagManagementStateStore.getDagManagerMetrics().dagProcessingExceptionMeter.mark();
}
Expand Down
phet marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.util.ExceptionUtils;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
import org.apache.gobblin.service.modules.orchestration.DagActionStore;
Expand Down Expand Up @@ -92,20 +91,9 @@ public final void process(DagManagementStateStore dagManagementStateStore,
dagProcEngineMetrics.markDagActionsInitialize(getDagActionType(), false);
throw e;
}
try {
logContextualizedInfo("ready to process");
act(dagManagementStateStore, state, dagProcEngineMetrics);
logContextualizedInfo("processed");
} catch (Exception e) {
if (isNonTransientException(e)) {
log.error("Ignoring non transient exception. DagTask {} will conclude and will not be retried. Exception - {} ",
getDagTask(), e);
dagManagementStateStore.getDagManagerMetrics().dagProcessingNonRetryableExceptionMeter.mark();
dagManagementStateStore.getDagManagerMetrics().dagProcessingExceptionMeter.mark();
} else {
throw e;
}
}
}

protected abstract T initialize(DagManagementStateStore dagManagementStateStore) throws IOException;
Expand All @@ -126,8 +114,4 @@ public String contextualizeStatus(String message) {
public void logContextualizedInfo(String message) {
log.info(contextualizeStatus(message));
}

protected boolean isNonTransientException(Exception e) {
return ExceptionUtils.isExceptionInstanceOf(e, this.nonRetryableExceptions);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ public class DagProcUtils {
public static void submitNextNodes(DagManagementStateStore dagManagementStateStore, Dag<JobExecutionPlan> dag,
Dag.DagId dagId) throws IOException {
Set<Dag.DagNode<JobExecutionPlan>> nextNodes = DagUtils.getNext(dag);

if (nextNodes.size() == 1) {
Dag.DagNode<JobExecutionPlan> dagNode = nextNodes.iterator().next();
DagProcUtils.submitJobToExecutor(dagManagementStateStore, dagNode, dagId);
Expand Down Expand Up @@ -139,12 +138,15 @@ public static void submitJobToExecutor(DagManagementStateStore dagManagementStat
dagManagementStateStore.updateDagNode(dagNode);
sendEnforceJobStartDeadlineDagAction(dagManagementStateStore, dagNode);
} catch (Exception e) {
TimingEvent jobFailedTimer = DagProc.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_FAILED);
String message = "Cannot submit job " + DagUtils.getFullyQualifiedJobName(dagNode) + " on executor " + specExecutorUri;
log.error(message, e);
jobMetadata.put(TimingEvent.METADATA_MESSAGE, message + " due to " + e.getMessage());
if (jobFailedTimer != null) {
jobFailedTimer.stop(jobMetadata);
// Only mark the job as failed in case of non transient exceptions
if(!DagProcessingEngine.isTransientException(e)){
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: missing spaces
if (!DagProcessingEngine.isTransientException(e)) {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

TimingEvent jobFailedTimer = DagProc.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_FAILED);
String message = "Cannot submit job " + DagUtils.getFullyQualifiedJobName(dagNode) + " on executor " + specExecutorUri;
log.error(message, e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we atleast log the message? Even if it is a transient exception.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now all exceptions would be considered non transient, still updated as per suggestion

jobMetadata.put(TimingEvent.METADATA_MESSAGE, message + " due to " + e.getMessage());
if (jobFailedTimer != null) {
jobFailedTimer.stop(jobMetadata);
}
}
try {
// when there is no exception, quota will be released in job status monitor or re-evaluate dag proc
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,9 +196,15 @@ public void dagProcessingTest()
10000L, "dagTaskStream was not called " + expectedNumOfInvocations + " number of times. "
+ "Actual number of invocations " + Mockito.mockingDetails(this.dagTaskStream).getInvocations().size(),
log, 1, 1000L);

// Currently we are treating all exceptions as non retryable and totalExceptionCount will be equal to count of non retryable exceptions
Assert.assertEquals(dagManagementStateStore.getDagManagerMetrics().dagProcessingExceptionMeter.getCount(), expectedExceptions);
Assert.assertEquals(dagManagementStateStore.getDagManagerMetrics().dagProcessingNonRetryableExceptionMeter.getCount(), expectedNonRetryableExceptions);
Assert.assertEquals(dagManagementStateStore.getDagManagerMetrics().dagProcessingNonRetryableExceptionMeter.getCount(), expectedExceptions);
phet marked this conversation as resolved.
Show resolved Hide resolved
}

@Test
public void isNonTransientExceptionTest(){
Assert.assertTrue(!DagProcessingEngine.isTransientException(new RuntimeException("Simulating a non retryable exception!")));
Assert.assertTrue(!DagProcessingEngine.isTransientException(new AzkabanClientException("Simulating a retryable exception!")));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could use clarifying comment that there's nothing intrinsically transient or not about these, and in fact it just comes down to config, which is not presently provided.

even better however, would be to actually pass some test config and then demonstrate that the impl accordingly judges some as transient and others not. BTW, in authoring such a test you may find that it's challenging to have isTransientException as a static method, and yet be configuration-driven

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added clarifying statement, probably test config based tests can be added when we actually start maintaining config and we also add tests for transient exceptions handling.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

...and make isTransientException non-static to accept config

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure makes sense

}

private enum ExceptionType {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package org.apache.gobblin.service.modules.orchestration.proc;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import org.apache.gobblin.config.ConfigBuilder;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.runtime.api.SpecProducer;
import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
import org.apache.gobblin.runtime.spec_executorInstance.MockedSpecExecutor;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
import org.apache.gobblin.service.modules.orchestration.DagManagerMetrics;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@RunWith(PowerMockRunner.class)
@PrepareForTest(EventSubmitter.class)
public class DagProcUtilsTest {

DagManagementStateStore dagManagementStateStore;
SpecExecutor mockSpecExecutor;

@BeforeClass
public void setUp() {
dagManagementStateStore = Mockito.mock(DagManagementStateStore.class);
mockSpecExecutor = new MockedSpecExecutor(Mockito.mock(Config.class));
}

@Test
public void testSubmitNextNodesSuccess() throws URISyntaxException, IOException {
Dag.DagId dagId = new Dag.DagId("testFlowGroup", "testFlowName", 2345678);
List<Dag.DagNode<JobExecutionPlan>> dagNodeList = new ArrayList<>();
List<JobExecutionPlan> jobExecutionPlans = getJobExecutionPlans();
for(JobExecutionPlan jobExecutionPlan: jobExecutionPlans){
Dag.DagNode<JobExecutionPlan> dagNode = new Dag.DagNode<>(jobExecutionPlan);
dagNodeList.add(dagNode);
}
phet marked this conversation as resolved.
Show resolved Hide resolved
Dag<JobExecutionPlan> dag = new Dag<>(dagNodeList);
Mockito.doNothing().when(dagManagementStateStore).addJobDagAction(Mockito.anyString(), Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(), Mockito.any());
DagProcUtils.submitNextNodes(dagManagementStateStore, dag, dagId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a Mockito.verify for all the tests on dagManagementStateStore?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure makes sense, added in all tests

}

@Test
public void testWhenSubmitToExecutorSuccess() throws URISyntaxException, IOException, ExecutionException, InterruptedException {
Dag.DagId dagId = new Dag.DagId("flowGroup1", "flowName1", 2345680);
List<Dag.DagNode<JobExecutionPlan>> dagNodeList = new ArrayList<>();
JobExecutionPlan jobExecutionPlan = getJobExecutionPlans().get(0);
Dag.DagNode<JobExecutionPlan> dagNode = new Dag.DagNode<>(jobExecutionPlan);
dagNodeList.add(dagNode);
Dag<JobExecutionPlan> dag = new Dag<>(dagNodeList);
Mockito.doNothing().when(dagManagementStateStore).addJobDagAction(Mockito.anyString(), Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(), Mockito.any());
DagManagerMetrics metrics = Mockito.mock(DagManagerMetrics.class);
Mockito.when(dagManagementStateStore.getDagManagerMetrics()).thenReturn(metrics);
Mockito.doNothing().when(metrics).incrementRunningJobMetrics(dagNode);
DagProcUtils.submitNextNodes(dagManagementStateStore, dag, dagId);
}

@Test(expectedExceptions = RuntimeException.class)
public void testWhenSubmitToExecutorGivesRuntimeException() throws URISyntaxException, IOException, ExecutionException, InterruptedException{
Dag.DagId dagId = new Dag.DagId("flowGroup3", "flowName3", 2345678);
List<Dag.DagNode<JobExecutionPlan>> dagNodeList = new ArrayList<>();
JobExecutionPlan jobExecutionPlan = getJobExecutionPlans().get(2);
Dag.DagNode<JobExecutionPlan> dagNode = new Dag.DagNode<>(jobExecutionPlan);
dagNodeList.add(dagNode);
Dag<JobExecutionPlan> dag = new Dag<>(dagNodeList);
Mockito.doNothing().when(dagManagementStateStore).addJobDagAction(Mockito.anyString(), Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(), Mockito.any());
SpecProducer<Spec> mockedSpecProducer = mockSpecExecutor.getProducer().get();
Mockito.doThrow(RuntimeException.class).when(mockedSpecProducer).addSpec(Mockito.any(JobSpec.class));
DagManagerMetrics metrics = Mockito.mock(DagManagerMetrics.class);
Mockito.when(dagManagementStateStore.getDagManagerMetrics()).thenReturn(metrics);
Mockito.doNothing().when(metrics).incrementRunningJobMetrics(dagNode);
DagProcUtils.submitNextNodes(dagManagementStateStore, dag, dagId);
}

private List<JobExecutionPlan> getJobExecutionPlans() throws URISyntaxException {
Config flowConfig1 = ConfigBuilder.create().addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "flowName1")
.addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "flowGroup1").build();
Config flowConfig2 = ConfigBuilder.create().addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "flowName2")
.addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "flowGroup2").build();
Config flowConfig3 = ConfigBuilder.create().addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "flowName3")
.addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "flowGroup3").build();
List<Config> flowConfigs = Arrays.asList(flowConfig1, flowConfig2, flowConfig3);

Config jobConfig1 = ConfigBuilder.create().addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job1")
.addPrimitive(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, "source:destination:edgeName1").build();
Config jobConfig2 = ConfigBuilder.create().addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job2")
.addPrimitive(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, "source:destination:edgeName2").build();
Config jobConfig3 = ConfigBuilder.create().addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job1")
.addPrimitive(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, "source:destination:edgeName3").build();
List<Config> jobConfigs = Arrays.asList(jobConfig1, jobConfig2, jobConfig3);
List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>();
for (int i = 0; i < 3; i++) {
Config jobConfig = jobConfigs.get(i);
FlowSpec flowSpec = FlowSpec.builder("testFlowSpec").withConfig(flowConfigs.get(i)).build();
if(i==2){
jobExecutionPlans.add(new JobExecutionPlan.Factory().createPlan(flowSpec, jobConfig.withValue(ConfigurationKeys.JOB_TEMPLATE_PATH,
ConfigValueFactory.fromAnyRef("testUri")), mockSpecExecutor, 0L, ConfigFactory.empty()));
}
else{
jobExecutionPlans.add(new JobExecutionPlan.Factory().createPlan(flowSpec, jobConfig.withValue(ConfigurationKeys.JOB_TEMPLATE_PATH,
phet marked this conversation as resolved.
Show resolved Hide resolved
ConfigValueFactory.fromAnyRef("testUri")), new InMemorySpecExecutor(ConfigFactory.empty()), 0L, ConfigFactory.empty()));
}
}
return jobExecutionPlans;
}
}
Loading