Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Finish removing teuthology-worker #1960

Merged
merged 2 commits into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 57 additions & 30 deletions scripts/dispatcher.py
Original file line number Diff line number Diff line change
@@ -1,35 +1,62 @@
"""
usage: teuthology-dispatcher --help
teuthology-dispatcher --supervisor [-v] --bin-path BIN_PATH --job-config COFNFIG --archive-dir DIR
teuthology-dispatcher [-v] [--archive-dir DIR] [--exit-on-empty-queue] --log-dir LOG_DIR --tube TUBE

Start a dispatcher for the specified tube. Grab jobs from a beanstalk
queue and run the teuthology tests they describe as subprocesses. The
subprocess invoked is a teuthology-dispatcher command run in supervisor
mode.

Supervisor mode: Supervise the job run described by its config. Reimage
target machines and invoke teuthology command. Unlock the target machines
at the end of the run.

standard arguments:
-h, --help show this help message and exit
-v, --verbose be more verbose
-t, --tube TUBE which beanstalk tube to read jobs from
-l, --log-dir LOG_DIR path in which to store logs
-a DIR, --archive-dir DIR path to archive results in
--supervisor run dispatcher in job supervisor mode
--bin-path BIN_PATH teuthology bin path
--job-config CONFIG file descriptor of job's config file
--exit-on-empty-queue if the queue is empty, exit
"""

import docopt
import argparse
import sys

import teuthology.dispatcher
import teuthology.dispatcher.supervisor

from .supervisor import parse_args as parse_supervisor_args


def parse_args(argv):
parser = argparse.ArgumentParser(
description="Start a dispatcher for the specified tube. Grab jobs from a beanstalk queue and run the teuthology tests they describe as subprocesses. The subprocess invoked is teuthology-supervisor."
)
parser.add_argument(
"-v",
"--verbose",
action="store_true",
help="be more verbose",
)
parser.add_argument(
"-a",
"--archive-dir",
type=str,
help="path to archive results in",
)
parser.add_argument(
"-t",
"--tube",
type=str,
help="which beanstalk tube to read jobs from",
required=True,
)
parser.add_argument(
"-l",
"--log-dir",
type=str,
help="path in which to store the dispatcher log",
required=True,
)
parser.add_argument(
"--exit-on-empty-queue",
action="store_true",
help="if the queue is empty, exit",
)
return parser.parse_args(argv)


def main():
args = docopt.docopt(__doc__)
sys.exit(teuthology.dispatcher.main(args))
if "--supervisor" in sys.argv:
# This is for transitional compatibility, so the old dispatcher can
# invoke the new supervisor. Once old dispatchers are phased out,
# this block can be as well.
sys.argv.remove("--supervisor")
sys.argv[0] = "teuthology-supervisor"
sys.exit(teuthology.dispatcher.supervisor.main(
parse_supervisor_args(sys.argv[1:])
))
else:
sys.exit(teuthology.dispatcher.main(parse_args(sys.argv[1:])))


if __name__ == "__main__":
main()
44 changes: 44 additions & 0 deletions scripts/supervisor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import argparse
import sys

import teuthology.dispatcher.supervisor


def parse_args(argv):
parser = argparse.ArgumentParser(
description="Supervise and run a teuthology job; normally only run by the dispatcher",
)
parser.add_argument(
"-v",
"--verbose",
action="store_true",
help="be more verbose",
)
parser.add_argument(
"-a",
"--archive-dir",
type=str,
help="path in which to store the job's logfiles",
required=True,
)
parser.add_argument(
"--bin-path",
type=str,
help="teuthology bin path",
required=True,
)
parser.add_argument(
"--job-config",
type=str,
help="file descriptor of job's config file",
required=True,
)
return parser.parse_args(argv)


def main():
sys.exit(teuthology.dispatcher.supervisor.main(parse_args(sys.argv[1:])))


if __name__ == "__main__":
main()
5 changes: 5 additions & 0 deletions scripts/test/test_dispatcher_.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from script import Script


class TestDispatcher(Script):
script_name = 'teuthology-dispatcher'
5 changes: 5 additions & 0 deletions scripts/test/test_supervisor_.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from script import Script


class TestSupervisor(Script):
script_name = 'teuthology-supervisor'
5 changes: 0 additions & 5 deletions scripts/test/test_worker.py

This file was deleted.

37 changes: 0 additions & 37 deletions scripts/worker.py

This file was deleted.

1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ console_scripts =
teuthology-wait = scripts.wait:main
teuthology-exporter = scripts.exporter:main
teuthology-node-cleanup = scripts.node_cleanup:main
teuthology-supervisor = scripts.supervisor:main

[options.extras_require]
manhole =
Expand Down
107 changes: 85 additions & 22 deletions teuthology/dispatcher/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,10 @@
exporter,
report,
repo_utils,
worker,
)
from teuthology.config import config as teuth_config
from teuthology.dispatcher import supervisor
from teuthology.exceptions import SkipJob
from teuthology.exceptions import BranchNotFoundError, CommitNotFoundError, SkipJob, MaxWhileTries
from teuthology.lock import ops as lock_ops
from teuthology import safepath

Expand Down Expand Up @@ -66,21 +65,10 @@ def load_config(archive_dir=None):


def main(args):
# run dispatcher in job supervisor mode if --supervisor passed
if args["--supervisor"]:
return supervisor.main(args)

verbose = args["--verbose"]
tube = args["--tube"]
log_dir = args["--log-dir"]
archive_dir = args["--archive-dir"]
exit_on_empty_queue = args["--exit-on-empty-queue"]

