Skip to content

Commit

Permalink
qc: Individual QC add plink check-sex and impute-sex functions. #TASK…
Browse files Browse the repository at this point in the history
…-6773 #TASK-6766
  • Loading branch information
fp215 committed Nov 8, 2024
1 parent 432b210 commit 9055601
Showing 1 changed file with 188 additions and 24 deletions.
212 changes: 188 additions & 24 deletions opencga-app/app/analysis/qc/individual_qc/variant_based_inferred_sex.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
class VariantBasedInferredSexAnalysis:
def __init__(self, individual_qc_executor_info):
"""
:param individual_qc_executor_info:
"""

Expand Down Expand Up @@ -74,34 +73,199 @@ def filter_variants(self):
good_vars = open(self.chrx_vars, "r")

# Create output file:
individual_id = self.individual_qc_executor_info["id_"]
filtered_vcf_name = individual_id + self.sample_id + "_filtered_variants.vcf.gz"
filtered_vcf_path = os.path.join(self.check_sex_dir, filtered_vcf_name)
LOGGER.debug("Generating VCF filtered for good quality pruned chr X variants: '{}'".format(filtered_vcf_path))

# Check if input VCF FILTER field contains PASS variants:
pass_check_cmd = "bcftools view -i 'FILTER="PASS"' " + vcf_file + " zgrep -vw '#/|CHROM' | wc -l"
pass_vars = execute_bash_command(cmd=pass_check_cmd)
LOGGER.info("Checking for PASS variants in the FILTER field of '{}'".format(vcf_file))

# Annotate input VCF with chr coordinate IDs, filter for chr X pruned variants and, if annotated, for PASS variants:
annotate_and_filter_pass_cmd = "bcftools annotate --set-id '%CHROM\:%POS\:%REF\:%FIRST_ALT' " + vcf_file + "-Oz | bcftools view --include ID==@" + self.chrx_vars + " -Oz | bcftools view -i 'FILTER="PASS"' -Oz -o " + filtered_vcf_path
annotate_and_filter_no_pass_cmd = "bcftools annotate --set-id '%CHROM\:%POS\:%REF\:%FIRST_ALT' " + vcf_file + "-Oz | bcftools view --include ID==@" + self.chrx_vars + " -Oz -o " + filtered_vcf_path
#### NEED TO FIGURE OUT A WAY TO COUNT THE NUMBER OF VARIANTS REMAINING AFTER FILTERING SO I CAN HAVE AN ELIF FOR LOW NO. VARS
if pass_vars == 0:
execute_bash_command(cmd=annotate_and_filter_no_pass_cmd)
LOGGER.debug("Annotating and filtering '{}' for all pruned chr X variants".format(vcf_file))
LOGGER.info("WARNING: no FILTER information available, input data will not be filtered for PASS variants, results may be unreliable. There are '{}'")

execute_bash_command(cmd=annotate_and_filter_pass_cmd)
LOGGER.debug("Annotating and filtering '{}' for chr X pruned PASS variants".format(vcf_file))
LOGGER.info("There are '{}' good quality chr X pruned variants after filtering".format(pass_vars))
for sample_id in self.individual_qc_executor_info["sample_ids"]:
individual_id = self.individual_qc_executor_info["id_"]
filtered_vcf_name = individual_id + sample_id + "_filtered_variants.vcf.gz"
filtered_vcf_path = os.path.join(self.check_sex_dir, filtered_vcf_name)
LOGGER.debug("Generating VCF filtered for good quality pruned chr X variants: '{}'".format(filtered_vcf_path))

# Check if input VCF FILTER field contains PASS variants:
pass_check_cmd = "bcftools view -i 'FILTER="PASS"' " + vcf_file + " zgrep -vw '#/|CHROM' | wc -l"
pass_vars = execute_bash_command(cmd=pass_check_cmd)
LOGGER.info("Checking for PASS variants in the FILTER field of '{}'".format(vcf_file))

# Annotate input VCF with chr coordinate IDs, filter for chr X pruned variants and, if annotated, for PASS variants:
annotate_and_filter_pass_cmd = "bcftools annotate --set-id '%CHROM\:%POS\:%REF\:%FIRST_ALT' " + vcf_file + "-Oz | bcftools view --include ID==@" + self.chrx_vars + " -Oz | bcftools view -i 'FILTER="PASS"' -Oz -o " + filtered_vcf_path
annotate_and_filter_no_pass_cmd = "bcftools annotate --set-id '%CHROM\:%POS\:%REF\:%FIRST_ALT' " + vcf_file + "-Oz | bcftools view --include ID==@" + self.chrx_vars + " -Oz -o " + filtered_vcf_path
if pass_vars == 0:
execute_bash_command(cmd=annotate_and_filter_no_pass_cmd)
LOGGER.debug("Annotating and filtering '{}' for all pruned chr X variants".format(vcf_file))
LOGGER.info("WARNING: no FILTER information available, input data will not be filtered for PASS variants, results may be unreliable")
else:
execute_bash_command(cmd=annotate_and_filter_pass_cmd)
LOGGER.debug("Annotating and filtering '{}' for chr X pruned PASS variants".format(vcf_file))

