From 61958a06b478520aee2bdb92f700ffd67c96e4e9 Mon Sep 17 00:00:00 2001 From: Aishwarya Mathuria Date: Mon, 18 Apr 2022 13:19:40 +0530 Subject: [PATCH] scripts/queue: common teuthology-queue command for paddles and beanstalk queue Signed-off-by: Aishwarya Mathuria --- scripts/beanstalk_queue.py | 2 +- scripts/paddles_queue.py | 3 +- scripts/queue.py | 37 ++++++++++ teuthology/config.py | 2 +- teuthology/dispatcher/__init__.py | 2 +- teuthology/kill.py | 2 +- teuthology/queue/__init__.py | 106 ++++++++++++++++++++++++++++ teuthology/{ => queue}/beanstalk.py | 96 ------------------------- teuthology/queue/paddles.py | 88 +++++++++++++++++++++++ teuthology/report.py | 28 +++++--- teuthology/worker.py | 2 +- 11 files changed, 255 insertions(+), 113 deletions(-) create mode 100644 scripts/queue.py create mode 100644 teuthology/queue/__init__.py rename teuthology/{ => queue}/beanstalk.py (56%) create mode 100644 teuthology/queue/paddles.py diff --git a/scripts/beanstalk_queue.py b/scripts/beanstalk_queue.py index 88a8242847..a8a0661ecf 100644 --- a/scripts/beanstalk_queue.py +++ b/scripts/beanstalk_queue.py @@ -1,7 +1,7 @@ import docopt import teuthology.config -import teuthology.beanstalk +import teuthology.queue.beanstalk doc = """ usage: teuthology-beanstalk-queue -h diff --git a/scripts/paddles_queue.py b/scripts/paddles_queue.py index 3c69d772e6..8487fd938e 100644 --- a/scripts/paddles_queue.py +++ b/scripts/paddles_queue.py @@ -1,8 +1,7 @@ import docopt import teuthology.config -import teuthology.paddles_queue - +import teuthology.queue.paddles_queue doc = """ usage: teuthology-paddles-queue -h teuthology-paddles-queue -s -m MACHINE_TYPE diff --git a/scripts/queue.py b/scripts/queue.py new file mode 100644 index 0000000000..2c466a7be9 --- /dev/null +++ b/scripts/queue.py @@ -0,0 +1,37 @@ +import docopt + +import teuthology.config +import teuthology.queue.beanstalk +import teuthology.queue.paddles + +doc = """ +usage: teuthology-queue -h + teuthology-queue [-s|-d|-f] -m MACHINE_TYPE + teuthology-queue [-r] -m MACHINE_TYPE + teuthology-queue -m MACHINE_TYPE -D PATTERN + teuthology-queue -p SECONDS [-m MACHINE_TYPE] + +List Jobs in queue. +If -D is passed, then jobs with PATTERN in the job name are deleted from the +queue. + +Arguments: + -m, --machine_type MACHINE_TYPE [default: multi] + Which machine type queue to work on. + +optional arguments: + -h, --help Show this help message and exit + -D, --delete PATTERN Delete Jobs with PATTERN in their name + -d, --description Show job descriptions + -r, --runs Only show run names + -f, --full Print the entire job config. Use with caution. + -s, --status Prints the status of the queue + -p, --pause SECONDS Pause queues for a number of seconds. A value of 0 + will unpause. If -m is passed, pause that queue, + otherwise pause all queues. +""" + + +def main(): + args = docopt.docopt(doc) + teuthology.queue.main(args) diff --git a/teuthology/config.py b/teuthology/config.py index 6a5ac7cdde..43fa0fff9b 100644 --- a/teuthology/config.py +++ b/teuthology/config.py @@ -143,7 +143,7 @@ class TeuthologyConfig(YamlConfig): 'archive_upload_key': None, 'archive_upload_url': None, 'automated_scheduling': False, - 'backend': 'paddles', + 'backend': 'beanstalk', 'reserve_machines': 5, 'ceph_git_base_url': 'https://github.com/ceph/', 'ceph_git_url': None, diff --git a/teuthology/dispatcher/__init__.py b/teuthology/dispatcher/__init__.py index 62df394fd7..8f2fd76bac 100644 --- a/teuthology/dispatcher/__init__.py +++ b/teuthology/dispatcher/__init__.py @@ -14,13 +14,13 @@ setup_log_file, install_except_hook, # modules - beanstalk, exporter, nuke, report, repo_utils, worker, ) +from teuthology.queue import beanstalk from teuthology.config import config as teuth_config from teuthology.dispatcher import supervisor from teuthology.exceptions import SkipJob diff --git a/teuthology/kill.py b/teuthology/kill.py index 78ff5e4fc4..dbbd4fabc5 100755 --- a/teuthology/kill.py +++ b/teuthology/kill.py @@ -10,7 +10,7 @@ import teuthology.exporter -from teuthology import beanstalk +from teuthology.queue import beanstalk from teuthology import report from teuthology.config import config from teuthology import misc diff --git a/teuthology/queue/__init__.py b/teuthology/queue/__init__.py new file mode 100644 index 0000000000..2a0b6ff363 --- /dev/null +++ b/teuthology/queue/__init__.py @@ -0,0 +1,106 @@ +import logging +import pprint +import sys +from collections import OrderedDict + +from teuthology import report +from teuthology.config import config + +log = logging.getLogger(__name__) + +def print_progress(index, total, message=None): + msg = "{m} ".format(m=message) if message else '' + sys.stderr.write("{msg}{i}/{total}\r".format( + msg=msg, i=index, total=total)) + sys.stderr.flush() + +def end_progress(): + sys.stderr.write('\n') + sys.stderr.flush() + +class JobProcessor(object): + def __init__(self): + self.jobs = OrderedDict() + + def add_job(self, job_id, job_config, job_obj=None): + job_id = str(job_id) + + job_dict = dict( + index=(len(self.jobs) + 1), + job_config=job_config, + ) + if job_obj: + job_dict['job_obj'] = job_obj + self.jobs[job_id] = job_dict + + self.process_job(job_id) + + def process_job(self, job_id): + pass + + def complete(self): + pass + + +class JobPrinter(JobProcessor): + def __init__(self, show_desc=False, full=False): + super(JobPrinter, self).__init__() + self.show_desc = show_desc + self.full = full + + def process_job(self, job_id): + job_config = self.jobs[job_id]['job_config'] + job_index = self.jobs[job_id]['index'] + job_priority = job_config['priority'] + job_name = job_config['name'] + job_desc = job_config['description'] + print('Job: {i:>4} priority: {pri:>4} {job_name}/{job_id}'.format( + i=job_index, + pri=job_priority, + job_id=job_id, + job_name=job_name, + )) + if self.full: + pprint.pprint(job_config) + elif job_desc and self.show_desc: + for desc in job_desc.split(): + print('\t {}'.format(desc)) + + +class RunPrinter(JobProcessor): + def __init__(self): + super(RunPrinter, self).__init__() + self.runs = list() + + def process_job(self, job_id): + run = self.jobs[job_id]['job_config']['name'] + if run not in self.runs: + self.runs.append(run) + print(run) + + +class JobDeleter(JobProcessor): + def __init__(self, pattern): + self.pattern = pattern + super(JobDeleter, self).__init__() + + def add_job(self, job_id, job_config, job_obj=None): + job_name = job_config['name'] + if self.pattern in job_name: + super(JobDeleter, self).add_job(job_id, job_config, job_obj) + + def process_job(self, job_id): + job_config = self.jobs[job_id]['job_config'] + job_name = job_config['name'] + print('Deleting {job_name}/{job_id}'.format( + job_id=job_id, + job_name=job_name, + )) + report.try_delete_jobs(job_name, job_id) + + +def main(args): + if config.backend == 'paddles': + paddles.main(args) + else: + beanstalk.main(args) \ No newline at end of file diff --git a/teuthology/beanstalk.py b/teuthology/queue/beanstalk.py similarity index 56% rename from teuthology/beanstalk.py rename to teuthology/queue/beanstalk.py index a1165becca..90b1cbd6d3 100644 --- a/teuthology/beanstalk.py +++ b/teuthology/queue/beanstalk.py @@ -61,102 +61,6 @@ def callback(jobs_dict) processor.complete() -def print_progress(index, total, message=None): - msg = "{m} ".format(m=message) if message else '' - sys.stderr.write("{msg}{i}/{total}\r".format( - msg=msg, i=index, total=total)) - sys.stderr.flush() - - -def end_progress(): - sys.stderr.write('\n') - sys.stderr.flush() - - -class JobProcessor(object): - def __init__(self): - self.jobs = OrderedDict() - - def add_job(self, job_id, job_config, job_obj=None): - job_id = str(job_id) - - job_dict = dict( - index=(len(self.jobs) + 1), - job_config=job_config, - ) - if job_obj: - job_dict['job_obj'] = job_obj - self.jobs[job_id] = job_dict - - self.process_job(job_id) - - def process_job(self, job_id): - pass - - def complete(self): - pass - - -class JobPrinter(JobProcessor): - def __init__(self, show_desc=False, full=False): - super(JobPrinter, self).__init__() - self.show_desc = show_desc - self.full = full - - def process_job(self, job_id): - job_config = self.jobs[job_id]['job_config'] - job_index = self.jobs[job_id]['index'] - job_priority = job_config['priority'] - job_name = job_config['name'] - job_desc = job_config['description'] - print('Job: {i:>4} priority: {pri:>4} {job_name}/{job_id}'.format( - i=job_index, - pri=job_priority, - job_id=job_id, - job_name=job_name, - )) - if self.full: - pprint.pprint(job_config) - elif job_desc and self.show_desc: - for desc in job_desc.split(): - print('\t {}'.format(desc)) - - -class RunPrinter(JobProcessor): - def __init__(self): - super(RunPrinter, self).__init__() - self.runs = list() - - def process_job(self, job_id): - run = self.jobs[job_id]['job_config']['name'] - if run not in self.runs: - self.runs.append(run) - print(run) - - -class JobDeleter(JobProcessor): - def __init__(self, pattern): - self.pattern = pattern - super(JobDeleter, self).__init__() - - def add_job(self, job_id, job_config, job_obj=None): - job_name = job_config['name'] - if self.pattern in job_name: - super(JobDeleter, self).add_job(job_id, job_config, job_obj) - - def process_job(self, job_id): - job_config = self.jobs[job_id]['job_config'] - job_name = job_config['name'] - print('Deleting {job_name}/{job_id}'.format( - job_id=job_id, - job_name=job_name, - )) - job_obj = self.jobs[job_id].get('job_obj') - if job_obj: - job_obj.delete() - report.try_delete_jobs(job_name, job_id) - - def pause_tube(connection, tube, duration): duration = int(duration) if not tube: diff --git a/teuthology/queue/paddles.py b/teuthology/queue/paddles.py new file mode 100644 index 0000000000..f2ea8b84c8 --- /dev/null +++ b/teuthology/queue/paddles.py @@ -0,0 +1,88 @@ +import logging +import pprint +import sys +from collections import OrderedDict + +from teuthology import report +from teuthology.dispatcher import pause_queue + + +log = logging.getLogger(__name__) + + +def stats_queue(machine_type): + stats = report.get_queue_stats(machine_type) + if stats['paused'] is None: + log.info("%s queue is currently running with %s jobs queued", + stats['name'], + stats['count']) + else: + log.info("%s queue is paused with %s jobs queued", + stats['name'], + stats['count']) + + +def update_priority(machine_type, priority, user, run_name=None): + if run_name is not None: + jobs = report.get_user_jobs_queue(machine_type, user, run_name) + else: + jobs = report.get_user_jobs_queue(machine_type, user) + for job in jobs: + job['priority'] = priority + report.try_push_job_info(job) + + +def walk_jobs(machine_type, processor, user): + log.info("Checking paddles queue...") + job_count = report.get_queue_stats(machine_type)['count'] + + jobs = report.get_user_jobs_queue(machine_type, user) + if job_count == 0: + log.info('No jobs in queue') + return + + for i in range(1, job_count + 1): + print_progress(i, job_count, "Loading") + job = jobs[i-1] + if job is None: + continue + job_id = job['job_id'] + processor.add_job(job_id, job) + end_progress() + processor.complete() + + +def main(args): + machine_type = args['--machine_type'] + #user = args['--user'] + #run_name = args['--run_name'] + #priority = args['--priority'] + status = args['--status'] + delete = args['--delete'] + runs = args['--runs'] + show_desc = args['--description'] + full = args['--full'] + pause_duration = args['--pause'] + #unpause = args['--unpause'] + #pause_duration = args['--time'] + try: + if status: + stats_queue(machine_type) + if pause_duration: + pause_queue(machine_type, pause, user, pause_duration) + #else: + #pause_queue(machine_type, pause, user) + elif priority: + update_priority(machine_type, priority, run_name) + elif delete: + walk_jobs(machine_type, + JobDeleter(delete), user) + elif runs: + walk_jobs(machine_type, + RunPrinter(), user) + else: + walk_jobs(machine_type, + JobPrinter(show_desc=show_desc, full=full), + user) + except KeyboardInterrupt: + log.info("Interrupted.") diff --git a/teuthology/report.py b/teuthology/report.py index f9d488daa6..d22e614475 100644 --- a/teuthology/report.py +++ b/teuthology/report.py @@ -508,7 +508,7 @@ def create_queue(self, queue): """ Create a queue on the results server - :param machine_type: The machine type specified for the job + :param queue: The queue specified for the job """ uri = "{base}/queue/".format( base=self.base_uri @@ -613,10 +613,11 @@ def queued_jobs(self, queue, user, run_name): base=self.base_uri ) request_info = {'queue': queue} + filter_field = queue if run_name is not None: filter_field = run_name uri += "?run_name=" + str(run_name) - else: + elif user is not None: filter_field = user uri += "?user=" + str(user) @@ -653,36 +654,43 @@ def create_machine_type_queue(queue): reporter.create_queue(queue) return queue +def get_all_jobs_in_queue(queue, user=None, run_name=None): + reporter = ResultsReporter() + if not reporter.base_uri: + return + if ',' in queue: + queue = 'multi' + return reporter.queued_jobs(queue) -def get_user_jobs_queue(machine_type, user, run_name=None): +def get_user_jobs_queue(queue, user, run_name=None): reporter = ResultsReporter() if not reporter.base_uri: return - return reporter.queued_jobs(machine_type, user, run_name) + return reporter.queued_jobs(queue, user, run_name) -def pause_queue(machine_type, paused, paused_by, pause_duration=None): +def pause_queue(queue, paused, paused_by, pause_duration=None): reporter = ResultsReporter() if not reporter.base_uri: return - reporter.update_queue(machine_type, paused, paused_by, pause_duration) + reporter.update_queue(queue, paused, paused_by, pause_duration) -def is_queue_paused(machine_type): +def is_queue_paused(queue): reporter = ResultsReporter() if not reporter.base_uri: return - stats = reporter.queue_stats(machine_type) + stats = reporter.queue_stats(queue) if stats['paused'] != 0 and stats['paused'] is not None: return True return False -def get_queue_stats(machine_type): +def get_queue_stats(queue): reporter = ResultsReporter() if not reporter.base_uri: return - stats = reporter.queue_stats(machine_type) + stats = reporter.queue_stats(queue) return stats diff --git a/teuthology/worker.py b/teuthology/worker.py index 6dcba20b0a..82f8319d1e 100644 --- a/teuthology/worker.py +++ b/teuthology/worker.py @@ -13,12 +13,12 @@ setup_log_file, install_except_hook, # modules - beanstalk, kill, report, repo_utils, safepath, ) +from teuthology.queue import beanstalk from teuthology.config import config as teuth_config from teuthology.config import set_config_attr from teuthology.exceptions import BranchNotFoundError, CommitNotFoundError, SkipJob, MaxWhileTries