Skip to content

Commit

Permalink
[ADD] queue_job_cron_jobrunner
Browse files Browse the repository at this point in the history
  • Loading branch information
ivantodorovich committed Mar 21, 2022
1 parent 5fe8275 commit d0b96a7
Show file tree
Hide file tree
Showing 14 changed files with 306 additions and 0 deletions.
1 change: 1 addition & 0 deletions queue_job_cron_jobrunner/README.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
TO BE GENERATED AUTOMATICALLY
1 change: 1 addition & 0 deletions queue_job_cron_jobrunner/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from . import models
12 changes: 12 additions & 0 deletions queue_job_cron_jobrunner/__manifest__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"name": "Queue Job Cron Jobrunner",
"summary": "Run jobs without a dedicated JobRunner",
"version": "15.0.1.0.0",
"author": "Camptocamp SA, Odoo Community Association (OCA)",
"maintainers": ["ivantodorovich"],
"website": "https://github.com/OCA/queue",
"license": "AGPL-3",
"category": "Others",
"depends": ["queue_job"],
"data": ["data/ir_cron.xml"],
}
15 changes: 15 additions & 0 deletions queue_job_cron_jobrunner/data/ir_cron.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<?xml version="1.0" encoding="utf-8" ?>
<odoo noupdate="1">

<record id="queue_job_cron" model="ir.cron">
<field name="name">Queue Job Runner</field>
<field name="model_id" ref="queue_job.model_queue_job" />
<field name="state">code</field>
<field name="code">model._job_runner()</field>
<field name="user_id" ref="base.user_root" />
<field name="interval_number">1</field>
<field name="interval_type">days</field>
<field name="numbercall">-1</field>
</record>

</odoo>
1 change: 1 addition & 0 deletions queue_job_cron_jobrunner/models/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from . import queue_job
173 changes: 173 additions & 0 deletions queue_job_cron_jobrunner/models/queue_job.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
# Copyright 2022 Camptocamp SA (https://www.camptocamp.com).
# @author Iván Todorovich <[email protected]>
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl).

import logging
import traceback
from io import StringIO

from psycopg2 import OperationalError

from odoo import _, api, models, tools
from odoo.service.model import PG_CONCURRENCY_ERRORS_TO_RETRY

from odoo.addons.queue_job.controllers.main import PG_RETRY
from odoo.addons.queue_job.exception import (
FailedJobError,
NothingToDoJob,
RetryableJobError,
)
from odoo.addons.queue_job.job import Job

_logger = logging.getLogger(__name__)


class QueueJob(models.Model):
_inherit = "queue.job"

@api.model
def _acquire_one_job(self):
"""Acquire the next job to be run.
:returns: queue.job record (locked for update)
"""
# TODO: This method should respect channel priority and capacity,
# rather than just fetching them by creation date.
self.flush()
self.env.cr.execute(
"""
SELECT *
FROM queue_job
WHERE state = 'pending'
AND (eta IS NULL OR eta <= (now() AT TIME ZONE 'UTC'))
ORDER BY date_created DESC
LIMIT 1 FOR NO KEY UPDATE SKIP LOCKED
"""
)
row = self.env.cr.dictfetchone()
return self.browse(row and row["id"])

def _process(self, commit=False):
"""Process the job"""
self.ensure_one()
job = Job._load_from_db_record(self)
# Set it as started
job.set_started()
job.store()
_logger.debug("%s started", job.uuid)
# TODO: Commit the state change so that the state can be read from the UI
# while the job is processing. However, doing this will release the
# lock on the db, so we need to find another way.
# if commit:
# self.flush()
# self.env.cr.commit()

# Actual processing
try:
try:
with self.env.cr.savepoint():
job.perform()
job.set_done()
job.store()
except OperationalError as err:
# Automatically retry the typical transaction serialization errors
if err.pgcode not in PG_CONCURRENCY_ERRORS_TO_RETRY:
raise
message = tools.ustr(err.pgerror, errors="replace")
job.postpone(result=message, seconds=PG_RETRY)
job.set_pending(reset_retry=False)
job.store()
_logger.debug("%s OperationalError, postponed", job)

except NothingToDoJob as err:
if str(err):
msg = str(err)
else:
msg = _("Job interrupted and set to Done: nothing to do.")
job.set_done(msg)
job.store()

except RetryableJobError as err:
# delay the job later, requeue
job.postpone(result=str(err), seconds=5)
job.set_pending(reset_retry=False)
job.store()
_logger.debug("%s postponed", job)

except (FailedJobError, Exception):
buff = StringIO()
traceback.print_exc(file=buff)
_logger.error(buff.getvalue())
job.set_failed(exc_info=buff.getvalue())
job.store()

if commit: # pragma: no cover
self.env["base"].flush()
self.env.cr.commit() # pylint: disable=invalid-commit

@api.model
def _job_runner(self, commit=True):
"""Short-lived job runner, triggered by async crons"""
job = self._acquire_one_job()
while job:
job._process(commit=commit)
job = self._acquire_one_job()
# TODO: If limit_time_real_cron is reached before all the jobs are done,
# the worker will be killed abruptly.
# Ideally, find a way to know if we're close to reaching this limit,
# stop processing, and trigger a new execution to continue.
#
# if job and limit_time_real_cron_reached_or_about_to_reach:
# self._cron_trigger()
# break

@api.model
def _cron_trigger(self, at=None):
"""Trigger the cron job runners
Odoo will prevent concurrent cron jobs from running.
So, to eventually support parallel execution, we'd need to have (at least) the
same number of ir.crons records as cron workers.
All crons should be triggered at the same time.
"""
crons = (
self.env["ir.cron"]
.sudo()
.search(
[
("model_id.model", "=", "queue.job"),
("code", "=", "model._job_runner()"),
]
)
)
for cron in crons:
cron._trigger(at=at)

