-
Notifications
You must be signed in to change notification settings - Fork 2
/
merge_CP_DP.py
203 lines (169 loc) · 7.32 KB
/
merge_CP_DP.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
"""
This file holds the functions necessary for merging IDR_stream CP and DP outputs.
As the IDR_stream output follows the pycytominer output format, these merge functions should work for any pycytominer output data.
"""
import pandas as pd
import pathlib
import math
import uuid
def full_loc_map(dp_coord: tuple, cp_image_data_locations: pd.Series) -> tuple:
    """
    helper function for merge_CP_DP_batch_data
    find the cp coordinate in cp_image_data_locations that lies closest
    (by Euclidean distance) to the given dp coordinate

    Parameters
    ----------
    dp_coord : tuple
        dp coord to find closest cp coord for
    cp_image_data_locations : pd.Series
        series of cp coords to get closest one from

    Returns
    -------
    tuple
        closest cp_coord to given dp_coord
    """

    def distance_to_dp(cp_coord: tuple) -> float:
        # straight-line (hypotenuse) distance from a candidate cp coord to dp_coord
        return math.hypot(cp_coord[0] - dp_coord[0], cp_coord[1] - dp_coord[1])

    # min() keeps the first coordinate on a distance tie
    return min(cp_image_data_locations, key=distance_to_dp)
def merge_CP_DP_batch_data(
    cp_batch_data: pd.DataFrame, dp_batch_data: pd.DataFrame, add_cell_uuid: bool = True
) -> pd.DataFrame:
    """
    merge dataframes for IDR_stream output with CP and DP features
    the two features dataframes should have aligned location metadata (plate, well, frame, etc)
    and the same number of rows (cells)

    Parameters
    ----------
    cp_batch_data : pd.DataFrame
        idrstream_cp batch output
    dp_batch_data : pd.DataFrame
        idrstream_dp batch output
    add_cell_uuid : bool
        whether or not to add a uuid for each cell to the final merged dataframe

    Returns
    -------
    pd.DataFrame
        merged batch data with metadata, CP features, and DP features

    Raises
    ------
    IndexError
        cp and dp dataframes have different number of rows (cells),
        or Metadata_DNA is missing from the CP batch data
    """
    # check batch data have same number of rows (cells)
    # if batch data have different number of cells, raise an error because
    # they must not have close segmentations
    if cp_batch_data.shape[0] != dp_batch_data.shape[0]:
        raise IndexError("Batch data have different number of rows (cells)!")

    # work on copies so the caller's dataframes are never mutated
    # (previously the int-cast coordinates were written back into the inputs)
    cp_batch_data = cp_batch_data.copy()
    dp_batch_data = dp_batch_data.copy()

    # convert x and y coordinates to integers so coordinates from the two
    # pipelines can be compared exactly
    location_columns = ["Location_Center_X", "Location_Center_Y"]
    cp_batch_data[location_columns] = cp_batch_data[location_columns].astype(int)
    dp_batch_data[location_columns] = dp_batch_data[location_columns].astype(int)

    # metadata columns are the columns that show up in both dataframes
    dp_column_set = set(dp_batch_data.columns)
    metadata_columns = [col for col in cp_batch_data.columns if col in dp_column_set]

    # feature columns are whatever remains unique to each dataframe
    cp_feature_columns = set(cp_batch_data.columns) - set(metadata_columns)
    dp_feature_columns = set(dp_batch_data.columns) - set(metadata_columns)

    # add CP and DP prefixes to their respective feature columns
    cp_batch_data = cp_batch_data.rename(
        columns={col: f"CP__{col}" for col in cp_feature_columns}
    )
    dp_batch_data = dp_batch_data.rename(
        columns={col: f"DP__{col}" for col in dp_feature_columns}
    )

    # Metadata_DNA (the image path) is required to group cells by image
    if "Metadata_DNA" not in cp_batch_data.columns:
        raise IndexError("Metadata_DNA not found in CP batch data!")

    # list to compile merged dataframes from each image
    compiled_merged_data = []
    # iterate image by image because cell positions (x, y) should only be
    # matched against cells from the same image
    for image_path in cp_batch_data["Metadata_DNA"].unique():
        # take copies of the per-image slices so the assignments below are
        # plain writes instead of chained assignments on a view
        # (this also removes the need to silence pandas' chained-assignment
        # warning globally, which previously clobbered the user's option)
        cp_image_data = cp_batch_data.loc[
            cp_batch_data["Metadata_DNA"] == image_path
        ].copy()
        dp_image_data = dp_batch_data.loc[
            dp_batch_data["Metadata_DNA"] == image_path
        ].copy()

        # create a location column with x and y coordinates as a tuple
        cp_image_data["Full_Location"] = list(
            zip(
                cp_image_data["Location_Center_X"],
                cp_image_data["Location_Center_Y"],
            )
        )
        # snap each dp location to the closest cp location
        # (distance minimized with the hypotenuse; ties keep the first cp coord)
        cp_locations = list(cp_image_data["Full_Location"])
        dp_image_data["Full_Location"] = [
            min(
                cp_locations,
                key=lambda cp_coord: math.hypot(
                    cp_coord[0] - dp_coord[0], cp_coord[1] - dp_coord[1]
                ),
            )
            for dp_coord in zip(
                dp_image_data["Location_Center_X"],
                dp_image_data["Location_Center_Y"],
            )
        ]

        # drop metadata columns from DP before merge so they are not duplicated
        dp_image_data = dp_image_data.drop(columns=metadata_columns)
        # merge cp and dp data on the snapped location, then remove the
        # helper location column
        merged_image_data = pd.merge(cp_image_data, dp_image_data, on="Full_Location")
        merged_image_data = merged_image_data.drop(columns=["Full_Location"])
        # add merged image data to the compilation list
        compiled_merged_data.append(merged_image_data)

    # compile merged data into one dataframe and reset the index
    compiled_merged_data = pd.concat(compiled_merged_data).reset_index(drop=True)

    # add cell uuid to merged data to give each cell a unique identifier
    if add_cell_uuid:
        cell_uuids = [uuid.uuid4() for _ in range(compiled_merged_data.shape[0])]
        compiled_merged_data.insert(loc=0, column="Cell_UUID", value=cell_uuids)

    return compiled_merged_data
