From 083913de95bfce14cd3147f04ea79d5dee0c0f32 Mon Sep 17 00:00:00 2001 From: umustafi Date: Tue, 21 May 2024 13:58:20 -0700 Subject: [PATCH] =?UTF-8?q?[GOBBLIN-2070]=20Add=20eventTimeMillis=20field?= =?UTF-8?q?=20to=20leaseAttemptStatus=20for=20adhoc=20flows=20where?= =?UTF-8?q?=E2=80=A6=20(#3951)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Add eventTimeMillis field to leaseAttemptStatus for adhoc flows where flowExecutionId is different than the event time of the lease * Update tests to capture edge case --- .../orchestration/FlowLaunchHandler.java | 2 +- .../orchestration/LeaseAttemptStatus.java | 32 +++++++------------ .../MysqlMultiActiveLeaseArbiter.java | 10 +++--- .../DagManagementTaskStreamImplTest.java | 4 +-- .../orchestration/FlowLaunchHandlerTest.java | 7 ++-- .../MysqlMultiActiveLeaseArbiterTest.java | 18 +++++++---- 6 files changed, 36 insertions(+), 37 deletions(-) diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java index 1c7c82358ff..1c37a404c4d 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java @@ -126,7 +126,7 @@ private Optional calcLeasedToAnotherSt } else if (leaseAttempt instanceof LeaseAttemptStatus.LeasedToAnotherStatus) { // already have one: just return it return Optional.of((LeaseAttemptStatus.LeasedToAnotherStatus) leaseAttempt); } else if (leaseAttempt instanceof LeaseAttemptStatus.LeaseObtainedStatus) { // remind w/o delay to immediately re-attempt handling - return Optional.of(new LeaseAttemptStatus.LeasedToAnotherStatus(leaseAttempt.getConsensusDagAction(), 0L)); + return Optional.of(new LeaseAttemptStatus.LeasedToAnotherStatus(leaseAttempt.getConsensusDagAction(), ((LeaseAttemptStatus.LeaseObtainedStatus) leaseAttempt).getEventTimeMillis(), 0L)); } else { throw new RuntimeException("unexpected `LeaseAttemptStatus` derived type: '" + leaseAttempt.getClass().getName() + "' in '" + leaseAttempt + "'"); } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/LeaseAttemptStatus.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/LeaseAttemptStatus.java index f77427892a9..1087c6be483 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/LeaseAttemptStatus.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/LeaseAttemptStatus.java @@ -51,26 +51,23 @@ public long getMinimumLingerDurationMillis() { public static class NoLongerLeasingStatus extends LeaseAttemptStatus {} /* - The participant calling this method acquired the lease for the event in question. `Dag action`'s flow execution id - is the timestamp associated with the lease and the time the caller obtained the lease is stored within the - `leaseAcquisitionTimestamp` field. The `multiActiveLeaseArbiter` reference is used to recordLeaseSuccess for the - current LeaseObtainedStatus via the completeLease method from a caller without access to the {@link MultiActiveLeaseArbiter}. + The participant calling this method acquired the lease for the event in question. + The timestamp associated with the lease is stored in `eventTimeMillis` field and the time the caller obtained the + lease is stored within the`leaseAcquisitionTimestamp` field. Note that the `Dag action` returned by the lease + arbitration attempt will be unchanged for flows that do not adopt the consensus eventTimeMillis as the flow execution + id, so a separate field must be maintained to track the eventTimeMillis for lease completion. The + `multiActiveLeaseArbiter` reference is used to recordLeaseSuccess for the current LeaseObtainedStatus via the + completeLease method from a caller without access to the {@link MultiActiveLeaseArbiter}. */ @Data public static class LeaseObtainedStatus extends LeaseAttemptStatus { private final DagActionStore.DagAction consensusDagAction; + private final long eventTimeMillis; private final long leaseAcquisitionTimestamp; private final long minimumLingerDurationMillis; @Getter(AccessLevel.NONE) private final MultiActiveLeaseArbiter multiActiveLeaseArbiter; - /** - * @return event time in millis since epoch for the event of this lease acquisition - */ - public long getEventTimeMillis() { - return Long.parseLong(consensusDagAction.getFlowExecutionId()); - } - /** * Completes the lease referenced by this status object if it has not expired. * @return true if able to complete lease, false otherwise. @@ -83,22 +80,15 @@ public boolean completeLease() throws IOException { /* This dag action event already has a valid lease owned by another participant. - `Dag action`'s flow execution id is the timestamp the lease is associated with, however the dag action event it - corresponds to may be a different and distinct occurrence of the same event. + `eventTimeMillis' corresponds to the timestamp of the existing lease associated with this dag action, however the dag + action event it corresponds to may be a different and distinct occurrence of the same event. `minimumLingerDurationMillis` is the minimum amount of time to wait before this participant should return to check if the lease has completed or expired */ @Data public static class LeasedToAnotherStatus extends LeaseAttemptStatus { private final DagActionStore.DagAction consensusDagAction; + private final long eventTimeMillis; private final long minimumLingerDurationMillis; - - /** - * Returns event time in millis since epoch for the event whose lease was obtained by another participant. - * @return - */ - public long getEventTimeMillis() { - return Long.parseLong(consensusDagAction.getFlowExecutionId()); - } } } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java index 4479358ecff..2702a81d5ab 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java @@ -307,7 +307,7 @@ public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction dagAction, lo log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 2: Same event, lease is valid", updatedDagAction, isReminderEvent ? "reminder" : "original", dbCurrentTimestamp.getTime()); // Utilize db timestamp for reminder - return new LeaseAttemptStatus.LeasedToAnotherStatus(updatedDagAction, + return new LeaseAttemptStatus.LeasedToAnotherStatus(updatedDagAction, dbEventTimestamp.getTime(), dbLeaseAcquisitionTimestamp.getTime() + dbLinger - dbCurrentTimestamp.getTime()); } DagActionStore.DagAction updatedDagAction = @@ -315,7 +315,7 @@ public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction dagAction, lo log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 3: Distinct event, lease is valid", updatedDagAction, isReminderEvent ? "reminder" : "original", dbCurrentTimestamp.getTime()); // Utilize db lease acquisition timestamp for wait time - return new LeaseAttemptStatus.LeasedToAnotherStatus(updatedDagAction, + return new LeaseAttemptStatus.LeasedToAnotherStatus(updatedDagAction, dbCurrentTimestamp.getTime(), dbLeaseAcquisitionTimestamp.getTime() + dbLinger - dbCurrentTimestamp.getTime()); } // Lease is invalid else if (leaseValidityStatus == 2) { @@ -514,12 +514,14 @@ protected LeaseAttemptStatus evaluateStatusAfterLeaseAttempt(int numRowsUpdated, if (numRowsUpdated == 1) { log.info("Obtained lease for [{}, is: {}, eventTimestamp: {}] successfully!", updatedDagAction, isReminderEvent ? "reminder" : "original", selectInfoResult.eventTimeMillis); - return new LeaseAttemptStatus.LeaseObtainedStatus(updatedDagAction, selectInfoResult.getLeaseAcquisitionTimeMillis().get(), minimumLingerDurationMillis, this); + return new LeaseAttemptStatus.LeaseObtainedStatus(updatedDagAction, selectInfoResult.eventTimeMillis, + selectInfoResult.getLeaseAcquisitionTimeMillis().get(), minimumLingerDurationMillis, this); } log.info("Another participant acquired lease in between for [{}, is: {}, eventTimestamp: {}] - num rows updated: {}", updatedDagAction, isReminderEvent ? "reminder" : "original", selectInfoResult.eventTimeMillis, numRowsUpdated); // Another participant acquired lease in between - return new LeaseAttemptStatus.LeasedToAnotherStatus(updatedDagAction, minimumLingerDurationMillis); + return new LeaseAttemptStatus.LeasedToAnotherStatus(updatedDagAction, selectInfoResult.eventTimeMillis, + minimumLingerDurationMillis); } /** diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java index 607d4438937..f8c0f829690 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java @@ -110,8 +110,8 @@ statuses that should cause the next() method to continue polling for tasks befor when(dagManagementTaskStream.getDagActionProcessingLeaseArbiter() .tryAcquireLease(any(DagActionStore.DagAction.class), anyLong(), anyBoolean(), anyBoolean())) .thenReturn(new LeaseAttemptStatus.NoLongerLeasingStatus(), - new LeaseAttemptStatus.LeasedToAnotherStatus(launchAction, 15), - new LeaseAttemptStatus.LeaseObtainedStatus(launchAction, 0, 5, null)); + new LeaseAttemptStatus.LeasedToAnotherStatus(launchAction, 3, 15), + new LeaseAttemptStatus.LeaseObtainedStatus(launchAction, 5, 0, 5, null)); DagTask dagTask = dagManagementTaskStream.next(); Assert.assertTrue(dagTask instanceof LaunchDagTask); DagProc dagProc = dagTask.host(this.dagProcFactory); diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandlerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandlerTest.java index dea34b6cdf5..b4feeed8410 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandlerTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandlerTest.java @@ -28,15 +28,16 @@ public class FlowLaunchHandlerTest { - long eventToRevisit = 123000L; + long flowExecutionId = 123000L; + long eventToRevisit = 641000L; long minimumLingerDurationMillis = 2000L; String cronExpression = FlowLaunchHandler.createCronFromDelayPeriod(minimumLingerDurationMillis); String cronExpressionSuffix = truncateFirstTwoFieldsOfCronExpression(cronExpression); int schedulerBackOffMillis = 10; DagActionStore.DagAction dagAction = new DagActionStore.DagAction("flowName", "flowGroup", - String.valueOf(eventToRevisit), "jobName", DagActionStore.DagActionType.LAUNCH); + String.valueOf(flowExecutionId), "jobName", DagActionStore.DagActionType.LAUNCH); LeaseAttemptStatus.LeasedToAnotherStatus leasedToAnotherStatus = - new LeaseAttemptStatus.LeasedToAnotherStatus(dagAction, minimumLingerDurationMillis); + new LeaseAttemptStatus.LeasedToAnotherStatus(dagAction, eventToRevisit, minimumLingerDurationMillis); /** * Remove first two fields from cron expression representing seconds and minutes to return truncated cron expression diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java index 2699148b60c..607d8f3b015 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java @@ -239,7 +239,8 @@ public void testConditionallyAcquireLeaseIfFinishedLeasingStatement() DagActionStore.DagAction updatedResumeDagAction = resumeDagAction.updateFlowExecutionId( selectInfoResult.getEventTimeMillis()); boolean markedSuccess = mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(new LeaseAttemptStatus.LeaseObtainedStatus( - updatedResumeDagAction, selectInfoResult.getLeaseAcquisitionTimeMillis().get(), LINGER, null)); + updatedResumeDagAction, selectInfoResult.getEventTimeMillis(), + selectInfoResult.getLeaseAcquisitionTimeMillis().get(), LINGER, null)); Assert.assertTrue(markedSuccess); // Ensure no NPE results from calling this after a lease has been completed and acquisition timestamp val is NULL mysqlMultiActiveLeaseArbiter.evaluateStatusAfterLeaseAttempt(1, resumeDagAction, @@ -321,7 +322,8 @@ public void testReminderEventAcquireLeaseOnCompletedLease() throws IOException, DagActionStore.DagAction updatedResumeDagAction = resumeDagAction.updateFlowExecutionId( selectInfoResult.getEventTimeMillis()); boolean markedSuccess = mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(new LeaseAttemptStatus.LeaseObtainedStatus( - updatedResumeDagAction, selectInfoResult.getLeaseAcquisitionTimeMillis().get(), LINGER, null)); + updatedResumeDagAction, selectInfoResult.getEventTimeMillis(), + selectInfoResult.getLeaseAcquisitionTimeMillis().get(), LINGER, null)); Assert.assertTrue(markedSuccess); // Sleep enough time for the event to have been considered distinct @@ -333,9 +335,10 @@ public void testReminderEventAcquireLeaseOnCompletedLease() throws IOException, } /* - Tests calling `tryAcquireLease` when `adoptConsensusFlowExecutionId` is set to True and verify that flowExecutionId + Tests calling `tryAcquireLease` when `adoptConsensusFlowExecutionId` is set to False and verify that flowExecutionId returned is the same as flowExecutionId provided to it for a LeaseObtainedStatus and LeasedToAnotherStatus object - (CASE 1 & 2). + (CASE 1 & 2). It also verifies that the `eventTimeMillis` stored in a lease obtained status can be used to complete + the lease. */ @Test public void testSkipAdoptingConsensusFlowExecutionId() throws IOException { @@ -346,18 +349,21 @@ public void testSkipAdoptingConsensusFlowExecutionId() throws IOException { LeaseAttemptStatus.LeaseObtainedStatus firstObtainedStatus = (LeaseAttemptStatus.LeaseObtainedStatus) firstLaunchStatus; Assert.assertTrue(firstObtainedStatus.getEventTimeMillis() <= firstObtainedStatus.getLeaseAcquisitionTimestamp()); + Assert.assertTrue(firstObtainedStatus.getEventTimeMillis() != Long.valueOf(firstObtainedStatus.getConsensusDagAction().getFlowExecutionId())); Assert.assertTrue(firstObtainedStatus.getConsensusDagAction().equals( new DagActionStore.DagAction(flowGroup2, flowName, flowExecutionId, jobName, DagActionStore.DagActionType.LAUNCH))); // A second attempt to obtain a lease on the same action should return a LeasedToAnotherStatus which also contains - // the original flowExecutionId + // the original flowExecutionId and the same event time from the previous LeaseAttemptStatus LeaseAttemptStatus secondLaunchStatus = mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction2, eventTimeMillis, false, false); Assert.assertTrue(secondLaunchStatus instanceof LeaseAttemptStatus.LeasedToAnotherStatus); LeaseAttemptStatus.LeasedToAnotherStatus secondLeasedToAnotherStatus = (LeaseAttemptStatus.LeasedToAnotherStatus) secondLaunchStatus; Assert.assertEquals(firstObtainedStatus.getEventTimeMillis(), secondLeasedToAnotherStatus.getEventTimeMillis()); - Assert.assertTrue(firstObtainedStatus.getConsensusDagAction().equals( + Assert.assertTrue(secondLeasedToAnotherStatus.getConsensusDagAction().equals( new DagActionStore.DagAction(flowGroup2, flowName, flowExecutionId, jobName, DagActionStore.DagActionType.LAUNCH))); + + Assert.assertTrue(mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(firstObtainedStatus)); } }