Skip to content

Commit

Permalink
Add millisecond level precision to timestamp cols & proper timezone c…
Browse files Browse the repository at this point in the history
…onversion

	- existing tests pass with minor modifications
  • Loading branch information
Urmi Mustafi committed Oct 1, 2023
1 parent 028b85f commit ecc9408
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

/**
* This interface defines a generic approach to a non-blocking, multiple active thread or host system, in which one or
* more active participants compete to take responsiblity for a particular flow's event. The type of flow event in
* more active participants compete to take responsibility for a particular flow's event. The type of flow event in
* question does not impact the algorithm other than to uniquely identify the flow event. Each participant uses the
* interface to initiate an attempt at ownership over the flow event and receives a response indicating the status of
* the attempt.
Expand All @@ -38,7 +38,8 @@
* b) LeasedToAnotherStatus -> another will attempt to carry out the required action before the lease expires
* c) NoLongerLeasingStatus -> flow event no longer needs to be acted upon (terminal state)
* 3. If another participant has acquired the lease before this one could, then the present participant must check back
* in at the time of lease expiry to see if it needs to attempt the lease again [status (b) above].
* in at the time of lease expiry to see if it needs to attempt the lease again [status (b) above]. We refer to this
* check-in as a 'reminder event'.
* 4. Once the participant which acquired the lease completes its work on the flow event, it calls recordLeaseSuccess
* to indicate to all other participants that the flow event no longer needs to be acted upon [status (c) above]
*/
Expand All @@ -51,10 +52,12 @@ public interface MultiActiveLeaseArbiter {
* determine the next action.
* @param flowAction uniquely identifies the flow and the present action upon it
* @param eventTimeMillis is the time this flow action was triggered
* @param isReminderEvent true if the flow action event we're checking on is a reminder event
* @return LeaseAttemptStatus
* @throws IOException
*/
LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, long eventTimeMillis) throws IOException;
LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, long eventTimeMillis, boolean isReminderEvent)
throws IOException;

