Skip to content

Commit

Permalink
Merge pull request #132 from zooniverse/resume-uploads
Browse files Browse the repository at this point in the history
Add option to resume subject uploads on failure
  • Loading branch information
adammcmaster authored Oct 15, 2019
2 parents a784cfe + bab2771 commit c9d1228
Show file tree
Hide file tree
Showing 2 changed files with 218 additions and 71 deletions.
288 changes: 217 additions & 71 deletions panoptes_cli/commands/subject_set.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import csv
import copy
import os
import re
import sys
Expand All @@ -8,13 +9,16 @@
import click
import humanize

from pathvalidate import is_valid_filename, sanitize_filename

from panoptes_cli.scripts.panoptes import cli
from panoptes_client import SubjectSet
from panoptes_client.panoptes import PanoptesAPIException

# Threshold passed to the linking helper during the upload loop: created
# subjects are added to the subject set once more than this many are waiting.
LINK_BATCH_SIZE = 10
# Threshold passed to move_created() during the upload loop: the loop waits
# while more than this many asynchronous subject saves are still pending.
MAX_PENDING_SUBJECTS = 50
# 1024 * 1024; presumably a per-file size limit in bytes (1 MiB) enforced by
# validate_file() -- the check itself is not visible here, TODO confirm.
MAX_UPLOAD_FILE_SIZE = 1024 * 1024
# Version number written into saved upload state as 'state_version'. A YAML
# manifest whose 'state_version' is greater than this is rejected on resume.
CURRENT_STATE_VERSION = 1


