Skip to content

Commit

Permalink
Merge pull request #94 from amathuria/wip-amathuria-removing-beanstalkd
Browse files Browse the repository at this point in the history
paddles: Adding a queueing mechanism to Paddles
  • Loading branch information
zmc authored Mar 4, 2024
2 parents 79e5058 + bcc6b4e commit 27576bd
Show file tree
Hide file tree
Showing 13 changed files with 613 additions and 178 deletions.
55 changes: 55 additions & 0 deletions alembic/versions/e8de4928657_modify_jobs_table.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
"""modify jobs table
Revision ID: e8de4928657
Revises: 266e6f3efd94
Create Date: 2021-06-28 13:45:32.717585
"""

# revision identifiers, used by Alembic.
revision = 'e8de4928657'
down_revision = '266e6f3efd94'

from alembic import op
from paddles.models.types import JSONType
from sqlalchemy.schema import Sequence, CreateSequence, DropSequence

import sqlalchemy as sa


def upgrade():
    """Apply the queueing-related schema changes to the ``jobs`` table.

    Adds the scheduling metadata columns used by the paddles-backed job
    queue, indexes ``job_id`` and ``teuthology_sha1``, creates the
    ``jobs_id_seq`` sequence, and drops the old ``archive_path`` index.
    """
    # (column name, column type) pairs; every column is nullable so that
    # existing rows remain valid after the migration.
    new_columns = [
        ('priority', sa.Integer()),
        ('repo', sa.String(length=256)),
        ('seed', sa.Integer()),
        ('sleep_before_teardown', sa.Integer()),
        ('subset', sa.String(length=32)),
        ('suite', sa.String(length=256)),
        ('suite_path', sa.String(length=256)),
        ('suite_relpath', sa.String(length=256)),
        ('suite_repo', sa.String(length=256)),
        ('teuthology_branch', sa.String(length=256)),
        ('teuthology_sha1', sa.String(length=256)),
        ('timestamp', sa.DateTime()),
        ('user', sa.String(length=64)),
        ('queue', sa.String(length=64)),
    ]
    for name, col_type in new_columns:
        op.add_column(u'jobs', sa.Column(name, col_type, nullable=True))
    op.create_index(op.f('ix_jobs_job_id'), 'jobs', ['job_id'], unique=False)
    op.create_index(op.f('ix_jobs_teuthology_sha1'), 'jobs', ['teuthology_sha1'], unique=False)
    op.execute(CreateSequence(Sequence('jobs_id_seq')))
    op.drop_index('ix_jobs_archive_path', table_name='jobs')


def downgrade():
    """Revert the schema changes applied by :func:`upgrade`.

    Recreates the ``archive_path`` index, drops the indexes and sequence
    the upgrade created, and removes every column the upgrade added.
    """
    op.create_index('ix_jobs_archive_path', 'jobs', ['archive_path'], unique=False)
    op.drop_index(op.f('ix_jobs_teuthology_sha1'), table_name='jobs')
    op.drop_index(op.f('ix_jobs_job_id'), table_name='jobs')
    op.execute(DropSequence(Sequence('jobs_id_seq')))
    # Drop every column added in upgrade(). The original downgrade missed
    # 'queue', 'teuthology_branch', 'suite_path', 'subset' and 'seed',
    # which would leave stale columns behind after a downgrade.
    op.drop_column(u'jobs', 'queue')
    op.drop_column(u'jobs', 'user')
    op.drop_column(u'jobs', 'timestamp')
    op.drop_column(u'jobs', 'teuthology_sha1')
    op.drop_column(u'jobs', 'teuthology_branch')
    op.drop_column(u'jobs', 'suite_repo')
    op.drop_column(u'jobs', 'suite_relpath')
    op.drop_column(u'jobs', 'suite_path')
    op.drop_column(u'jobs', 'suite')
    op.drop_column(u'jobs', 'subset')
    op.drop_column(u'jobs', 'sleep_before_teardown')
    op.drop_column(u'jobs', 'seed')
    op.drop_column(u'jobs', 'repo')
    op.drop_column(u'jobs', 'priority')

50 changes: 34 additions & 16 deletions paddles/controllers/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,22 @@ def index_post(self):
'attempted to update a non-existent job'
)
old_job_status = self.job.status
self.job.update(request.json)
old_priority = self.job.priority
data = request.json
if 'priority' in data:
if data['priority'] != old_priority:
if self.job.status == "queued":
log.info("Job %s/%s priority changed from %s to %s", self.job.name,
self.job.job_id, old_priority, data['priority'])
else:
log.info("Job status %s. Priority cannot be changed", self.job.status)
data['priority'] = old_priority
self.job.update(data)
Session.commit()
if self.job.status != old_job_status:
log.info("Job %s/%s status changed from %s to %s", self.job.name,
self.job.job_id, old_job_status, self.job.status)

