Skip to content

Commit

Permalink
[GOBBLIN-2102]Concurrent flow status check fix (#3989)
Browse files Browse the repository at this point in the history
modified isFlowStatus to check all the flow statuses instead of just the last one

---------

Co-authored-by: Aditya Pratap Singh <[email protected]>
  • Loading branch information
pratapaditya04 and Aditya Pratap Singh authored Jun 28, 2024
1 parent b72193d commit 4721d08
Show file tree
Hide file tree
Showing 8 changed files with 268 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,12 @@ public List<org.apache.gobblin.service.monitoring.FlowStatus> getFlowStatusesFor
int countJobStatusesPerFlowName) {
return Lists.newArrayList(); // (as this method not exercised within `FlowStatusResource`)
}

@Override
public List<org.apache.gobblin.service.monitoring.FlowStatus> getAllFlowStatusesForFlowExecutionsOrdered(
String flowGroup, String flowName) {
return Lists.newArrayList();// (as this method not exercised within `FlowStatusResource`)
}
}

@BeforeClass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,16 +154,22 @@ public List<FlowStatus> getFlowStatusesAcrossGroup(String flowGroup, int countPe
* @return true, if any jobs of the flow are RUNNING.
*/
public boolean isFlowRunning(String flowName, String flowGroup, long flowExecutionId) {
List<FlowStatus> flowStatusList = getLatestFlowStatus(flowName, flowGroup, 1, null);
List<FlowStatus> flowStatusList = jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowName, flowGroup);

if (flowStatusList == null || flowStatusList.isEmpty()) {
return false;
} else {
FlowStatus flowStatus = flowStatusList.get(0);
}
// Iterating through all flow statuses to check the condition
for (FlowStatus flowStatus : flowStatusList) {
ExecutionStatus flowExecutionStatus = flowStatus.getFlowExecutionStatus();
log.info("Comparing flow execution status with flowExecutionId: " + flowStatus.getFlowExecutionId() + " and flowStatus: " + flowExecutionStatus + " with incoming flowExecutionId: " + flowExecutionId);
// If the latest flow status is the current job about to get kicked off, we should ignore this check
return flowStatus.getFlowExecutionId() != flowExecutionId && !FINISHED_STATUSES.contains(flowExecutionStatus.name());
// Check if it is not the current flowExecutionId and the status is not in FINISHED_STATUSES
if (flowStatus.getFlowExecutionId() != flowExecutionId && !FINISHED_STATUSES.contains(flowExecutionStatus.name())) {
log.info("Comparing flow execution status with flowExecutionId: " + flowStatus.getFlowExecutionId()
+ " and flowStatus: " + flowExecutionStatus + " with incoming flowExecutionId: " + flowExecutionId);
return true;
}
}
return false; // Return false if all flow statuses are in terminal status
}