@cli.group(name='subject-set')
Expand Down Expand Up @@ -191,11 +195,66 @@ def upload_subjects(
Any local files will still be detected and uploaded.
"""

remote_location_count = len(remote_location)
mime_type_count = len(mime_type)
if (
len(manifest_files) > 1
and any(map(lambda m: m.endswith('.yaml'), manifest_files))
):
click.echo(
'Error: YAML manifests must be processed one at a time.',
err=True,
)
return -1
elif manifest_files[0].endswith('.yaml'):
with open(manifest_files[0], 'r') as yaml_manifest:
upload_state = yaml.load(yaml_manifest, Loader=yaml.FullLoader)
if upload_state['state_version'] > CURRENT_STATE_VERSION:
click.echo(
'Error: {} was generated by a newer version of the Panoptes '
'CLI and is not compatible with this version.'.format(
manifest_files[0],
),
err=True,
)
return -1
if upload_state['subject_set_id'] != subject_set_id:
click.echo(
'Warning: You specified subject set {} but this YAML '
'manifest is for subject set {}.'.format(
subject_set_id,
upload_state['subject_set_id'],
),
err=True,
)
click.confirm(
'Upload {} to subject set {} ({})?'.format(
manifest_files[0],
subject_set_id,
SubjectSet.find(subject_set_id).display_name,
),
abort=True
)
upload_state['subject_set_id'] = subject_set_id
resumed_upload = True
else:
upload_state = {
'state_version': CURRENT_STATE_VERSION,
'subject_set_id': subject_set_id,
'manifest_files': manifest_files,
'allow_missing': allow_missing,
'remote_location': remote_location,
'mime_type': mime_type,
'file_column': file_column,
'waiting_to_upload': [],
'waiting_to_link': {},
}
resumed_upload = False

remote_location_count = len(upload_state['remote_location'])
mime_type_count = len(upload_state['mime_type'])
if remote_location_count > 1 and mime_type_count == 1:
mime_type = mime_type * remote_location_count
upload_state['mime_type'] = (
upload_state['mime_type'] * remote_location_count
)
elif remote_location_count > 0 and mime_type_count != remote_location_count:
click.echo(
'Error: The number of MIME types given must be either 1 or equal '
Expand Down Expand Up @@ -235,91 +294,178 @@ def validate_file(file_path):
return False
return True

subject_set = SubjectSet.find(subject_set_id)
subject_rows = []
for manifest_file in manifest_files:
with open(manifest_file, 'U') as manifest_f:
file_root = os.path.dirname(manifest_file)
r = csv.reader(manifest_f)
headers = next(r)
for row in r:
metadata = dict(zip(headers, row))
files = []
if not file_column:
file_column = []
for field_number, col in enumerate(row, start=1):
file_path = os.path.join(file_root, col)
if os.path.exists(file_path):
file_column.append(field_number)
subject_set = SubjectSet.find(upload_state['subject_set_id'])
if not resumed_upload:
subject_rows = []
for manifest_file in upload_state['manifest_files']:
with open(manifest_file, 'U') as manifest_f:
file_root = os.path.dirname(manifest_file)
r = csv.reader(manifest_f)
headers = next(r)
for row in r:
metadata = dict(zip(headers, row))
files = []
if not upload_state['file_column']:
upload_state['file_column'] = []
for field_number, col in enumerate(row, start=1):
file_path = os.path.join(file_root, col)
if os.path.exists(file_path):
upload_state['file_column'].append(
field_number,
)
if not validate_file(file_path):
return -1
files.append(file_path)
else:
for field_number in upload_state['file_column']:
file_path = os.path.join(
file_root,
row[field_number - 1]
)
if not validate_file(file_path):
return -1
files.append(file_path)
else:
for field_number in file_column:
file_path = os.path.join(
file_root,
row[field_number - 1]
)
if not validate_file(file_path):
return -1
files.append(file_path)

for field_number, _mime_type in zip(remote_location, mime_type):
files.append({_mime_type: row[field_number - 1]})
for field_number, _mime_type in zip(
upload_state['remote_location'],
upload_state['mime_type'],
):
files.append({_mime_type: row[field_number - 1]})

if len(files) == 0:
click.echo('Could not find any files in row:', err=True)
click.echo(','.join(row), err=True)
if not allow_missing:
return -1
else:
continue
subject_rows.append((files, metadata))

if not subject_rows:
click.echo(
'File {} did not contain any rows.'.format(manifest_file),
err=True,
)
return -1
if len(files) == 0:
click.echo(
'Could not find any files in row:',
err=True,
)
click.echo(','.join(row), err=True)
if not upload_state['allow_missing']:
return -1
else:
continue
subject_rows.append((files, metadata))

if not subject_rows:
click.echo(
'File {} did not contain any rows.'.format(
manifest_file,
),
err=True,
)
return -1

subject_rows = list(enumerate(subject_rows))
upload_state['waiting_to_upload'] = copy.deepcopy(subject_rows)
else:
for subject_id, subject_row in upload_state['waiting_to_link'].items():
try:
subject = Subject.find(subject_id)
except PanoptesAPIException:
upload_state['waiting_to_upload'].append(subject_row)
del upload_state['waiting_to_link'][subject_id]
subject_rows = copy.deepcopy(upload_state['waiting_to_upload'])

created_subjects = []
pending_subjects = []

def move_created(limit):
while len(pending_subjects) > limit:
for subject in pending_subjects:
for subject, subject_row in pending_subjects:
if subject.async_save_result:
created_subjects.append(subject)
pending_subjects.remove(subject)
pending_subjects.remove((subject, subject_row))
upload_state['waiting_to_upload'].remove(subject_row)
upload_state['waiting_to_link'][subject.id] = subject_row
time.sleep(0.5)

def link_created(limit):
    """Link the accumulated created subjects to the subject set.

    Nothing happens unless more than ``limit`` subjects have accumulated in
    ``created_subjects``. When the threshold is exceeded, the whole list is
    added to ``subject_set`` in one call and then emptied in place.
    """
    if len(created_subjects) <= limit:
        return
    subject_set.add(created_subjects)
    # Empty the shared list in place so the enclosing scope sees the change.
    del created_subjects[:]
def link_subjects(limit):
    """Link the subjects recorded as waiting to be linked.

    ``upload_state['waiting_to_link']`` maps subject IDs to their manifest
    rows. Nothing happens unless it holds more than ``limit`` entries. When
    the threshold is exceeded, all of its keys are added to ``subject_set``
    in one call and the mapping is then cleared in place.
    """
    waiting = upload_state['waiting_to_link']
    if len(waiting) <= limit:
        return
    subject_set.add(list(waiting.keys()))
    # Cleared only after add() returns, so a failed add leaves the entries
    # in the upload state.
    waiting.clear()

with click.progressbar(
enumerate(subject_rows),
subject_rows,
length=len(subject_rows),
label='Uploading subjects',
) as _subject_rows:
with Subject.async_saves():
for count, (files, metadata) in _subject_rows:
subject = Subject()
subject.links.project = subject_set.links.project
for media_file in files:
subject.add_location(media_file)
subject.metadata.update(metadata)
subject.save()

pending_subjects.append(subject)

move_created(MAX_PENDING_SUBJECTS)
link_created(LINK_BATCH_SIZE)

move_created(0)
link_created(0)
try:
with Subject.async_saves():
for subject_row in _subject_rows:
count, (files, metadata) = subject_row
subject = Subject()
subject.links.project = subject_set.links.project
for media_file in files:
subject.add_location(media_file)
subject.metadata.update(metadata)
subject.save()

pending_subjects.append((subject, subject_row))

move_created(MAX_PENDING_SUBJECTS)
link_subjects(LINK_BATCH_SIZE)

move_created(0)
link_subjects(0)
finally:
if (
len(pending_subjects) > 0
or len(upload_state['waiting_to_link']) > 0
):
click.echo('Error: Upload failed.', err=True)
if click.confirm(
'Would you like to save the upload state to resume the '
'upload later?',
default=True,
):
while True:
state_file_name = 'panoptes-upload-{}.yaml'.format(
subject_set_id,
)
state_file_name = click.prompt(
'Enter filename to save to',
default=state_file_name,
)

if not state_file_name.endswith('.yaml'):
click.echo(
'Error: File name must end in ".yaml".',
err=True,
)
if click.confirm(
'Save to {}.yaml?'.format(state_file_name),
default=True,
):
state_file_name += '.yaml'
else:
continue
if not is_valid_filename(state_file_name):
click.echo(
'Error: {} is not a valid file name'.format(
state_file_name,
),
err=True,
)
sanitized_filename = sanitize_filename(
state_file_name,
)
if click.confirm(
'Save to {}?'.format(
sanitized_filename,
),
default=True,
):
state_file_name = sanitized_filename
else:
continue
if os.path.exists(state_file_name):
if not click.confirm(
'File {} already exists. Overwrite?'.format(
state_file_name,
),
default=False,
):
continue
break

with open(state_file_name, 'w') as state_file:
yaml.dump(upload_state, state_file)


@subject_set.command(name='add-subjects')
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
'PyYAML>=3.12,<5.2',
'panoptes-client>=1.0,<2.0',
'humanize>=0.5.1,<0.6',
'pathvalidate>=0.29.0,<0.30',
],
entry_points='''
[console_scripts]
Expand Down

0 comments on commit c9d1228

Please sign in to comment.