-
Notifications
You must be signed in to change notification settings - Fork 0
/
rpv.py
179 lines (143 loc) · 6.72 KB
/
rpv.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
#!/usr/bin/python
# -*- coding: utf-8 -*-
## ------- Descriptions ------- ##
##
## Date : 22/02/2022
## Auteur : Atchon Alban
## email : [email protected]
## github : https://github.com/ATCHON
## Version : 0.1.0
##
## ------------------------------- ##
__version__ = "0.1.0"
import shlex
import subprocess
from typing import List, Optional
import pandas as pd
import typer
from pathlib import Path
WK_DIR = Path(__file__).resolve().parent
TEMP_DIR = Path.cwd() / "temp"
TEMP_DIR.mkdir(exist_ok=True)
BLAT_EXE = WK_DIR / "content" / "src" / "blat"
FILTER_BLAST = WK_DIR / "content" / "src" / "filter_blast.py"
app = typer.Typer()
def version_callback(value: bool):
if value:
typer.echo(f"RPV Version: {__version__}")
raise typer.Exit()
def run_blat(fasta, db, identity=int, coverage=int):
"""
run_blat : Function running BLAT (Blast-Like Alignment Tool), for the identification of genes of interest.
Args: fasta (Fasta file): Assembly sequence of the strain of interest.
db (Multi-fasta file): Database => ResFinder database, PlasmidFinder database or VirulenceFactor database
Returns: tsv, Résultat temporaire de l'identification des gènes via Blat.
"""
blat_cmd = f"{BLAT_EXE} -fine -minIdentity={identity} -out=blast8 {db} {fasta} {TEMP_DIR / 'tmp.blat'}"
subprocess.run(shlex.split(blat_cmd))
flt_cmd = f"python3 {FILTER_BLAST} -r {db} {TEMP_DIR / 'tmp.blat'} -i {identity} -c {coverage} -o {TEMP_DIR / 'tmp.tsv'}"
subprocess.run(shlex.split(flt_cmd))
return pd.read_csv(f"{TEMP_DIR / 'tmp.tsv'}", sep='\t')
def sort_dataframe(df):
df = df[['pident'] + [col for col in df.columns if col != 'pident']]
df = df[['gene'] + [col for col in df.columns if col != 'gene']]
df = df[['Sample'] + [col for col in df.columns if col != 'Sample']]
df = df.iloc[:, :-8]
df = df.reset_index(drop=True)
return df
def manage_resistance_df(df):
df['gene'] = df['saccver'].str.split('_', expand=True)[0]
df = df.drop(['saccver', 'descref'], axis=1)
df = sort_dataframe(df)
return df
def manage_virulence_df(df):
df['gene'] = df['descref'].str.split(' ', expand=True, )[1].str.replace(r'(', '', regex=True).str.replace(r')', '',
regex=True)
df['description'] = df['descref'].str.split(' ', 2, expand=True)[2]
df = df.drop(['saccver', 'descref'], axis=1)
df = df[['description'] + [col for col in df.columns if col != 'description']]
df = sort_dataframe(df)
return df
def manage_plasmid_df(df):
df['gene'] = df['saccver'].str.split('_', expand=True)[0]
df['description'] = df['descref'].str.split('_', 2, expand=True)[2]
df = df.drop(['saccver', 'descref'], axis=1)
# df = df[['description'] + [col for col in df.columns if col != 'description']]
df = sort_dataframe(df)
return df
# @app.command('run')
def main(database, sequences, identity, coverage):
"""
main : Execute search on database and add sample name on dataframe
Args:
identity: Sequences identity percent.
database : Database of nucleotide sequences in fasta format without duplicate sequences.
sequences : Nucleotide sequences from read assemblies.
Returns: dataframe
"""
df = None
for nb_strain, sequence in enumerate(sequences, start=1):
seq_name = Path(sequence).name
typer.echo(f"Search genes for sample {seq_name} .... N°{nb_strain}")
df_temp = run_blat(sequence, database, identity, coverage)
df_temp['Sample'] = str(seq_name.split('.')[0])
df = pd.concat([df, df_temp]) if df is not None else df_temp
typer.echo('--' * 20)
return df
@app.command(help="Search resistance genes")
def resistance(database: Path = typer.Argument(..., help="ResFinder Database fasta format."),
output: Optional[Path] = typer.Option(None, help="Output file path of the result (.tsv)."),
identity: int = typer.Option(90, max=100, help="Minimum percentage of sequence identity."),
coverage: int = typer.Option(80, max=100, help="Minimum percentage of sequence identity."),
sequences: List[Path] = typer.Argument(..., help="Fasta format sequences.")):
"""
Search for resistance genes via the Resfinder database
"""
df = main(database=database, sequences=sequences, identity=identity, coverage=coverage)
if df.empty is not True: # type: ignore
df = manage_resistance_df(df)
typer.echo(df)
if output:
df.to_csv(Path(output), sep='\t')
else:
typer.echo("No resistance genes have been identified")
@app.command(help="Search plasmid genes")
def plasmid(database: Path = typer.Argument(..., help="PlasmidFinder Database fasta format."),
output: Optional[Path] = typer.Option(None, help="Output file path of the result (.tsv)."),
identity: int = typer.Option(90, max=100, help="Minimum percentage of sequence identity."),
coverage: int = typer.Option(80, max=100, help="Minimum percentage of sequence identity."),
sequences: List[Path] = typer.Argument(..., help="Fasta format sequences.")):
"""
Search for plasmid genes via the PlasmidFinder database
"""
df = main(database=database, sequences=sequences, identity=identity, coverage=coverage)
if df.empty is not True: # type: ignore
df = manage_plasmid_df(df)
typer.echo(df)
if output:
df.to_csv(Path(output), sep='\t')
else:
typer.echo("No plasmid gene has been identified")
@app.command(help="Search virulence genes")
def virulence(database: Path = typer.Argument(..., help="VFDB (Virulence Factor Database) fasta format."),
output: Optional[Path] = typer.Option(None, help="Output file path of the result (.tsv)."),
identity: int = typer.Option(90, max=100, help="Minimum percentage of sequence identity."),
coverage: int = typer.Option(80, max=100, help="Minimum percentage of sequence identity."),
sequences: List[Path] = typer.Argument(..., help="Fasta format sequences.")):
"""
Search for virulence genes via the Virulence Factor database
"""
df = main(database=database, sequences=sequences, identity=identity, coverage=coverage)
if df.empty is not True: # type: ignore
df = manage_virulence_df(df)
typer.echo(df)
if output:
df.to_csv(Path(output), sep='\t')
else:
typer.echo("No virulence genes have been identified")
@app.callback()
def version_call(version: Optional[bool] = typer.Option(
None, "--version", callback=version_callback)):
typer.echo("")
if __name__ == "__main__":
app()