diff --git a/paddles/controllers/jobs.py b/paddles/controllers/jobs.py index 3d5cd73..8746f89 100644 --- a/paddles/controllers/jobs.py +++ b/paddles/controllers/jobs.py @@ -1,8 +1,7 @@ import logging -from sqlalchemy import Sequence from sqlalchemy.orm import load_only -from pecan import expose, abort, request, conf +from pecan import expose, abort, request from paddles import models from paddles.decorators import retryOperation @@ -126,45 +125,38 @@ def index_post(self): data = request.json if not data: raise ValueError() - config = dict(conf.sqlalchemy) - if 'sqlite' in config['url']: - # Need this check since Sequence is not supported in SQLite - job = Session.query(Job).order_by(Job.id.desc()).first() - if job: - job_id = job.id + 1 - else: - job_id = 1 - else: - job_id = Session.execute(Sequence('jobs_id_seq')) - job_id = str(job_id) - data['job_id'] = job_id - data['id'] = int(job_id) except ValueError: rollback() error('/errors/invalid/', 'could not decode JSON body') # we allow empty data to be pushed - if not job_id: - error('/errors/invalid/', "could not find required key: 'job_id'") self.run = self._find_run() if not self.run: self._create_run() - job_id = data['job_id'] = str(job_id) - - self._create_job(job_id, data) - return dict({'job_id':job_id}) + job = self._create_job(data) + return dict({'job_id':job.job_id}) @retryOperation - def _create_job(self, job_id, data): - query = Job.query.options(load_only('id', 'job_id')) - query = query.filter_by(job_id=job_id, run=self.run) - if query.first(): - error('/errors/invalid/', - "job with job_id %s already exists" % job_id) - else: - log.info("Creating job: %s/ Job ID: %s", data.get('name', ''), + def _create_job(self, data): + if "job_id" in data: + job_id = data['job_id'] + job_id = str(job_id) + query = Job.query.options(load_only('id', 'job_id')) + query = query.filter_by(job_id=job_id, run=self.run) + if query.first(): + error('/errors/invalid/', + "job with job_id %s already exists" % job_id) + else: + log.info("Creating job: %s/%s", data.get('name', ''), job_id) + self.job = Job(data, self.run) + Session.commit() + return self.job + else: + # with paddles as queue backend, we generate job ID here self.job = Job(data, self.run) + self.job.job_id = self.job.id Session.commit() + log.info("Job ID of created job is %s", self.job.job_id) return self.job @expose('json') diff --git a/paddles/controllers/queue.py b/paddles/controllers/queue.py index 56e75d9..0d41432 100644 --- a/paddles/controllers/queue.py +++ b/paddles/controllers/queue.py @@ -79,6 +79,9 @@ def index_put(self): def pop_queue(self, queue): queue_name = queue queue = Queue.filter_by(queue=queue_name).first() + if queue is None: + log.info("%s queue is empty! No jobs to retrieve", queue_name) + return None if queue.paused is True: error('/errors/unavailable', "queue is paused, cannot retrieve job") return diff --git a/paddles/models/jobs.py b/paddles/models/jobs.py index 12a38ff..054792a 100644 --- a/paddles/models/jobs.py +++ b/paddles/models/jobs.py @@ -139,8 +139,6 @@ class Job(Base): def __init__(self, json_data, run): self.run = run self.posted = datetime.utcnow() - self.id = json_data['id'] - self.job_id = json_data['job_id'] self.set_or_update(json_data) def set_or_update(self, json_data):