Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added the ability to choose the serializer #283

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,21 @@ popular a tweet is a few times during the course of the day, we could do somethi

**IMPORTANT**: You should always use UTC datetime when working with `RQ Scheduler`_.

The following additional ``kwargs`` are accepted by the ``enqueue_at`` and ``enqueue_in`` methods and passed to the create function of the job class in `RQ`_:

* ``timeout``
* ``job_id``
* ``job_ttl``
* ``job_results_ttl``
* ``depends_on``
* ``meta``
* ``queue_name``
* ``on_success``
* ``on_failure``
* ``serializer``
* ``at_front``


------------------------
Periodic & Repeated Jobs
------------------------
Expand Down Expand Up @@ -142,6 +157,23 @@ This is how you do it
use_local_timezone=False # Interpret hours in the local timezone
)


The following additional ``kwargs`` are accepted by the ``schedule`` and ``cron`` methods and passed to the create function of the job class in `RQ`_:

* ``timeout``
* ``id`` (denotes the ``job_id``)
* ``description``
* ``queue_name``
* ``result_ttl``
* ``ttl``
* ``meta``
* ``depends_on``
* ``on_success``
* ``on_failure``
* ``serializer``
* ``at_front``


-------------------------
Retrieving scheduled jobs
-------------------------
Expand Down
25 changes: 17 additions & 8 deletions rq_scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ def stop(signum, frame):

def _create_job(self, func, args=None, kwargs=None, commit=True,
result_ttl=None, ttl=None, id=None, description=None,
queue_name=None, timeout=None, meta=None, depends_on=None, on_success=None, on_failure=None):
queue_name=None, timeout=None, meta=None, depends_on=None, on_success=None, on_failure=None, serializer=None):
"""
Creates an RQ job and saves it to Redis. The job is assigned to the
given queue name if not None else it is assigned to scheduler queue by
Expand All @@ -144,6 +144,7 @@ def _create_job(self, func, args=None, kwargs=None, commit=True,
kwargs=kwargs, result_ttl=result_ttl, ttl=ttl, id=id,
description=description, timeout=timeout, meta=meta,
depends_on=depends_on,on_success=on_success,on_failure=on_failure,
serializer=serializer
)
if queue_name:
job.origin = queue_name
Expand Down Expand Up @@ -198,14 +199,17 @@ def enqueue_at(self, scheduled_time, func, *args, **kwargs):
queue_name = kwargs.pop('queue_name', None)
on_success = kwargs.pop('on_success', None)
on_failure = kwargs.pop('on_failure', None)
serializer = kwargs.pop('serializer', None)
at_front = kwargs.pop('at_front', None)

job = self._create_job(func, args=args, kwargs=kwargs, timeout=timeout,
id=job_id, result_ttl=job_result_ttl, ttl=job_ttl,
description=job_description, meta=meta, queue_name=queue_name, depends_on=depends_on,
on_success=on_success, on_failure=on_failure)
on_success=on_success, on_failure=on_failure, serializer=serializer)

if at_front:
job.enqueue_at_front = True

self.connection.zadd(self.scheduled_jobs_key,
{job.id: to_unix(scheduled_time)})
return job
Expand All @@ -226,23 +230,27 @@ def enqueue_in(self, time_delta, func, *args, **kwargs):
queue_name = kwargs.pop('queue_name', None)
on_success = kwargs.pop('on_success', None)
on_failure = kwargs.pop('on_failure', None)
serializer = kwargs.pop('serializer', None)
at_front = kwargs.pop('at_front', False)

job = self._create_job(func, args=args, kwargs=kwargs, timeout=timeout,
id=job_id, result_ttl=job_result_ttl, ttl=job_ttl,
description=job_description, meta=meta, queue_name=queue_name,
depends_on=depends_on, on_success=on_success, on_failure=on_failure)
depends_on=depends_on, on_success=on_success, on_failure=on_failure, serializer=serializer)

if at_front:
job.enqueue_at_front = True

self.connection.zadd(self.scheduled_jobs_key,
{job.id: to_unix(datetime.utcnow() + time_delta)})
return job

def schedule(self, scheduled_time, func, args=None, kwargs=None,
interval=None, repeat=None, result_ttl=None, ttl=None,
timeout=None, id=None, description=None,
queue_name=None, meta=None, depends_on=None, on_success=None,
on_failure=None, at_front=None):
queue_name=None, meta=None, depends_on=None, on_success=None,
on_failure=None, serializer=None, at_front=None):

"""
Schedule a job to be periodically executed, at a certain interval.
"""
Expand All @@ -253,7 +261,7 @@ def schedule(self, scheduled_time, func, args=None, kwargs=None,
result_ttl=result_ttl, ttl=ttl, id=id,
description=description, queue_name=queue_name,
timeout=timeout, meta=meta, depends_on=depends_on,
on_success=on_success, on_failure=on_failure)
on_success=on_success, on_failure=on_failure, serializer=serializer)

if interval is not None:
job.meta['interval'] = int(interval)
Expand All @@ -270,7 +278,8 @@ def schedule(self, scheduled_time, func, args=None, kwargs=None,

def cron(self, cron_string, func, args=None, kwargs=None, repeat=None,
queue_name=None, result_ttl=-1, ttl=None, id=None, timeout=None, description=None, meta=None, use_local_timezone=False,
depends_on=None, on_success=None, on_failure=None, at_front: bool = False):
depends_on=None, on_success=None, on_failure=None, serializer=None, at_front: bool = False):

"""
Schedule a cronjob
"""
Expand All @@ -279,7 +288,7 @@ def cron(self, cron_string, func, args=None, kwargs=None, repeat=None,
job = self._create_job(func, args=args, kwargs=kwargs, commit=False,
result_ttl=result_ttl, ttl=ttl, id=id, queue_name=queue_name,
description=description, timeout=timeout, meta=meta, depends_on=depends_on,
on_success=on_success, on_failure=on_failure)
on_success=on_success, on_failure=on_failure, serializer=serializer)

job.meta['cron_string'] = cron_string
job.meta['use_local_timezone'] = use_local_timezone
Expand Down