Skip to content

Commit

Permalink
Merge pull request #68 from Koed00/dev
Browse files Browse the repository at this point in the history
Adds a wait option for results.
  • Loading branch information
Koed00 committed Sep 18, 2015
2 parents a30e5d2 + 16c28a8 commit e60414d
Show file tree
Hide file tree
Showing 9 changed files with 49 additions and 12 deletions.
5 changes: 4 additions & 1 deletion README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,10 @@ Use `async` from your code to quickly offload tasks:
task_result = result(task_id)
# result returns None if the task has not been executed yet
# so in most cases you will want to use a hook:
# you can wait for it
task_result = result(task_id, 200)
# but in most cases you will want to use a hook:
async('math.modf', 2.5, hook='hooks.print_result')
Expand Down
3 changes: 3 additions & 0 deletions django_q/brokers/aws_sqs.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ def acknowledge(self, task_id):
def queue_size(self):
return int(self.queue.attributes['ApproximateNumberOfMessages'])

def lock_size(self):
return int(self.queue.attributes['ApproximateNumberOfMessagesNotVisible'])

def delete(self, task_id):
message = self.sqs.Message(self.queue.url, task_id)
message.delete()
Expand Down
3 changes: 1 addition & 2 deletions django_q/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def monitor(run_once=False, broker=None):
# bottom bar
i += 1
queue_size = broker.queue_size()
if Conf.ORM:
if hasattr(broker, 'lock_size'):
queue_size = '{}({})'.format(queue_size, broker.lock_size())
print(term.move(i, 0) + term.white_on_cyan(term.center(broker.info(), width=col_width * 2)))
print(term.move(i, 2 * col_width) + term.black_on_cyan(term.center(_('Queued'), width=col_width)))
Expand Down Expand Up @@ -183,4 +183,3 @@ def info(broker=None):
term.white('{0:.4f}'.format(exec_time))
)
return True

27 changes: 23 additions & 4 deletions django_q/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from django.utils import timezone

# local
import time
import signing
import cluster
from django_q.conf import Conf, logger
Expand Down Expand Up @@ -82,16 +83,25 @@ def schedule(func, *args, **kwargs):
)


def result(task_id):
def result(task_id, wait=0):
"""
Return the result of the named task.
:type task_id: str or uuid
:param task_id: the task name or uuid
:type wait: int
:param wait: number of milliseconds to wait for a result
:return: the result object of this task
:rtype: object
"""
return Task.get_result(task_id)
start = time.time()
while True:
r = Task.get_result(task_id)
if r:
return r
if (time.time() - start) * 1000 >= wait:
break
time.sleep(0.01)


def result_group(group_id, failures=False):
Expand All @@ -105,16 +115,25 @@ def result_group(group_id, failures=False):
return Task.get_result_group(group_id, failures)


def fetch(task_id):
def fetch(task_id, wait=0):
"""
Return the processed task.
:param task_id: the task name or uuid
:type task_id: str or uuid
:param wait: the number of milliseconds to wait for a result
:type wait: int
:return: the full task object
:rtype: Task
"""
return Task.get_task(task_id)
start = time.time()
while True:
t = Task.get_task(task_id)
if t:
return t
if (time.time() - start) * 1000 >= wait:
break
time.sleep(0.01)


def fetch_group(group_id, failures=True):
Expand Down
1 change: 1 addition & 0 deletions django_q/tests/test_brokers.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ def test_sqs():
broker.acknowledge(task[0])
# duplicate acknowledge
broker.acknowledge(task[0])
assert broker.lock_size() == 0
# delete queue
broker.enqueue('test')
broker.purge_queue()
Expand Down
2 changes: 2 additions & 0 deletions django_q/tests/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,8 @@ def test_async(broker, admin_user):
assert result_j.group_delete(tasks=True) is None
# task k should not have been saved
assert fetch(k) is None
assert fetch(k, 100) is None
assert result(k, 100) is None
broker.delete_queue()


Expand Down
5 changes: 5 additions & 0 deletions docs/brokers.rst
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,11 @@ You can override this class if you want to contribute and support your own broke
Returns the amount of messages in the brokers queue.

.. py:method:: lock_size()
Optional method that returns the number of messages currently awaiting acknowledgement.
Only implemented on brokers that support it.

.. py:method:: ping()
Returns True if the broker can be reached.
Expand Down
13 changes: 9 additions & 4 deletions docs/tasks.rst
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ Use :func:`async` from your code to quickly offload tasks to the :class:`Cluster
task_result = result(task_id)
# result returns None if the task has not been executed yet
# so in most cases you will want to use a hook:
# you can wait for it
task_result = result(task_id, 200)
# but in most cases you will want to use a hook:
async('math.modf', 2.5, hook='hooks.print_result')
Expand Down Expand Up @@ -208,19 +211,21 @@ Reference
:returns: The uuid of the task
:rtype: str

.. py:function:: result(task_id)
.. py:function:: result(task_id, wait=0)
Gets the result of a previously executed task

:param str task_id: the uuid or name of the task
:param int wait: optional milliseconds to wait for a result
:returns: The result of the executed task

.. py:function:: fetch(task_id)
.. py:function:: fetch(task_id, wait=0)
Returns a previously executed task

:param str name: the uuid or name of the task
:returns: The task if any
:param int wait: optional milliseconds to wait for a result
:returns: A task object
:rtype: Task

.. versionchanged:: 0.2.0
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
arrow==0.6.0
blessed==1.9.5
boto3==1.1.3
botocore==1.2.2 # via boto3
botocore==1.2.4 # via boto3
django-picklefield==0.3.2
django-redis==4.2.0
docutils==0.12 # via botocore
Expand Down

0 comments on commit e60414d

Please sign in to comment.