Save limit per group/func/name #687

Open · wants to merge 3 commits into base: master
30 changes: 20 additions & 10 deletions django_q/cluster.py
@@ -452,6 +452,16 @@ def worker(
             break
     logger.info(_(f"{name} stopped doing work"))

+def get_func_repr(func):
+    # convert func to string
+    if inspect.isfunction(func):
+        return f"{func.__module__}.{func.__name__}"
+    elif inspect.ismethod(func):
+        return (
+            f"{func.__self__.__module__}."
+            f"{func.__self__.__name__}.{func.__name__}"
+        )
+    return func
+
 def save_task(task, broker: Broker):
     """
@@ -473,10 +483,17 @@ def save_task(task, broker: Broker):
     )
     # SAVE LIMIT > 0: Prune database, SAVE_LIMIT 0: No pruning
     close_old_django_connections()
+
     try:
+        filters = {}
+        if Conf.SAVE_LIMIT_PER and Conf.SAVE_LIMIT_PER in {"group", "name", "func"} and Conf.SAVE_LIMIT_PER in task:
+            value = task[Conf.SAVE_LIMIT_PER]
+            if Conf.SAVE_LIMIT_PER == "func":
+                value = get_func_repr(value)
+            filters[Conf.SAVE_LIMIT_PER] = value
         with db.transaction.atomic():
-            last = Success.objects.select_for_update().last()
-            if task["success"] and 0 < Conf.SAVE_LIMIT <= Success.objects.count():
+            last = Success.objects.filter(**filters).select_for_update().last()
+            if task["success"] and 0 < Conf.SAVE_LIMIT <= Success.objects.filter(**filters).count():
                 last.delete()
         # check if this task has previous results
         if Task.objects.filter(id=task["id"], name=task["name"]).exists():
@@ -496,15 +513,8 @@ def save_task(task, broker: Broker):
                 broker.acknowledge(task["ack_id"])

         else:
-            func = task["func"]
-            # convert func to string
-            if inspect.isfunction(func):
-                func = f"{func.__module__}.{func.__name__}"
-            elif inspect.ismethod(func):
-                func = (
-                    f"{func.__self__.__module__}."
-                    f"{func.__self__.__name__}.{func.__name__}"
-                )
+            func = get_func_repr(task["func"])
             Task.objects.create(
                 id=task["id"],
                 name=task["name"],
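For context, the extracted helper simply stringifies whatever ``func`` value a task carries. A minimal, self-contained sketch of its behavior follows; the sample function, class, and dotted path are hypothetical and not part of the PR:

```python
import inspect

def get_func_repr(func):
    # same logic as the helper added in cluster.py above
    if inspect.isfunction(func):
        return f"{func.__module__}.{func.__name__}"
    elif inspect.ismethod(func):
        return (
            f"{func.__self__.__module__}."
            f"{func.__self__.__name__}.{func.__name__}"
        )
    return func

def hello():
    pass

class Greeter:
    @classmethod
    def greet(cls):
        pass

print(get_func_repr(hello))           # -> "__main__.hello"
print(get_func_repr(Greeter.greet))   # -> "__main__.Greeter.greet"
print(get_func_repr("app.tasks.do"))  # strings pass through unchanged
```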
3 changes: 3 additions & 0 deletions django_q/conf.py
@@ -86,6 +86,9 @@ class Conf:
     # Failures are always saved
     SAVE_LIMIT = conf.get("save_limit", 250)

+    # save_limit can be applied per Task "group", "name", or "func"
+    SAVE_LIMIT_PER = conf.get("save_limit_per", None)
+
     # Guard loop sleep in seconds. Should be between 0 and 60 seconds.
     GUARD_CYCLE = conf.get("guard_cycle", 0.5)

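With the new key in place, a Q_CLUSTER entry in Django settings might look like the sketch below; every value besides ``save_limit_per`` is a placeholder:

```python
# settings.py -- sketch; surrounding values are placeholders
Q_CLUSTER = {
    "name": "myproject",
    "workers": 4,
    "orm": "default",          # use the Django ORM as the broker
    "save_limit": 10,          # keep at most 10 successful tasks...
    "save_limit_per": "func",  # ...per task function rather than overall
}
```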
41 changes: 41 additions & 0 deletions django_q/tests/test_cluster.py
@@ -407,6 +407,47 @@ def test_recycle(broker, monkeypatch):
     assert Success.objects.count() == Conf.SAVE_LIMIT
     broker.delete_queue()

+@pytest.mark.django_db
+def test_save_limit_per_func(broker, monkeypatch):
+    # set up the Sentinel
+    broker.list_key = "test_recycle_test:q"
+    async_task("django_q.tests.tasks.hello", broker=broker)
+    async_task("django_q.tests.tasks.countdown", 2, broker=broker)
+    async_task("django_q.tests.tasks.multiply", 2, 2, broker=broker)
+    start_event = Event()
+    stop_event = Event()
+    cluster_id = uuidlib.uuid4()
+    task_queue = Queue()
+    result_queue = Queue()
+    # override settings
+    monkeypatch.setattr(Conf, "RECYCLE", 3)
+    monkeypatch.setattr(Conf, "WORKERS", 1)
+    # set a timer to stop the Sentinel
+    threading.Timer(3, stop_event.set).start()
+    for i in range(3):
+        pusher(task_queue, stop_event, broker=broker)
+    worker(task_queue, result_queue, Value("f", -1))
+    s = Sentinel(stop_event, start_event, cluster_id=cluster_id, broker=broker)
+    assert start_event.is_set()
+    assert s.status() == Conf.STOPPED
+    # worker should exit on recycle
+    # check if the work has been done
+    assert result_queue.qsize() == 3
+    # save_limit test
+    monkeypatch.setattr(Conf, "SAVE_LIMIT", 1)
+    monkeypatch.setattr(Conf, "SAVE_LIMIT_PER", "func")
+    result_queue.put("STOP")
+    # run monitor
+    monitor(result_queue)
+    assert Success.objects.count() == 3
+    assert set(Success.objects.filter().values_list("func", flat=True)) == {
+        "django_q.tests.tasks.countdown",
+        "django_q.tests.tasks.hello",
+        "django_q.tests.tasks.multiply",
+    }
+    broker.delete_queue()
+
+
 @pytest.mark.django_db
 def test_max_rss(broker, monkeypatch):
8 changes: 8 additions & 0 deletions docs/configure.rst
@@ -142,6 +142,14 @@ Limits the amount of successful tasks saved to Django.
 - Defaults to ``250``
 - Failures are always saved.

+save_limit_per
+~~~~~~~~~~~~~~
+
+The above ``save_limit`` for successful tasks can be fine-tuned per task type:
+
+- Set to ``"group"`` to apply the limit per task group
+- Other possible values are ``"func"``, ``"name"``, and ``None``
+- Defaults to ``None``

 guard_cycle
 ~~~~~~~~~~~
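Assuming the behavior documented above, a quick illustration of the difference the setting makes; ``emails.send`` and ``reports.build`` are placeholder task paths:

```python
from django_q.tasks import async_task

# With save_limit = 10 and save_limit_per = "func", successful results
# are pruned per function: each function keeps its 10 most recent
# successes, so a chatty task can no longer evict a rare task's history.
for i in range(100):
    async_task("emails.send", i)  # retains only its own 10 newest successes
async_task("reports.build")       # its history is untouched by the loop above
```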