Deepcell upload loop cleaning #1023

Merged: 26 commits, Jul 25, 2023

Changes from all commits (26 commits)
a242718
loop check for successful deepcell upload
camisowers Jul 11, 2023
125dbbb
Merge branch 'main' into retry_deepcell_upload
camisowers Jul 11, 2023
6c0ce0f
update test zip paths
camisowers Jul 11, 2023
e7c98c1
helper functions pls
camisowers Jul 11, 2023
d5f5d82
zip input, upload, and extract per batch
camisowers Jul 13, 2023
3881f6f
cleaning
camisowers Jul 13, 2023
3924ff7
docstring indent
camisowers Jul 13, 2023
15fbdbe
fov -> fovs file name
camisowers Jul 13, 2023
40abe25
remove overwrite warning
camisowers Jul 13, 2023
d2b176c
new dir for each test call
camisowers Jul 13, 2023
be0f1fb
fix bad logic warning testing
camisowers Jul 14, 2023
433f100
remove arg from docstring
camisowers Jul 14, 2023
4033380
add helper function test
camisowers Jul 17, 2023
61bed7b
batch_num start at 1
camisowers Jul 18, 2023
c4f3ee6
add previously processed warning
camisowers Jul 18, 2023
f1f6e9d
set timeouts and scrap retry
camisowers Jul 18, 2023
b3f1b11
correct timeout errors and update tests
camisowers Jul 24, 2023
eb38c76
timeout 5 mins
camisowers Jul 24, 2023
f40a37d
add to unprocessed list after loop closes
camisowers Jul 24, 2023
3af7ec2
continue loop after extraction
camisowers Jul 24, 2023
05b1a2d
break instead of continue
camisowers Jul 24, 2023
bdb41dd
Merge branch 'main' into retry_deepcell_upload
camisowers Jul 25, 2023
ae68008
skipped processing print fixes
camisowers Jul 25, 2023
dd9d5e1
Merge remote-tracking branch 'origin/retry_deepcell_upload' into retr…
camisowers Jul 25, 2023
2e11f45
adjust print statements
camisowers Jul 25, 2023
9a36fe1
3 seconds before second zip call
camisowers Jul 25, 2023
269 changes: 141 additions & 128 deletions src/ark/utils/deepcell_service_utils.py
@@ -1,7 +1,6 @@
import os
import time
import warnings
from concurrent.futures import ThreadPoolExecutor
from io import BytesIO
from json import JSONDecodeError
from pathlib import Path
@@ -11,17 +10,92 @@
import numpy as np
import requests
from alpineer import image_utils, io_utils, load_utils, misc_utils
from requests.adapters import HTTPAdapter
from requests.exceptions import RetryError
from tifffile import imread
from tqdm.notebook import tqdm
from urllib3 import Retry


def zip_input_files(deepcell_input_dir, fov_group, batch_num):
"""Helper function which handles zipping the batch fov images into a single zip file.

Args:
deepcell_input_dir (str): path to where deepcell input image files are stored
fov_group (list): list of fovs to process in this batch
batch_num (int): the batch number
Returns:
str: path to deepcell input zip file
"""

# write all files to the zip file
zip_path = os.path.join(deepcell_input_dir, f'fovs_batch_{batch_num}.zip')

# create zip files, skip any existing
if not os.path.exists(zip_path):
with ZipFile(zip_path, 'w', compression=ZIP_DEFLATED) as zipObj:
for fov in fov_group:
# file has .tiff extension
basename = fov + '.tiff'
filename = os.path.join(deepcell_input_dir, basename)
zipObj.write(filename, basename)

return zip_path
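
As a usage sketch (assuming ark and tifffile are installed; the fov names and temp directory are illustrative), the helper can be exercised on dummy single-channel tiffs:

    import os
    import tempfile

    import numpy as np
    from tifffile import imwrite

    from ark.utils.deepcell_service_utils import zip_input_files

    with tempfile.TemporaryDirectory() as input_dir:
        fovs = ['fov0', 'fov1']  # hypothetical fov names
        for fov in fovs:
            # zip_input_files expects one <fov>.tiff per fov in the input dir
            imwrite(os.path.join(input_dir, fov + '.tiff'),
                    np.zeros((4, 4), dtype=np.uint8))

        zip_path = zip_input_files(input_dir, fovs, batch_num=1)
        assert os.path.basename(zip_path) == 'fovs_batch_1.zip'

Note that the helper skips zipping entirely when the batch zip already exists, which is what lets the upload loop below re-enter safely.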


def extract_deepcell_response(deepcell_output_dir, fov_group, batch_num, wc_suffix, nuc_suffix):
"""Helper function to extract the segmentation masks from the deepcell output zip file.

Args:
deepcell_output_dir (str):
path to where deepcell output zips are stored
fov_group (list):
list of fovs to process in this batch
batch_num (int):
the batch number
wc_suffix (str):
Suffix for whole cell DeepCell output filename. e.g. for fovX, DeepCell output
should be `<fovX>+suffix.tif`.
Whole cell DeepCell files by default get suffixed with `'feature_0'`,
it will be renamed to this arg.
nuc_suffix (str):
Suffix for nuclear DeepCell output filename. e.g. for fovX, DeepCell output
should be `<fovX>+suffix.tif`.
Nuclear DeepCell files by default get suffixed with `'feature_1'`,
it will be renamed to this arg.
"""

# extract the .tif output
batch_zip = os.path.join(
deepcell_output_dir, f"deepcell_response_fovs_batch_{batch_num}.zip")

with ZipFile(batch_zip, "r") as zipObj:
for name in zipObj.namelist():
# these files will only ever be suffixed with _feature_0.tif or _feature_1.tif
if '_feature_0.tif' in name:
resuffixed_name = name.replace('_feature_0', wc_suffix)
else:
resuffixed_name = name.replace('_feature_1', nuc_suffix)

mask_path = os.path.join(deepcell_output_dir, resuffixed_name)

# DeepCell uses .tif extension, append extra f to account for .tiff standard
mask_path += 'f'

# read the file from the .zip file and save as segmentation mask
byte_repr = zipObj.read(name)
ranked_segmentation_mask = (_convert_deepcell_seg_masks(byte_repr)).squeeze()
image_utils.save_image(mask_path, ranked_segmentation_mask)

# verify that all the files were extracted
for fov in fov_group:
if fov + '_feature_0.tif' not in zipObj.namelist():
warnings.warn(f'Deep Cell whole cell output file was not found for {fov}.')
if fov + '_feature_1.tif' not in zipObj.namelist():
warnings.warn(f'Deep Cell nuclear output file was not found for {fov}.')
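
The renaming above maps DeepCell's fixed `_feature_0`/`_feature_1` suffixes onto the caller's suffixes and normalizes the `.tif` extension to `.tiff`; a standalone sketch of that mapping (function name hypothetical):

    def rename_mask(name, wc_suffix='_whole_cell', nuc_suffix='_nuclear'):
        # DeepCell names members <fov>_feature_0.tif (whole cell) or
        # <fov>_feature_1.tif (nuclear); map those onto the caller's suffixes
        if '_feature_0.tif' in name:
            renamed = name.replace('_feature_0', wc_suffix)
        else:
            renamed = name.replace('_feature_1', nuc_suffix)
        # DeepCell writes .tif; append an extra 'f' for the .tiff convention
        return renamed + 'f'

    assert rename_mask('fov0_feature_0.tif') == 'fov0_whole_cell.tiff'
    assert rename_mask('fov0_feature_1.tif') == 'fov0_nuclear.tiff'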


def create_deepcell_output(deepcell_input_dir, deepcell_output_dir, fovs=None,
wc_suffix='_whole_cell', nuc_suffix='_nuclear',
host='https://deepcell.org', job_type='mesmer',
scale=1.0, timeout=3600, zip_size=5, parallel=False):
scale=1.0, timeout=300, zip_size=5):
"""Handles all of the necessary data manipulation for running deepcell tasks.
Creates .zip files (to be used as input for DeepCell),
calls run_deepcell_task method,
@@ -57,13 +131,10 @@ def create_deepcell_output(deepcell_input_dir, deepcell_output_dir, fovs=None,
Default: 1.0
timeout (int):
Approximate seconds until timeout.
Default: 1 hour (3600)
Default: 5 minutes (300)
zip_size (int):
Maximum number of files to include in zip.
Default: 100
parallel (bool):
Tries to zip, upload, and extract zip files in parallel
Default: False
Default: 5
Raises:
ValueError:
Raised if there is some fov X (from fovs list) s.t.
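
Under the new defaults, a typical call might look like the sketch below (paths and fov names are hypothetical; the input directory is assumed to already hold one preprocessed `<fov>.tiff` per fov):

    from ark.utils.deepcell_service_utils import create_deepcell_output

    create_deepcell_output(
        deepcell_input_dir='data/deepcell_input',    # hypothetical path
        deepcell_output_dir='data/deepcell_output',  # hypothetical path
        fovs=['fov0', 'fov1'],
        timeout=300,  # new default: 5 minutes per batch
        zip_size=5,   # 5 fovs per zip (docstring default corrected in this PR)
    )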
@@ -99,85 +170,49 @@ def create_deepcell_output(deepcell_input_dir, deepcell_output_dir, fovs=None,

print(f'Processing tiffs in {len(fov_groups)} batches...')

# yes this is a function, don't worry about it
# long story short, too many args to pass if function not in local scope
# i.e easier to map fov_groups
def _zip_run_extract(fov_group, group_index):
# define the location of the zip file for our fovs
zip_path = os.path.join(deepcell_input_dir, f'fovs_batch_{group_index + 1}.zip')

if os.path.isfile(zip_path):
warnings.warn(f'{zip_path} will be overwritten')

# write all files to the zip file
print('Zipping preprocessed tiff files.')

def zip_write(zip_path):
with ZipFile(zip_path, 'w', compression=ZIP_DEFLATED) as zipObj:
for fov in fov_group:
# file has .tiff extension
basename = fov + '.tiff'
filename = os.path.join(deepcell_input_dir, basename)
zipObj.write(filename, basename)

zip_write(zip_path)

# pass the zip file to deepcell.org
print('Uploading files to DeepCell server.')
status = run_deepcell_direct(
zip_path, deepcell_output_dir, host, job_type, scale, timeout
)

# ensure execution is halted if run_deepcell_direct returned non-zero exit code
if status != 0:
print("The following FOVs could not be processed: %s" % ','.join(fov_group))
return

# extract the .tif output
print("Extracting tif files from DeepCell response.")
zip_names = io_utils.list_files(deepcell_output_dir, substrs=[".zip"])

zip_files = [os.path.join(deepcell_output_dir, name) for name in zip_names]

# sort by newest added
zip_files.sort(key=os.path.getmtime)

with ZipFile(zip_files[-1], "r") as zipObj:
for name in zipObj.namelist():
# these files will only ever be suffixed with _feature_0.tif or _feature_1.tif
if '_feature_0.tif' in name:
resuffixed_name = name.replace('_feature_0', wc_suffix)
else:
resuffixed_name = name.replace('_feature_1', nuc_suffix)

mask_path = os.path.join(deepcell_output_dir, resuffixed_name)
unprocessed_fovs = {}
for batch_num, fov_group in enumerate(fov_groups, start=1):
# create zipped input files
input_zip_path = zip_input_files(deepcell_input_dir, fov_group, batch_num)

# determine the expected DeepCell response zip for this batch
batch_filename = Path(input_zip_path).name
output_zip_path = os.path.join(deepcell_output_dir, "deepcell_response_" + batch_filename)
if os.path.exists(output_zip_path):
print(f"Skipping previously processed batch_{batch_num}.")

# upload to deepcell
total_time, status = 0, 0
start = time.time()
while not os.path.exists(output_zip_path) and total_time < timeout:
# pass the zip file to deepcell.org
status = run_deepcell_direct(
input_zip_path, deepcell_output_dir, host, job_type, scale, timeout
)

# DeepCell uses .tif extension, append extra f to account for .tiff standard
mask_path += 'f'
# successful deepcell response
if status == 0:
# extract segmentation masks from deepcell output
extract_deepcell_response(deepcell_output_dir, fov_group, batch_num, wc_suffix,
nuc_suffix)
break

# read the file from the .zip file and save as segmentation mask
byte_repr = zipObj.read(name)
ranked_segmentation_mask = (_convert_deepcell_seg_masks(byte_repr)).squeeze()
image_utils.save_image(mask_path, ranked_segmentation_mask)
total_time = time.time() - start

# verify that all the files were extracted
for fov in fov_group:
if fov + '_feature_0.tif' not in zipObj.namelist():
warnings.warn(f'Deep Cell whole cell output file was not found for {fov}.')
if fov + '_feature_1.tif' not in zipObj.namelist():
warnings.warn(f'Deep Cell nuclear output file was not found for {fov}.')
if status != 0:
unprocessed_fovs[batch_num] = fov_group
if total_time >= timeout:
print(f"This batch exceeded the allotted processing time of {timeout / 60} minutes "
f"and will be skipped.")

# make calls in parallel
if parallel:
with ThreadPoolExecutor() as executor:
executor.map(_zip_run_extract, fov_groups, range(len(fov_groups)))
executor.shutdown(wait=True)
else:
list(map(_zip_run_extract, fov_groups, range(len(fov_groups))))
if unprocessed_fovs:
print("\nThe following batches were not processed:")
for batch in unprocessed_fovs.keys():
print(f"fovs_batch_{batch} {unprocessed_fovs[batch]}")


def run_deepcell_direct(input_dir, output_dir, host='https://deepcell.org',
job_type='mesmer', scale=1.0, timeout=3600, num_retries=5):
job_type='mesmer', scale=1.0, timeout=300):
"""Uses direct calls to DeepCell API and saves output to output_dir.

Args:
@@ -195,9 +230,7 @@ def run_deepcell_direct(input_dir, output_dir, host='https://deepcell.org',
Default: 1.0
timeout (int):
Approximate seconds until timeout.
Default: 1 hour (3600)
num_retries (int):
The maximum number of times to call the Deepcell API in case of failure
Default: 5 minutes (300)
"""

# upload zip file
@@ -210,46 +243,20 @@ def run_deepcell_direct(input_dir, output_dir, host='https://deepcell.org',
}
f.seek(0)

# define and mount a retry instance to call the Deepcell API again if needed
retry_strategy = Retry(
total=num_retries,
status_forcelist=[404, 500, 502, 503, 504],
allowed_methods=['HEAD', 'GET', 'POST', 'PUT', 'DELETE', 'OPTIONS', 'TRACE']
)
adapter = HTTPAdapter(max_retries=retry_strategy)

http = requests.Session()
http.mount('https://', adapter)
http.mount('http://', adapter)

total_retries = 0
while total_retries < num_retries:
# handles the case if the main endpoint can't be reached
try:
upload_response = http.post(
upload_url,
timeout=timeout,
files=upload_fields
)
except RetryError as re:
print(re)
return 1

# handles the case if the endpoint returns an invalid JSON
# indicating an internal API error
try:
upload_response = upload_response.json()
except JSONDecodeError as jde:
total_retries += 1
continue

# if we reach the end no errors were encountered on this attempt
break

# if the JSON could not be decoded num_retries number of times
if total_retries == num_retries:
print("The JSON response from DeepCell could not be decoded after %d attempts" %
num_retries)
try:
upload_response = requests.post(
upload_url,
timeout=timeout,
files=upload_fields
)
except (requests.ConnectionError, requests.ReadTimeout) as e:
return 1

# handles the case if the endpoint returns an invalid JSON
# indicating an internal API error
try:
upload_response = upload_response.json()
except JSONDecodeError as jde:
return 1
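
For reference, the urllib3 retry-adapter strategy this PR removes can still be mounted on a requests Session when automatic status-code retries are wanted; a minimal sketch of that removed pattern:

    import requests
    from requests.adapters import HTTPAdapter
    from urllib3 import Retry

    session = requests.Session()
    retry_strategy = Retry(
        total=5,
        status_forcelist=[500, 502, 503, 504],
        allowed_methods=['GET', 'POST'],
    )
    session.mount('https://', HTTPAdapter(max_retries=retry_strategy))
    # session.post(...) now retries the listed status codes automatically

The PR drops this in favor of a single guarded post call, since the caller's while loop in create_deepcell_output now owns the retry behavior.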

# call prediction
Expand All @@ -271,7 +278,8 @@ def run_deepcell_direct(input_dir, output_dir, host='https://deepcell.org',
# check redis every 3 seconds
redis_url = host + '/api/redis'

print('Segmentation progress:')
batch_num = (io_utils.remove_file_extensions([filename])[0]).split("_")[-1]
print(f'Segmentation progress for batch_{batch_num}:')
progress_bar = tqdm(total=100,
bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}]')

@@ -303,14 +311,19 @@ def run_deepcell_direct(input_dir, output_dir, host='https://deepcell.org',
total_time += 3
progress_bar.close()

# return a failure code if the job timed out (the caller prints the skip message)
if total_time >= timeout:
return 1

# when done, download result or examine errors
if len(redis_response['value'][4]) > 0:
# error happened
print(f"Encountered Failure(s): {unquote_plus(redis_response['value'][4])}")
return 1

deepcell_output = requests.get(redis_response['value'][2], allow_redirects=True)

with open(os.path.join(output_dir, "deepcell_response.zip"), mode="wb") as f:
with open(os.path.join(output_dir, "deepcell_response_" + filename), mode="wb") as f:
f.write(deepcell_output.content)
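
The loop above polls the Redis status endpoint every 3 seconds until the job finishes or `timeout` is hit; the shape of that loop, reduced to a sketch (check_status is a hypothetical callable):

    import time

    def poll_until_complete(check_status, timeout=300, interval=3):
        # check_status() should return True once the DeepCell job reports done;
        # mirrors the 3-second Redis polling cadence used above
        total_time = 0
        while total_time < timeout:
            if check_status():
                return True
            time.sleep(interval)
            total_time += interval
        return False  # timed out; the caller returns 1 and the batch is skipped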

# being kind and sending an expire signal to deepcell
@@ -319,7 +332,7 @@ def run_deepcell_direct(input_dir, output_dir, host='https://deepcell.org',
expire_url,
json={
'hash': predict_hash,
'expireIn': 3600,
'expireIn': 90,
}
)
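
End to end, the direct-call helper might be exercised as below (paths hypothetical; note that despite the parameter name, input_dir is the path to a prepared input zip):

    from ark.utils.deepcell_service_utils import run_deepcell_direct

    status = run_deepcell_direct(
        input_dir='data/deepcell_input/fovs_batch_1.zip',  # hypothetical zip path
        output_dir='data/deepcell_output',                 # hypothetical path
        timeout=300,
    )
    if status != 0:
        # create_deepcell_output records the batch as unprocessed and retries
        print('upload or segmentation failed for this batch')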