return dict()

@index.when(method='DELETE', template='json')
Expand Down Expand Up @@ -112,33 +123,40 @@ def index_post(self):
"""
try:
data = request.json
job_id = data.get('job_id')
if not data:
raise ValueError()
except ValueError:
rollback()
error('/errors/invalid/', 'could not decode JSON body')
# we allow empty data to be pushed
if not job_id:
error('/errors/invalid/', "could not find required key: 'job_id'")
self.run = self._find_run()
if not self.run: self._create_run()

job_id = data['job_id'] = str(job_id)

self._create_job(job_id, data)
return dict()
job = self._create_job(data)
return dict({'job_id':job.job_id})

@retryOperation
def _create_job(self, job_id, data):
query = Job.query.options(load_only('id', 'job_id'))
query = query.filter_by(job_id=job_id, run=self.run)
if query.first():
error('/errors/invalid/',
"job with job_id %s already exists" % job_id)
else:
log.info("Creating job: %s/%s", data.get('name', '<no name!>'),
def _create_job(self, data):
if "job_id" in data:
job_id = data['job_id']
job_id = str(job_id)
query = Job.query.options(load_only('id', 'job_id'))
query = query.filter_by(job_id=job_id, run=self.run)
if query.first():
error('/errors/invalid/',
"job with job_id %s already exists" % job_id)
else:
log.info("Creating job: %s/%s", data.get('name', '<no name!>'),
job_id)
self.job = Job(data, self.run)
Session.commit()
return self.job
else:
# with paddles as queue backend, we generate job ID here
self.job = Job(data, self.run)
self.job.job_id = self.job.id
Session.commit()
log.info("Job ID of created job is %s", self.job.job_id)
return self.job

@expose('json')
Expand Down
146 changes: 146 additions & 0 deletions paddles/controllers/queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
from pecan import expose, request
from paddles.controllers import error
from paddles.exceptions import PaddlesError
from paddles.models import Queue, Job, Run, Session

import logging
log = logging.getLogger(__name__)


class QueuesController(object):
    """Pecan REST controller for paddles-backed teuthology job queues.

    Mounted as ``queue`` on the root controller.  Queues are rows in the
    ``Queue`` model; jobs with ``status == 'queued'`` and a matching
    ``queue`` value are the queue's contents.
    """

    @expose(generic=True, template='json')
    def index(self, machine_type='', paused_by=None):
        """GET: list queues, optionally filtered.

        A ``machine_type`` containing ``'|'`` is treated as a
        multi-machine-type request and mapped to the shared 'multi' queue.
        Returns a list of queue JSON dicts.
        """
        query = Queue.query
        if machine_type:
            if '|' in machine_type:
                query = query.filter(Queue.queue == 'multi')
            else:
                query = query.filter(Queue.queue == machine_type)
        if paused_by:
            query = query.filter(Queue.paused_by == paused_by)
        return [queue.__json__() for queue in query.all()]

    @index.when(method='POST', template='json')
    def index_post(self):
        """
        Create a new Queue
        """
        # NOTE(review): if the body decodes to a non-dict, data.get() will
        # raise AttributeError, which this except clause does not catch.
        try:
            data = request.json
            queue_name = data.get('queue')
        except ValueError:
            error('/errors/invalid/', 'could not decode JSON body')
        if not queue_name:
            error('/errors/invalid/', "could not find required key: 'queue'")

        if Queue.filter_by(queue=queue_name).first():
            error('/errors/invalid/',
                  "Queue %s already exists" % queue_name)
        else:
            self.queue = Queue(queue=queue_name)
            try:
                # update() may reject bad fields with a PaddlesError.
                self.queue.update(data)
            except PaddlesError as exc:
                error(exc.url, str(exc))
            log.info("Created {queue}: {data}".format(
                queue=self.queue,
                data=data,
            ))
        return dict()

    @index.when(method='PUT', template='json')
    def index_put(self):
        """
        Update the Queue
        """
        try:
            data = request.json
            queue_name = data.get('queue')
        except ValueError:
            error('/errors/invalid', 'could not decode JSON body')
        if not queue_name:
            error('/errors/invalid/', "could not find required key: 'queue'")
        queue = Queue.filter_by(queue=queue_name).first()
        if queue:
            self.queue = queue
            try:
                self.queue.update(data)
            except PaddlesError as exc:
                error(exc.url, str(exc))
            log.info("Updated {queue}: {data}".format(
                queue=self.queue,
                data=data,
            ))
        else:
            error('/errors/invalid', "specified queue does not exist")
        return dict()

    @expose(template='json')
    def pop_queue(self, queue):
        """Return the highest-priority queued job on *queue*, or None.

        NOTE(review): the job is returned without changing its status, so
        nothing here prevents two concurrent callers from receiving the
        same job — presumably the caller marks it running; confirm.
        """
        queue_name = queue
        queue = Queue.filter_by(queue=queue_name).first()
        if queue is None:
            log.info("%s queue is empty! No jobs to retrieve", queue_name)
            return None
        if queue.paused is True:
            error('/errors/unavailable', "queue is paused, cannot retrieve job")
            # NOTE(review): error() appears to abort the request, making
            # this return unreachable — confirm against controllers.error.
            return
        # Lowest Job.priority value is served first.
        job_query = Job.filter_by(status='queued').filter_by(queue=queue_name)
        job = job_query.order_by(Job.priority).first()
        return job

    @expose(template='json')
    def stats(self, queue):
        """Return the number of queued jobs (and pause state) for *queue*."""
        queue_name = queue
        if not queue_name:
            error('/errors/invalid/', "could not find required key: 'queue'")
        queue = Queue.filter_by(queue=queue_name).first()
        if queue:
            stats = Job.filter_by(queue=queue_name).\
                filter_by(status='queued').\
                all()
            current_jobs_ready = len(stats)

            if queue.__json__()['paused'] is False:
                return dict(
                    queue=queue_name,
                    queued_jobs=current_jobs_ready,
                    paused=queue.__json__()['paused']
                )
            else:
                # When paused, include the full queue record (pauser,
                # pause reason, ...) alongside the job count.
                paused_stats = queue.__json__()
                paused_stats.update(queued_jobs=current_jobs_ready)
                return paused_stats
        else:
            error('/errors/invalid', "specified queue does not exist")


    @expose(template='json')
    def queued_jobs(self, user=None, run_name=None):
        """
        Retrieve all the queued jobs for a particular user or a particular run
        """
        # The target queue name comes from the JSON body, not the query
        # string, even though user/run_name arrive as parameters.
        try:
            data = request.json
            queue_name = data.get('queue')
        except ValueError:
            error('/errors/invalid', 'could not decode JSON body')
        if not queue_name:
            error('/errors/invalid/', "could not find required key: 'queue'")
        queue = Queue.filter_by(queue=queue_name).first()
        if queue:
            if run_name:
                # NOTE(review): this branch does not filter on queue_name,
                # so it returns the run's queued jobs across all queues —
                # confirm that is intended.
                jobs = Session.query(Job).\
                    filter(Job.status == 'queued').\
                    filter(Run.id == Job.run_id).\
                    filter(Run.name == run_name)
            elif user:
                jobs = Job.filter_by(queue=queue_name).\
                    filter_by(status='queued').\
                    filter_by(user=user)
            else:
                jobs = Job.filter_by(queue=queue_name).\
                    filter_by(status='queued')
            return [job.__json__() for job in jobs.all()]
        else:
            error('/errors/invalid', "specified queue does not exist")
3 changes: 2 additions & 1 deletion paddles/controllers/root.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from paddles.controllers.runs import RunsController
from paddles.controllers.nodes import NodesController
from paddles.controllers.errors import ErrorsController

from paddles.controllers.queue import QueuesController

class RootController(object):

Expand All @@ -18,3 +18,4 @@ def index(self):
runs = RunsController()
errors = ErrorsController()
nodes = NodesController()
queue = QueuesController()
18 changes: 11 additions & 7 deletions paddles/models/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from sqlalchemy import create_engine, MetaData, event
from sqlalchemy.orm import scoped_session, sessionmaker, object_session, mapper
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.interfaces import PoolListener
from sqlalchemy.exc import InvalidRequestError, OperationalError
from sqlalchemy.pool import Pool
from pecan import conf

from paddles.controllers import error
Expand Down Expand Up @@ -76,17 +76,20 @@ def init_model():
"""
conf.sqlalchemy.engine = _engine_from_config(conf.sqlalchemy)
config = dict(conf.sqlalchemy)
if 'sqlite' in config['url']:
event.listen(Pool, 'connect', sqlite_connect, named=True)


def sqlite_connect(**kw):
dbapi_con = kw['dbapi_connection']
dbapi_con.execute('PRAGMA journal_mode=MEMORY')
dbapi_con.execute('PRAGMA synchronous=OFF')

class SqliteListener(PoolListener):
def connect(self, dbapi_con, con_record):
dbapi_con.execute('PRAGMA journal_mode=MEMORY')
dbapi_con.execute('PRAGMA synchronous=OFF')

def _engine_from_config(configuration):
configuration = dict(configuration)
url = configuration.pop('url')
if 'sqlite' in url:
configuration['listeners'] = [SqliteListener()]
return create_engine(url, **configuration)


Expand Down Expand Up @@ -129,3 +132,4 @@ def flush():
from .runs import Run # noqa
from .jobs import Job # noqa
from .nodes import Node # noqa
from .queue import Queue # noqa
Loading

0 comments on commit 27576bd

Please sign in to comment.