Skip to content

Commit

Permalink
Merge pull request #15 from boney9/task-update-api
Browse files Browse the repository at this point in the history
Add the task update API by reference name
  • Loading branch information
v1r3n authored Dec 22, 2023
2 parents d73a991 + 508c9fb commit 88f6e18
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -439,11 +439,15 @@ public List<TaskModel> createTasks(List<TaskModel> tasks) {
}

public List<Task> getTasksForWorkflow(String workflowId) {
return executionDAO.getTasksForWorkflow(workflowId).stream()
return getTaskModelsForWorkflow(workflowId).stream()
.map(TaskModel::toTask)
.collect(Collectors.toList());
}

public List<TaskModel> getTaskModelsForWorkflow(String workflowId) {
return executionDAO.getTasksForWorkflow(workflowId);
}

public TaskModel getTaskModel(String taskId) {
TaskModel taskModel = getTaskFromDatastore(taskId);
if (taskModel != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
Expand All @@ -27,6 +28,7 @@
import com.netflix.conductor.common.utils.ExternalPayloadStorage;
import com.netflix.conductor.common.utils.ExternalPayloadStorage.Operation;
import com.netflix.conductor.common.utils.ExternalPayloadStorage.PayloadType;
import com.netflix.conductor.common.utils.TaskUtils;
import com.netflix.conductor.core.config.ConductorProperties;
import com.netflix.conductor.core.dal.ExecutionDAOFacade;
import com.netflix.conductor.core.events.queue.Message;
Expand Down Expand Up @@ -252,12 +254,28 @@ public Task getTask(String taskId) {
}

public Task getPendingTaskForWorkflow(String taskReferenceName, String workflowId) {
return executionDAOFacade.getTasksForWorkflow(workflowId).stream()
.filter(task -> !task.getStatus().isTerminal())
.filter(task -> task.getReferenceTaskName().equals(taskReferenceName))
.findFirst() // There can only be one task by a given reference name running at a
// time.
.orElse(null);
List<TaskModel> tasks = executionDAOFacade.getTaskModelsForWorkflow(workflowId);
Stream<TaskModel> taskStream =
tasks.stream().filter(task -> !task.getStatus().isTerminal());
Optional<TaskModel> found =
taskStream
.filter(task -> task.getReferenceTaskName().equals(taskReferenceName))
.findFirst();
if (found.isPresent()) {
return found.get().toTask();
}
// If no task is found, let's check if there is one inside an iteration
found =
tasks.stream()
.filter(task -> !task.getStatus().isTerminal())
.filter(
task ->
TaskUtils.removeIterationFromTaskRefName(
task.getReferenceTaskName())
.equals(taskReferenceName))
.findFirst();

return found.map(TaskModel::toTask).orElse(null);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,4 +250,11 @@ SearchResult<TaskSummary> search(
*/
ExternalStorageLocation getExternalStorageLocation(
String path, String operation, String payloadType);

String updateTask(
String workflowId,
String taskRefName,
TaskResult.Status status,
String workerId,
Map<String, Object> output);
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,27 @@ public String updateTask(TaskResult taskResult) {
return taskResult.getTaskId();
}

@Override
public String updateTask(
String workflowId,
String taskRefName,
TaskResult.Status status,
String workerId,
Map<String, Object> output) {
Task pending = getPendingTaskForWorkflow(workflowId, taskRefName);
if (pending == null) {
return null;
}

TaskResult taskResult = new TaskResult(pending);
taskResult.setStatus(status);
taskResult.getOutputData().putAll(output);
if (StringUtils.isNotBlank(workerId)) {
taskResult.setWorkerId(workerId);
}
return updateTask(taskResult);
}

/**
* Ack Task is received.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,18 @@ public String updateTask(@RequestBody TaskResult taskResult) {
return taskService.updateTask(taskResult);
}

@PostMapping(value = "/{workflowId}/{taskRefName}/{status}", produces = TEXT_PLAIN_VALUE)
@Operation(summary = "Update a task By Ref Name")
public String updateTask(
@PathVariable("workflowId") String workflowId,
@PathVariable("taskRefName") String taskRefName,
@PathVariable("status") TaskResult.Status status,
@RequestParam(value = "workerid", required = false) String workerId,
@RequestBody Map<String, Object> output) {

return taskService.updateTask(workflowId, taskRefName, status, workerId, output);
}

@PostMapping("/{taskId}/log")
@Operation(summary = "Log Task Execution Details")
public void log(@PathVariable("taskId") String taskId, @RequestBody String log) {
Expand Down

0 comments on commit 88f6e18

Please sign in to comment.