From fc8245c4ba8c186ad7e82cff0a007eec24869772 Mon Sep 17 00:00:00 2001 From: Aishwarya Mathuria Date: Wed, 4 May 2022 19:37:40 +0530 Subject: [PATCH] teuthology/queue: Single command for queue operations Makes the same teuthology-queue commands work regardless of the queue backend, Paddles or Beanstalk. Signed-off-by: Aishwarya Mathuria --- docs/docker-compose/docker-compose.yml | 2 +- docs/docker-compose/teuthology/teuthology.sh | 7 +- scripts/beanstalk_queue.py | 35 --- scripts/dispatcher.py | 8 +- scripts/kill.py | 2 +- scripts/paddles_queue.py | 45 ---- scripts/queue.py | 17 +- scripts/schedule.py | 1 - scripts/worker.py | 6 +- teuthology/config.py | 2 +- teuthology/dispatcher/__init__.py | 45 ++-- teuthology/dispatcher/supervisor.py | 7 +- teuthology/exporter.py | 2 +- teuthology/kill.py | 2 +- teuthology/orchestra/run.py | 1 + teuthology/paddles_queue.py | 219 ------------------- teuthology/queue/__init__.py | 106 --------- teuthology/queue/beanstalk.py | 19 +- teuthology/queue/paddles.py | 60 +++-- teuthology/queue/util.py | 101 +++++++++ teuthology/report.py | 49 ++--- teuthology/schedule.py | 9 +- teuthology/test/test_dispatcher.py | 73 ------- teuthology/test/test_worker.py | 43 +--- 24 files changed, 218 insertions(+), 643 deletions(-) delete mode 100644 scripts/beanstalk_queue.py delete mode 100644 scripts/paddles_queue.py delete mode 100644 teuthology/paddles_queue.py create mode 100644 teuthology/queue/util.py diff --git a/docs/docker-compose/docker-compose.yml b/docs/docker-compose/docker-compose.yml index f64d17a546..2b2546ca45 100644 --- a/docs/docker-compose/docker-compose.yml +++ b/docs/docker-compose/docker-compose.yml @@ -19,7 +19,7 @@ services: ports: - 5432:5432 paddles: - image: quay.io/ceph-infra/paddles + image: quay.io/ceph-infra/paddles:wip-amathuria-removing-beanstalkd environment: PADDLES_SERVER_HOST: 0.0.0.0 PADDLES_SQLALCHEMY_URL: postgresql+psycopg2://admin:password@postgres:5432/paddles diff --git a/docs/docker-compose/teuthology/teuthology.sh b/docs/docker-compose/teuthology/teuthology.sh index 0378f93d44..78119ed076 100755 --- a/docs/docker-compose/teuthology/teuthology.sh +++ b/docs/docker-compose/teuthology/teuthology.sh @@ -24,6 +24,7 @@ if [ -z "$TEUTHOLOGY_WAIT" ]; then --ceph-repo https://github.com/ceph/ceph.git \ --suite-repo https://github.com/ceph/ceph.git \ -c main \ + --sha1 552b6d852cbc7384a05805989a675de7d6b4a3fd \ -m $MACHINE_TYPE \ --limit 1 \ -n 100 \ @@ -37,10 +38,12 @@ if [ -z "$TEUTHOLOGY_WAIT" ]; then --force-priority \ $CUSTOM_CONF DISPATCHER_EXIT_FLAG='--exit-on-empty-queue' - teuthology-queue -m $MACHINE_TYPE -s | \ - python3 -c "import sys, json; assert json.loads(sys.stdin.read())['count'] > 0, 'queue is empty!'" + #teuthology-queue -m $MACHINE_TYPE -s | \ + # python3 -c "import sys, json; assert json.loads(sys.stdin.read())['count'] > 0, 'queue is empty!'" fi teuthology-dispatcher -v \ --log-dir /teuthology/log \ --tube $MACHINE_TYPE \ + --machine-type $MACHINE_TYPE \ $DISPATCHER_EXIT_FLAG + diff --git a/scripts/beanstalk_queue.py b/scripts/beanstalk_queue.py deleted file mode 100644 index a8a0661ecf..0000000000 --- a/scripts/beanstalk_queue.py +++ /dev/null @@ -1,35 +0,0 @@ -import docopt - -import teuthology.config -import teuthology.queue.beanstalk - -doc = """ -usage: teuthology-beanstalk-queue -h - teuthology-beanstalk-queue [-s|-d|-f] -m MACHINE_TYPE - teuthology-beanstalk-queue [-r] -m MACHINE_TYPE - teuthology-beanstalk-queue -m MACHINE_TYPE -D PATTERN - teuthology-beanstalk-queue -p SECONDS [-m MACHINE_TYPE] -List Jobs in queue. -If -D is passed, then jobs with PATTERN in the job name are deleted from the -queue. -Arguments: - -m, --machine_type MACHINE_TYPE [default: multi] - Which machine type queue to work on. -optional arguments: - -h, --help Show this help message and exit - -D, --delete PATTERN Delete Jobs with PATTERN in their name - -d, --description Show job descriptions - -r, --runs Only show run names - -f, --full Print the entire job config. Use with caution. - -s, --status Prints the status of the queue - -p, --pause SECONDS Pause queues for a number of seconds. A value of 0 - will unpause. If -m is passed, pause that queue, - otherwise pause all queues. -""" - - -def main(): - - args = docopt.docopt(doc) - print(args) - teuthology.beanstalk.main(args) diff --git a/scripts/dispatcher.py b/scripts/dispatcher.py index 5e64b382d8..b4f2fdf488 100644 --- a/scripts/dispatcher.py +++ b/scripts/dispatcher.py @@ -1,9 +1,9 @@ """ usage: teuthology-dispatcher --help teuthology-dispatcher --supervisor [-v] --bin-path BIN_PATH --job-config CONFIG --archive-dir DIR - teuthology-dispatcher [-v] [--archive-dir DIR] --log-dir LOG_DIR --machine-type MACHINE_TYPE --queue-backend BACKEND + teuthology-dispatcher [-v] [--archive-dir DIR] [--exit-on-empty-queue] --log-dir LOG_DIR --tube TUBE --machine-type MACHINE_TYPE -Start a dispatcher for the specified machine type. Grab jobs from a paddles/beanstalk +Start a dispatcher for the specified tube. Grab jobs from a paddles/beanstalk queue and run the teuthology tests they describe as subprocesses. The subprocess invoked is a teuthology-dispatcher command run in supervisor mode. @@ -17,12 +17,12 @@ -v, --verbose be more verbose -l, --log-dir LOG_DIR path in which to store logs -a DIR, --archive-dir DIR path to archive results in - --machine-type MACHINE_TYPE the machine type for the job + -t, --tube TUBE which queue to read jobs from --supervisor run dispatcher in job supervisor mode --bin-path BIN_PATH teuthology bin path --job-config CONFIG file descriptor of job's config file --exit-on-empty-queue if the queue is empty, exit - --queue-backend BACKEND choose between paddles and beanstalk + --machine-type MACHINE_TYPE specify machine type eg. smithi, mira """ import docopt diff --git a/scripts/kill.py b/scripts/kill.py index e2a1a4ef09..a93bcd8629 100644 --- a/scripts/kill.py +++ b/scripts/kill.py @@ -12,7 +12,7 @@ teuthology-kill [-p] -o OWNER -m MACHINE_TYPE -r RUN Kill running teuthology jobs: -1. Removes any queued jobs from the paddles queue +1. Removes any queued jobs from the queue 2. Kills any running jobs 3. Nukes any machines involved diff --git a/scripts/paddles_queue.py b/scripts/paddles_queue.py deleted file mode 100644 index 8487fd938e..0000000000 --- a/scripts/paddles_queue.py +++ /dev/null @@ -1,45 +0,0 @@ -import docopt - -import teuthology.config -import teuthology.queue.paddles_queue -doc = """ -usage: teuthology-paddles-queue -h - teuthology-paddles-queue -s -m MACHINE_TYPE - teuthology-paddles-queue [-d|-f] -m MACHINE_TYPE -U USER - teuthology-paddles-queue -m MACHINE_TYPE -P PRIORITY [-U USER|-R RUN_NAME] - teuthology-paddles-queue [-r] -m MACHINE_TYPE -U USER - teuthology-paddles-queue -m MACHINE_TYPE -D PATTERN -U USER - teuthology-paddles-queue -p [-t SECONDS] -m MACHINE_TYPE -U USER - teuthology-paddles-queue -u -m MACHINE_TYPE -U USER - -List Jobs in queue. -If -D is passed, then jobs with PATTERN in the job name are deleted from the -queue. - -Arguments: - -m, --machine_type MACHINE_TYPE - Which machine type queue to work on. - -optional arguments: - -h, --help Show this help message and exit - -D, --delete PATTERN Delete Jobs with PATTERN in their name - -d, --description Show job descriptions - -r, --runs Only show run names - -f, --full Print the entire job config. Use with caution. - -s, --status Prints the status of the queue - -t, --time SECONDS Pause queues for a number of seconds. - If -m is passed, pause that queue, - otherwise pause all queues. - -p, --pause Pause queue - -u, --unpause Unpause queue - -P, --priority PRIORITY - Change priority of queued jobs - -U, --user USER User who owns the jobs - -R, --run-name RUN_NAME - Used to change priority of all jobs in the run. -""" - - -def main(): - args = docopt.docopt(doc) - teuthology.paddles_queue.main(args) diff --git a/scripts/queue.py b/scripts/queue.py index 2c466a7be9..b8c7e9d293 100644 --- a/scripts/queue.py +++ b/scripts/queue.py @@ -1,15 +1,16 @@ import docopt -import teuthology.config import teuthology.queue.beanstalk import teuthology.queue.paddles +from teuthology.config import config doc = """ usage: teuthology-queue -h - teuthology-queue [-s|-d|-f] -m MACHINE_TYPE + teuthology-queue [-s|-d|-f] -m MACHINE_TYPE teuthology-queue [-r] -m MACHINE_TYPE teuthology-queue -m MACHINE_TYPE -D PATTERN - teuthology-queue -p SECONDS [-m MACHINE_TYPE] + teuthology-queue -p SECONDS [-m MACHINE_TYPE] [-U USER] + teuthology-queue -m MACHINE_TYPE -P PRIORITY [-U USER|-R RUN_NAME] List Jobs in queue. If -D is passed, then jobs with PATTERN in the job name are deleted from the @@ -29,9 +30,17 @@ -p, --pause SECONDS Pause queues for a number of seconds. A value of 0 will unpause. If -m is passed, pause that queue, otherwise pause all queues. + -P, --priority PRIORITY + Change priority of queued jobs (only in Paddles queues) + -U, --user USER User who owns the jobs + -R, --run-name RUN_NAME + Used to change priority of all jobs in the run. """ def main(): args = docopt.docopt(doc) - teuthology.queue.main(args) + if config.queue_backend == 'beanstalk': + teuthology.queue.beanstalk.main(args) + else: + teuthology.queue.paddles.main(args) diff --git a/scripts/schedule.py b/scripts/schedule.py index e9f0c1f5ff..5503f71b77 100644 --- a/scripts/schedule.py +++ b/scripts/schedule.py @@ -21,7 +21,6 @@ Queue backend name, use prefix '@' to append job config to the given file path as yaml. - [default: paddles] -n , --name Name of suite run the job is part of -d , --description Job description -o , --owner Job owner diff --git a/scripts/worker.py b/scripts/worker.py index 8d3228d8d0..a3e12c20d7 100644 --- a/scripts/worker.py +++ b/scripts/worker.py @@ -9,7 +9,7 @@ def main(): def parse_args(): parser = argparse.ArgumentParser(description=""" -Grab jobs from a paddles queue and run the teuthology tests they +Grab jobs from a beanstalk queue and run the teuthology tests they describe. One job is run at a time. """) parser.add_argument( @@ -29,8 +29,8 @@ def parse_args(): required=True, ) parser.add_argument( - '-m', '--machine-type', - help='which machine type the jobs will run on', + '-t', '--tube', + help='which beanstalk tube to read jobs from', required=True, ) diff --git a/teuthology/config.py b/teuthology/config.py index 43fa0fff9b..a5ca15da2c 100644 --- a/teuthology/config.py +++ b/teuthology/config.py @@ -143,7 +143,7 @@ class TeuthologyConfig(YamlConfig): 'archive_upload_key': None, 'archive_upload_url': None, 'automated_scheduling': False, - 'backend': 'beanstalk', + 'queue_backend': 'beanstalk', 'reserve_machines': 5, 'ceph_git_base_url': 'https://github.com/ceph/', 'ceph_git_url': None, diff --git a/teuthology/dispatcher/__init__.py b/teuthology/dispatcher/__init__.py index 8f2fd76bac..117e5082a9 100644 --- a/teuthology/dispatcher/__init__.py +++ b/teuthology/dispatcher/__init__.py @@ -7,7 +7,6 @@ from datetime import datetime from typing import Dict, List -from time import sleep from teuthology import ( # non-modules @@ -78,11 +77,15 @@ def main(args): return supervisor.main(args) verbose = args["--verbose"] - machine_type = args["--machine-type"] + tube = args["--tube"] log_dir = args["--log-dir"] archive_dir = args["--archive-dir"] exit_on_empty_queue = args["--exit-on-empty-queue"] - backend = args['--queue-backend'] + machine_type = args['--machine-type'] + backend = teuth_config.queue_backend + + if backend is None: + backend = 'beanstalk' if archive_dir is None: archive_dir = teuth_config.archive_base @@ -97,13 +100,14 @@ def main(args): if machine_type is None and teuth_config.machine_type is None: return + # setup logging for disoatcher in {log_dir} loglevel = logging.INFO if verbose: loglevel = logging.DEBUG logging.getLogger().setLevel(loglevel) log.setLevel(loglevel) - log_file_path = os.path.join(log_dir, f"dispatcher.{machine_type}.{os.getpid()}") + log_file_path = os.path.join(log_dir, f"dispatcher.{tube}.{os.getpid()}") setup_log_file(log_file_path) install_except_hook() @@ -111,7 +115,7 @@ def main(args): if backend == 'beanstalk': connection = beanstalk.connect() - beanstalk.watch_tube(connection, machine_type) + beanstalk.watch_tube(connection, tube) result_proc = None @@ -141,21 +145,25 @@ def main(args): if rc is not None: worst_returncode = max([worst_returncode, rc]) job_procs.remove(proc) - if config.backend == 'beanstalk': + if teuth_config.queue_backend == 'beanstalk': job = connection.reserve(timeout=60) + if job is not None: + job_id = job.jid + job_config = yaml.safe_load(job.body) else: job = report.get_queued_job(machine_type) + if job is not None: + job = clean_config(job) + job_id = job.get('job_id') + job_config = job if job is None: if exit_on_empty_queue and not job_procs: log.info("Queue is empty and no supervisor processes running; exiting!") break continue - job = clean_config(job) - report.try_push_job_info(job, dict(status='running')) - job_id = job.get('job_id') + report.try_push_job_info(job_config, dict(status='running')) log.info('Reserved job %s', job_id) - log.info('Config is: %s', job) - job_config = job + log.info('Config is: %s', job_config) if job_config.get('stop_worker'): keep_running = False @@ -268,18 +276,3 @@ def create_job_archive(job_name, job_archive_path, archive_dir): if not os.path.exists(run_archive): safepath.makedirs('/', run_archive) safepath.makedirs('/', job_archive_path) - - -def pause_queue(machine_type, paused, paused_by, pause_duration=None): - if paused: - report.pause_queue(machine_type, paused, paused_by, pause_duration) - ''' - If there is a pause duration specified - un-pause the queue after the time elapses - ''' - if pause_duration is not None: - sleep(int(pause_duration)) - paused = False - report.pause_queue(machine_type, paused, paused_by) - elif not paused: - report.pause_queue(machine_type, paused, paused_by) diff --git a/teuthology/dispatcher/supervisor.py b/teuthology/dispatcher/supervisor.py index 2d43c1cf0a..5469c99037 100644 --- a/teuthology/dispatcher/supervisor.py +++ b/teuthology/dispatcher/supervisor.py @@ -88,9 +88,8 @@ def run_job(job_config, teuth_bin_path, archive_dir, verbose): if teuth_config.results_server: try: report.try_delete_jobs(job_config['name'], job_config['job_id']) - except Exception as e: - log.warning("Unable to delete job %s, exception occurred: %s", - job_config['job_id'], e) + except Exception: + log.exception("Unable to delete job %s", job_config['job_id']) job_archive = os.path.join(archive_dir, safe_archive) args = [ os.path.join(teuth_bin_path, 'teuthology-results'), @@ -148,7 +147,7 @@ def run_job(job_config, teuth_bin_path, archive_dir, verbose): '--archive', job_config['archive_path'], '--name', job_config['name'], ]) - if 'description' in job_config: + if job_config.get('description') is not None: arg.extend(['--description', job_config['description']]) job_archive = os.path.join(job_config['archive_path'], 'orig.config.yaml') arg.extend(['--', job_archive]) diff --git a/teuthology/exporter.py b/teuthology/exporter.py index b688d4d755..f26ddf64f3 100644 --- a/teuthology/exporter.py +++ b/teuthology/exporter.py @@ -6,10 +6,10 @@ from pathlib import Path -import teuthology.beanstalk as beanstalk import teuthology.dispatcher from teuthology.config import config from teuthology.lock.query import list_locks +from teuthology.queue import beanstalk log = logging.getLogger(__name__) diff --git a/teuthology/kill.py b/teuthology/kill.py index dbbd4fabc5..ab302e778f 100755 --- a/teuthology/kill.py +++ b/teuthology/kill.py @@ -117,7 +117,7 @@ def find_run_info(serializer, run_name): if not os.path.isdir(job_dir): continue job_num += 1 - if config.backend == 'beanstalk': + if config.queue_backend == 'beanstalk': beanstalk.print_progress(job_num, job_total, 'Reading Job: ') job_info = serializer.job_info(run_name, job_id, simple=True) for key in job_info.keys(): diff --git a/teuthology/orchestra/run.py b/teuthology/orchestra/run.py index 6235b0d36e..f31dfd0d7f 100644 --- a/teuthology/orchestra/run.py +++ b/teuthology/orchestra/run.py @@ -182,6 +182,7 @@ def _raise_for_status(self): command=self.command, exitstatus=self.returncode, node=self.hostname, label=self.label ) + def _get_exitstatus(self): """ :returns: the remote command's exit status (return code). Note that diff --git a/teuthology/paddles_queue.py b/teuthology/paddles_queue.py deleted file mode 100644 index 0ecc81a37a..0000000000 --- a/teuthology/paddles_queue.py +++ /dev/null @@ -1,219 +0,0 @@ -import logging -import pprint -import sys -from collections import OrderedDict - -from teuthology import report -from teuthology.config import config -from teuthology.dispatcher import pause_queue - - -log = logging.getLogger(__name__) - - -def connect(): - host = config.queue_host - port = config.queue_port - if host is None or port is None: - raise RuntimeError( - 'Beanstalk queue information not found in {conf_path}'.format( - conf_path=config.teuthology_yaml)) - return beanstalkc.Connection(host=host, port=port, parse_yaml=yaml.safe_load) - - -def watch_tube(connection, tube_name): - """ - Watch a given tube, potentially correcting to 'multi' if necessary. Returns - the tube_name that was actually used. - """ - if ',' in tube_name: - log.debug("Correcting tube name to 'multi'") - tube_name = 'multi' - connection.watch(tube_name) - connection.ignore('default') - return tube_name - - -def walk_jobs(connection, tube_name, processor, pattern=None): - """ - def callback(jobs_dict) - """ - log.info("Checking Beanstalk Queue...") - job_count = connection.stats_tube(tube_name)['current-jobs-ready'] - if job_count == 0: - log.info('No jobs in Beanstalk Queue') - return - -def stats_queue(machine_type): - stats = report.get_queue_stats(machine_type) - if stats['paused'] is None: - log.info("%s queue is currently running with %s jobs queued", - stats['name'], - stats['count']) - else: - log.info("%s queue is paused with %s jobs queued", - stats['name'], - stats['count']) - - -def update_priority(machine_type, priority, user, run_name=None): - if run_name is not None: - jobs = report.get_user_jobs_queue(machine_type, user, run_name) - else: - jobs = report.get_user_jobs_queue(machine_type, user) - for job in jobs: - job['priority'] = priority - report.try_push_job_info(job) - - -def print_progress(index, total, message=None): - msg = "{m} ".format(m=message) if message else '' - sys.stderr.write("{msg}{i}/{total}\r".format( - msg=msg, i=index, total=total)) - sys.stderr.flush() - - -def end_progress(): - sys.stderr.write('\n') - sys.stderr.flush() - - -def walk_jobs(machine_type, processor, user): - log.info("Checking paddles queue...") - job_count = report.get_queue_stats(machine_type)['count'] - - jobs = report.get_user_jobs_queue(machine_type, user) - if job_count == 0: - log.info('No jobs in queue') - return - - for i in range(1, job_count + 1): - print_progress(i, job_count, "Loading") - job = jobs[i-1] - if job is None: - continue - job_id = job['job_id'] - processor.add_job(job_id, job) - end_progress() - processor.complete() - - -class JobProcessor(object): - def __init__(self): - self.jobs = OrderedDict() - - def add_job(self, job_id, job_config, job_obj=None): - job_id = str(job_id) - - job_dict = dict( - index=(len(self.jobs) + 1), - job_config=job_config, - ) - if job_obj: - job_dict['job_obj'] = job_obj - self.jobs[job_id] = job_dict - - self.process_job(job_id) - - def process_job(self, job_id): - pass - - def complete(self): - pass - - -class JobPrinter(JobProcessor): - def __init__(self, show_desc=False, full=False): - super(JobPrinter, self).__init__() - self.show_desc = show_desc - self.full = full - - def process_job(self, job_id): - job_config = self.jobs[job_id]['job_config'] - job_index = self.jobs[job_id]['index'] - job_priority = job_config['priority'] - job_name = job_config['name'] - job_desc = job_config['description'] - print('Job: {i:>4} priority: {pri:>4} {job_name}/{job_id}'.format( - i=job_index, - pri=job_priority, - job_id=job_id, - job_name=job_name, - )) - if self.full: - pprint.pprint(job_config) - elif job_desc and self.show_desc: - for desc in job_desc.split(): - print('\t {}'.format(desc)) - - -class RunPrinter(JobProcessor): - def __init__(self): - super(RunPrinter, self).__init__() - self.runs = list() - - def process_job(self, job_id): - run = self.jobs[job_id]['job_config']['name'] - if run not in self.runs: - self.runs.append(run) - print(run) - - -class JobDeleter(JobProcessor): - def __init__(self, pattern): - self.pattern = pattern - super(JobDeleter, self).__init__() - - def add_job(self, job_id, job_config, job_obj=None): - job_name = job_config['name'] - if self.pattern in job_name: - super(JobDeleter, self).add_job(job_id, job_config, job_obj) - - def process_job(self, job_id): - job_config = self.jobs[job_id]['job_config'] - job_name = job_config['name'] - print('Deleting {job_name}/{job_id}'.format( - job_id=job_id, - job_name=job_name, - )) - report.try_delete_jobs(job_name, job_id) - - -def main(args): - machine_type = args['--machine_type'] - user = args['--user'] - run_name = args['--run_name'] - priority = args['--priority'] - status = args['--status'] - delete = args['--delete'] - runs = args['--runs'] - show_desc = args['--description'] - full = args['--full'] - pause = args['--pause'] - unpause = args['--unpause'] - pause_duration = args['--time'] - try: - if status: - stats_queue(machine_type) - elif pause: - if pause_duration: - pause_queue(machine_type, pause, user, pause_duration) - else: - pause_queue(machine_type, pause, user) - elif unpause: - pause = False - pause_queue(machine_type, pause, user) - elif priority: - update_priority(machine_type, priority, user, run_name) - elif delete: - walk_jobs(machine_type, - JobDeleter(delete), user) - elif runs: - walk_jobs(machine_type, - RunPrinter(), user) - else: - walk_jobs(machine_type, - JobPrinter(show_desc=show_desc, full=full), - user) - except KeyboardInterrupt: - log.info("Interrupted.") diff --git a/teuthology/queue/__init__.py b/teuthology/queue/__init__.py index 2a0b6ff363..e69de29bb2 100644 --- a/teuthology/queue/__init__.py +++ b/teuthology/queue/__init__.py @@ -1,106 +0,0 @@ -import logging -import pprint -import sys -from collections import OrderedDict - -from teuthology import report -from teuthology.config import config - -log = logging.getLogger(__name__) - -def print_progress(index, total, message=None): - msg = "{m} ".format(m=message) if message else '' - sys.stderr.write("{msg}{i}/{total}\r".format( - msg=msg, i=index, total=total)) - sys.stderr.flush() - -def end_progress(): - sys.stderr.write('\n') - sys.stderr.flush() - -class JobProcessor(object): - def __init__(self): - self.jobs = OrderedDict() - - def add_job(self, job_id, job_config, job_obj=None): - job_id = str(job_id) - - job_dict = dict( - index=(len(self.jobs) + 1), - job_config=job_config, - ) - if job_obj: - job_dict['job_obj'] = job_obj - self.jobs[job_id] = job_dict - - self.process_job(job_id) - - def process_job(self, job_id): - pass - - def complete(self): - pass - - -class JobPrinter(JobProcessor): - def __init__(self, show_desc=False, full=False): - super(JobPrinter, self).__init__() - self.show_desc = show_desc - self.full = full - - def process_job(self, job_id): - job_config = self.jobs[job_id]['job_config'] - job_index = self.jobs[job_id]['index'] - job_priority = job_config['priority'] - job_name = job_config['name'] - job_desc = job_config['description'] - print('Job: {i:>4} priority: {pri:>4} {job_name}/{job_id}'.format( - i=job_index, - pri=job_priority, - job_id=job_id, - job_name=job_name, - )) - if self.full: - pprint.pprint(job_config) - elif job_desc and self.show_desc: - for desc in job_desc.split(): - print('\t {}'.format(desc)) - - -class RunPrinter(JobProcessor): - def __init__(self): - super(RunPrinter, self).__init__() - self.runs = list() - - def process_job(self, job_id): - run = self.jobs[job_id]['job_config']['name'] - if run not in self.runs: - self.runs.append(run) - print(run) - - -class JobDeleter(JobProcessor): - def __init__(self, pattern): - self.pattern = pattern - super(JobDeleter, self).__init__() - - def add_job(self, job_id, job_config, job_obj=None): - job_name = job_config['name'] - if self.pattern in job_name: - super(JobDeleter, self).add_job(job_id, job_config, job_obj) - - def process_job(self, job_id): - job_config = self.jobs[job_id]['job_config'] - job_name = job_config['name'] - print('Deleting {job_name}/{job_id}'.format( - job_id=job_id, - job_name=job_name, - )) - report.try_delete_jobs(job_name, job_id) - - -def main(args): - if config.backend == 'paddles': - paddles.main(args) - else: - beanstalk.main(args) \ No newline at end of file diff --git a/teuthology/queue/beanstalk.py b/teuthology/queue/beanstalk.py index 90b1cbd6d3..6f99d3405c 100644 --- a/teuthology/queue/beanstalk.py +++ b/teuthology/queue/beanstalk.py @@ -1,12 +1,11 @@ import beanstalkc +import json import yaml import logging -import pprint -import sys -from collections import OrderedDict from teuthology.config import config -from teuthology import report +from teuthology.queue import util + log = logging.getLogger(__name__) @@ -47,7 +46,7 @@ def callback(jobs_dict) # Try to figure out a sane timeout based on how many jobs are in the queue timeout = job_count / 2000.0 * 60 for i in range(1, job_count + 1): - print_progress(i, job_count, "Loading") + util.print_progress(i, job_count, "Loading") job = connection.reserve(timeout=timeout) if job is None or job.body is None: continue @@ -57,7 +56,7 @@ def callback(jobs_dict) if pattern is not None and pattern not in job_name: continue processor.add_job(job_id, job_config, job) - end_progress() + util.end_progress() processor.complete() @@ -100,18 +99,18 @@ def main(args): # it is not needed for pausing tubes watch_tube(connection, machine_type) if status: - print(stats_tube(connection, machine_type)) + print(json.dumps(stats_tube(connection, machine_type))) elif pause_duration: pause_tube(connection, machine_type, pause_duration) elif delete: walk_jobs(connection, machine_type, - JobDeleter(delete)) + util.JobDeleter(delete)) elif runs: walk_jobs(connection, machine_type, - RunPrinter()) + util.RunPrinter()) else: walk_jobs(connection, machine_type, - JobPrinter(show_desc=show_desc, full=full)) + util.JobPrinter(show_desc=show_desc, full=full)) except KeyboardInterrupt: log.info("Interrupted.") finally: diff --git a/teuthology/queue/paddles.py b/teuthology/queue/paddles.py index f2ea8b84c8..5a39f0c8d8 100644 --- a/teuthology/queue/paddles.py +++ b/teuthology/queue/paddles.py @@ -1,32 +1,25 @@ import logging -import pprint -import sys -from collections import OrderedDict +import json from teuthology import report -from teuthology.dispatcher import pause_queue - +from teuthology.queue import util log = logging.getLogger(__name__) def stats_queue(machine_type): stats = report.get_queue_stats(machine_type) - if stats['paused'] is None: - log.info("%s queue is currently running with %s jobs queued", - stats['name'], - stats['count']) - else: - log.info("%s queue is paused with %s jobs queued", - stats['name'], - stats['count']) + result = dict( + name=stats['queue'], + count=stats['queued_jobs'], + paused=str(stats['paused']), + ) + return result -def update_priority(machine_type, priority, user, run_name=None): +def update_priority(machine_type, priority, run_name=None): if run_name is not None: - jobs = report.get_user_jobs_queue(machine_type, user, run_name) - else: - jobs = report.get_user_jobs_queue(machine_type, user) + jobs = report.get_jobs_by_run(machine_type, run_name) for job in jobs: job['priority'] = priority report.try_push_job_info(job) @@ -34,55 +27,54 @@ def update_priority(machine_type, priority, user, run_name=None): def walk_jobs(machine_type, processor, user): log.info("Checking paddles queue...") - job_count = report.get_queue_stats(machine_type)['count'] + job_count = report.get_queue_stats(machine_type)['queued_jobs'] jobs = report.get_user_jobs_queue(machine_type, user) if job_count == 0: - log.info('No jobs in queue') + log.info('No jobs in Paddles queue') return for i in range(1, job_count + 1): - print_progress(i, job_count, "Loading") + util.print_progress(i, job_count, "Loading") job = jobs[i-1] if job is None: continue job_id = job['job_id'] processor.add_job(job_id, job) - end_progress() + util.end_progress() processor.complete() def main(args): machine_type = args['--machine_type'] - #user = args['--user'] - #run_name = args['--run_name'] - #priority = args['--priority'] + user = args['--user'] + run_name = args['--run-name'] status = args['--status'] delete = args['--delete'] runs = args['--runs'] show_desc = args['--description'] full = args['--full'] pause_duration = args['--pause'] - #unpause = args['--unpause'] - #pause_duration = args['--time'] + priority = args['--priority'] try: if status: - stats_queue(machine_type) - if pause_duration: - pause_queue(machine_type, pause, user, pause_duration) - #else: - #pause_queue(machine_type, pause, user) + print(json.dumps(stats_queue(machine_type))) + elif pause_duration: + if not user: + log.info('Please enter user to pause Paddles queue') + return + report.pause_queue(machine_type, user, pause_duration) elif priority: update_priority(machine_type, priority, run_name) elif delete: walk_jobs(machine_type, - JobDeleter(delete), user) + util.JobDeleter(delete), user) elif runs: walk_jobs(machine_type, - RunPrinter(), user) + util.RunPrinter(), user) else: walk_jobs(machine_type, - JobPrinter(show_desc=show_desc, full=full), + util.JobPrinter(show_desc=show_desc, full=full), user) except KeyboardInterrupt: log.info("Interrupted.") diff --git a/teuthology/queue/util.py b/teuthology/queue/util.py new file mode 100644 index 0000000000..2a7642e726 --- /dev/null +++ b/teuthology/queue/util.py @@ -0,0 +1,101 @@ +import logging +import pprint +import sys +from collections import OrderedDict + +from teuthology import report + +log = logging.getLogger(__name__) + + +def print_progress(index, total, message=None): + msg = "{m} ".format(m=message) if message else '' + sys.stderr.write("{msg}{i}/{total}\r".format( + msg=msg, i=index, total=total)) + sys.stderr.flush() + + +def end_progress(): + sys.stderr.write('\n') + sys.stderr.flush() + + +class JobProcessor(object): + def __init__(self): + self.jobs = OrderedDict() + + def add_job(self, job_id, job_config, job_obj=None): + job_id = str(job_id) + + job_dict = dict( + index=(len(self.jobs) + 1), + job_config=job_config, + ) + if job_obj: + job_dict['job_obj'] = job_obj + self.jobs[job_id] = job_dict + + self.process_job(job_id) + + def process_job(self, job_id): + pass + + def complete(self): + pass + + +class JobPrinter(JobProcessor): + def __init__(self, show_desc=False, full=False): + super(JobPrinter, self).__init__() + self.show_desc = show_desc + self.full = full + + def process_job(self, job_id): + job_config = self.jobs[job_id]['job_config'] + job_index = self.jobs[job_id]['index'] + job_priority = job_config['priority'] + job_name = job_config['name'] + job_desc = job_config['description'] + print('Job: {i:>4} priority: {pri:>4} {job_name}/{job_id}'.format( + i=job_index, + pri=job_priority, + job_id=job_id, + job_name=job_name, + )) + if self.full: + pprint.pprint(job_config) + elif job_desc and self.show_desc: + for desc in job_desc.split(): + print('\t {}'.format(desc)) + + +class RunPrinter(JobProcessor): + def __init__(self): + super(RunPrinter, self).__init__() + self.runs = list() + + def process_job(self, job_id): + run = self.jobs[job_id]['job_config']['name'] + if run not in self.runs: + self.runs.append(run) + print(run) + + +class JobDeleter(JobProcessor): + def __init__(self, pattern): + self.pattern = pattern + super(JobDeleter, self).__init__() + + def add_job(self, job_id, job_config, job_obj=None): + job_name = job_config['name'] + if self.pattern in job_name: + super(JobDeleter, self).add_job(job_id, job_config, job_obj) + + def process_job(self, job_id): + job_config = self.jobs[job_id]['job_config'] + job_name = job_config['name'] + print('Deleting {job_name}/{job_id}'.format( + job_id=job_id, + job_name=job_name, + )) + report.try_delete_jobs(job_name, job_id) diff --git a/teuthology/report.py b/teuthology/report.py index d22e614475..da72073005 100644 --- a/teuthology/report.py +++ b/teuthology/report.py @@ -283,7 +283,6 @@ def write_new_job(self, run_name, job_info): sleep=1, increment=inc, action=f'write job for {run_name}') as proceed: while proceed(): response = self.session.post(run_uri, data=job_json, headers=headers) - if response.status_code == 200: resp_json = response.json() job_id = resp_json['job_id'] @@ -322,8 +321,8 @@ def report_job(self, run_name, job_id, job_info=None, dead=False): """ if job_info is not None and not isinstance(job_info, dict): raise TypeError("job_info must be a dict") - run_uri = "{base}/runs/{name}/jobs/".format( - base=self.base_uri, name=run_name,) + run_uri = "{base}/runs/{name}/jobs/{job_id}/".format( + base=self.base_uri, name=run_name, job_id=job_id) if job_info is None: job_info = self.serializer.job_info(run_name, job_id) if dead and get_status(job_info) is None: @@ -388,7 +387,7 @@ def last_run(self): def get_top_job(self, queue): - uri = "{base}/queue/pop_queue?queue_name={queue}".format(base=self.base_uri, + uri = "{base}/queue/pop_queue?queue={queue}".format(base=self.base_uri, queue=queue) inc = random.uniform(0, 1) with safe_while( @@ -522,7 +521,6 @@ def create_queue(self, queue): sleep=1, increment=inc, action=f'creating queue {queue}') as proceed: while proceed(): response = self.session.post(uri, data=queue_json, headers=headers) - if response.status_code == 200: self.log.info("Successfully created queue {queue}".format( queue=queue, @@ -545,13 +543,14 @@ def create_queue(self, queue): response.raise_for_status() - def update_queue(self, queue, paused, paused_by, pause_duration=None): + def update_queue(self, queue, paused_by, pause_duration=None): uri = "{base}/queue/".format( base=self.base_uri ) + if pause_duration is not None: pause_duration = int(pause_duration) - queue_info = {'queue': queue, 'paused': paused, 'paused_by': paused_by, + queue_info = {'queue': queue, 'paused_by': paused_by, 'pause_duration': pause_duration} queue_json = json.dumps(queue_info) headers = {'content-type': 'application/json'} @@ -580,34 +579,28 @@ def update_queue(self, queue, paused, paused_by, pause_duration=None): def queue_stats(self, queue): - uri = "{base}/queue/stats/".format( - base=self.base_uri + uri = "{base}/queue/stats/?queue={queue}".format( + base=self.base_uri, + queue=queue ) - queue_info = {'queue': queue} - queue_json = json.dumps(queue_info) - - headers = {'content-type': 'application/json'} inc = random.uniform(0, 1) with safe_while( sleep=1, increment=inc, action=f'stats for queue {queue}') as proceed: while proceed(): - response = self.session.post(uri, data=queue_json, headers=headers) - + response = self.session.get(uri) if response.status_code == 200: - self.log.info("Successfully retrieved stats for queue {queue}".format( - queue=queue, - )) return response.json() else: msg = response.text self.log.error( - "POST to {uri} failed with status {status}: {msg}".format( + "GET to {uri} failed with status {status}: {msg}".format( uri=uri, status=response.status_code, msg=msg, )) response.raise_for_status() - + + def queued_jobs(self, queue, user, run_name): uri = "{base}/queue/queued_jobs/".format( base=self.base_uri @@ -668,12 +661,18 @@ def get_user_jobs_queue(queue, user, run_name=None): return return reporter.queued_jobs(queue, user, run_name) +def get_jobs_by_run(queue, run_name): + reporter = ResultsReporter() + if not reporter.base_uri: + return + return reporter.queued_jobs(queue, None, run_name) -def pause_queue(queue, paused, paused_by, pause_duration=None): + +def pause_queue(queue, paused_by, pause_duration=None): reporter = ResultsReporter() if not reporter.base_uri: return - reporter.update_queue(queue, paused, paused_by, pause_duration) + reporter.update_queue(queue, paused_by, pause_duration) def is_queue_paused(queue): @@ -713,7 +712,7 @@ def push_job_info(run_name, job_id, job_info, base_uri=None): teuthology.exporter.JobResults.record(job_info["machine_type"], status) -def get_queued_job(machine_type): +def get_queued_job(queue): """ Retrieve a job that is queued depending on priority @@ -722,10 +721,6 @@ def get_queued_job(machine_type): reporter = ResultsReporter() if not reporter.base_uri: return - if ',' in machine_type: - queue = 'multi' - else: - queue = machine_type if is_queue_paused(queue) == True: log.info("Teuthology queue %s is currently paused", queue) diff --git a/teuthology/schedule.py b/teuthology/schedule.py index 81dd4d548f..bf9c27886c 100644 --- a/teuthology/schedule.py +++ b/teuthology/schedule.py @@ -1,9 +1,10 @@ import os import yaml -import teuthology.beanstalk +import teuthology.queue.beanstalk from teuthology.misc import get_user, merge_configs from teuthology import report +from teuthology.config import config def main(args): @@ -27,7 +28,7 @@ def main(args): if not name or name.isdigit(): raise ValueError("Please use a more descriptive value for --name") job_config = build_config(args) - backend = args['--queue-backend'] + backend = config.queue_backend if args['--dry-run']: print('---\n' + yaml.safe_dump(job_config)) elif backend.startswith('@'): @@ -115,9 +116,9 @@ def beanstalk_schedule_job(job_config, backend, num=1): """ num = int(num) tube = job_config.pop('tube') - beanstalk = teuthology.beanstalk.connect() + beanstalk = teuthology.queue.beanstalk.connect() beanstalk.use(tube) - queue = report.create_machine_type_queue(job_config['machine_type']) + queue = report.create_machine_type_queue(tube) job_config['queue'] = queue while num > 0: job_id = report.try_create_job(job_config, dict(status='queued')) diff --git a/teuthology/test/test_dispatcher.py b/teuthology/test/test_dispatcher.py index 9a6d0ff564..ce4e55d851 100644 --- a/teuthology/test/test_dispatcher.py +++ b/teuthology/test/test_dispatcher.py @@ -1,8 +1,6 @@ -from teuthology import dispatcher from unittest.mock import patch, Mock from teuthology import report -import unittest.mock as mock import unittest @@ -35,74 +33,3 @@ def test_mock_get_queue_job(self): self.assertEqual(response.status_code, 200) self.assertEqual(response.json(), job_config) - - - @patch("teuthology.worker.fetch_teuthology") - @patch("teuthology.dispatcher.fetch_qa_suite") - @patch("teuthology.worker.fetch_qa_suite") - @patch("teuthology.dispatcher.report.get_queued_job") - @patch("teuthology.dispatcher.report.try_push_job_info") - @patch("teuthology.dispatcher.setup_log_file") - @patch("os.path.isdir") - @patch("os.getpid") - @patch("teuthology.dispatcher.teuth_config") - @patch("subprocess.Popen") - @patch("os.path.join") - @patch("teuthology.dispatcher.create_job_archive") - @patch("yaml.safe_dump") - def test_dispatcher_main(self, m_fetch_teuthology, m_fetch_qa_suite, - m_worker_fetch_qa_suite, m_get_queued_job, - m_try_push_job_info, - m_setup_log, - m_isdir, m_getpid, - m_t_config, m_popen, m_join, m_create_archive, m_yaml_dump): - - args = { - '--owner': 'the_owner', - '--archive-dir': '/archive/dir', - '--log-dir': '/worker/log', - '--name': 'the_name', - '--description': 'the_description', - '--machine-type': 'test_queue', - '--supervisor': False, - '--verbose': False, - '--queue-backend': 'paddles' - } - - m = mock.MagicMock() - job_id = {'job_id': '1'} - m.__getitem__.side_effect = job_id.__getitem__ - m.__iter__.side_effect = job_id.__iter__ - job = { - 'job_id': '1', - 'description': 'DESC', - 'email': 'EMAIL', - 'first_in_suite': False, - 'last_in_suite': True, - 'machine_type': 'test_queue', - 'name': 'NAME', - 'owner': 'OWNER', - 'priority': 99, - 'results_timeout': '6', - 'verbose': False, - 'stop_worker': True, - 'archive_path': '/archive/dir/NAME/1' - } - - m_fetch_teuthology.return_value = '/teuth/path' - m_fetch_qa_suite.return_value = '/suite/path' - m_isdir.return_value = True - mock_get_patcher = patch('teuthology.dispatcher.report.get_queued_job') - mock_get = mock_get_patcher.start() - mock_get.return_value = job - - mock_prep_job_patcher = patch('teuthology.dispatcher.prep_job') - mock_prep_job = mock_prep_job_patcher.start() - mock_prep_job.return_value = (job, '/teuth/bin/path') - m_yaml_dump.return_value = '' - - m_try_push_job_info.called_once_with(job, dict(status='running')) - dispatcher.main(args) - mock_get_patcher.stop() - - diff --git a/teuthology/test/test_worker.py b/teuthology/test/test_worker.py index 4905b57aee..8db33e1db9 100644 --- a/teuthology/test/test_worker.py +++ b/teuthology/test/test_worker.py @@ -1,4 +1,3 @@ -import beanstalkc import os from unittest.mock import patch, Mock, MagicMock @@ -275,49 +274,11 @@ def test_main_loop( job.bury.assert_called_once_with() job.delete.assert_called_once_with() - @patch("teuthology.worker.run_job") - @patch("teuthology.worker.prep_job") - @patch("beanstalkc.Job", autospec=True) - @patch("teuthology.worker.fetch_qa_suite") - @patch("teuthology.worker.fetch_teuthology") - @patch("teuthology.worker.beanstalk.watch_tube") - @patch("teuthology.worker.beanstalk.connect") - @patch("os.path.isdir", return_value=True) - @patch("teuthology.worker.setup_log_file") - def test_main_loop( - self, m_setup_log_file, m_isdir, m_connect, m_watch_tube, - m_fetch_teuthology, m_fetch_qa_suite, m_job, m_prep_job, m_run_job, - ): - m_connection = Mock() - jobs = self.build_fake_jobs( - m_connection, - m_job, - [ - 'foo: bar', - 'stop_worker: true', - ], - ) - m_connection.reserve.side_effect = jobs - m_connect.return_value = m_connection - m_prep_job.return_value = (dict(), '/bin/path') - worker.main(self.ctx) - # There should be one reserve call per item in the jobs list - expected_reserve_calls = [ - dict(timeout=60) for i in range(len(jobs)) - ] - got_reserve_calls = [ - call[1] for call in m_connection.reserve.call_args_list - ] - assert got_reserve_calls == expected_reserve_calls - for job in jobs: - job.bury.assert_called_once_with() - job.delete.assert_called_once_with() - @patch("teuthology.worker.report.try_push_job_info") @patch("teuthology.worker.run_job") @patch("beanstalkc.Job", autospec=True) - @patch("teuthology.worker.fetch_qa_suite") - @patch("teuthology.worker.fetch_teuthology") + @patch("teuthology.repo_utils.fetch_qa_suite") + @patch("teuthology.repo_utils.fetch_teuthology") @patch("teuthology.worker.beanstalk.watch_tube") @patch("teuthology.worker.beanstalk.connect") @patch("os.path.isdir", return_value=True)