diff --git a/django_q/cluster.py b/django_q/cluster.py
index 9bce4f5e..2d56fa0a 100644
--- a/django_q/cluster.py
+++ b/django_q/cluster.py
@@ -452,6 +452,16 @@ def worker(
             break
     logger.info(_(f"{name} stopped doing work"))
 
+def get_func_repr(func):
+    # convert func to string
+    if inspect.isfunction(func):
+        return f"{func.__module__}.{func.__name__}"
+    elif inspect.ismethod(func):
+        return (
+            f"{func.__self__.__module__}."
+            f"{func.__self__.__name__}.{func.__name__}"
+        )
+    return func
 
 def save_task(task, broker: Broker):
     """
@@ -473,10 +483,17 @@ def save_task(task, broker: Broker):
         )
     # SAVE LIMIT > 0: Prune database, SAVE_LIMIT 0: No pruning
     close_old_django_connections()
     try:
+        filters = {}
+        if Conf.SAVE_LIMIT_PER and Conf.SAVE_LIMIT_PER in {"group", "name", "func"} and Conf.SAVE_LIMIT_PER in task:
+            value = task[Conf.SAVE_LIMIT_PER]
+            if Conf.SAVE_LIMIT_PER == "func":
+                value = get_func_repr(value)
+            filters[Conf.SAVE_LIMIT_PER] = value
+
         with db.transaction.atomic():
-            last = Success.objects.select_for_update().last()
-            if task["success"] and 0 < Conf.SAVE_LIMIT <= Success.objects.count():
+            last = Success.objects.filter(**filters).select_for_update().last()
+            if task["success"] and 0 < Conf.SAVE_LIMIT <= Success.objects.filter(**filters).count():
                 last.delete()
         # check if this task has previous results
         if Task.objects.filter(id=task["id"], name=task["name"]).exists():
@@ -496,15 +513,8 @@ def save_task(task, broker: Broker):
                 broker.acknowledge(task["ack_id"])
 
         else:
-            func = task["func"]
             # convert func to string
-            if inspect.isfunction(func):
-                func = f"{func.__module__}.{func.__name__}"
-            elif inspect.ismethod(func):
-                func = (
-                    f"{func.__self__.__module__}."
-                    f"{func.__self__.__name__}.{func.__name__}"
-                )
+            func = get_func_repr(task["func"])
             Task.objects.create(
                 id=task["id"],
                 name=task["name"],
diff --git a/django_q/conf.py b/django_q/conf.py
index 3a8f3982..9e5ef3c5 100644
--- a/django_q/conf.py
+++ b/django_q/conf.py
@@ -86,6 +86,9 @@ class Conf:
     # Failures are always saved
     SAVE_LIMIT = conf.get("save_limit", 250)
 
+    # save-limit can be set per Task's "group" or "name" or "func"
+    SAVE_LIMIT_PER = conf.get("save_limit_per", None)
+
     # Guard loop sleep in seconds. Should be between 0 and 60 seconds.
     GUARD_CYCLE = conf.get("guard_cycle", 0.5)
diff --git a/django_q/tests/test_cluster.py b/django_q/tests/test_cluster.py
index 1755932e..52bcc250 100644
--- a/django_q/tests/test_cluster.py
+++ b/django_q/tests/test_cluster.py
@@ -407,6 +407,47 @@ def test_recycle(broker, monkeypatch):
     assert Success.objects.count() == Conf.SAVE_LIMIT
     broker.delete_queue()
 
 
+@pytest.mark.django_db
+def test_save_limit_per_func(broker, monkeypatch):
+    # set up the Sentinel
+    broker.list_key = "test_recycle_test:q"
+    async_task("django_q.tests.tasks.hello", broker=broker)
+    async_task("django_q.tests.tasks.countdown", 2, broker=broker)
+    async_task("django_q.tests.tasks.multiply", 2, 2, broker=broker)
+    start_event = Event()
+    stop_event = Event()
+    cluster_id = uuidlib.uuid4()
+    task_queue = Queue()
+    result_queue = Queue()
+    # override settings
+    monkeypatch.setattr(Conf, "RECYCLE", 3)
+    monkeypatch.setattr(Conf, "WORKERS", 1)
+    # set a timer to stop the Sentinel
+    threading.Timer(3, stop_event.set).start()
+    for i in range(3):
+        pusher(task_queue, stop_event, broker=broker)
+    worker(task_queue, result_queue, Value("f", -1))
+    s = Sentinel(stop_event, start_event, cluster_id=cluster_id, broker=broker)
+    assert start_event.is_set()
+    assert s.status() == Conf.STOPPED
+    # worker should exit on recycle
+    # check if the work has been done
+    assert result_queue.qsize() == 3
+    # save_limit test
+    monkeypatch.setattr(Conf, "SAVE_LIMIT", 1)
+    monkeypatch.setattr(Conf, "SAVE_LIMIT_PER", "func")
+    result_queue.put("STOP")
+    # run monitor
+    monitor(result_queue)
+    assert Success.objects.count() == 3
+    assert set(Success.objects.values_list("func", flat=True)) == {
+        "django_q.tests.tasks.countdown",
+        "django_q.tests.tasks.hello",
+        "django_q.tests.tasks.multiply",
+    }
+    broker.delete_queue()
+
+
 @pytest.mark.django_db
 def test_max_rss(broker, monkeypatch):
diff --git a/docs/configure.rst b/docs/configure.rst
index 14dda2d0..ca536aa9 100644
--- a/docs/configure.rst
+++ b/docs/configure.rst
@@ -142,6 +142,14 @@ Limits the amount of successful tasks saved to Django.
 - Defaults to ``250``
 - Failures are always saved.
 
+save_limit_per
+~~~~~~~~~~~~~~
+
+The above ``save_limit`` for successful tasks can also be applied per task ``"group"``, ``"name"`` or ``"func"``:
+
+- Set to ``"group"`` to keep the last ``save_limit`` successful tasks per group
+- Other possible values are ``"name"`` and ``"func"``; defaults to ``None`` (a single, global limit)
+
 guard_cycle
 ~~~~~~~~~~~
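
For reference, a minimal sketch of how the new option might be enabled in a project's Django settings, assuming the standard ``Q_CLUSTER`` configuration dictionary that ``conf.get("save_limit_per", None)`` above reads from; the cluster name and worker count below are illustrative:

    # settings.py -- illustrative values
    Q_CLUSTER = {
        "name": "myproject",       # example cluster name
        "workers": 4,
        "save_limit": 250,         # keep at most 250 successful results...
        "save_limit_per": "func",  # ...per task function rather than globally
    }

With ``save_limit_per`` left at its default of ``None``, pruning in ``save_task`` behaves as before: one global ``save_limit`` applies to all successful tasks, and failures are always saved.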