Skip to content

Commit

Permalink
Revert "[IMP] Log warning in case same database added for multiple jo…
Browse files Browse the repository at this point in the history
…brunners"
  • Loading branch information
PCatinean authored Dec 19, 2023
1 parent ebc4aa2 commit 9fa09ed
Showing 1 changed file with 5 additions and 41 deletions.
46 changes: 5 additions & 41 deletions queue_job/jobrunner/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,27 +284,6 @@ def close(self):
pass
self.conn = None

def _check_multiple_jobrunners(self, jobrunner_db_names):
"""Check if any database in jobrunner_db_names has multiple 'jobrunner_' connections"""
with closing(self.conn.cursor()) as cr:
cr.execute(
"""
SELECT datname
FROM pg_stat_activity
WHERE application_name LIKE 'jobrunner_%%' AND
datname IN %s
GROUP BY datname
HAVING COUNT(DISTINCT application_name) > 1;""",
(jobrunner_db_names,),
)
results = cr.fetchall()

for (db_name,) in results:
_logger.warning(
"Database '%s' has multiple jobrunners configured.",
db_name,
)

def _check_leader(self, jobrunner_db_names):
"""Check if the linked jobrunner is the leader of all jobrunner_db_names"""
if not self.jobrunner_ha_uuid:
Expand Down Expand Up @@ -332,6 +311,7 @@ def _check_leader(self, jobrunner_db_names):
LEADER_CHECK_DELAY,
)
return False

_logger.info(
"jobrunner %s is now the leader of db(s) [ %s ]",
self.jobrunner_ha_uuid,
Expand Down Expand Up @@ -591,30 +571,15 @@ def stop(self):
# wakeup the select() in wait_notification
os.write(self._stop_pipe[1], b".")

def get_db_names_and_first_obj(self):
"""Return the list of all database name and first db obj"""
jobrunner_db_names = tuple(self.db_by_name.keys())
if not jobrunner_db_names:
return False, False

# Use the first db connection to for leadership check
db_obj = self.db_by_name[jobrunner_db_names[0]]
return jobrunner_db_names, db_obj

def check_db_multiple_jobrunners(self):
"""check database method for multiple jobrunners"""
jobrunner_db_names, db_obj = self.get_db_names_and_first_obj()
if not jobrunner_db_names or not db_obj:
return False
return db_obj._check_multiple_jobrunners(jobrunner_db_names)

def check_db_leader(self):
"""Check if the current jobrunner is the leader for all configured databases"""
jobrunner_db_names = tuple(self.db_by_name.keys())

jobrunner_db_names, db_obj = self.get_db_names_and_first_obj()
if not jobrunner_db_names or not db_obj:
if not jobrunner_db_names:
return False

# Use the first db connection to for leadership check
db_obj = self.db_by_name[jobrunner_db_names[0]]
return db_obj._check_leader(jobrunner_db_names)

def run(self):
Expand All @@ -626,7 +591,6 @@ def run(self):
# TODO: how to detect new databases or databases
# on which queue_job is installed after server start?
self.initialize_databases()
self.check_db_multiple_jobrunners()
while not self._stop and self.uuid:
leader = self.check_db_leader()
if leader:
Expand Down

0 comments on commit 9fa09ed

Please sign in to comment.