def save_merged_CP_DP_run(
    cp_data_dir_path: pathlib.Path,
    dp_data_dir_path: pathlib.Path,
    merged_data_dir_path: pathlib.Path,
):
    """
    merge CP and DP IDR_stream outputs into one set of batch files

    Parameters
    ----------
    cp_data_dir_path : pathlib.Path
        path to directory with IDR_stream CP batch output files
    dp_data_dir_path : pathlib.Path
        path to directory with IDR_stream DP batch output files
    merged_data_dir_path : pathlib.Path
        path to directory to save merged batch output files
    """
    # make sure the output directory exists before writing anything
    merged_data_dir_path.mkdir(parents=True, exist_ok=True)

    # process the cp batch files in a deterministic (sorted) order;
    # the matching dp batch file shares the cp batch file's name
    for cp_batch_path in sorted(cp_data_dir_path.iterdir()):
        dp_batch_path = dp_data_dir_path / cp_batch_path.name

        # load the cp and dp batch data for this batch
        cp_batch = pd.read_csv(cp_batch_path, compression="gzip", index_col=0)
        dp_batch = pd.read_csv(dp_batch_path, compression="gzip", index_col=0)

        # merge the two feature sets and save the result under the same name
        merged_batch = merge_CP_DP_batch_data(cp_batch, dp_batch)
        merged_batch.to_csv(
            merged_data_dir_path / cp_batch_path.name, compression="gzip"
        )