diff --git a/src/python/TaskWorker/Actions/DagmanCreator.py b/src/python/TaskWorker/Actions/DagmanCreator.py index 6e098b9a1..1590207d6 100644 --- a/src/python/TaskWorker/Actions/DagmanCreator.py +++ b/src/python/TaskWorker/Actions/DagmanCreator.py @@ -4,6 +4,10 @@ Generates the condor submit files and the master DAG. """ # pylint: disable=invalid-name # have a lot of snake_case varaibles here from "old times" +# also.. yeah.. this is a long and somehow complex file, but we are not going to break it now +# pylint: disable=too-many-locals, too-many-branches, too-many-statements, too-many-lines, too-many-arguments +# there just one very long line in the HTCondor JDL template +# pylint: disable=line-too-long import os import re @@ -17,7 +21,7 @@ from ast import literal_eval from ServerUtilities import MAX_DISK_SPACE, MAX_IDLE_JOBS, MAX_POST_JOBS, TASKLIFETIME -from ServerUtilities import getLock, downloadFromS3, checkS3Object, uploadToS3 +from ServerUtilities import getLock, checkS3Object, uploadToS3 import TaskWorker.DataObjects.Result from TaskWorker.Actions.TaskAction import TaskAction @@ -26,6 +30,7 @@ from CMSGroupMapper import get_egroup_users import WMCore.WMSpec.WMTask +from WMCore import Lexicon from WMCore.Services.CRIC.CRIC import CRIC from WMCore.WMRuntime.Tools.Scram import ARCH_TO_OS, SCRAM_TO_ARCH @@ -128,7 +133,8 @@ WhenToTransferOutput = ON_EXIT_OR_EVICT +SpoolOnEvict = false -# Keep job in the queue upon completion long enough for the postJob to run, allowing the monitoring script to fetch the postJob status and job exit-code updated by the postJob +# Keep job in the queue upon completion long enough for the postJob to run, +# allowing the monitoring script to fetch the postJob status and job exit-code updated by the postJob LeaveJobInQueue = ifThenElse((JobStatus=?=4 || JobStatus=?=3) && (time() - EnteredCurrentStatus < 30 * 60*60), true, false) universe = vanilla @@ -189,12 +195,16 @@ def getCreateTimestamp(taskname): + """ name says it all """ return "_".join(taskname.split(":")[:1]) def makeLFNPrefixes(task): - ## Once we don't care anymore about backward compatibility with crab server < 3.3.1511 - ## we can uncomment the 1st line below and remove the next 6 lines. + """ + create LFN's for output files both on /store/temp and on final destination + Once we don't care anymore about backward compatibility with crab server < 3.3.1511 + we can uncomment the 1st line below and remove the next 6 lines. + """ #primaryds = task['tm_primary_dataset'] if task['tm_primary_dataset']: primaryds = task['tm_primary_dataset'] @@ -211,7 +221,7 @@ def makeLFNPrefixes(task): hash_input = hash_input.encode('utf-8') pset_hash = hashlib.sha1(hash_input).hexdigest() user = task['tm_username'] - tmp_user = "%s.%s" % (user, pset_hash) + tmp_user = f"{user}.{pset_hash}" publish_info = task['tm_publish_name'].rsplit('-', 1) #publish_info[0] is the publishname or the taskname timestamp = getCreateTimestamp(task['tm_taskname']) splitlfn = lfn.split('/') @@ -231,7 +241,6 @@ def validateLFNs(path, outputFiles): :param outputFiles: list of strings: the filenames to be published (w/o the jobId, i.e. out.root not out_1.root) :return: nothing if all OK. 
If LFN is not valid Lexicon raises an AssertionError exception """ - from WMCore import Lexicon # fake values to get proper LFN length, actual numbers chance job by job jobId = '10000' # current max is 10k jobs per task dirCounter = '0001' # need to be same length as 'counter' used later in makeDagSpecs @@ -239,14 +248,14 @@ def validateLFNs(path, outputFiles): for origFile in outputFiles: info = origFile.rsplit(".", 1) if len(info) == 2: # filename ends with ., put jobId before the dot - fileName = "%s_%s.%s" % (info[0], jobId, info[1]) + fileName = f"{info[0]}_{jobId}.{info[1]}" else: - fileName = "%s_%s" % (origFile, jobId) + fileName = f"{origFile}_{jobId}" testLfn = os.path.join(path, dirCounter, fileName) Lexicon.lfn(testLfn) # will raise if testLfn is not a valid lfn # since Lexicon does not have lenght check, do it manually here. if len(testLfn) > 500: - msg = "\nYour task specifies an output LFN %d-char long " % len(testLfn) + msg = f"\nYour task specifies an output LFN {len(testLfn)}-char long " msg += "\n which exceeds maximum length of 500" msg += "\n and therefore can not be handled in our DataBase" raise SubmissionRefusedException(msg) @@ -258,7 +267,6 @@ def validateUserLFNs(path, outputFiles): :param outputFiles: list of strings: the filenames to be published (w/o the jobId, i.e. out.root not out_1.root) :return: nothing if all OK. If LFN is not valid Lexicon raises an AssertionError exception """ - from WMCore import Lexicon # fake values to get proper LFN length, actual numbers chance job by job jobId = '10000' # current max is 10k jobs per task dirCounter = '0001' # need to be same length as 'counter' used later in makeDagSpecs @@ -266,18 +274,17 @@ def validateUserLFNs(path, outputFiles): for origFile in outputFiles: info = origFile.rsplit(".", 1) if len(info) == 2: # filename ends with ., put jobId before the dot - fileName = "%s_%s.%s" % (info[0], jobId, info[1]) + fileName = f"{info[0]}_{jobId}.{info[1]}" else: - fileName = "%s_%s" % (origFile, jobId) + fileName = f"{origFile}_{jobId}" testLfn = os.path.join(path, dirCounter, fileName) Lexicon.userLfn(testLfn) # will raise if testLfn is not a valid lfn # since Lexicon does not have lenght check, do it manually here. 
if len(testLfn) > 500: - msg = "\nYour task specifies an output LFN %d-char long " % len(testLfn) + msg = f"\nYour task specifies an output LFN {len(testLfn)}-char long " msg += "\n which exceeds maximum length of 500" msg += "\n and therefore can not be handled in our DataBase" raise SubmissionRefusedException(msg) - return def transform_strings(data): """ @@ -293,7 +300,7 @@ def transform_strings(data): 'required_arch', 'resthost', 'dbinstance', 'submitter_ip_addr', \ 'task_lifetime_days', 'task_endtime', 'maxproberuntime', 'maxtailruntime': val = data.get(var, None) - if val == None: + if val is None: info[var] = 'undefined' else: info[var] = json.dumps(val) @@ -306,7 +313,7 @@ def transform_strings(data): for var in 'siteblacklist', 'sitewhitelist', 'addoutputfiles', 'tfileoutfiles', 'edmoutfiles': val = data[var] - if val == None: + if val is None: info[var] = "{}" else: info[var] = "{" + json.dumps(val)[1:-1] + "}" @@ -332,15 +339,13 @@ def transform_strings(data): return info -def getLocation(default_name, checkout_location): +def getLocation(default_name): """ Get the location of the runtime code (job wrapper, postjob, anything executed on the schedd and on the worker node) First check if the files are present in the current working directory Then check if CRABTASKWORKER_ROOT is in the environment and use that location (that viariable is set by the taskworker init script. In the prod source script we use "export CRABTASKWORKER_ROOT") - Finally, check if the CRAB3_CHECKOUT variable is set. That option is interesting for developer who - can use this to point to their github repository. (Marco: we need to check this) """ loc = default_name if not os.path.exists(loc): @@ -349,9 +354,7 @@ def getLocation(default_name, checkout_location): fname = os.path.join(os.environ['CRABTASKWORKER_ROOT'], path, loc) if os.path.exists(fname): return fname - if 'CRAB3_CHECKOUT' not in os.environ: - raise Exception("Unable to locate %s" % loc) - loc = os.path.join(os.environ['CRAB3_CHECKOUT'], checkout_location, loc) + raise Exception(f"Unable to locate {loc}") # pylint: disable=broad-exception-raised loc = os.path.abspath(loc) return loc @@ -363,17 +366,19 @@ class DagmanCreator(TaskAction): """ def __init__(self, config, crabserver, procnum=-1, rucioClient=None): + """ need a comment line here """ TaskAction.__init__(self, config, crabserver, procnum) self.rucioClient = rucioClient def populateGlideinMatching(self, info): + """ actually simply set the required arch """ scram_arch = info['tm_job_arch'] # Set defaults info['required_arch'] = "X86_64" # The following regex matches a scram arch into four groups # for example el9_amd64_gcc10 is matched as (el)(9)_(amd64)_(gcc10) # later, only the third group is returned, the one corresponding to the arch. 
- m = re.match("([a-z]+)(\d+)_(\w+)_(\w+)", scram_arch) + m = re.match(r"([a-z]+)(\d+)_(\w+)_(\w+)", scram_arch) if m: _, _, arch, _ = m.groups() if arch not in SCRAM_TO_ARCH: @@ -392,12 +397,11 @@ def getDashboardTaskType(self, task): return task['tm_activity'] def isHammerCloud(self, task): - if task['tm_activity'] and 'HC' in task['tm_activity'].upper(): - return True - else: - return False + " name says it all " + return task['tm_activity'] and 'HC' in task['tm_activity'].upper() def setCMS_WMTool(self, task): + " for reporting to MONIT " if self.isHammerCloud(task): WMTool = 'HammerCloud' else: @@ -405,6 +409,7 @@ def setCMS_WMTool(self, task): return WMTool def setCMS_TaskType(self, task): + " for reporting to MONIT " if self.isHammerCloud(task): taskType = task['tm_activity'] else: @@ -415,6 +420,7 @@ def setCMS_TaskType(self, task): return taskType def setCMS_Type(self, task): + " for reporting to MONIT " if self.isHammerCloud(task): cms_type = 'Test' else: @@ -494,7 +500,6 @@ def makeJobSubmit(self, task): self.populateGlideinMatching(info) - # TODO: pass through these correctly. info['runs'] = [] info['lumis'] = [] info['saveoutput'] = 1 if info['tm_transfer_outputs'] == 'T' else 0 # Note: this must always be 0 for probe jobs, is taken care of in PostJob.py @@ -539,11 +544,11 @@ def makeJobSubmit(self, task): info['additional_environment_options'] += 'CRAB_RUNTIME_TARBALL=local' info['additional_input_file'] += ", CMSRunAnalysis.tar.gz" else: - raise TaskWorkerException("Cannot find CMSRunAnalysis.tar.gz inside the cwd: %s" % os.getcwd()) + raise TaskWorkerException(f"Cannot find CMSRunAnalysis.tar.gz inside the cwd: {os.getcwd()}") if os.path.exists("TaskManagerRun.tar.gz"): info['additional_environment_options'] += ' CRAB_TASKMANAGER_TARBALL=local' else: - raise TaskWorkerException("Cannot find TaskManagerRun.tar.gz inside the cwd: %s" % os.getcwd()) + raise TaskWorkerException(f"Cannot find TaskManagerRun.tar.gz inside the cwd: {os.getcwd()}") info['additional_input_file'] += ", sandbox.tar.gz" # it will be present on SPOOL_DIR after dab_bootstrap info['additional_input_file'] += ", run_and_lumis.tar.gz" info['additional_input_file'] += ", input_files.tar.gz" @@ -572,13 +577,14 @@ def getPreScriptDefer(self, task, jobid): releaseTimeout = int(ej.split('=')[1]) if slowJobRelease: - prescriptDeferString = 'DEFER 4 %s' % (jobid * releaseTimeout) + prescriptDeferString = f"DEFER 4 {jobid * releaseTimeout}" else: prescriptDeferString = '' return prescriptDeferString def makeDagSpecs(self, task, siteinfo, jobgroup, block, availablesites, datasites, outfiles, startjobid, parent=None, stage='conventional'): + """ need a comment line here """ dagSpecs = [] i = startjobid temp_dest, dest = makeLFNPrefixes(task) @@ -593,7 +599,7 @@ def makeDagSpecs(self, task, siteinfo, jobgroup, block, availablesites, datasite except AssertionError as ex: msg = "\nYour task specifies an output LFN which fails validation in" msg += "\n WMCore/Lexicon and therefore can not be handled in our DataBase" - msg += "\nError detail: %s" % (str(ex)) + msg += f"\nError detail: {ex}" raise SubmissionRefusedException(msg) from ex groupid = len(siteinfo['group_sites']) siteinfo['group_sites'][groupid] = list(availablesites) @@ -620,23 +626,21 @@ def makeDagSpecs(self, task, siteinfo, jobgroup, block, availablesites, datasite if parent is None or parent == "": count = str(i) else: - count = '{parent}-{i}'.format(parent=parent, i=i) + count = f"{parent}-{i}" siteinfo[count] = groupid remoteOutputFiles = [] 
localOutputFiles = [] for origFile in outfiles: info = origFile.rsplit(".", 1) if len(info) == 2: - fileName = "%s_%s.%s" % (info[0], count, info[1]) + fileName = f"{info[0]}_{count}.{info[1]}" else: - fileName = "%s_%s" % (origFile, count) - remoteOutputFiles.append("%s" % fileName) - localOutputFiles.append("%s=%s" % (origFile, fileName)) + fileName = f"{origFile}_{count}" + remoteOutputFiles.append(fileName) + localOutputFiles.append(f"{origFile}={fileName}") remoteOutputFilesStr = " ".join(remoteOutputFiles) localOutputFiles = ", ".join(localOutputFiles) - # no need to use // in the next line, thanks to integer formatting with `%d` - # see: https://docs.python.org/3/library/string.html#formatstrings - counter = "%04d" % (i / 1000) + counter = f"{(i // 1000):04d}" # counter=0000 for i<999, 1 for 1000 200 # info['faillimit'] = 100 @@ -1118,9 +1120,7 @@ def getBlacklistMsg(): return info, splitterResult, subdags, dagSpecs def getHighPrioUsers(self, userProxy, workflow, egroups): - # Import needed because the DagmanCreator module is also imported in the schedd, - # where there is no ldap available. This function however is only called - # in the TW (where ldap is installed) during submission. + """ get the list of high priority users """ highPrioUsers = set() try: @@ -1129,23 +1129,22 @@ def getHighPrioUsers(self, userProxy, workflow, egroups): except Exception as ex: # pylint: disable=broad-except msg = "Error when getting the high priority users list." \ " Will ignore the high priority list and continue normally." \ - " Error reason: %s" % str(ex) + f" Error reason: {ex}" self.uploadWarning(msg, userProxy, workflow) return [] return highPrioUsers def executeInternal(self, *args, **kw): - # So, the filename becomes http:// -- and doesn't really work. Hardcoding the analysis wrapper. - #transform_location = getLocation(kw['task']['tm_transformation'], 'CAFUtilities/src/python/transformation/CMSRunAnalysis/') - transform_location = getLocation('CMSRunAnalysis.sh', 'CRABServer/scripts/') - cmscp_location = getLocation('cmscp.py', 'CRABServer/scripts/') - cmscpsh_location = getLocation('cmscp.sh', 'CRABServer/scripts/') - gwms_location = getLocation('gWMS-CMSRunAnalysis.sh', 'CRABServer/scripts/') - env_location = getLocation('submit_env.sh', 'CRABServer/scripts/') - dag_bootstrap_location = getLocation('dag_bootstrap_startup.sh', 'CRABServer/scripts/') - bootstrap_location = getLocation("dag_bootstrap.sh", "CRABServer/scripts/") - adjust_location = getLocation("AdjustSites.py", "CRABServer/scripts/") + """ all real work is done here """ + transform_location = getLocation('CMSRunAnalysis.sh') + cmscp_location = getLocation('cmscp.py') + cmscpsh_location = getLocation('cmscp.sh') + gwms_location = getLocation('gWMS-CMSRunAnalysis.sh') + env_location = getLocation('submit_env.sh') + dag_bootstrap_location = getLocation('dag_bootstrap_startup.sh') + bootstrap_location = getLocation("dag_bootstrap.sh") + adjust_location = getLocation("AdjustSites.py") shutil.copy(transform_location, '.') shutil.copy(cmscp_location, '.') @@ -1176,9 +1175,9 @@ def executeInternal(self, *args, **kw): f"\nError reason: {ex}") from ex # Bootstrap the runtime if it is available. 
-        job_runtime = getLocation('CMSRunAnalysis.tar.gz', 'CRABServer/')
+        job_runtime = getLocation('CMSRunAnalysis.tar.gz')
         shutil.copy(job_runtime, '.')
-        task_runtime = getLocation('TaskManagerRun.tar.gz', 'CRABServer/')
+        task_runtime = getLocation('TaskManagerRun.tar.gz')
         shutil.copy(task_runtime, '.')
 
         kw['task']['resthost'] = self.crabserver.server['host']
@@ -1205,6 +1204,7 @@ def executeInternal(self, *args, **kw):
 
 
     def execute(self, *args, **kw):
+        """ entry point called by Handler """
         cwd = os.getcwd()
         try:
             os.chdir(kw['tempDir'])
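
Note (not part of the patch): a minimal standalone sketch of what the raw-string regex fixed in populateGlideinMatching does, i.e. parse a scram_arch value such as el9_amd64_gcc10 and map the third group to a glidein architecture. The SCRAM_TO_ARCH dictionary and the archFromScramArch helper below are assumptions for illustration only; the real mapping comes from WMCore.WMRuntime.Tools.Scram.

    # sketch only: SCRAM_TO_ARCH here is an assumed subset of the real WMCore mapping
    import re

    SCRAM_TO_ARCH = {'amd64': 'X86_64', 'aarch64': 'aarch64'}

    def archFromScramArch(scram_arch, default='X86_64'):
        """ return the glidein arch for e.g. el9_amd64_gcc10, falling back to the default """
        m = re.match(r"([a-z]+)(\d+)_(\w+)_(\w+)", scram_arch)
        if not m:
            return default
        _, _, arch, _ = m.groups()  # e.g. ('el', '9', 'amd64', 'gcc10')
        return SCRAM_TO_ARCH.get(arch, default)

    print(archFromScramArch('el9_amd64_gcc10'))    # X86_64
    print(archFromScramArch('slc7_aarch64_gcc9'))  # aarch64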
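
Likewise, a quick illustration (again not part of the patch) of the new counter expression in makeDagSpecs: floor division by 1000 plus zero-padded f-string formatting reproduces the old "%04d" % (i / 1000) behaviour, grouping at most 1000 job outputs per directory.

    # sketch: the f-string counter matches the old "%04d" formatting once // is used
    for i in (1, 999, 1000, 1999, 2500):
        print(i, f"{(i // 1000):04d}")   # 0000, 0000, 0001, 0001, 0002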