forked from ljmartin/dockop
-
Notifications
You must be signed in to change notification settings - Fork 0
/
ray_fp_pipe_prototypever2.py
125 lines (108 loc) · 3.96 KB
/
ray_fp_pipe_prototypever2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
import os
import copy
import pyarrow as pa
from pyarrow import csv
import pandas as pd
import pyarrow.feather as feather
from rdkit.Chem import rdMolDescriptors
from rdkit import Chem
from rdkit.Chem import AllChem
from rdkit import DataStructs
import numpy as np
from datetime import timedelta
from timeit import time
from scipy import sparse
import pyarrow.parquet as pq
import ray
import pyarrow.csv
def next_path(path_pattern):
    """Return the first free path for a printf-style pattern.

    *path_pattern* contains a ``%d`` placeholder (e.g. ``'run-%d.txt'``).
    Finds the smallest positive index whose file does not exist yet,
    using exponential growth followed by bisection, so only O(log n)
    existence checks are needed instead of O(n).
    """
    hi = 1
    # Double until we hit an index whose file is absent.
    while os.path.exists(path_pattern % hi):
        hi *= 2
    lo = hi // 2
    # Invariant: lo exists (or is 0), hi does not; bisect the gap.
    while lo + 1 < hi:
        mid = (lo + hi) // 2
        if os.path.exists(path_pattern % mid):
            lo = mid
        else:
            hi = mid
    return path_pattern % hi
def gen_fp(mols, pars, fingerprint_function, scores_list, job_count,
           target_directory='/data/dockop_data/processed_data'):
    """Fingerprint *mols* into a sparse boolean matrix and persist it.

    Parameters
    ----------
    mols : iterable
        Molecules accepted by *fingerprint_function*.
    pars : dict
        Keyword arguments for *fingerprint_function*; ``nBits`` (if
        present) determines the matrix width.
    fingerprint_function : callable
        Returns a bit-vector object exposing ``GetOnBits()``.
    scores_list : sequence
        Docking scores, parallel to *mols*; saved alongside the matrix.
    job_count : int
        Chunk index used to name the output files.
    target_directory : str, optional
        Output directory (defaults to the original hard-coded path).

    Returns
    -------
    str
        Base path (without extension) of the ``.npz``/``.npy`` pair written.
    """
    # No deepcopy needed: the molecules are only read, never mutated.
    row_idx = []
    col_idx = []
    count = 0
    for mol in mols:
        fp = fingerprint_function(mol, **pars)
        onbits = list(fp.GetOnBits())
        col_idx += onbits
        row_idx += [count] * len(onbits)
        count += 1
    print(len(row_idx))
    print(len(col_idx))
    print(count)
    # Width must match the fingerprint size actually requested; the old
    # hard-coded 8192 silently disagreed with pars['nBits'] if it differed.
    unfolded_size = pars.get('nBits', 8192)
    # Use `count` (number of molecules) for the row dimension rather than
    # max(row_idx)+1: this keeps rows for trailing molecules with no bits
    # set (so rows stay aligned with the saved scores array) and avoids
    # a ValueError from max() on empty input.
    fingerprint_matrix = sparse.coo_matrix(
        (np.ones(len(row_idx), dtype=bool), (row_idx, col_idx)),
        shape=(count, unfolded_size))
    fingerprint_matrix = sparse.csr_matrix(fingerprint_matrix)
    print(str(job_count))
    name = os.path.join(target_directory, 'processed_data' + str(job_count))
    print(name)
    sparse.save_npz(name + '.npz', fingerprint_matrix)
    np.save(name + '.npy', np.array(scores_list))
    print('files_saved')
    return name
@ray.remote
def count_ligs(chunk, job_count):
    """Ray task: parse one pyarrow record batch, fingerprint it, save it.

    Parameters
    ----------
    chunk : pyarrow record batch / table slice
        Must expose 'smiles' and 'dockscore' columns.
    job_count : int
        Chunk index, forwarded to gen_fp for output-file naming.

    Returns
    -------
    str
        Base path of the files written by gen_fp.
    """
    print(job_count)
    print(type(chunk))
    print(chunk.schema)
    # to_pylist() converts pyarrow scalars to native Python values, so the
    # scores saved later become a numeric numpy array, not object dtype.
    smiles = chunk.column('smiles').to_pylist()
    scores = chunk.column('dockscore').to_pylist()
    pars = {"radius": 2,
            "nBits": 8192,
            "invariants": [],
            "fromAtoms": [],
            "useChirality": False,
            "useBondTypes": True,
            "useFeatures": True,
            }
    fingerprint_function = rdMolDescriptors.GetMorganFingerprintAsBitVect
    print(f'the number of smiles in the record batch is {len(smiles)}')
    mols = []
    scores_list = []
    for smi, score in zip(smiles, scores):
        # MolFromSmiles returns None on parse failure (it does not raise),
        # so the original bare except never fired and None molecules
        # crashed the fingerprinting step downstream. Skip them, keeping
        # mols and scores_list in lockstep.
        mol = Chem.MolFromSmiles(str(smi))
        if mol is None:
            print('molecule failed')
            continue
        mols.append(mol)
        scores_list.append(score)
    name_path = gen_fp(mols, pars, fingerprint_function, scores_list, job_count)
    return name_path
def csv_chunk_extractor(chunksize, include_columns):
    """Stream the screening CSV in blocks and fan each block out to a ray worker.

    Parameters
    ----------
    chunksize : int
        Block size (bytes) handed to pyarrow's streaming CSV reader.
    include_columns : list[str]
        Columns to materialize; everything else is skipped to save memory.

    Returns
    -------
    list
        One gen_fp output path per record batch, gathered from the workers.
    """
    # The CSV of interest.
    filename = '/data/dockop_data/AmpC_screen_table.csv'
    # open_csv is single-threaded, so use_threads does not change anything.
    read_opts = pa.csv.ReadOptions(use_threads=True, block_size=chunksize)
    # Choose the correct delimiter.
    parse_opts = pa.csv.ParseOptions(delimiter=',')
    # Only read in the needed columns; this saves memory.
    convert_opts = pa.csv.ConvertOptions(include_columns=include_columns)
    reader = pa.csv.open_csv(filename, read_opts, parse_opts, convert_opts)
    # One ray future per arrow record batch streamed off the reader.
    futures = []
    for batch_no, batch in enumerate(reader):
        futures.append(count_ligs.remote(batch, batch_no))
    # Block on each future to deserialize the workers' results.
    results = [ray.get(f) for f in futures]
    print(f'here is the type of a slice of the results: {results}')
    return results
# Script entry point: start ray, cap pyarrow's internal thread pool, and
# process the whole CSV in chunks.
ray.init(num_cpus=3)
pyarrow.set_cpu_count(3)
include_columns = ['smiles', 'dockscore']
chunksize = 1048576 * 10  # 10 MiB per pyarrow CSV block
table_list = csv_chunk_extractor(chunksize, include_columns)
# Bug fix: the original printed the undefined name `results` here, which
# raised NameError after all the work had already been done.
print(table_list)