Skip to content

Commit

Permalink
core: improve nextflow executor, #TASK-6445
Browse files Browse the repository at this point in the history
  • Loading branch information
pfurio committed Sep 19, 2024
1 parent 93ec0b6 commit da72511
Show file tree
Hide file tree
Showing 13 changed files with 161 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public class InputFileUtils {

private final CatalogManager catalogManager;

private static final Pattern OPERATION_PATTERN = Pattern.compile("^(?i)(ocga://|opencga://)(.+)$");
private static final Pattern OPERATION_PATTERN = Pattern.compile("^(?i)(ocga://|opencga://|file://)(.+)$");

private final static Logger logger = LoggerFactory.getLogger(InputFileUtils.class);

Expand All @@ -31,7 +31,8 @@ public File getOpenCGAFile(String study, String file, String token) throws Catal
Matcher matcher = OPERATION_PATTERN.matcher(file);

if (!matcher.find()) {
throw new CatalogException("Invalid OpenCGA file format. Accepted format is 'ocga://<path>' or 'opencga://<path>'");
throw new CatalogException("Invalid OpenCGA file format. Accepted format is 'ocga://<path>', 'opencga://<path>' or"
+ " 'file://<path>'.");
}

String filePath = matcher.group(2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@
import org.apache.commons.lang3.time.StopWatch;
import org.opencb.commons.datastore.core.ObjectMap;
import org.opencb.commons.datastore.core.QueryOptions;
import org.opencb.commons.utils.DockerUtils;
import org.opencb.opencga.analysis.tools.OpenCgaToolScopeStudy;
import org.opencb.opencga.analysis.utils.InputFileUtils;
import org.opencb.opencga.core.common.JacksonUtils;
import org.opencb.opencga.core.common.TimeUtils;
import org.opencb.opencga.core.common.UserProcessUtils;
import org.opencb.opencga.core.exceptions.ToolException;
import org.opencb.opencga.core.models.common.Enums;
import org.opencb.opencga.core.models.file.File;
Expand All @@ -26,13 +28,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;
Expand All @@ -51,10 +51,15 @@ public class NextFlowExecutor extends OpenCgaToolScopeStudy {
private String cliParams;
// Build list of inputfiles in case we need to specifically mount them in read only mode
private List<String> inputFileUris;
// Build list of inputfiles in case we need to specifically mount them in read only mode
List<AbstractMap.SimpleEntry<String, String>> inputBindings;

private Thread thread;
private final int monitorThreadPeriod = 5000;

private final Path inputDir = Paths.get("/data/input");
private final String outputDir = "/data/output";

private final static Logger logger = LoggerFactory.getLogger(NextFlowExecutor.class);

@Override
Expand Down Expand Up @@ -87,8 +92,9 @@ protected void check() throws Exception {
if (CollectionUtils.isNotEmpty(workflow.getTags())) {
tags.addAll(workflow.getTags());
}
updateJobInformation(tags, attributes);
// updateJobInformation(tags, attributes);

this.inputBindings = new LinkedList<>();
if (MapUtils.isNotEmpty(nextflowParams.getParams())) {
this.inputFileUris = new LinkedList<>();
InputFileUtils inputFileUtils = new InputFileUtils(catalogManager);
Expand All @@ -102,8 +108,8 @@ protected void check() throws Exception {
}
if (inputFileUtils.isValidOpenCGAFile(entry.getValue())) {
File file = inputFileUtils.getOpenCGAFile(study, entry.getValue(), token);
cliParamsBuilder.append(file.getUri().getPath()).append(" ");
inputFileUris.add(file.getUri().getPath());
inputBindings.add(new AbstractMap.SimpleEntry<>(file.getUri().getPath(), inputDir.resolve(file.getName()).toString()));
cliParamsBuilder.append(inputDir).append("/").append(file.getName()).append(" ");
} else {
cliParamsBuilder.append(entry.getValue()).append(" ");
}
Expand All @@ -117,75 +123,67 @@ protected void check() throws Exception {

@Override
protected void run() throws Exception {
Path temporalInputDir = Files.createDirectory(getOutDir().resolve(".input"));
for (WorkflowScript script : workflow.getScripts()) {
// Write script files
Files.write(getOutDir().resolve(script.getFilename()), script.getContent().getBytes());
Files.write(temporalInputDir.resolve(script.getFileName()), script.getContent().getBytes());
inputBindings.add(new AbstractMap.SimpleEntry<>(temporalInputDir.resolve(script.getFileName()).toString(),
inputDir.resolve(script.getFileName()).toString()));
}

// Write nextflow.config file
URL nextflowConfig = getClass().getResource("/nextflow.config");
if (nextflowConfig != null) {
Files.copy(nextflowConfig.openStream(), getOutDir().resolve("nextflow.config"));
Files.copy(nextflowConfig.openStream(), temporalInputDir.resolve("nextflow.config"));
inputBindings.add(new AbstractMap.SimpleEntry<>(temporalInputDir.resolve("nextflow.config").toString(),
inputDir.resolve("nextflow.config").toString()));
} else {
throw new RuntimeException("Can't fetch nextflow.config file");
}

// Execute docker image
String workingDirectory = getOutDir().toAbsolutePath().toString();
Map<String, String> dockerParams = new HashMap<>();
dockerParams.put("user", "0:0");

// Build output binding
AbstractMap.SimpleEntry<String, String> outputBinding = new AbstractMap.SimpleEntry<>(getOutDir().toAbsolutePath().toString(),
outputDir);

String dockerImage = "nextflow/nextflow:" + workflow.getManager().getVersion();
StringBuilder stringBuilder = new StringBuilder()
.append("nextflow -c ").append(workingDirectory).append("/nextflow.config")
.append(" run ");
.append("bash -c \"nextflow -c ").append(inputDir).append("/nextflow.config").append(" run ");
if (workflow.getRepository() != null && StringUtils.isNotEmpty(workflow.getRepository().getImage())) {
stringBuilder.append(workflow.getRepository().getImage()).append(" -with-docker");
dockerParams.put("-v", "/var/run/docker.sock:/var/run/docker.sock");
} else {
for (WorkflowScript script : workflow.getScripts()) {
if (script.isMain()) {
stringBuilder.append(workingDirectory).append("/").append(script.getFilename());
stringBuilder.append(inputDir).append("/").append(script.getFileName());
break;
}
}
}
if (StringUtils.isNotEmpty(cliParams)) {
stringBuilder.append(" ").append(cliParams);
}
stringBuilder.append(" -with-report ").append(workingDirectory).append("/report.html");
List<String> cliArgs = Arrays.asList(StringUtils.split(stringBuilder.toString(), " "));
stringBuilder.append(" -with-report ").append(outputDir).append("/report.html");
// And give ownership permissions to the user running this process
stringBuilder.append("; chown -R ")
.append(UserProcessUtils.getUserUid()).append(":").append(UserProcessUtils.getGroupId()).append(" ").append(outputDir)
.append("\"");

startTraceFileMonitor();

// Execute docker image
StopWatch stopWatch = StopWatch.createStarted();

// Execute nextflow binary
ProcessBuilder processBuilder = new ProcessBuilder(cliArgs);
// Establish the working directory of the process
processBuilder.directory(getOutDir().toFile());
logger.info("Executing: {}", stringBuilder);
Process p;
try {
p = processBuilder.start();
BufferedReader input = new BufferedReader(new InputStreamReader(p.getInputStream()));
BufferedReader error = new BufferedReader(new InputStreamReader(p.getErrorStream()));
String line;
while ((line = input.readLine()) != null) {
logger.info("{}", line);
}
while ((line = error.readLine()) != null) {
logger.error("{} ", line);
}
p.waitFor();
input.close();
} catch (IOException | InterruptedException e) {
throw new RuntimeException("Error executing cli: " + e.getMessage(), e);
}

DockerUtils.run(dockerImage, inputBindings, outputBinding, stringBuilder.toString(), dockerParams);
logger.info("Execution time: " + TimeUtils.durationToString(stopWatch));

// Delete input files
// Delete input files and temporal directory
for (WorkflowScript script : workflow.getScripts()) {
Files.delete(getOutDir().resolve(script.getFilename()));
Files.delete(temporalInputDir.resolve(script.getFileName()));
}
Files.delete(getOutDir().resolve("nextflow.config"));
Files.delete(temporalInputDir.resolve("nextflow.config"));
Files.delete(temporalInputDir);

endTraceFileMonitor();
}
Expand Down
2 changes: 1 addition & 1 deletion opencga-analysis/src/main/resources/nextflow.config
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
trace {
enabled = true
file = 'trace.txt'
file = '/data/output/trace.txt'
overwrite = true
fields = 'task_id,hash,name,status,start,complete,%cpu,peak_vmem'
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,7 @@
import org.opencb.opencga.core.exceptions.ToolException;
import org.opencb.opencga.core.models.file.File;
import org.opencb.opencga.core.models.file.FileCreateParams;
import org.opencb.opencga.core.models.workflow.NextFlowRunParams;
import org.opencb.opencga.core.models.workflow.WorkflowCreateParams;
import org.opencb.opencga.core.models.workflow.WorkflowRepository;
import org.opencb.opencga.core.models.workflow.WorkflowScript;
import org.opencb.opencga.core.models.workflow.*;
import org.opencb.opencga.storage.core.StorageEngineFactory;

import java.io.IOException;
Expand Down Expand Up @@ -85,6 +82,7 @@ public void nextflowDockerTest() throws ToolException, CatalogException, IOExcep
StorageConfiguration storageConfiguration = StorageConfiguration.load(inputStream, "yml");
WorkflowCreateParams workflow = new WorkflowCreateParams()
.setId("workflow")
.setType(Workflow.Type.OTHER)
// .setCommandLine("run nextflow-io/rnaseq-nf -with-docker");
.setRepository(new WorkflowRepository("nextflow-io/rnaseq-nf"));
catalogManager.getWorkflowManager().create(studyFqn, workflow.toWorkflow(), QueryOptions.empty(), ownerToken);
Expand All @@ -111,6 +109,7 @@ private WorkflowCreateParams createDummyWorkflow(String pipelineId) throws IOExc
String content = IOUtils.toString(inputStream, "UTF-8");
return new WorkflowCreateParams()
.setId("workflow")
.setType(Workflow.Type.OTHER)
.setScripts(Collections.singletonList(new WorkflowScript("pipeline.nf", content, true)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ process foo {

script:
"""
echo `cat $params.in` world > file.out
echo `cat $params.in` world > /data/output/file.out
"""
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,7 @@ private void validateNewWorkflow(Workflow workflow, String userId) throws Catalo
workflow.setScripts(workflow.getScripts() != null ? workflow.getScripts() : Collections.emptyList());
boolean main = false;
for (WorkflowScript script : workflow.getScripts()) {
ParamUtils.checkIdentifier(script.getFilename(), SCRIPTS.key() + ".id");
ParamUtils.checkIdentifier(script.getFileName(), SCRIPTS.key() + ".id");
ParamUtils.checkParameter(script.getContent(), SCRIPTS.key() + ".content");
if (script.isMain()) {
if (main) {
Expand Down
2 changes: 1 addition & 1 deletion opencga-client/src/main/R/R/AllGenerics.R
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ setGeneric("jobClient", function(OpencgaR, job, jobs, members, endpointName, par

# ##############################################################################
## WorkflowClient
setGeneric("workflowClient", function(OpencgaR, workflowId, workflows, endpointName, params=NULL, ...)
setGeneric("workflowClient", function(OpencgaR, members, workflowId, workflows, endpointName, params=NULL, ...)
standardGeneric("workflowClient"))

# ##############################################################################
Expand Down
13 changes: 12 additions & 1 deletion opencga-client/src/main/R/R/Workflow-methods.R
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#'
#' | endpointName | Endpoint WS | parameters accepted |
#' | -- | :-- | --: |
#' | updateAcl | /{apiVersion}/workflows/acl/{members}/update | study, members[*], action[*], body[*] |
#' | create | /{apiVersion}/workflows/create | include, exclude, study, includeResult, body[*] |
#' | distinct | /{apiVersion}/workflows/distinct | study, id, name, uuid, tags, draft, internal.registrationUserId, type, creationDate, modificationDate, acl, release, snapshot, deleted, field[*] |
#' | run | /{apiVersion}/workflows/run | study, jobId, jobDescription, jobDependsOn, jobTags, jobScheduledStartTime, jobPriority, jobDryRun, body[*] |
Expand All @@ -34,9 +35,19 @@
#' [*]: Required parameter
#' @export

setMethod("workflowClient", "OpencgaR", function(OpencgaR, workflowId, workflows, endpointName, params=NULL, ...) {
setMethod("workflowClient", "OpencgaR", function(OpencgaR, members, workflowId, workflows, endpointName, params=NULL, ...) {
switch(endpointName,

#' @section Endpoint /{apiVersion}/workflows/acl/{members}/update:
#' Update the set of permissions granted for the member.
#' @param study Study [[organization@]project:]study where study and project can be either the ID or UUID.
#' @param members Comma separated list of user or group ids.
#' @param action Action to be performed [ADD, SET, REMOVE or RESET]. Allowed values: ['SET ADD REMOVE RESET']
#' @param data JSON containing the parameters to update the permissions.
updateAcl=fetchOpenCGA(object=OpencgaR, category="workflows", categoryId=NULL, subcategory="acl",
subcategoryId=members, action="update", params=params, httpMethod="POST", as.queryParam=c("action"),
...),

#' @section Endpoint /{apiVersion}/workflows/create:
#' Create a workflow.
#' @param include Fields included in the response, whole JSON path must be provided.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.opencb.opencga.core.models.workflow.NextFlowRunParams;
import org.opencb.opencga.core.models.workflow.Workflow;
import org.opencb.opencga.core.models.workflow.WorkflowAclEntryList;
import org.opencb.opencga.core.models.workflow.WorkflowAclUpdateParams;
import org.opencb.opencga.core.models.workflow.WorkflowCreateParams;
import org.opencb.opencga.core.models.workflow.WorkflowUpdateParams;
import org.opencb.opencga.core.response.RestResponse;
Expand All @@ -50,6 +51,24 @@ public WorkflowClient(String token, ClientConfiguration configuration) {
super(token, configuration);
}

/**
* Update the set of permissions granted for the member.
* @param members Comma separated list of user or group ids.
* @param action Action to be performed [ADD, SET, REMOVE or RESET].
* @param data JSON containing the parameters to update the permissions.
* @param params Map containing any of the following optional parameters.
* study: Study [[organization@]project:]study where study and project can be either the ID or UUID.
* @return a RestResponse object.
* @throws ClientException ClientException if there is any server error.
*/
public RestResponse<WorkflowAclEntryList> updateAcl(String members, String action, WorkflowAclUpdateParams data, ObjectMap params)
throws ClientException {
params = params != null ? params : new ObjectMap();
params.putIfNotNull("action", action);
params.put("body", data);
return execute("workflows", null, "acl", members, "update", params, POST, WorkflowAclEntryList.class);
}

/**
* Create a workflow.
* @param data JSON containing workflow information.
Expand Down
12 changes: 12 additions & 0 deletions opencga-client/src/main/javascript/Workflow.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,18 @@ export default class Workflow extends OpenCGAParentClass {
super(config);
}

/** Update the set of permissions granted for the member
* @param {String} members - Comma separated list of user or group ids.
* @param {Object} data - JSON containing the parameters to update the permissions.
* @param {"SET ADD REMOVE RESET"} action = "ADD" - Action to be performed [ADD, SET, REMOVE or RESET]. The default value is ADD.
* @param {Object} [params] - The Object containing the following optional parameters:
* @param {String} [params.study] - Study [[organization@]project:]study where study and project can be either the ID or UUID.
* @returns {Promise} Promise object in the form of RestResponse instance.
*/
updateAcl(members, action, data, params) {
return this._post("workflows", null, "acl", members, "update", data, {action, ...params});
}

/** Create a workflow
* @param {Object} data - JSON containing workflow information.
* @param {Object} [params] - The Object containing the following optional parameters:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,24 @@ class Workflow(_ParentRestClient):
def __init__(self, configuration, token=None, login_handler=None, *args, **kwargs):
super(Workflow, self).__init__(configuration, token, login_handler, *args, **kwargs)

def update_acl(self, members, action, data=None, **options):
"""
Update the set of permissions granted for the member.
PATH: /{apiVersion}/workflows/acl/{members}/update
:param dict data: JSON containing the parameters to update the
permissions. (REQUIRED)
:param str action: Action to be performed [ADD, SET, REMOVE or RESET].
Allowed values: ['SET ADD REMOVE RESET'] (REQUIRED)
:param str members: Comma separated list of user or group ids.
(REQUIRED)
:param str study: Study [[organization@]project:]study where study and
project can be either the ID or UUID.
"""

options['action'] = action
return self._post(category='workflows', resource='update', subcategory='acl', second_query_id=members, data=data, **options)

def create(self, data=None, **options):
"""
Create a workflow.
Expand Down
Loading

0 comments on commit da72511

Please sign in to comment.