Skip to content

Commit

Permalink
add some logs and cache DagManagement in DagActionReminderScheduler (#…
Browse files Browse the repository at this point in the history
…3997)

* do not retry SQLIntegrityConstraintViolationException in KafkaJobStatusMonitor
  • Loading branch information
arjun4084346 authored Jul 10, 2024
1 parent 5879723 commit a1a5d4b
Show file tree
Hide file tree
Showing 11 changed files with 71 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.service.modules.core.GobblinServiceManager;


/**
Expand All @@ -57,16 +56,17 @@
@Slf4j
@Singleton
public class DagActionReminderScheduler {
public static final String DAG_ACTION_REMINDER_SCHEDULER_KEY = "DagActionReminderScheduler";
public static final String RetryReminderKeyGroup = "RetryReminder";
public static final String DeadlineReminderKeyGroup = "DeadlineReminder";
private final Scheduler quartzScheduler;
private final DagManagement dagManagement;

@Inject
public DagActionReminderScheduler(StdSchedulerFactory schedulerFactory)
public DagActionReminderScheduler(StdSchedulerFactory schedulerFactory, DagManagement dagManagement)
throws SchedulerException {
// Creates a new Scheduler to be used solely for the DagProc reminders
this.quartzScheduler = schedulerFactory.getScheduler();
this.dagManagement = dagManagement;
}

/**
Expand All @@ -77,28 +77,29 @@ public DagActionReminderScheduler(StdSchedulerFactory schedulerFactory)
* @throws SchedulerException
*/
public void scheduleReminder(DagActionStore.LeaseParams leaseParams, long reminderDurationMillis,
boolean isDeadlineReminder)
throws SchedulerException {
boolean isDeadlineReminder) throws SchedulerException {
DagActionStore.DagAction dagAction = leaseParams.getDagAction();
JobDetail jobDetail = createReminderJobDetail(leaseParams, isDeadlineReminder);
Trigger trigger = createReminderJobTrigger(leaseParams, reminderDurationMillis,
System::currentTimeMillis, isDeadlineReminder);
log.info("Reminder set for dagAction {} to fire after {} ms, isDeadlineTrigger: {}",
leaseParams.getDagAction(), reminderDurationMillis, isDeadlineReminder);
log.info("Going to set reminder for dagAction {} to fire after {} ms, isDeadlineTrigger: {}",
dagAction, reminderDurationMillis, isDeadlineReminder);
quartzScheduler.scheduleJob(jobDetail, trigger);
}

public void unscheduleReminderJob(DagActionStore.LeaseParams leaseParams, boolean isDeadlineTrigger) throws SchedulerException {
log.info("Reminder unset for dagAction {}, isDeadlineTrigger: {}", leaseParams, isDeadlineTrigger);
quartzScheduler.deleteJob(createJobKey(leaseParams, isDeadlineTrigger));
log.info("Reminder unset for LeaseParams {}, isDeadlineTrigger: {}", leaseParams, isDeadlineTrigger);
if (!quartzScheduler.deleteJob(createJobKey(leaseParams, isDeadlineTrigger))) {
log.warn("Reminder not found for {}. Possibly the event is received out-of-order.", leaseParams);
}
}

