Skip to content

Commit

Permalink
Add job expiration dates
Browse files Browse the repository at this point in the history
This feature has two parts:
* Specifying expiration dates when scheduling test runs
* A global maximum age

Expiration dates are provided by passing `--expire` to `teuthology-suite` with
a relative value like `1d` (one day), `1w` (one week), or an absolute value like
`1999-12-31_23:59:59`.

A new configuration item, `max_job_age`, is specified in seconds. This defaults
to two weeks.

When the dispatcher checks the queue for the next job to run, it will first
compare the job's `timestamp` value - which reflects the time the job was
scheduled. If more than `max_job_age` seconds have passed, the job is skipped
and marked dead. It next checks for an `expire` value; if that value is in the
past, the job is skipped and marked dead. Otherwise, it will be run as usual.

Signed-off-by: Zack Cerza <[email protected]>
  • Loading branch information
zmc committed Jul 31, 2024
1 parent c62fbdc commit 339c82e
Show file tree
Hide file tree
Showing 8 changed files with 132 additions and 3 deletions.
4 changes: 4 additions & 0 deletions docs/siteconfig.rst
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ Here is a sample configuration with many of the options set and documented::
# processes
watchdog_interval: 120

# How old a scheduled job can be, in seconds, before the dispatcher
# considers it 'expired', skipping it.
max_job_age: 1209600

# How long a scheduled job should be allowed to run, in seconds, before
# it is killed by the supervisor process.
max_job_time: 259200
Expand Down
2 changes: 2 additions & 0 deletions scripts/suite.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@
When tests finish or time out, send an email
here. May also be specified in ~/.teuthology.yaml
as 'results_email'
--expire <datetime> Do not execute jobs in the run if they have not
completed by this time
--rocketchat <rocketchat> Comma separated list of Rocket.Chat channels where
to send a message when tests finished or time out.
To be used with --sleep-before-teardown option.
Expand Down
1 change: 1 addition & 0 deletions teuthology/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ class TeuthologyConfig(YamlConfig):
'job_threshold': 500,
'lab_domain': 'front.sepia.ceph.com',
'lock_server': 'http://paddles.front.sepia.ceph.com/',
'max_job_age': 1209600, # 2 weeks
'max_job_time': 259200, # 3 days
'nsupdate_url': 'http://nsupdate.front.sepia.ceph.com/update',
'results_server': 'http://paddles.front.sepia.ceph.com/',
Expand Down
28 changes: 28 additions & 0 deletions teuthology/dispatcher/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from teuthology.dispatcher import supervisor
from teuthology.exceptions import BranchNotFoundError, CommitNotFoundError, SkipJob, MaxWhileTries
from teuthology.lock import ops as lock_ops
from teuthology.util.time import parse_timestamp
from teuthology import safepath

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -232,6 +233,8 @@ def match(proc):

def prep_job(job_config, log_file_path, archive_dir):
job_id = job_config['job_id']
check_job_expiration(job_config)

