Skip to content

Commit

Permalink
Merge pull request #15 from EssentNovaTeam/14.0-nova-multi-node-lock
Browse files Browse the repository at this point in the history
[14.0] Support multi-nodes with lock on jobrunner (port of OCA#256)
  • Loading branch information
StefanRijnhart authored May 17, 2021
2 parents 54a0695 + ee6d196 commit a47553f
Show file tree
Hide file tree
Showing 2 changed files with 166 additions and 5 deletions.
146 changes: 141 additions & 5 deletions queue_job/jobrunner/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,35 @@
* It does not run jobs itself, but asks Odoo to run them through an
anonymous ``/queue_job/runjob`` HTTP request. [1]_
How do concurrent job runners work?
-------------------------------------
If several nodes (on different hosts or not) of job runners are started,
a shared lock ensures that only one job runner works on a database at
a time. These rules are to be taken into consideration:
* The identifier of the shared lock is based on the database list provided,
so either ``--database``/``db_name`` or all the databases in PostgreSQL.
* When 2 job runners with the exact same list of databases are started,
only the first one will work. The second one will wait and take over
if the first one is stopped.
Caveats:
* If 2 job runners have a database in common but a different list (e.g.
``db_name=project1,project2`` and ``db_name=project2,project3``), both job
runners will work and listen to ``project2``, which will lead to unexpected
behavior.
* The same applies when no database is specified and all the cluster's databases
are used. If a job runner is started on the cluster's databases, a new database
is created and a second job runner is started, they'll both work on the same set
of databases with unexpected behaviors.
* PostgreSQL advisory locks are based on an integer. The list of database names
is sorted, hashed and converted to an int64, so information is lost in the
identifier and a small risk of collision exists. If a collision ever occurs, an
option for a custom lock identifier should be added.
How to use it?
--------------
Expand Down Expand Up @@ -134,6 +163,7 @@
"""

import datetime
import hashlib
import logging
import os
import select
Expand All @@ -152,6 +182,8 @@
from .channels import ENQUEUED, NOT_DONE, PENDING, ChannelManager

SELECT_TIMEOUT = 60
TRY_ACQUIRE_INTERVAL = 30 # seconds
SHARED_LOCK_KEEP_ALIVE = 60 # seconds
ERROR_RECOVERY_DELAY = 5

_logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -251,7 +283,63 @@ def urlopen():
thread.start()


class Database(object):
class SharedLockDatabase(object):
    """Connection holding a PostgreSQL session-level advisory lock.

    At most one runner node at a time can hold the lock derived from
    ``lock_name``; any other node sees ``acquired == False`` and is
    expected to retry later and take over when the holder goes away.
    """

    def __init__(self, db_name, lock_name):
        self.db_name = db_name
        self.lock_ident = self.name_to_int64(lock_name)
        self.conn = psycopg2.connect(**_connection_info_for(db_name))
        self.acquired = False
        self._keep_alive_cursor = None
        self.try_acquire()

    @staticmethod
    def name_to_int64(lock_name):
        """Map *lock_name* to a signed 64-bit lock identifier.

        ``pg_try_advisory_lock`` is limited to an 8-byte (64-bit) signed
        integer, so the name is hashed with SHA-256 and the first 8 bytes
        of the digest are reinterpreted as an int64.
        """
        digest = hashlib.sha256(lock_name.encode("utf-8")).digest()
        return int.from_bytes(digest[:8], byteorder="big", signed=True)

    def try_acquire(self):
        """Attempt to take the advisory lock, recording the outcome."""
        self.acquired = self._acquire()
        if not self.acquired:
            return
        # Open a transaction that is never committed: keep_alive() sends a
        # simple SELECT 1 on this cursor at most every
        # SHARED_LOCK_KEEP_ALIVE seconds. Should the process crash or the
        # connection be cut, the server terminates the idle transaction
        # (and with it the session, freeing the advisory lock) after
        # 2 * SHARED_LOCK_KEEP_ALIVE seconds, letting another node take
        # over.
        self._keep_alive_cursor = self.conn.cursor()
        self._keep_alive_cursor.execute(
            "SET idle_in_transaction_session_timeout = %s;",
            (SHARED_LOCK_KEEP_ALIVE * 1000 * 2,),
        )

    def _acquire(self):
        # Session-level lock: held until explicitly released or the
        # session ends.
        with closing(self.conn.cursor()) as cursor:
            cursor.execute("SELECT pg_try_advisory_lock(%s);", (self.lock_ident,))
            return cursor.fetchone()[0]

    def keep_alive(self):
        # Touch the never-committed transaction so the server-side idle
        # timeout does not fire while we legitimately hold the lock.
        self._keep_alive_cursor.execute("SELECT 1")

    def close(self):
        # pylint: disable=except-pass
        # A failure here means the connection is already closed, or it
        # will be cleaned up on garbage collection anyway, so it is safe
        # to ignore.
        try:
            self.conn.close()
        except Exception:
            pass
        self.conn = None


class QueueDatabase(object):
def __init__(self, db_name):
self.db_name = db_name
connection_info = _connection_info_for(db_name)
Expand Down Expand Up @@ -356,6 +444,11 @@ def __init__(
channel_config_string = _channels()
self.channel_manager.simple_configure(channel_config_string)
self._load_capacity = 0

self.shared_lock_db = None
# TODO: how to detect new databases or databases
# on which queue_job is installed after server start?
self.list_db_names = self.get_db_names()
self.db_by_name = {}
self._stop = False
self._stop_pipe = os.pipe()
Expand Down Expand Up @@ -405,11 +498,22 @@ def close_databases(self, remove_jobs=True):
db.close()
except Exception:
_logger.warning("error closing database %s", db_name, exc_info=True)

self.db_by_name = {}

if self.shared_lock_db:
try:
self.shared_lock_db.close()
except Exception:
_logger.warning(
"error closing database %s",
self.shared_lock_db.db_name,
exc_info=True,
)

def initialize_databases(self):
for db_name in self.get_db_names():
db = Database(db_name)
for db_name in self.list_db_names:
db = QueueDatabase(db_name)
if db.has_queue_job:
self.db_by_name[db_name] = db
with db.select_jobs("state in %s", (NOT_DONE,)) as cr:
Expand Down Expand Up @@ -524,6 +628,12 @@ def wait_notification(self):
for conn in conns:
conn.poll()

def keep_alive_shared_lock(self):
    # Ping the shared-lock connection so its idle-in-transaction timeout
    # does not expire while this node still holds the advisory lock.
    self.shared_lock_db.keep_alive()

def _lock_ident(self):
    """Build the shared-lock name for this node's set of databases.

    The database names are sorted so that the identifier does not depend
    on the order in which they were supplied.
    """
    joined = "-".join(sorted(self.list_db_names))
    return "qj:{}".format(joined)

def stop(self):
_logger.info("graceful stop requested")
self._stop = True
Expand All @@ -535,16 +645,42 @@ def run(self):
while not self._stop:
# outer loop does exception recovery
try:
# When concurrent jobrunners are started, the first to win the
# race acquires an advisory lock on PostgreSQL and gets to
# work. When a jobrunner is stopped, the lock is released, and
# another node can take over.
self.shared_lock_db = SharedLockDatabase("postgres", self._lock_ident())
if not self.shared_lock_db.acquired:
self.close_databases()
_logger.info("already started on another node")
# no database to work with... retry later in case a concurrent
# node is stopped
time.sleep(TRY_ACQUIRE_INTERVAL)
continue

_logger.info("initializing database connections")
# TODO: how to detect new databases or databases
# on which queue_job is installed after server start?
self.initialize_databases()
_logger.info("database connections ready")

last_keep_alive = None

# inner loop does the normal processing
while not self._stop:
self.process_notifications()
self.run_jobs()
self.wait_notification()
if (
not last_keep_alive
or time.time() >= last_keep_alive + SHARED_LOCK_KEEP_ALIVE
):
last_keep_alive = time.time()
# send a keepalive on the shared lock connection at
# most every SHARED_LOCK_KEEP_ALIVE seconds
self.keep_alive_shared_lock()
# TODO here, when we have no "db_name", we could list again
# the databases and if the list changed, try to acquire a new
# lock

except KeyboardInterrupt:
self.stop()
except InterruptedError:
Expand Down
25 changes: 25 additions & 0 deletions queue_job/readme/CONFIGURE.rst
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,28 @@

.. [1] It works with the threaded Odoo server too, although this way
of running Odoo is obviously not for production purposes.
Odoo.sh configuration
~~~~~~~~~~~~~~~~~~~~~

When using odoo.sh, the configuration in ``.config/odoo/odoo.conf`` must include
(at least):


.. code-block::
[options]
server_wide_modules=web,queue_job
[queue_job]
host=<your-odoo-instance>.odoo.com
scheme=https
port=443
Example of host: ``myproject-main-1552740.dev.odoo.com``

.. note::
Odoo.sh puts workers to sleep when they stop receiving HTTP requests.
Jobs scheduled in the future or by a scheduled action may therefore fail to run.
A workaround is to wake up the workers periodically using an external
service (a simple GET on any URL served by Odoo is enough).

0 comments on commit a47553f

Please sign in to comment.