diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java index b152d8e15d8..82922dcdde8 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java @@ -94,7 +94,7 @@ protected interface CheckedFunction { private String thisTableAcquireLeaseIfFinishedStatement; // TODO: define retention on this table - // Note: we need to set `event_timestamp` default value only to turn off timestamp auto-updates for row modifications + // Note: we need to set `event_timestamp` default value to turn off timestamp auto-updates for row modifications private static final String CREATE_LEASE_ARBITER_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %s (" + "flow_group varchar(" + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, flow_name varchar(" + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, " + " flow_action varchar(100) NOT NULL, " @@ -235,6 +235,20 @@ public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, l int dbLinger = getResult.get().getDbLinger(); Timestamp dbCurrentTimestamp = getResult.get().getDbCurrentTimestamp(); + // For reminder event, we can stop early if the reminder eventTimeMillis is older than the current event in the db + // because db laundering tells us that the currently worked on db event is newer and will have its own reminders + if (isReminderEvent) { + if (eventTimeMillis < dbEventTimestamp.getTime()) { + log.info("tryAcquireLease for [{}, eventTimestamp: {}] - A new event trigger is being worked on, so this " + + "older reminder will be dropped."); + return new NoLongerLeasingStatus(); + } if (eventTimeMillis > dbEventTimestamp.getTime()) { + log.warn("tryAcquireLease for [{}, eventTimestamp: {}] - Severe constraint violation encountered: a reminder " + + "event newer than db event was found when db laundering should ensure monotonically increasing " + + "laundered event times."); + } + } + log.info("Multi-active arbiter replacing local trigger event timestamp [{}, triggerEventTimestamp: {}] with " + "database eventTimestamp {} (in epoch-millis)", flowAction, eventTimeMillis, dbCurrentTimestamp.getTime()); @@ -256,7 +270,7 @@ public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, l else if (leaseValidityStatus == 2) { log.debug("tryAcquireLease for [{}, eventTimestamp: {}] - CASE 4: Lease is out of date (regardless of whether " + "same or distinct event)", flowAction, dbCurrentTimestamp.getTime()); - if (isWithinEpsilon) { + if (isWithinEpsilon && !isReminderEvent) { log.warn("Lease should not be out of date for the same trigger event since epsilon << linger for flowAction" + " {}, db eventTimestamp {}, db leaseAcquisitionTimestamp {}, linger {}", flowAction, dbEventTimestamp, dbLeaseAcquisitionTimestamp, dbLinger);