diff --git a/rq_scheduler/scheduler.py b/rq_scheduler/scheduler.py index 512571c..cff8ee9 100644 --- a/rq_scheduler/scheduler.py +++ b/rq_scheduler/scheduler.py @@ -68,7 +68,7 @@ def stop(signum, frame): signal.signal(signal.SIGTERM, stop) def _create_job(self, func, args=None, kwargs=None, commit=True, - result_ttl=None): + result_ttl=None, depends_on=None): """ Creates an RQ job and saves it to Redis. """ @@ -81,7 +81,7 @@ def _create_job(self, func, args=None, kwargs=None, commit=True, if kwargs is None: kwargs = {} job = Job.create(func, args=args, connection=self.connection, - kwargs=kwargs, result_ttl=result_ttl) + kwargs=kwargs, result_ttl=result_ttl, depends_on=depends_on) job.origin = self.queue_name if commit: job.save() @@ -133,7 +133,7 @@ def enqueue_periodic(self, scheduled_time, interval, repeat, func, interval=interval, repeat=repeat) def schedule(self, scheduled_time, func, args=None, kwargs=None, - interval=None, repeat=None, result_ttl=None, timeout=None): + interval=None, repeat=None, result_ttl=None, timeout=None, depends_on=None): """ Schedule a job to be periodically executed, at a certain interval. """ @@ -141,7 +141,7 @@ def schedule(self, scheduled_time, func, args=None, kwargs=None, if interval is not None and result_ttl is None: result_ttl = -1 job = self._create_job(func, args=args, kwargs=kwargs, commit=False, - result_ttl=result_ttl) + result_ttl=result_ttl, depends_on=depends_on) if interval is not None: job.meta['interval'] = int(interval) if repeat is not None: @@ -296,8 +296,13 @@ def enqueue_jobs(self): jobs = self.get_jobs_to_queue() for job in jobs: - self.enqueue_job(job) - + if job.dependency is None: # if there is no dependency, then go ahead schedule it + self.enqueue_job(job) + else: # if there is a dependency, then check if that job is done + parent_job = job.dependency + if parent_job.is_finished: + self.enqueue_job(job) + # Refresh scheduler key's expiry self.connection.expire(self.scheduler_key, self._interval + 10) return jobs