Skip to content

Commit

Permalink
[GOBBLIN-2073] Use previous eventTime for lease arbitration of remind…
Browse files Browse the repository at this point in the history
…er dagActions (#3952)

* Use previous eventTime for lease arbitration of reminder dagActions
* Move isReminder and eventTimestamp into dagAction
* Fix existing tests and build error
* Decouple dependent MyMALA tests to prevent flakiness and allow debuggability
* Creates a leaseObject to contain dagAction, isReminder, and eventTimeMillis
  • Loading branch information
umustafi authored Jun 17, 2024
1 parent ac5d556 commit c064604
Show file tree
Hide file tree
Showing 14 changed files with 357 additions and 219 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.quartz.JobDataMap;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.JobKey;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
Expand Down Expand Up @@ -60,20 +61,20 @@ public DagActionReminderScheduler(StdSchedulerFactory schedulerFactory)
/**
* Uses a dagAction & reminder duration in milliseconds to create a reminder job that will fire
* `reminderDurationMillis` after the current time
* @param dagAction
* @param dagActionLeaseObject
* @param reminderDurationMillis
* @throws SchedulerException
*/
public void scheduleReminder(DagActionStore.DagAction dagAction, long reminderDurationMillis)
public void scheduleReminder(DagActionStore.DagActionLeaseObject dagActionLeaseObject, long reminderDurationMillis)
throws SchedulerException {
JobDetail jobDetail = createReminderJobDetail(dagAction);
Trigger trigger = createReminderJobTrigger(dagAction, reminderDurationMillis, System::currentTimeMillis);
JobDetail jobDetail = createReminderJobDetail(dagActionLeaseObject);
Trigger trigger = createReminderJobTrigger(dagActionLeaseObject.getDagAction(), reminderDurationMillis,
System::currentTimeMillis);
quartzScheduler.scheduleJob(jobDetail, trigger);
}

public void unscheduleReminderJob(DagActionStore.DagAction dagAction) throws SchedulerException {
JobDetail jobDetail = createReminderJobDetail(dagAction);
quartzScheduler.deleteJob(jobDetail.getKey());
quartzScheduler.deleteJob(createJobKey(dagAction));
}

/**
Expand All @@ -84,6 +85,7 @@ public void unscheduleReminderJob(DagActionStore.DagAction dagAction) throws Sch
@Slf4j
public static class ReminderJob implements Job {
public static final String FLOW_ACTION_TYPE_KEY = "flow.actionType";
public static final String FLOW_ACTION_EVENT_TIME_KEY = "flow.eventTime";

@Override
public void execute(JobExecutionContext context) {
Expand All @@ -94,17 +96,18 @@ public void execute(JobExecutionContext context) {
String jobName = jobDataMap.getString(ConfigurationKeys.JOB_NAME_KEY);
long flowExecutionId = jobDataMap.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
DagActionStore.DagActionType dagActionType = (DagActionStore.DagActionType) jobDataMap.get(FLOW_ACTION_TYPE_KEY);
long eventTimeMillis = jobDataMap.getLong(FLOW_ACTION_EVENT_TIME_KEY);

log.info("DagProc reminder triggered for (flowGroup: " + flowGroup + ", flowName: " + flowName
+ ", flowExecutionId: " + flowExecutionId + ", jobName: " + jobName + ", dagActionType: " + dagActionType + ")");

DagActionStore.DagAction dagAction = new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, jobName, dagActionType, true);
DagActionStore.DagActionLeaseObject reminderDagActionLeaseObject = new DagActionStore.DagActionLeaseObject(
new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, jobName, dagActionType),
true, eventTimeMillis);
log.info("DagProc reminder triggered for dagAction event: {}", reminderDagActionLeaseObject);

try {
DagManagement dagManagement = GobblinServiceManager.getClass(DagManagement.class);
dagManagement.addDagAction(dagAction);
dagManagement.addReminderDagAction(reminderDagActionLeaseObject);
} catch (IOException e) {
log.error("Failed to add DagAction to DagManagement. Action: {}", dagAction);
log.error("Failed to add DagAction event to DagManagement. dagAction event: {}", reminderDagActionLeaseObject);
}
}
}
Expand All @@ -117,20 +120,30 @@ public static String createDagActionReminderKey(DagActionStore.DagAction dagActi
dagAction.getFlowExecutionId(), dagAction.getJobName(), dagAction.getDagActionType());
}

/**
* Creates a JobKey object for the reminder job where the name is the DagActionReminderKey from above and the group is
* the flowGroup
*/
public static JobKey createJobKey(DagActionStore.DagAction dagAction) {
return new JobKey(createDagActionReminderKey(dagAction), dagAction.getFlowGroup());
}

/**
* Creates a jobDetail containing flow and job identifying information in the jobDataMap, uniquely identified
* by a key comprised of the dagAction's fields.
*/
public static JobDetail createReminderJobDetail(DagActionStore.DagAction dagAction) {
public static JobDetail createReminderJobDetail(DagActionStore.DagActionLeaseObject dagActionLeaseObject) {
JobDataMap dataMap = new JobDataMap();
dataMap.put(ConfigurationKeys.FLOW_NAME_KEY, dagAction.getFlowName());
dataMap.put(ConfigurationKeys.FLOW_GROUP_KEY, dagAction.getFlowGroup());
dataMap.put(ConfigurationKeys.JOB_NAME_KEY, dagAction.getJobName());
dataMap.put(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, dagAction.getFlowExecutionId());
dataMap.put(ReminderJob.FLOW_ACTION_TYPE_KEY, dagAction.getDagActionType());
dataMap.put(ConfigurationKeys.FLOW_NAME_KEY, dagActionLeaseObject.getDagAction().getFlowName());
dataMap.put(ConfigurationKeys.FLOW_GROUP_KEY, dagActionLeaseObject.getDagAction().getFlowGroup());
dataMap.put(ConfigurationKeys.JOB_NAME_KEY, dagActionLeaseObject.getDagAction().getJobName());
dataMap.put(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, dagActionLeaseObject.getDagAction().getFlowExecutionId());
dataMap.put(ReminderJob.FLOW_ACTION_TYPE_KEY, dagActionLeaseObject.getDagAction().getDagActionType());
dataMap.put(ReminderJob.FLOW_ACTION_EVENT_TIME_KEY, dagActionLeaseObject.getEventTimeMillis());

return JobBuilder.newJob(ReminderJob.class)
.withIdentity(createDagActionReminderKey(dagAction), dagAction.getFlowGroup())
.withIdentity(createDagActionReminderKey(dagActionLeaseObject.getDagAction()),
dagActionLeaseObject.getDagAction().getFlowGroup())
.usingJobData(dataMap)
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,6 @@ class DagAction {
final long flowExecutionId;
final String jobName;
final DagActionType dagActionType;
final boolean isReminder;

public DagAction(String flowGroup, String flowName, long flowExecutionId, String jobName, DagActionType dagActionType) {
this(flowGroup, flowName, flowExecutionId, jobName, dagActionType, false);
}

public static DagAction forFlow(String flowGroup, String flowName, long flowExecutionId, DagActionType dagActionType) {
return new DagAction(flowGroup, flowName, flowExecutionId, NO_JOB_NAME_DEFAULT, dagActionType);
Expand Down Expand Up @@ -83,6 +78,24 @@ public DagManager.DagId getDagId() {
}
}

@Data
@RequiredArgsConstructor
class DagActionLeaseObject {
final DagAction dagAction;
final boolean isReminder;
final long eventTimeMillis;

/**
* Creates a lease object for a dagAction and eventTimeMillis representing an original event (isReminder is False)
*/
public DagActionLeaseObject(DagAction dagAction, long eventTimeMillis) {
this.dagAction = dagAction;
this.isReminder = false;
this.eventTimeMillis = eventTimeMillis;
}
}



/**
* Check if an action exists in dagAction store by flow group, flow name, flow execution id, and job name.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,13 @@
*/
public interface DagManagement {

/**
* Used to add a dagAction event to DagManagement
*/
void addDagAction(DagActionStore.DagAction dagAction) throws IOException;

/**
* Used to add reminder dagActions to the queue that already contain an eventTimestamp from the previous lease attempt
*/
void addReminderDagAction(DagActionStore.DagActionLeaseObject reminderDagActionLeaseObject) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public class DagManagementTaskStreamImpl implements DagManagement, DagTaskStream
protected Optional<DagActionReminderScheduler> dagActionReminderScheduler;
private final boolean isMultiActiveExecutionEnabled;
private static final int MAX_HOUSEKEEPING_THREAD_DELAY = 180;
private final BlockingQueue<DagActionStore.DagAction> dagActionQueue = new LinkedBlockingQueue<>();
private final BlockingQueue<DagActionStore.DagActionLeaseObject> dagActionLeaseObjectQueue = new LinkedBlockingQueue<>();
private final DagManagementStateStore dagManagementStateStore;

@Inject
Expand Down Expand Up @@ -110,10 +110,20 @@ public DagManagementTaskStreamImpl(Config config, Optional<DagActionStore> dagAc
@Override
public synchronized void addDagAction(DagActionStore.DagAction dagAction) {
// TODO: Used to track missing dag issue, remove later as needed
log.info("Add dagAction {}", dagAction);
log.info("Add original (non-reminder) dagAction {}", dagAction);

if (!this.dagActionQueue.offer(dagAction)) {
throw new RuntimeException("Could not add dag action " + dagAction + " to the queue");
if (!this.dagActionLeaseObjectQueue.offer(new DagActionStore.DagActionLeaseObject(dagAction, false, System.currentTimeMillis()))) {
throw new RuntimeException(String.format("Could not add dag action to the queue %s", dagAction));
}
}

@Override
public synchronized void addReminderDagAction(DagActionStore.DagActionLeaseObject reminderDagActionLeaseObject) {
// TODO: Used to track missing dag issue, remove later as needed
log.info("Add reminder dagAction {}", reminderDagActionLeaseObject);

if (!this.dagActionLeaseObjectQueue.offer(reminderDagActionLeaseObject)) {
throw new RuntimeException(String.format("Could not add reminder dag action to the queue %s", reminderDagActionLeaseObject));
}
}

Expand All @@ -127,20 +137,17 @@ public DagTask next() {
while (true) {
DagActionStore.DagAction dagAction = null;
try {
dagAction = this.dagActionQueue.take();
DagActionStore.DagActionLeaseObject dagActionLeaseObject = this.dagActionLeaseObjectQueue.take();
dagAction = dagActionLeaseObject.getDagAction();
/* Create triggers for original (non-reminder) dag actions of type ENFORCE_JOB_START_DEADLINE and ENFORCE_FLOW_FINISH_DEADLINE.
Reminder triggers are used to inform hosts once the job start deadline and flow finish deadline are passed;
then only is lease arbitration done to enforce the deadline violation and fail the job or flow if needed */
if (!dagAction.isReminder() && dagAction.dagActionType == DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE) {
createJobStartDeadlineTrigger(dagAction);
} else if (!dagAction.isReminder() && dagAction.dagActionType == DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE) {
createFlowFinishDeadlineTrigger(dagAction);
} else if (!dagAction.isReminder
|| dagAction.dagActionType == DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE
|| dagAction.dagActionType == DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE) {
// todo - fix bug of a reminder event getting a lease even when the first attempt succeeded.
// for now, avoid processing reminder events if they are not for deadline dag actions
LeaseAttemptStatus leaseAttemptStatus = retrieveLeaseStatus(dagAction);
if (!dagActionLeaseObject.isReminder() && dagAction.dagActionType == DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE) {
createJobStartDeadlineTrigger(dagActionLeaseObject);
} else if (!dagActionLeaseObject.isReminder() && dagAction.dagActionType == DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE) {
createFlowFinishDeadlineTrigger(dagActionLeaseObject);
} else { // Handle original non-deadline dagActions as well as reminder events of all types
LeaseAttemptStatus leaseAttemptStatus = retrieveLeaseStatus(dagActionLeaseObject);
if (leaseAttemptStatus instanceof LeaseAttemptStatus.LeaseObtainedStatus) {
return createDagTask(dagAction, (LeaseAttemptStatus.LeaseObtainedStatus) leaseAttemptStatus);
}
Expand All @@ -152,20 +159,22 @@ public DagTask next() {
}
}

private void createJobStartDeadlineTrigger(DagActionStore.DagAction dagAction) throws SchedulerException, IOException {
private void createJobStartDeadlineTrigger(DagActionStore.DagActionLeaseObject dagActionLeaseObject)
throws SchedulerException, IOException {
long timeOutForJobStart = DagManagerUtils.getJobStartSla(this.dagManagementStateStore.getDag(
dagAction.getDagId()).get().getNodes().get(0), DagProcessingEngine.getDefaultJobStartSlaTimeMillis());
dagActionLeaseObject.getDagAction().getDagId()).get().getNodes().get(0), DagProcessingEngine.getDefaultJobStartSlaTimeMillis());
// todo - this timestamp is just an approximation, the real job submission has happened in past, and that is when a
// ENFORCE_JOB_START_DEADLINE dag action was created; we are just processing that dag action here
long jobSubmissionTime = System.currentTimeMillis();
long reminderDuration = jobSubmissionTime + timeOutForJobStart - System.currentTimeMillis();

dagActionReminderScheduler.get().scheduleReminder(dagAction, reminderDuration);
dagActionReminderScheduler.get().scheduleReminder(dagActionLeaseObject, reminderDuration);
}

private void createFlowFinishDeadlineTrigger(DagActionStore.DagAction dagAction) throws SchedulerException, IOException {
private void createFlowFinishDeadlineTrigger(DagActionStore.DagActionLeaseObject dagActionLeaseObject)
throws SchedulerException, IOException {
long timeOutForJobFinish;
Dag.DagNode<JobExecutionPlan> dagNode = this.dagManagementStateStore.getDag(dagAction.getDagId()).get().getNodes().get(0);
Dag.DagNode<JobExecutionPlan> dagNode = this.dagManagementStateStore.getDag(dagActionLeaseObject.getDagAction().getDagId()).get().getNodes().get(0);

try {
timeOutForJobFinish = DagManagerUtils.getFlowSLA(dagNode);
Expand All @@ -180,25 +189,25 @@ private void createFlowFinishDeadlineTrigger(DagActionStore.DagAction dagAction)
long flowStartTime = DagManagerUtils.getFlowStartTime(dagNode);
long reminderDuration = flowStartTime + timeOutForJobFinish - System.currentTimeMillis();

dagActionReminderScheduler.get().scheduleReminder(dagAction, reminderDuration);
dagActionReminderScheduler.get().scheduleReminder(dagActionLeaseObject, reminderDuration);
}

/**
* Returns a {@link LeaseAttemptStatus} associated with the
* `dagAction` by calling
* {@link MultiActiveLeaseArbiter#tryAcquireLease(DagActionStore.DagAction, long, boolean, boolean)}.
* @param dagAction
* {@link MultiActiveLeaseArbiter#tryAcquireLease(DagActionStore.DagActionLeaseObject, boolean)}.
* @param dagActionLeaseObject
* @return
* @throws IOException
* @throws SchedulerException
*/
private LeaseAttemptStatus retrieveLeaseStatus(DagActionStore.DagAction dagAction)
private LeaseAttemptStatus retrieveLeaseStatus(DagActionStore.DagActionLeaseObject dagActionLeaseObject)
throws IOException, SchedulerException {
// TODO: need to handle reminder events and flag them
// Uses reminder flag to determine whether to use current time as event time or previously saved event time
LeaseAttemptStatus leaseAttemptStatus = this.dagActionProcessingLeaseArbiter
.tryAcquireLease(dagAction, System.currentTimeMillis(), dagAction.isReminder, false);
/* Schedule a reminder for the event unless the lease has been completed to safeguard against the case where even
we, when we might become the lease owner still fail to complete processing
.tryAcquireLease(dagActionLeaseObject, false);
/* Schedule a reminder for the event unless the lease has been completed to safeguard against the case where
even we, when we might become the lease owner still fail to complete processing
*/
if (!(leaseAttemptStatus instanceof LeaseAttemptStatus.NoLongerLeasingStatus)) {
scheduleReminderForEvent(leaseAttemptStatus);
Expand Down Expand Up @@ -228,11 +237,11 @@ private DagTask createDagTask(DagActionStore.DagAction dagAction, LeaseAttemptSt
}

/* Schedules a reminder for the flow action using {@link DagActionReminderScheduler} to reattempt the lease after the
current leaseholder's grant would have expired.
current leaseholder's grant would have expired. It saves the previous eventTimeMillis in the dagAction to use upon
reattempting the lease.
*/
protected void scheduleReminderForEvent(LeaseAttemptStatus leaseStatus)
throws SchedulerException {
dagActionReminderScheduler.get().scheduleReminder(leaseStatus.getConsensusDagAction(),
leaseStatus.getMinimumLingerDurationMillis());
dagActionReminderScheduler.get().scheduleReminder(leaseStatus.getConsensusDagActionLeaseObject(), leaseStatus.getMinimumLingerDurationMillis());
}
}
Loading

0 comments on commit c064604

Please sign in to comment.