Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add desi_update_processing_table_statuses and update dry_run_levels #2385

Merged
merged 10 commits into from
Oct 10, 2024
15 changes: 8 additions & 7 deletions bin/desi_proc_night
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,14 @@ def parse_args():
parser.add_argument("--dry-run", action="store_true",
help="Perform a dry run where no jobs are actually created or submitted. Overwritten if "+
"dry-run-level is defined as nonzero.")
parser.add_argument("--dry-run-level", type=int, default=0, required=False,
help="""If nonzero, this is a simulated run.
If level=1 the scripts will be written but not submitted.
If level=2, scripts will not be written or submitted but processing_table is created.
if level=3, no output files are written at all.
Logging will remain the same for testing as though scripts are being submitted.
Default is 0 (false).""")
parser.add_argument("--dry-run-level", type=int, default=0,
help="What level of dry_run to perform, if any. Default is 0. "
+ "0 which runs the code normally. "
+ "1 writes all files but doesn't submit any jobs to Slurm. "
+ "2 writes tables but doesn't write scripts or submit anything. "
+ "3 Doesn't write or submit anything but queries Slurm normally for job status. "
+ "4 Doesn't write, submit jobs, or query Slurm."
+ "5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.")
parser.add_argument("--no-redshifts", action="store_true",
help="Whether to submit redshifts or not. If set, redshifts are not submitted.")
parser.add_argument("--ignore-proc-table-failures", action="store_true",
Expand Down
29 changes: 16 additions & 13 deletions bin/desi_resubmit_queue_failures
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,14 @@ def parse_args(): # options=None):
help="File format and extension for the exp and proc tables.")
parser.add_argument("-r", "--reservation", type=str, required=False, default=None,
help="The reservation to submit jobs to. If None, it is not submitted to a reservation.")
parser.add_argument("--dry-run", action="store_true",
help="Perform a dry run where no jobs are actually created or submitted. Overwritten if "+
"dry-run-level is defined as nonzero.")
parser.add_argument("--dry-run-level", type=int, default=0,
help="What level of dry_run to perform, if any. Default is 0. "
+ "0 which runs the code normally. "
+ "1 writes all files but doesn't submit any jobs to Slurm. "
+ "2 writes tables but doesn't write scripts or submit anything. "
+ "3 Doesn't write or submit anything but queries Slurm normally for job status. "
+ "4 Doesn't write, submit jobs, or query Slurm."
+ "5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.")
parser.add_argument("--resub-states", type=str, default=None, required=False,
help="The SLURM queue states that should be resubmitted. " +
"E.g. UNSUBMITTED, BOOT_FAIL, DEADLINE, NODE_FAIL, " +
Expand Down Expand Up @@ -74,24 +79,22 @@ if __name__ == '__main__':
if not os.path.exists(ptable_pathname):
ValueError(f"Processing table: {ptable_pathname} doesn't exist.")

if args.dry_run > 0 and args.dry_run < 3:
log.warning(f"{args.dry_run=} will be run with limited simulation "
f"because we don't want to write out incorrect queue information.")

## Combine the table names and types for easier passing to io functions
table_type = 'proctable'
if args.dry_run_level > 0:
log.info(f"{args.dry_run_level=}, so will be simulating some features."
+ f" See parser for what each level limits.")

## Load in the files defined above
ptable = load_table(tablename=ptable_pathname, tabletype=table_type)
ptable = load_table(tablename=ptable_pathname, tabletype='proctable')
log.info(f"Identified ptable with {len(ptable)} entries.")
ptable, nsubmits = update_and_recursively_submit(ptable, submits=0,
resubmission_states=args.resub_states,
no_resub_failed=args.no_resub_failed,
ptab_name=ptable_pathname, dry_run=args.dry_run,
ptab_name=ptable_pathname,
dry_run_level=args.dry_run_level,
reservation=args.reservation)

if not args.dry_run:
if args.dry_run_level < 3:
write_table(ptable, tablename=ptable_pathname)

log.info("Completed all necessary queue resubmissions from processing "
+ f"table: {ptable_pathname}")
+ f"table: {ptable_pathname}")
88 changes: 88 additions & 0 deletions bin/desi_update_proctable_statuses
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
#!/usr/bin/env python
# coding: utf-8

