scripts/queue: common teuthology-queue command for paddles and beanstalk queue

Signed-off-by: Aishwarya Mathuria <[email protected]>
amathuria committed Oct 12, 2023
1 parent 6d7d317 commit 61958a0
Showing 11 changed files with 255 additions and 113 deletions.
2 changes: 1 addition & 1 deletion scripts/beanstalk_queue.py
@@ -1,7 +1,7 @@
import docopt

import teuthology.config
import teuthology.beanstalk
import teuthology.queue.beanstalk

doc = """
usage: teuthology-beanstalk-queue -h
3 changes: 1 addition & 2 deletions scripts/paddles_queue.py
@@ -1,8 +1,7 @@
import docopt

import teuthology.config
import teuthology.paddles_queue

import teuthology.queue.paddles
doc = """
usage: teuthology-paddles-queue -h
teuthology-paddles-queue -s -m MACHINE_TYPE
37 changes: 37 additions & 0 deletions scripts/queue.py
@@ -0,0 +1,37 @@
import docopt

import teuthology.config
import teuthology.queue.beanstalk
import teuthology.queue.paddles

doc = """
usage: teuthology-queue -h
teuthology-queue [-s|-d|-f] -m MACHINE_TYPE
teuthology-queue [-r] -m MACHINE_TYPE
teuthology-queue -m MACHINE_TYPE -D PATTERN
teuthology-queue -p SECONDS [-m MACHINE_TYPE]
List Jobs in queue.
If -D is passed, then jobs with PATTERN in the job name are deleted from the
queue.
Arguments:
-m, --machine_type MACHINE_TYPE [default: multi]
Which machine type queue to work on.
optional arguments:
-h, --help Show this help message and exit
-D, --delete PATTERN Delete Jobs with PATTERN in their name
-d, --description Show job descriptions
-r, --runs Only show run names
-f, --full Print the entire job config. Use with caution.
-s, --status Prints the status of the queue
-p, --pause SECONDS Pause queues for a number of seconds. A value of 0
will unpause. If -m is passed, pause that queue,
otherwise pause all queues.
"""


def main():
args = docopt.docopt(doc)
teuthology.queue.main(args)
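
For reference, a minimal sketch of how docopt turns a typical invocation into the args dict that teuthology.queue.main() receives. The argv value is a hypothetical example, and the import assumes a teuthology checkout where scripts/ is importable as a package; only the flags defined in the doc string above appear as keys.

import docopt

# `doc` is the usage string defined in scripts/queue.py above.
from scripts.queue import doc

# Hypothetical invocation: teuthology-queue -s -m smithi
args = docopt.docopt(doc, argv=['-s', '-m', 'smithi'])
print(args['--machine_type'])   # 'smithi'
print(args['--status'])         # True
print(args['--pause'])          # None (flag not given)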
2 changes: 1 addition & 1 deletion teuthology/config.py
@@ -143,7 +143,7 @@ class TeuthologyConfig(YamlConfig):
'archive_upload_key': None,
'archive_upload_url': None,
'automated_scheduling': False,
'backend': 'paddles',
'backend': 'beanstalk',
'reserve_machines': 5,
'ceph_git_base_url': 'https://github.com/ceph/',
'ceph_git_url': None,
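
A small sketch of how this new default is consumed, assuming the usual ~/.teuthology.yaml override mechanism: config.backend now resolves to 'beanstalk' unless the YAML file sets it back to 'paddles'.

from teuthology.config import config

# With no `backend:` entry in ~/.teuthology.yaml (assumed config location),
# the queue backend defaults to 'beanstalk'; adding `backend: paddles` there
# routes teuthology-queue back to the paddles-backed queue.
print(config.backend)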
2 changes: 1 addition & 1 deletion teuthology/dispatcher/__init__.py
@@ -14,13 +14,13 @@
setup_log_file,
install_except_hook,
# modules
beanstalk,
exporter,
nuke,
report,
repo_utils,
worker,
)
from teuthology.queue import beanstalk
from teuthology.config import config as teuth_config
from teuthology.dispatcher import supervisor
from teuthology.exceptions import SkipJob
2 changes: 1 addition & 1 deletion teuthology/kill.py
@@ -10,7 +10,7 @@

import teuthology.exporter

from teuthology import beanstalk
from teuthology.queue import beanstalk
from teuthology import report
from teuthology.config import config
from teuthology import misc
106 changes: 106 additions & 0 deletions teuthology/queue/__init__.py
@@ -0,0 +1,106 @@
import logging
import pprint
import sys
from collections import OrderedDict

from teuthology import report
from teuthology.config import config

log = logging.getLogger(__name__)

def print_progress(index, total, message=None):
msg = "{m} ".format(m=message) if message else ''
sys.stderr.write("{msg}{i}/{total}\r".format(
msg=msg, i=index, total=total))
sys.stderr.flush()

def end_progress():
sys.stderr.write('\n')
sys.stderr.flush()

class JobProcessor(object):
def __init__(self):
self.jobs = OrderedDict()

def add_job(self, job_id, job_config, job_obj=None):
job_id = str(job_id)

job_dict = dict(
index=(len(self.jobs) + 1),
job_config=job_config,
)
if job_obj:
job_dict['job_obj'] = job_obj
self.jobs[job_id] = job_dict

self.process_job(job_id)

def process_job(self, job_id):
pass

def complete(self):
pass


class JobPrinter(JobProcessor):
def __init__(self, show_desc=False, full=False):
super(JobPrinter, self).__init__()
self.show_desc = show_desc
self.full = full

def process_job(self, job_id):
job_config = self.jobs[job_id]['job_config']
job_index = self.jobs[job_id]['index']
job_priority = job_config['priority']
job_name = job_config['name']
job_desc = job_config['description']
print('Job: {i:>4} priority: {pri:>4} {job_name}/{job_id}'.format(
i=job_index,
pri=job_priority,
job_id=job_id,
job_name=job_name,
))
if self.full:
pprint.pprint(job_config)
elif job_desc and self.show_desc:
for desc in job_desc.split():
print('\t {}'.format(desc))


class RunPrinter(JobProcessor):
def __init__(self):
super(RunPrinter, self).__init__()
self.runs = list()

def process_job(self, job_id):
run = self.jobs[job_id]['job_config']['name']
if run not in self.runs:
self.runs.append(run)
print(run)


class JobDeleter(JobProcessor):
def __init__(self, pattern):
self.pattern = pattern
super(JobDeleter, self).__init__()

def add_job(self, job_id, job_config, job_obj=None):
job_name = job_config['name']
if self.pattern in job_name:
super(JobDeleter, self).add_job(job_id, job_config, job_obj)

def process_job(self, job_id):
job_config = self.jobs[job_id]['job_config']
job_name = job_config['name']
print('Deleting {job_name}/{job_id}'.format(
job_id=job_id,
job_name=job_name,
))
report.try_delete_jobs(job_name, job_id)


def main(args):
    # paddles and beanstalk both import helpers from this package, so pull
    # them in lazily here to avoid a circular import at module load time.
    from teuthology.queue import beanstalk, paddles
    if config.backend == 'paddles':
        paddles.main(args)
    else:
        beanstalk.main(args)
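
A minimal sketch of the processor pattern these classes share: a subclass only overrides process_job() (and optionally add_job() or complete()), and a backend feeds it one queued job at a time. The job data below is made up for illustration.

from teuthology.queue import JobProcessor

class JobCounter(JobProcessor):
    """Counts queued jobs per run name instead of printing or deleting them."""
    def __init__(self):
        super().__init__()
        self.counts = {}

    def process_job(self, job_id):
        run = self.jobs[job_id]['job_config']['name']
        self.counts[run] = self.counts.get(run, 0) + 1

    def complete(self):
        for run, count in self.counts.items():
            print('{run}: {count} queued job(s)'.format(run=run, count=count))

counter = JobCounter()
counter.add_job('100', {'name': 'example-run', 'priority': 50, 'description': ''})
counter.complete()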
96 changes: 0 additions & 96 deletions teuthology/beanstalk.py → teuthology/queue/beanstalk.py
@@ -61,102 +61,6 @@ def callback(jobs_dict)
processor.complete()


def print_progress(index, total, message=None):
msg = "{m} ".format(m=message) if message else ''
sys.stderr.write("{msg}{i}/{total}\r".format(
msg=msg, i=index, total=total))
sys.stderr.flush()


def end_progress():
sys.stderr.write('\n')
sys.stderr.flush()


class JobProcessor(object):
def __init__(self):
self.jobs = OrderedDict()

def add_job(self, job_id, job_config, job_obj=None):
job_id = str(job_id)

job_dict = dict(
index=(len(self.jobs) + 1),
job_config=job_config,
)
if job_obj:
job_dict['job_obj'] = job_obj
self.jobs[job_id] = job_dict

self.process_job(job_id)

def process_job(self, job_id):
pass

def complete(self):
pass


class JobPrinter(JobProcessor):
def __init__(self, show_desc=False, full=False):
super(JobPrinter, self).__init__()
self.show_desc = show_desc
self.full = full

def process_job(self, job_id):
job_config = self.jobs[job_id]['job_config']
job_index = self.jobs[job_id]['index']
job_priority = job_config['priority']
job_name = job_config['name']
job_desc = job_config['description']
print('Job: {i:>4} priority: {pri:>4} {job_name}/{job_id}'.format(
i=job_index,
pri=job_priority,
job_id=job_id,
job_name=job_name,
))
if self.full:
pprint.pprint(job_config)
elif job_desc and self.show_desc:
for desc in job_desc.split():
print('\t {}'.format(desc))


class RunPrinter(JobProcessor):
def __init__(self):
super(RunPrinter, self).__init__()
self.runs = list()

def process_job(self, job_id):
run = self.jobs[job_id]['job_config']['name']
if run not in self.runs:
self.runs.append(run)
print(run)


class JobDeleter(JobProcessor):
def __init__(self, pattern):
self.pattern = pattern
super(JobDeleter, self).__init__()

def add_job(self, job_id, job_config, job_obj=None):
job_name = job_config['name']
if self.pattern in job_name:
super(JobDeleter, self).add_job(job_id, job_config, job_obj)

def process_job(self, job_id):
job_config = self.jobs[job_id]['job_config']
job_name = job_config['name']
print('Deleting {job_name}/{job_id}'.format(
job_id=job_id,
job_name=job_name,
))
job_obj = self.jobs[job_id].get('job_obj')
if job_obj:
job_obj.delete()
report.try_delete_jobs(job_name, job_id)


def pause_tube(connection, tube, duration):
duration = int(duration)
if not tube:
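
The duplicated helpers above now live in teuthology/queue/__init__.py; what remains here is the beanstalk-specific plumbing such as pause_tube(). A minimal sketch of pausing a tube directly, assuming the module still exposes a connect() helper as the pre-move teuthology.beanstalk did; 'smithi' is an example tube name.

from teuthology.queue import beanstalk

connection = beanstalk.connect()                 # assumed helper returning a beanstalk connection
beanstalk.pause_tube(connection, 'smithi', 600)  # pause the 'smithi' tube for 10 minutes
beanstalk.pause_tube(connection, 'smithi', 0)    # a duration of 0 unpauses it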
88 changes: 88 additions & 0 deletions teuthology/queue/paddles.py
@@ -0,0 +1,88 @@
import logging

from teuthology import report
from teuthology.dispatcher import pause_queue
# Shared helpers defined in teuthology/queue/__init__.py
from teuthology.queue import (JobDeleter, JobPrinter, RunPrinter,
                              end_progress, print_progress)


log = logging.getLogger(__name__)


def stats_queue(machine_type):
stats = report.get_queue_stats(machine_type)
if stats['paused'] is None:
log.info("%s queue is currently running with %s jobs queued",
stats['name'],
stats['count'])
else:
log.info("%s queue is paused with %s jobs queued",
stats['name'],
stats['count'])


def update_priority(machine_type, priority, user, run_name=None):
if run_name is not None:
jobs = report.get_user_jobs_queue(machine_type, user, run_name)
else:
jobs = report.get_user_jobs_queue(machine_type, user)
for job in jobs:
job['priority'] = priority
report.try_push_job_info(job)


def walk_jobs(machine_type, processor, user):
log.info("Checking paddles queue...")
job_count = report.get_queue_stats(machine_type)['count']

jobs = report.get_user_jobs_queue(machine_type, user)
if job_count == 0:
log.info('No jobs in queue')
return

for i in range(1, job_count + 1):
print_progress(i, job_count, "Loading")
job = jobs[i-1]
if job is None:
continue
job_id = job['job_id']
processor.add_job(job_id, job)
end_progress()
processor.complete()


def main(args):
    machine_type = args['--machine_type']
    # --user, --run_name and --priority are not part of the teuthology-queue
    # doc string yet, so fall back to None instead of raising KeyError.
    user = args.get('--user')
    run_name = args.get('--run_name')
    priority = args.get('--priority')
    status = args['--status']
    delete = args['--delete']
    runs = args['--runs']
    show_desc = args['--description']
    full = args['--full']
    pause_duration = args['--pause']
    try:
        if status:
            stats_queue(machine_type)
        elif pause_duration:
            # A duration of 0 unpauses the queue (see the command doc string).
            pause = int(pause_duration) > 0
            pause_queue(machine_type, pause, user, pause_duration)
        elif priority:
            update_priority(machine_type, priority, user, run_name)
        elif delete:
            walk_jobs(machine_type, JobDeleter(delete), user)
        elif runs:
            walk_jobs(machine_type, RunPrinter(), user)
        else:
            walk_jobs(machine_type,
                      JobPrinter(show_desc=show_desc, full=full),
                      user)
    except KeyboardInterrupt:
        log.info("Interrupted.")