From d0aa4508c89c345b6e141fa534451c5c69a8d07b Mon Sep 17 00:00:00 2001 From: Urmi Mustafi Date: Mon, 2 Oct 2023 10:32:47 -0700 Subject: [PATCH] Fix compilation errors & add isReminder flag --- .../configuration/ConfigurationKeys.java | 1 + .../api/MysqlMultiActiveLeaseArbiter.java | 21 ++++++++++++------- .../orchestration/FlowTriggerHandler.java | 2 ++ .../modules/orchestration/Orchestrator.java | 9 ++++---- .../scheduler/GobblinServiceJobScheduler.java | 4 +++- .../orchestration/OrchestratorTest.java | 4 ++-- 6 files changed, 27 insertions(+), 14 deletions(-) diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java index 198c8de352b..81608655a0e 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java @@ -109,6 +109,7 @@ public class ConfigurationKeys { // Event time of flow action to orchestrate using the multi-active lease arbiter public static final String ORCHESTRATOR_TRIGGER_EVENT_TIME_MILLIS_KEY = "orchestratorTriggerEventTimeMillis"; public static final String ORCHESTRATOR_TRIGGER_EVENT_TIME_NEVER_SET_VAL = "-1"; + public static final String FLOW_IS_REMINDER_EVENT_KEY = "isReminderEvent"; public static final String SCHEDULER_EVENT_EPSILON_MILLIS_KEY = MYSQL_LEASE_ARBITER_PREFIX + ".epsilonMillis"; public static final int DEFAULT_SCHEDULER_EVENT_EPSILON_MILLIS = 5000; // Note: linger should be on the order of seconds even though we measure in millis diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java index 82922dcdde8..2d3e7ae3212 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java @@ -51,7 +51,7 @@ * We also maintain another table in the database with two constants that allow us to coordinate between participants * and ensure they are using the same values to base their coordination off of. * [epsilon | linger] - * `epsilon` - time within we consider to event timestamps to be overlapping and can consolidate + * `epsilon` - time within we consider two event timestamps to be overlapping and can consolidate * `linger` - minimum time to occur before another host may attempt a lease on a flow event. It should be much greater * than epsilon and encapsulate executor communication latency including retry attempts * @@ -94,7 +94,15 @@ protected interface CheckedFunction { private String thisTableAcquireLeaseIfFinishedStatement; // TODO: define retention on this table - // Note: we need to set `event_timestamp` default value to turn off timestamp auto-updates for row modifications + /* + Notes: + - Set `event_timestamp` default value to turn off timestamp auto-updates for row modifications which alters this col + in an unexpected way upon completing the lease + - Upon reading any timestamps from MySQL we convert the timezone from session (default) to UTC to consistently + use epoch-millis in UTC locally + - Upon using any timestamps from local we convert the timezone from UTC to session + - We desire millisecond level precision and denote that with `(3)` for the TIMESTAMP types + */ private static final String CREATE_LEASE_ARBITER_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %s (" + "flow_group varchar(" + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, flow_name varchar(" + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, " + " flow_action varchar(100) NOT NULL, " @@ -115,8 +123,8 @@ protected interface CheckedFunction { + "CONVERT_TZ(`lease_acquisition_timestamp`, @@session.time_zone, '+00:00') as utc_lease_acquisition_timestamp, " + "linger FROM %s, %s " + WHERE_CLAUSE_TO_MATCH_KEY; // Does a cross join between the two tables to have epsilon and linger values available. Returns the following values: - // event_timestamp, lease_acquisition_timestamp, isWithinEpsilon (boolean if new event timestamp (provided by caller) - // db is within epsilon of event_timestamp in the table), leaseValidityStatus (1 if lease has not expired, 2 if + // event_timestamp, lease_acquisition_timestamp, isWithinEpsilon (boolean if new event timestamp (current timestamp in + // db) is within epsilon of event_timestamp in the table), leaseValidityStatus (1 if lease has not expired, 2 if // expired, 3 if column is NULL or no longer leasing) protected static final String GET_EVENT_INFO_STATEMENT = "SELECT " + "CONVERT_TZ(`event_timestamp`, @@session.time_zone, '+00:00') as utc_event_timestamp, " @@ -131,7 +139,6 @@ protected interface CheckedFunction { protected static final String GET_EVENT_INFO_STATEMENT_FOR_REMINDER = "SELECT " + "CONVERT_TZ(`event_timestamp`, @@session.time_zone, '+00:00') as utc_event_timestamp, " + "CONVERT_TZ(`lease_acquisition_timestamp`, @@session.time_zone, '+00:00') as utc_lease_acquisition_timestamp, " - // TODO maybe need to change this + "TIMESTAMPDIFF(microsecond, event_timestamp, CURRENT_TIMESTAMP(3)) / 1000 <= epsilon as is_within_epsilon, CASE " + "WHEN CURRENT_TIMESTAMP(3) < DATE_ADD(lease_acquisition_timestamp, INTERVAL linger*1000 MICROSECOND) then 1 " + "WHEN CURRENT_TIMESTAMP(3) >= DATE_ADD(lease_acquisition_timestamp, INTERVAL linger*1000 MICROSECOND) then 2 " @@ -213,8 +220,8 @@ private void initializeConstantsTable() throws IOException { @Override public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, long eventTimeMillis, boolean isReminderEvent) throws IOException { - log.info("Multi-active scheduler about to handle trigger event: [{}, triggerEventTimestamp: {}]", flowAction, - eventTimeMillis); + log.info("Multi-active scheduler about to handle trigger event: [{}, triggerEventTimestamp: {}]" + + (isReminderEvent ? " (reminderEvent)" : ""), flowAction, eventTimeMillis); // Query lease arbiter table about this flow action Optional getResult = getExistingEventInfo(flowAction, isReminderEvent); diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java index 3a1dff13c0f..6d9dcc9d6a9 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java @@ -280,6 +280,8 @@ public static JobDataMap updatePropsInJobDataMap(JobDataMap jobDataMap, // excess flows to be triggered by the reminder functionality. prevJobProps.setProperty(ConfigurationKeys.SCHEDULER_PRESERVED_CONSENSUS_EVENT_TIME_MILLIS_KEY, String.valueOf(leasedToAnotherStatus.getEventTimeMillis())); + // Use this boolean to indicate whether this is a reminder event + prevJobProps.setProperty(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY, String.valueOf(true)); // Update job data map and reset it in jobDetail jobDataMap.put(GobblinServiceJobScheduler.PROPERTIES_KEY, prevJobProps); return jobDataMap; diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java index c0975aa972b..1f54bbbf11e 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java @@ -220,7 +220,8 @@ public void onUpdateSpec(Spec updatedSpec) { } - public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMillis) throws Exception { + public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMillis, boolean isReminderEvent) + throws Exception { // Add below waiting because TopologyCatalog and FlowCatalog service can be launched at the same time this.topologyCatalog.get().getInitComplete().await(); @@ -264,9 +265,9 @@ public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMil String flowExecutionId = flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD); DagActionStore.DagAction flowAction = new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, DagActionStore.FlowActionType.LAUNCH); - flowTriggerHandler.get().handleTriggerEvent(jobProps, flowAction, triggerTimestampMillis); - _log.info("Multi-active scheduler finished handling trigger event: [{}, triggerEventTimestamp: {}]", flowAction, - triggerTimestampMillis); + flowTriggerHandler.get().handleTriggerEvent(jobProps, flowAction, triggerTimestampMillis, isReminderEvent); + _log.info("Multi-active scheduler finished handling trigger event: [{}, triggerEventTimestamp: {}]" + + (isReminderEvent ? " (reminderEvent)" : ""), flowAction, triggerTimestampMillis); } else { Dag jobExecutionPlanDag = jobExecutionPlanDagOptional.get(); if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) { diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java index daa96af2f4c..45053a93d2c 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java @@ -503,7 +503,9 @@ public void runJob(Properties jobProps, JobListener jobListener) throws JobExcep String triggerTimestampMillis = jobProps.getProperty( ConfigurationKeys.ORCHESTRATOR_TRIGGER_EVENT_TIME_MILLIS_KEY, ConfigurationKeys.ORCHESTRATOR_TRIGGER_EVENT_TIME_NEVER_SET_VAL); - this.orchestrator.orchestrate(flowSpec, jobProps, Long.parseLong(triggerTimestampMillis)); + boolean isReminderEvent = + Boolean.parseBoolean(jobProps.getProperty(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY, "false")); + this.orchestrator.orchestrate(flowSpec, jobProps, Long.parseLong(triggerTimestampMillis), isReminderEvent); } catch (Exception e) { String exceptionPrefix = "Failed to run Spec: " + jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY); log.warn(exceptionPrefix + " because", e); diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java index 2d3f4b3f917..444f2dc8cd6 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java @@ -341,13 +341,13 @@ public void doNotRegisterMetricsAdhocFlows() throws Exception { flowProps.put("gobblin.flow.destinationIdentifier", "destination"); flowProps.put("flow.allowConcurrentExecution", false); FlowSpec adhocSpec = new FlowSpec(URI.create("flow0/group0"), "1", "", ConfigUtils.propertiesToConfig(flowProps) , flowProps, Optional.absent(), Optional.absent()); - this.orchestrator.orchestrate(adhocSpec, flowProps, 0); + this.orchestrator.orchestrate(adhocSpec, flowProps, 0, false); String metricName = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "group0", "flow0", ServiceMetricNames.COMPILED); Assert.assertNull(metricContext.getParent().get().getGauges().get(metricName)); flowProps.setProperty("job.schedule", "0/2 * * * * ?"); FlowSpec scheduledSpec = new FlowSpec(URI.create("flow0/group0"), "1", "", ConfigUtils.propertiesToConfig(flowProps) , flowProps, Optional.absent(), Optional.absent()); - this.orchestrator.orchestrate(scheduledSpec, flowProps, 0); + this.orchestrator.orchestrate(scheduledSpec, flowProps, 0, false); Assert.assertNotNull(metricContext.getParent().get().getGauges().get(metricName)); } } \ No newline at end of file