
Merge pull request #2516 from opencb/TASK-7038
TASK-7038 - Port Patch 3.2.2 -> 3.3.0 - Xetabase 2.2.2 -> 2.3.0
juanfeSanahuja authored Oct 11, 2024
2 parents 285b667 + 10a95d3 commit fbbdceb
Showing 4 changed files with 59 additions and 21 deletions.
---- File 1 of 4 ----
@@ -77,7 +77,7 @@ public RestResponse<AuthenticationResponse> login(CustomUsersCommandOptions.Logi
         String token = session.getSession().getToken();
         String errorMsg = "Missing password. ";
         if (StringUtils.isNotEmpty(token)) {
-            errorMsg += "Active token detected ";
+            errorMsg += "Active token detected. Please logout first.";
         }
         CommandLineUtils.error(errorMsg);
     }
---- File 2 of 4 ----
@@ -136,9 +136,13 @@ private boolean checkAutoRefresh(OpencgaCommandExecutor commandExecutor) {
 
     public void loadSessionStudies(OpencgaCommandExecutor commandExecutor) {
         Session session = commandExecutor.getSessionManager().getSession();
-        logger.debug("Loading session studies using token: "
-                + session.getToken());
         OpenCGAClient openCGAClient = commandExecutor.getOpenCGAClient();
+        logger.debug("openCGAClient Token: " + openCGAClient.getToken());
+        if(StringUtils.isEmpty(openCGAClient.getToken())) {
+            openCGAClient.setToken(session.getToken());
+        }
+        logger.debug("Loading session studies using token: "
+                + openCGAClient.getToken());
         try {
             // Query the server to retrieve the studies of user projects
             RestResponse<Project> res = openCGAClient.getProjectClient().search(new ObjectMap());
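Note: the hunk above fixes study loading when the OpenCGAClient was created without a token; the client now falls back to the CLI session's token before querying projects. A minimal sketch of that fallback pattern, using a hypothetical stand-in class rather than the real OpenCGAClient/Session wiring:

    import org.apache.commons.lang3.StringUtils;

    final class ClientSketch {
        private String token;
        String getToken() { return token; }
        void setToken(String token) { this.token = token; }

        // Seed the client token from the session token if none is set yet,
        // so later authenticated requests do not go out with an empty token.
        void ensureToken(String sessionToken) {
            if (StringUtils.isEmpty(getToken())) {
                setToken(sessionToken);
            }
        }
    }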
---- File 3 of 4 ----
@@ -1,5 +1,6 @@
 package org.opencb.opencga.app.migrations.v2.v2_12_5.storage;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.opencb.commons.datastore.core.ObjectMap;
 import org.opencb.commons.datastore.core.Query;
 import org.opencb.commons.datastore.core.QueryOptions;
@@ -67,6 +68,8 @@ private void checkStudy(VariantStorageEngine engine, String study) throws Storag
         if (fileSets.isEmpty()) {
             logger.info("No concurrent file loadings found in study '{}'", study);
             return;
+        } else {
+            logger.info("Found {} sets of files with shared samples in study '{}'", fileSets.size(), study);
         }
 
         Map<Integer, TaskMetadata> fileTasks = new HashMap<>();
@@ -85,10 +88,12 @@ private void checkStudy(VariantStorageEngine engine, String study) throws Storag
             }
         }
 
-        Set<Set<Integer>> fileSetsToInvalidate = new HashSet<>();
+        Set<Integer> affectedFiles = new HashSet<>();
+        Set<Integer> affectedSamples = new HashSet<>();
         for (Set<Integer> fileSet : fileSets) {
-            Set<Integer> affectedFiles = new HashSet<>();
-            Set<Integer> affectedSamples = new HashSet<>();
+            Set<Integer> invalidFiles = new HashSet<>();
+            Set<Integer> invalidSampleIndexes = new HashSet<>();
+
             // Check if any task from this file set overlaps in time
             List<TaskMetadata> tasks = new ArrayList<>();
             for (Integer fileId : fileSet) {
@@ -97,8 +102,11 @@ private void checkStudy(VariantStorageEngine engine, String study) throws Storag
                     tasks.add(task);
                 }
             }
-            if (tasks.size() > 1) {
-                logger.info("Found {} tasks loading files {}", tasks.size(), fileSet);
+            if (tasks.size() <= 1) {
+                continue;
+            } else {
+                logger.info("--------------------");
+                logger.info("Found {} tasks loading files {} in study {}", tasks.size(), fileSet, study);
                 for (int i = 0; i < tasks.size(); i++) {
                     TaskMetadata task1 = tasks.get(i);
                     Date task1start = task1.getStatus().firstKey();
@@ -108,8 +116,7 @@ private void checkStudy(VariantStorageEngine engine, String study) throws Storag
                         Date task2start = task2.getStatus().firstKey();
                         Date task2end = task2.getStatus().lastKey();
                         if (task1start.before(task2end) && task1end.after(task2start)) {
-                            fileSetsToInvalidate.add(fileSet);
-                            affectedFiles.addAll(task1.getFileIds());
+                            affectedFiles.addAll(fileSet);
 
                             List<String> task1Files = task1.getFileIds().stream().map(fileId -> "'" + metadataManager.getFileName(studyId, fileId) + "'(" + fileId + ")").collect(Collectors.toList());
                             List<String> task2Files = task2.getFileIds().stream().map(fileId -> "'" + metadataManager.getFileName(studyId, fileId) + "'(" + fileId + ")").collect(Collectors.toList());
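Note: the condition task1start.before(task2end) && task1end.after(task2start) in the hunk above is the standard two-interval overlap test: two time ranges intersect exactly when each one starts before the other ends. A self-contained sketch of the same check:

    import java.util.Date;

    class OverlapSketch {
        // Two tasks overlap in time iff each one starts before the other ends.
        static boolean overlaps(Date start1, Date end1, Date start2, Date end2) {
            return start1.before(end2) && end1.after(start2);
        }

        public static void main(String[] args) {
            Date t0 = new Date(0), t5 = new Date(5_000), t10 = new Date(10_000), t15 = new Date(15_000);
            System.out.println(overlaps(t0, t10, t5, t15)); // true: the ranges share [5s, 10s)
            System.out.println(overlaps(t0, t5, t10, t15)); // false: disjoint ranges
        }
    }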
@@ -131,8 +138,6 @@ private void checkStudy(VariantStorageEngine engine, String study) throws Storag
                 }
             }
 
-            Set<Integer> invalidFiles = new HashSet<>();
-            List<Integer> invalidSampleIndexes = new ArrayList<>();
             for (Integer sampleId : affectedSamples) {
                 String sampleName = metadataManager.getSampleName(studyId, sampleId);
                 SampleMetadata sampleMetadata = metadataManager.getSampleMetadata(studyId, sampleId);
@@ -145,7 +150,7 @@ private void checkStudy(VariantStorageEngine engine, String study) throws Storag
                                 metadataManager.getFileName(studyId, file), file);
                         }
                     }
-                } else if (sampleMetadata.getSampleIndexStatus(Optional.of(sampleMetadata.getSampleIndexVersion()).orElse(-1)) == TaskMetadata.Status.READY) {
+                } else if (sampleMetadata.getSampleIndexStatus(Optional.ofNullable(sampleMetadata.getSampleIndexVersion()).orElse(-1)) == TaskMetadata.Status.READY) {
                     for (Integer fileId : sampleMetadata.getFiles()) {
                         if (affectedFiles.contains(fileId)) {
                             FileMetadata fileMetadata = metadataManager.getFileMetadata(studyId, fileId);
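Note: the one-line change above (Optional.of to Optional.ofNullable) matters because Optional.of throws a NullPointerException on a null argument; when getSampleIndexVersion() returns null (e.g. no sample index built yet), only ofNullable lets orElse(-1) supply the sentinel. A minimal illustration:

    import java.util.Optional;

    class OptionalSketch {
        public static void main(String[] args) {
            Integer version = null; // e.g. sample index never built
            // Optional.of(version) would throw a NullPointerException here;
            // Optional.ofNullable yields an empty Optional instead.
            int v = Optional.ofNullable(version).orElse(-1);
            System.out.println(v); // prints -1, the "no version" sentinel
        }
    }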
@@ -195,6 +200,8 @@ private void checkStudy(VariantStorageEngine engine, String study) throws Storag
                     }
                 }
             } else {
+                logger.info("Sample '{}'({}) sample index is not in READY status. Invalidate to ensure rebuild", sampleName, sampleId);
+                logger.info(" - Invalidating sample index for sample '{}'({})", sampleName, sampleId);
                 invalidSampleIndexes.add(sampleId);
             }
         }
@@ -210,9 +217,19 @@ private void checkStudy(VariantStorageEngine engine, String study) throws Storag
                 invalidSamples.addAll(metadataManager.getSampleIdsFromFileId(studyId, fileId));
             }
 
-            logger.info("Affected files: {}", invalidFiles);
-            logger.info("Affected samples: {}", invalidSamples);
-            logger.info("Affected sample indexes: {}", invalidSampleIndexes);
+            logger.info("Study '{}'", study);
+            List<Pair<String, Integer>> invalidFilesPairs = invalidFiles.stream()
+                    .map(fileId -> Pair.of(metadataManager.getFileName(studyId, fileId), fileId))
+                    .collect(Collectors.toList());
+            logger.info("Affected files: {}", invalidFilesPairs);
+            List<Pair<String, Integer>> invalidSamplesPairs = invalidSamples.stream()
+                    .map(sampleId -> Pair.of(metadataManager.getSampleName(studyId, sampleId), sampleId))
+                    .collect(Collectors.toList());
+            logger.info("Affected samples: {}", invalidSamplesPairs);
+            List<Pair<String, Integer>> invalidSampleIndexesPairs = invalidSampleIndexes.stream()
+                    .map(sampleId -> Pair.of(metadataManager.getSampleName(studyId, sampleId), sampleId))
+                    .collect(Collectors.toList());
+            logger.info("Affected sample indexes: {}", invalidSampleIndexesPairs);
         }
     } else {
         ObjectMap event = new ObjectMap()
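Note: the logging rewrite above resolves each numeric id to a (name, id) Pair so the migration output can be acted on without a second metadata lookup. A sketch of the mapping with a plain Map standing in for metadataManager:

    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;
    import org.apache.commons.lang3.tuple.Pair;

    class PairLoggingSketch {
        public static void main(String[] args) {
            // Stand-in for metadataManager.getFileName(studyId, fileId).
            Map<Integer, String> fileNames = Map.of(1, "sample1.vcf.gz", 2, "sample2.vcf.gz");
            List<Pair<String, Integer>> pairs = Arrays.asList(1, 2).stream()
                    .map(fileId -> Pair.of(fileNames.get(fileId), fileId))
                    .collect(Collectors.toList());
            System.out.println(pairs); // [(sample1.vcf.gz,1), (sample2.vcf.gz,2)]
        }
    }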
---- File 4 of 4 ----
@@ -9,20 +9,24 @@
 import org.bson.conversions.Bson;
 import org.opencb.opencga.catalog.db.api.CohortDBAdaptor;
 import org.opencb.opencga.catalog.db.api.SampleDBAdaptor;
+import org.opencb.opencga.catalog.db.api.StudyDBAdaptor;
 import org.opencb.opencga.catalog.db.mongodb.MongoDBAdaptor;
 import org.opencb.opencga.catalog.db.mongodb.OrganizationMongoDBAdaptorFactory;
 import org.opencb.opencga.catalog.migration.Migration;
 import org.opencb.opencga.catalog.migration.MigrationTool;
 
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 @Migration(id = "syncCohortsAndSamplesMigration" ,
-        description = "Sync array of samples from cohort with array of cohortIds from Sample",
+        description = "Sync array of samples from cohort with array of cohortIds from Sample.",
         version = "2.12.6",
         domain = Migration.MigrationDomain.CATALOG,
         language = Migration.MigrationLanguage.JAVA,
-        date = 20240621
+        date = 20240621,
+        patch = 2 // TASK-6998
 )
 public class SyncCohortsAndSamplesMigration extends MigrationTool {
 
@@ -31,8 +35,18 @@ protected void run() throws Exception {
         MongoCollection<Document> sampleCollection = getMongoCollection(OrganizationMongoDBAdaptorFactory.SAMPLE_COLLECTION);
         MongoCollection<Document> sampleArchiveCollection = getMongoCollection(OrganizationMongoDBAdaptorFactory.SAMPLE_ARCHIVE_COLLECTION);
 
+        // Fill map study uid - fqn
+        Map<Long, String> uidFqnMap = new HashMap<>();
+        Bson studyProjection = Projections.include(StudyDBAdaptor.QueryParams.UID.key(), StudyDBAdaptor.QueryParams.FQN.key());
+        queryMongo(OrganizationMongoDBAdaptorFactory.STUDY_COLLECTION, new Document(), studyProjection, study -> {
+            long studyUid = study.get(StudyDBAdaptor.QueryParams.UID.key(), Number.class).longValue();
+            String studyFqn = study.getString(StudyDBAdaptor.QueryParams.FQN.key());
+            uidFqnMap.put(studyUid, studyFqn);
+        });
+
         queryMongo(OrganizationMongoDBAdaptorFactory.COHORT_COLLECTION, new Document(),
-                Projections.include(CohortDBAdaptor.QueryParams.ID.key(), CohortDBAdaptor.QueryParams.SAMPLES.key()),
+                Projections.include(CohortDBAdaptor.QueryParams.ID.key(), CohortDBAdaptor.QueryParams.SAMPLES.key(),
+                        CohortDBAdaptor.QueryParams.STUDY_UID.key()),
                 cohortDoc -> {
                     String cohortId = cohortDoc.getString(CohortDBAdaptor.QueryParams.ID.key());
                     List<Document> samples = cohortDoc.getList(CohortDBAdaptor.QueryParams.SAMPLES.key(), Document.class);
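Note: the uid-to-fqn map added above is filled once before the cohort pass, so later log lines can print a human-readable study FQN instead of a bare uid. A sketch of the same one-pass lookup built with the plain MongoDB Java driver (collection and field names are illustrative, not the real catalog schema constants):

    import com.mongodb.client.MongoCollection;
    import com.mongodb.client.model.Projections;
    import org.bson.Document;
    import java.util.HashMap;
    import java.util.Map;

    class UidFqnMapSketch {
        // Build a uid -> fqn lookup in a single pass over the study collection.
        static Map<Long, String> buildUidFqnMap(MongoCollection<Document> studies) {
            Map<Long, String> uidFqnMap = new HashMap<>();
            studies.find()
                    .projection(Projections.include("uid", "fqn"))
                    .forEach(study -> uidFqnMap.put(
                            study.get("uid", Number.class).longValue(),
                            study.getString("fqn")));
            return uidFqnMap;
        }
    }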
@@ -50,8 +64,11 @@ protected void run() throws Exception {
                     long addedMissingCohort = sampleCollection.updateMany(query, update).getModifiedCount();
                     sampleArchiveCollection.updateMany(query, update);
 
+                    long studyUid = cohortDoc.get(CohortDBAdaptor.QueryParams.STUDY_UID.key(), Number.class).longValue();
+
                     // Ensure there aren't any samples pointing to this cohort that are not in the samples array
                     query = Filters.and(
+                            Filters.eq(SampleDBAdaptor.QueryParams.STUDY_UID.key(), studyUid),
                             Filters.nin(SampleDBAdaptor.QueryParams.UID.key(), sampleUids),
                             Filters.eq(SampleDBAdaptor.QueryParams.COHORT_IDS.key(), cohortId),
                             Filters.eq(MongoDBAdaptor.LAST_OF_VERSION, true)
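Note: the added STUDY_UID filter above matters because cohort IDs are only unique within a study; without scoping, the cleanup could strip cohort references from same-named cohorts in other studies. A sketch of the scoped filter using the MongoDB Java driver, with illustrative field names in place of the catalog QueryParams constants:

    import com.mongodb.client.model.Filters;
    import org.bson.conversions.Bson;
    import java.util.Arrays;
    import java.util.List;

    class ScopedCleanupSketch {
        public static void main(String[] args) {
            long studyUid = 42L;                      // illustrative study uid
            String cohortId = "ALL";                  // cohort names repeat across studies
            List<Long> cohortSampleUids = Arrays.asList(1L, 2L, 3L);
            // Match only this study's samples that still reference the cohort
            // but are missing from the cohort's samples array.
            Bson query = Filters.and(
                    Filters.eq("studyUid", studyUid),
                    Filters.nin("uid", cohortSampleUids),
                    Filters.eq("cohortIds", cohortId));
            System.out.println(query);
        }
    }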
@@ -61,10 +78,10 @@ protected void run() throws Exception {
                     sampleArchiveCollection.updateMany(query, update);
 
                     if (addedMissingCohort > 0 || removedNonAssociatedCohort > 0) {
-                        logger.info("Fixed cohort '{}' references. "
+                        logger.info("Fixed cohort '{}' references from study '{}'. "
                                 + "Added missing reference to {} samples. "
                                 + "Removed non-associated reference from {} samples.",
-                                cohortId, addedMissingCohort, removedNonAssociatedCohort);
+                                cohortId, uidFqnMap.get(studyUid), addedMissingCohort, removedNonAssociatedCohort);
                     }
                 }
             });
