diff --git a/.github/workflows/ci_tests.yml b/.github/workflows/ci_tests.yml index 00c4615f..c23118f5 100644 --- a/.github/workflows/ci_tests.yml +++ b/.github/workflows/ci_tests.yml @@ -55,34 +55,84 @@ jobs: (oteapi/models/transformationconfig.py),[oteapi.models.transformationconfig.TransformationConfig.transformationType] warnings_as_errors: true - pytest: - name: pytest (${{ matrix.os[1] }}-py${{ matrix.python-version }}) - runs-on: ${{ matrix.os[0] }} + pytest-linux: + name: pytest (linux-py${{ matrix.python-version }}) + runs-on: ubuntu-latest strategy: fail-fast: false matrix: - os: - - ["ubuntu-latest", "linux"] - - ["windows-latest", "windows"] python-version: ["3.9", "3.10"] + services: + redis: + image: redis:latest + volumes: + - redis-persist:/data + ports: + - "6379:6379" + + env: + OTEAPI_REDIS_HOST: localhost + OTEAPI_REDIS_PORT: 6379 + steps: - uses: actions/checkout@v3 - with: - fetch-depth: 2 - name: Set up Python ${{ matrix.python-version}} uses: actions/setup-python@v4 with: python-version: ${{ matrix.python-version}} - - name: Install linux system dependencies - if: matrix.os[1] == 'linux' + - name: Install system dependencies run: sudo apt update && sudo apt install -y ghostscript - - name: Install windows system dependencies - if: matrix.os[1] == 'windows' + - name: Install Python dependencies + run: | + python -m pip install -U pip + pip install -U setuptools wheel + pip install -e .[dev] + + - name: Test with pytest + run: pytest -vvv --cov-report=xml + + - name: Upload coverage to Codecov + if: matrix.python-version == 3.9 && github.repository == 'EMMC-ASBL/oteapi-core' + uses: codecov/codecov-action@v3 + with: + files: coverage.xml + flags: 
linux-extra_libs + + pytest-win: + name: pytest (windows-py${{ matrix.python-version }}) + runs-on: windows-latest + + strategy: + fail-fast: false + matrix: + python-version: ["3.9", "3.10"] + + steps: + - uses: actions/checkout@v3 + + - name: Set up Python ${{ matrix.python-version}} + uses: actions/setup-python@v4 + with: + python-version: ${{ matrix.python-version}} + + - name: Install system dependencies run: | $url = "https://github.com/ArtifexSoftware/ghostpdl-downloads/releases/download/gs9550/gs9550w64.exe" $outpath = "${{ github.workspace }}\ghostscript.exe" @@ -105,7 +155,7 @@ jobs: uses: codecov/codecov-action@v3 with: files: coverage.xml - flags: ${{ matrix.os[1] }} + flags: windows - name: Test with optional libs run: | @@ -117,4 +167,4 @@ jobs: uses: codecov/codecov-action@v3 with: files: coverage.xml - flags: ${{ matrix.os[1] }}-extra_libs + flags: windows-extra_libs diff --git a/oteapi/strategies/transformation/celery_remote.py b/oteapi/strategies/transformation/celery_remote.py index 104ab57e..3e7fe229 100644 --- a/oteapi/strategies/transformation/celery_remote.py +++ b/oteapi/strategies/transformation/celery_remote.py @@ -1,5 +1,4 @@ """Transformation Plugin that uses the Celery framework to call remote workers.""" -# pylint: disable=unused-argument import os from typing import TYPE_CHECKING, Dict @@ -20,8 +19,8 @@ # Connect Celery to the currently running Redis instance -REDIS_HOST = os.environ.get("OTEAPI_REDIS_HOST", "redis") -REDIS_PORT = os.environ.get("OTEAPI_REDIS_PORT", 6379) +REDIS_HOST = os.getenv("OTEAPI_REDIS_HOST", "redis") +REDIS_PORT = int(os.getenv("OTEAPI_REDIS_PORT", "6379")) CELERY_APP = Celery( broker=f"redis://{REDIS_HOST}:{REDIS_PORT}", @@ -29,20 +28,31 @@ ) -class CeleryConfig(AttrDict): - """Celery configuration.""" +class CeleryConfig(AttrDict, allow_population_by_field_name=True): + """Celery configuration. 
- task_name: str = Field(..., description="A task name.") + All fields here (including those added from the session through the `get()` method, + as well as those added "anonymously") will be used as keyword arguments to the + `send_task()` method for the Celery App. + + Note: + Using `alias` for the `name` field to favor populating it with `task_name` + arguments, since this is the "original" field name. I.e., this is done for + backwards compatibility. + + Setting `allow_population_by_field_name=True` as pydantic model configuration in + order to allow populating it using `name` as well as `task_name`. + + """ + + name: str = Field(..., description="A task name.", alias="task_name") args: list = Field(..., description="List of arguments for the task.") class SessionUpdateCelery(SessionUpdate): - """Class for returning values from XLSXParse.""" + """Class for returning values from a Celery task.""" - data: Dict[str, list] = Field( - ..., - description="A dict with column-name/column-value pairs. 
The values are lists.", - ) + celery_task_id: str = Field(..., description="A Celery task identifier.") class CeleryStrategyConfig(TransformationConfig): @@ -74,21 +84,16 @@ class CeleryRemoteStrategy: def get(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdateCelery: """Run a job, return a job ID.""" - celery_kwargs = {} if session: self._use_session(session) - celery_kwargs = session.copy() - for field in CeleryConfig.__fields__: - celery_kwargs.pop(field, None) result: "Union[AsyncResult, Any]" = CELERY_APP.send_task( - name=self.transformation_config.configuration.task_name, - args=self.transformation_config.configuration.args, - kwargs=celery_kwargs, + **self.transformation_config.configuration ) - return SessionUpdateCelery(data={"task_id": result.task_id}) + return SessionUpdateCelery(celery_task_id=result.task_id) def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate: + # pylint: disable=unused-argument """Initialize a job.""" return SessionUpdate() @@ -98,11 +103,29 @@ def status(self, task_id: str) -> TransformationStatus: return TransformationStatus(id=task_id, status=result.state) def _use_session(self, session: "Dict[str, Any]") -> None: - """Update the configuration with values from the sesssion.""" - for field in CeleryConfig.__fields__: + """Update the configuration with values from the session. + + Check all fields (non-aliased and aliased) in `CeleryConfig` if they exist in + the session. Override the given field values in the current strategy-specific + configuration (the `CeleryConfig` instance) with the values found in the + session. + + Parameters: + session: The current OTE session. 
+ + """ + alias_mapping: dict[str, str] = { + field.alias: field_name + for field_name, field in CeleryConfig.__fields__.items() + } + + fields = set(CeleryConfig.__fields__) + fields |= {_.alias for _ in CeleryConfig.__fields__.values()} + + for field in fields: if field in session: setattr( self.transformation_config.configuration, - field, + alias_mapping.get(field, field), session[field], ) diff --git a/requirements_dev.txt b/requirements_dev.txt index 0c41c2bc..c6965f08 100644 --- a/requirements_dev.txt +++ b/requirements_dev.txt @@ -2,5 +2,6 @@ pre-commit>=2.21.0,<3; python_version<"3.8" pre-commit~=3.1; python_version>="3.8" pylint~=2.16 pytest~=7.2 +pytest-celery pytest-cov~=4.0 requests-mock~=1.10 diff --git a/tests/strategies/transformation/test_celery_remote.py b/tests/strategies/transformation/test_celery_remote.py new file mode 100644 index 00000000..c0a1f18f --- /dev/null +++ b/tests/strategies/transformation/test_celery_remote.py @@ -0,0 +1,102 @@ +"""Tests the transformation strategy for Celery.""" +# pylint: disable=invalid-name +from typing import TYPE_CHECKING + +import pytest + +if TYPE_CHECKING: + from celery import Celery + from celery.contrib.testing.worker import TestWorkController + + +@pytest.fixture(scope="session") +def celery_config() -> dict[str, str]: + """Set Celery fixture configuration.""" + import os + import platform + + import redis + + host = os.getenv("OTEAPI_REDIS_HOST", "localhost") + port = int(os.getenv("OTEAPI_REDIS_PORT", "6379")) + client = redis.Redis(host=host, port=port) + try: + client.ping() + except redis.ConnectionError: + if os.getenv("CI") and platform.system() == "Linux": + # Starting services (like redis) is only supported in GH Actions for the + # Linux OS + pytest.fail("In CI environment - this test MUST run !") + else: + pytest.skip(f"No redis connection at {host}:{port} for testing celery.") + + return { + "broker_url": f"redis://{host}:{port}", + "result_backend": f"redis://{host}:{port}", + } + + +def 
test_celery_remote( + celery_app: "Celery", + celery_worker: "TestWorkController", + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Test `celery/remote` transformation strategy.""" + from time import sleep, time + + from celery.result import AsyncResult + + from oteapi.models.transformationconfig import TransformationConfig + from oteapi.strategies.transformation import celery_remote + + @celery_app.task + def add(x: float, y: float) -> float: + """Simple addition task to test Celery.""" + return x + y + + celery_worker.reload() + + # Use the test celery app instead of the strategy's celery app + # The strategy's celery app has not registered the `add()` task... + monkeypatch.setattr(celery_remote, "CELERY_APP", celery_app) + + config = TransformationConfig( + transformationType="celery/remote", + configuration={ + "task_name": add.name, + "args": [1, 2], + }, + ) + transformation = celery_remote.CeleryRemoteStrategy(config) + + session = transformation.initialize({}) + session = transformation.get(session) + + assert session.get("celery_task_id", "") + + start_time = time() + while ( + transformation.status(session.celery_task_id).status != "SUCCESS" + and time() < start_time + 5 + ): + sleep(1) + + if transformation.status(session.celery_task_id).status != "SUCCESS": + pytest.fail("Status never changed to 'SUCCESS' !") + + result = AsyncResult(id=session.celery_task_id, app=celery_app) + assert result.result == add(1, 2) + + +def test_celery_config_name() -> None: + """Check `CeleryConfig` can be populated with/-out alias use.""" + from oteapi.strategies.transformation.celery_remote import CeleryConfig + + aliased_keys = ("task_name", "args") + non_aliased_keys = ("name", "args") + + values = ("app.add", [1, 2]) + + assert CeleryConfig(**dict(zip(aliased_keys, values))) == CeleryConfig( + **dict(zip(non_aliased_keys, values)) + )