import argparse
import os
import numpy as np

from desiutil.log import get_logger
from desispec.workflow.tableio import load_table, write_table
from desispec.workflow.proctable import get_processing_table_pathname
from desispec.workflow.queue import update_from_queue


def parse_args():  # options=None):
    """
    Get command line arguments for desi_update_proctable_statuses.

    Returns:
        argparse.Namespace: parsed command line arguments with attributes
            night, outfile, proc_table_pathname, tab_filetype, dry_run_level,
            check_complete_jobs, and show_updated_table.
    """
    parser = argparse.ArgumentParser(description="Update the STATUS of all jobs "
                                                 + "in a DESI processing table by "
                                                 + "querying Slurm.")

    parser.add_argument("-n","--night", type=str, default=None,
                        required=False, help="The night you want processed.")
    parser.add_argument("-o","--outfile", type=str, default=None,
                        required=False, help="Output filename, if different from default.")
    parser.add_argument("--proc-table-pathname", type=str, required=False, default=None,
                        help="Directory name where the output processing table should be saved.")
    parser.add_argument("--tab-filetype", type=str, required=False, default='csv',
                        help="File format and extension for the exp and proc tables.")
    # NOTE: each help fragment below must end with a trailing space so the
    # concatenated help text doesn't run sentences together.
    parser.add_argument("--dry-run-level", type=int, default=0,
                        help="What level of dry_run to perform, if any. Default is 0. "
                             + "0 which runs the code normally. "
                             + "1 writes all files but doesn't submit any jobs to Slurm. "
                             + "2 writes tables but doesn't write scripts or submit anything. "
                             + "3 Doesn't write or submit anything but queries Slurm normally for job status. "
                             + "4 Doesn't write, submit jobs, or query Slurm. "
                             + "5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.")
    parser.add_argument("--check-complete-jobs", action="store_true",
                        help="Query NERSC about jobs with STATUS 'COMPLETED'"
                             + "in addition to all other jobs. Default is False, "
                             + "which skips COMPLETED jobs.")
    parser.add_argument("--show-updated-table", action="store_true",
                        help="Print a subset of the columns from the ptable with updated statuses.")

    return parser.parse_args()


if __name__ == '__main__':
    args = parse_args()
    log = get_logger()

    ## Determine the processing table location, deriving it from the night
    ## if an explicit path wasn't given.
    ptable_pathname = args.proc_table_pathname
    if ptable_pathname is None:
        if args.night is None:
            ## Bug fix: the exception was previously constructed but never
            ## raised, so the script continued with ptable_pathname=None.
            raise ValueError("Either night or --proc-table-path must be specified")
        ## Determine where the processing table will be written
        ptable_pathname = get_processing_table_pathname(prodmod=args.night,
                                                        extension=args.tab_filetype)

    if not os.path.exists(ptable_pathname):
        ## Bug fix: same missing `raise` as above.
        raise ValueError(f"Processing table: {ptable_pathname} doesn't exist.")

    if args.dry_run_level > 0:
        log.info(f"{args.dry_run_level=}, so will be simulating some features."
                 + f" See parser for what each level limits.")

    ## Load in the files defined above
    ptable = load_table(tablename=ptable_pathname, tabletype='proctable')
    log.info(f"Identified ptable with {len(ptable)} entries.")
    ## Query Slurm (or simulate, per dry_run_level) and refresh STATUS
    ptable = update_from_queue(ptable, dry_run_level=args.dry_run_level,
                               check_complete_jobs=args.check_complete_jobs)

    ## dry_run_level >= 3 means no outputs are written to disk
    if args.dry_run_level < 3:
        if args.outfile is not None:
            outfile = args.outfile
        else:
            outfile = ptable_pathname
        write_table(ptable, tablename=outfile)

    if args.show_updated_table:
        log.info("Updated processing table:")
        cols = ['INTID', 'INT_DEP_IDS', 'EXPID', 'TILEID',
                'OBSTYPE', 'JOBDESC', 'LATEST_QID', 'STATUS']
        log.info(np.array(cols))
        for row in ptable:
            log.info(np.array(row[cols]))
        log.info("\n")

    log.info(f"Done updating STATUS column for processing table: {ptable_pathname}")
