diff --git a/arc/job/adapter.py b/arc/job/adapter.py index 29d83fc97c..abf07d8dcb 100644 --- a/arc/job/adapter.py +++ b/arc/job/adapter.py @@ -518,7 +518,8 @@ def write_submit_script(self) -> None: } if queue is None: - logger.warning(f'Queue not defined for server {self.server}. Assuming the queue name is defined in your submit.py script.') + logger.debug(f'Queue not defined for server {self.server}. ' + f'Assuming the queue name is defined in your submit.py script.') del format_params['queue'] try: diff --git a/arc/scheduler.py b/arc/scheduler.py index 50cd96cbaa..b41e45925c 100644 --- a/arc/scheduler.py +++ b/arc/scheduler.py @@ -66,9 +66,10 @@ logger = get_logger() LOWEST_MAJOR_TS_FREQ, HIGHEST_MAJOR_TS_FREQ, default_job_settings, \ - default_job_types, rotor_scan_resolution, default_ts_adapters, max_rotor_trsh = \ + default_job_types, default_ts_adapters, max_rotor_trsh, rotor_scan_resolution, servers_dict = \ settings['LOWEST_MAJOR_TS_FREQ'], settings['HIGHEST_MAJOR_TS_FREQ'], settings['default_job_settings'], \ - settings['default_job_types'], settings['rotor_scan_resolution'], settings['ts_adapters'], settings['max_rotor_trsh'] + settings['default_job_types'], settings['ts_adapters'], settings['max_rotor_trsh'], \ + settings['rotor_scan_resolution'], settings['servers'] class Scheduler(object): @@ -887,6 +888,7 @@ def run_job(self, self.job_dict[label]['tsg'][tsg] = job # save job object if job.server is not None and job.server not in self.servers: self.servers.append(job.server) + self.check_max_simultaneous_jobs_limit(job.server) job.execute() self.save_restart_dict() @@ -3080,17 +3082,21 @@ def check_all_done(self, label: str): # Update restart dictionary and save the yaml restart file: self.save_restart_dict() - def get_server_job_ids(self): + def get_server_job_ids(self, specific_server: Optional[str] = None): """ - Check job status on all active servers, get a list of relevant running job IDs. + Check job status on a specific server or on all active servers, get a list of relevant running job IDs. + + Args: + specific_server (str, optional): The server to check. If ``None``, check all active servers. """ self.server_job_ids = list() for server in self.servers: - if server != 'local': - with SSHClient(server) as ssh: - self.server_job_ids.extend(ssh.check_running_jobs_ids()) - else: - self.server_job_ids.extend(check_running_jobs_ids()) + if specific_server is None or server == specific_server: + if server != 'local': + with SSHClient(server) as ssh: + self.server_job_ids.extend(ssh.check_running_jobs_ids()) + else: + self.server_job_ids.extend(check_running_jobs_ids()) def get_completed_incore_jobs(self): """ @@ -3794,6 +3800,23 @@ def save_e_elect(self, label: str): content[label] = self.species_dict[label].e_elect save_yaml_file(path=path, content=content) + def check_max_simultaneous_jobs_limit(self, server: Optional[str]): + """ + Check if the number of running jobs on the server is not above the set server limit. + + Args: + server (str): The server name. + """ + if server is not None and 'max_simultaneous_jobs' in servers_dict[server]: + continue_lopping = True + while continue_lopping: + self.get_server_job_ids(specific_server=server) + if len(self.server_job_ids) >= servers_dict[server]['max_simultaneous_jobs']: + time.sleep(90) + else: + continue_lopping = False + self.get_server_job_ids() + def species_has_freq(species_output_dict: dict, yml_path: Optional[str] = None, diff --git a/arc/settings/settings.py b/arc/settings/settings.py index 5d5d86d5fe..8014bb9cf1 100644 --- a/arc/settings/settings.py +++ b/arc/settings/settings.py @@ -39,6 +39,7 @@ 'address': 'server1.host.edu', 'un': '', 'key': 'path_to_rsa_key', + 'max_simultaneous_jobs': 10, # optional, "check_status_command" must be set to only return jobs for your user }, 'server2': { 'cluster_soft': 'Slurm', @@ -58,7 +59,7 @@ 'cluster_soft': 'HTCondor', 'un': '', 'cpus': 48, - 'queues': {'':''}, #{'queue_name':'HH:MM:SS'} + 'queues': {'':''}, # {'queue_name':'HH:MM:SS'} 'excluded_queues': ['queue_name1', 'queue_name2'], }, }