if archive_dir is None:
archive_dir = teuth_config.archive_base
archive_dir = args.archive_dir or teuth_config.archive_base

# Refuse to start more than one dispatcher per machine type
procs = find_dispatcher_processes().get(tube)
procs = find_dispatcher_processes().get(args.tube)
if procs:
raise RuntimeError(
"There is already a teuthology-dispatcher process running:"
Expand All @@ -89,18 +77,18 @@ def main(args):

# setup logging for disoatcher in {log_dir}
loglevel = logging.INFO
if verbose:
if args.verbose:
loglevel = logging.DEBUG
logging.getLogger().setLevel(loglevel)
log.setLevel(loglevel)
log_file_path = os.path.join(log_dir, f"dispatcher.{tube}.{os.getpid()}")
log_file_path = os.path.join(args.log_dir, f"dispatcher.{args.tube}.{os.getpid()}")
setup_log_file(log_file_path)
install_except_hook()

load_config(archive_dir=archive_dir)

connection = beanstalk.connect()
beanstalk.watch_tube(connection, tube)
beanstalk.watch_tube(connection, args.tube)
result_proc = None

if teuth_config.teuthology_path is None:
Expand Down Expand Up @@ -131,7 +119,7 @@ def main(args):
job_procs.remove(proc)
job = connection.reserve(timeout=60)
if job is None:
if exit_on_empty_queue and not job_procs:
if args.exit_on_empty_queue and not job_procs:
log.info("Queue is empty and no supervisor processes running; exiting!")
break
continue
Expand All @@ -148,7 +136,7 @@ def main(args):
keep_running = False

try:
job_config, teuth_bin_path = worker.prep_job(
job_config, teuth_bin_path = prep_job(
job_config,
log_file_path,
archive_dir,
Expand All @@ -161,8 +149,7 @@ def main(args):
job_config = lock_machines(job_config)

run_args = [
os.path.join(teuth_bin_path, 'teuthology-dispatcher'),
'--supervisor',
os.path.join(teuth_bin_path, 'teuthology-supervisor'),
'-v',
'--bin-path', teuth_bin_path,
'--archive-dir', archive_dir,
Expand Down Expand Up @@ -243,6 +230,82 @@ def match(proc):
return procs


def prep_job(job_config, log_file_path, archive_dir):
job_id = job_config['job_id']
safe_archive = safepath.munge(job_config['name'])
job_config['worker_log'] = log_file_path
archive_path_full = os.path.join(
archive_dir, safe_archive, str(job_id))
job_config['archive_path'] = archive_path_full

# If the teuthology branch was not specified, default to main and
# store that value.
teuthology_branch = job_config.get('teuthology_branch', 'main')
job_config['teuthology_branch'] = teuthology_branch
teuthology_sha1 = job_config.get('teuthology_sha1')
if not teuthology_sha1:
repo_url = repo_utils.build_git_url('teuthology', 'ceph')
try:
teuthology_sha1 = repo_utils.ls_remote(repo_url, teuthology_branch)
except Exception as exc:
log.exception(f"Could not get teuthology sha1 for branch {teuthology_branch}")
report.try_push_job_info(
job_config,
dict(status='dead', failure_reason=str(exc))
)
raise SkipJob()
if not teuthology_sha1:
reason = "Teuthology branch {} not found; marking job as dead".format(teuthology_branch)
log.error(reason)
report.try_push_job_info(
job_config,
dict(status='dead', failure_reason=reason)
)
raise SkipJob()
if teuth_config.teuthology_path is None:
log.info('Using teuthology sha1 %s', teuthology_sha1)

try:
if teuth_config.teuthology_path is not None:
teuth_path = teuth_config.teuthology_path
else:
teuth_path = repo_utils.fetch_teuthology(branch=teuthology_branch,
commit=teuthology_sha1)
# For the teuthology tasks, we look for suite_branch, and if we
# don't get that, we look for branch, and fall back to 'main'.
# last-in-suite jobs don't have suite_branch or branch set.
ceph_branch = job_config.get('branch', 'main')
suite_branch = job_config.get('suite_branch', ceph_branch)
suite_sha1 = job_config.get('suite_sha1')
suite_repo = job_config.get('suite_repo')
if suite_repo:
teuth_config.ceph_qa_suite_git_url = suite_repo
job_config['suite_path'] = os.path.normpath(os.path.join(
repo_utils.fetch_qa_suite(suite_branch, suite_sha1),
job_config.get('suite_relpath', ''),
))
except (BranchNotFoundError, CommitNotFoundError) as exc:
log.exception("Requested version not found; marking job as dead")
report.try_push_job_info(
job_config,
dict(status='dead', failure_reason=str(exc))
)
raise SkipJob()
except MaxWhileTries as exc:
log.exception("Failed to fetch or bootstrap; marking job as dead")
report.try_push_job_info(
job_config,
dict(status='dead', failure_reason=str(exc))
)
raise SkipJob()

teuth_bin_path = os.path.join(teuth_path, 'virtualenv', 'bin')
if not os.path.isdir(teuth_bin_path):
raise RuntimeError("teuthology branch %s at %s not bootstrapped!" %
(teuthology_branch, teuth_bin_path))
return job_config, teuth_bin_path


def lock_machines(job_config):
report.try_push_job_info(job_config, dict(status='running'))
fake_ctx = supervisor.create_fake_context(job_config, block=True)
Expand Down
Loading
Loading