safe_archive = safepath.munge(job_config['name'])
job_config['worker_log'] = log_file_path
archive_path_full = os.path.join(
Expand Down Expand Up @@ -306,6 +309,31 @@ def prep_job(job_config, log_file_path, archive_dir):
return job_config, teuth_bin_path


def check_job_expiration(job_config):
job_id = job_config['job_id']
expired = False
now = datetime.datetime.now(datetime.timezone.utc)
if expire_str := job_config.get('timestamp'):
expire = parse_timestamp(expire_str) + \
datetime.timedelta(seconds=teuth_config.max_job_age)
expired = expire < now
if not expired and (expire_str := job_config.get('expire')):
try:
expire = parse_timestamp(expire_str)
expired = expired or expire < now
except ValueError:
log.warning(f"Failed to parse job expiration: {expire_str=}")
pass
if expired:
log.info(f"Skipping job {job_id} because it is expired: {expire_str} is in the past")
report.try_push_job_info(
job_config,
# TODO: Add a 'canceled' status to paddles, and use that.
dict(status='dead'),
)
raise SkipJob()


def lock_machines(job_config):
report.try_push_job_info(job_config, dict(status='running'))
fake_ctx = supervisor.create_fake_context(job_config, block=True)
Expand Down
28 changes: 28 additions & 0 deletions teuthology/dispatcher/test/test_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from teuthology import dispatcher
from teuthology.config import FakeNamespace
from teuthology.contextutil import MaxWhileTries
from teuthology.util.time import TIMESTAMP_FMT


class TestDispatcher(object):
Expand Down Expand Up @@ -172,3 +173,30 @@ def test_main_loop_13925(
for i in range(len(jobs)):
push_call = m_try_push_job_info.call_args_list[i]
assert push_call[0][1]['status'] == 'dead'

@pytest.mark.parametrize(
["timestamp", "expire", "skip"],
[
[datetime.timedelta(days=-1), None, False],
[datetime.timedelta(days=-30), None, True],
[None, datetime.timedelta(days=1), False],
[None, datetime.timedelta(days=-1), True],
[datetime.timedelta(days=-1), datetime.timedelta(days=1), False],
[datetime.timedelta(days=1), datetime.timedelta(days=-1), True],
]
)
def test_check_job_expiration(self, timestamp, expire, skip):
now = datetime.datetime.now(datetime.timezone.utc)
job_config = dict(
job_id="1",
name="job_name",
)
if timestamp:
job_config["timestamp"] = (now + timestamp).strftime(TIMESTAMP_FMT)
if expire:
job_config["expire"] = (now + expire).strftime(TIMESTAMP_FMT)
if skip:
with pytest.raises(dispatcher.SkipJob):
dispatcher.check_job_expiration(job_config)
else:
dispatcher.check_job_expiration(job_config)
3 changes: 3 additions & 0 deletions teuthology/suite/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ def process_args(args):
elif key == 'subset' and value is not None:
# take input string '2/3' and turn into (2, 3)
value = tuple(map(int, value.split('/')))
elif key == 'expire' and value is None:
# Skip empty 'expire' values
continue
elif key in ('filter_all', 'filter_in', 'filter_out', 'rerun_statuses'):
if not value:
value = []
Expand Down
34 changes: 32 additions & 2 deletions teuthology/suite/run.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import copy
import datetime
import logging
import os
import pwd
Expand All @@ -8,7 +9,6 @@

from humanfriendly import format_timespan

from datetime import datetime
from tempfile import NamedTemporaryFile
from teuthology import repo_utils

Expand All @@ -24,7 +24,7 @@
from teuthology.suite.merge import config_merge
from teuthology.suite.build_matrix import build_matrix
from teuthology.suite.placeholder import substitute_placeholders, dict_templ
from teuthology.util.time import TIMESTAMP_FMT
from teuthology.util.time import parse_offset, parse_timestamp, TIMESTAMP_FMT

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -87,6 +87,15 @@ def create_initial_config(self):
:returns: A JobConfig object
"""
now = datetime.datetime.now(datetime.UTC)
expires = self.get_expiration()
if expires:
if now > expires:
util.schedule_fail(
f"Refusing to schedule because the expiration date is in the past: {self.args.expire}",
dry_run=self.args.dry_run,
)

self.os = self.choose_os()
self.kernel_dict = self.choose_kernel()
ceph_hash = self.choose_ceph_hash()
Expand Down Expand Up @@ -124,8 +133,29 @@ def create_initial_config(self):
suite_relpath=self.args.suite_relpath,
flavor=self.args.flavor,
)
if expires:
self.config_input["expires"] = expires.strftime(TIMESTAMP_FMT)
return self.build_base_config()

def get_expiration(self, _base_time: datetime.datetime | None = None) -> datetime.datetime | None:
"""
_base_time: For testing, calculate relative offsets from this base time
:returns: True if the job should run; False if it has expired
"""
log.info(f"Checking for expiration ({self.args.expire})")
expires_str = self.args.expire
if expires_str is None:
return None
now = datetime.datetime.now(datetime.UTC)
if _base_time is None:
_base_time = now
try:
expires = parse_timestamp(expires_str)
except ValueError:
expires = _base_time + parse_offset(expires_str)
return expires

def choose_os(self):
os_type = self.args.distro
os_version = self.args.distro_version
Expand Down
35 changes: 34 additions & 1 deletion teuthology/suite/test/test_run_.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import contextlib
import yaml

from datetime import datetime
from datetime import datetime, timedelta, UTC
from mock import patch, call, ANY
from io import StringIO
from io import BytesIO
Expand Down Expand Up @@ -90,6 +90,39 @@ def test_branch_nonexistent(
with pytest.raises(ScheduleFailError):
self.klass(self.args)

@pytest.mark.parametrize(
["expire", "delta", "result"],
[
["1m", timedelta(), True],
["1m", timedelta(minutes=-2), False],
["1m", timedelta(minutes=2), True],
["7d", timedelta(days=-14), False],
]
)
@patch('teuthology.repo_utils.fetch_repo')
@patch('teuthology.suite.run.util.git_branch_exists')
@patch('teuthology.suite.run.util.package_version_for_hash')
@patch('teuthology.suite.run.util.git_ls_remote')
def test_expiration(
self,
m_git_ls_remote,
m_package_version_for_hash,
m_git_branch_exists,
m_fetch_repo,
expire,
delta,
result,
):
m_git_ls_remote.side_effect = 'hash'
m_package_version_for_hash.return_value = 'a_version'
m_git_branch_exists.return_value = True
self.args.expire = expire
obj = self.klass(self.args)
now = datetime.now(UTC)
expires_result = obj.get_expiration(_base_time=datetime.now(UTC) + delta)
assert expires_result is not None
assert (now < expires_result) is result

@patch('teuthology.suite.run.util.fetch_repos')
@patch('requests.head')
@patch('teuthology.suite.run.util.git_branch_exists')
Expand Down

0 comments on commit 339c82e

Please sign in to comment.