Skip to content

Commit

Permalink
Merge pull request #2491 from opencb/TASK-6515
Browse files Browse the repository at this point in the history
TASK-6515 - Port Patch 1.10.6.1 -> 2.2.1
  • Loading branch information
pfurio authored Aug 20, 2024
2 parents cb26306 + 568244c commit f1572cb
Show file tree
Hide file tree
Showing 51 changed files with 667 additions and 369 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,6 @@ private void rebuildSampleFileIds(VariantStorageMetadataManager metadataManager,
for (Map.Entry<Integer, List<Integer>> entry : batch.entrySet()) {
Integer sampleId = entry.getKey();
List<Integer> fileIds = entry.getValue();

List<Integer> actualFiles = metadataManager.getSampleMetadata(studyId, sampleId).getFiles();
if (actualFiles.size() != fileIds.size() || !actualFiles.containsAll(fileIds)) {
fixedSamples++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.BiPredicate;
Expand Down Expand Up @@ -191,14 +190,7 @@ public ObjectMap getConfiguration() {

public Lock lockGlobal(long lockDuration, long timeout, String lockName)
throws StorageEngineException {
try {
return projectDBAdaptor.lockProject(lockDuration, timeout, lockName);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new StorageEngineException("Unable to lock the Project", e);
} catch (TimeoutException e) {
throw new StorageEngineException("Unable to lock the Project", e);
}
return projectDBAdaptor.lockProject(lockDuration, timeout, lockName);
}

public Lock lockStudy(int studyId) throws StorageEngineException {
Expand Down Expand Up @@ -282,17 +274,14 @@ public <E extends Exception> StudyMetadata updateStudyMetadata(Object study, Upd
throws StorageEngineException, E {
int studyId = getStudyId(study);

Lock lock = lockStudy(studyId);
try {
try (Lock lock = lockStudy(studyId)) {
StudyMetadata sm = getStudyMetadata(studyId);

sm = updater.update(sm);

lock.checkLocked();
unsecureUpdateStudyMetadata(sm);
return sm;
} finally {
lock.unlock();
}
}

Expand Down Expand Up @@ -557,16 +546,8 @@ public <E extends Exception> ProjectMetadata updateProjectMetadata(UpdateConsume
public <E extends Exception> ProjectMetadata updateProjectMetadata(UpdateFunction<ProjectMetadata, E> function)
throws StorageEngineException, E {
Objects.requireNonNull(function);
Lock lock;
try {
lock = projectDBAdaptor.lockProject(lockDuration, lockTimeout);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new StorageEngineException("Unable to lock the Project", e);
} catch (TimeoutException e) {
throw new StorageEngineException("Unable to lock the Project", e);
}
try {

try (Lock lock = projectDBAdaptor.lockProject(lockDuration, lockTimeout)) {
ProjectMetadata projectMetadata = getProjectMetadata();
int countersHash = (projectMetadata == null ? Collections.emptyMap() : projectMetadata.getCounters()).hashCode();

Expand All @@ -579,8 +560,6 @@ public <E extends Exception> ProjectMetadata updateProjectMetadata(UpdateFunctio
lock.checkLocked();
projectDBAdaptor.updateProjectMetadata(projectMetadata, updateCounters);
return projectMetadata;
} finally {
lock.unlock();
}
}

Expand All @@ -594,11 +573,15 @@ public ProjectMetadata getProjectMetadata() {

public ProjectMetadata getAndUpdateProjectMetadata(ObjectMap options) throws StorageEngineException {
ProjectMetadata projectMetadata = getProjectMetadata();

checkSameSpeciesAndAssembly(options, projectMetadata);
if (options != null && (projectMetadata == null
|| StringUtils.isEmpty(projectMetadata.getSpecies()) && options.containsKey(SPECIES.key())
|| StringUtils.isEmpty(projectMetadata.getAssembly()) && options.containsKey(ASSEMBLY.key()))) {

projectMetadata = updateProjectMetadata(pm -> {
// Check again, in case it was updated by another thread
checkSameSpeciesAndAssembly(options, pm);
if (pm == null) {
pm = new ProjectMetadata();
}
Expand All @@ -619,6 +602,25 @@ public ProjectMetadata getAndUpdateProjectMetadata(ObjectMap options) throws Sto
return projectMetadata;
}

/**
 * Validates that the species and assembly requested in {@code options} do not conflict with the
 * values already stored in the project metadata. A mismatch (case-insensitive comparison; species
 * is normalized through {@code toCellBaseSpeciesName} before comparing) is a hard error, since the
 * project cannot change species or assembly once set.
 *
 * @param options         incoming options; may be null (no validation performed)
 * @param projectMetadata current project metadata; may be null (no validation performed)
 * @throws StorageEngineException if the requested assembly or species differs from the stored one
 */
private static void checkSameSpeciesAndAssembly(ObjectMap options, ProjectMetadata projectMetadata) throws StorageEngineException {
    if (options == null || projectMetadata == null) {
        // Nothing to cross-check without both sides of the comparison.
        return;
    }
    if (options.containsKey(ASSEMBLY.key())) {
        String requestedAssembly = options.getString(ASSEMBLY.key());
        String currentAssembly = projectMetadata.getAssembly();
        if (StringUtils.isNotEmpty(currentAssembly) && !currentAssembly.equalsIgnoreCase(requestedAssembly)) {
            throw new StorageEngineException("Incompatible assembly change from '" + currentAssembly + "' to '"
                    + requestedAssembly + "'");
        }
    }
    if (options.containsKey(SPECIES.key())) {
        String requestedSpecies = options.getString(SPECIES.key());
        String currentSpecies = projectMetadata.getSpecies();
        // Species from options is normalized to the CellBase naming before comparing.
        if (StringUtils.isNotEmpty(currentSpecies) && !currentSpecies.equalsIgnoreCase(toCellBaseSpeciesName(requestedSpecies))) {
            throw new StorageEngineException("Incompatible species change from '" + currentSpecies + "' to '"
                    + requestedSpecies + "'");
        }
    }
}

public DataResult<VariantFileMetadata> getVariantFileMetadata(int studyId, int fileId, QueryOptions options)
throws StorageEngineException {
return fileDBAdaptor.getVariantFileMetadata(studyId, fileId, options);
Expand Down Expand Up @@ -673,16 +675,14 @@ public void unsecureUpdateFileMetadata(int studyId, FileMetadata file) {
public <E extends Exception> FileMetadata updateFileMetadata(int studyId, int fileId, UpdateConsumer<FileMetadata, E> update)
throws E, StorageEngineException {
getFileName(studyId, fileId); // Check file exists
Lock lock = fileDBAdaptor.lock(studyId, fileId, lockDuration, lockTimeout);
try {

try (Lock lock = fileDBAdaptor.lock(studyId, fileId, lockDuration, lockTimeout)) {
FileMetadata fileMetadata = getFileMetadata(studyId, fileId);
update.update(fileMetadata);
lock.checkLocked();
unsecureUpdateFileMetadata(studyId, fileMetadata);
fileIdIndexedCache.put(studyId, fileId, fileMetadata.isIndexed());
return fileMetadata;
} finally {
lock.unlock();
}
}

Expand Down Expand Up @@ -863,6 +863,19 @@ public Iterator<FileMetadata> fileMetadataIterator(int studyId) {
return fileDBAdaptor.fileIterator(studyId);
}

/**
 * Convenience overload: unboxes both ids and delegates to {@link #getSampleMetadata(int, int)}.
 * Throws NullPointerException if either argument is null (unboxing).
 */
public SampleMetadata getSampleMetadata(Integer studyId, Integer sampleId) {
return getSampleMetadata(studyId.intValue(), sampleId.intValue());
}

/**
 * Convenience overload: unboxes the sample id and delegates to {@link #getSampleMetadata(int, int)}.
 * Throws NullPointerException if {@code sampleId} is null (unboxing).
 */
public SampleMetadata getSampleMetadata(int studyId, Integer sampleId) {
return getSampleMetadata(studyId, sampleId.intValue());
}

/**
 * Convenience overload: resolves a generic sample identifier to its numeric id and delegates to
 * {@link #getSampleMetadata(int, int)}.
 * NOTE(review): per its name, {@code getSampleIdOrFail} presumably throws when the sample cannot
 * be resolved — confirm against its declaration.
 */
public SampleMetadata getSampleMetadata(int studyId, Object sample) {
int sampleId = getSampleIdOrFail(studyId, sample);
return getSampleMetadata(studyId, sampleId);
}

/**
 * Fetches the sample metadata from the sample DB adaptor (no extra query options).
 */
public SampleMetadata getSampleMetadata(int studyId, int sampleId) {
return sampleDBAdaptor.getSampleMetadata(studyId, sampleId, null);
}
Expand All @@ -875,15 +888,13 @@ public void unsecureUpdateSampleMetadata(int studyId, SampleMetadata sample) {
public <E extends Exception> SampleMetadata updateSampleMetadata(int studyId, int sampleId, UpdateConsumer<SampleMetadata, E> consumer)
throws E, StorageEngineException {
getSampleName(studyId, sampleId); // Check sample exists
Lock lock = sampleDBAdaptor.lock(studyId, sampleId, lockDuration, lockTimeout);
try {

try (Lock lock = sampleDBAdaptor.lock(studyId, sampleId, lockDuration, lockTimeout)) {
SampleMetadata sample = getSampleMetadata(studyId, sampleId);
sample = consumer.toFunction().update(sample);
lock.checkLocked();
unsecureUpdateSampleMetadata(studyId, sample);
return sample;
} finally {
lock.unlock();
}
}

Expand Down Expand Up @@ -1054,15 +1065,12 @@ public void unsecureUpdateCohortMetadata(int studyId, CohortMetadata cohort) {
public <E extends Exception> CohortMetadata updateCohortMetadata(int studyId, int cohortId, UpdateConsumer<CohortMetadata, E> update)
throws E, StorageEngineException {
getCohortName(studyId, cohortId); // Check cohort exists
Lock lock = cohortDBAdaptor.lock(studyId, cohortId, lockDuration, lockTimeout);
try {
try (Lock lock = cohortDBAdaptor.lock(studyId, cohortId, lockDuration, lockTimeout)) {
CohortMetadata cohortMetadata = getCohortMetadata(studyId, cohortId);
update.update(cohortMetadata);
lock.checkLocked();
unsecureUpdateCohortMetadata(studyId, cohortMetadata);
return cohortMetadata;
} finally {
lock.unlock();
}
}

Expand Down Expand Up @@ -1190,13 +1198,19 @@ private CohortMetadata updateCohortSamples(int studyId, String cohortName, Colle
for (Integer sampleId : sampleIds) {
Integer finalCohortId = cohortId;
if (secondaryIndexCohort) {
updateSampleMetadata(studyId, sampleId, sampleMetadata -> {
sampleMetadata.addSecondaryIndexCohort(finalCohortId);
});
if (!getSampleMetadata(studyId, sampleId).getSecondaryIndexCohorts().contains(finalCohortId)) {
// Avoid unnecessary updates
updateSampleMetadata(studyId, sampleId, sampleMetadata -> {
sampleMetadata.addSecondaryIndexCohort(finalCohortId);
});
}
} else {
updateSampleMetadata(studyId, sampleId, sampleMetadata -> {
sampleMetadata.addCohort(finalCohortId);
});
if (!getSampleMetadata(studyId, sampleId).getCohorts().contains(finalCohortId)) {
// Avoid unnecessary updates
updateSampleMetadata(studyId, sampleId, sampleMetadata -> {
sampleMetadata.addCohort(finalCohortId);
});
}
}
}

Expand All @@ -1209,13 +1223,19 @@ private CohortMetadata updateCohortSamples(int studyId, String cohortName, Colle
Integer finalCohortId = cohortId;
if (!sampleIds.contains(sampleFromCohort)) {
if (secondaryIndexCohort) {
updateSampleMetadata(studyId, sampleFromCohort, sampleMetadata -> {
sampleMetadata.getSecondaryIndexCohorts().remove(finalCohortId);
});
if (getSampleMetadata(studyId, sampleFromCohort).getSecondaryIndexCohorts().contains(finalCohortId)) {
// Avoid unnecessary updates
updateSampleMetadata(studyId, sampleFromCohort, sampleMetadata -> {
sampleMetadata.getSecondaryIndexCohorts().remove(finalCohortId);
});
}
} else {
updateSampleMetadata(studyId, sampleFromCohort, sampleMetadata -> {
sampleMetadata.getCohorts().remove(finalCohortId);
});
if (getSampleMetadata(studyId, sampleFromCohort).getCohorts().contains(finalCohortId)) {
// Avoid unnecessary updates
updateSampleMetadata(studyId, sampleFromCohort, sampleMetadata -> {
sampleMetadata.getCohorts().remove(finalCohortId);
});
}
}
}
}
Expand Down Expand Up @@ -1326,15 +1346,12 @@ public void unsecureUpdateTask(int studyId, TaskMetadata task) throws StorageEng
public <E extends Exception> TaskMetadata updateTask(int studyId, int taskId, UpdateConsumer<TaskMetadata, E> consumer)
throws E, StorageEngineException {
getTask(studyId, taskId); // Check task exists
Lock lock = taskDBAdaptor.lock(studyId, taskId, lockDuration, lockTimeout);
try {
try (Lock lock = taskDBAdaptor.lock(studyId, taskId, lockDuration, lockTimeout)) {
TaskMetadata task = getTask(studyId, taskId);
consumer.update(task);
lock.checkLocked();
unsecureUpdateTask(studyId, task);
return task;
} finally {
lock.unlock();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import org.opencb.opencga.storage.core.metadata.models.ProjectMetadata;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* Created on 02/05/18.
Expand All @@ -17,14 +16,12 @@
public interface ProjectMetadataAdaptor extends AutoCloseable {

default Lock lockProject(long lockDuration, long timeout)
throws InterruptedException, TimeoutException, StorageEngineException {
throws StorageEngineException {
return lockProject(lockDuration, timeout, null);
}

Lock lockProject(long lockDuration, long timeout, String lockName)
throws InterruptedException, TimeoutException, StorageEngineException;

void unLockProject(long lockId) throws StorageEngineException;
throws StorageEngineException;

DataResult<ProjectMetadata> getProjectMetadata();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1348,7 +1348,7 @@ public VariantQueryExecutor getVariantQueryExecutor(Query query, QueryOptions op
public VariantQueryExecutor getVariantQueryExecutor(ParsedVariantQuery variantQuery) {
try {
for (VariantQueryExecutor executor : getVariantQueryExecutors()) {
if (executor.canUseThisExecutor(variantQuery.getQuery(), variantQuery.getInputOptions())) {
if (executor.canUseThisExecutor(variantQuery, variantQuery.getInputOptions())) {
logger.info("Using VariantQueryExecutor : " + executor.getClass().getName());
logger.info(" Query : " + VariantQueryUtils.printQuery(variantQuery.getInputQuery()));
logger.info(" Options : " + variantQuery.getInputOptions().toJson());
Expand All @@ -1362,6 +1362,19 @@ public VariantQueryExecutor getVariantQueryExecutor(ParsedVariantQuery variantQu
throw new VariantQueryException("No VariantQueryExecutor found to run the query!");
}

/**
 * Returns the registered variant query executor that is an instance of the given class.
 * <p>
 * NOTE(review): the previous implementation filtered on
 * {@code e instanceof SearchIndexVariantQueryExecutor}, ignoring the {@code clazz} argument
 * entirely — requesting any other executor class would return the wrong executor (or fail).
 * The filter now uses {@code clazz} itself.
 *
 * @param clazz executor class to look up
 * @return the first registered executor that is an instance of {@code clazz}
 * @throws StorageEngineException if no executor of that class is registered
 */
public final VariantQueryExecutor getVariantQueryExecutor(Class<? extends VariantQueryExecutor> clazz)
        throws StorageEngineException {
    return getVariantQueryExecutors()
            .stream()
            .filter(clazz::isInstance)
            .findFirst()
            .orElseThrow(() -> new StorageEngineException("VariantQueryExecutor " + clazz + " not found"));
}

public Query preProcessQuery(Query originalQuery, QueryOptions options) {
try {
return getVariantQueryParser().preProcessQuery(originalQuery, options);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ public enum VariantStorageOptions implements ConfigurationOption {

INDEX_SEARCH("indexSearch", false), // Build secondary indexes using search engine.

METADATA_LOCK_DURATION("metadata.lock.duration", 5000),
METADATA_LOCK_TIMEOUT("metadata.lock.timeout", 60000),
METADATA_LOCK_DURATION("metadata.lock.duration", 60000),
METADATA_LOCK_TIMEOUT("metadata.lock.timeout", 600000),
METADATA_LOAD_BATCH_SIZE("metadata.load.batchSize", 10),
METADATA_LOAD_THREADS("metadata.load.numThreads", 4),

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,7 @@
import org.opencb.opencga.storage.core.io.plain.StringDataReader;
import org.opencb.opencga.storage.core.io.plain.StringDataWriter;
import org.opencb.opencga.storage.core.metadata.VariantStorageMetadataManager;
import org.opencb.opencga.storage.core.metadata.models.CohortMetadata;
import org.opencb.opencga.storage.core.metadata.models.FileMetadata;
import org.opencb.opencga.storage.core.metadata.models.StudyMetadata;
import org.opencb.opencga.storage.core.metadata.models.TaskMetadata;
import org.opencb.opencga.storage.core.metadata.models.*;
import org.opencb.opencga.storage.core.variant.adaptors.GenotypeClass;
import org.opencb.opencga.storage.core.variant.adaptors.VariantDBAdaptor;
import org.opencb.opencga.storage.core.variant.io.VariantReaderUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,14 +267,6 @@ public VariantStudyQuery setStudies(ParsedQuery<String> studies) {
return this;
}

public String getStudyOrFail() {
if (studies == null || studies.size() != 1) {
throw new VariantQueryException("Require exactly one study");
} else {
return studies.get(0);
}
}

public ParsedQuery<KeyOpValue<SampleMetadata, List<String>>> getGenotypes() {
return genotypes;
}
Expand Down Expand Up @@ -311,6 +303,19 @@ public void setDefaultStudy(StudyMetadata defaultStudy) {
public StudyMetadata getDefaultStudy() {
return defaultStudy;
}

/**
 * Returns the default study, failing fast when it has not been resolved.
 *
 * @return the default {@code StudyMetadata}, never null
 * @throws VariantQueryException if there is no default study: either because the query named
 *         several studies, or because none was provided
 */
public StudyMetadata getDefaultStudyOrFail() {
    if (defaultStudy != null) {
        return defaultStudy;
    }
    // Guard against a null 'studies' list (the removed getStudyOrFail explicitly checked this)
    // so callers get the intended VariantQueryException instead of a NullPointerException.
    if (studies != null && studies.size() != 1) {
        throw new VariantQueryException("Only one study is allowed. Found " + studies.size() + " studies");
    }
    throw new VariantQueryException("One study required. None provided");
}

}

public static class VariantQueryXref {
Expand Down
Loading

0 comments on commit f1572cb

Please sign in to comment.