/**
* Static class used to store information regarding a pending dagAction that needs to be revisited at a later time
* by {@link DagManagement} interface to re-attempt a lease on if it has not been completed by the previous owner.
* These jobs are scheduled and used by the {@link DagActionReminderScheduler}.
*/
@Slf4j
public static class ReminderJob implements Job {
public class ReminderJob implements Job {
public static final String FLOW_ACTION_TYPE_KEY = "flow.actionType";
public static final String FLOW_ACTION_EVENT_TIME_KEY = "flow.eventTime";

Expand All @@ -119,8 +120,7 @@ public void execute(JobExecutionContext context) {
log.info("DagProc reminder triggered for dagAction event: {}", reminderLeaseParams);

try {
DagManagement dagManagement = GobblinServiceManager.getClass(DagManagement.class);
dagManagement.addReminderDagAction(reminderLeaseParams);
dagManagement.addDagAction(reminderLeaseParams);
} catch (IOException e) {
log.error("Failed to add DagAction event to DagManagement. dagAction event: {}", reminderLeaseParams);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ public LeaseParams(DagAction dagAction, long eventTimeMillis) {
this(dagAction, false, eventTimeMillis);
}

public LeaseParams(DagAction dagAction) {
this(dagAction, System.currentTimeMillis());
}

/**
* Replace flow execution id in dagAction with agreed upon event time to easily track the flow
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,8 @@
* Consumption of the Dags happen through {@link DagTaskStream}.
*/
public interface DagManagement {

/**
* Used to add a dagAction event to DagManagement
*/
void addDagAction(DagActionStore.DagAction dagAction) throws IOException;

/**
* Used to add reminder dagActions to the queue that already contain an eventTimestamp from the previous lease attempt
* Used to add {@link DagActionStore.LeaseParams} to the queue
*/
void addReminderDagAction(DagActionStore.LeaseParams reminderLeaseParams) throws IOException;
void addDagAction(DagActionStore.LeaseParams leaseParams) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,23 +107,10 @@ public DagManagementTaskStreamImpl(Config config, Optional<DagActionStore> dagAc
this.dagManagementStateStore = dagManagementStateStore;
}

@Override
public synchronized void addDagAction(DagActionStore.DagAction dagAction) {
// TODO: Used to track missing dag issue, remove later as needed
log.info("Add original (non-reminder) dagAction {}", dagAction);

if (!this.leaseParamsQueue.offer(new DagActionStore.LeaseParams(dagAction, false, System.currentTimeMillis()))) {
throw new RuntimeException(String.format("Could not add dag action to the queue %s", dagAction));
}
}

@Override
public synchronized void addReminderDagAction(DagActionStore.LeaseParams reminderLeaseParams) {
// TODO: Used to track missing dag issue, remove later as needed
log.info("Add reminder dagAction {}", reminderLeaseParams);

if (!this.leaseParamsQueue.offer(reminderLeaseParams)) {
throw new RuntimeException(String.format("Could not add reminder dag action to the queue %s", reminderLeaseParams));
public synchronized void addDagAction(DagActionStore.LeaseParams leaseParams) {
log.info("Adding {} to queue...", leaseParams);
if (!this.leaseParamsQueue.offer(leaseParams)) {
throw new RuntimeException(String.format("Could not add %s to the queue", leaseParams));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ public void markDagFailed(Dag<JobExecutionPlan> dag) throws IOException {
this.dagStateStore.cleanUp(dag);
// todo - updated failedDagStateStore iff cleanup returned 1
this.failedDagStateStore.writeCheckpoint(dag);
log.info("Marked dag failed {}", DagManagerUtils.generateDagId(dag));
}

@Override
Expand All @@ -155,6 +156,7 @@ public void deleteFailedDag(Dag<JobExecutionPlan> dag) throws IOException {
@Override
public void deleteDag(DagManager.DagId dagId) throws IOException {
this.dagStateStore.cleanUp(dagId.toString());
log.info("Deleted dag {}", dagId);
}

@Override
Expand Down Expand Up @@ -289,11 +291,14 @@ public boolean existsFlowDagAction(String flowGroup, String flowName, long flowE
@Override
public void addJobDagAction(String flowGroup, String flowName, long flowExecutionId, String jobName,
DagActionStore.DagActionType dagActionType) throws IOException {
log.info("Adding Dag Action for flowGroup {}, flowName {}, flowExecutionId {}, jobName {}, dagActionType {}",
flowGroup, flowName, flowExecutionId, jobName, dagActionType);
this.dagActionStore.addJobDagAction(flowGroup, flowName, flowExecutionId, jobName, dagActionType);
}

@Override
public boolean deleteDagAction(DagActionStore.DagAction dagAction) throws IOException {
log.info("Deleting Dag Action {}", dagAction);
return this.dagActionStore.deleteDagAction(dagAction);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ public LeaseAttemptStatus tryAcquireLease(DagActionStore.LeaseParams leaseParams
// because db laundering tells us that the currently worked on db event is newer and will have its own reminders
if (leaseParams.isReminder()) {
if (leaseParams.getEventTimeMillis() < dbEventTimestamp.getTime()) {
log.debug("tryAcquireLease for {} - dbEventTimeMillis: {} - A new event trigger "
log.info("tryAcquireLease for {} - dbEventTimeMillis: {} - A new event trigger "
+ "is being worked on, so this older reminder will be dropped.", leaseParams,
dbEventTimestamp);
return new LeaseAttemptStatus.NoLongerLeasingStatus();
Expand Down Expand Up @@ -630,11 +630,11 @@ public boolean recordLeaseSuccess(LeaseAttemptStatus.LeaseObtainedStatus status)
status.getEventTimeMillis());
return false;
}
if( numRowsUpdated == 1) {
if (numRowsUpdated == 1) {
log.info("Multi-active lease arbiter lease attempt: [{}, eventTimestamp: {}] - COMPLETED, no longer leasing"
+ " this event after this.", dagAction, status.getEventTimeMillis());
return true;
};
}
throw new IOException(String.format("Attempt to complete lease use: [%s, eventTimestamp: %s] - updated more "
+ "rows than expected", dagAction, status.getEventTimeMillis()));
}, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,6 @@ public static void submitJobToExecutor(DagManagementStateStore dagManagementStat
// blocks (by calling Future#get()) until the submission is completed.
dagManagementStateStore.tryAcquireQuota(Collections.singleton(dagNode));

sendEnforceJobStartDeadlineDagAction(dagManagementStateStore, dagNode);

Future<?> addSpecFuture = producer.addSpec(jobSpec);
// todo - we should add future.get() instead of the complete future into the JobExecutionPlan
dagNode.getValue().setJobFuture(com.google.common.base.Optional.of(addSpecFuture));
Expand All @@ -131,6 +129,7 @@ public static void submitJobToExecutor(DagManagementStateStore dagManagementStat
log.info("Orchestrated job: {} on Executor: {}", DagManagerUtils.getFullyQualifiedJobName(dagNode), specExecutorUri);
dagManagementStateStore.getDagManagerMetrics().incrementJobsSentToExecutor(dagNode);
dagManagementStateStore.addDagNodeState(dagNode, dagId);
sendEnforceJobStartDeadlineDagAction(dagManagementStateStore, dagNode);
} catch (Exception e) {
TimingEvent jobFailedTimer = DagProc.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_FAILED);
String message = "Cannot submit job " + DagManagerUtils.getFullyQualifiedJobName(dagNode) + " on executor " + specExecutorUri;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ protected void handleDagAction(DagActionStore.DagAction dagAction, boolean isSta
case LAUNCH :
case REEVALUATE :
case RESUME:
dagManagement.addDagAction(dagAction);
dagManagement.addDagAction(new DagActionStore.LeaseParams(dagAction));
break;
default:
log.warn("Received unsupported dagAction {}. Expected to be a RESUME, KILL, REEVALUATE or LAUNCH", dagAction.getDagActionType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
package org.apache.gobblin.service.monitoring;

import java.io.IOException;
import java.sql.SQLIntegrityConstraintViolationException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
Expand Down Expand Up @@ -104,7 +106,8 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
private final StateStore<org.apache.gobblin.configuration.State> stateStore;
private final ScheduledExecutorService scheduledExecutorService;
private static final Config RETRYER_FALLBACK_CONFIG = ConfigFactory.parseMap(ImmutableMap.of(
RETRY_TIME_OUT_MS, TimeUnit.HOURS.toMillis(24L), // after a day, presume non-transient and give up
// keeping the retry timeout less until we configure retryer to retry only the transient exceptions
RETRY_TIME_OUT_MS, TimeUnit.HOURS.toMillis(12L), // after 12 hours, presume non-transient and give up
RETRY_INTERVAL_MS, TimeUnit.MINUTES.toMillis(1L), // back-off to once/minute
RETRY_TYPE, RetryType.EXPONENTIAL.name()));
private static final Config DEFAULTS = ConfigFactory.parseMap(ImmutableMap.of(
Expand All @@ -120,6 +123,7 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
private final GaaSJobObservabilityEventProducer eventProducer;
private final DagManagementStateStore dagManagementStateStore;
private final boolean dagProcEngineEnabled;
private final List<Class<? extends Exception>> nonRetryableExceptions = Collections.singletonList(SQLIntegrityConstraintViolationException.class);

public KafkaJobStatusMonitor(String topic, Config config, int numThreads, JobIssueEventHandler jobIssueEventHandler,
GaaSJobObservabilityEventProducer observabilityEventProducer, DagManagementStateStore dagManagementStateStore)
Expand All @@ -139,6 +143,7 @@ public KafkaJobStatusMonitor(String topic, Config config, int numThreads, JobIss
? config.getConfig(KafkaJobStatusMonitor.JOB_STATUS_MONITOR_PREFIX)
: ConfigFactory.empty();
// log exceptions to expose errors we suffer under and/or guide intervention when resolution not readily forthcoming
// todo - this retryer retries all the exceptions. we should make it retry only really transient
this.persistJobStatusRetryer =
RetryerFactory.newInstance(retryerOverridesConfig.withFallback(RETRYER_FALLBACK_CONFIG), Optional.of(new RetryListener() {
@Override
Expand Down Expand Up @@ -230,7 +235,16 @@ protected void processMessage(DecodeableKafkaRecord<byte[],byte[]> message) {
if (this.dagProcEngineEnabled && DagProcUtils.isJobLevelStatus(jobName)) {
if (updatedJobStatus.getRight() == NewState.FINISHED) {
// todo - retried/resumed jobs *may* not be handled here, we may want to create their dag action elsewhere
this.dagManagementStateStore.addJobDagAction(flowGroup, flowName, flowExecutionId, jobName, DagActionStore.DagActionType.REEVALUATE);
try {
this.dagManagementStateStore.addJobDagAction(flowGroup, flowName, flowExecutionId, jobName, DagActionStore.DagActionType.REEVALUATE);
} catch (Exception e) {
if (isExceptionInstanceOf(e, nonRetryableExceptions)) {
// todo - add metrics
log.warn("Duplicate REEVALUATE Dag Action is being created. Ignoring... " + e.getMessage());
} else {
throw e;
}
}
} else if (updatedJobStatus.getRight() == NewState.RUNNING) {
DagProcUtils.removeEnforceJobStartDeadlineDagAction(dagManagementStateStore, flowGroup, flowName, flowExecutionId, jobName);
}
Expand Down Expand Up @@ -321,7 +335,12 @@ static Pair<org.apache.gobblin.configuration.State, NewState> recalcJobStatus(or
}

modifyStateIfRetryRequired(jobStatus);
return ImmutablePair.of(jobStatus, newState(jobStatus, states));
NewState newState = newState(jobStatus, states);
String newStatus = jobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
if (newState == NewState.FINISHED) {
log.info("Flow {}:{}:{}:{} reached a terminal state {}", flowGroup, flowName, flowExecutionId, jobName, newStatus);
}
return ImmutablePair.of(jobStatus, newState);
} catch (Exception e) {
log.warn("Meet exception when adding jobStatus to state store at "
+ e.getStackTrace()[0].getClassName() + "line number: " + e.getStackTrace()[0].getLineNumber(), e);
Expand Down Expand Up @@ -409,4 +428,7 @@ public static long getExecutionIdFromTableName(String tableName) {

protected abstract org.apache.gobblin.configuration.State parseJobStatus(GobblinTrackingEvent event);

public static boolean isExceptionInstanceOf(Exception exception, List<Class<? extends Exception>> typesList) {
return typesList.stream().anyMatch(e -> e.isInstance(exception));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@

import org.apache.gobblin.configuration.ConfigurationKeys;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;


public class DagActionReminderSchedulerTest {
String flowGroup = "fg";
Expand All @@ -55,10 +59,12 @@ public class DagActionReminderSchedulerTest {
DagActionReminderScheduler dagActionReminderScheduler;

@BeforeClass
private void setup() throws SchedulerException {
private void setup() throws Exception {
StdSchedulerFactory schedulerFactory = new StdSchedulerFactory();
schedulerFactory.getScheduler();
this.dagActionReminderScheduler = new DagActionReminderScheduler(schedulerFactory);
DagManagement dagManagement = mock(DagManagement.class);
doNothing().when(dagManagement).addDagAction(any());
this.dagActionReminderScheduler = new DagActionReminderScheduler(schedulerFactory, dagManagement);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,11 @@ statuses that should cause the next() method to continue polling for tasks befor
LeaseObtainedStatus to the taskStream to break its loop and return a newly created dagTask
*/
DagActionStore.DagAction launchAction = new DagActionStore.DagAction("fg", "fn", 12345L, "jn", DagActionStore.DagActionType.LAUNCH);
dagManagementTaskStream.addDagAction(launchAction);
dagManagementTaskStream.addDagAction(launchAction);
dagManagementTaskStream.addDagAction(launchAction);
DagActionStore.LeaseParams
dagActionLeaseParams = new DagActionStore.LeaseParams(launchAction, false, System.currentTimeMillis());
dagManagementTaskStream.addDagAction(dagActionLeaseParams);
dagManagementTaskStream.addDagAction(dagActionLeaseParams);
dagManagementTaskStream.addDagAction(dagActionLeaseParams);
when(dagManagementTaskStream.getDagActionProcessingLeaseArbiter()
.tryAcquireLease(any(DagActionStore.LeaseParams.class), anyBoolean()))
.thenReturn(new LeaseAttemptStatus.NoLongerLeasingStatus(),
Expand Down

0 comments on commit a1a5d4b

Please sign in to comment.