[IMP] Multi-node high-availability jobrunner
Add support for multi-node / HA deployment as well as stable/safe deployment
for odoo.sh
PCatinean committed Dec 19, 2023
1 parent 8dca914 commit 90ce526
Showing 6 changed files with 159 additions and 10 deletions.
25 changes: 25 additions & 0 deletions queue_job/README.rst
@@ -132,6 +132,23 @@ Configuration
.. [1] It works with the threaded Odoo server too, although this way
of running Odoo is obviously not for production purposes.
* Deploying in high-availability mode or on odoo.sh:

When deploying queue_job on multiple nodes or on odoo.sh, in addition to the
configuration parameters mentioned above you also need to set the environment
variable ODOO_QUEUE_JOB_HIGH_AVAILABILITY=1 or the equivalent config parameter:

.. code-block:: ini

  (...)
  [queue_job]
  high_availability = 1

.. warning:: Failure to enable the high_availability flag on odoo.sh could
   constitute a breach of the Acceptable Use Policy. Always enable this flag
   via the odoo.conf file for odoo.sh deployments.
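
A minimal odoo.conf sketch for such a multi-node deployment could look as
follows; the server_wide_modules and channels values are illustrative
assumptions only and should be adapted to your setup:

.. code-block:: ini

  [options]
  server_wide_modules = base,web,queue_job

  [queue_job]
  channels = root:2
  high_availability = 1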

Usage
=====

@@ -571,6 +588,12 @@ Known issues / Roadmap
You must therefore requeue them manually, either from the Jobs view,
or by running the following SQL statement *before starting Odoo*:
* When deployed in high_availability mode, the databases allocated to the
  jobrunners must be identical. If the database sets differ but overlap,
  e.g. jobrunner A runs on DB1,DB2 and jobrunner B runs on DB2,DB3, then either
  DB1 or DB3 will not process jobs, because there can be only one leader per
  set of databases (see the configuration sketch after the SQL statement below).
.. code-block:: sql

  update queue_job set state='pending' where state in ('started', 'enqueued')
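
In practice this means every node that runs the jobrunner should be restricted
to the same database list, for example via the db_name option. A minimal
sketch, where db1 and db2 are placeholder database names:

.. code-block:: ini

  ; identical on every node that runs the jobrunner
  [options]
  db_name = db1,db2

  [queue_job]
  high_availability = 1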
@@ -631,6 +654,8 @@ Contributors
* Souheil Bejaoui <souheil.bejaoui@acsone.eu>
* Eric Antones <eantones@nuobit.com>
* Simone Orsi <simone.orsi@camptocamp.com>
* Paul Catinean <pca@pledra.com>
* Ruchir Shukla <ruchir@bizzappdev.com>
Maintainers
~~~~~~~~~~~
98 changes: 88 additions & 10 deletions queue_job/jobrunner/runner.py
@@ -145,6 +145,7 @@
import selectors
import threading
import time
import uuid
from contextlib import closing, contextmanager

import psycopg2
@@ -159,6 +160,8 @@

SELECT_TIMEOUT = 60
ERROR_RECOVERY_DELAY = 5
LEADER_CHECK_DELAY = 10
IDLE_TRANSACTION_TIMEOUT = 60 # idle_in_transaction_session_timeout in seconds

_logger = logging.getLogger(__name__)

@@ -192,7 +195,7 @@ def _odoo_now():
return _datetime_to_epoch(dt)


def _connection_info_for(db_name):
def _connection_info_for(db_name, jobrunner_ha_uuid=None):
db_or_uri, connection_info = odoo.sql_db.connection_info_for(db_name)

for p in ("host", "port", "user", "password"):
Expand All @@ -202,6 +205,8 @@ def _connection_info_for(db_name):

if cfg:
connection_info[p] = cfg
if jobrunner_ha_uuid:
connection_info["application_name"] = "jobrunner_%s" % jobrunner_ha_uuid

return connection_info

@@ -260,14 +265,13 @@ def urlopen():


class Database(object):
def __init__(self, db_name):
def __init__(self, db_name, jobrunner_ha_uuid=None):
self.db_name = db_name
connection_info = _connection_info_for(db_name)
self.jobrunner_ha_uuid = jobrunner_ha_uuid
connection_info = _connection_info_for(db_name, self.jobrunner_ha_uuid)

self.conn = psycopg2.connect(**connection_info)
self.conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
self.has_queue_job = self._has_queue_job()
if self.has_queue_job:
self._initialize()

def close(self):
# pylint: disable=except-pass
@@ -280,6 +284,41 @@ def close(self):
pass
self.conn = None

def _check_leader(self, jobrunner_db_names):
"""Check if the linked jobrunner is the leader of all jobrunner_db_names"""
if not self.jobrunner_ha_uuid:
return False

with closing(self.conn.cursor()) as cr:
cr.execute(

"""
SELECT substring(application_name FROM 'jobrunner_(.*)')
FROM pg_stat_activity
WHERE application_name LIKE 'jobrunner_%%' AND
datname IN %s
ORDER BY backend_start, datname
LIMIT 1;""",
(jobrunner_db_names,),
)
res = cr.fetchone()
leader_uuid = res[0] if res else ""

if leader_uuid != self.jobrunner_ha_uuid:
_logger.debug(

"jobrunner %s: not leader of db(s) [ %s ]. leader: %s. sleeping %s sec.",
self.jobrunner_ha_uuid,
", ".join(jobrunner_db_names),
leader_uuid,
LEADER_CHECK_DELAY,
)
return False

_logger.info(

"jobrunner %s is now the leader of db(s) [ %s ]",
self.jobrunner_ha_uuid,
", ".join(jobrunner_db_names),
)
return True

def _has_queue_job(self):
with closing(self.conn.cursor()) as cr:
cr.execute(
@@ -311,6 +350,11 @@ def _has_queue_job(self):

def _initialize(self):
with closing(self.conn.cursor()) as cr:
if self.jobrunner_ha_uuid:
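                # In HA mode, ask PostgreSQL to terminate this session if it
                # stays idle inside an open transaction for longer than
                # IDLE_TRANSACTION_TIMEOUT, so stale jobrunner sessions do not
                # linger in pg_stat_activity.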
cr.execute(

"SET idle_in_transaction_session_timeout TO %s",
((IDLE_TRANSACTION_TIMEOUT * 1000,)),
)
cr.execute("LISTEN queue_job")

@contextmanager
@@ -353,6 +397,7 @@ def __init__(
user=None,
password=None,
channel_config_string=None,
high_availability=None,
):
self.scheme = scheme
self.host = host
@@ -363,6 +408,10 @@
if channel_config_string is None:
channel_config_string = _channels()
self.channel_manager.simple_configure(channel_config_string)
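        # In HA mode each runner instance gets a unique identifier which is
        # advertised to PostgreSQL as the connection's application_name, so
        # that concurrent runners can see each other and elect a leader.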
self.uuid = False
if high_availability:
self.uuid = str(uuid.uuid4())
_logger.info("jobrunner %s initialized in HA mode" % self.uuid)

self.db_by_name = {}
self._stop = False
self._stop_pipe = os.pipe()
@@ -388,12 +437,18 @@ def from_environ_or_config(cls):
password = os.environ.get(
"ODOO_QUEUE_JOB_HTTP_AUTH_PASSWORD"
) or queue_job_config.get("http_auth_password")
if "ODOO_QUEUE_JOB_HIGH_AVAILABILITY" in os.environ:
high_availability = str(os.environ["ODOO_QUEUE_JOB_HIGH_AVAILABILITY"])

else:
high_availability = str(queue_job_config.get("high_availability"))
high_availability = high_availability.lower() in ("true", "1", "t")

runner = cls(
scheme=scheme or "http",
host=host or "localhost",
port=port or 8069,
user=user,
password=password,
high_availability=high_availability,
)
return runner

@@ -421,15 +476,20 @@ def close_databases(self, remove_jobs=True):
_logger.warning("error closing database %s", db_name, exc_info=True)
self.db_by_name = {}

def initialize_runner(self):
"""Listen for db notifications and load existing jobs into memory"""
for db in self.db_by_name.values():
db._initialize()

with db.select_jobs("state in %s", (NOT_DONE,)) as cr:
for job_data in cr:
self.channel_manager.notify(db.db_name, *job_data)
_logger.info("queue job runner ready for db %s", db.db_name)

def initialize_databases(self):
for db_name in self.get_db_names():
db = Database(db_name)
db = Database(db_name, self.uuid)

if db.has_queue_job:
self.db_by_name[db_name] = db
with db.select_jobs("state in %s", (NOT_DONE,)) as cr:
for job_data in cr:
self.channel_manager.notify(db_name, *job_data)
_logger.info("queue job runner ready for db %s", db_name)

def run_jobs(self):
now = _odoo_now()
@@ -511,6 +571,17 @@ def stop(self):
# wakeup the select() in wait_notification
os.write(self._stop_pipe[1], b".")

def check_db_leader(self):
"""Check if the current jobrunner is the leader for all configured databases"""
jobrunner_db_names = tuple(self.db_by_name.keys())

if not jobrunner_db_names:
return False

# Use the first db connection to perform the leadership check
db_obj = self.db_by_name[jobrunner_db_names[0]]
return db_obj._check_leader(jobrunner_db_names)

def run(self):
_logger.info("starting")
while not self._stop:
@@ -520,6 +591,13 @@ def run(self):
# TODO: how to detect new databases or databases
# on which queue_job is installed after server start?
self.initialize_databases()
while not self._stop and self.uuid:
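                    # HA mode: wait here until this runner becomes the leader;
                    # non-leaders re-check every LEADER_CHECK_DELAY seconds.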
leader = self.check_db_leader()

if leader:
break
time.sleep(LEADER_CHECK_DELAY)
continue
self.initialize_runner()

_logger.info("database connections ready")
# inner loop does the normal processing
while not self._stop:
17 changes: 17 additions & 0 deletions queue_job/readme/CONFIGURE.rst
@@ -41,3 +41,20 @@

.. [1] It works with the threaded Odoo server too, although this way
of running Odoo is obviously not for production purposes.
* Deploying in high-availability mode or on odoo.sh:

When deploying queue_job on multiple nodes or on odoo.sh, in addition to the
configuration parameters mentioned above you also need to set the environment
variable ODOO_QUEUE_JOB_HIGH_AVAILABILITY=1 or the equivalent config parameter:

.. code-block:: ini

  (...)
  [queue_job]
  high_availability = 1

.. warning:: Failure to enable the high_availability flag on odoo.sh could
   constitute a breach of the Acceptable Use Policy. Always enable this flag
   via the odoo.conf file for odoo.sh deployments.
2 changes: 2 additions & 0 deletions queue_job/readme/CONTRIBUTORS.rst
@@ -10,3 +10,5 @@
* Souheil Bejaoui <souheil.bejaoui@acsone.eu>
* Eric Antones <eantones@nuobit.com>
* Simone Orsi <simone.orsi@camptocamp.com>
* Paul Catinean <pca@pledra.com>
* Ruchir Shukla <ruchir@bizzappdev.com>
6 changes: 6 additions & 0 deletions queue_job/readme/ROADMAP.rst
@@ -13,6 +13,12 @@
You must therefore requeue them manually, either from the Jobs view,
or by running the following SQL statement *before starting Odoo*:

* When deployed in high_availability mode, the databases allocated to the
  jobrunners must be identical. If the database sets differ but overlap,
  e.g. jobrunner A runs on DB1,DB2 and jobrunner B runs on DB2,DB3, then either
  DB1 or DB3 will not process jobs, because there can be only one leader per
  set of databases.

.. code-block:: sql

  update queue_job set state='pending' where state in ('started', 'enqueued')
21 changes: 21 additions & 0 deletions queue_job/static/description/index.html
@@ -492,6 +492,20 @@ <h1><a class="toc-backref" href="#toc-entry-2">Configuration</a></h1>
of running Odoo is obviously not for production purposes.</td></tr>
</tbody>
</table>
<ul class="simple">
<li>Deploying in high-availability mode or on odoo.sh:</li>
</ul>
<p>When deploying queue_job on multiple nodes or on odoo.sh, in addition to the
configuration parameters mentioned above you also need to set the environment
variable ODOO_QUEUE_JOB_HIGH_AVAILABILITY=1 or the equivalent config parameter:</p>
<pre class="code ini literal-block">
<span class="na">(...)</span><span class="w">
</span><span class="k">[queue_job]</span><span class="w">
</span><span class="na">high_availability</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s">1</span>
</pre>
<p><strong>Warning:</strong> Failure to enable the high_availability flag on odoo.sh could
constitute a breach of the Acceptable Use Policy. Always enable this flag via the odoo.conf
file for odoo.sh deployments.</p>
</div>
<div class="section" id="usage">
<h1><a class="toc-backref" href="#toc-entry-3">Usage</a></h1>
@@ -871,6 +885,11 @@ <h1><a class="toc-backref" href="#toc-entry-11">Known issues / Roadmap</a></h1>
therefore fill the running queue and prevent other jobs to start.
You must therefore requeue them manually, either from the Jobs view,
or by running the following SQL statement <em>before starting Odoo</em>:</li>
<li>When deployed in high_availability mode, the databases allocated to the
jobrunners must be identical. If the database sets differ but overlap,
e.g. jobrunner A runs on DB1,DB2 and jobrunner B runs on DB2,DB3, then either
DB1 or DB3 will not process jobs, because there can be only one leader per
set of databases.</li>
</ul>
<pre class="code sql literal-block">
<span class="k">update</span><span class="w"> </span><span class="n">queue_job</span><span class="w"> </span><span class="k">set</span><span class="w"> </span><span class="k">state</span><span class="o">=</span><span class="s1">'pending'</span><span class="w"> </span><span class="k">where</span><span class="w"> </span><span class="k">state</span><span class="w"> </span><span class="k">in</span><span class="w"> </span><span class="p">(</span><span class="s1">'started'</span><span class="p">,</span><span class="w"> </span><span class="s1">'enqueued'</span><span class="p">)</span>
@@ -930,6 +949,8 @@ <h2><a class="toc-backref" href="#toc-entry-17">Contributors</a></h2>
<li>Souheil Bejaoui &lt;<a class="reference external" href="mailto:souheil.bejaoui&#64;acsone.eu">souheil.bejaoui&#64;acsone.eu</a>&gt;</li>
<li>Eric Antones &lt;<a class="reference external" href="mailto:eantones&#64;nuobit.com">eantones&#64;nuobit.com</a>&gt;</li>
<li>Simone Orsi &lt;<a class="reference external" href="mailto:simone.orsi&#64;camptocamp.com">simone.orsi&#64;camptocamp.com</a>&gt;</li>
<li>Paul Catinean &lt;<a class="reference external" href="mailto:pca&#64;pledra.com">pca&#64;pledra.com</a>&gt;</li>
<li>Ruchir Shukla &lt;<a class="reference external" href="mailto:ruchir&#64;bizzappdev.com">ruchir&#64;bizzappdev.com</a>&gt;</li>
</ul>
</div>
<div class="section" id="maintainers">
