Skip to content

Commit

Permalink
[GOBBLIN-2097] Use unique JobDataMaps and Properties to use for remin…
Browse files Browse the repository at this point in the history
…der events (#3984)

* Use unique JobDataMaps and Properties to use for reminder events
  • Loading branch information
umustafi authored Jun 25, 2024
1 parent d9f83e6 commit e3108dd
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,8 @@ private boolean persistLaunchDagAction(LeaseAttemptStatus.LeaseObtainedStatus le
}

/**
* This method is used by {@link FlowLaunchHandler#handleFlowLaunchTriggerEvent} to schedule a self-reminder to check on
* the other participant's progress to finish acting on a dag action after the time the lease should expire.
* This method is used by {@link FlowLaunchHandler#handleFlowLaunchTriggerEvent} to schedule a self-reminder to check
* on the other participant's progress to finish acting on a dag action after the time the lease should expire.
* @param jobProps
* @param status used to extract event to be reminded for (stored in `consensusDagAction`) and the minimum time after
* which reminder should occur
Expand Down Expand Up @@ -197,7 +197,8 @@ protected Trigger createAndScheduleReminder(JobKey origJobKey, LeaseAttemptStatu
// refer to the same set of jobProperties)
String reminderSuffix = createSuffixForJobTrigger(status);
JobKey reminderJobKey = new JobKey(origJobKey.getName() + reminderSuffix, origJobKey.getGroup());
JobDetailImpl jobDetail = createJobDetailForReminderEvent(origJobKey, reminderJobKey, status);
JobDetailImpl jobDetail = createJobDetailForReminderEvent(origJobKey, status);
jobDetail.setKey(reminderJobKey);
Trigger reminderTrigger = JobScheduler.createTriggerForJob(reminderJobKey, getJobPropertiesFromJobDetail(jobDetail),
Optional.of(reminderSuffix));
log.debug("Flow Launch Handler - [{}, eventTimestamp: {}] - attempting to schedule reminder for event {} with "
Expand All @@ -223,56 +224,59 @@ public static String createSuffixForJobTrigger(LeaseAttemptStatus.LeasedToAnothe
* the event to revisit. It will update the jobKey to the reminderKey provides and the Properties map to
* contain the cron scheduler for the reminder event and information about the event to revisit
* @param originalKey
* @param reminderKey
* @param status
* @return
* @throws SchedulerException
*/
protected JobDetailImpl createJobDetailForReminderEvent(JobKey originalKey, JobKey reminderKey,
LeaseAttemptStatus.LeasedToAnotherStatus status)
protected JobDetailImpl createJobDetailForReminderEvent(JobKey originalKey, LeaseAttemptStatus.LeasedToAnotherStatus status)
throws SchedulerException {
JobDetailImpl jobDetail = (JobDetailImpl) this.schedulerService.getScheduler().getJobDetail(originalKey);
jobDetail.setKey(reminderKey);
JobDataMap jobDataMap = jobDetail.getJobDataMap();
jobDataMap = updatePropsInJobDataMap(jobDataMap, status, schedulerMaxBackoffMillis);
jobDetail.setJobDataMap(jobDataMap);
return jobDetail;
// 1. shallow `.clone()` this top-level `JobDetailImpl`
JobDetailImpl clonedJobDetail = (JobDetailImpl) this.schedulerService.getScheduler().getJobDetail(originalKey).clone();
JobDataMap originalJobDataMap = clonedJobDetail.getJobDataMap();
// 2. create a fresh `JobDataMap` specific to the reminder
JobDataMap newJobDataMap = cloneAndUpdateJobProperties(originalJobDataMap, status, schedulerMaxBackoffMillis);
// 3. update `clonedJobDetail` to point to the new `JobDataMap`
clonedJobDetail.setJobDataMap(newJobDataMap);
return clonedJobDetail;
}

public static Properties getJobPropertiesFromJobDetail(JobDetail jobDetail) {
return (Properties) jobDetail.getJobDataMap().get(GobblinServiceJobScheduler.PROPERTIES_KEY);
}

/**
* Updates the cronExpression, reminderTimestamp, originalEventTime values in the properties map of a JobDataMap
* provided returns the updated JobDataMap to the user
* Adds the cronExpression, reminderTimestamp, originalEventTime values in the properties map of a new jobDataMap
* cloned from the one provided and returns the new JobDataMap to the user.
* `jobDataMap` and its `GobblinServiceJobScheduler.PROPERTIES_KEY` field are shallow, not deep-copied
* @param jobDataMap
* @param leasedToAnotherStatus
* @param schedulerMaxBackoffMillis
* @return
*/
@VisibleForTesting
public static JobDataMap updatePropsInJobDataMap(JobDataMap jobDataMap,
public static JobDataMap cloneAndUpdateJobProperties(JobDataMap jobDataMap,
LeaseAttemptStatus.LeasedToAnotherStatus leasedToAnotherStatus, int schedulerMaxBackoffMillis) {
Properties prevJobProps = (Properties) jobDataMap.get(GobblinServiceJobScheduler.PROPERTIES_KEY);
JobDataMap newJobDataMap = (JobDataMap) jobDataMap.clone();
Properties newJobProperties =
(Properties) ((Properties) jobDataMap.get(GobblinServiceJobScheduler.PROPERTIES_KEY)).clone();
// Add a small randomization to the minimum reminder wait time to avoid 'thundering herd' issue
long delayPeriodMillis = leasedToAnotherStatus.getMinimumLingerDurationMillis()
+ random.nextInt(schedulerMaxBackoffMillis);
String cronExpression = createCronFromDelayPeriod(delayPeriodMillis);
prevJobProps.setProperty(ConfigurationKeys.JOB_SCHEDULE_KEY, cronExpression);
newJobProperties.put(ConfigurationKeys.JOB_SCHEDULE_KEY, cronExpression);
// Saves the following properties in jobProps to retrieve when the trigger fires
prevJobProps.setProperty(ConfigurationKeys.SCHEDULER_EXPECTED_REMINDER_TIME_MILLIS_KEY,
newJobProperties.put(ConfigurationKeys.SCHEDULER_EXPECTED_REMINDER_TIME_MILLIS_KEY,
String.valueOf(getUTCTimeFromDelayPeriod(delayPeriodMillis)));
// Use the db consensus timestamp for the reminder to ensure inter-host agreement. Participant trigger timestamps
// can differ between participants and be interpreted as a reminder for a distinct flow trigger which will cause
// excess flows to be triggered by the reminder functionality.
prevJobProps.setProperty(ConfigurationKeys.SCHEDULER_PRESERVED_CONSENSUS_EVENT_TIME_MILLIS_KEY,
newJobProperties.put(ConfigurationKeys.SCHEDULER_PRESERVED_CONSENSUS_EVENT_TIME_MILLIS_KEY,
String.valueOf(leasedToAnotherStatus.getEventTimeMillis()));
// Use this boolean to indicate whether this is a reminder event
prevJobProps.setProperty(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY, String.valueOf(true));
// Update job data map and reset it in jobDetail
jobDataMap.put(GobblinServiceJobScheduler.PROPERTIES_KEY, prevJobProps);
return jobDataMap;
newJobProperties.put(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY, String.valueOf(true));
// Replace reference to old Properties map with new cloned Properties
newJobDataMap.put(GobblinServiceJobScheduler.PROPERTIES_KEY, newJobProperties);
return newJobDataMap;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,16 +249,14 @@ private void initializeConstantsTable() throws IOException {

@Override
public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagActionLeaseObject dagActionLeaseObject, boolean adoptConsensusFlowExecutionId) throws IOException {
log.info("Multi-active scheduler about to handle trigger event: [{}, is: {}, triggerEventTimestamp: {}]",
dagActionLeaseObject.getDagAction(), dagActionLeaseObject.isReminder() ? "reminder" : "original", dagActionLeaseObject.getEventTimeMillis());
log.info("Multi-active arbiter about to handle trigger event: {}", dagActionLeaseObject);
// Query lease arbiter table about this dag action
Optional<GetEventInfoResult> getResult = getExistingEventInfo(dagActionLeaseObject);

try {
if (!getResult.isPresent()) {
log.debug("tryAcquireLease for [{}, is; {}, eventTimestamp: {}] - CASE 1: no existing row for this dag action,"
+ " then go ahead and insert", dagActionLeaseObject.getDagAction(),
dagActionLeaseObject.isReminder() ? "reminder" : "original", dagActionLeaseObject.getEventTimeMillis());
log.debug("tryAcquireLease for {} - CASE 1: no existing row for this dag action, then go ahead and insert",
dagActionLeaseObject);
int numRowsUpdated = attemptLeaseIfNewRow(dagActionLeaseObject.getDagAction(),
ExponentialBackoff.builder().maxRetries(MAX_RETRIES)
.initialDelay(MIN_INITIAL_DELAY_MILLIS + (long) Math.random() * DELAY_FOR_RETRY_RANGE_MILLIS)
Expand All @@ -280,33 +278,29 @@ public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagActionLeaseObject da
// because db laundering tells us that the currently worked on db event is newer and will have its own reminders
if (dagActionLeaseObject.isReminder()) {
if (dagActionLeaseObject.getEventTimeMillis() < dbEventTimestamp.getTime()) {
log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - dbEventTimeMillis: {} - A new event trigger "
+ "is being worked on, so this older reminder will be dropped.", dagActionLeaseObject.getDagAction(),
dagActionLeaseObject.isReminder ? "reminder" : "original", dagActionLeaseObject.getEventTimeMillis(),
log.debug("tryAcquireLease for {} - dbEventTimeMillis: {} - A new event trigger "
+ "is being worked on, so this older reminder will be dropped.", dagActionLeaseObject,
dbEventTimestamp);
return new LeaseAttemptStatus.NoLongerLeasingStatus();
}
if (dagActionLeaseObject.getEventTimeMillis() > dbEventTimestamp.getTime()) {
// TODO: emit metric here to capture this unexpected behavior
log.warn("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - dbEventTimeMillis: {} - Severe constraint "
log.warn("tryAcquireLease for {} - dbEventTimeMillis: {} - Severe constraint "
+ "violation encountered: a reminder event newer than db event was found when db laundering should "
+ "ensure monotonically increasing laundered event times.", dagActionLeaseObject.getDagAction(),
dagActionLeaseObject.isReminder ? "reminder" : "original", dagActionLeaseObject.getEventTimeMillis(),
+ "ensure monotonically increasing laundered event times.", dagActionLeaseObject,
dbEventTimestamp.getTime());
}
if (dagActionLeaseObject.getEventTimeMillis() == dbEventTimestamp.getTime()) {
log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - dbEventTimeMillis: {} - Reminder event time "
+ "is the same as db event.", dagActionLeaseObject.getDagAction(),
dagActionLeaseObject.isReminder ? "reminder" : "original", dagActionLeaseObject.getEventTimeMillis(),
dbEventTimestamp);
log.debug("tryAcquireLease for {} - dbEventTimeMillis: {} - Reminder event time "
+ "is the same as db event.", dagActionLeaseObject, dbEventTimestamp);
}
}

log.info("Multi-active arbiter replacing local trigger event timestamp [{}, is: {}, triggerEventTimestamp: {}] "
+ "with database eventTimestamp {} (in epoch-millis)", dagActionLeaseObject.getDagAction(),
dagActionLeaseObject.isReminder ? "reminder" : "original", dagActionLeaseObject.getEventTimeMillis(),
dbCurrentTimestamp.getTime());

// TODO: check whether reminder event before replacing flowExecutionId
if (adoptConsensusFlowExecutionId) {
log.info("Multi-active arbiter replacing local trigger event timestamp {} with database eventTimestamp {} (in "
+ "epoch-millis)", dagActionLeaseObject, dbCurrentTimestamp.getTime());
}
/* Note that we use `adoptConsensusFlowExecutionId` parameter's value to determine whether we should use the db
laundered event timestamp as the flowExecutionId or maintain the original one
*/
Expand Down Expand Up @@ -444,7 +438,7 @@ protected int attemptLeaseIfNewRow(DagActionStore.DagAction dagAction, Exponenti
} catch (InterruptedException e2) {
throw new IOException(e2);
}
throw e;
throw e;
}
catch (SQLIntegrityConstraintViolationException e) {
if (!e.getMessage().contains("Duplicate entry")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Properties;

import org.junit.Assert;
import org.mockito.Mockito;
import org.quartz.JobDataMap;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -56,7 +57,9 @@ private static String truncateFirstTwoFieldsOfCronExpression(String cronExpressi

/**
* Provides an input with all three values (cronExpression, reminderTimestamp, originalEventTime) set in the map
* Properties and checks that they are updated properly
* Properties and checks that they are updated properly in new jobDataMap's Properties object. It checks that the
* JobDataMap returned along with the Properties object it contains do not reference the same
* original objects.
*/
@Test
public void testUpdatePropsInJobDataMap() {
Expand All @@ -66,18 +69,28 @@ public void testUpdatePropsInJobDataMap() {
originalProperties.setProperty(ConfigurationKeys.SCHEDULER_EXPECTED_REMINDER_TIME_MILLIS_KEY, "0");
originalProperties.setProperty(ConfigurationKeys.SCHEDULER_PRESERVED_CONSENSUS_EVENT_TIME_MILLIS_KEY, "1");
oldJobDataMap.put(GobblinServiceJobScheduler.PROPERTIES_KEY, originalProperties);
JobDataMap spyOldJobDataMap = Mockito.spy(oldJobDataMap);

JobDataMap newJobDataMap = FlowLaunchHandler.updatePropsInJobDataMap(oldJobDataMap, leasedToAnotherStatus,
schedulerBackOffMillis);
JobDataMap newJobDataMap =
FlowLaunchHandler.cloneAndUpdateJobProperties(spyOldJobDataMap, leasedToAnotherStatus, schedulerBackOffMillis);
Properties newProperties = (Properties) newJobDataMap.get(GobblinServiceJobScheduler.PROPERTIES_KEY);
Assert.assertTrue(newProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY).endsWith(cronExpressionSuffix));
Assert.assertNotEquals("0",
newProperties.getProperty(ConfigurationKeys.SCHEDULER_EXPECTED_REMINDER_TIME_MILLIS_KEY));
Assert.assertEquals(String.valueOf(leasedToAnotherStatus.getEventTimeMillis()),
newProperties.getProperty(ConfigurationKeys.SCHEDULER_PRESERVED_CONSENSUS_EVENT_TIME_MILLIS_KEY));
Assert.assertTrue(Boolean.parseBoolean(newProperties.getProperty(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY)));

Assert.assertNotSame(oldJobDataMap, newJobDataMap);
Assert.assertNotSame(originalProperties, newProperties);
Assert.assertFalse(originalProperties.containsKey(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY));
// Verify that only clone() and get() methods are called on the oldJobDataMap
Mockito.verify(spyOldJobDataMap).clone();
Mockito.verify(spyOldJobDataMap).get(Mockito.any());
Mockito.verifyNoMoreInteractions(spyOldJobDataMap);
}


/**
* Provides input with an empty Properties object and checks that the three values in question are set.
*/
Expand All @@ -86,15 +99,27 @@ public void testSetPropsInJobDataMap() {
JobDataMap oldJobDataMap = new JobDataMap();
Properties originalProperties = new Properties();
oldJobDataMap.put(GobblinServiceJobScheduler.PROPERTIES_KEY, originalProperties);
JobDataMap spyOldJobDataMap = Mockito.spy(oldJobDataMap);

JobDataMap newJobDataMap = FlowLaunchHandler.updatePropsInJobDataMap(oldJobDataMap, leasedToAnotherStatus,
JobDataMap newJobDataMap = FlowLaunchHandler.cloneAndUpdateJobProperties(spyOldJobDataMap, leasedToAnotherStatus,
schedulerBackOffMillis);
Properties newProperties = (Properties) newJobDataMap.get(GobblinServiceJobScheduler.PROPERTIES_KEY);

Assert.assertNotSame(oldJobDataMap, newJobDataMap);
Assert.assertNotSame(originalProperties, newProperties);
Assert.assertTrue(newProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY).endsWith(cronExpressionSuffix));
Assert.assertTrue(newProperties.containsKey(ConfigurationKeys.SCHEDULER_EXPECTED_REMINDER_TIME_MILLIS_KEY));
Assert.assertEquals(String.valueOf(leasedToAnotherStatus.getEventTimeMillis()),
newProperties.getProperty(ConfigurationKeys.SCHEDULER_PRESERVED_CONSENSUS_EVENT_TIME_MILLIS_KEY));
Assert.assertTrue(Boolean.parseBoolean(newProperties.getProperty(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY)));

Assert.assertNotSame(oldJobDataMap, newJobDataMap);
Assert.assertNotSame(originalProperties, newProperties);
Assert.assertFalse(originalProperties.containsKey(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY));
// Verify that only clone() and get() methods are called on the oldJobDataMap
Mockito.verify(spyOldJobDataMap).clone();
Mockito.verify(spyOldJobDataMap).get(Mockito.any());
Mockito.verifyNoMoreInteractions(spyOldJobDataMap);
}

/**
Expand Down

0 comments on commit e3108dd

Please sign in to comment.