Modularize explore #91

Open
wants to merge 5 commits into base: master
Changes from 4 commits
21 changes: 14 additions & 7 deletions anglerfish/demux/adaptor.py
@@ -18,16 +18,17 @@ class Adaptor:
def __init__(
self,
name: str,
adaptors: dict,
i7_sequence_token: str,
i5_sequence_token: str,
i7_index: str | None = None,
i5_index: str | None = None,
):
self.name: str = name
self.i5 = AdaptorPart(
sequence_token=adaptors[name]["i5"], name=name, index_seq=i5_index
)
self.i7 = AdaptorPart(
sequence_token=adaptors[name]["i7"], name=name, index_seq=i7_index
sequence_token=i7_sequence_token, name=name, index_seq=i7_index
)
self.i5 = AdaptorPart(
sequence_token=i5_sequence_token, name=name, index_seq=i5_index
)

def get_fastastring(self, insert_Ns: bool = True) -> str:
@@ -265,6 +266,12 @@ def load_adaptors(raw: bool = False) -> list[Adaptor] | dict:
# By default, return list of Adaptor objects
else:
adaptors = []
for adaptor_name in adaptors_dict:
adaptors.append(Adaptor(name=adaptor_name, adaptors=adaptors_dict))
for adaptor_name, adaptor_parts_dict in adaptors_dict.items():
adaptors.append(
Adaptor(
name=adaptor_name,
i7_sequence_token=adaptor_parts_dict["i7"],
i5_sequence_token=adaptor_parts_dict["i5"],
)
)
return adaptors
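
With this change the Adaptor constructor takes the i7 and i5 sequence tokens explicitly instead of looking them up in the full adaptors dict. A minimal usage sketch of the new signature; the adaptor name, sequence tokens, and index values below are placeholders, not real entries from the bundled adaptor config:

from anglerfish.demux.adaptor import Adaptor

# Placeholder name, tokens, and indices, for illustration only
adaptor = Adaptor(
    name="my_adaptor",
    i7_sequence_token="CAAGCAGAAGACGGCATACGAGAT<N>GTCTCGTGGGCTCGG",
    i5_sequence_token="AATGATACGGCGACCACCGAGATCTACAC<N>TCGTCGGCAGCGTC",
    i7_index="TAAGGCGA",
    i5_index="AGGCTATA",
)
print(adaptor.get_fastastring(insert_Ns=False))
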
5 changes: 3 additions & 2 deletions anglerfish/demux/samplesheet.py
@@ -75,9 +75,10 @@ def __init__(self, input_csv: str, ont_barcodes_enabled: bool):
sample_name,
Adaptor(
name=row["adaptors"],
adaptors=ADAPTORS,
i5_index=i5_index,
i7_sequence_token=ADAPTORS[row["adaptors"]]["i7"],
i5_sequence_token=ADAPTORS[row["adaptors"]]["i5"],
i7_index=i7_index,
i5_index=i5_index,
),
row["fastq_path"],
ont_barcode,
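
In the samplesheet the same lookup now happens at the call site: the row's adaptor name is resolved against the raw ADAPTORS dict and both sequence tokens are passed explicitly. A rough, self-contained equivalent of the updated call, assuming load_adaptors(raw=True) returns the name-keyed dict (as the default branch in adaptor.py suggests) and using a made-up CSV row:

from anglerfish.demux.adaptor import Adaptor, load_adaptors

ADAPTORS = load_adaptors(raw=True)  # assumed: name -> {"i5": ..., "i7": ...}

row = {"adaptors": "truseq", "fastq_path": "reads.fastq.gz"}  # hypothetical samplesheet row
i7_index, i5_index = "TAAGGCGA", None  # parsed from the index column elsewhere

adaptor = Adaptor(
    name=row["adaptors"],
    i7_sequence_token=ADAPTORS[row["adaptors"]]["i7"],
    i5_sequence_token=ADAPTORS[row["adaptors"]]["i5"],
    i7_index=i7_index,
    i5_index=i5_index,
)
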
219 changes: 162 additions & 57 deletions anglerfish/explore/explore.py
@@ -1,3 +1,4 @@
import json
import logging
import os
import uuid
@@ -26,56 +27,101 @@ def run_explore(
umi_threshold: float,
kmer_length: int,
):
# Setup a run directory
run_uuid = str(uuid.uuid4())
try:
os.mkdir(outdir)
except FileExistsError:
log.info(f"Output directory {outdir} already exists")
if not use_existing:
log.error(
f"Output directory {outdir} already exists, please use --use_existing to continue"
)
exit(1)
else:
pass
results, adaptors_included, entries, umi_threshold, kmer_length, outdir = (
_run_explore(
fastq,
outdir,
threads,
use_existing,
good_hit_threshold,
insert_thres_low,
insert_thres_high,
minimap_b,
min_hits_per_adaptor,
umi_threshold,
kmer_length,
)
)

results = check_for_umis(
results, adaptors_included, entries, umi_threshold, kmer_length, outdir
)

explore_stats_file = os.path.join(outdir, "anglerfish_explore_stats.json")
# Save results to a JSON file
with open(explore_stats_file, "w") as f:
json.dump(results, f, indent=4)

log.info(f"Results saved to {explore_stats_file}")


def _run_explore(
fastq: str,
outdir: str,
threads: int,
use_existing: bool,
good_hit_threshold: float,
insert_thres_low: int,
insert_thres_high: int,
minimap_b: int,
min_hits_per_adaptor: int,
umi_threshold: float,
kmer_length: int,
):
run_uuid = str(uuid.uuid4())
log.info("Running anglerfish explore")
log.info(f"Run uuid {run_uuid}")

adaptors = cast(list[Adaptor], load_adaptors())
adaptors_and_aln_paths: list[tuple[Adaptor, str]] = []
setup_explore_directory(outdir, use_existing)

# Map all reads against all adaptors
for adaptor in adaptors:
# Align
aln_path = os.path.join(outdir, f"{adaptor.name}.paf")
adaptors_and_aln_paths.append((adaptor, aln_path))
if os.path.exists(aln_path) and use_existing:
log.info(f"Skipping {adaptor.name} as alignment already exists")
continue
adaptor_path = os.path.join(outdir, f"{adaptor.name}.fasta")
with open(adaptor_path, "w") as f:
f.write(adaptor.get_fastastring(insert_Ns=False))
# Create a results dictionary which is the skeleton for json output
results = {
"run_uuid": run_uuid,
"timestamp": pd.Timestamp.now().isoformat(),
"parameters": {
"fastq": fastq,
"outdir": outdir,
"threads": threads,
"use_existing": use_existing,
"good_hit_threshold": good_hit_threshold,
"insert_thres_low": insert_thres_low,
"insert_thres_high": insert_thres_high,
"minimap_b": minimap_b,
"min_hits_per_adaptor": min_hits_per_adaptor,
"umi_threshold": umi_threshold,
"kmer_length": kmer_length,
},
"included_adaptors": {},
"excluded_adaptors": {},
}

log.info(f"Aligning {adaptor.name}")
run_minimap2(
fastq_in=fastq,
index_file=adaptor_path,
output_paf=aln_path,
threads=threads,
minimap_b=minimap_b,
)
adaptors = cast(list[Adaptor], load_adaptors())

adaptors_and_aln_paths = run_multiple_alignments(
fastq,
outdir,
threads,
use_existing,
adaptors,
minimap_b,
)

# Parse alignments
entries: dict = {}
adaptors_included = []
for adaptor, aln_path in adaptors_and_aln_paths:
log.info(f"Parsing {adaptor.name}")
adaptor_data = {"name": adaptor.name, "i5": {}, "i7": {}}
reads_alns: dict[str, list[Alignment]] = map_reads_to_alns(
aln_path, complex_identifier=True
)

if len(reads_alns) == 0:
log.info(
f"No hits for {adaptor.name} in alignment file (perhaps the read file was mis-formatted?)"
)
continue

# Choose only the highest scoring alignment for each combination of read, adaptor end and strand
reads_to_highest_q_aln_dict: dict[str, dict] = {}
for aln_name, alns in reads_alns.items():
@@ -130,8 +176,16 @@ def run_explore(
)
df_good_hits = df_mim[requirements]

median_insert_length = df_good_hits["insert_len"].median()
insert_lengths = df_good_hits["insert_len"].value_counts()

if len(insert_lengths) == 0:
median_insert_length = None
insert_lengths_histogram = {}
else:
median_insert_length = df_good_hits["insert_len"].median()
insert_lengths_histogram = insert_lengths[
sorted(insert_lengths.index)
].to_dict()
else:
m_re_cs = r"^cs:Z::([1-9][0-9]*)$"
df_good_hits = df[df.cg.str.match(m_re_cs)]
@@ -148,6 +202,7 @@
df_good_hits = df_good_hits[df_good_hits["match_1_len"] >= thres]

median_insert_length = None
insert_lengths_histogram = {}
insert_lengths = None

# Filter multiple hits per read and adaptor end
@@ -164,46 +219,96 @@
f"{adaptor.name}:{adaptor_end_name} had {len(df_good_hits)} good hits."
)

# Collect data for the results dictionary
adaptor_data[adaptor_end_name] = {
"nr_good_hits": len(df_good_hits),
"index_length": median_insert_length,
"insert_lengths_histogram": insert_lengths_histogram,
"relative_entropy": {},
}

if min(nr_good_hits.values()) >= min_hits_per_adaptor:
log.info(f"Adaptor {adaptor.name} is included in the analysis")
results["included_adaptors"][adaptor.name] = adaptor_data
adaptors_included.append(adaptor)
else:
results["excluded_adaptors"][adaptor.name] = adaptor_data
log.info(f"Adaptor {adaptor.name} is excluded from the analysis")

# Print insert length info for adaptor types included in the analysis
return results, adaptors_included, entries, umi_threshold, kmer_length, outdir


def setup_explore_directory(outdir: str, use_existing: bool):
# Setup a run directory

try:
os.mkdir(outdir)
except FileExistsError:
log.info(f"Output directory {outdir} already exists")
if not use_existing:
log.error(
f"Output directory {outdir} already exists, please use --use_existing to continue"
)
exit(1)
else:
pass


def run_multiple_alignments(
fastq: str,
outdir: str,
threads: int,
use_existing: bool,
adaptors: list[Adaptor],
minimap_b: int,
) -> list[tuple[Adaptor, str]]:
adaptors_and_aln_paths: list[tuple[Adaptor, str]] = []

# Map all reads against all adaptors
for adaptor in adaptors:
# Align
aln_path = os.path.join(outdir, f"{adaptor.name}.paf")
adaptors_and_aln_paths.append((adaptor, aln_path))
if os.path.exists(aln_path) and use_existing:
log.info(f"Skipping {adaptor.name} as alignment already exists")
continue
adaptor_path = os.path.join(outdir, f"{adaptor.name}.fasta")
with open(adaptor_path, "w") as f:
f.write(adaptor.get_fastastring(insert_Ns=False))

log.info(f"Aligning {adaptor.name}")
run_minimap2(
fastq_in=fastq,
index_file=adaptor_path,
output_paf=aln_path,
threads=threads,
minimap_b=minimap_b,
)
return adaptors_and_aln_paths


def check_for_umis(
results, adaptors_included, entries, umi_threshold, kmer_length, outdir
):
# Traverse the adaptors to include and check whether they need UMI entropy analysis
for adaptor in adaptors_included:
adaptor_data = results["included_adaptors"][adaptor.name]

for adaptor_end_name, adaptor_end in zip(
["i5", "i7"], [adaptor.i5, adaptor.i7]
):
df_good_hits = entries[adaptor.name][adaptor_end_name]
if adaptor_end.has_index:
median_insert_length = df_good_hits["insert_len"].median()

if median_insert_length > umi_threshold:
# Calculate entropies here
entropies = calculate_relative_entropy(
df_good_hits, kmer_length, median_insert_length
)
entropy_file = os.path.join(
outdir, f"{adaptor.name}_{adaptor_end_name}.entropy.csv"
)
pd.Series(entropies).to_csv(entropy_file, float_format="%.2f")
log.info(
f"{adaptor.name}:{adaptor_end_name}, relative entropy for k={kmer_length}, over the index saved to {entropy_file}"
)
insert_lengths = df_good_hits["insert_len"].value_counts()
adaptor_data[adaptor_end_name]["relative_entropy"] = entropies
log.info(
f"{adaptor.name}:{adaptor_end_name} had {len(df_good_hits)} good hits with median insert length {median_insert_length}"
)
histogram_file = os.path.join(
outdir, f"{adaptor.name}_{adaptor_end_name}.hist.csv"
)
insert_lengths[sorted(insert_lengths.index)].to_csv(histogram_file)
log.info(
f"{adaptor.name}:{adaptor_end_name} insert length histogram saved {histogram_file}"
)
else:
median_insert_length = None
insert_lengths = None
log.info(
f"{adaptor.name}:{adaptor_end_name} had {len(df_good_hits)} good hits (no insert length since no index)"
)

return results
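
Taken together, run_explore is now a thin orchestrator: _run_explore sets up the output directory, runs the minimap2 alignments via run_multiple_alignments, and parses the per-adaptor hits; check_for_umis then adds relative-entropy analysis for indexed adaptor ends; and the combined results dictionary is written to anglerfish_explore_stats.json. A condensed sketch of calling the refactored entry point, with placeholder argument values:

from anglerfish.explore.explore import run_explore

run_explore(
    fastq="reads.fastq.gz",      # hypothetical input file
    outdir="explore_out",
    threads=4,
    use_existing=False,
    good_hit_threshold=0.9,
    insert_thres_low=4,
    insert_thres_high=30,
    minimap_b=4,
    min_hits_per_adaptor=50,
    umi_threshold=11,
    kmer_length=2,
)
# Expected outputs under explore_out/: one <adaptor>.paf per adaptor, per-adaptor-end
# entropy and insert-length histogram CSVs where applicable, and anglerfish_explore_stats.json
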
Empty file added tests/__init__.py
Empty file.