6 changes: 3 additions & 3 deletions py/desispec/scripts/daily_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -444,9 +444,9 @@ def daily_processing_manager(specprod=None, exp_table_path=None, proc_table_path
dry_run=(dry_run and ()))

if len(ptable) > 0:
ptable = update_from_queue(ptable, dry_run=dry_run_level)
ptable = update_from_queue(ptable, dry_run_level=dry_run_level)
# ptable, nsubmits = update_and_recursively_submit(ptable,
# ptab_name=proc_table_pathname, dry_run=dry_run_level)
# ptab_name=proc_table_pathname, dry_run_level=dry_run_level)

## Exposure table doesn't change in the interim, so no need to re-write it to disk
if dry_run_level < 3:
Expand Down Expand Up @@ -481,7 +481,7 @@ def daily_processing_manager(specprod=None, exp_table_path=None, proc_table_path
resubmit_partial_complete=resubmit_partial_complete,
z_submit_types=z_submit_types)
## All jobs now submitted, update information from job queue and save
ptable = update_from_queue(ptable, dry_run=dry_run_level)
ptable = update_from_queue(ptable, dry_run_level=dry_run_level)
if dry_run_level < 3:
write_table(ptable, tablename=proc_table_pathname)

Expand Down
34 changes: 18 additions & 16 deletions py/desispec/scripts/proc_night.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,16 +85,16 @@ def proc_night(night=None, proc_obstypes=None, z_submit_types=None,
override_pathname (str): Full path to the override file.
update_exptable (bool): If true then the exposure table is updated.
The default is False.
dry_run_level (int, optional): If nonzero, this is a simulated run.
If dry_run_level=1 the scripts will be written but not submitted.
If dry_run_level=2, the scripts will not be written nor submitted
but the processing_table is still created.
If dry_run_level=3, no output files are written.
Logging will remain the same for testing as though scripts are
being submitted. Default is 0 (false).
dry_run_level (int, optional): If nonzero, this is a simulated run. Default is 0.
0 which runs the code normally.
1 writes all files but doesn't submit any jobs to Slurm.
2 writes tables but doesn't write scripts or submit anything.
3 Doesn't write or submit anything but queries Slurm normally for job status.
4 Doesn't write, submit jobs, or query Slurm.
5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.
dry_run (bool, optional): When to run without submitting scripts or
not. If dry_run_level is defined, then it over-rides this flag.
dry_run_level not set and dry_run=True, dry_run_level is set to 2
dry_run_level not set and dry_run=True, dry_run_level is set to 3
(no scripts generated or run). Default for dry_run is False.
no_redshifts (bool, optional): Whether to submit redshifts or not.
If True, redshifts are not submitted.
Expand Down Expand Up @@ -191,7 +191,7 @@ def proc_night(night=None, proc_obstypes=None, z_submit_types=None,

## Reconcile the dry_run and dry_run_level
if dry_run and dry_run_level == 0:
dry_run_level = 2
dry_run_level = 3
elif dry_run_level > 0:
dry_run = True

Expand Down Expand Up @@ -247,6 +247,8 @@ def proc_night(night=None, proc_obstypes=None, z_submit_types=None,
resubmit_partial_complete = (not dont_resubmit_partial_jobs)
require_cals = (not dont_require_cals)
do_cte_flats = (not no_cte_flats)
## False if not submitting or simulating
update_proctable = (dry_run_level == 0 or dry_run_level > 3)

## cte flats weren't available before 20211130 so hardcode that in
if do_cte_flats and night < 20211130:
Expand Down Expand Up @@ -358,7 +360,8 @@ def proc_night(night=None, proc_obstypes=None, z_submit_types=None,
## Update processing table
tableng = len(ptable)
if tableng > 0:
ptable = update_from_queue(ptable, dry_run=dry_run_level)
if update_proctable:
ptable = update_from_queue(ptable, dry_run_level=dry_run_level)
if dry_run_level < 3:
write_table(ptable, tablename=proc_table_pathname, tabletype='proctable')
if any_jobs_failed(ptable['STATUS']):
Expand All @@ -373,7 +376,7 @@ def proc_night(night=None, proc_obstypes=None, z_submit_types=None,
ptable, nsubmits = update_and_recursively_submit(ptable,
no_resub_failed=no_resub_failed,
ptab_name=proc_table_pathname,
dry_run=dry_run,
dry_run_level=dry_run_level,
reservation=reservation)
elif not ignore_proc_table_failures:
err = "Some jobs have an incomplete job status. This script " \
Expand Down Expand Up @@ -588,11 +591,10 @@ def create_submit_add_and_save(prow, proctable, check_outputs=check_for_outputs,
################### Wrap things up ###################
unproc_table = None
if len(ptable) > 0:
## All jobs now submitted, update information from job queue and save
## But only if actually submitting or fully simulating, don't simulate
## outputs that will be written to disk (levels 1 and 2)
if dry_run_level < 1 or dry_run_level > 2:
ptable = update_from_queue(ptable, dry_run=dry_run_level)
## All jobs now submitted, update information from job queue
## If dry_run_level > 3, then Slurm isn't queried
if update_proctable:
ptable = update_from_queue(ptable, dry_run_level=dry_run_level)
if dry_run_level < 3:
write_table(ptable, tablename=proc_table_pathname, tabletype='proctable')
## Now that processing is complete, lets identify what we didn't process
Expand Down
19 changes: 13 additions & 6 deletions py/desispec/test/test_proc_night.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,15 @@ def test_proc_night_dryrun3(self):
self.assertEqual(len(prodfiles), 1)
self.assertTrue(prodfiles[0].endswith('exposure_tables'))

def test_proc_night_dryrun4(self):
    """Test that dry_run_level=4 doesn't produce any output"""
    # Renamed from test_proc_night_dryrun3: that name collided with the
    # existing dry_run_level=3 test defined above, so one of the two tests
    # was silently shadowed and never run by the test discovery.
    proctable, unproctable = proc_night(self.night, z_submit_types=['cumulative',],
                                        dry_run_level=4, sub_wait_time=0.0)

    prodfiles = glob.glob(self.proddir+'/*')
    self.assertEqual(len(prodfiles), 1)
    self.assertTrue(prodfiles[0].endswith('exposure_tables'))

def test_proc_night_noz(self):
"""Test that z_submit_types=None doesn't submit any redshift jobs"""

Expand Down Expand Up @@ -203,7 +212,7 @@ def test_proc_night_resubmit_queue_failures(self):
desispec.workflow.proctable.reset_tilenight_ptab_cache()

## test that the code runs
updatedtable2, nsubmits = update_and_recursively_submit(proctable2, submits=0, dry_run=3)
updatedtable2, nsubmits = update_and_recursively_submit(proctable2, submits=0, dry_run_level=4)
self.assertFalse(np.any(np.in1d(updatedtable2['STATUS'], [b'DEP_NOT_SUBD', b'TIMEOUT'])),
msg='No TIMEOUTs in nominal resubmission')

Expand All @@ -214,7 +223,7 @@ def test_proc_night_resubmit_queue_failures(self):
proctable2['STATUS'][proctable2['INTID']==cumulative2['INTID']] = 'TIMEOUT'
updatedtable2, nsubmits = update_and_recursively_submit(proctable2,
submits=0,
dry_run=1)
dry_run_level=4)
self.assertFalse(np.any(np.in1d(updatedtable2['STATUS'], [b'DEP_NOT_SUBD', b'TIMEOUT'])),
msg='Cross night resubmission should leave no TIMEOUTs')

Expand All @@ -237,11 +246,9 @@ def test_proc_night_resubmit_queue_failures(self):
## Run resubmission code
updatedtable2, nsubmits = update_and_recursively_submit(proctable2,
submits=0,
dry_run=1)
self.assertTrue(np.any(np.in1d(updatedtable2['STATUS'], [b'DEP_NOT_SUBD', b'TIMEOUT'])),
msg='Cross night resubmission should leave two TIMEOUTs')
dry_run_level=4)
self.assertTrue(np.sum(updatedtable2['STATUS'] == 'DEP_NOT_SUBD')==2,
msg='Cross night resubmission should have 2 TIMEOUTs' \
msg='Cross night resubmission should have 2 DEP_NOT_SUBDs' \
+ ' after forcing failed previous night jobs.')


Expand Down
Loading
Loading