From 85d873576666ca38aa2aa8040125c6330b4ea533 Mon Sep 17 00:00:00 2001 From: Marc Sabatier Date: Thu, 2 Sep 2021 14:24:27 +0200 Subject: [PATCH 01/10] Replace use of eval() by ast.parse() + ast.literal_eval() --- django_q/cluster.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/django_q/cluster.py b/django_q/cluster.py index 9bce4f5e..b71521f3 100644 --- a/django_q/cluster.py +++ b/django_q/cluster.py @@ -598,10 +598,15 @@ def scheduler(broker: Broker = None): # get args, kwargs and hook if s.kwargs: try: - # eval should be safe here because dict() - kwargs = eval(f"dict({s.kwargs})") - except SyntaxError: - kwargs = {} + # first try the dict syntax + kwargs = ast.literal_eval(s.kwargs) + except (SyntaxError, ValueError): + # else use the kwargs syntax + try: + parsed_kwargs = ast.parse(f"f({s.kwargs})").body[0].value.keywords + kwargs = {kwarg.arg: ast.literal_eval(kwarg.value) for kwarg in parsed_kwargs} + except (SyntaxError, ValueError): + kwargs = {} if s.args: args = ast.literal_eval(s.args) # single value won't eval to tuple, so: From 2e329000f20bbb3300629475c584dddc4316345f Mon Sep 17 00:00:00 2001 From: Marc Sabatier Date: Thu, 2 Sep 2021 14:15:29 +0200 Subject: [PATCH 02/10] Fix connection issues with CONN_MAX_AGE > 0 --- django_q/cluster.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/django_q/cluster.py b/django_q/cluster.py index b71521f3..dda99cca 100644 --- a/django_q/cluster.py +++ b/django_q/cluster.py @@ -213,8 +213,9 @@ def reincarnate(self, process): :param process: the process to reincarnate :type process: Process or None """ + # close connections before spawning new process if not Conf.SYNC: - db.connections.close_all() # Close any old connections + db.connections.close_all() if process == self.monitor: self.monitor = self.spawn_monitor() logger.error(_(f"reincarnated monitor {process.name} after sudden death")) @@ -238,8 +239,9 @@ def reincarnate(self, process): def spawn_cluster(self): self.pool = [] Stat(self).save() + # close connections before spawning new process if not Conf.SYNC: - db.connection.close() + db.connections.close_all() # spawn worker pool for __ in range(self.pool_size): self.spawn_worker() @@ -697,7 +699,7 @@ def close_old_django_connections(): logger.warning( "Preserving django database connections because sync=True. Beware " "that tasks are now injected in the calling context/transactions " - "which may result in unexpected bahaviour." + "which may result in unexpected behaviour." 
) else: db.close_old_connections() From cf091e20a45f65ba0b9580d30bc37e29fefbd973 Mon Sep 17 00:00:00 2001 From: Marc Sabatier Date: Sun, 5 Sep 2021 18:58:41 +0200 Subject: [PATCH 03/10] Add the group column to OrmQ admin page --- django_q/admin.py | 2 +- django_q/models.py | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/django_q/admin.py b/django_q/admin.py index 53aaa16f..1de165ef 100644 --- a/django_q/admin.py +++ b/django_q/admin.py @@ -86,7 +86,7 @@ class ScheduleAdmin(admin.ModelAdmin): class QueueAdmin(admin.ModelAdmin): """queue admin for ORM broker""" - list_display = ("id", "key", "task_id", "name", "func", "lock") + list_display = ("id", "key", "name", "group", "func", "lock", "task_id") def save_model(self, request, obj, form, change): obj.save(using=Conf.ORM) diff --git a/django_q/models.py b/django_q/models.py index d83e0392..6c20b4c0 100644 --- a/django_q/models.py +++ b/django_q/models.py @@ -246,6 +246,9 @@ def task_id(self): def name(self): return self.task()["name"] + def group(self): + return self.task().get("group") + class Meta: app_label = "django_q" verbose_name = _("Queued task") From 5c5fa5b83b032df93bad71f18c345c748d39bee6 Mon Sep 17 00:00:00 2001 From: Marc Sabatier Date: Sun, 5 Sep 2021 18:59:25 +0200 Subject: [PATCH 04/10] Add group field to tasks list and search_field --- django_q/admin.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/django_q/admin.py b/django_q/admin.py index 1de165ef..f4818efb 100644 --- a/django_q/admin.py +++ b/django_q/admin.py @@ -10,7 +10,7 @@ class TaskAdmin(admin.ModelAdmin): """model admin for success tasks.""" - list_display = ("name", "func", "started", "stopped", "time_taken", "group") + list_display = ("name", "group", "func", "started", "stopped", "time_taken") def has_add_permission(self, request): """Don't allow adds.""" @@ -43,14 +43,14 @@ def retry_failed(FailAdmin, request, queryset): class FailAdmin(admin.ModelAdmin): """model admin for failed tasks.""" - list_display = ("name", "func", "started", "stopped", "short_result") + list_display = ("name", "group", "func", "started", "stopped", "short_result") def has_add_permission(self, request): """Don't allow adds.""" return False actions = [retry_failed] - search_fields = ("name", "func") + search_fields = ("name", "func", "group") list_filter = ("group",) readonly_fields = [] From 3e7ca6577fec60e367f1d86692a2cf73082c3df9 Mon Sep 17 00:00:00 2001 From: Marc Sabatier Date: Thu, 2 Sep 2021 13:48:14 +0200 Subject: [PATCH 05/10] Add search by name to Schedule admin page --- django_q/admin.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/django_q/admin.py b/django_q/admin.py index f4818efb..6212faca 100644 --- a/django_q/admin.py +++ b/django_q/admin.py @@ -79,7 +79,7 @@ class ScheduleAdmin(admin.ModelAdmin): readonly_fields = ("cron",) list_filter = ("next_run", "schedule_type", "cluster") - search_fields = ("func",) + search_fields = ("name", "func",) list_display_links = ("id", "name") From df90c8c4cd2dd17c08ddf3bc16ecc9e34d8cd03c Mon Sep 17 00:00:00 2001 From: Marc Sabatier Date: Thu, 2 Sep 2021 14:45:49 +0200 Subject: [PATCH 06/10] Limit succesful task history per schedule with SAVE_LIMIT_GROUP config --- django_q/cluster.py | 22 +++++++++++++++------- django_q/conf.py | 8 ++++++-- 2 files changed, 21 insertions(+), 9 deletions(-) diff --git a/django_q/cluster.py b/django_q/cluster.py index dda99cca..d20361d1 100644 --- a/django_q/cluster.py +++ b/django_q/cluster.py @@ -476,12 +476,21 @@ def save_task(task, 
broker: Broker): # SAVE LIMIT > 0: Prune database, SAVE_LIMIT 0: No pruning close_old_django_connections() try: - with db.transaction.atomic(): - last = Success.objects.select_for_update().last() - if task["success"] and 0 < Conf.SAVE_LIMIT <= Success.objects.count(): - last.delete() + if task["success"]: + # first apply per group success history limit + if "group" in task: + with db.transaction.atomic(): + qs = Success.objects.filter(group=task["group"]) + last = qs.select_for_update().last() + if Conf.SAVE_LIMIT_PER_GROUP <= qs.count(): + last.delete() + # then apply global success history limit + with db.transaction.atomic(): + last = Success.objects.select_for_update().last() + if Conf.SAVE_LIMIT <= Success.objects.count(): + last.delete() # check if this task has previous results - if Task.objects.filter(id=task["id"], name=task["name"]).exists(): + try: existing_task = Task.objects.get(id=task["id"], name=task["name"]) # only update the result if it hasn't succeeded yet if not existing_task.success: @@ -496,8 +505,7 @@ def save_task(task, broker: Broker): and existing_task.attempt_count >= Conf.MAX_ATTEMPTS ): broker.acknowledge(task["ack_id"]) - - else: + except Task.DoesNotExist: func = task["func"] # convert func to string if inspect.isfunction(func): diff --git a/django_q/conf.py b/django_q/conf.py index 3a8f3982..6436eafb 100644 --- a/django_q/conf.py +++ b/django_q/conf.py @@ -86,6 +86,10 @@ class Conf: # Failures are always saved SAVE_LIMIT = conf.get("save_limit", 250) + # Maximum number of successful tasks of the same group kept in the database. 0 saves everything. -1 saves none + # Failures are always saved + SAVE_LIMIT_PER_GROUP = conf.get("save_limit_per_group", 5) + # Guard loop sleep in seconds. Should be between 0 and 60 seconds. GUARD_CYCLE = conf.get("guard_cycle", 0.5) @@ -137,8 +141,8 @@ class Conf: # Verify if retry and timeout settings are correct if not TIMEOUT or (TIMEOUT > RETRY): warn( - """Retry and timeout are misconfigured. Set retry larger than timeout, - failure to do so will cause the tasks to be retriggered before completion. + """Retry and timeout are misconfigured. Set retry larger than timeout, + failure to do so will cause the tasks to be retriggered before completion. 
See https://django-q.readthedocs.io/en/latest/configure.html#retry for details.""" ) From d9266c0987bfe5e2382c0b3aa0ce1daf60f5d7d3 Mon Sep 17 00:00:00 2001 From: Marc Sabatier Date: Sun, 5 Sep 2021 18:49:02 +0200 Subject: [PATCH 07/10] Improve ScheduleAdmin list view performance --- django_q/admin.py | 32 +++++++++++++++++++++++++++++--- django_q/models.py | 3 +++ 2 files changed, 32 insertions(+), 3 deletions(-) diff --git a/django_q/admin.py b/django_q/admin.py index 6212faca..9a34115a 100644 --- a/django_q/admin.py +++ b/django_q/admin.py @@ -1,9 +1,12 @@ """Admin module for Django.""" +from django.urls import reverse +from django.utils.html import format_html from django.contrib import admin from django.utils.translation import gettext_lazy as _ +from django.db.models.expressions import OuterRef, Subquery from django_q.conf import Conf, croniter -from django_q.models import Failure, OrmQ, Schedule, Success +from django_q.models import Failure, OrmQ, Schedule, Success, Task from django_q.tasks import async_task @@ -70,8 +73,8 @@ class ScheduleAdmin(admin.ModelAdmin): "repeats", "cluster", "next_run", - "last_run", - "success", + "get_last_run", + "get_success", ) # optional cron strings @@ -82,6 +85,29 @@ class ScheduleAdmin(admin.ModelAdmin): search_fields = ("name", "func",) list_display_links = ("id", "name") + def get_queryset(self, request): + qs = super().get_queryset(request) + task_query = Task.objects.filter(id=OuterRef('task')).values('id', 'name', 'success') + qs = qs.annotate(task_id=Subquery(task_query.values('id')), task_name=Subquery(task_query.values('name')), + task_success=Subquery(task_query.values('success'))) + return qs + + def get_success(self, obj): + return obj.task_success + get_success.boolean = True + get_success.short_description = _("success") + + def get_last_run(self, obj): + if obj.task_name is not None: + if obj.task_success: + url = reverse("admin:django_q_success_change", args=(obj.task_id,)) + else: + url = reverse("admin:django_q_failure_change", args=(obj.task_id,)) + return format_html(f'[{obj.task_name}]') + return None + get_last_run.allow_tags = True + get_last_run.short_description = _("last_run") + class QueueAdmin(admin.ModelAdmin): """queue admin for ORM broker""" diff --git a/django_q/models.py b/django_q/models.py index 6c20b4c0..de9ef27e 100644 --- a/django_q/models.py +++ b/django_q/models.py @@ -220,7 +220,10 @@ def __str__(self): return self.func success.boolean = True + success.short_description = _("success") last_run.allow_tags = True + last_run.short_description = _("last_run") + class Meta: app_label = "django_q" From 2f9b2f712992d374d9e82f723a791388362866be Mon Sep 17 00:00:00 2001 From: Jason McVetta Date: Tue, 3 Mar 2020 17:40:16 +0700 Subject: [PATCH 08/10] do not decrement when `repeats == -1` A negative value for `Schedule.repeat` means repeat forever. 
--- django_q/cluster.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/django_q/cluster.py b/django_q/cluster.py index d20361d1..5f6b933b 100644 --- a/django_q/cluster.py +++ b/django_q/cluster.py @@ -662,7 +662,8 @@ def scheduler(broker: Broker = None): if settings.USE_TZ else next_run.datetime.replace(tzinfo=None) ) - s.repeats += -1 + if s.repeats > 0: + s.repeats -= 1 # send it to the cluster scheduled_broker = broker try: From 45f5d255c1ed203736beefbeaf384f3f4132985f Mon Sep 17 00:00:00 2001 From: Rob Harvey Date: Tue, 8 Mar 2022 08:54:13 -0800 Subject: [PATCH 09/10] Fix async_task with method argument --- django_q/cluster.py | 10 +++++---- django_q/tests/test_cluster.py | 41 ++++++++++++++++++++++++++++++++++ 2 files changed, 47 insertions(+), 4 deletions(-) diff --git a/django_q/cluster.py b/django_q/cluster.py index 5f6b933b..0fd6b429 100644 --- a/django_q/cluster.py +++ b/django_q/cluster.py @@ -509,16 +509,18 @@ def save_task(task, broker: Broker): func = task["func"] # convert func to string if inspect.isfunction(func): - func = f"{func.__module__}.{func.__name__}" - elif inspect.ismethod(func): - func = ( + func_name = f"{func.__module__}.{func.__name__}" + elif inspect.ismethod(func) and hasattr(func.__self__, '__name__'): + func_name = ( f"{func.__self__.__module__}." f"{func.__self__.__name__}.{func.__name__}" ) + else: + func_name = str(func) Task.objects.create( id=task["id"], name=task["name"], - func=func, + func=func_name, hook=task.get("hook"), args=task["args"], kwargs=task["kwargs"], diff --git a/django_q/tests/test_cluster.py b/django_q/tests/test_cluster.py index 1755932e..1656cdfb 100644 --- a/django_q/tests/test_cluster.py +++ b/django_q/tests/test_cluster.py @@ -304,6 +304,47 @@ def test_enqueue(broker, admin_user): broker.delete_queue() +class TestAsyncSelf: + # __name__ = "TestSelf" + + def run(self): + return 5 + + @pytest.mark.django_db + def test_async_self_method(self, broker): + broker.list_key = "cluster_test:q" + broker.delete_queue() + b = async_task( + self.run, + broker=broker, + ) + assert isinstance(b, str) + + # run through async + task_queue = Queue() + stop_event = Event() + stop_event.set() + pusher(task_queue, stop_event, broker=broker) + assert broker.queue_size() == 0 + assert task_queue.qsize() == 1 + task_queue.put("STOP") + + result_queue = Queue() + worker(task_queue, result_queue, Value("f", -1)) + assert result_queue.qsize() == 1 + result_queue.put("STOP") + + monitor(result_queue) + assert result_queue.qsize() == 0 + + # check results + result_b = fetch(b) + assert result_b is not None + assert result_b.success is True + assert result(b) == 5 + broker.delete_queue() + + @pytest.mark.django_db @pytest.mark.parametrize( "cluster_config_timeout, async_task_kwargs", From 81f96395609c665654c64b51988b090fc705e534 Mon Sep 17 00:00:00 2001 From: Trenton Holmes Date: Tue, 12 Jul 2022 14:02:02 -0700 Subject: [PATCH 10/10] Updates the list of parent project PRs included in this fork --- CHANGELOG.md | 7 +++++++ IDEAS.md | 12 ------------ 2 files changed, 7 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e41edd84..f30b0509 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,13 @@ - Dropped support for Django 2.2 - Removed Django 2.2 testing - Added testing for Python 3.10 +- Applied the following parent fork PRs: + - https://github.com/Koed00/django-q/pull/603 + - https://github.com/Koed00/django-q/pull/604 + - https://github.com/Koed00/django-q/pull/605 + - 
https://github.com/Koed00/django-q/pull/423 + - https://github.com/Koed00/django-q/pull/672 + - https://github.com/Koed00/django-q/pull/659 Full Changes: https://github.com/Koed00/django-q/compare/master...paperless-ngx:paperless-main diff --git a/IDEAS.md b/IDEAS.md index 08ef9800..2c6a30f5 100644 --- a/IDEAS.md +++ b/IDEAS.md @@ -4,12 +4,10 @@ - [Cons](#cons) - [Simplify Human Hash](#simplify-human-hash) - [Event Driven vs Polling](#event-driven-vs-polling) - - [Add .editorconfig](#add-editorconfig) - [Add .pre-commit-config.yaml](#add-pre-commit-configyaml) - [Sub-classing Process](#sub-classing-process) - [Better Process Naming](#better-process-naming) - [File Organization](#file-organization) - - [Useful PRs](#useful-prs) # Ideas @@ -64,10 +62,6 @@ task in the future, instead of polling continuously? Set the future timeout in This would be a very large change in the architecture, but if it (or even potions of it) are possible, it would be a much better solution than consistently polling and eating up cycles. -## Add .editorconfig - -Pretty easy, adding an .editorconfig file will help keep the styling consistent across multiple authors. - ## Add .pre-commit-config.yaml Along the same idea as above, adding a pre-commit configuration will help enforce styling and formatting, @@ -93,9 +87,3 @@ an idea of how many times a task has been re-incarnated or recycled) The `cluster.py` file contains a lot more than Cluster. Simplify the file by moving other classes to their own files and common functionality to an appropriate file as well. -## Useful PRs - -- https://github.com/Koed00/django-q/pull/603 -- https://github.com/Koed00/django-q/pull/604 -- https://github.com/Koed00/django-q/pull/605 -- https://github.com/Koed00/django-q/pull/423
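
Editor's note on patch 01 (not part of the patches above): the scheduler now avoids eval() on the
user-controlled Schedule.kwargs string by first trying ast.literal_eval() and then falling back to
parsing the text as keyword-argument syntax. The following is a minimal standalone sketch of that
parsing behaviour; the helper name parse_schedule_kwargs and the sample inputs are illustrative
assumptions, while the real code runs inline on s.kwargs inside scheduler().

    import ast

    def parse_schedule_kwargs(raw: str) -> dict:
        """Sketch of the kwargs parsing strategy introduced in patch 01."""
        # First try to read the stored string as a Python literal,
        # e.g. "{'word': 'hello'}". Like the patch, this assumes the
        # literal is a dict when this branch succeeds.
        try:
            return ast.literal_eval(raw)
        except (SyntaxError, ValueError):
            pass
        # Otherwise treat it as keyword-argument syntax, e.g. "word='hello', count=2":
        # wrap it in a dummy call, parse it, and literal-eval each keyword value.
        try:
            keywords = ast.parse(f"f({raw})").body[0].value.keywords
            return {kw.arg: ast.literal_eval(kw.value) for kw in keywords}
        except (SyntaxError, ValueError):
            return {}

    print(parse_schedule_kwargs("{'word': 'hello'}"))      # -> {'word': 'hello'}
    print(parse_schedule_kwargs("word='hello', count=2"))  # -> {'word': 'hello', 'count': 2}
    print(parse_schedule_kwargs("not valid python ("))     # -> {}

Because only literals are evaluated, a malicious kwargs string can no longer execute arbitrary
code; malformed input simply falls through to an empty dict, matching the patch's behaviour.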