# Return filtered VCF path:
return [filtered_vcf_path, filtered_vcf_name]

def variant_check(self, filtered_vcf_path, filtered_vcf_name):
# Calculate number of variants remaining for Plink check-sex/impute-sex after all filtering
variant_check_cmd = "zgrep -vw '#/|CHROM' " + filtered_vcf_path + " | wc -l"
filtered_vars = execute_bash_command(cmd=variant_check_cmd)
LOGGER.info("Checking final number of variants in {}".format(filtered_vcf_name))

# Set thresholds for warning messages - NB, current thresholds are random - will need some testing
if filtered_vars >= 1000:
LOGGER.info("There are '{}' good quality chromosome X LD pruned variants after filtering".format(filtered_vars))
elif filtered_vars >= 500 & filtered_vars < 1000:
LOGGER.info("WARNING: There are only '{}' good quality chromosome X LD pruned variants after filtering, results may be unreliable".format(filtered_vars))
else:
LOGGER.info("WARNING: Poor quality data, there are only '{}' good quality chromosome X LD pruned variants after filtering, results will be unreliable".format(filtered_vars))

def get_individual_sex(self):
"""
Retrieve individual sex for sample if it is available, and recode for Plink input.
:return:
"""
individual_info = open(self.individual_qc_executor_info["info_file"])
individual_info_json = json.load(individual_info)
indsex = individual_info_json['sex']['id']
karsex = individual_info_json['karyotypicSex']
ind_metadata = {}
for sample in self.individual_qc_executor_info["sample_ids"]:
ind_metadata[sample] = {'individualId': individual_info_json['id'], 'individualSex': 0, 'sampleId': 'NA'}

LOGGER.debug("Retrieve sex information and recode")
for sam in individual_info_json['samples']:
if sam['id'] in self.individual_qc_executor_info["sample_ids"]:
ind_metadata[sam['id']]['sampleId'] = sample['id']
if indsex.upper() == 'MALE' or karsex == 'XY':
ind_metadata[sam['id']]['individualSex'] = 1
elif indsex.upper() == 'FEMALE' or karsex == 'XX':
ind_metadata[sam['id']]['individualSex'] = 2
else:
LOGGER.info("Sex information for Individual '{}' (sample '{}') is not available, sex will be inferred".format(sam['id'],sample['id']))

# Check individual information for each sample is present:
for sample, individual_info in ind_metadata.items():
if individual_info['individualSex'] == 0:
LOGGER.warning("No individual information available for sample '{}'".format(individual_info['sampleId']))
else:
LOGGER.info("Individual information for sample '{}' found".format(individual_info['sampleId']))
# Return sex information:
return ind_metadata

def generate_plink_fam_file_input(self):
# Retrieve sex information:
individual_id = self.individual_qc_executor_info["id_"]
ind_metadata = self.get_individual_sex()

# Create a directory for Plink if it does not already exist:
plink_dir = create_output_dir(path_elements[self.check_sex_dir, 'plink_check-sex'])

# Generate a text file to update the Plink fam file with sex information:
sex_info_file_name = individual_id + '_sex_information.txt'
sex_info_path = os.path.join(plink_dir, sex_info_file_name)
sex_info_input = open(sex_info_path, 'w')
LOGGER.debug("Generating text file to update the Plink input sex information: '{}'".format(sex_info_path))

for sample in self.individual_qc_executor_info["sample_ids"]:
sample_id = sample
# Individual sample information:
individual_info = ind_metadata[sample]
# Plink sex update file format: familyId individualId sex
sex_info_input.write(('/t'.join([sample,sample,individual_info['individualSex']])) + '\n')
# Note, for the purpose of this function, individualId replaces familyId to prevent issues where an individual belongs to more than one family
LOGGER.info("Test file generated to update the Plink fam file with sex information: '{}'".format(sex_info_path))

# Return paths of text files generated:
return sex_info_path

def check_sex_plink(self, filtered_vcf_path, plink_path="plink1.9"):
method = "Plink/check-sex"
# Define Plink check-sex methodology:
LOGGER.info("Method: {}".format(method))

# Create a directory for Plink if it does not already exist:
plink_dir = create_output_dir(path_elements[self.check_sex_dir, 'plink_check-sex'])
sex_info = self.generate_plink_fam_file_input()

