Skip to content

Commit

Permalink
Merge pull request #2519 from opencb/TASK-7097
Browse files Browse the repository at this point in the history
TASK-7097 - Port Patch 3.3.0 -> 4.0.0 Xetabase 2.3.0 -> 3.0.0
  • Loading branch information
juanfeSanahuja authored Oct 21, 2024
2 parents d8116e9 + af4146d commit 6ee003d
Show file tree
Hide file tree
Showing 18 changed files with 822 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public RestResponse<AuthenticationResponse> login(CustomUsersCommandOptions.Logi
String token = session.getSession().getToken();
String errorMsg = "Missing password. ";
if (StringUtils.isNotEmpty(token)) {
errorMsg += "Active token detected ";
errorMsg += "Active token detected. Please logout first.";
}
CommandLineUtils.error(errorMsg);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,13 @@ private boolean checkAutoRefresh(OpencgaCommandExecutor commandExecutor) {

public void loadSessionStudies(OpencgaCommandExecutor commandExecutor) {
Session session = commandExecutor.getSessionManager().getSession();
logger.debug("Loading session studies using token: "
+ session.getToken());
OpenCGAClient openCGAClient = commandExecutor.getOpenCGAClient();
logger.debug("openCGAClient Token: " + openCGAClient.getToken());
if(StringUtils.isEmpty(openCGAClient.getToken())) {
openCGAClient.setToken(session.getToken());
}
logger.debug("Loading session studies using token: "
+ openCGAClient.getToken());
try {
// Query the server to retrieve the studies of user projects
RestResponse<Project> res = openCGAClient.getProjectClient().search(new ObjectMap());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.opencb.opencga.app.migrations.v2.v2_12_5.storage;

import org.apache.commons.lang3.tuple.Pair;
import org.opencb.commons.datastore.core.ObjectMap;
import org.opencb.commons.datastore.core.Query;
import org.opencb.commons.datastore.core.QueryOptions;
Expand Down Expand Up @@ -67,6 +68,8 @@ private void checkStudy(VariantStorageEngine engine, String study) throws Storag
if (fileSets.isEmpty()) {
logger.info("No concurrent file loadings found in study '{}'", study);
return;
} else {
logger.info("Found {} sets of files with shared samples in study '{}'", fileSets.size(), study);
}

Map<Integer, TaskMetadata> fileTasks = new HashMap<>();
Expand All @@ -85,10 +88,12 @@ private void checkStudy(VariantStorageEngine engine, String study) throws Storag
}
}

Set<Set<Integer>> fileSetsToInvalidate = new HashSet<>();
Set<Integer> affectedFiles = new HashSet<>();
Set<Integer> affectedSamples = new HashSet<>();
for (Set<Integer> fileSet : fileSets) {
Set<Integer> affectedFiles = new HashSet<>();
Set<Integer> affectedSamples = new HashSet<>();
Set<Integer> invalidFiles = new HashSet<>();
Set<Integer> invalidSampleIndexes = new HashSet<>();

// Check if any task from this file set overlaps in time
List<TaskMetadata> tasks = new ArrayList<>();
for (Integer fileId : fileSet) {
Expand All @@ -97,8 +102,11 @@ private void checkStudy(VariantStorageEngine engine, String study) throws Storag
tasks.add(task);
}
}
if (tasks.size() > 1) {
logger.info("Found {} tasks loading files {}", tasks.size(), fileSet);
if (tasks.size() <= 1) {
continue;
} else {
logger.info("--------------------");
logger.info("Found {} tasks loading files {} in study {}", tasks.size(), fileSet, study);
for (int i = 0; i < tasks.size(); i++) {
TaskMetadata task1 = tasks.get(i);
Date task1start = task1.getStatus().firstKey();
Expand All @@ -108,8 +116,7 @@ private void checkStudy(VariantStorageEngine engine, String study) throws Storag
Date task2start = task2.getStatus().firstKey();
Date task2end = task2.getStatus().lastKey();
if (task1start.before(task2end) && task1end.after(task2start)) {
fileSetsToInvalidate.add(fileSet);
affectedFiles.addAll(task1.getFileIds());
affectedFiles.addAll(fileSet);

List<String> task1Files = task1.getFileIds().stream().map(fileId -> "'" + metadataManager.getFileName(studyId, fileId) + "'(" + fileId + ")").collect(Collectors.toList());
List<String> task2Files = task2.getFileIds().stream().map(fileId -> "'" + metadataManager.getFileName(studyId, fileId) + "'(" + fileId + ")").collect(Collectors.toList());
Expand All @@ -131,8 +138,6 @@ private void checkStudy(VariantStorageEngine engine, String study) throws Storag
}
}

Set<Integer> invalidFiles = new HashSet<>();
List<Integer> invalidSampleIndexes = new ArrayList<>();
for (Integer sampleId : affectedSamples) {
String sampleName = metadataManager.getSampleName(studyId, sampleId);
SampleMetadata sampleMetadata = metadataManager.getSampleMetadata(studyId, sampleId);
Expand All @@ -145,7 +150,7 @@ private void checkStudy(VariantStorageEngine engine, String study) throws Storag
metadataManager.getFileName(studyId, file), file);
}
}
} else if (sampleMetadata.getSampleIndexStatus(Optional.of(sampleMetadata.getSampleIndexVersion()).orElse(-1)) == TaskMetadata.Status.READY) {
} else if (sampleMetadata.getSampleIndexStatus(Optional.ofNullable(sampleMetadata.getSampleIndexVersion()).orElse(-1)) == TaskMetadata.Status.READY) {
for (Integer fileId : sampleMetadata.getFiles()) {
if (affectedFiles.contains(fileId)) {
FileMetadata fileMetadata = metadataManager.getFileMetadata(studyId, fileId);
Expand Down Expand Up @@ -195,6 +200,8 @@ private void checkStudy(VariantStorageEngine engine, String study) throws Storag
}
}
} else {
logger.info("Sample '{}'({}) sample index is not in READY status. Invalidate to ensure rebuild", sampleName, sampleId);
logger.info(" - Invalidating sample index for sample '{}'({})", sampleName, sampleId);
invalidSampleIndexes.add(sampleId);
}
}
Expand All @@ -210,9 +217,19 @@ private void checkStudy(VariantStorageEngine engine, String study) throws Storag
invalidSamples.addAll(metadataManager.getSampleIdsFromFileId(studyId, fileId));
}

