diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index cd4c47f4..68e94071 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -39,6 +39,9 @@ jobs: pip install black black . --check if: matrix.python-version == '3.12' + - run: | + pip install pytest + ADAPTER=tiny HTTPBIN=httpbin.bemisc.com pytest - run: ADAPTER=tiny HTTPBIN=httpbin.bemisc.com python setup.py test build-pypy: name: Build PyPy @@ -63,4 +66,7 @@ jobs: pip install black black . --check if: matrix.python-version == '3.12' + - run: | + pip install pytest + ADAPTER=tiny HTTPBIN=httpbin.bemisc.com pytest - run: ADAPTER=tiny HTTPBIN=httpbin.bemisc.com pypy setup.py test diff --git a/CHANGELOG.md b/CHANGELOG.md index c8e6b817..e29ae8d7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,7 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added -* +* Support for Cron based scheduling using `SchedulerCron` class - [#65](https://github.com/hivesolutions/appier/issues/65) ### Changed diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 00000000..a4fbc17c --- /dev/null +++ b/pytest.ini @@ -0,0 +1,3 @@ +[pytest] +python_files = *.py +testpaths = src/appier/test \ No newline at end of file diff --git a/src/appier/__init__.py b/src/appier/__init__.py index ba86d4f4..e051ee4b 100644 --- a/src/appier/__init__.py +++ b/src/appier/__init__.py @@ -268,7 +268,7 @@ from .queuing import Queue, MemoryQueue, MultiprocessQueue, AMQPQueue from .redisdb import Redis from .request import CODE_STRINGS, Request, MockRequest -from .scheduler import Scheduler +from .scheduler import Scheduler, CronScheduler, SchedulerTask, SchedulerDate, Cron from .serialize import serialize_csv, serialize_ics, build_encoder from .session import ( Session, diff --git a/src/appier/base.py b/src/appier/base.py index e5bdbd0c..5d38ad20 100644 --- a/src/appier/base.py +++ b/src/appier/base.py @@ -68,6 +68,7 @@ from . import settings from . import observer from . import execution +from . import scheduler from . import controller from . import structures from . import exceptions @@ -427,6 +428,7 @@ def __init__( self._user_routes = None self._core_routes = None self._own = self + self._cron = None self._peers = {} self.__routes = [] self.load(level=level, handlers=handlers) @@ -732,6 +734,7 @@ def start(self, refresh=True): self._start_controllers() self._start_models() self._start_supervisor() + self._start_cron() if refresh: self.refresh() self.status = RUNNING @@ -743,6 +746,7 @@ def stop(self, refresh=True): self._print_bye() if refresh: self.refresh() + self._stop_cron() self._stop_supervisor() self._stop_models() self._stop_controllers() @@ -2323,6 +2327,30 @@ def callable_t(): thread.daemon = True thread.start() + def cron(self, job, cron): + """ + Schedule the provided method for regular execution using + the provided Cron like string. + + The method is going to be executed at the provided time + in a separate thread. + + :type job: function + :param job: The function/method that is going to be executed + at the specified time using the cron like string. + :type cron: String/SchedulerDate + :param cron: The cron like string that is going to be used to + define the execution time of the provided method. + :rtype: SchedulerTask + :return: The task that has been scheduled for execution at the + provided time. + """ + + if self._cron == None: + self._cron = scheduler.CronScheduler(self) + self._cron.start() + return self._cron.schedule(job, cron) + def chunks(self, data, size=32768): for index in range(0, len(data), size): yield data[index : index + size] @@ -2620,7 +2648,7 @@ def template( self.template_args(kwargs) # verifies if the target locale for the template has been defined - # and if thtat's the case updates the keyword based arguments for + # and if that's the case updates the keyword based arguments for # the current template render to include that value if locale: kwargs["_locale"] = locale @@ -5358,6 +5386,16 @@ def _start_supervisor(self): def _stop_supervisor(self): pass + def _start_cron(self): + pass + + def _stop_cron(self): + if not self._cron: + return + self._cron.stop() + self._cron.join() + self._cron = None + def _add_route(self, *args, **kwargs): self.__routes.append(args) self.clear_routes() diff --git a/src/appier/base.pyi b/src/appier/base.pyi new file mode 100644 index 00000000..1fdbdb38 --- /dev/null +++ b/src/appier/base.pyi @@ -0,0 +1,63 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- + +# Hive Appier Framework +# Copyright (c) 2008-2024 Hive Solutions Lda. +# +# This file is part of Hive Appier Framework. +# +# Hive Appier Framework is free software: you can redistribute it and/or modify +# it under the terms of the Apache License as published by the Apache +# Foundation, either version 2.0 of the License, or (at your option) any +# later version. +# +# Hive Appier Framework is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Apache License for more details. +# +# You should have received a copy of the Apache License along with +# Hive Appier Framework. If not, see . + +__author__ = "João Magalhães " +""" The author(s) of the module """ + +__copyright__ = "Copyright (c) 2008-2024 Hive Solutions Lda." +""" The copyright for the module """ + +__license__ = "Apache License, Version 2.0" +""" The license for the module """ + +from os import PathLike + +from .scheduler import SchedulerTask, JobFunction, Cron + +class App: + def start(self, refresh: bool = ...): ... + def stop(self, refresh: bool = ...): ... + def cron(self, job: JobFunction, cron: Cron) -> SchedulerTask: ... + def url_for( + self, + type: str, + filename: str | None = ..., + prefix: str | None = ..., + query: str | None = ..., + params: str | None = ..., + absolute: bool = ..., + touch: bool = ..., + session: bool = ..., + compress: str | None = ..., + base_url: str | None = ..., + *args, + **kwargs + ) -> str | None: ... + +class APIApp(App): + pass + +class WebApp(App): + pass + +def get_app() -> App: ... +def get_name() -> str | None: ... +def get_base_path() -> PathLike | None: ... diff --git a/src/appier/legacy.py b/src/appier/legacy.py index 8e299c85..dc5e0b7e 100644 --- a/src/appier/legacy.py +++ b/src/appier/legacy.py @@ -30,6 +30,8 @@ import os import sys +import calendar +import datetime import inspect import functools import itertools @@ -152,6 +154,10 @@ def ctx_absolute(): interpreter is at least Python 3 compliant, this is used to take some of the conversion decision for runtime """ +PYTHON_33 = sys.version_info[0] >= 3 and sys.version_info[1] >= 3 +""" Global variable that defines if the current Python +interpreter is at least Python 3.3 compliant """ + PYTHON_35 = sys.version_info[0] >= 3 and sys.version_info[1] >= 5 """ Global variable that defines if the current Python interpreter is at least Python 3.5 compliant """ @@ -538,6 +544,33 @@ def build_opener(*args, **kwargs): return urllib2.build_opener(*args, **kwargs) # @UndefinedVariable +def to_timestamp(date_time): + if PYTHON_33: + return date_time.replace(tzinfo=datetime.timezone.utc).timestamp() + else: + return calendar.timegm(date_time.utctimetuple()) + + +def to_datetime(timestamp): + if PYTHON_33: + return datetime.datetime.fromtimestamp( + timestamp, datetime.timezone.utc + ).replace(tzinfo=None) + else: + return datetime.datetime.utcfromtimestamp(timestamp) + + +def utcfromtimestamp(timestamp): + return to_datetime(timestamp) + + +def utc_now(): + if PYTHON_33: + return datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None) + else: + return datetime.datetime.utcnow() + + def urlparse(*args, **kwargs): return _urlparse.urlparse(*args, **kwargs) diff --git a/src/appier/scheduler.py b/src/appier/scheduler.py index 39e42858..64a1f10e 100644 --- a/src/appier/scheduler.py +++ b/src/appier/scheduler.py @@ -28,11 +28,16 @@ __license__ = "Apache License, Version 2.0" """ The license for the module """ +import time +import heapq +import calendar +import datetime import logging import threading import traceback from . import config +from . import legacy LOOP_TIMEOUT = 60.0 """ The time value to be used to sleep the main sequence @@ -48,7 +53,9 @@ class Scheduler(threading.Thread): The architecture of the logic for the class should be modular in the sense that new task may be added to - it through a queue or other external system. + it through a queue or other external system. For that + a proper preemption mechanism should exist allowing + the scheduler to be stopped and started again. """ def __init__(self, owner, timeout=LOOP_TIMEOUT, daemon=True): @@ -74,8 +81,10 @@ def run(self): self._condition.wait(self.timeout) self._condition.release() - def stop(self): + def stop(self, awake=True): self.running = False + if awake: + self.awake() def tick(self): pass @@ -94,3 +103,185 @@ def logger(self): return self.owner.logger else: return logging.getLogger() + + +class CronScheduler(Scheduler): + """ + Specialized version of the scheduler that runs tasks + based on a cron like configuration. + + The tasks are defined in a cron like format and are + executed based on the current time. + """ + + def __init__(self, owner, timeout=LOOP_TIMEOUT, daemon=True): + Scheduler.__init__(self, owner, timeout=timeout, daemon=daemon) + self._tasks = [] + + def tick(self, now_ts=None): + current_ts = lambda: now_ts if now_ts else time.time() + current_dt = lambda: ( + legacy.to_datetime(now_ts) if now_ts else legacy.utc_now() + ) + + timestamp = current_ts() + 5.0 + + while True: + if not self._tasks: + break + + timestamp, task = self._tasks[0] + if timestamp > current_ts(): + break + + heapq.heappop(self._tasks) + + if task.enabled: + task.job() + heapq.heappush( + self._tasks, (task.next_timestamp(now=current_dt()), task) + ) + + self.timeout = max(0, timestamp - current_ts()) + + def schedule(self, job, cron, now=None): + """ + Schedules the provided job function for execution according + to the provided cron string. + + The optional now parameter may be used to provide the current + time reference for the scheduling operation, meaning that the next + timestamp will be calculated using this value as reference. + + :type job: Function + :param job: The function to be executed as the job. + :type cron: String/SchedulerDate + :param cron: The cron like string defining the schedule. + :type now: datetime + :param now: Optional time reference for the job scheduling. + :rtype: SchedulerTask + :return: The task object that was created for the job. + """ + + task = SchedulerTask(job, cron) + heapq.heappush(self._tasks, (task.next_timestamp(now=now), task)) + self.awake() + return task + + def next_run(self): + timestamp = self.next_timestamp() + if not timestamp: + return None + return legacy.to_datetime(timestamp) + + def next_timestamp(self): + if not self._tasks: + return None + return self._tasks[0][0] + + +class SchedulerTask(object): + + def __init__(self, job, cron): + self.job = job + self.date = SchedulerDate.from_cron(cron) + self._enabled = True + + def enable(self): + self._enabled = True + + def disable(self): + self._enabled = False + + def next_run(self, now=None): + return self.date.next_run(now=now) + + def next_timestamp(self, now=None): + return self.date.next_timestamp(now=now) + + @property + def enabled(self): + return self._enabled + + +class SchedulerDate(object): + + def __init__( + self, minutes="*", hours="*", days_of_month="*", months="*", days_of_week="*" + ): + self.minutes = self._parse_field(minutes, 0, 59) + self.hours = self._parse_field(hours, 0, 23) + self.days_of_month = self._parse_field(days_of_month, 1, 31) + self.months = self._parse_field(months, 1, 12) + self.days_of_week = self._parse_field(days_of_week, 0, 6) + + @classmethod + def from_cron(cls, cron): + if isinstance(cron, cls): + return cron + values = (value.strip().split(",") for value in cron.split(" ")) + return cls(*values) + + def next_timestamp(self, now=None): + date = self.next_run(now=now) + return legacy.to_timestamp(date) + + def next_run(self, now=None): + """ + Calculate the next run time starting from the current time. + This operation is done respecting Cron rules. + + :type now: datetime + :param now: Optional date time to be used as the current time. + :rtype: datetime + :return: The next run time respecting Cron rules. + """ + + now = now or legacy.utc_now() + now_day = datetime.datetime(now.year, now.month, now.day) + now_hour = datetime.datetime(now.year, now.month, now.day, hour=now.hour) + now_minute = datetime.datetime( + now.year, now.month, now.day, hour=now.hour, minute=now.minute + ) + + year = now.year + + while True: + for month in sorted(self.months): + if month < now.month and year < now.year: + continue + + for day in sorted(self.days_of_month): + try: + date = datetime.datetime(year, month, day) + except ValueError: + continue + if self.days_of_week and not date.weekday() in self.days_of_week: + continue + if date < now_day: + continue + + for hour in sorted(self.hours): + if date.replace(hour=hour) < now_hour: + continue + + for minute in sorted(self.minutes): + _date = date.replace( + hour=hour, minute=minute, second=0, microsecond=0 + ) + if _date > now_minute: + return _date + + year += 1 + + def _parse_field(self, field, min_value, max_value): + if field in ("*", ["*"], ("*",)): + return set(range(min_value, max_value + 1)) + elif isinstance(field, (list, tuple)): + return set(int(v) for v in field) + else: + return set((int(field),)) + + +class Cron(object): + pass diff --git a/src/appier/scheduler.pyi b/src/appier/scheduler.pyi new file mode 100644 index 00000000..913a3027 --- /dev/null +++ b/src/appier/scheduler.pyi @@ -0,0 +1,100 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- + +# Hive Appier Framework +# Copyright (c) 2008-2024 Hive Solutions Lda. +# +# This file is part of Hive Appier Framework. +# +# Hive Appier Framework is free software: you can redistribute it and/or modify +# it under the terms of the Apache License as published by the Apache +# Foundation, either version 2.0 of the License, or (at your option) any +# later version. +# +# Hive Appier Framework is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Apache License for more details. +# +# You should have received a copy of the Apache License along with +# Hive Appier Framework. If not, see . + +__author__ = "João Magalhães " +""" The author(s) of the module """ + +__copyright__ = "Copyright (c) 2008-2024 Hive Solutions Lda." +""" The copyright for the module """ + +__license__ = "Apache License, Version 2.0" +""" The license for the module """ + +from datetime import datetime +from logging import Logger +from threading import Thread +from typing import Callable + +from .base import App + +LOOP_TIMEOUT: float = ... + +JobFunction = Callable[[], None] +Cron = str | SchedulerDate +CronField = str | int | list | tuple + +class Scheduler(Thread): + owner: App | None + timeout: float + daemon: bool + + def __init__(self, owner: App | None, timeout: float = ..., daemon: bool = ...): ... + def run(self): ... + def stop(self): ... + def tick(self): ... + def load(self): ... + def awake(self): ... + @property + def logger(self) -> Logger: ... + +class CronScheduler(Scheduler): + def __init__(self, owner: App | None, timeout: float = ..., daemon: bool = ...): ... + def tick(self, now_ts: float | None = ...): ... + def schedule( + self, job: JobFunction, cron: Cron, now: datetime | None = ... + ) -> SchedulerTask: ... + def next_run(self) -> datetime | None: ... + def next_timestamp(self) -> float | None: ... + +class SchedulerTask(object): + job: JobFunction + date: SchedulerDate + + def __init__(self, job: JobFunction, cron: Cron): ... + def enable(self): ... + def disable(self): ... + def next_run(self, now: datetime | None = ...) -> datetime: ... + def next_timestamp(self, now: datetime | None = ...) -> float: ... + @property + def enabled(self) -> bool: ... + +class SchedulerDate(object): + minutes: set[int] + hours: set[int] + days_of_month: set[int] + months: set[int] + days_of_week: set[int] + + def __init__( + self, + minutes: CronField = ..., + hours: CronField = ..., + days_of_month: CronField = ..., + months: CronField = ..., + days_of_week: CronField = ..., + ): ... + @classmethod + def from_cron(cls, cron: Cron) -> SchedulerDate: ... + def next_timestamp(self, now: datetime | None = None) -> float: ... + def next_run(self, now: datetime | None = None) -> datetime: ... + def _parse_field( + self, field: CronField, min_value: int, max_value: int + ) -> set[int]: ... diff --git a/src/appier/test/scheduler.py b/src/appier/test/scheduler.py new file mode 100644 index 00000000..ed243ec4 --- /dev/null +++ b/src/appier/test/scheduler.py @@ -0,0 +1,173 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- + +# Hive Appier Framework +# Copyright (c) 2008-2024 Hive Solutions Lda. +# +# This file is part of Hive Appier Framework. +# +# Hive Appier Framework is free software: you can redistribute it and/or modify +# it under the terms of the Apache License as published by the Apache +# Foundation, either version 2.0 of the License, or (at your option) any +# later version. +# +# Hive Appier Framework is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Apache License for more details. +# +# You should have received a copy of the Apache License along with +# Hive Appier Framework. If not, see . + +__author__ = "João Magalhães " +""" The author(s) of the module """ + +__copyright__ = "Copyright (c) 2008-2024 Hive Solutions Lda." +""" The copyright for the module """ + +__license__ = "Apache License, Version 2.0" +""" The license for the module """ + +import calendar +import datetime +import unittest + +import appier + + +class CronSchedulerTest(unittest.TestCase): + + def test_basic(self): + state = dict(value=0) + + def increment(): + state["value"] += 1 + + scheduler = appier.CronScheduler(None) + task = scheduler.schedule( + lambda: increment(), + "11", + now=datetime.datetime(2013, 1, 1, hour=1, minute=1), + ) + self.assertNotEqual(task, None) + self.assertEqual(isinstance(task, appier.SchedulerTask), True) + self.assertEqual(task.enabled, True) + self.assertEqual( + scheduler.next_run(), datetime.datetime(2013, 1, 1, hour=1, minute=11) + ) + + scheduler.tick( + now_ts=calendar.timegm( + datetime.datetime(2013, 1, 1, hour=1, minute=1).utctimetuple() + ) + ) + self.assertEqual(state["value"], 0) + self.assertEqual(scheduler.timeout, 600) + + scheduler.tick( + now_ts=calendar.timegm( + datetime.datetime(2013, 1, 1, hour=1, minute=11).utctimetuple() + ) + ) + self.assertEqual(state["value"], 1) + self.assertEqual(scheduler.timeout, 3600) + + scheduler.tick( + now_ts=calendar.timegm( + datetime.datetime(2013, 1, 1, hour=2, minute=11).utctimetuple() + ) + ) + self.assertEqual(state["value"], 2) + self.assertEqual(scheduler.timeout, 3600) + + def test_scheduler_date(self): + state = dict(value=0) + + def increment(): + state["value"] += 1 + + scheduler = appier.CronScheduler(None) + task = scheduler.schedule( + lambda: increment(), + appier.SchedulerDate(minutes=11), + now=datetime.datetime(2013, 1, 1, hour=1, minute=1), + ) + self.assertNotEqual(task, None) + self.assertEqual(isinstance(task, appier.SchedulerTask), True) + self.assertEqual(task.enabled, True) + self.assertEqual( + scheduler.next_run(), datetime.datetime(2013, 1, 1, hour=1, minute=11) + ) + + scheduler.tick( + now_ts=calendar.timegm( + datetime.datetime(2013, 1, 1, hour=1, minute=1).utctimetuple() + ) + ) + self.assertEqual(state["value"], 0) + self.assertEqual(scheduler.timeout, 600) + + scheduler.tick( + now_ts=calendar.timegm( + datetime.datetime(2013, 1, 1, hour=1, minute=11).utctimetuple() + ) + ) + self.assertEqual(state["value"], 1) + self.assertEqual(scheduler.timeout, 3600) + + def test_week_days(self): + state = dict(value=0) + + def increment(): + state["value"] += 1 + + scheduler = appier.CronScheduler(None) + task = scheduler.schedule( + lambda: increment(), + appier.SchedulerDate(minutes=11, days_of_month=10, days_of_week=2), + now=datetime.datetime(2013, 1, 1, hour=1, minute=1), + ) + self.assertNotEqual(task, None) + self.assertEqual(isinstance(task, appier.SchedulerTask), True) + self.assertEqual(task.enabled, True) + self.assertEqual( + scheduler.next_run(), datetime.datetime(2013, 4, 10, hour=0, minute=11) + ) + + +class SchedulerDateTest(unittest.TestCase): + + def test_from_cron(self): + date = appier.SchedulerDate.from_cron("11") + self.assertEqual(date.minutes, set((11,))) + self.assertEqual(date.hours, set(range(0, 24))) + self.assertEqual(date.days_of_month, set(range(1, 32))) + self.assertEqual(date.months, set(range(1, 13))) + self.assertEqual(date.days_of_week, set(range(0, 7))) + + def test_next_run(self): + date = appier.SchedulerDate.from_cron("11") + + value = date.next_run(now=datetime.datetime(2013, 1, 1, hour=1, minute=1)) + self.assertEqual(value, datetime.datetime(2013, 1, 1, hour=1, minute=11)) + + value = date.next_run(now=datetime.datetime(2013, 1, 1, hour=1, minute=12)) + self.assertEqual(value, datetime.datetime(2013, 1, 1, hour=2, minute=11)) + + value = date.next_run(now=datetime.datetime(2013, 12, 31, hour=23, minute=12)) + self.assertEqual(value, datetime.datetime(2014, 1, 1, hour=0, minute=11)) + + def test_next_run_complex(self): + date = appier.SchedulerDate.from_cron("11 3 10,16,20") + value = date.next_run(now=datetime.datetime(2013, 1, 1, hour=1, minute=1)) + self.assertEqual(value, datetime.datetime(2013, 1, 10, hour=3, minute=11)) + + value = date.next_run(now=datetime.datetime(2013, 1, 10, hour=4, minute=1)) + self.assertEqual(value, datetime.datetime(2013, 1, 16, hour=3, minute=11)) + + date = appier.SchedulerDate.from_cron("* 3 10,16,20") + value = date.next_run(now=datetime.datetime(2013, 1, 10, hour=3, minute=1)) + self.assertEqual(value, datetime.datetime(2013, 1, 10, hour=3, minute=2)) + + value = date.next_run(now=datetime.datetime(2013, 1, 10, hour=3, minute=2)) + self.assertEqual(value, datetime.datetime(2013, 1, 10, hour=3, minute=3)) diff --git a/src/appier/test/typesf.py b/src/appier/test/typesf.py index 3e34286a..37edfc4e 100644 --- a/src/appier/test/typesf.py +++ b/src/appier/test/typesf.py @@ -219,7 +219,7 @@ def loads(self, value): elif isinstance(value, datetime.datetime): self._datetime = value elif isinstance(value, (int, float)): - self._datetime = datetime.datetime.utcfromtimestamp(value) + self._datetime = appier.legacy.utcfromtimestamp(value) else: raise appier.OperationalError()