# Convert input to Plink format:
plink_path = str(plink_path)
for sample_id in self.individual_qc_executor_info["sample_ids"]:
file_prefix = self.individual_qc_executor_info["id_"] + sample_id + "_plink_check-sex_results"
plink_output_prefix = os.path.join(plink_dir, file_prefix)
plink_convert_args = ["--vcf", str(filtered_vcf_path),
"--make-bed",
"--keep-allele-order",
"--update-sex", sex_info_path,
"--split-x", "hg38", "no-fail",
"--out", plink_output_prefix]
plink_convert_cmd = [plink_path] + plink_convert_args
LOGGER.debug("Generating Plink check-sex input files")
plink_input_files = execute_bash_command(plink_convert_cmd)
if plink_convert_args[0] == 0:
plink_files_generated = os.listdir(plink_dir)
LOGGER.info("Files available: '{}':\n{}".format(plink_dir, plink_files_generated))

# Run Plink check-sex analysis:
plink_checksex_args = ["--bfile", plink_output_prefix,
"--read-freq", str(self.chrx_var_frq),
"--check-sex",
"--out", plink_output_prefix]
plink_checksex_cmd = [plink_path] + plink_checksex_args
LOGGER.debug("Performing Plink check-sex")
plink_checksex = execute_bash_command(plink_checksex_cmd)
if plink_checksex_args[0] == 0:
checksex_files_generated = os.listdir(plink_dir)
LOGGER.info("Files available: '{}':\n{}".format(plink_dir, checksex_files_generated))

plink_checksex_path = plink_output_prefix + '.sexcheck'
if os.path.isfile(plink_checksex_path) == False:
LOGGER.error("File '{}' does not exist. Check:\nSTDOUT: '{}'\nSTDERR: '{}'".format(plink_checksex_path, plink_checksex[1], plink_checksex[2]))
raise Exception("File '{}' does not exist. Check:\nSTDOUT: '{}'\nSTDERR: '{}'".format(plink_checksex_path, plink_checksex[1], plink_checksex[2]))
else:
# Return method used and path to the .secheck output file generated by Plink
return [method, plink_checksex]

def impute_sex_plink(self, filtered_vcf_path, plink_path="plink1.9"):
method = "Plink/impute-sex"
# Define Plink impute-sex methodology:
LOGGER.info("Method: {}".format(method))

# Create a directory for Plink if it does not already exist:
plink_dir = create_output_dir(path_elements[self.check_sex_dir, 'plink_impute-sex'])

# Convert input to Plink format:
plink_path = str(plink_path)
for sample_id in self.individual_qc_executor_info["sample_ids"]:
file_prefix = self.individual_qc_executor_info["id_"] + sample_id + "_plink_impute_sex_results"
plink_output_prefix = os.path.join(plink_dir, file_prefix)
plink_convert_args = ["--vcf", str(filtered_vcf_path),
"--make-bed",
"--keep-allele-order",
"--split-x", "hg38", "no-fail",
"--out", plink_output_prefix]
plink_convert_cmd = [plink_path] + plink_convert_args
LOGGER.debug("Generating Plink impute-sex input files")
plink_input_files = execute_bash_command(plink_convert_cmd)
if plink_convert_args[0] == 0:
plink_files_generated = os.listdir(plink_dir)
LOGGER.info("Files available: '{}':\n{}".format(plink_dir, plink_files_generated))

# Run Plink impute-sex analysis:
plink_imputesex_args = ["--bfile", plink_output_prefix,
"--read-freq", str(self.chrx_var_frq),
"--impute-sex",
"--out", plink_output_prefix]
plink_imputesex_cmd = [plink_path] + plink_imputesex_args
LOGGER.debug("Performing Plink impute-sex")
plink_imputesex = execute_bash_command(plink_imputesex_cmd)
if plink_imputesex_args[0] == 0:
imputesex_files_generated = os.listdir(plink_dir)
LOGGER.info("Files available: '{}':\n{}".format(plink_dir, imputesex_files_generated))

plink_imputesex_path = plink_output_prefix + '.sexcheck'
if os.path.isfile(plink_imputesex_path) == False:
LOGGER.error("File '{}' does not exist. Check:\nSTDOUT: '{}'\nSTDERR: '{}'".format(plink_imputesex_path, plink_imputesex[1], plink_imputesex[2]))
raise Exception("File '{}' does not exist. Check:\nSTDOUT: '{}'\nSTDERR: '{}'".format(plink_imputesex_path, plink_imputesex[1], plink_imputesex[2]))
else:
# Return method used and path to the .secheck output file generated by Plink
return [method, plink_imputesex]








0 comments on commit 9055601

Please sign in to comment.