logger.info("Affected files: {}", invalidFiles);
logger.info("Affected samples: {}", invalidSamples);
logger.info("Affected sample indexes: {}", invalidSampleIndexes);
logger.info("Study '{}'", study);
List<Pair<String, Integer>> invalidFilesPairs = invalidFiles.stream()
.map(fileId -> Pair.of(metadataManager.getFileName(studyId, fileId), fileId))
.collect(Collectors.toList());
logger.info("Affected files: {}", invalidFilesPairs);
List<Pair<String, Integer>> invalidSamplesPairs = invalidSamples.stream()
.map(sampleId -> Pair.of(metadataManager.getSampleName(studyId, sampleId), sampleId))
.collect(Collectors.toList());
logger.info("Affected samples: {}", invalidSamplesPairs);
List<Pair<String, Integer>> invalidSampleIndexesPairs = invalidSampleIndexes.stream()
.map(sampleId -> Pair.of(metadataManager.getSampleName(studyId, sampleId), sampleId))
.collect(Collectors.toList());
logger.info("Affected sample indexes: {}", invalidSampleIndexesPairs);
}
} else {
ObjectMap event = new ObjectMap()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,24 @@
import org.bson.conversions.Bson;
import org.opencb.opencga.catalog.db.api.CohortDBAdaptor;
import org.opencb.opencga.catalog.db.api.SampleDBAdaptor;
import org.opencb.opencga.catalog.db.api.StudyDBAdaptor;
import org.opencb.opencga.catalog.db.mongodb.MongoDBAdaptor;
import org.opencb.opencga.catalog.db.mongodb.OrganizationMongoDBAdaptorFactory;
import org.opencb.opencga.catalog.migration.Migration;
import org.opencb.opencga.catalog.migration.MigrationTool;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

@Migration(id = "syncCohortsAndSamplesMigration" ,
description = "Sync array of samples from cohort with array of cohortIds from Sample",
description = "Sync array of samples from cohort with array of cohortIds from Sample.",
version = "2.12.6",
domain = Migration.MigrationDomain.CATALOG,
language = Migration.MigrationLanguage.JAVA,
date = 20240621
date = 20240621,
patch = 2 // TASK-6998
)
public class SyncCohortsAndSamplesMigration extends MigrationTool {

Expand All @@ -31,8 +35,18 @@ protected void run() throws Exception {
MongoCollection<Document> sampleCollection = getMongoCollection(OrganizationMongoDBAdaptorFactory.SAMPLE_COLLECTION);
MongoCollection<Document> sampleArchiveCollection = getMongoCollection(OrganizationMongoDBAdaptorFactory.SAMPLE_ARCHIVE_COLLECTION);

// Fill map study uid - fqn
Map<Long, String> uidFqnMap = new HashMap<>();
Bson studyProjection = Projections.include(StudyDBAdaptor.QueryParams.UID.key(), StudyDBAdaptor.QueryParams.FQN.key());
queryMongo(OrganizationMongoDBAdaptorFactory.STUDY_COLLECTION, new Document(), studyProjection, study -> {
long studyUid = study.get(StudyDBAdaptor.QueryParams.UID.key(), Number.class).longValue();
String studyFqn = study.getString(StudyDBAdaptor.QueryParams.FQN.key());
uidFqnMap.put(studyUid, studyFqn);
});

queryMongo(OrganizationMongoDBAdaptorFactory.COHORT_COLLECTION, new Document(),
Projections.include(CohortDBAdaptor.QueryParams.ID.key(), CohortDBAdaptor.QueryParams.SAMPLES.key()),
Projections.include(CohortDBAdaptor.QueryParams.ID.key(), CohortDBAdaptor.QueryParams.SAMPLES.key(),
CohortDBAdaptor.QueryParams.STUDY_UID.key()),
cohortDoc -> {
String cohortId = cohortDoc.getString(CohortDBAdaptor.QueryParams.ID.key());
List<Document> samples = cohortDoc.getList(CohortDBAdaptor.QueryParams.SAMPLES.key(), Document.class);
Expand All @@ -50,8 +64,11 @@ protected void run() throws Exception {
long addedMissingCohort = sampleCollection.updateMany(query, update).getModifiedCount();
sampleArchiveCollection.updateMany(query, update);

long studyUid = cohortDoc.get(CohortDBAdaptor.QueryParams.STUDY_UID.key(), Number.class).longValue();

// Ensure there aren't any samples pointing to this cohort that are not in the samples array
query = Filters.and(
Filters.eq(SampleDBAdaptor.QueryParams.STUDY_UID.key(), studyUid),
Filters.nin(SampleDBAdaptor.QueryParams.UID.key(), sampleUids),
Filters.eq(SampleDBAdaptor.QueryParams.COHORT_IDS.key(), cohortId),
Filters.eq(MongoDBAdaptor.LAST_OF_VERSION, true)
Expand All @@ -61,10 +78,10 @@ protected void run() throws Exception {
sampleArchiveCollection.updateMany(query, update);

if (addedMissingCohort > 0 || removedNonAssociatedCohort > 0) {
logger.info("Fixed cohort '{}' references. "
logger.info("Fixed cohort '{}' references from study '{}'. "
+ "Added missing reference to {} samples. "
+ "Removed non-associated reference from {} samples.",
cohortId, addedMissingCohort, removedNonAssociatedCohort);
cohortId, uidFqnMap.get(studyUid), addedMissingCohort, removedNonAssociatedCohort);
}
}
});
Expand Down
2 changes: 1 addition & 1 deletion opencga-core/src/main/resources/configuration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ jobDir: ${OPENCGA.USER.WORKSPACE}/jobs
# Maximum number of login attempts before banning a user account
account:
maxLoginAttempts: ${OPENCGA.ACCOUNT.MAX_LOGIN_ATTEMPTS}
passwordExpirationDays: 0
passwordExpirationDays: 90

panel:
host: "http://resources.opencb.org/opencb/opencga/disease-panels"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -882,7 +882,12 @@ protected DataResult<Job> submitJobRaw(String toolId, String project, String stu
String jobDescription, String jobDependsOnStr, String jobTagsStr, String jobScheduledStartTime,
String jobPriority, Boolean dryRun)
throws CatalogException {
Map<String, Object> paramsMap = bodyParams.toParams();
Map<String, Object> paramsMap;
if (bodyParams != null) {
paramsMap = bodyParams.toParams();
} else {
paramsMap = new HashMap<>();
}
if (StringUtils.isNotEmpty(study)) {
paramsMap.putIfAbsent(ParamConstants.STUDY_PARAM, study);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,12 @@ public Response submitOperationToProject(String toolId, String project, ToolPara
public Response submitOperation(String toolId, String project, String study, ToolParams params, String jobName, String jobDescription,
String jobDependsOn, String jobTags, String jobScheduledStartTime, String jobPriority, Boolean dryRun) {
try {
Map<String, Object> paramsMap = params.toParams();
Map<String, Object> paramsMap;
if (params != null) {
paramsMap = params.toParams();
} else {
paramsMap = new HashMap<>();
}
if (StringUtils.isNotEmpty(study)) {
paramsMap.put(ParamConstants.STUDY_PARAM, study);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ public enum VariantStorageOptions implements ConfigurationOption {
ANNOTATOR_CELLBASE_VARIANT_LENGTH_THRESHOLD("annotator.cellbase.variantLengthThreshold", Integer.MAX_VALUE),
ANNOTATOR_CELLBASE_IMPRECISE_VARIANTS("annotator.cellbase.impreciseVariants", true),
ANNOTATOR_CELLBASE_STAR_ALTERNATE("annotator.cellbase.starAlternate", false),
ANNOTATOR_EXTENSION_PREFIX("annotator.extension."),
ANNOTATOR_EXTENSION_LIST("annotator.extension.list"),
ANNOTATOR_EXTENSION_COSMIC_FILE("annotator.extension.cosmic.file"),
ANNOTATOR_EXTENSION_COSMIC_VERSION("annotator.extension.cosmic.version"),

INDEX_SEARCH("indexSearch", false), // Build secondary indexes using search engine.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@
import org.opencb.opencga.storage.core.variant.adaptors.VariantField;
import org.opencb.opencga.storage.core.variant.adaptors.VariantQueryParam;
import org.opencb.opencga.storage.core.variant.annotation.annotators.VariantAnnotator;
import org.opencb.opencga.storage.core.variant.annotation.annotators.extensions.VariantAnnotatorExtensionTask;
import org.opencb.opencga.storage.core.variant.annotation.annotators.extensions.VariantAnnotatorExtensionsFactory;
import org.opencb.opencga.storage.core.variant.io.VariantReaderUtils;
import org.opencb.opencga.storage.core.variant.io.db.VariantAnnotationDBWriter;
import org.opencb.opencga.storage.core.variant.io.db.VariantDBReader;
Expand Down Expand Up @@ -265,6 +267,13 @@ public URI createAnnotation(URI outDir, String fileName, Query query, ObjectMap
return variantAnnotationList;
};

List<VariantAnnotatorExtensionTask> extensions = new VariantAnnotatorExtensionsFactory().getVariantAnnotatorExtensions(params);
for (VariantAnnotatorExtensionTask extension : extensions) {
extension.setup(outDir);
extension.checkAvailable();
annotationTask = annotationTask.then(extension);
}

final DataWriter<VariantAnnotation> variantAnnotationDataWriter;
if (avro) {
//FIXME
Expand All @@ -286,7 +295,7 @@ public URI createAnnotation(URI outDir, String fileName, Query query, ObjectMap
ParallelTaskRunner<Variant, VariantAnnotation> parallelTaskRunner =
new ParallelTaskRunner<>(variantDataReader, annotationTask, variantAnnotationDataWriter, config);
parallelTaskRunner.run();
} catch (ExecutionException e) {
} catch (Exception e) {
throw new VariantAnnotatorException("Error creating annotations", e);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package org.opencb.opencga.storage.core.variant.annotation.annotators.extensions;

import org.opencb.biodata.models.variant.avro.VariantAnnotation;
import org.opencb.commons.datastore.core.ObjectMap;
import org.opencb.commons.run.Task;

import java.net.URI;
import java.util.List;

public interface VariantAnnotatorExtensionTask extends Task<VariantAnnotation, VariantAnnotation> {

/**
* Set up the annotator extension.
* This method will be called before any other method. It might generate extra files or data needed for the annotation.
*
* @param output Output directory where the annotator extension should write the files
* @return List of URIs of generated files (if any)
* @throws Exception if the annotator extension set up fails
*/
List<URI> setup(URI output) throws Exception;

/**
* Check if the annotator extension is available for the given options.
* @throws IllegalArgumentException if the annotator extension is not available
*/
void checkAvailable() throws IllegalArgumentException;

/**
* Check if the annotator extension is available for the given options. Do not throw any exception if the extension is not available.
* @return true if the annotator extension is available
*/
default boolean isAvailable() {
try {
checkAvailable();
return true;
} catch (IllegalArgumentException e) {
return false;
}
}

@Override
default void pre() throws Exception {
Task.super.pre();
checkAvailable();
}

/**
* Get the options for the annotator extension.
* @return Options for the annotator extension
*/
ObjectMap getOptions();

/**
* Get the metadata for the annotator extension.
* @return Metadata for the annotator extension
*/
ObjectMap getMetadata();

}
Loading

0 comments on commit 6ee003d

Please sign in to comment.