/** @return only `jobStatuses` that represent a flow or, when `tag != null`, represent a job tagged as `tag` */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.gobblin.service.monitoring;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
Expand Down Expand Up @@ -81,6 +82,14 @@ public abstract Iterator<JobStatus> getJobStatusesForFlowExecution(String flowNa
*/
public abstract List<FlowStatus> getFlowStatusesForFlowGroupExecutions(String flowGroup, int countJobStatusesPerFlowName);

/**
* Get all the {@link FlowStatus}es of executions of flows belonging to this flow group and flowName. Currently, latest flow execution
* is decided by comparing {@link JobStatus#getFlowExecutionId()}.
* @return `FlowStatus`es are ordered by descending flowExecutionId.
**/
public abstract List<FlowStatus> getAllFlowStatusesForFlowExecutionsOrdered(String flowGroup,String flowName);


public long getLatestExecutionIdForFlow(String flowName, String flowGroup) {
List<Long> lastKExecutionIds = getLatestExecutionIdsForFlow(flowName, flowGroup, 1);
return lastKExecutionIds != null && !lastKExecutionIds.isEmpty() ? lastKExecutionIds.get(0) : -1L;
Expand Down Expand Up @@ -202,17 +211,34 @@ protected static class FlowExecutionJobStateGrouping {
private final long flowExecutionId;
private final List<State> jobStates;
}

/**
* Groups job status states by flow execution IDs optionally limiting the number of executions per flow name.
*
* @param flowGroup The group to which the flow executions belong.
* @param jobStatusStates List of job status states to process.
* @param maxCountPerFlowName Maximum number of executions to retain per flow name.
* If null, all executions are returned
* @return List of FlowExecutionJobStateGrouping objects containing the latest job states
* grouped by flow execution ID and sorted by flow name in ascending order.
*/
protected List<FlowExecutionJobStateGrouping> groupByFlowExecutionAndRetainLatest(
String flowGroup, List<State> jobStatusStates, int maxCountPerFlowName) {
String flowGroup, List<State> jobStatusStates, Integer maxCountPerFlowName) {
Map<String, Map<Long, List<State>>> statesByFlowExecutionIdByName = jobStatusStates.stream().collect(
Collectors.groupingBy(JobStatusRetriever::getFlowName, Collectors.groupingBy(JobStatusRetriever::getFlowExecutionId)));

return statesByFlowExecutionIdByName.entrySet().stream().sorted(Map.Entry.comparingByKey()).flatMap(flowNameEntry -> {
String flowName = flowNameEntry.getKey();
Map<Long, List<State>> statesByFlowExecutionIdForName = flowNameEntry.getValue();

List<Long> executionIds = Ordering.<Long>natural().greatestOf(statesByFlowExecutionIdForName.keySet(), maxCountPerFlowName);
List<Long> executionIds;
if (maxCountPerFlowName != null) {
// If maxCountPerFlowName is specified, limit the number of executions per flow name
executionIds = Ordering.natural().greatestOf(statesByFlowExecutionIdForName.keySet(), maxCountPerFlowName);
} else {
// If maxCountPerFlowName is not specified (null), return all execution IDs sorted in descending order
executionIds = new ArrayList<>(statesByFlowExecutionIdForName.keySet());
executionIds.sort(Comparator.reverseOrder());
}
return executionIds.stream().map(executionId ->
new FlowExecutionJobStateGrouping(flowGroup, flowName, executionId, statesByFlowExecutionIdForName.get(executionId)));
}).collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public void testIsFlowRunningFirstExecution() {
String flowName = "testName";
String flowGroup = "testGroup";
long currFlowExecutionId = 1234L;
when(jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 1)).thenReturn(null);
when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowName, flowGroup)).thenReturn(null);

FlowStatusGenerator flowStatusGenerator = new FlowStatusGenerator(jobStatusRetriever);
Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup, currFlowExecutionId));
Expand All @@ -55,13 +55,12 @@ public void testIsFlowRunningCompiledPastExecution() {
String flowName = "testName";
String flowGroup = "testGroup";
long flowExecutionId = 1234L;
when(jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 1)).thenReturn(
Lists.newArrayList(flowExecutionId));
JobStatus jobStatus = JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId)
.jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName(ExecutionStatus.COMPILED.name()).build();
Iterator<JobStatus> jobStatusIterator = Lists.newArrayList(jobStatus).iterator();
when(jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId)).thenReturn(
jobStatusIterator);
FlowStatus flowStatus = new FlowStatus(flowName,flowGroup,flowExecutionId,jobStatusIterator,ExecutionStatus.COMPILED);
when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowName, flowGroup)).thenReturn(
Lists.newArrayList(flowStatus));
FlowStatusGenerator flowStatusGenerator = new FlowStatusGenerator(jobStatusRetriever);
// Block the next execution if the prior one is in compiled as it's considered still running
Assert.assertTrue(flowStatusGenerator.isFlowRunning(flowName, flowGroup, flowExecutionId + 1));
Expand All @@ -78,8 +77,9 @@ public void skipFlowConcurrentCheckSameFlowExecutionId() {
JobStatus jobStatus = JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId)
.jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName(ExecutionStatus.COMPILED.name()).build();
Iterator<JobStatus> jobStatusIterator = Lists.newArrayList(jobStatus).iterator();
when(jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId)).thenReturn(
jobStatusIterator);
FlowStatus flowStatus = new FlowStatus(flowName,flowGroup,flowExecutionId,jobStatusIterator,ExecutionStatus.COMPILED);
when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowName, flowGroup)).thenReturn(
Lists.newArrayList(flowStatus));
FlowStatusGenerator flowStatusGenerator = new FlowStatusGenerator(jobStatusRetriever);
// If the flow is compiled but the flow execution status is the same as the one about to be kicked off, do not consider it as running.
Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup, flowExecutionId));
Expand All @@ -103,18 +103,20 @@ public void testIsFlowRunningJobExecutionIgnored() {
.jobName(job2).eventName("FAILED").build();
JobStatus jobStatus3 = JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId)
.jobName(job3).eventName("CANCELLED").build();
JobStatus flowStatus = JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId)
JobStatus jobStatus4 = JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId)
.jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName("CANCELLED").build();
Iterator<JobStatus> jobStatusIterator = Lists.newArrayList(jobStatus1, jobStatus2, jobStatus3, flowStatus).iterator();
Iterator<JobStatus> jobStatusIterator = Lists.newArrayList(jobStatus1, jobStatus2, jobStatus3, jobStatus4).iterator();
FlowStatusGenerator flowStatusGenerator = new FlowStatusGenerator(jobStatusRetriever);

