TASK-5448 - Add mandatory variant setup step #2454

Merged Sep 3, 2024 (25 commits)
3e75a86
storage: Implement HBase table autoscale #TASK-5448
j-coll Apr 19, 2024
d710c9d
Merge branch 'develop' into TASK-5448
j-coll May 14, 2024
3a77b59
storage: Fix hbase compatibility for table autoscale. #TASK-5448
j-coll May 16, 2024
28d9ecd
storage: Add variant-setup operation. #TASK-5861, #TASK-5448
j-coll May 16, 2024
17955cf
storage: Make variant-setup mandatory for any operation. #TASK-5861, …
j-coll May 16, 2024
55d5655
storage: Change some default variant-setup values. #TASK-5861, #TASK-…
j-coll May 17, 2024
0e9dda9
app: Add migration VariantSetupMigration. Add migration test. #TASK-5…
j-coll May 22, 2024
5e31a8a
storage: Change param names from VariantSetupParams #TASK-5861, #TASK…
j-coll May 22, 2024
a65f4a2
app: Fix compilation issue. #TASK-5861, #TASK-5448
j-coll May 22, 2024
bdd4f80
storage: Fix tests. #TASK-5861, #TASK-5448
j-coll May 23, 2024
f5ea5a4
core: Add data field descriptions to VariantSetupPatams. #TASK-5861, …
j-coll May 23, 2024
e396772
storage: Add normalization extensions to setup params. #TASK-5861, #T…
j-coll May 23, 2024
eb0497f
Merge branch 'develop' into TASK-5448
j-coll Jun 6, 2024
dac0233
storage: Change default enabled normalization extensions. From NONE t…
j-coll Jun 6, 2024
6193833
server: Add variant/setup operation. #TASK-5861, #TASK-5448
j-coll Jun 6, 2024
d1155ad
Merge branch 'develop' into TASK-5448
j-coll Jul 10, 2024
e8666f9
app: Fix merge issues. #TASK-5448
j-coll Jul 10, 2024
44062c1
storage: Allow repeat variant-setup before indexing any file. #TASK-5448
j-coll Jul 10, 2024
00b985d
storage: Remove some default normalization extensions. #TASK-5448
j-coll Jul 10, 2024
2b111b2
Merge remote-tracking branch 'origin/release-3.2.x' into TASK-5448
j-coll Jul 19, 2024
ce0a8db
analysis: Fix compilation issues. #TASK-5448
j-coll Jul 19, 2024
0364cfe
Merge branch 'release-3.2.x' into TASK-5448
j-coll Aug 29, 2024
ce3d6c4
storage: Remove verbose logging. #TASK-5448
j-coll Sep 2, 2024
2c63795
Merge branch 'release-3.2.x' into TASK-5448
j-coll Sep 2, 2024
fc2e0d9
storage: Fix compilation issue. #TASK-5448
j-coll Sep 2, 2024
@@ -71,8 +71,9 @@
import org.opencb.opencga.core.models.sample.SamplePermissions;
import org.opencb.opencga.core.models.study.Study;
import org.opencb.opencga.core.models.study.StudyPermissions;
+ import org.opencb.opencga.core.models.study.VariantSetupResult;
+ import org.opencb.opencga.core.models.variant.VariantSetupParams;
import org.opencb.opencga.core.response.OpenCGAResult;
- import org.opencb.opencga.storage.core.variant.query.VariantQueryResult;
import org.opencb.opencga.core.tools.ToolParams;
import org.opencb.opencga.storage.core.StorageEngineFactory;
import org.opencb.opencga.storage.core.StoragePipelineResult;
@@ -88,6 +89,7 @@
import org.opencb.opencga.storage.core.variant.adaptors.iterators.VariantDBIterator;
import org.opencb.opencga.storage.core.variant.io.VariantWriterFactory.VariantOutputFormat;
import org.opencb.opencga.storage.core.variant.query.ParsedQuery;
+ import org.opencb.opencga.storage.core.variant.query.VariantQueryResult;
import org.opencb.opencga.storage.core.variant.query.VariantQueryUtils;
import org.opencb.opencga.storage.core.variant.query.projection.VariantQueryProjectionParser;
import org.opencb.opencga.storage.core.variant.score.VariantScoreFormatDescriptor;
@@ -490,6 +492,18 @@ public void aggregate(String studyStr, VariantAggregateParams params, String tok
});
}

+ public VariantSetupResult variantSetup(String studyStr, VariantSetupParams params, String token)
+ throws CatalogException, StorageEngineException {
+ return secureOperation(VariantSetupOperationManager.ID, studyStr, params.toObjectMap(), token,
+ engine -> new VariantSetupOperationManager(this, engine).setup(getStudyFqn(studyStr, token), params, token));
+ }
+
+ public boolean hasVariantSetup(String studyStr, String token) throws CatalogException {
+ Study study = catalogManager.getStudyManager().get(studyStr,
+ new QueryOptions(INCLUDE, StudyDBAdaptor.QueryParams.INTERNAL_CONFIGURATION_VARIANT_ENGINE.key()), token).first();
+ return VariantSetupOperationManager.hasVariantSetup(study);
+ }
+
public ObjectMap configureProject(String projectStr, ObjectMap params, String token) throws CatalogException, StorageEngineException {
return secureOperationByProject("configure", projectStr, params, token, engine -> {
DataStore dataStore = getDataStoreByProjectId(projectStr, token);
@@ -1181,7 +1195,7 @@ private interface VariantOperationFunction<R> {
private <R> R secureOperationByProject(String operationName, String project, ObjectMap params, String token, VariantOperationFunction<R> operation)
throws CatalogException, StorageEngineException {
try (VariantStorageEngine variantStorageEngine = getVariantStorageEngineByProject(project, params, token)) {
- return secureTool(operationName, true, params, token, variantStorageEngine, operation);
+ return secureTool(operationName, true, null, params, token, variantStorageEngine, operation);
} catch (IOException e) {
throw new StorageEngineException("Error closing the VariantStorageEngine", e);
}
@@ -1190,7 +1204,7 @@ private <R> R secureOperationByProject(String operationName, String project, Obj
private <R> R secureOperation(String operationName, String study, ObjectMap params, String token, VariantOperationFunction<R> operation)
throws CatalogException, StorageEngineException {
try (VariantStorageEngine variantStorageEngine = getVariantStorageEngineForStudyOperation(study, params, token)) {
- return secureTool(operationName, true, params, token, variantStorageEngine, operation);
+ return secureTool(operationName, true, study, params, token, variantStorageEngine, operation);
} catch (IOException e) {
throw new StorageEngineException("Error closing the VariantStorageEngine", e);
}
@@ -1199,7 +1213,7 @@ private <R> R secureOperation(String operationName, String study, ObjectMap para
private <R> R secureAnalysis(String operationName, String study, ObjectMap params, String token, VariantOperationFunction<R> operation)
throws CatalogException, StorageEngineException {
try (VariantStorageEngine variantStorageEngine = getVariantStorageEngineForStudyOperation(study, params, token)) {
- return secureTool(operationName, false, params, token, variantStorageEngine, operation);
+ return secureTool(operationName, false, study, params, token, variantStorageEngine, operation);
} catch (IOException e) {
throw new StorageEngineException("Error closing the VariantStorageEngine", e);
}
@@ -1221,7 +1235,7 @@ private <R> R secureOperationByProject(String operationName, String projectStr,
return secureOperationByProject(operationName, projectStr, params, token, operation);
}

- private <R> R secureTool(String toolId, boolean isOperation, ObjectMap params, String token,
+ private <R> R secureTool(String toolId, boolean isOperation, String study, ObjectMap params, String token,
VariantStorageEngine variantStorageEngine, VariantOperationFunction<R> operation)
throws CatalogException, StorageEngineException {

@@ -1241,6 +1255,15 @@ private <R> R secureTool(String toolId, boolean isOperation, ObjectMap params, S
throw new StorageEngineException("Unable to execute operation '" + toolId + "'. "
+ "The storage engine is in mode=" + storageConfiguration.getMode());
}
+ if (isOperation && study != null && !VariantSetupOperationManager.ID.equals(toolId)) {
+ // Ensure that the variant setup has been executed
+ // do not check for the setup operation itself
+ // Project level operations can not be checked for setup.
+ if (!hasVariantSetup(study, token)) {
+ throw new StorageEngineException("Unable to execute operation '" + toolId + "'. "
+ + "The variant storage has not been setup for study '" + study + "'");
+ }
+ }
result = operation.apply(variantStorageEngine);
return result;
} catch (CatalogException | StorageEngineException e) {
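
The guard added to secureTool in the hunk above can be read in isolation. The following is a minimal, self-contained sketch of that condition, not the OpenCGA implementation: the class SetupGuardSketch, the in-memory set that stands in for the catalog lookup behind hasVariantSetup, the use of IllegalStateException instead of StorageEngineException, and the tool IDs other than "variant-setup" are all assumptions made for illustration.

```java
import java.util.HashSet;
import java.util.Set;

public class SetupGuardSketch {

    // Same value as VariantSetupOperationManager.ID in the diff.
    public static final String SETUP_ID = "variant-setup";

    // Hypothetical stand-in for the catalog lookup done by hasVariantSetup(study, token).
    private final Set<String> studiesWithSetup = new HashSet<>();

    public void markSetup(String study) {
        studiesWithSetup.add(study);
    }

    /** Mirrors the condition added to secureTool: returns normally or throws. */
    public void checkSetup(String toolId, boolean isOperation, String study) {
        // Only study-level operations are gated; the setup operation itself is exempt.
        if (isOperation && study != null && !SETUP_ID.equals(toolId)) {
            if (!studiesWithSetup.contains(study)) {
                throw new IllegalStateException("Unable to execute operation '" + toolId + "'. "
                        + "The variant storage has not been setup for study '" + study + "'");
            }
        }
    }

    public static void main(String[] args) {
        SetupGuardSketch guard = new SetupGuardSketch();
        guard.checkSetup(SETUP_ID, true, "study1");           // setup itself is never blocked
        guard.checkSetup("configure", true, null);            // project level: study is null, no check
        guard.checkSetup("some-analysis", false, "study1");   // analyses (isOperation=false) are not gated
        try {
            guard.checkSetup("variant-index", true, "study1"); // illustrative tool ID
        } catch (IllegalStateException e) {
            System.out.println("blocked: " + e.getMessage());
        }
        guard.markSetup("study1");
        guard.checkSetup("variant-index", true, "study1");    // passes once setup is recorded
        System.out.println("ok");
    }
}
```

Passing null as the study from secureOperationByProject is what keeps project-level operations out of the check, which matches the comment in the hunk.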
@@ -0,0 +1,166 @@
package org.opencb.opencga.analysis.variant.manager.operations;

import org.opencb.commons.datastore.core.ObjectMap;
import org.opencb.commons.datastore.core.QueryOptions;
import org.opencb.opencga.analysis.variant.manager.VariantStorageManager;
import org.opencb.opencga.catalog.db.api.StudyDBAdaptor;
import org.opencb.opencga.catalog.exceptions.CatalogException;
import org.opencb.opencga.core.common.TimeUtils;
import org.opencb.opencga.core.models.study.Study;
import org.opencb.opencga.core.models.study.VariantSetupResult;
import org.opencb.opencga.core.models.variant.VariantSetupParams;
import org.opencb.opencga.storage.core.exceptions.StorageEngineException;
import org.opencb.opencga.storage.core.metadata.VariantStorageMetadataManager;
import org.opencb.opencga.storage.core.variant.VariantStorageEngine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class VariantSetupOperationManager extends OperationManager {


public static final String ID = "variant-setup";
private static Logger logger = LoggerFactory.getLogger(VariantSetupOperationManager.class);

public VariantSetupOperationManager(VariantStorageManager variantStorageManager, VariantStorageEngine variantStorageEngine) {
super(variantStorageManager, variantStorageEngine);
}

public VariantSetupResult setup(String studyFqn, VariantSetupParams params, String token)
throws CatalogException, StorageEngineException {
// Copy params to avoid modifying input object
params = new VariantSetupParams(params);
check(studyFqn, params, token);

VariantSetupResult result = new VariantSetupResult();
result.setDate(TimeUtils.getTime());
result.setUserId(catalogManager.getUserManager().getUserIdContextStudy(studyFqn, token));
result.setParams(params.toObjectMap());
result.setStatus(VariantSetupResult.Status.READY);

inferParams(params);

ObjectMap options = variantStorageEngine.inferConfigurationParams(params);
result.setOptions(options);

catalogManager.getStudyManager().setVariantEngineSetupOptions(studyFqn, result, token);

return result;
}

/**
* Infer some parameters from others.
* - averageFileSize inferred from fileType
* - samplesPerFile inferred from dataDistribution or expectedSamplesNumber and expectedFilesNumber
* - numberOfVariantsPerSample inferred from fileType
* @param params params to infer
*/
private void inferParams(VariantSetupParams params) {
if (params.getFileType() != null) {
switch (params.getFileType()) {
case GENOME_gVCF:
if (params.getAverageFileSize() == null) {
params.setAverageFileSize("1GiB");
}
if (params.getVariantsPerSample() == null) {
params.setVariantsPerSample(5000000);
}
break;
case GENOME_VCF:
if (params.getAverageFileSize() == null) {
params.setAverageFileSize("500MiB");
}
if (params.getVariantsPerSample() == null) {
params.setVariantsPerSample(5000000);
}
break;
case EXOME:
if (params.getAverageFileSize() == null) {
params.setAverageFileSize("100MiB");
}
if (params.getVariantsPerSample() == null) {
params.setVariantsPerSample(100000);
}
break;
default:
throw new IllegalArgumentException("Unknown fileType " + params.getFileType());
}
}
// Unable to tell. Use a default value for numberOfVariantsPerSample
if (params.getVariantsPerSample() == null) {
params.setVariantsPerSample(5000000);
}

if (params.getAverageSamplesPerFile() == null) {
if (params.getDataDistribution() == null) {
params.setAverageSamplesPerFile(params.getExpectedSamples().floatValue() / params.getExpectedFiles().floatValue());
} else {
switch (params.getDataDistribution()) {
case SINGLE_SAMPLE_PER_FILE:
params.setAverageSamplesPerFile(1f);
break;
case MULTIPLE_SAMPLES_PER_FILE:
params.setAverageSamplesPerFile(params.getExpectedSamples().floatValue() / params.getExpectedFiles().floatValue());
break;
case MULTIPLE_FILES_PER_SAMPLE:
// Hard to tell. Let's assume 2 samples per file
params.setAverageSamplesPerFile(2f);
break;
case FILES_SPLIT_BY_CHROMOSOME:
case FILES_SPLIT_BY_REGION:
params.setAverageSamplesPerFile(params.getExpectedSamples().floatValue());
break;
default:
throw new IllegalArgumentException("Unknown dataDistribution " + params.getDataDistribution());
}
}
}
}

private void check(String studyStr, VariantSetupParams params, String token) throws CatalogException, StorageEngineException {
Study study = catalogManager.getStudyManager().get(studyStr,
new QueryOptions(QueryOptions.INCLUDE, StudyDBAdaptor.QueryParams.INTERNAL_CONFIGURATION_VARIANT_ENGINE.key()), token)
.first();

VariantStorageMetadataManager metadataManager = variantStorageEngine.getMetadataManager();
if (metadataManager.studyExists(studyStr)) {
int studyId = metadataManager.getStudyId(studyStr);
if (!metadataManager.getIndexedFiles(studyId).isEmpty()) {
throw new IllegalArgumentException("Unable to execute variant-setup on study '" + studyStr + "'. "
+ "It already has indexed files.");
}
}
if (hasVariantSetup(study)) {
logger.info("Study {} was already setup. Re executing variant-setup", studyStr);
}

if (params.getExpectedFiles() == null || params.getExpectedFiles() <= 0) {
throw new IllegalArgumentException("Missing expectedFiles");
}
if (params.getExpectedSamples() == null || params.getExpectedSamples() <= 0) {
throw new IllegalArgumentException("Missing expectedSamples");
}

if (params.getAverageFileSize() == null && params.getFileType() == null) {
throw new IllegalArgumentException("Missing averageFileSize or fileType");
}
}

public static boolean hasVariantSetup(Study study) {
boolean hasSetup = false;
VariantSetupResult setup = getVariantSetupResult(study);
if (setup != null && setup.getStatus() == VariantSetupResult.Status.READY) {
hasSetup = true;
}
return hasSetup;
}

private static VariantSetupResult getVariantSetupResult(Study study) {
if (study.getInternal() != null
&& study.getInternal().getConfiguration() != null
&& study.getInternal().getConfiguration().getVariantEngine() != null) {
return study.getInternal().getConfiguration().getVariantEngine().getSetup();
}
return null;
}

}
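
As a worked illustration of the defaults in inferParams, the sketch below reproduces the two inference rules as plain static methods. It is a standalone approximation, assuming local enums that reuse the constant names from the diff; InferParamsSketch and its method names are hypothetical and are not part of VariantSetupParams or VariantSetupOperationManager.

```java
public class InferParamsSketch {

    // Constant names copied from the switch cases in the diff.
    public enum FileType { GENOME_gVCF, GENOME_VCF, EXOME }

    public enum DataDistribution {
        SINGLE_SAMPLE_PER_FILE, MULTIPLE_SAMPLES_PER_FILE, MULTIPLE_FILES_PER_SAMPLE,
        FILES_SPLIT_BY_CHROMOSOME, FILES_SPLIT_BY_REGION
    }

    /** Default averageFileSize used when only fileType is given. */
    public static String defaultAverageFileSize(FileType type) {
        switch (type) {
            case GENOME_gVCF: return "1GiB";
            case GENOME_VCF: return "500MiB";
            case EXOME: return "100MiB";
            default: throw new IllegalArgumentException("Unknown fileType " + type);
        }
    }

    /** Default variantsPerSample; 5,000,000 is also the fallback when fileType is absent. */
    public static int defaultVariantsPerSample(FileType type) {
        if (type == FileType.EXOME) {
            return 100_000;
        }
        return 5_000_000;
    }

    /** averageSamplesPerFile as derived from dataDistribution, expectedSamples and expectedFiles. */
    public static float averageSamplesPerFile(DataDistribution distribution, int expectedSamples, int expectedFiles) {
        if (distribution == null) {
            return (float) expectedSamples / expectedFiles;
        }
        switch (distribution) {
            case SINGLE_SAMPLE_PER_FILE:
                return 1f;
            case MULTIPLE_SAMPLES_PER_FILE:
                return (float) expectedSamples / expectedFiles;
            case MULTIPLE_FILES_PER_SAMPLE:
                return 2f; // the diff assumes 2 here because the real value is hard to tell
            case FILES_SPLIT_BY_CHROMOSOME:
            case FILES_SPLIT_BY_REGION:
                return expectedSamples; // every file contains all samples
            default:
                throw new IllegalArgumentException("Unknown dataDistribution " + distribution);
        }
    }

    public static void main(String[] args) {
        // 1000 samples spread over 50 multi-sample files
        System.out.println(averageSamplesPerFile(DataDistribution.MULTIPLE_SAMPLES_PER_FILE, 1000, 50)); // 20.0
        // 1000 samples, files split by chromosome
        System.out.println(averageSamplesPerFile(DataDistribution.FILES_SPLIT_BY_CHROMOSOME, 1000, 24)); // 1000.0
        System.out.println(defaultAverageFileSize(FileType.EXOME));   // 100MiB
        System.out.println(defaultVariantsPerSample(null));           // 5000000
    }
}
```

In the diff, the division is safe because check() rejects a null or non-positive expectedFiles and expectedSamples before inferParams runs; the sketch takes primitive ints and does not repeat that validation.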
@@ -77,7 +77,9 @@ protected void check() throws Exception {
params.putIfNotEmpty(VariantStorageOptions.INCLUDE_GENOTYPE.key(), indexParams.getIncludeGenotypes());
params.put(VariantStorageOptions.STATS_AGGREGATION.key(), indexParams.getAggregated());
params.putIfNotEmpty(VariantStorageOptions.STATS_AGGREGATION_MAPPING_FILE.key(), indexParams.getAggregationMappingFile());
- params.put(VariantStorageOptions.GVCF.key(), indexParams.isGvcf());
+ if (indexParams.isGvcf()) {
+ params.put(VariantStorageOptions.GVCF.key(), indexParams.isGvcf());
+ }

// queryOptions.putIfNotNull(VariantFileIndexerStorageOperation.TRANSFORMED_FILES, indexParams.transformedPaths);

@@ -92,7 +94,9 @@ protected void check() throws Exception {
params.put(VariantStorageOptions.FAMILY.key(), indexParams.isFamily());
params.put(VariantStorageOptions.SOMATIC.key(), indexParams.isSomatic());
params.putIfNotEmpty(VariantStorageOptions.LOAD_SPLIT_DATA.key(), indexParams.getLoadSplitData());
- params.put(VariantStorageOptions.LOAD_MULTI_FILE_DATA.key(), indexParams.isLoadMultiFileData());
+ if (indexParams.isLoadMultiFileData()) {
+ params.put(VariantStorageOptions.LOAD_MULTI_FILE_DATA.key(), indexParams.isLoadMultiFileData());
+ }
params.putIfNotEmpty(VariantStorageOptions.LOAD_SAMPLE_INDEX.key(), indexParams.getLoadSampleIndex());
params.putIfNotEmpty(VariantStorageOptions.LOAD_ARCHIVE.key(), indexParams.getLoadArchive());
params.putIfNotEmpty(VariantStorageOptions.LOAD_HOM_REF.key(), indexParams.getLoadHomRef());
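
The two hunks above stop writing the GVCF and LOAD_MULTI_FILE_DATA flags when they are false. The PR does not state why; one plausible reading is that an explicit false in the request would otherwise mask a value configured for the study, for example by variant-setup. The sketch below illustrates that effect with plain maps. The layering of request params over study defaults, the key name "gvcf", and the class ConditionalPutSketch are assumptions for illustration and do not describe how OpenCGA resolves options.

```java
import java.util.HashMap;
import java.util.Map;

public class ConditionalPutSketch {

    // Hypothetical key; the real one comes from VariantStorageOptions.GVCF.key().
    public static final String GVCF = "gvcf";

    /** Old behaviour: the flag is always written, even when false. */
    public static Map<String, Object> buildAlways(boolean gvcf) {
        Map<String, Object> params = new HashMap<>();
        params.put(GVCF, gvcf);
        return params;
    }

    /** New behaviour: the flag is written only when it is true. */
    public static Map<String, Object> buildOnlyIfTrue(boolean gvcf) {
        Map<String, Object> params = new HashMap<>();
        if (gvcf) {
            params.put(GVCF, true);
        }
        return params;
    }

    /** Assumed resolution order: request params first, then study defaults, then false. */
    public static boolean resolve(Map<String, Object> studyDefaults, Map<String, Object> requestParams, String key) {
        Object value = requestParams.containsKey(key)
                ? requestParams.get(key)
                : studyDefaults.getOrDefault(key, false);
        return Boolean.TRUE.equals(value);
    }

    public static void main(String[] args) {
        Map<String, Object> studyDefaults = new HashMap<>();
        studyDefaults.put(GVCF, true); // e.g. a value stored for the study at setup time

        // An explicit false hides the study default.
        System.out.println(resolve(studyDefaults, buildAlways(false), GVCF));     // false
        // With the key absent, the study default is used.
        System.out.println(resolve(studyDefaults, buildOnlyIfTrue(false), GVCF)); // true
    }
}
```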
@@ -24,6 +24,7 @@
import org.opencb.biodata.models.variant.avro.SequenceOntologyTerm;
import org.opencb.commons.datastore.core.ObjectMap;
import org.opencb.opencga.analysis.variant.OpenCGATestExternalResource;
+ import org.opencb.opencga.analysis.variant.manager.VariantOperationsTest;
import org.opencb.opencga.analysis.variant.manager.VariantStorageManager;
import org.opencb.opencga.catalog.exceptions.CatalogException;
import org.opencb.opencga.catalog.managers.AbstractClinicalManagerTest;
@@ -56,11 +57,12 @@ public static AbstractClinicalManagerTest getClinicalTest(OpenCGATestExternalRes
.append(VariantStorageOptions.ANNOTATE.key(), true)
.append(VariantStorageOptions.STATS_CALCULATE.key(), false);

- VariantStorageManager variantStorageManager = new VariantStorageManager(opencga.getCatalogManager(), opencga.getStorageEngineFactory());
+ VariantStorageManager variantStorageManager = opencga.getVariantStorageManager();

Path outDir = Paths.get("target/test-data").resolve("junit_clinical_analysis_" + RandomStringUtils.randomAlphabetic(10));
Files.createDirectories(outDir);

+ VariantOperationsTest.dummyVariantSetup(variantStorageManager, clinicalTest.studyFqn, clinicalTest.token);
variantStorageManager.index(clinicalTest.studyFqn, "family.vcf", outDir.toString(), storageOptions, clinicalTest.token);
variantStorageManager.index(clinicalTest.studyFqn, "exomiser.vcf.gz", outDir.toString(), storageOptions, clinicalTest.token);
variantStorageManager.index(clinicalTest.studyFqn, "HG004.1k.vcf.gz", outDir.toString(), storageOptions, clinicalTest.token);