refactor of parser with batch processing and gene id cache
NikkiBytes committed Apr 1, 2024
1 parent c55c39f commit 55ef287
Showing 1 changed file with 136 additions and 76 deletions.
212 changes: 136 additions & 76 deletions src/plugins/orthology_agr/parser.py
@@ -7,10 +7,18 @@
 import os
 from biothings_client import get_client
 import biothings.utils.dataload as dl
+import time
+import logging
+import json
 
-def setup_release(self):
-    release="2019-06-23"
-    return release
+logging.basicConfig(filename='test_run_10k.log', filemode='w', level=logging.INFO,
+                    format='%(asctime)s - %(levelname)s - %(message)s')
+
+
+# def setup_release(self):
+#     release="2019-06-23"
+#     return release
 
 
 def convert_score(score):
@@ -24,86 +32,138 @@ def convert_score(score):
     else:
         return False;
 
-def get_entrez_id(gene_id, gene_client, bad_queries):
-    """
-    Use the biothings_client package to query the mygene.info db for
-    the corresponding entrezgene (NCBI gene) ID for the given gene_id.
-    gene_id: given ID to search on
-    gene_client: initialized object for connection to mygene.info
-    """
-    #gene_client = get_client('gene') # initialize mygene object
-    if "WB:" in gene_id: gene_id=gene_id.replace("WB:", "WormBase:")
-    if "FB:" in gene_id: gene_id=gene_id.replace("FB:", "FLYBASE:")
-    if "MGI:" in gene_id: gene_id=gene_id.replace("MGI:", "mgi:MGI\:")
-    #gene=gene_client.getgene(gene_id, fields='symbol,name')
-
-    #print("[INFO] searching for gene id ", gene_id)
-    gene=gene_client.query(gene_id, fields='symbol,name') # get search results
-    # check if gene id was not found
-    if not gene["hits"]:
-        #print("[INFO] Failed query on ID %s "%gene_id)
-        bad_queries.append(gene_id)
-    else:
-        #print("[INFO] Success query on ID %s "%gene_id)
-        gene_id=gene["hits"][0]["_id"]
-
-    return gene_id;
-
-def load_orthology(data_folder):
-    """
-    Main Method - data plugin parser for ortholog data from AGR
-    data_folder: input folder (standard for biothings studio plugin)
-    """
-
-    # setup data from the file
-    print("[INFO] loading orthology AGR data....")
-    infile = os.path.join(data_folder, "ORTHOLOGY-ALLIANCE_COMBINED.tsv")
-    assert os.path.exists(infile)
+def adjust_gene_ids(gene_id_list):
+    """Rewrite source-database prefixes in place so IDs match mygene.info scopes."""
+    adjustments = {
+        "WB:": "WormBase:",
+        "FB:": "FLYBASE:",
+        "XB": "XenBase:XB",
+    }
+
+    for i, gene_id in enumerate(gene_id_list):
+        for prefix, replacement in adjustments.items():
+            if gene_id.startswith(prefix):
+                gene_id_list[i] = gene_id.replace(prefix, replacement)
+                break
+
+    return gene_id_list
+
+
+def get_gene_cache(unique_gene_ids, gene_client, batch_size=1000):
+    """Resolve every unique gene ID to an Entrez ID via batched queries and cache the results."""
+    gene_id_cache = {}
+    first_pass_fail_ct = 0
+    error_ids = []
+
+    # Adjust prefixes on a list copy, then deduplicate again afterwards
+    unique_gene_ids = adjust_gene_ids(list(unique_gene_ids))
+    unique_gene_ids = set(unique_gene_ids)
+
+    def batch(iterable, n=1):
+        """Yield successive n-sized batches from iterable."""
+        if isinstance(iterable, set):
+            iterable = list(iterable)
+        l = len(iterable)
+        for ndx in range(0, l, n):
+            yield iterable[ndx:min(ndx + n, l)]
+
+    # Iterate through batches and query mygene.info in bulk
+    for batch_ids in batch(unique_gene_ids, batch_size):
+        results = gene_client.querymany(
+            batch_ids,
+            scopes=["WormBase", "MGI", "FLYBASE", "Xenbase", "WB"],
+            fields='symbol,name,entrezgene',
+            verbose=False
+        )
+        for result in results:
+            query_id = result['query']
+            if 'notfound' in result:
+                first_pass_fail_ct += 1
+                # Second pass -- retry the miss as an individual query for an Entrez ID
+                query_res = gene_client.query(query_id, fields='entrezgene')
+                if query_res.get('hits'):
+                    for hit in query_res['hits']:
+                        if 'entrezgene' in hit:
+                            gene_id_cache[query_id] = hit['entrezgene']
+                else:
+                    logging.info(f"Query ID not found in second individual query: {query_id}")
+                    error_ids.append(query_id)
+            else:
+                entrez_id = result.get('entrezgene')
+                if entrez_id:
+                    gene_id_cache[query_id] = entrez_id
+                else:
+                    logging.info(f"Missing data for found query: {result.get('query', 'Unknown ID')}")
+
+    logging.info("Completed building gene ID cache.")
+    logging.info(f"First pass fail count: {first_pass_fail_ct}")
+    logging.info(f"Final error IDs (complete fail): {len(error_ids)}")
+
+    return gene_id_cache
+
+
+def load_orthology(data_folder, sample_limit=None):
+    """Main method for parsing ortholog data from AGR."""
+    start_time = time.time()
+
+    infile = os.path.join(data_folder, "ORTHOLOGY-ALLIANCE_COMBINED.tsv.gz")
+    logging.info(f"Starting upload for orthology AGR from {infile}")
+    assert os.path.exists(infile), "Input file does not exist."
 
     gene_client = get_client('gene')
-    records=[] # initialize record list for orthologs
-    gene_dict={}
-
-    bad_queries=[] # initialize gene query ids that return None (empty)
-
-    # use dataload from biothings util to load tsv file
-    for i, x in enumerate(list(dl.tabfile_feeder(infile, header=15, sep="\t"))):
-        # pull out first row for the column header names
-        if i == 0: cols_names=x, print('[COLS] %s \n'%" ".join(x))
-        # if it isn't the first row, continue with id record
-        else:
-            # initialize data holders
-            gene_id=get_entrez_id(x[0], gene_client, bad_queries)
-            ortholog_id=get_entrez_id(x[4], gene_client, bad_queries)
-
-            if gene_id not in gene_dict.keys():
-                gene_dict[gene_id]={"orthologs": []}
-
-            # setup dictionary record
-            ortholog_dict={
-                "geneid": ortholog_id,
-                "symbol": x[5],
-                "taxid": int(float(x[6].split(":")[1])),
-                "algorithmsmatch": x[9],
-                "outofalgorithms": x[10],
-                "isbestscore": convert_score(x[11]),
-                "isbestrevscore": convert_score(x[12])
-            }
-            gene_dict[gene_id]["orthologs"].append(ortholog_dict)
-
-    gene_list=sorted(gene_dict.items())
-
-    for d in gene_list:
-        _id = d[0]
-        ortholog_dict=d[1]
-
-        record= {
-            "_id": _id,
-            "agr": ortholog_dict
-        }
-        records.append(record)
-
-    return records;
+    unique_gene_ids = set()
+
+    # First pass over the file: collect unique gene and ortholog IDs
+    line_count = 0
+    for line in dl.tabfile_feeder(infile, header=1, sep="\t"):
+        if sample_limit and line_count >= sample_limit:
+            break
+        if len(line) >= 5:
+            unique_gene_ids.update([line[0], line[4]])
+        line_count += 1
+
+    # Resolve all IDs up front with batched querymany calls
+    gene_id_cache = get_gene_cache(unique_gene_ids, gene_client)
+
+    # Second pass: construct records from the cached Entrez IDs
+    records = []
+    line_count = 0
+    for line in dl.tabfile_feeder(infile, header=1, sep="\t"):
+        if sample_limit and line_count >= sample_limit:
+            break
+        if len(line) < 5:
+            continue
+        gene_id, ortholog_id = gene_id_cache.get(line[0]), gene_id_cache.get(line[4])
+        if gene_id and ortholog_id:
+            record = {
+                "_id": gene_id,
+                "agr": {
+                    "orthologs": [{
+                        "gene_id": gene_id,
+                        "ortholog_id": ortholog_id,
+                        "symbol": line[5],
+                        "taxid": int(float(line[6].split(":")[1])),
+                        "algorithmsmatch": line[9],
+                        "outofalgorithms": line[10],
+                        "isbestscore": convert_score(line[11]),
+                        "isbestrevscore": convert_score(line[12])
+                    }]
+                }
+            }
+            records.append(record)
+        line_count += 1  # Increment the line counter
+
+    elapsed_time = time.time() - start_time
+    logging.info(f"Completed processing {len(records)} records in {elapsed_time:.2f} seconds.")
+    if records:
+        logging.info(f"Sample record: {json.dumps(records[0], indent=2)}")
+
+    return records
+
+
+# Example usage
+data_folder = "/Users/nacosta/Documents/update-ortho-plugin"
+load_orthology(data_folder, sample_limit=10100)
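
The core of this refactor is replacing the old one-query-per-row get_entrez_id with a single batched querymany pass. A minimal standalone sketch of that round trip, using hypothetical WormBase and FLYBASE IDs, shows the response shape the new get_gene_cache relies on:

from biothings_client import get_client

gene_client = get_client('gene')

# One request resolves the whole batch; hits carry 'entrezgene',
# misses come back flagged with 'notfound'. (IDs below are hypothetical.)
results = gene_client.querymany(
    ["WormBase:WBGene00000001", "FLYBASE:FBgn0000008"],
    scopes=["WormBase", "FLYBASE"],
    fields='symbol,name,entrezgene',
    verbose=False
)
for result in results:
    if 'notfound' in result:
        print(result['query'], "-> not found")
    else:
        print(result['query'], "->", result.get('entrezgene'))

The 'notfound' flag is what drives the second-pass individual queries inside get_gene_cache.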

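Note that the refactor also changes the document layout: the old parser grouped all orthologs for a gene under one record, while the new code emits one record per ortholog pair, so several records can share an "_id". Assuming both IDs resolve, each emitted record takes this shape (values are illustrative, not taken from the actual release file):

{
    "_id": "13839",
    "agr": {
        "orthologs": [{
            "gene_id": "13839",
            "ortholog_id": "2033",
            "symbol": "EP300",
            "taxid": 9606,
            "algorithmsmatch": "9",
            "outofalgorithms": "12",
            "isbestscore": True,
            "isbestrevscore": True
        }]
    }
}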