when(jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId)).thenReturn(jobStatusIterator);
FlowStatus flowStatus = new FlowStatus(flowName,flowGroup,flowExecutionId,jobStatusIterator,JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusIterator));
when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowName, flowGroup)).thenReturn(Lists.newArrayList(flowStatus));
Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup, flowExecutionId));

flowStatus = JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId)
jobStatus4 = JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId)
.jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName("RUNNING").build();
jobStatusIterator = Lists.newArrayList(jobStatus1, jobStatus2, jobStatus3, flowStatus).iterator();
when(jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId)).thenReturn(jobStatusIterator);
jobStatusIterator = Lists.newArrayList(jobStatus1, jobStatus2, jobStatus3, jobStatus4).iterator();
flowStatus = new FlowStatus(flowName,flowGroup,flowExecutionId,jobStatusIterator,JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusIterator));
when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowName, flowGroup)).thenReturn(Collections.singletonList(flowStatus));
Assert.assertTrue(flowStatusGenerator.isFlowRunning(flowName, flowGroup, flowExecutionId+1));
}

Expand Down Expand Up @@ -175,6 +177,90 @@ public void testGetFlowStatusesAcrossGroup() {
Arrays.asList(f0jsmDep2)));
}

@Test
public void testIsFlowRunning_NoFlowStatuses_ReturnsFalse() {
String flowName = "testName";
String flowGroup = "testGroup";
long flowExecutionId = 1234L;
JobStatusRetriever jobStatusRetriever = Mockito.mock(JobStatusRetriever.class);
FlowStatusGenerator flowStatusGenerator = new FlowStatusGenerator(jobStatusRetriever);

// Mocking the retrieval of empty flowStatusList
when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowName, flowGroup))
.thenReturn(Collections.emptyList());

Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup, flowExecutionId));
}

@Test
public void testIsFlowRunning_AllFinishedFlowStatuses_ReturnsFalse() {
String flowName = "testName";
String flowGroup = "testGroup";
long flowExecutionId = 1234L;
JobStatusRetriever jobStatusRetriever = Mockito.mock(JobStatusRetriever.class);
FlowStatusGenerator flowStatusGenerator = new FlowStatusGenerator(jobStatusRetriever);

// Mocking flowStatusList with all finished statuses
List<FlowStatus> flowStatusList = Arrays.asList(
createFlowStatus(flowName, flowGroup, flowExecutionId, "COMPLETE"),
createFlowStatus(flowName, flowGroup, flowExecutionId, "FAILED"),
createFlowStatus(flowName, flowGroup, flowExecutionId, "CANCELLED")
);

when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowName, flowGroup))
.thenReturn(flowStatusList);

Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup, flowExecutionId));
}

@Test
public void testIsFlowRunning_FlowStatusNotMatchingFlowExecutionIdAndOneOfTheStatusIsRunning_ReturnsTrue() {
String flowName = "testName";
String flowGroup = "testGroup";
long flowExecutionId = 1234L;
JobStatusRetriever jobStatusRetriever = Mockito.mock(JobStatusRetriever.class);
FlowStatusGenerator flowStatusGenerator = new FlowStatusGenerator(jobStatusRetriever);

// Mocking flowStatusList with a running status and a different flow execution id
List<FlowStatus> flowStatusList = Arrays.asList(
createFlowStatus(flowName, flowGroup, flowExecutionId+4, "COMPLETE"),
createFlowStatus(flowName, flowGroup, flowExecutionId+3, "COMPLETE"),
createFlowStatus(flowName, flowGroup, flowExecutionId+2, "RUNNING"),
createFlowStatus(flowName, flowGroup, flowExecutionId+1, "FAILED"),
createFlowStatus(flowName, flowGroup, flowExecutionId, "COMPLETE")

);

when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowName, flowGroup))
.thenReturn(flowStatusList);

Assert.assertTrue(flowStatusGenerator.isFlowRunning(flowName, flowGroup, flowExecutionId + 1));
}

@Test
public void testIsFlowRunning_FlowStatusMatchingFlowExecutionId_ReturnsFalse() {
String flowName = "testName";
String flowGroup = "testGroup";
long flowExecutionId = 1234L;
JobStatusRetriever jobStatusRetriever = Mockito.mock(JobStatusRetriever.class);
FlowStatusGenerator flowStatusGenerator = new FlowStatusGenerator(jobStatusRetriever);

// Mocking flowStatusList with a running status and the same flow execution id
List<FlowStatus> flowStatusList = Collections.singletonList(
createFlowStatus(flowName, flowGroup, flowExecutionId, "RUNNING")
);

when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowName, flowGroup))
.thenReturn(flowStatusList);

Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup, flowExecutionId));
}

