Skip to content

Commit

Permalink
Remove flowExecutionId from MysqlMultiActiveLeaseArbiter for flow & a…
Browse files Browse the repository at this point in the history
…ction level arbitration (#3783)

Co-authored-by: Urmi Mustafi <[email protected]>
  • Loading branch information
umustafi and Urmi Mustafi authored Sep 21, 2023
1 parent c888881 commit fc73db8
Showing 1 changed file with 6 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
* well as the flow action associated with it. It uses two additional values of the `event_timestamp` and
* `lease_acquisition_timestamp` to indicate an active lease, expired lease, and state of no longer leasing. The table
* schema is as follows:
* [flow_group | flow_name | flow_execution_id | flow_action | event_timestamp | lease_acquisition_timestamp]
* [flow_group | flow_name | flow_action | event_timestamp | lease_acquisition_timestamp]
* (----------------------primary key------------------------)
* We also maintain another table in the database with two constants that allow us to coordinate between participants and
* ensure they are using the same values to base their coordination off of.
Expand Down Expand Up @@ -93,18 +93,16 @@ protected interface CheckedFunction<T, R> {
// TODO: define retention on this table
private static final String CREATE_LEASE_ARBITER_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %s ("
+ "flow_group varchar(" + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, flow_name varchar("
+ ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, " + "flow_execution_id varchar("
+ ServiceConfigKeys.MAX_FLOW_EXECUTION_ID_LENGTH + ") NOT NULL, flow_action varchar(100) NOT NULL, "
+ ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, " + " flow_action varchar(100) NOT NULL, "
+ "event_timestamp TIMESTAMP, "
+ "lease_acquisition_timestamp TIMESTAMP NULL DEFAULT CURRENT_TIMESTAMP, "
+ "PRIMARY KEY (flow_group,flow_name,flow_execution_id,flow_action))";
+ "PRIMARY KEY (flow_group,flow_name,flow_action))";
private static final String CREATE_CONSTANTS_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %s "
+ "(primary_key INT, epsilon INT, linger INT, PRIMARY KEY (primary_key))";
// Only insert epsilon and linger values from config if this table does not contain a pre-existing values already.
private static final String UPSERT_CONSTANTS_TABLE_STATEMENT = "INSERT INTO %s (primary_key, epsilon, linger) "
+ "VALUES(1, ?, ?) ON DUPLICATE KEY UPDATE epsilon=VALUES(epsilon), linger=VALUES(linger)";
protected static final String WHERE_CLAUSE_TO_MATCH_KEY = "WHERE flow_group=? AND flow_name=? AND flow_execution_id=?"
+ " AND flow_action=?";
protected static final String WHERE_CLAUSE_TO_MATCH_KEY = "WHERE flow_group=? AND flow_name=? AND flow_action=?";
protected static final String WHERE_CLAUSE_TO_MATCH_ROW = WHERE_CLAUSE_TO_MATCH_KEY
+ " AND event_timestamp=? AND lease_acquisition_timestamp=?";
protected static final String SELECT_AFTER_INSERT_STATEMENT = "SELECT event_timestamp, lease_acquisition_timestamp, "
Expand All @@ -121,8 +119,8 @@ protected interface CheckedFunction<T, R> {
// Insert or update row to acquire lease if values have not changed since the previous read
// Need to define three separate statements to handle cases where row does not exist or has null values to check
protected static final String ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT = "INSERT INTO %s (flow_group, flow_name, "
+ "flow_execution_id, flow_action, event_timestamp, lease_acquisition_timestamp) VALUES(?, ?, ?, ?, "
+ "CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)";
+ "flow_action, event_timestamp, lease_acquisition_timestamp) VALUES(?, ?, ?, CURRENT_TIMESTAMP, "
+ "CURRENT_TIMESTAMP)";
protected static final String CONDITIONALLY_ACQUIRE_LEASE_IF_FINISHED_LEASING_STATEMENT = "UPDATE %s "
+ "SET event_timestamp=CURRENT_TIMESTAMP, lease_acquisition_timestamp=CURRENT_TIMESTAMP "
+ WHERE_CLAUSE_TO_MATCH_KEY + " AND event_timestamp=? AND lease_acquisition_timestamp is NULL";
Expand Down Expand Up @@ -268,7 +266,6 @@ protected Optional<GetEventInfoResult> getExistingEventInfo(DagActionStore.DagAc
int i = 0;
getInfoStatement.setString(++i, flowAction.getFlowGroup());
getInfoStatement.setString(++i, flowAction.getFlowName());
getInfoStatement.setString(++i, flowAction.getFlowExecutionId());
getInfoStatement.setString(++i, flowAction.getFlowActionType().toString());
ResultSet resultSet = getInfoStatement.executeQuery();
try {
Expand Down Expand Up @@ -434,7 +431,6 @@ protected static void completeInsertPreparedStatement(PreparedStatement statemen
// Values to set in new row
statement.setString(++i, flowAction.getFlowGroup());
statement.setString(++i, flowAction.getFlowName());
statement.setString(++i, flowAction.getFlowExecutionId());
statement.setString(++i, flowAction.getFlowActionType().toString());
}

Expand All @@ -449,7 +445,6 @@ protected static void completeWhereClauseMatchingKeyPreparedStatement(PreparedSt
int i = 0;
statement.setString(++i, flowAction.getFlowGroup());
statement.setString(++i, flowAction.getFlowName());
statement.setString(++i, flowAction.getFlowExecutionId());
statement.setString(++i, flowAction.getFlowActionType().toString());
}

Expand All @@ -471,7 +466,6 @@ protected static void completeUpdatePreparedStatement(PreparedStatement statemen
// Values to check if existing row matches previous read
statement.setString(++i, flowAction.getFlowGroup());
statement.setString(++i, flowAction.getFlowName());
statement.setString(++i, flowAction.getFlowExecutionId());
statement.setString(++i, flowAction.getFlowActionType().toString());
// Values that may be needed depending on the insert statement
if (needEventTimeCheck) {
Expand All @@ -488,14 +482,12 @@ public boolean recordLeaseSuccess(LeaseObtainedStatus status)
DagActionStore.DagAction flowAction = status.getFlowAction();
String flowGroup = flowAction.getFlowGroup();
String flowName = flowAction.getFlowName();
String flowExecutionId = flowAction.getFlowExecutionId();
DagActionStore.FlowActionType flowActionType = flowAction.getFlowActionType();
return withPreparedStatement(String.format(CONDITIONALLY_COMPLETE_LEASE_STATEMENT, leaseArbiterTableName),
updateStatement -> {
int i = 0;
updateStatement.setString(++i, flowGroup);
updateStatement.setString(++i, flowName);
updateStatement.setString(++i, flowExecutionId);
updateStatement.setString(++i, flowActionType.toString());
updateStatement.setTimestamp(++i, new Timestamp(status.getEventTimestamp()));
updateStatement.setTimestamp(++i, new Timestamp(status.getLeaseAcquisitionTimestamp()));
Expand Down

0 comments on commit fc73db8

Please sign in to comment.