From 5c0d20238d5c93a98706de25f44d43ac4df85bef Mon Sep 17 00:00:00 2001
From: Aishwarya Mathuria
Date: Mon, 4 Oct 2021 19:24:41 +0530
Subject: [PATCH] Add beanstalk as a possible queue backend for teuthology
 jobs, alongside paddles

With the --queue-backend argument, the user can specify which backend
(paddles or beanstalk) they would like to use for maintaining the
teuthology job queue.

To avoid overlapping job IDs, a job scheduled in beanstalk is also
written to paddles, which returns a unique ID. This is the ID teuthology
treats as the job ID throughout the run of the job.

To differentiate between the two queue backends, the teuthology-queue
command has been split into teuthology-paddles-queue and
teuthology-beanstalk-queue.

Signed-off-by: Aishwarya Mathuria
---
 scripts/beanstalk_queue.py             |  35 ++++
 scripts/dispatcher.py                  |   5 +-
 scripts/{queue.py => paddles_queue.py} |  16 +-
 setup.py                               |   4 +-
 teuthology/beanstalk.py                | 214 +++++++++++++++++++++++++
 teuthology/config.py                   |   1 +
 teuthology/dispatcher/__init__.py      |  48 ++++--
 teuthology/dispatcher/supervisor.py    |   6 +
 teuthology/kill.py                     |  42 ++++-
 teuthology/report.py                   |   4 +-
 teuthology/schedule.py                 |  51 ++++--
 teuthology/test/test_dispatcher.py     |   3 +-
 teuthology/test/test_worker.py         | 135 ++++++++++------
 teuthology/worker.py                   |  30 ++--
 14 files changed, 493 insertions(+), 101 deletions(-)
 create mode 100644 scripts/beanstalk_queue.py
 rename scripts/{queue.py => paddles_queue.py} (71%)
 create mode 100644 teuthology/beanstalk.py

diff --git a/scripts/beanstalk_queue.py b/scripts/beanstalk_queue.py
new file mode 100644
index 0000000000..88a8242847
--- /dev/null
+++ b/scripts/beanstalk_queue.py
@@ -0,0 +1,35 @@
+import docopt
+
+import teuthology.config
+import teuthology.beanstalk
+
+doc = """
+usage: teuthology-beanstalk-queue -h
+       teuthology-beanstalk-queue [-s|-d|-f] -m MACHINE_TYPE
+       teuthology-beanstalk-queue [-r] -m MACHINE_TYPE
+       teuthology-beanstalk-queue -m MACHINE_TYPE -D PATTERN
+       teuthology-beanstalk-queue -p SECONDS [-m MACHINE_TYPE]
+List jobs in the queue.
+If -D is passed, jobs with PATTERN in the job name are deleted from the
+queue.
+Arguments:
+  -m, --machine_type MACHINE_TYPE [default: multi]
+                        Which machine type queue to work on.
+optional arguments:
+  -h, --help            Show this help message and exit
+  -D, --delete PATTERN  Delete jobs with PATTERN in their name
+  -d, --description     Show job descriptions
+  -r, --runs            Only show run names
+  -f, --full            Print the entire job config. Use with caution.
+  -s, --status          Print the status of the queue
+  -p, --pause SECONDS   Pause queues for a number of seconds. A value of 0
+                        will unpause. If -m is passed, pause that queue;
+                        otherwise pause all queues.
+"""
+
+
+def main():
+
+    args = docopt.docopt(doc)
+    print(args)
+    teuthology.beanstalk.main(args)
diff --git a/scripts/dispatcher.py b/scripts/dispatcher.py
index c766de4307..21df578429 100644
--- a/scripts/dispatcher.py
+++ b/scripts/dispatcher.py
@@ -1,9 +1,9 @@
 """
 usage: teuthology-dispatcher --help
       teuthology-dispatcher --supervisor [-v] --bin-path BIN_PATH --job-config CONFIG --archive-dir DIR
-      teuthology-dispatcher [-v] [--archive-dir DIR] --log-dir LOG_DIR --machine-type MACHINE_TYPE
+      teuthology-dispatcher [-v] [--archive-dir DIR] --log-dir LOG_DIR --machine-type MACHINE_TYPE --queue-backend BACKEND
 
-Start a dispatcher for the specified machine type. Grab jobs from a paddles
+Start a dispatcher for the specified machine type. Grab jobs from a paddles/beanstalk
 queue and run the teuthology tests they describe as subprocesses. The
 subprocess invoked is a teuthology-dispatcher command run in supervisor
 mode.
@@ -21,6 +21,7 @@
     --supervisor                  run dispatcher in job supervisor mode
     --bin-path BIN_PATH           teuthology bin path
     --job-config CONFIG           file descriptor of job's config file
+    --queue-backend BACKEND       choose between paddles and beanstalk
 
 """
 import docopt
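For illustration (assumed usage, not taken from the diff): the new flag reaches
teuthology.dispatcher.main() as a plain docopt dictionary mirroring the usage
string above, so the dispatcher can also be driven programmatically. All values
below are placeholders:

    from teuthology.dispatcher import main

    args = {
        '--supervisor': False,
        '--verbose': False,
        '--archive-dir': '/archive/dir',
        '--log-dir': '/log/dir',
        '--machine-type': 'smithi',
        '--queue-backend': 'beanstalk',  # or 'paddles'
    }
    main(args)  # polls beanstalk instead of paddles for queued jobs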
diff --git a/scripts/queue.py b/scripts/paddles_queue.py
similarity index 71%
rename from scripts/queue.py
rename to scripts/paddles_queue.py
index a07598a92f..6ecfe43406 100644
--- a/scripts/queue.py
+++ b/scripts/paddles_queue.py
@@ -4,14 +4,14 @@
 import teuthology.paddles_queue
 
 doc = """
-usage: teuthology-queue -h
-       teuthology-queue -s -m MACHINE_TYPE
-       teuthology-queue [-d|-f] -m MACHINE_TYPE -U USER
-       teuthology-queue -m MACHINE_TYPE -P PRIORITY [-U USER|-R RUN_NAME]
-       teuthology-queue [-r] -m MACHINE_TYPE -U USER
-       teuthology-queue -m MACHINE_TYPE -D PATTERN -U USER
-       teuthology-queue -p [-t SECONDS] -m MACHINE_TYPE -U USER
-       teuthology-queue -u -m MACHINE_TYPE -U USER
+usage: teuthology-paddles-queue -h
+       teuthology-paddles-queue -s -m MACHINE_TYPE
+       teuthology-paddles-queue [-d|-f] -m MACHINE_TYPE -U USER
+       teuthology-paddles-queue -m MACHINE_TYPE -P PRIORITY [-U USER|-R RUN_NAME]
+       teuthology-paddles-queue [-r] -m MACHINE_TYPE -U USER
+       teuthology-paddles-queue -m MACHINE_TYPE -D PATTERN -U USER
+       teuthology-paddles-queue -p [-t SECONDS] -m MACHINE_TYPE -U USER
+       teuthology-paddles-queue -u -m MACHINE_TYPE -U USER
 
 List Jobs in queue.
 If -D is passed, then jobs with PATTERN in the job name are deleted from the
diff --git a/setup.py b/setup.py
index c4c33267ca..674f75a282 100644
--- a/setup.py
+++ b/setup.py
@@ -72,6 +72,7 @@
         'orchestra': [
             # For apache-libcloud when using python < 2.7.9
             'backports.ssl_match_hostname',
+            'beanstalkc3 >= 0.4.0',
             'httplib2',
             'ndg-httpsclient',  # for requests, urllib3
             'pyasn1',  # for requests, urllib3
@@ -125,7 +126,8 @@
             'teuthology-results = scripts.results:main',
             'teuthology-report = scripts.report:main',
             'teuthology-kill = scripts.kill:main',
-            'teuthology-queue = scripts.queue:main',
+            'teuthology-paddles-queue = scripts.paddles_queue:main',
+            'teuthology-beanstalk-queue = scripts.beanstalk_queue:main',
             'teuthology-prune-logs = scripts.prune_logs:main',
             'teuthology-describe = scripts.describe:main',
             'teuthology-reimage = scripts.reimage:main',
diff --git a/teuthology/beanstalk.py b/teuthology/beanstalk.py
new file mode 100644
index 0000000000..a1165becca
--- /dev/null
+++ b/teuthology/beanstalk.py
@@ -0,0 +1,214 @@
+import beanstalkc
+import yaml
+import logging
+import pprint
+import sys
+from collections import OrderedDict
+
+from teuthology.config import config
+from teuthology import report
+
+log = logging.getLogger(__name__)
+
+
+def connect():
+    host = config.queue_host
+    port = config.queue_port
+    if host is None or port is None:
+        raise RuntimeError(
+            'Beanstalk queue information not found in {conf_path}'.format(
+                conf_path=config.teuthology_yaml))
+    return beanstalkc.Connection(host=host, port=port)
+
+
+def watch_tube(connection, tube_name):
+    """
+    Watch a given tube, potentially correcting to 'multi' if necessary.
+    Returns the tube_name that was actually used.
+    """
+    if ',' in tube_name:
+        log.debug("Correcting tube name to 'multi'")
+        tube_name = 'multi'
+    connection.watch(tube_name)
+    connection.ignore('default')
+    return tube_name
+
+
+def walk_jobs(connection, tube_name, processor, pattern=None):
+    """
+    Walk the ready jobs in the given tube, passing each one to
+    processor.add_job(). If a pattern is given, jobs whose name does not
+    contain the pattern are skipped.
+    """
+    log.info("Checking Beanstalk Queue...")
+    job_count = connection.stats_tube(tube_name)['current-jobs-ready']
+    if job_count == 0:
+        log.info('No jobs in Beanstalk Queue')
+        return
+
+    # Try to figure out a sane timeout based on how many jobs are in the queue
+    timeout = job_count / 2000.0 * 60
+    for i in range(1, job_count + 1):
+        print_progress(i, job_count, "Loading")
+        job = connection.reserve(timeout=timeout)
+        if job is None or job.body is None:
+            continue
+        job_config = yaml.safe_load(job.body)
+        job_name = job_config['name']
+        job_id = job.stats()['id']
+        if pattern is not None and pattern not in job_name:
+            continue
+        processor.add_job(job_id, job_config, job)
+    end_progress()
+    processor.complete()
+
+
+def print_progress(index, total, message=None):
+    msg = "{m} ".format(m=message) if message else ''
+    sys.stderr.write("{msg}{i}/{total}\r".format(
+        msg=msg, i=index, total=total))
+    sys.stderr.flush()
+
+
+def end_progress():
+    sys.stderr.write('\n')
+    sys.stderr.flush()
+
+
+class JobProcessor(object):
+    def __init__(self):
+        self.jobs = OrderedDict()
+
+    def add_job(self, job_id, job_config, job_obj=None):
+        job_id = str(job_id)
+
+        job_dict = dict(
+            index=(len(self.jobs) + 1),
+            job_config=job_config,
+        )
+        if job_obj:
+            job_dict['job_obj'] = job_obj
+        self.jobs[job_id] = job_dict
+
+        self.process_job(job_id)
+
+    def process_job(self, job_id):
+        pass
+
+    def complete(self):
+        pass
+
+
+class JobPrinter(JobProcessor):
+    def __init__(self, show_desc=False, full=False):
+        super(JobPrinter, self).__init__()
+        self.show_desc = show_desc
+        self.full = full
+
+    def process_job(self, job_id):
+        job_config = self.jobs[job_id]['job_config']
+        job_index = self.jobs[job_id]['index']
+        job_priority = job_config['priority']
+        job_name = job_config['name']
+        job_desc = job_config['description']
+        print('Job: {i:>4} priority: {pri:>4} {job_name}/{job_id}'.format(
+            i=job_index,
+            pri=job_priority,
+            job_id=job_id,
+            job_name=job_name,
+        ))
+        if self.full:
+            pprint.pprint(job_config)
+        elif job_desc and self.show_desc:
+            for desc in job_desc.split():
+                print('\t {}'.format(desc))
+
+
+class RunPrinter(JobProcessor):
+    def __init__(self):
+        super(RunPrinter, self).__init__()
+        self.runs = list()
+
+    def process_job(self, job_id):
+        run = self.jobs[job_id]['job_config']['name']
+        if run not in self.runs:
+            self.runs.append(run)
+            print(run)
+
+
+class JobDeleter(JobProcessor):
+    def __init__(self, pattern):
+        self.pattern = pattern
+        super(JobDeleter, self).__init__()
+
+    def add_job(self, job_id, job_config, job_obj=None):
+        job_name = job_config['name']
+        if self.pattern in job_name:
+            super(JobDeleter, self).add_job(job_id, job_config, job_obj)
+
+    def process_job(self, job_id):
+        job_config = self.jobs[job_id]['job_config']
+        job_name = job_config['name']
+        print('Deleting {job_name}/{job_id}'.format(
+            job_id=job_id,
+            job_name=job_name,
+        ))
+        job_obj = self.jobs[job_id].get('job_obj')
+        if job_obj:
+            job_obj.delete()
+        report.try_delete_jobs(job_name, job_id)
+
+
+def pause_tube(connection, tube, duration):
+    duration = int(duration)
+    if not tube:
+        tubes = sorted(connection.tubes())
+    else:
+        tubes = [tube]
+
+    prefix = 'Unpausing' if duration == 0 else "Pausing for {dur}s"
+    templ = prefix + ": {tubes}"
+    log.info(templ.format(dur=duration, tubes=tubes))
+    for tube in tubes:
+        connection.pause_tube(tube, duration)
+
+
+def stats_tube(connection, tube):
+    stats = connection.stats_tube(tube)
+    result = dict(
+        name=tube,
+        count=stats['current-jobs-ready'],
+        paused=(stats['pause'] != 0),
+    )
+    return result
+
+
+def main(args):
+    machine_type = args['--machine_type']
+    status = args['--status']
+    delete = args['--delete']
+    runs = args['--runs']
+    show_desc = args['--description']
+    full = args['--full']
+    pause_duration = args['--pause']
+    connection = None
+    try:
+        connection = connect()
+        if machine_type and not pause_duration:
+            # watch_tube needs to be run before we inspect individual jobs;
+            # it is not needed for pausing tubes
+            watch_tube(connection, machine_type)
+        if status:
+            print(stats_tube(connection, machine_type))
+        elif pause_duration:
+            pause_tube(connection, machine_type, pause_duration)
+        elif delete:
+            walk_jobs(connection, machine_type,
+                      JobDeleter(delete))
+        elif runs:
+            walk_jobs(connection, machine_type,
+                      RunPrinter())
+        else:
+            walk_jobs(connection, machine_type,
+                      JobPrinter(show_desc=show_desc, full=full))
+    except KeyboardInterrupt:
+        log.info("Interrupted.")
+    finally:
+        # Guard against connect() having failed above
+        if connection is not None:
+            connection.close()
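For illustration (assumed usage, not taken from the diff): the helpers in
teuthology/beanstalk.py compose as below, given a reachable beanstalkd whose
address is set via queue_host/queue_port in the teuthology config:

    from teuthology import beanstalk

    connection = beanstalk.connect()
    # A comma-separated machine type list is collapsed to the 'multi' tube
    tube = beanstalk.watch_tube(connection, 'smithi,mira')
    print(beanstalk.stats_tube(connection, tube))
    # Walk the ready jobs, printing each name and description
    beanstalk.walk_jobs(connection, tube, beanstalk.JobPrinter(show_desc=True))
    connection.close()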
diff --git a/teuthology/config.py b/teuthology/config.py
index 18e26d2cc3..4a2c7b13d9 100644
--- a/teuthology/config.py
+++ b/teuthology/config.py
@@ -140,6 +140,7 @@ class TeuthologyConfig(YamlConfig):
         'archive_upload_key': None,
         'archive_upload_url': None,
         'automated_scheduling': False,
+        'backend': 'paddles',
         'reserve_machines': 5,
         'ceph_git_base_url': 'https://github.com/ceph/',
         'ceph_git_url': None,
diff --git a/teuthology/dispatcher/__init__.py b/teuthology/dispatcher/__init__.py
index ce4144307c..718772f2eb 100644
--- a/teuthology/dispatcher/__init__.py
+++ b/teuthology/dispatcher/__init__.py
@@ -8,6 +8,7 @@
 from time import sleep
 
 from teuthology import setup_log_file, install_except_hook
+from teuthology import beanstalk
 from teuthology import report
 from teuthology.config import config as teuth_config
 from teuthology.exceptions import SkipJob
@@ -57,6 +58,8 @@ def load_config(archive_dir=None):
 def clean_config(config):
     result = {}
     for key in config:
+        if key == 'status':
+            continue
         if config[key] is not None:
             result[key] = config[key]
     return result
@@ -70,6 +73,7 @@ def main(args):
     machine_type = args["--machine-type"]
     log_dir = args["--log-dir"]
     archive_dir = args["--archive-dir"]
+    backend = args['--queue-backend']
 
     if archive_dir is None:
         archive_dir = teuth_config.archive_base
@@ -87,6 +91,10 @@ def main(args):
 
     load_config(archive_dir=archive_dir)
 
+    if backend == 'beanstalk':
+        connection = beanstalk.connect()
+        beanstalk.watch_tube(connection, machine_type)
+
     result_proc = None
 
     if teuth_config.teuthology_path is None:
@@ -108,17 +116,26 @@ def main(args):
             stop()
 
         load_config()
 
+        if backend == 'beanstalk':
+            job = connection.reserve(timeout=60)
+            if job is None:
+                continue
+            job.bury()
+            job_config = yaml.safe_load(job.body)
+            job_id = job_config.get('job_id')
+            log.info('Reserved job %s', job_id)
+            log.info('Config is: %s', job.body)
+        else:
+            job = report.get_queued_job(machine_type)
+            if job is None:
+                continue
+            job = clean_config(job)
+            report.try_push_job_info(job, dict(status='running'))
+            job_id = job.get('job_id')
+            log.info('Reserved job %s', job_id)
+            log.info('Config is: %s', job)
+            job_config = job
-        job = report.get_queued_job(machine_type)
-        if job is None:
-            continue
-        job = clean_config(job)
-        report.try_push_job_info(job, dict(status='running'))
-        job_id = job.get('job_id')
-        log.info('Reserved job %s', job_id)
-        log.info('Config is: %s', job)
-        job_config = job
-
         if job_config.get('stop_worker'):
             keep_running = False
@@ -168,6 +185,13 @@ def main(args):
                     status='fail', failure_reason=error_message))
 
+        # This try/except block is to keep the dispatcher from dying when
+        # beanstalkc throws a SocketError
+        if backend == 'beanstalk':
+            try:
+                job.delete()
+            except Exception:
+                log.exception("Saw exception while trying to delete job")
+
 
 def lock_machines(job_config):
@@ -189,7 +213,7 @@ def create_job_archive(job_name, job_archive_path, archive_dir):
 
 
 def pause_queue(machine_type, paused, paused_by, pause_duration=None):
-    if paused == True:
+    if paused:
         report.pause_queue(machine_type, paused, paused_by, pause_duration)
         '''
         If there is a pause duration specified
@@ -199,5 +223,5 @@ def pause_queue(machine_type, paused, paused_by, pause_duration=None):
             sleep(int(pause_duration))
             paused = False
             report.pause_queue(machine_type, paused, paused_by)
-    elif paused == False:
+    elif not paused:
         report.pause_queue(machine_type, paused, paused_by)
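For illustration (assumed usage, not taken from the diff): condensed to its
core, the beanstalk branch of the dispatcher loop is a reserve/bury/run/delete
cycle. handle_job() below is a hypothetical stand-in for the supervisor
invocation:

    import yaml
    from teuthology import beanstalk

    connection = beanstalk.connect()
    beanstalk.watch_tube(connection, 'smithi')
    job = connection.reserve(timeout=60)  # block up to 60s for a ready job
    if job is not None:
        job.bury()                        # keep beanstalk from re-queueing it mid-run
        job_config = yaml.safe_load(job.body)
        handle_job(job_config)            # hypothetical stand-in for the supervisor subprocess
        try:
            job.delete()                  # done with it; drop it from the tube
        except Exception:
            pass                          # a beanstalkc SocketError should not kill the loop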
diff --git a/teuthology/dispatcher/supervisor.py b/teuthology/dispatcher/supervisor.py
index 5e03eb0315..31769dab35 100644
--- a/teuthology/dispatcher/supervisor.py
+++ b/teuthology/dispatcher/supervisor.py
@@ -65,6 +65,12 @@ def main(args):
 def run_job(job_config, teuth_bin_path, archive_dir, verbose):
     safe_archive = safepath.munge(job_config['name'])
     if job_config.get('first_in_suite') or job_config.get('last_in_suite'):
+        if teuth_config.results_server:
+            try:
+                report.try_delete_jobs(job_config['name'], job_config['job_id'])
+            except Exception as e:
+                log.warning("Unable to delete job %s, exception occurred: %s",
+                            job_config['job_id'], e)
         job_archive = os.path.join(archive_dir, safe_archive)
         args = [
             os.path.join(teuth_bin_path, 'teuthology-results'),
diff --git a/teuthology/kill.py b/teuthology/kill.py
index ac00795d17..cfa90e3e3a 100755
--- a/teuthology/kill.py
+++ b/teuthology/kill.py
@@ -8,8 +8,9 @@
 import logging
 import getpass
 
-
+from teuthology import beanstalk
 from teuthology import report
+from teuthology.config import config
 from teuthology import misc
 
 log = logging.getLogger(__name__)
@@ -57,6 +58,7 @@ def kill_run(run_name, archive_base=None, owner=None, machine_type=None,
                     "you must also pass --machine-type")
 
         if not preserve_queue:
+            remove_beanstalk_jobs(run_name, machine_type)
             remove_paddles_jobs(run_name)
             kill_processes(run_name, run_info.get('pids'))
             if owner is not None:
@@ -93,10 +95,13 @@ def find_run_info(serializer, run_name):
     job_info = {}
     job_num = 0
     jobs = serializer.jobs_for_run(run_name)
+    job_total = len(jobs)
     for (job_id, job_dir) in jobs.items():
         if not os.path.isdir(job_dir):
             continue
         job_num += 1
+        if config.backend == 'beanstalk':
+            beanstalk.print_progress(job_num, job_total, 'Reading Job: ')
         job_info = serializer.job_info(run_name, job_id, simple=True)
         for key in job_info.keys():
             if key in run_info_fields and key not in run_info:
@@ -115,6 +120,41 @@ def remove_paddles_jobs(run_name):
     report.try_delete_jobs(run_name, job_ids)
 
 
+def remove_beanstalk_jobs(run_name, tube_name):
+    qhost = config.queue_host
+    qport = config.queue_port
+    if qhost is None or qport is None:
+        raise RuntimeError(
+            'Beanstalk queue information not found in {conf_path}'.format(
+                conf_path=config.yaml_path))
+    log.info("Checking Beanstalk Queue...")
+    beanstalk_conn = beanstalk.connect()
+    real_tube_name = beanstalk.watch_tube(beanstalk_conn, tube_name)
+
+    curjobs = beanstalk_conn.stats_tube(real_tube_name)['current-jobs-ready']
+    if curjobs != 0:
+        # Reserve each ready job once, deleting the ones that belong to the run
+        for _ in range(curjobs):
+            job = beanstalk_conn.reserve(timeout=20)
+            if job is None:
+                continue
+            job_config = yaml.safe_load(job.body)
+            if run_name == job_config['name']:
+                job_id = job_config['job_id']
+                msg = "Deleting job from queue. ID: " + \
+                    "{id} Name: {name} Desc: {desc}".format(
+                        id=str(job_id),
+                        name=job_config['name'],
+                        desc=job_config['description'],
+                    )
+                log.info(msg)
+                job.delete()
+    else:
+        print("No jobs in Beanstalk Queue")
+    beanstalk_conn.close()
+
+
 def kill_processes(run_name, pids=None):
     if pids:
         to_kill = set(pids).intersection(psutil.pids())
diff --git a/teuthology/report.py b/teuthology/report.py
index c149632b0b..83b0cc0d51 100644
--- a/teuthology/report.py
+++ b/teuthology/report.py
@@ -737,8 +737,8 @@ def try_push_job_info(job_config, extra_info=None):
     job_id = job_config['job_id']
 
     if extra_info is not None:
-        job_info = extra_info.copy()
-        job_info.update(job_config)
+        job_info = job_config.copy()
+        job_info.update(extra_info)
     else:
         job_info = job_config
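For illustration (not taken from the diff): the swap above matters because
dict.update() lets the second dict win, so with the old order a status passed
via extra_info was silently discarded:

    job_config = {'job_id': '42', 'status': 'queued'}
    extra_info = {'status': 'running'}

    old = extra_info.copy()
    old.update(job_config)      # old order: job_config clobbers extra_info
    assert old['status'] == 'queued'

    new = job_config.copy()
    new.update(extra_info)      # new order: extra_info overrides job_config
    assert new['status'] == 'running'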
diff --git a/teuthology/schedule.py b/teuthology/schedule.py
index 3a2b875b2a..db12b5d63d 100644
--- a/teuthology/schedule.py
+++ b/teuthology/schedule.py
@@ -1,6 +1,7 @@
 import os
 import yaml
 
+import teuthology.beanstalk
 from teuthology.misc import get_user, merge_configs
 from teuthology import report
 
@@ -22,11 +23,6 @@ def main(args):
             if args[opt]:
                 raise ValueError(msg_fmt.format(opt=opt))
 
-    if args['--first-in-suite'] or args['--last-in-suite']:
-        report_status = False
-    else:
-        report_status = True
-
     name = args['--name']
     if not name or name.isdigit():
         raise ValueError("Please use a more descriptive value for --name")
 
     backend = args['--queue-backend']
     if args['--dry-run']:
         print('---\n' + yaml.safe_dump(job_config))
-    elif backend == 'paddles':
-        schedule_job(job_config, args['--num'], report_status)
     elif backend.startswith('@'):
         dump_job_to_file(backend.lstrip('@'), job_config, args['--num'])
+    elif backend == 'paddles':
+        paddles_schedule_job(job_config, args['--num'])
+    elif backend == 'beanstalk':
+        beanstalk_schedule_job(job_config, args['--num'])
     else:
         raise ValueError("Provided schedule backend '%s' is not supported. "
-                         "Try 'paddles' or '@path-to-a-file" % backend)
+                         "Try 'paddles', 'beanstalk' or '@path-to-a-file'" % backend)
 
 
 def build_config(args):
@@ -85,9 +83,9 @@ def build_config(args):
     return job_config
 
 
-def schedule_job(job_config, num=1, report_status=True):
+def paddles_schedule_job(job_config, num=1):
     """
-    Schedule a job.
+    Schedule a job with Paddles as the backend.
 
     :param job_config: The complete job dict
     :param num:        The number of times to schedule the job
     """
     num = int(num)
+    # Make sure the machine_type queue exists in paddles.
+    report.create_machine_type_queue(job_config['machine_type'])
     while num > 0:
         job_id = report.try_create_job(job_config, dict(status='queued'))
-        print('Job scheduled with name {name} and ID {job_id}'.format(
+        print('Job scheduled in Paddles with name {name} and ID {job_id}'.format(
             name=job_config['name'], job_id=job_id))
         job_config['job_id'] = str(job_id)
-
+        num -= 1
+
+
+def beanstalk_schedule_job(job_config, num=1):
+    """
+    Schedule a job with Beanstalk as the backend.
+
+    :param job_config: The complete job dict
+    :param num:        The number of times to schedule the job
+    """
+    num = int(num)
+    tube = job_config.pop('tube')
+    beanstalk = teuthology.beanstalk.connect()
+    beanstalk.use(tube)
+    report.create_machine_type_queue(job_config['machine_type'])
+    while num > 0:
+        job_id = report.try_create_job(job_config, dict(status='queued'))
+        job_config['job_id'] = str(job_id)
+        job = yaml.safe_dump(job_config)
+        _ = beanstalk.put(
+            job,
+            ttr=60 * 60 * 24,
+            priority=job_config['priority'],
+        )
+        print('Job scheduled in Beanstalk with name {name} and ID {job_id}'.format(
+            name=job_config['name'], job_id=job_id))
+        num -= 1
 
 
 def dump_job_to_file(path, job_config, num=1):
     """
     Schedule a job.
@@ -133,4 +157,3 @@ def dump_job_to_file(path, job_config, num=1):
         num -= 1
     with open(count_file_path, 'w') as f:
         f.write(str(jid))
-
diff --git a/teuthology/test/test_dispatcher.py b/teuthology/test/test_dispatcher.py
index 6b0dddfe2f..9a6d0ff564 100644
--- a/teuthology/test/test_dispatcher.py
+++ b/teuthology/test/test_dispatcher.py
@@ -65,7 +65,8 @@ def test_dispatcher_main(self, m_fetch_teuthology, m_fetch_qa_suite,
             '--description': 'the_description',
             '--machine-type': 'test_queue',
             '--supervisor': False,
-            '--verbose': False
+            '--verbose': False,
+            '--queue-backend': 'paddles'
         }
 
         m = mock.MagicMock()
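For illustration (assumed usage, not taken from the diff): per job,
beanstalk_schedule_job() boils down to the steps below; the paddles write
happens first so that the queued YAML body already carries the unique job ID.
Field values are placeholders:

    import yaml
    import teuthology.beanstalk
    from teuthology import report

    job_config = {
        'name': 'example-run',          # illustrative
        'machine_type': 'smithi',
        'tube': 'smithi',
        'priority': 100,
    }
    tube = job_config.pop('tube')
    conn = teuthology.beanstalk.connect()
    conn.use(tube)                      # produce jobs into this tube
    job_id = report.try_create_job(job_config, dict(status='queued'))
    job_config['job_id'] = str(job_id)  # the paddles ID becomes the job ID
    conn.put(yaml.safe_dump(job_config),
             ttr=60 * 60 * 24,          # a day to run before beanstalk releases it
             priority=job_config['priority'])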
diff --git a/teuthology/test/test_worker.py b/teuthology/test/test_worker.py
index e3c831d7d8..141eb88ec5 100644
--- a/teuthology/test/test_worker.py
+++ b/teuthology/test/test_worker.py
@@ -1,3 +1,4 @@
+import beanstalkc
 import os
 
 from unittest.mock import patch, Mock, MagicMock
@@ -43,7 +44,8 @@ def test_does_not_need_restart(self, m_datetime, m_exists, getmtime):
     @patch("os.symlink")
     def test_symlink_success(self, m_symlink):
         worker.symlink_worker_log("path/to/worker.log", "path/to/archive")
-        m_symlink.assert_called_with("path/to/worker.log", "path/to/archive/worker.log")
+        m_symlink.assert_called_with(
+            "path/to/worker.log", "path/to/archive/worker.log")
 
     @patch("teuthology.worker.log")
     @patch("os.symlink")
@@ -135,7 +137,8 @@ def test_run_job_no_watchdog(self, m_tempfile, m_safe_dump, m_mkdir,
         m_popen.return_value = m_p
         m_t_config.results_server = False
         worker.run_job(config, "teuth/bin/path", "archive/dir", verbose=False)
-        m_symlink_log.assert_called_with(config["worker_log"], config["archive_path"])
+        m_symlink_log.assert_called_with(
+            config["worker_log"], config["archive_path"])
 
     @patch("teuthology.worker.report.try_push_job_info")
     @patch("teuthology.worker.symlink_worker_log")
@@ -151,7 +154,8 @@ def test_run_with_watchdog_no_reporting(self, m_sleep, m_symlink_log, m_try_push
         process = Mock()
         process.poll.return_value = "not None"
         worker.run_with_watchdog(process, config)
-        m_symlink_log.assert_called_with(config["worker_log"], config["archive_path"])
+        m_symlink_log.assert_called_with(
+            config["worker_log"], config["archive_path"])
         m_try_push.assert_called_with(
             dict(name=config["name"], job_id=config["job_id"]),
             dict(status='dead')
@@ -160,7 +164,8 @@ def test_run_with_watchdog_no_reporting(self, m_sleep, m_symlink_log, m_try_push
     @patch("subprocess.Popen")
     @patch("teuthology.worker.symlink_worker_log")
     @patch("time.sleep")
-    def test_run_with_watchdog_with_reporting(self, m_sleep, m_symlink_log, m_popen):
+    @patch("teuthology.worker.report.try_push_job_info")
+    def test_run_with_watchdog_with_reporting(self, m_tpji, m_sleep, m_symlink_log, m_popen):
         config = {
             "name": "the_name",
             "job_id": "1",
@@ -174,12 +179,15 @@ def test_run_with_watchdog_with_reporting(self, m_sleep, m_symlink_log, m_popen)
         m_proc.poll.return_value = "not None"
         m_popen.return_value = m_proc
         worker.run_with_watchdog(process, config)
-        m_symlink_log.assert_called_with(config["worker_log"], config["archive_path"])
+        m_symlink_log.assert_called_with(
+            config["worker_log"], config["archive_path"])
 
     @patch("os.path.isdir")
     @patch("teuthology.worker.fetch_teuthology")
+    @patch("teuthology.worker.teuth_config")
     @patch("teuthology.worker.fetch_qa_suite")
     def test_prep_job(self, m_fetch_qa_suite,
+                      m_teuth_config,
                       m_fetch_teuthology, m_isdir):
         config = dict(
             name="the_name",
         )
         m_fetch_teuthology.return_value = '/teuth/path'
         m_fetch_qa_suite.return_value = '/suite/path'
         m_isdir.return_value = True
+        m_teuth_config.teuthology_path = None
         got_config, teuth_bin_path = worker.prep_job(
             config,
             log_file_path,
             archive_dir,
         )
@@ -207,63 +216,97 @@ def test_prep_job(self, m_fetch_qa_suite,
         assert m_fetch_qa_suite.called_once_with_args(branch='master')
         assert got_config['suite_path'] == '/suite/path'
 
-    def build_fake_jobs(self, job_bodies):
+    def build_fake_jobs(self, m_connection, m_job, job_bodies):
         """
+        Given patched copies of:
+            beanstalkc.Connection
+            beanstalkc.Job
         And a list of basic job bodies, return a list of mocked Job objects
         """
+        # Make sure instantiating m_job returns a new object each time
+        m_job.side_effect = lambda **kwargs: Mock(spec=beanstalkc.Job)
         jobs = []
         job_id = 0
         for job_body in job_bodies:
             job_id += 1
-            job = {}
-            job['job_id'] = job_id
-            job['body'] = job_body
+            job = m_job(conn=m_connection, jid=job_id, body=job_body)
+            job.jid = job_id
+            job_body += '\njob_id: ' + str(job_id)
+            job.body = job_body
             jobs.append(job)
         return jobs
 
-    @patch("teuthology.worker.setup_log_file")
-    @patch("os.path.isdir", return_value=True)
-    @patch("teuthology.worker.fetch_teuthology")
-    @patch("teuthology.worker.fetch_qa_suite")
     @patch("teuthology.worker.run_job")
+    @patch("teuthology.worker.prep_job")
+    @patch("beanstalkc.Job", autospec=True)
+    @patch("teuthology.worker.fetch_qa_suite")
+    @patch("teuthology.worker.fetch_teuthology")
+    @patch("teuthology.worker.beanstalk.watch_tube")
+    @patch("teuthology.worker.beanstalk.connect")
+    @patch("os.path.isdir", return_value=True)
+    @patch("teuthology.worker.setup_log_file")
+    def test_main_loop(
+        self, m_setup_log_file, m_isdir, m_connect, m_watch_tube,
+        m_fetch_teuthology, m_fetch_qa_suite, m_job, m_prep_job, m_run_job,
+    ):
+        m_connection = Mock()
+        jobs = self.build_fake_jobs(
+            m_connection,
+            m_job,
+            [
+                'foo: bar',
+                'stop_worker: true',
+            ],
+        )
+        m_connection.reserve.side_effect = jobs
+        m_connect.return_value = m_connection
+        m_prep_job.return_value = (dict(), '/bin/path')
+        worker.main(self.ctx)
+        # There should be one reserve call per item in the jobs list
+        expected_reserve_calls = [
+            dict(timeout=60) for i in range(len(jobs))
+        ]
+        got_reserve_calls = [
+            call[1] for call in m_connection.reserve.call_args_list
+        ]
+        assert got_reserve_calls == expected_reserve_calls
+        for job in jobs:
+            job.bury.assert_called_once_with()
+            job.delete.assert_called_once_with()
 
     @patch("teuthology.worker.report.try_push_job_info")
-    @patch("teuthology.worker.report.get_queued_job")
-    @patch("teuthology.worker.clean_config")
+    @patch("teuthology.worker.run_job")
+    @patch("beanstalkc.Job", autospec=True)
+    @patch("teuthology.worker.fetch_qa_suite")
+    @patch("teuthology.worker.fetch_teuthology")
+    @patch("teuthology.worker.beanstalk.watch_tube")
+    @patch("teuthology.worker.beanstalk.connect")
+    @patch("os.path.isdir", return_value=True)
+    @patch("teuthology.worker.setup_log_file")
     def test_main_loop_13925(
-            self, m_setup_log_file, m_isdir,
-            m_fetch_teuthology, m_fetch_qa_suite, m_run_job,
-            m_try_push_job_info, m_get_queued_job, m_clean_config
-    ):
+            self, m_setup_log_file, m_isdir, m_connect, m_watch_tube,
+            m_fetch_teuthology, m_fetch_qa_suite, m_job, m_run_job,
+            m_try_push_job_info,
+    ):
+        m_connection = Mock()
+        jobs = self.build_fake_jobs(
+            m_connection,
+            m_job,
+            [
+                'name: name',
+                'name: name\nstop_worker: true',
+            ],
+        )
+        m_connection.reserve.side_effect = jobs
+        m_connect.return_value = m_connection
         m_fetch_qa_suite.side_effect = [
             '/suite/path',
             MaxWhileTries(),
            MaxWhileTries(),
         ]
-        job = {
-            'job_id': '1',
-            'description': 'DESC',
-            'email': 'EMAIL',
-            'first_in_suite': False,
-            'last_in_suite': True,
-            'machine_type': 'test_queue',
-            'name': 'NAME',
-            'owner': 'OWNER',
-            'priority': 99,
-            'results_timeout': '6',
-            'verbose': False,
-            'stop_worker': True
-        }
-        m_get_queued_job.return_value = job
-        m_clean_config.return_value = job
-
-        mock_prep_job_patcher = patch('teuthology.worker.prep_job')
-        mock_prep_job = mock_prep_job_patcher.start()
-        mock_prep_job.return_value = (dict(), '/teuth/bin/path')
         worker.main(self.ctx)
-        mock_prep_job_patcher.stop()
-        assert len(m_run_job.call_args_list) == 1
-        assert len(m_try_push_job_info.call_args_list) == 1
-        assert m_try_push_job_info.called_once_with(job, dict(status='running'))
-
+        assert len(m_run_job.call_args_list) == 0
+        assert len(m_try_push_job_info.call_args_list) == len(jobs)
+        for i in range(len(jobs)):
+            push_call = m_try_push_job_info.call_args_list[i]
+            assert push_call[0][1]['status'] == 'dead'
diff --git a/teuthology/worker.py b/teuthology/worker.py
index ff68acf1c3..1bb7bbff55 100644
--- a/teuthology/worker.py
+++ b/teuthology/worker.py
@@ -9,6 +9,7 @@
 from datetime import datetime
 
 from teuthology import setup_log_file, install_except_hook
+from teuthology import beanstalk
 from teuthology import report
 from teuthology import safepath
 from teuthology.config import config as teuth_config
@@ -57,14 +58,6 @@ def load_config(ctx=None):
         teuth_config.archive_base = ctx.archive_dir
 
 
-def clean_config(config):
-    result = {}
-    for key in config:
-        if config[key] is not None:
-            result[key] = config[key]
-    return result
-
-
 def main(ctx):
     loglevel = logging.INFO
     if ctx.verbose:
@@ -81,6 +74,8 @@ def main(ctx):
 
     set_config_attr(ctx)
 
+    connection = beanstalk.connect()
+    beanstalk.watch_tube(connection, ctx.tube)
+
     result_proc = None
 
     if teuth_config.teuthology_path is None:
@@ -103,16 +98,16 @@ def main(ctx):
 
         load_config()
 
-        job = report.get_queued_job(ctx.machine_type)
+        job = connection.reserve(timeout=60)
         if job is None:
             continue
 
-        job = clean_config(job)
-        report.try_push_job_info(job, dict(status='running'))
-        job_id = job.get('job_id')
+        # bury the job so it won't be re-run if it fails
+        job.bury()
+        job_config = yaml.safe_load(job.body)
+        job_id = job_config.get('job_id')
         log.info('Reserved job %s', job_id)
-        log.info('Config is: %s', job)
-        job_config = job
+        log.info('Config is: %s', job.body)
 
         if job_config.get('stop_worker'):
             keep_running = False
@@ -132,6 +127,13 @@ def main(ctx):
         except SkipJob:
             continue
 
+        # This try/except block is to keep the worker from dying when
+        # beanstalkc throws a SocketError
+        try:
+            job.delete()
+        except Exception:
+            log.exception("Saw exception while trying to delete job")
+
 
 def prep_job(job_config, log_file_path, archive_dir):
     job_id = job_config['job_id']
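For illustration (assumed usage, not taken from the diff): queue pausing goes
through pause_tube(), which the new teuthology-beanstalk-queue command wraps;
passing no tube pauses every tube:

    from teuthology import beanstalk

    conn = beanstalk.connect()
    beanstalk.pause_tube(conn, 'smithi', 300)  # pause the smithi tube for 5 minutes
    beanstalk.pause_tube(conn, 'smithi', 0)    # a duration of 0 unpauses
    beanstalk.pause_tube(conn, None, 60)       # no tube given: pause all tubes
    conn.close()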