Skip to content

Commit

Permalink
[GOBBLIN-2070] Add eventTimeMillis field to leaseAttemptStatus for ad…
Browse files Browse the repository at this point in the history
…hoc flows where… (#3951)

* Add eventTimeMillis field to leaseAttemptStatus for adhoc flows where flowExecutionId is different than the event time of the lease
* Update tests to capture edge case
  • Loading branch information
umustafi authored May 21, 2024
1 parent 83c325d commit 083913d
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ private Optional<LeaseAttemptStatus.LeasedToAnotherStatus> calcLeasedToAnotherSt
} else if (leaseAttempt instanceof LeaseAttemptStatus.LeasedToAnotherStatus) { // already have one: just return it
return Optional.of((LeaseAttemptStatus.LeasedToAnotherStatus) leaseAttempt);
} else if (leaseAttempt instanceof LeaseAttemptStatus.LeaseObtainedStatus) { // remind w/o delay to immediately re-attempt handling
return Optional.of(new LeaseAttemptStatus.LeasedToAnotherStatus(leaseAttempt.getConsensusDagAction(), 0L));
return Optional.of(new LeaseAttemptStatus.LeasedToAnotherStatus(leaseAttempt.getConsensusDagAction(), ((LeaseAttemptStatus.LeaseObtainedStatus) leaseAttempt).getEventTimeMillis(), 0L));
} else {
throw new RuntimeException("unexpected `LeaseAttemptStatus` derived type: '" + leaseAttempt.getClass().getName() + "' in '" + leaseAttempt + "'");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,26 +51,23 @@ public long getMinimumLingerDurationMillis() {
public static class NoLongerLeasingStatus extends LeaseAttemptStatus {}

/*
The participant calling this method acquired the lease for the event in question. `Dag action`'s flow execution id
is the timestamp associated with the lease and the time the caller obtained the lease is stored within the
`leaseAcquisitionTimestamp` field. The `multiActiveLeaseArbiter` reference is used to recordLeaseSuccess for the
current LeaseObtainedStatus via the completeLease method from a caller without access to the {@link MultiActiveLeaseArbiter}.
The participant calling this method acquired the lease for the event in question.
The timestamp associated with the lease is stored in the `eventTimeMillis` field and the time the caller obtained the
lease is stored within the `leaseAcquisitionTimestamp` field. Note that the `Dag action` returned by the lease
arbitration attempt will be unchanged for flows that do not adopt the consensus eventTimeMillis as the flow execution
id, so a separate field must be maintained to track the eventTimeMillis for lease completion. The
`multiActiveLeaseArbiter` reference is used to recordLeaseSuccess for the current LeaseObtainedStatus via the
completeLease method from a caller without access to the {@link MultiActiveLeaseArbiter}.
*/
@Data
public static class LeaseObtainedStatus extends LeaseAttemptStatus {
private final DagActionStore.DagAction consensusDagAction;
private final long eventTimeMillis;
private final long leaseAcquisitionTimestamp;
private final long minimumLingerDurationMillis;
@Getter(AccessLevel.NONE)
private final MultiActiveLeaseArbiter multiActiveLeaseArbiter;

/**
* @return event time in millis since epoch for the event of this lease acquisition
*/
public long getEventTimeMillis() {
return Long.parseLong(consensusDagAction.getFlowExecutionId());
}

/**
* Completes the lease referenced by this status object if it has not expired.
* @return true if able to complete lease, false otherwise.
Expand All @@ -83,22 +80,15 @@ public boolean completeLease() throws IOException {

/*
This dag action event already has a valid lease owned by another participant.
`Dag action`'s flow execution id is the timestamp the lease is associated with, however the dag action event it
corresponds to may be a different and distinct occurrence of the same event.
`eventTimeMillis` corresponds to the timestamp of the existing lease associated with this dag action, however the dag
action event it corresponds to may be a different and distinct occurrence of the same event.
`minimumLingerDurationMillis` is the minimum amount of time to wait before this participant should return to check if
the lease has completed or expired
*/
@Data
public static class LeasedToAnotherStatus extends LeaseAttemptStatus {
private final DagActionStore.DagAction consensusDagAction;
private final long eventTimeMillis;
private final long minimumLingerDurationMillis;

/**
* Returns event time in millis since epoch for the event whose lease was obtained by another participant.
* @return
*/
public long getEventTimeMillis() {
return Long.parseLong(consensusDagAction.getFlowExecutionId());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -307,15 +307,15 @@ public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction dagAction, lo
log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 2: Same event, lease is valid",
updatedDagAction, isReminderEvent ? "reminder" : "original", dbCurrentTimestamp.getTime());
// Utilize db timestamp for reminder
return new LeaseAttemptStatus.LeasedToAnotherStatus(updatedDagAction,
return new LeaseAttemptStatus.LeasedToAnotherStatus(updatedDagAction, dbEventTimestamp.getTime(),
dbLeaseAcquisitionTimestamp.getTime() + dbLinger - dbCurrentTimestamp.getTime());
}
DagActionStore.DagAction updatedDagAction =
adoptConsensusFlowExecutionId ? dagAction.updateFlowExecutionId(dbCurrentTimestamp.getTime()) : dagAction;
log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 3: Distinct event, lease is valid",
updatedDagAction, isReminderEvent ? "reminder" : "original", dbCurrentTimestamp.getTime());
// Utilize db lease acquisition timestamp for wait time
return new LeaseAttemptStatus.LeasedToAnotherStatus(updatedDagAction,
return new LeaseAttemptStatus.LeasedToAnotherStatus(updatedDagAction, dbCurrentTimestamp.getTime(),
dbLeaseAcquisitionTimestamp.getTime() + dbLinger - dbCurrentTimestamp.getTime());
} // Lease is invalid
else if (leaseValidityStatus == 2) {
Expand Down Expand Up @@ -514,12 +514,14 @@ protected LeaseAttemptStatus evaluateStatusAfterLeaseAttempt(int numRowsUpdated,
if (numRowsUpdated == 1) {
log.info("Obtained lease for [{}, is: {}, eventTimestamp: {}] successfully!", updatedDagAction,
isReminderEvent ? "reminder" : "original", selectInfoResult.eventTimeMillis);
return new LeaseAttemptStatus.LeaseObtainedStatus(updatedDagAction, selectInfoResult.getLeaseAcquisitionTimeMillis().get(), minimumLingerDurationMillis, this);
return new LeaseAttemptStatus.LeaseObtainedStatus(updatedDagAction, selectInfoResult.eventTimeMillis,
selectInfoResult.getLeaseAcquisitionTimeMillis().get(), minimumLingerDurationMillis, this);
}
log.info("Another participant acquired lease in between for [{}, is: {}, eventTimestamp: {}] - num rows updated: {}",
updatedDagAction, isReminderEvent ? "reminder" : "original", selectInfoResult.eventTimeMillis, numRowsUpdated);
// Another participant acquired lease in between
return new LeaseAttemptStatus.LeasedToAnotherStatus(updatedDagAction, minimumLingerDurationMillis);
return new LeaseAttemptStatus.LeasedToAnotherStatus(updatedDagAction, selectInfoResult.eventTimeMillis,
minimumLingerDurationMillis);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ statuses that should cause the next() method to continue polling for tasks befor
when(dagManagementTaskStream.getDagActionProcessingLeaseArbiter()
.tryAcquireLease(any(DagActionStore.DagAction.class), anyLong(), anyBoolean(), anyBoolean()))
.thenReturn(new LeaseAttemptStatus.NoLongerLeasingStatus(),
new LeaseAttemptStatus.LeasedToAnotherStatus(launchAction, 15),
new LeaseAttemptStatus.LeaseObtainedStatus(launchAction, 0, 5, null));
new LeaseAttemptStatus.LeasedToAnotherStatus(launchAction, 3, 15),
new LeaseAttemptStatus.LeaseObtainedStatus(launchAction, 5, 0, 5, null));
DagTask dagTask = dagManagementTaskStream.next();
Assert.assertTrue(dagTask instanceof LaunchDagTask);
DagProc dagProc = dagTask.host(this.dagProcFactory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,16 @@


public class FlowLaunchHandlerTest {
long eventToRevisit = 123000L;
long flowExecutionId = 123000L;
long eventToRevisit = 641000L;
long minimumLingerDurationMillis = 2000L;
String cronExpression = FlowLaunchHandler.createCronFromDelayPeriod(minimumLingerDurationMillis);
String cronExpressionSuffix = truncateFirstTwoFieldsOfCronExpression(cronExpression);
int schedulerBackOffMillis = 10;
DagActionStore.DagAction dagAction = new DagActionStore.DagAction("flowName", "flowGroup",
String.valueOf(eventToRevisit), "jobName", DagActionStore.DagActionType.LAUNCH);
String.valueOf(flowExecutionId), "jobName", DagActionStore.DagActionType.LAUNCH);
LeaseAttemptStatus.LeasedToAnotherStatus leasedToAnotherStatus =
new LeaseAttemptStatus.LeasedToAnotherStatus(dagAction, minimumLingerDurationMillis);
new LeaseAttemptStatus.LeasedToAnotherStatus(dagAction, eventToRevisit, minimumLingerDurationMillis);

/**
* Remove first two fields from cron expression representing seconds and minutes to return truncated cron expression
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,8 @@ public void testConditionallyAcquireLeaseIfFinishedLeasingStatement()
DagActionStore.DagAction updatedResumeDagAction = resumeDagAction.updateFlowExecutionId(
selectInfoResult.getEventTimeMillis());
boolean markedSuccess = mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(new LeaseAttemptStatus.LeaseObtainedStatus(
updatedResumeDagAction, selectInfoResult.getLeaseAcquisitionTimeMillis().get(), LINGER, null));
updatedResumeDagAction, selectInfoResult.getEventTimeMillis(),
selectInfoResult.getLeaseAcquisitionTimeMillis().get(), LINGER, null));
Assert.assertTrue(markedSuccess);
// Ensure no NPE results from calling this after a lease has been completed and acquisition timestamp val is NULL
mysqlMultiActiveLeaseArbiter.evaluateStatusAfterLeaseAttempt(1, resumeDagAction,
Expand Down Expand Up @@ -321,7 +322,8 @@ public void testReminderEventAcquireLeaseOnCompletedLease() throws IOException,
DagActionStore.DagAction updatedResumeDagAction = resumeDagAction.updateFlowExecutionId(
selectInfoResult.getEventTimeMillis());
boolean markedSuccess = mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(new LeaseAttemptStatus.LeaseObtainedStatus(
updatedResumeDagAction, selectInfoResult.getLeaseAcquisitionTimeMillis().get(), LINGER, null));
updatedResumeDagAction, selectInfoResult.getEventTimeMillis(),
selectInfoResult.getLeaseAcquisitionTimeMillis().get(), LINGER, null));
Assert.assertTrue(markedSuccess);

// Sleep enough time for the event to have been considered distinct
Expand All @@ -333,9 +335,10 @@ public void testReminderEventAcquireLeaseOnCompletedLease() throws IOException,
}

/*
Tests calling `tryAcquireLease` when `adoptConsensusFlowExecutionId` is set to True and verify that flowExecutionId
Tests calling `tryAcquireLease` when `adoptConsensusFlowExecutionId` is set to False and verify that flowExecutionId
returned is the same as flowExecutionId provided to it for a LeaseObtainedStatus and LeasedToAnotherStatus object
(CASE 1 & 2).
(CASE 1 & 2). It also verifies that the `eventTimeMillis` stored in a lease obtained status can be used to complete
the lease.
*/
@Test
public void testSkipAdoptingConsensusFlowExecutionId() throws IOException {
Expand All @@ -346,18 +349,21 @@ public void testSkipAdoptingConsensusFlowExecutionId() throws IOException {
LeaseAttemptStatus.LeaseObtainedStatus firstObtainedStatus =
(LeaseAttemptStatus.LeaseObtainedStatus) firstLaunchStatus;
Assert.assertTrue(firstObtainedStatus.getEventTimeMillis() <= firstObtainedStatus.getLeaseAcquisitionTimestamp());
Assert.assertTrue(firstObtainedStatus.getEventTimeMillis() != Long.valueOf(firstObtainedStatus.getConsensusDagAction().getFlowExecutionId()));
Assert.assertTrue(firstObtainedStatus.getConsensusDagAction().equals(
new DagActionStore.DagAction(flowGroup2, flowName, flowExecutionId, jobName, DagActionStore.DagActionType.LAUNCH)));

// A second attempt to obtain a lease on the same action should return a LeasedToAnotherStatus which also contains
// the original flowExecutionId
// the original flowExecutionId and the same event time from the previous LeaseAttemptStatus
LeaseAttemptStatus secondLaunchStatus =
mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction2, eventTimeMillis, false, false);
Assert.assertTrue(secondLaunchStatus instanceof LeaseAttemptStatus.LeasedToAnotherStatus);
LeaseAttemptStatus.LeasedToAnotherStatus secondLeasedToAnotherStatus =
(LeaseAttemptStatus.LeasedToAnotherStatus) secondLaunchStatus;
Assert.assertEquals(firstObtainedStatus.getEventTimeMillis(), secondLeasedToAnotherStatus.getEventTimeMillis());
Assert.assertTrue(firstObtainedStatus.getConsensusDagAction().equals(
Assert.assertTrue(secondLeasedToAnotherStatus.getConsensusDagAction().equals(
new DagActionStore.DagAction(flowGroup2, flowName, flowExecutionId, jobName, DagActionStore.DagActionType.LAUNCH)));

Assert.assertTrue(mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(firstObtainedStatus));
}
}

0 comments on commit 083913d

Please sign in to comment.