/**
* This method is used to indicate the owner of the lease has successfully completed required actions while holding
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
import java.sql.SQLException;
import java.sql.SQLIntegrityConstraintViolationException;
import java.sql.Timestamp;
import java.util.Calendar;
import java.util.Optional;
import java.util.TimeZone;
import javax.sql.DataSource;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -46,10 +48,10 @@
* schema is as follows:
* [flow_group | flow_name | flow_action | event_timestamp | lease_acquisition_timestamp]
* (----------------------primary key------------------------)
* We also maintain another table in the database with two constants that allow us to coordinate between participants and
* ensure they are using the same values to base their coordination off of.
* We also maintain another table in the database with two constants that allow us to coordinate between participants
* and ensure they are using the same values to base their coordination off of.
* [epsilon | linger]
* `epsilon` - time within we consider to timestamps to be the same, to account for between-host clock drift
* `epsilon` - time within we consider to event timestamps to be overlapping and can consolidate
* `linger` - minimum time to occur before another host may attempt a lease on a flow event. It should be much greater
* than epsilon and encapsulate executor communication latency including retry attempts
*
Expand Down Expand Up @@ -86,16 +88,18 @@ protected interface CheckedFunction<T, R> {
private final int epsilon;
private final int linger;
private String thisTableGetInfoStatement;
private String thisTableGetInfoStatementForReminder;
private String thisTableSelectAfterInsertStatement;
private String thisTableAcquireLeaseIfMatchingAllStatement;
private String thisTableAcquireLeaseIfFinishedStatement;

// TODO: define retention on this table
// Note: we need to set `event_timestamp` default value only to turn off timestamp auto-updates for row modifications
private static final String CREATE_LEASE_ARBITER_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %s ("
+ "flow_group varchar(" + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, flow_name varchar("
+ ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, " + " flow_action varchar(100) NOT NULL, "
+ "event_timestamp TIMESTAMP, "
+ "lease_acquisition_timestamp TIMESTAMP NULL DEFAULT CURRENT_TIMESTAMP, "
+ "event_timestamp TIMESTAMP(3) DEFAULT CURRENT_TIMESTAMP(3), "
+ "lease_acquisition_timestamp TIMESTAMP(3) NULL DEFAULT CURRENT_TIMESTAMP(3), "
+ "PRIMARY KEY (flow_group,flow_name,flow_action))";
private static final String CREATE_CONSTANTS_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %s "
+ "(primary_key INT, epsilon INT, linger INT, PRIMARY KEY (primary_key))";
Expand All @@ -104,32 +108,51 @@ protected interface CheckedFunction<T, R> {
+ "VALUES(1, ?, ?) ON DUPLICATE KEY UPDATE epsilon=VALUES(epsilon), linger=VALUES(linger)";
protected static final String WHERE_CLAUSE_TO_MATCH_KEY = "WHERE flow_group=? AND flow_name=? AND flow_action=?";
protected static final String WHERE_CLAUSE_TO_MATCH_ROW = WHERE_CLAUSE_TO_MATCH_KEY
+ " AND event_timestamp=? AND lease_acquisition_timestamp=?";
protected static final String SELECT_AFTER_INSERT_STATEMENT = "SELECT event_timestamp, lease_acquisition_timestamp, "
+ "linger FROM %s, %s " + WHERE_CLAUSE_TO_MATCH_KEY;
+ " AND event_timestamp=CONVERT_TZ(?, '+00:00', @@session.time_zone)"
+ " AND lease_acquisition_timestamp=CONVERT_TZ(?, '+00:00', @@session.time_zone)";
protected static final String SELECT_AFTER_INSERT_STATEMENT = "SELECT "
+ "CONVERT_TZ(`event_timestamp`, @@session.time_zone, '+00:00') as utc_event_timestamp, "
+ "CONVERT_TZ(`lease_acquisition_timestamp`, @@session.time_zone, '+00:00') as utc_lease_acquisition_timestamp, "
+ "linger FROM %s, %s " + WHERE_CLAUSE_TO_MATCH_KEY;
// Does a cross join between the two tables to have epsilon and linger values available. Returns the following values:
// event_timestamp, lease_acquisition_timestamp, isWithinEpsilon (boolean if current time in db is within epsilon of
// event_timestamp), leaseValidityStatus (1 if lease has not expired, 2 if expired, 3 if column is NULL or no longer
// leasing)
protected static final String GET_EVENT_INFO_STATEMENT = "SELECT event_timestamp, lease_acquisition_timestamp, "
+ "TIMESTAMPDIFF(microsecond, event_timestamp, CURRENT_TIMESTAMP) / 1000 <= epsilon as is_within_epsilon, CASE "
+ "WHEN CURRENT_TIMESTAMP < DATE_ADD(lease_acquisition_timestamp, INTERVAL linger*1000 MICROSECOND) then 1 "
+ "WHEN CURRENT_TIMESTAMP >= DATE_ADD(lease_acquisition_timestamp, INTERVAL linger*1000 MICROSECOND) then 2 "
+ "ELSE 3 END as lease_validity_status, linger, CURRENT_TIMESTAMP FROM %s, %s " + WHERE_CLAUSE_TO_MATCH_KEY;
// event_timestamp, lease_acquisition_timestamp, isWithinEpsilon (boolean if new event timestamp (provided by caller)
// db is within epsilon of event_timestamp in the table), leaseValidityStatus (1 if lease has not expired, 2 if
// expired, 3 if column is NULL or no longer leasing)
protected static final String GET_EVENT_INFO_STATEMENT = "SELECT "
+ "CONVERT_TZ(`event_timestamp`, @@session.time_zone, '+00:00') as utc_event_timestamp, "
+ "CONVERT_TZ(`lease_acquisition_timestamp`, @@session.time_zone, '+00:00') as utc_lease_acquisition_timestamp, "
+ "ABS(TIMESTAMPDIFF(microsecond, event_timestamp, CURRENT_TIMESTAMP(3))) / 1000 <= epsilon as is_within_epsilon, CASE "
+ "WHEN CURRENT_TIMESTAMP(3) < DATE_ADD(lease_acquisition_timestamp, INTERVAL linger*1000 MICROSECOND) then 1 "
+ "WHEN CURRENT_TIMESTAMP(3) >= DATE_ADD(lease_acquisition_timestamp, INTERVAL linger*1000 MICROSECOND) then 2 "
+ "ELSE 3 END as lease_validity_status, linger, "
+ "UTC_TIMESTAMP(3) as utc_current_timestamp FROM %s, %s " + WHERE_CLAUSE_TO_MATCH_KEY;
// Same as query above, except that isWithinEpsilon is True if the reminder event timestamp (provided by caller) is
// OLDER than or equal to the db event_timestamp and within epsilon away from it.
protected static final String GET_EVENT_INFO_STATEMENT_FOR_REMINDER = "SELECT "
+ "CONVERT_TZ(`event_timestamp`, @@session.time_zone, '+00:00') as utc_event_timestamp, "
+ "CONVERT_TZ(`lease_acquisition_timestamp`, @@session.time_zone, '+00:00') as utc_lease_acquisition_timestamp, "
// TODO maybe need to change this
+ "TIMESTAMPDIFF(microsecond, event_timestamp, CURRENT_TIMESTAMP(3)) / 1000 <= epsilon as is_within_epsilon, CASE "
+ "WHEN CURRENT_TIMESTAMP(3) < DATE_ADD(lease_acquisition_timestamp, INTERVAL linger*1000 MICROSECOND) then 1 "
+ "WHEN CURRENT_TIMESTAMP(3) >= DATE_ADD(lease_acquisition_timestamp, INTERVAL linger*1000 MICROSECOND) then 2 "
+ "ELSE 3 END as lease_validity_status, linger, "
+ "UTC_TIMESTAMP(3) as utc_current_timestamp FROM %s, %s " + WHERE_CLAUSE_TO_MATCH_KEY;
// Insert or update row to acquire lease if values have not changed since the previous read
// Need to define three separate statements to handle cases where row does not exist or has null values to check
protected static final String ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT = "INSERT INTO %s (flow_group, flow_name, "
+ "flow_action, event_timestamp, lease_acquisition_timestamp) VALUES(?, ?, ?, CURRENT_TIMESTAMP, "
+ "CURRENT_TIMESTAMP)";
+ "flow_action, event_timestamp, lease_acquisition_timestamp) VALUES(?, ?, ?, CURRENT_TIMESTAMP(3), "
+ "CURRENT_TIMESTAMP(3))";
protected static final String CONDITIONALLY_ACQUIRE_LEASE_IF_FINISHED_LEASING_STATEMENT = "UPDATE %s "
+ "SET event_timestamp=CURRENT_TIMESTAMP, lease_acquisition_timestamp=CURRENT_TIMESTAMP "
+ WHERE_CLAUSE_TO_MATCH_KEY + " AND event_timestamp=? AND lease_acquisition_timestamp is NULL";
+ "SET event_timestamp=CURRENT_TIMESTAMP(3), lease_acquisition_timestamp=CURRENT_TIMESTAMP(3) "
+ WHERE_CLAUSE_TO_MATCH_KEY + " AND event_timestamp=CONVERT_TZ(?, '+00:00', @@session.time_zone) AND "
+ "lease_acquisition_timestamp is NULL";
protected static final String CONDITIONALLY_ACQUIRE_LEASE_IF_MATCHING_ALL_COLS_STATEMENT = "UPDATE %s "
+ "SET event_timestamp=CURRENT_TIMESTAMP, lease_acquisition_timestamp=CURRENT_TIMESTAMP "
+ "SET event_timestamp=CURRENT_TIMESTAMP(3), lease_acquisition_timestamp=CURRENT_TIMESTAMP(3) "
+ WHERE_CLAUSE_TO_MATCH_ROW;
// Complete lease acquisition if values have not changed since lease was acquired
protected static final String CONDITIONALLY_COMPLETE_LEASE_STATEMENT = "UPDATE %s SET "
+ "lease_acquisition_timestamp = NULL " + WHERE_CLAUSE_TO_MATCH_ROW;
protected static final Calendar UTC_CAL = Calendar.getInstance(TimeZone.getTimeZone("UTC"));

@Inject
public MysqlMultiActiveLeaseArbiter(Config config) throws IOException {
Expand All @@ -150,6 +173,8 @@ public MysqlMultiActiveLeaseArbiter(Config config) throws IOException {
ConfigurationKeys.DEFAULT_SCHEDULER_EVENT_LINGER_MILLIS);
this.thisTableGetInfoStatement = String.format(GET_EVENT_INFO_STATEMENT, this.leaseArbiterTableName,
this.constantsTableName);
this.thisTableGetInfoStatementForReminder = String.format(GET_EVENT_INFO_STATEMENT_FOR_REMINDER,
this.leaseArbiterTableName, this.constantsTableName);
this.thisTableSelectAfterInsertStatement = String.format(SELECT_AFTER_INSERT_STATEMENT, this.leaseArbiterTableName,
this.constantsTableName);
this.thisTableAcquireLeaseIfMatchingAllStatement =
Expand Down Expand Up @@ -186,12 +211,12 @@ private void initializeConstantsTable() throws IOException {
}

@Override
public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, long eventTimeMillis)
throws IOException {
public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, long eventTimeMillis,
boolean isReminderEvent) throws IOException {
log.info("Multi-active scheduler about to handle trigger event: [{}, triggerEventTimestamp: {}]", flowAction,
eventTimeMillis);
// Query lease arbiter table about this flow action
Optional<GetEventInfoResult> getResult = getExistingEventInfo(flowAction);
Optional<GetEventInfoResult> getResult = getExistingEventInfo(flowAction, isReminderEvent);

try {
if (!getResult.isPresent()) {
Expand All @@ -211,7 +236,7 @@ public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, l
Timestamp dbCurrentTimestamp = getResult.get().getDbCurrentTimestamp();

log.info("Multi-active arbiter replacing local trigger event timestamp [{}, triggerEventTimestamp: {}] with "
+ "database eventTimestamp {}", flowAction, eventTimeMillis, dbCurrentTimestamp.getTime());
+ "database eventTimestamp {} (in epoch-millis)", flowAction, eventTimeMillis, dbCurrentTimestamp.getTime());

// Lease is valid
if (leaseValidityStatus == 1) {
Expand All @@ -227,7 +252,7 @@ public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, l
// Utilize db lease acquisition timestamp for wait time
return new LeasedToAnotherStatus(flowAction, dbCurrentTimestamp.getTime(),
dbLeaseAcquisitionTimestamp.getTime() + dbLinger - dbCurrentTimestamp.getTime());
}
} // Lease is invalid
else if (leaseValidityStatus == 2) {
log.debug("tryAcquireLease for [{}, eventTimestamp: {}] - CASE 4: Lease is out of date (regardless of whether "
+ "same or distinct event)", flowAction, dbCurrentTimestamp.getTime());
Expand Down Expand Up @@ -260,8 +285,9 @@ else if (leaseValidityStatus == 2) {
/**
* Checks leaseArbiterTable for an existing entry for this flow action and event time
*/
protected Optional<GetEventInfoResult> getExistingEventInfo(DagActionStore.DagAction flowAction) throws IOException {
return withPreparedStatement(thisTableGetInfoStatement,
protected Optional<GetEventInfoResult> getExistingEventInfo(DagActionStore.DagAction flowAction,
boolean isReminderEvent) throws IOException {
return withPreparedStatement(isReminderEvent ? thisTableGetInfoStatementForReminder : thisTableGetInfoStatement,
getInfoStatement -> {
int i = 0;
getInfoStatement.setString(++i, flowAction.getFlowGroup());
Expand All @@ -284,12 +310,12 @@ protected Optional<GetEventInfoResult> getExistingEventInfo(DagActionStore.DagAc
protected GetEventInfoResult createGetInfoResult(ResultSet resultSet) throws IOException {
try {
// Extract values from result set
Timestamp dbEventTimestamp = resultSet.getTimestamp("event_timestamp");
Timestamp dbLeaseAcquisitionTimestamp = resultSet.getTimestamp("lease_acquisition_timestamp");
Timestamp dbEventTimestamp = resultSet.getTimestamp("utc_event_timestamp", UTC_CAL);
Timestamp dbLeaseAcquisitionTimestamp = resultSet.getTimestamp("utc_lease_acquisition_timestamp", UTC_CAL);
boolean withinEpsilon = resultSet.getBoolean("is_within_epsilon");
int leaseValidityStatus = resultSet.getInt("lease_validity_status");
int dbLinger = resultSet.getInt("linger");
Timestamp dbCurrentTimestamp = resultSet.getTimestamp("CURRENT_TIMESTAMP");
Timestamp dbCurrentTimestamp = resultSet.getTimestamp("utc_current_timestamp", UTC_CAL);
return new GetEventInfoResult(dbEventTimestamp, dbLeaseAcquisitionTimestamp, withinEpsilon, leaseValidityStatus,
dbLinger, dbCurrentTimestamp);
} catch (SQLException e) {
Expand Down Expand Up @@ -337,8 +363,8 @@ protected int attemptLeaseIfExistingRow(String acquireLeaseStatement, DagActionS
Timestamp dbLeaseAcquisitionTimestamp) throws IOException {
return withPreparedStatement(acquireLeaseStatement,
insertStatement -> {
completeUpdatePreparedStatement(insertStatement, flowAction, needEventTimeCheck,
needLeaseAcquisition, dbEventTimestamp, dbLeaseAcquisitionTimestamp);
completeUpdatePreparedStatement(insertStatement, flowAction, needEventTimeCheck, needLeaseAcquisition,
dbEventTimestamp, dbLeaseAcquisitionTimestamp);
return insertStatement.executeUpdate();
}, true);
}
Expand Down Expand Up @@ -367,14 +393,15 @@ protected static SelectInfoResult createSelectInfoResult(ResultSet resultSet) th
throw new IOException("Expected resultSet containing row information for the lease that was attempted but "
+ "received nothing.");
}
if (resultSet.getTimestamp(1) == null) {
if (resultSet.getTimestamp("utc_event_timestamp", UTC_CAL) == null) {
throw new IOException("event_timestamp should never be null (it is always set to current timestamp)");
}
long eventTimeMillis = resultSet.getTimestamp(1).getTime();
long eventTimeMillis = resultSet.getTimestamp("utc_event_timestamp", UTC_CAL).getTime();
// Lease acquisition timestamp is null if another participant has completed the lease
Optional<Long> leaseAcquisitionTimeMillis = resultSet.getTimestamp(2) == null ? Optional.empty() :
Optional.of(resultSet.getTimestamp(2).getTime());
int dbLinger = resultSet.getInt(3);
Optional<Long> leaseAcquisitionTimeMillis =
resultSet.getTimestamp("utc_lease_acquisition_timestamp", UTC_CAL) == null ? Optional.empty() :
Optional.of(resultSet.getTimestamp("utc_lease_acquisition_timestamp", UTC_CAL).getTime());
int dbLinger = resultSet.getInt("linger");
return new SelectInfoResult(eventTimeMillis, leaseAcquisitionTimeMillis, dbLinger);
} catch (SQLException e) {
throw new IOException(e);
Expand Down Expand Up @@ -469,10 +496,10 @@ protected static void completeUpdatePreparedStatement(PreparedStatement statemen
statement.setString(++i, flowAction.getFlowActionType().toString());
// Values that may be needed depending on the insert statement
if (needEventTimeCheck) {
statement.setTimestamp(++i, originalEventTimestamp);
statement.setTimestamp(++i, originalEventTimestamp, UTC_CAL);
}
if (needLeaseAcquisitionTimeCheck) {
statement.setTimestamp(++i, originalLeaseAcquisitionTimestamp);
statement.setTimestamp(++i, originalLeaseAcquisitionTimestamp, UTC_CAL);
}
}

Expand All @@ -489,8 +516,8 @@ public boolean recordLeaseSuccess(LeaseObtainedStatus status)
updateStatement.setString(++i, flowGroup);
updateStatement.setString(++i, flowName);
updateStatement.setString(++i, flowActionType.toString());
updateStatement.setTimestamp(++i, new Timestamp(status.getEventTimestamp()));
updateStatement.setTimestamp(++i, new Timestamp(status.getLeaseAcquisitionTimestamp()));
updateStatement.setTimestamp(++i, new Timestamp(status.getEventTimestamp()), UTC_CAL);
updateStatement.setTimestamp(++i, new Timestamp(status.getLeaseAcquisitionTimestamp()), UTC_CAL);
int numRowsUpdated = updateStatement.executeUpdate();
if (numRowsUpdated == 0) {
log.info("Multi-active lease arbiter lease attempt: [{}, eventTimestamp: {}] - FAILED to complete because "
Expand Down
Loading

0 comments on commit ecc9408

Please sign in to comment.