private FlowStatus createFlowStatus(String flowName, String flowGroup, long flowExecutionId, String status) {
ExecutionStatus executionStatus = ExecutionStatus.valueOf(status);
return new FlowStatus(flowName, flowGroup, flowExecutionId, null, executionStatus);
}

private FlowStatus createFlowStatus(String flowGroup, String flowName, long flowExecutionId, List<JobStatus> jobStatuses) {
return new FlowStatus(flowName, flowGroup, flowExecutionId, jobStatuses.iterator(),
JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatuses.iterator()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,23 @@ public List<FlowStatus> getFlowStatusesForFlowGroupExecutions(String flowGroup,
}
}

@Override
public List<FlowStatus> getAllFlowStatusesForFlowExecutionsOrdered(String flowGroup, String flowName) {
Preconditions.checkArgument(flowGroup != null, "flowGroup cannot be null");
Preconditions.checkArgument(flowName != null, "flowName cannot be null");
try {
String storeNamePrefix = KafkaJobStatusMonitor.jobStatusStoreName(flowGroup, flowName);
List<String> storeNamesForFlowGroup = stateStore.getStoreNames(storeName -> storeName.startsWith(storeNamePrefix));
List<State> flowGroupExecutionsStates = storeNamesForFlowGroup.stream().flatMap(CheckedExceptionFunction.wrapToUnchecked(storeName ->
stateStore.getAll(storeName).stream()
)).collect(Collectors.toList());
return asFlowStatuses(groupByFlowExecutionAndRetainLatest(flowGroup, flowGroupExecutionsStates, null));
} catch (IOException | RuntimeException e) { // (latter likely wrapping `IOException` originating within `wrapUnchecked`)
log.error(String.format("Exception encountered when listing files for flow group: %s", flowGroup), e);
return ImmutableList.of();
}
}

/**
* @param flowName
* @param flowGroup
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,25 @@ public Iterator<JobStatus> getJobStatusesForFlowExecution(String flowName, Strin

String JOB_DONE_SUFFIX = ".done";
if (this.doesJobExist(flowName, flowGroup, flowExecutionId, JOB_DONE_SUFFIX)) {
jobStatus = JobStatus.builder().flowName(flowName).flowGroup(flowGroup).flowExecutionId(flowExecutionId).
jobName(jobName).jobGroup(jobGroup).jobExecutionId(flowExecutionId).eventName(ExecutionStatus.COMPLETE.name()).build();
jobStatus = JobStatus.builder()
.flowName(flowName)
.flowGroup(flowGroup)
.flowExecutionId(flowExecutionId)
.jobName(jobName)
.jobGroup(jobGroup)
.jobExecutionId(flowExecutionId)
.eventName(ExecutionStatus.COMPLETE.name())
.build();
} else if (this.doesJobExist(flowName, flowGroup, flowExecutionId, "")) {
jobStatus = JobStatus.builder().flowName(flowName).flowGroup(flowGroup).flowExecutionId(flowExecutionId).
jobName(jobName).jobGroup(jobGroup).jobExecutionId(flowExecutionId).eventName(ExecutionStatus.PENDING.name()).build();
jobStatus = JobStatus.builder()
.flowName(flowName)
.flowGroup(flowGroup)
.flowExecutionId(flowExecutionId)
.jobName(jobName)
.jobGroup(jobGroup)
.jobExecutionId(flowExecutionId)
.eventName(ExecutionStatus.PENDING.name())
.build();
} else {
return Collections.emptyIterator();
}
Expand Down Expand Up @@ -131,6 +145,19 @@ public List<FlowStatus> getFlowStatusesForFlowGroupExecutions(String flowGroup,
throw new UnsupportedOperationException("Not yet implemented");
}

/**
* @param flowGroup
* @param flowName
* @return all the flow statuses for the given flowGroup and flowName.
*/
@Override
public List<FlowStatus> getAllFlowStatusesForFlowExecutionsOrdered(String flowGroup, String flowName) {
Preconditions.checkArgument(flowGroup != null, "flowGroup cannot be null");
Preconditions.checkArgument(flowName != null, "flowName cannot be null");

throw new UnsupportedOperationException("Not yet implemented");
}

public StateStore<State> getStateStore() {
// this jobstatus retriever does not have a state store
// only used in tests so this is okay
Expand Down
Loading

0 comments on commit 4721d08

Please sign in to comment.