def _ensure_cron_trigger(self):
"""Create cron triggers for these jobs"""
records = self.filtered(lambda r: r.state == "pending")
if not records:
return
# Trigger immediate runs
immediate = any(not rec.eta for rec in records)
if immediate:
self._cron_trigger()
# Trigger delayed eta runs
delayed_etas = {rec.eta for rec in records if rec.eta}
if delayed_etas:
self._cron_trigger(at=list(delayed_etas))

@api.model_create_multi
def create(self, vals_list):
# When jobs are created, also create the cron trigger
records = super().create(vals_list)
records._ensure_cron_trigger()
return records

def write(self, vals):
# When a job state or eta changes, make sure a cron trigger is created
res = super().write(vals)
if "state" in vals or "eta" in vals:
self._ensure_cron_trigger()
return res
21 changes: 21 additions & 0 deletions queue_job_cron_jobrunner/readme/CONFIGURE.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
.. warning::

Don't use this module if you're already running the regular ``queue_job`` runner.


For the easiest case, no configuration is required besides installing the module.

To avoid CronWorker CPU timeout from abruptly stopping the job processing cron, it's
recommended to launch Odoo with ``--limit-time-real-cron=0``, to disable the CronWorker
timeout altogether.

.. note::

In Odoo.sh, this is done by default.


Parallel execution of jobs can be achieved by leveraging multiple ``ir.cron`` records:

* Make sure you have enough CronWorkers available (Odoo CLI ``--max-cron-threads``)
* Duplicate the ``queue_job_cron`` cron record as many times as needed, until you have
as much records as cron workers.
3 changes: 3 additions & 0 deletions queue_job_cron_jobrunner/readme/CONTRIBUTORS.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
* `Camptocamp <https://www.camptocamp.com>`_

* Iván Todorovich <[email protected]>
14 changes: 14 additions & 0 deletions queue_job_cron_jobrunner/readme/DESCRIPTION.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
This module implements a simple ``queue.job`` runner using ``ir.cron`` triggers.

It's meant to be used on environments where the regular job runner can't be run, like
on Odoo.sh.

Unlike the regular job runner, where jobs are dispatched to the HttpWorkers, jobs are
processed on the CronWorker threads by the job runner crons. This is a design decision
because:

* Odoo.sh puts HttpWorkers to sleep when there's no network activity
* HttpWorkers are meant for traffic. Users shouldn't pay the price of background tasks.

For now, it only implements the most basic features of the ``queue_job`` runner, notably
no channel capacity nor priorities. Please check the ROADMAP for further details.
3 changes: 3 additions & 0 deletions queue_job_cron_jobrunner/readme/ROADMAP.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
* Support channel capacity and priority. (See ``_acquire_one_job``)
* Gracefully handle CronWorker CPU timeouts. (See ``_job_runner``)
* Commit transaction after job state updated to started. (See ``_process``)
1 change: 1 addition & 0 deletions queue_job_cron_jobrunner/tests/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from . import test_queue_job
54 changes: 54 additions & 0 deletions queue_job_cron_jobrunner/tests/test_queue_job.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# Copyright 2022 Camptocamp SA (https://www.camptocamp.com).
# @author Iván Todorovich <[email protected]>
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl).

from datetime import timedelta

from freezegun import freeze_time

from odoo import fields
from odoo.tests.common import TransactionCase
from odoo.tools import mute_logger


class TestQueueJob(TransactionCase):
@classmethod
def setUpClass(cls):
super().setUpClass()
cls.env = cls.env(context=dict(cls.env.context, tracking_disable=True))
cls.cron = cls.env.ref("queue_job_cron_jobrunner.queue_job_cron")
# Cleanup triggers just in case
cls.env["ir.cron.trigger"].search([]).unlink()

def assertTriggerAt(self, at, message=None):
"""Ensures a cron trigger is created at the given time"""
return self.assertTrue(
self.env["ir.cron.trigger"].search([("call_at", "=", at)]),
message,
)

@freeze_time("2022-02-22 22:22:22")
def test_queue_job_cron_trigger(self):
"""Test that ir.cron triggers are created for every queue.job"""
job = self.env["res.partner"].with_delay().create({"name": "test"})
job_record = job.db_record()
self.assertTriggerAt(fields.Datetime.now(), "Trigger should've been created")
job_record.eta = fields.Datetime.now() + timedelta(hours=1)
self.assertTriggerAt(job_record.eta, "A new trigger should've been created")

@mute_logger("odoo.addons.queue_job_cron_jobrunner.models.queue_job")
def test_queue_job_process(self):
"""Test that jobs are processed by the queue job cron"""
# Create some jobs
job1 = self.env["res.partner"].with_delay().create({"name": "test"})
job1_record = job1.db_record()
job2 = self.env["res.partner"].with_delay().create(False)
job2_record = job2.db_record()
job3 = self.env["res.partner"].with_delay(eta=3600).create({"name": "Test"})
job3_record = job3.db_record()
# Run the job processing cron
self.env["queue.job"]._job_runner(commit=False)
# Check that the jobs were processed
self.assertEqual(job1_record.state, "done", "Processed OK")
self.assertEqual(job2_record.state, "failed", "Has errors")
self.assertEqual(job3_record.state, "pending", "Still pending, because of eta")
6 changes: 6 additions & 0 deletions setup/queue_job_cron_jobrunner/setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import setuptools

setuptools.setup(
setup_requires=['setuptools-odoo'],
odoo_addon=True,
)

0 comments on commit d0b96a7

Please sign in to comment.