Skip to content

Commit

Permalink
Handle reminder events properly
Browse files Browse the repository at this point in the history
  • Loading branch information
Urmi Mustafi committed Oct 2, 2023
1 parent ecc9408 commit 0891ab6
Showing 1 changed file with 16 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ protected interface CheckedFunction<T, R> {
private String thisTableAcquireLeaseIfFinishedStatement;

// TODO: define retention on this table
// Note: we need to set `event_timestamp` default value only to turn off timestamp auto-updates for row modifications
// Note: we need to set `event_timestamp` default value to turn off timestamp auto-updates for row modifications
private static final String CREATE_LEASE_ARBITER_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %s ("
+ "flow_group varchar(" + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, flow_name varchar("
+ ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, " + " flow_action varchar(100) NOT NULL, "
Expand Down Expand Up @@ -235,6 +235,20 @@ public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, l
int dbLinger = getResult.get().getDbLinger();
Timestamp dbCurrentTimestamp = getResult.get().getDbCurrentTimestamp();

// For reminder event, we can stop early if the reminder eventTimeMillis is older than the current event in the db
// because db laundering tells us that the currently worked on db event is newer and will have its own reminders
if (isReminderEvent) {
if (eventTimeMillis < dbEventTimestamp.getTime()) {
log.info("tryAcquireLease for [{}, eventTimestamp: {}] - A new event trigger is being worked on, so this "
+ "older reminder will be dropped.");
return new NoLongerLeasingStatus();
} if (eventTimeMillis > dbEventTimestamp.getTime()) {
log.warn("tryAcquireLease for [{}, eventTimestamp: {}] - Severe constraint violation encountered: a reminder "
+ "event newer than db event was found when db laundering should ensure monotonically increasing "
+ "laundered event times.");
}
}

log.info("Multi-active arbiter replacing local trigger event timestamp [{}, triggerEventTimestamp: {}] with "
+ "database eventTimestamp {} (in epoch-millis)", flowAction, eventTimeMillis, dbCurrentTimestamp.getTime());

Expand All @@ -256,7 +270,7 @@ public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, l
else if (leaseValidityStatus == 2) {
log.debug("tryAcquireLease for [{}, eventTimestamp: {}] - CASE 4: Lease is out of date (regardless of whether "
+ "same or distinct event)", flowAction, dbCurrentTimestamp.getTime());
if (isWithinEpsilon) {
if (isWithinEpsilon && !isReminderEvent) {
log.warn("Lease should not be out of date for the same trigger event since epsilon << linger for flowAction"
+ " {}, db eventTimestamp {}, db leaseAcquisitionTimestamp {}, linger {}", flowAction,
dbEventTimestamp, dbLeaseAcquisitionTimestamp, dbLinger);
Expand Down

0 comments on commit 0891ab6

Please sign in to comment.