-
Notifications
You must be signed in to change notification settings - Fork 102
/
merge_arbitrary_coverage_files.py
executable file
·78 lines (59 loc) · 2.58 KB
/
merge_arbitrary_coverage_files.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
#!/usr/bin/env python
import os
import glob
import time
import gzip
import argparse
import sys
def merge_coverage_files(basename):
print(f"Merging Bismark coverage files into a file called '{basename}.cov.gz'")
cov_files = glob.glob("*.cov.gz")
if not cov_files:
print("Error: No files ending in '.cov.gz' found in the current folder.")
sys.exit(1)
allcov = {} # overarching dictionary
for file in cov_files:
print(f"Reading methylation calls from file: {file}")
isGzip = False
if file.endswith("gz"):
infile = gzip.open(file, 'rt') # mode is 'rt' for text mode
isGzip = True
else:
infile = open(file)
for line in infile:
if isGzip:
line = line.rstrip() # no need to decode if using 'rt' mode
else:
line = line.rstrip()
chrom, pos, m, u = [line.split(sep="\t")[i] for i in (0, 1, 4, 5)] # list comprehension
if chrom in allcov.keys():
pass
else:
allcov[chrom] = {}
pos = int(pos)
if pos in allcov[chrom].keys():
pass
else:
allcov[chrom][pos] = {}
allcov[chrom][pos]["meth"] = 0
allcov[chrom][pos]["unmeth"] = 0
allcov[chrom][pos]["meth"] += int(m)
allcov[chrom][pos]["unmeth"] += int(u)
infile.close()
print("Now printing out a new, merged coverage file")
with gzip.open(f"{basename}.cov.gz", "wt") as out:
for chrom in sorted(allcov.keys()):
for pos in sorted(allcov[chrom].keys()):
perc = ''
if (allcov[chrom][pos]['meth'] + allcov[chrom][pos]['unmeth'] == 0):
print("Both methylated and unmethylated positions were 0. Exiting...")
sys.exit()
else:
perc = allcov[chrom][pos]['meth'] / (allcov[chrom][pos]['meth'] + allcov[chrom][pos]['unmeth']) * 100
out.write(f"{chrom}\t{pos}\t{pos}\t{perc:.2f}\t{allcov[chrom][pos]['meth']}\t{allcov[chrom][pos]['unmeth']}\n")
print(f"All done! The merged coverage file '{basename}.cov.gz' has been created.\n")
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Merge Bismark coverage files into a file called "basename.cov.gz".')
parser.add_argument('-b', '--basename', default='merged_coverage_file', help='The basename for the merged coverage file.')
args = parser.parse_args()
merge_coverage_files(args.basename)