[14.0][IMP] queue_job: add cron to purge dead jobs. #653

Open · wants to merge 3 commits into base: 14.0 · Changes from 1 commit
9 changes: 9 additions & 0 deletions queue_job/data/queue_data.xml
@@ -10,6 +10,15 @@
<field name="state">code</field>
<field name="code">model.requeue_stuck_jobs()</field>
</record>
<record id="ir_cron_queue_job_fail_dead_jobs" model="ir.cron">
<field name="name">Jobs Garbage Collector</field>
Contributor: This has the same name as the job above.
<field name="interval_number">15</field>
<field name="interval_type">minutes</field>
<field name="numbercall">-1</field>
<field ref="model_queue_job" name="model_id" />
<field name="state">code</field>
<field name="code">model.fail_dead_jobs(240)</field>
Contributor: I think it would be nice to add advice on how to choose this value somewhere, in case someone wants to adapt it to their config: in the README, or in the method description? The ideal value would be the cpu_time limit from the queue job server config, right?

Contributor: I agree. I think a comment here in the code would be nice too 😉

</record>
<!-- Queue-job-related subtypes for messaging / Chatter -->
<record id="mt_job_failed" model="mail.message.subtype">
<field name="name">Job failed</field>
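Picking up the reviewers' point about documenting the `240` in `model.fail_dead_jobs(240)`: a minimal sketch of how the delta could be derived from the worker time limits of the server configuration. The helper name and the safety margin are illustrative assumptions, not part of the module.

```python
# Sketch: derive a safe started_delta (in minutes) from the Odoo worker
# time limits. suggest_started_delta and margin are hypothetical names,
# not queue_job API.

def suggest_started_delta(limit_time_cpu, limit_time_real, margin=2.0):
    """Return a started_delta comfortably above the worker limits.

    ``limit_time_cpu`` and ``limit_time_real`` are in seconds, as in the
    Odoo server configuration; the result is in minutes, suitable for
    passing to ``fail_dead_jobs()``.
    """
    worst_case_seconds = max(limit_time_cpu, limit_time_real) * margin
    # Round up to whole minutes so the cron never fires too early.
    return int(-(-worst_case_seconds // 60))
```

For example, queue-worker limits of 3600/7200 seconds with a 2x margin yield 240 minutes, the value used in the cron above.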
42 changes: 42 additions & 0 deletions queue_job/models/queue_job.py
@@ -418,6 +418,48 @@
).requeue()
return True

def fail_dead_jobs(self, started_delta):
    """Mark as failed jobs that started too long ago.

    Workers can die without anyone noticing. Dead workers
    block their channel and starve the queue.

    This method marks jobs started more than ``started_delta``
    minutes ago as failed.

    The cause of death cannot be known: CPU time limit
    reached, a SIGTERM, a power outage, etc.
    """
    now = fields.Datetime.now()
    started_dl = now - timedelta(minutes=started_delta)

Contributor: Suggested change:

-    started_dl = now - timedelta(minutes=started_delta)
+    started_dl = fields.Datetime.subtract(now, minutes=started_delta)

    if started_delta <= 10:
Contributor: Instead of a hardcoded 10 minutes here, perhaps use config['limit_time_real'] seconds?

Contributor (author): You may have a short limit_time_real for the regular HTTP workers and a long one for the queue workers on a separate instance. The idea is to use this cron as a last line of defence against a poorly configured instance.

Contributor: I still think this would be better served with something like:

    if started_delta <= int(self.env['ir.config_parameter'].sudo().get_param('queue_job.limit_time_dead', 10)):

Contributor (author): I added an argument to bypass the 10 min limit without having to change the code.

        raise exceptions.ValidationError(
            _("started_delta is too low. Set it to at least 10 minutes.")
        )
    domain = [
        "&",
        ("date_started", "<=", fields.Datetime.to_string(started_dl)),
        ("state", "=", "started"),
    ]
    job_model = self.env["queue.job"]
    stuck_jobs = job_model.search(domain)
    msg = {
        "exc_info": "",
        "exc_name": "Worker not responding. Is it dead?",
        "exc_message": (
            "Check the odoo.service.server logs. "
            "Investigate for a CPU time limit reached, or check the system log."
        ),
    }
    for job in stuck_jobs:
        # TODO: manage retry:
        # if retry < max_retry: retry += 1 and enqueue the job instead
        # else: set_failed
        job_ = Job.load(self.env, job.uuid)
        job_.set_failed(**msg)
        job_.store()

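For readers outside an Odoo shell, the selection logic above can be sketched in plain Python. Job records are modeled as dicts here, an assumption for illustration only; the real method builds an ORM domain and searches `queue.job`.

```python
from datetime import datetime, timedelta

def find_dead_jobs(jobs, started_delta, now=None):
    """Return jobs still in state 'started' whose date_started is more
    than ``started_delta`` minutes in the past (illustrative sketch)."""
    now = now or datetime.utcnow()
    cutoff = now - timedelta(minutes=started_delta)
    return [
        job for job in jobs
        if job["state"] == "started" and job["date_started"] <= cutoff
    ]
```

A job started 300 minutes ago is caught with the cron's `started_delta=240`, while a recently started job or an already finished one is left alone.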

def _get_stuck_jobs_domain(self, queue_dl, started_dl):
    domain = []
    now = fields.Datetime.now()
1 change: 1 addition & 0 deletions queue_job/readme/CONTRIBUTORS.rst
@@ -10,3 +10,4 @@
* Souheil Bejaoui <[email protected]>
* Eric Antones <[email protected]>
* Simone Orsi <[email protected]>
* Raphaël Reverdy <[email protected]>