Skip to content

Commit

Permalink
Fix compilation errors & add isReminder flag
Browse files Browse the repository at this point in the history
  • Loading branch information
Urmi Mustafi committed Oct 2, 2023
1 parent 0891ab6 commit d0aa450
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ public class ConfigurationKeys {
// Event time of flow action to orchestrate using the multi-active lease arbiter
public static final String ORCHESTRATOR_TRIGGER_EVENT_TIME_MILLIS_KEY = "orchestratorTriggerEventTimeMillis";
public static final String ORCHESTRATOR_TRIGGER_EVENT_TIME_NEVER_SET_VAL = "-1";
public static final String FLOW_IS_REMINDER_EVENT_KEY = "isReminderEvent";
public static final String SCHEDULER_EVENT_EPSILON_MILLIS_KEY = MYSQL_LEASE_ARBITER_PREFIX + ".epsilonMillis";
public static final int DEFAULT_SCHEDULER_EVENT_EPSILON_MILLIS = 5000;
// Note: linger should be on the order of seconds even though we measure in millis
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
* We also maintain another table in the database with two constants that allow us to coordinate between participants
* and ensure they are using the same values to base their coordination off of.
* [epsilon | linger]
* `epsilon` - time within we consider to event timestamps to be overlapping and can consolidate
* `epsilon` - time window within which we consider two event timestamps to be overlapping so they can be consolidated
* `linger` - minimum time to occur before another host may attempt a lease on a flow event. It should be much greater
* than epsilon and encapsulate executor communication latency including retry attempts
*
Expand Down Expand Up @@ -94,7 +94,15 @@ protected interface CheckedFunction<T, R> {
private String thisTableAcquireLeaseIfFinishedStatement;

// TODO: define retention on this table
// Note: we need to set `event_timestamp` default value to turn off timestamp auto-updates for row modifications
/*
Notes:
- Set `event_timestamp` default value to turn off timestamp auto-updates on row modifications, which would otherwise
alter this column in an unexpected way upon completing the lease
- Upon reading any timestamps from MySQL we convert the timezone from session (default) to UTC to consistently
use epoch-millis in UTC locally
- Upon passing any local timestamps to MySQL we convert the timezone from UTC to session
- We desire millisecond level precision and denote that with `(3)` for the TIMESTAMP types
*/
private static final String CREATE_LEASE_ARBITER_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %s ("
+ "flow_group varchar(" + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, flow_name varchar("
+ ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, " + " flow_action varchar(100) NOT NULL, "
Expand All @@ -115,8 +123,8 @@ protected interface CheckedFunction<T, R> {
+ "CONVERT_TZ(`lease_acquisition_timestamp`, @@session.time_zone, '+00:00') as utc_lease_acquisition_timestamp, "
+ "linger FROM %s, %s " + WHERE_CLAUSE_TO_MATCH_KEY;
// Does a cross join between the two tables to have epsilon and linger values available. Returns the following values:
// event_timestamp, lease_acquisition_timestamp, isWithinEpsilon (boolean if new event timestamp (provided by caller)
// db is within epsilon of event_timestamp in the table), leaseValidityStatus (1 if lease has not expired, 2 if
// event_timestamp, lease_acquisition_timestamp, isWithinEpsilon (true if the new event timestamp, i.e. the current
// timestamp in the db, is within epsilon of event_timestamp in the table), leaseValidityStatus (1 if lease has not
// expired, 2 if expired, 3 if column is NULL or no longer leasing)
protected static final String GET_EVENT_INFO_STATEMENT = "SELECT "
+ "CONVERT_TZ(`event_timestamp`, @@session.time_zone, '+00:00') as utc_event_timestamp, "
Expand All @@ -131,7 +139,6 @@ protected interface CheckedFunction<T, R> {
protected static final String GET_EVENT_INFO_STATEMENT_FOR_REMINDER = "SELECT "
+ "CONVERT_TZ(`event_timestamp`, @@session.time_zone, '+00:00') as utc_event_timestamp, "
+ "CONVERT_TZ(`lease_acquisition_timestamp`, @@session.time_zone, '+00:00') as utc_lease_acquisition_timestamp, "
// TODO maybe need to change this
+ "TIMESTAMPDIFF(microsecond, event_timestamp, CURRENT_TIMESTAMP(3)) / 1000 <= epsilon as is_within_epsilon, CASE "
+ "WHEN CURRENT_TIMESTAMP(3) < DATE_ADD(lease_acquisition_timestamp, INTERVAL linger*1000 MICROSECOND) then 1 "
+ "WHEN CURRENT_TIMESTAMP(3) >= DATE_ADD(lease_acquisition_timestamp, INTERVAL linger*1000 MICROSECOND) then 2 "
Expand Down Expand Up @@ -213,8 +220,8 @@ private void initializeConstantsTable() throws IOException {
@Override
public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, long eventTimeMillis,
boolean isReminderEvent) throws IOException {
log.info("Multi-active scheduler about to handle trigger event: [{}, triggerEventTimestamp: {}]", flowAction,
eventTimeMillis);
log.info("Multi-active scheduler about to handle trigger event: [{}, triggerEventTimestamp: {}]" +
(isReminderEvent ? " (reminderEvent)" : ""), flowAction, eventTimeMillis);
// Query lease arbiter table about this flow action
Optional<GetEventInfoResult> getResult = getExistingEventInfo(flowAction, isReminderEvent);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,8 @@ public static JobDataMap updatePropsInJobDataMap(JobDataMap jobDataMap,
// excess flows to be triggered by the reminder functionality.
prevJobProps.setProperty(ConfigurationKeys.SCHEDULER_PRESERVED_CONSENSUS_EVENT_TIME_MILLIS_KEY,
String.valueOf(leasedToAnotherStatus.getEventTimeMillis()));
// Use this boolean to indicate whether this is a reminder event
prevJobProps.setProperty(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY, String.valueOf(true));
// Update job data map and reset it in jobDetail
jobDataMap.put(GobblinServiceJobScheduler.PROPERTIES_KEY, prevJobProps);
return jobDataMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,8 @@ public void onUpdateSpec(Spec updatedSpec) {

}

public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMillis) throws Exception {
public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMillis, boolean isReminderEvent)
throws Exception {
// Add below waiting because TopologyCatalog and FlowCatalog service can be launched at the same time
this.topologyCatalog.get().getInitComplete().await();

Expand Down Expand Up @@ -264,9 +265,9 @@ public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMil
String flowExecutionId = flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD);
DagActionStore.DagAction flowAction =
new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, DagActionStore.FlowActionType.LAUNCH);
flowTriggerHandler.get().handleTriggerEvent(jobProps, flowAction, triggerTimestampMillis);
_log.info("Multi-active scheduler finished handling trigger event: [{}, triggerEventTimestamp: {}]", flowAction,
triggerTimestampMillis);
flowTriggerHandler.get().handleTriggerEvent(jobProps, flowAction, triggerTimestampMillis, isReminderEvent);
_log.info("Multi-active scheduler finished handling trigger event: [{}, triggerEventTimestamp: {}]" +
(isReminderEvent ? " (reminderEvent)" : ""), flowAction, triggerTimestampMillis);
} else {
Dag<JobExecutionPlan> jobExecutionPlanDag = jobExecutionPlanDagOptional.get();
if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,9 @@ public void runJob(Properties jobProps, JobListener jobListener) throws JobExcep
String triggerTimestampMillis = jobProps.getProperty(
ConfigurationKeys.ORCHESTRATOR_TRIGGER_EVENT_TIME_MILLIS_KEY,
ConfigurationKeys.ORCHESTRATOR_TRIGGER_EVENT_TIME_NEVER_SET_VAL);
this.orchestrator.orchestrate(flowSpec, jobProps, Long.parseLong(triggerTimestampMillis));
boolean isReminderEvent =
Boolean.parseBoolean(jobProps.getProperty(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY, "false"));
this.orchestrator.orchestrate(flowSpec, jobProps, Long.parseLong(triggerTimestampMillis), isReminderEvent);
} catch (Exception e) {
String exceptionPrefix = "Failed to run Spec: " + jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY);
log.warn(exceptionPrefix + " because", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,13 +341,13 @@ public void doNotRegisterMetricsAdhocFlows() throws Exception {
flowProps.put("gobblin.flow.destinationIdentifier", "destination");
flowProps.put("flow.allowConcurrentExecution", false);
FlowSpec adhocSpec = new FlowSpec(URI.create("flow0/group0"), "1", "", ConfigUtils.propertiesToConfig(flowProps) , flowProps, Optional.absent(), Optional.absent());
this.orchestrator.orchestrate(adhocSpec, flowProps, 0);
this.orchestrator.orchestrate(adhocSpec, flowProps, 0, false);
String metricName = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "group0", "flow0", ServiceMetricNames.COMPILED);
Assert.assertNull(metricContext.getParent().get().getGauges().get(metricName));

flowProps.setProperty("job.schedule", "0/2 * * * * ?");
FlowSpec scheduledSpec = new FlowSpec(URI.create("flow0/group0"), "1", "", ConfigUtils.propertiesToConfig(flowProps) , flowProps, Optional.absent(), Optional.absent());
this.orchestrator.orchestrate(scheduledSpec, flowProps, 0);
this.orchestrator.orchestrate(scheduledSpec, flowProps, 0, false);
Assert.assertNotNull(metricContext.getParent().get().getGauges().get(metricName));
}
}

0 comments on commit d0aa450

Please sign in to comment.