Skip to content

Commit

Permalink
Polish celery/remote transformation strategy (#240)
Browse files Browse the repository at this point in the history
Add celery/remote strategy tests.

Update strategy models accordingly.

Add a redis service to CI pytest runs.
Split pytest CI jobs according to OS. Services cannot be run
when using anything other than the Linux OS in CI.

Use `task_name` alias for CeleryConfig (field changed to `name`).

Support setting configuration from both aliased and non-aliased
fields. Both directly and from fields in the session as well.
  • Loading branch information
CasperWA authored Mar 7, 2023
1 parent eaa965f commit 5ef872d
Show file tree
Hide file tree
Showing 4 changed files with 212 additions and 36 deletions.
78 changes: 64 additions & 14 deletions .github/workflows/ci_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -55,34 +55,84 @@ jobs:
(oteapi/models/transformationconfig.py),[oteapi.models.transformationconfig.TransformationConfig.transformationType]
warnings_as_errors: true

pytest:
name: pytest (${{ matrix.os[1] }}-py${{ matrix.python-version }})
runs-on: ${{ matrix.os[0] }}
pytest-linux:
name: pytest (linux-py${{ matrix.python-version }})
runs-on: ubuntu-latest

strategy:
fail-fast: false
matrix:
os:
- ["ubuntu-latest", "linux"]
- ["windows-latest", "windows"]
python-version: ["3.9", "3.10"]

services:
redis:
image: redis:latest
volumes:
- redis-persist:/data
ports:
- "6379:6379"

env:
  OTEAPI_REDIS_HOST: localhost
  # Must be spelled OTEAPI_REDIS_PORT: that is the variable name read by the
  # celery/remote strategy and by the test fixture.
  OTEAPI_REDIS_PORT: 6379

steps:
- uses: actions/checkout@v3
with:
fetch-depth: 2

- name: Set up Python ${{ matrix.python-version}}
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version}}

- name: Install linux system dependencies
if: matrix.os[1] == 'linux'
- name: Install system dependencies
run: sudo apt update && sudo apt install -y ghostscript

- name: Install windows system dependencies
if: matrix.os[1] == 'windows'
- name: Install Python dependencies
run: |
python -m pip install -U pip
pip install -U setuptools wheel
pip install -e .[dev]
- name: Test with pytest
run: pytest -vvv --cov-report=xml

- name: Upload coverage to Codecov
if: matrix.python-version == 3.9 && github.repository == 'EMMC-ASBL/oteapi-core'
uses: codecov/codecov-action@v3
with:
files: coverage.xml
flags: linux

- name: Test with optional libs
run: |
pip install ase numpy
pytest -vvv --cov-report=xml
- name: Upload coverage to Codecov
if: matrix.python-version == 3.9 && github.repository == 'EMMC-ASBL/oteapi-core'
uses: codecov/codecov-action@v3
with:
files: coverage.xml
flags: linux-extra_libs

pytest-win:
name: pytest (windows-py${{ matrix.python-version }})
runs-on: windows-latest

strategy:
fail-fast: false
matrix:
python-version: ["3.9", "3.10"]

steps:
- uses: actions/checkout@v3

- name: Set up Python ${{ matrix.python-version}}
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version}}

- name: Install system dependencies
run: |
$url = "https://github.com/ArtifexSoftware/ghostpdl-downloads/releases/download/gs9550/gs9550w64.exe"
$outpath = "${{ github.workspace }}\ghostscript.exe"
Expand All @@ -105,7 +155,7 @@ jobs:
uses: codecov/codecov-action@v3
with:
files: coverage.xml
flags: ${{ matrix.os[1] }}
flags: windows

- name: Test with optional libs
run: |
Expand All @@ -117,4 +167,4 @@ jobs:
uses: codecov/codecov-action@v3
with:
files: coverage.xml
flags: ${{ matrix.os[1] }}-extra_libs
flags: windows-extra_libs
67 changes: 45 additions & 22 deletions oteapi/strategies/transformation/celery_remote.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
"""Transformation Plugin that uses the Celery framework to call remote workers."""
# pylint: disable=unused-argument
import os
from typing import TYPE_CHECKING, Dict

Expand All @@ -20,29 +19,40 @@

# Connect Celery to the currently running Redis instance

REDIS_HOST = os.environ.get("OTEAPI_REDIS_HOST", "redis")
REDIS_PORT = os.environ.get("OTEAPI_REDIS_PORT", 6379)
REDIS_HOST = os.getenv("OTEAPI_REDIS_HOST", "redis")
REDIS_PORT = int(os.getenv("OTEAPI_REDIS_PORT", "6379"))

CELERY_APP = Celery(
broker=f"redis://{REDIS_HOST}:{REDIS_PORT}",
backend=f"redis://{REDIS_HOST}:{REDIS_PORT}",
)


class CeleryConfig(AttrDict):
"""Celery configuration."""
class CeleryConfig(AttrDict, allow_population_by_field_name=True):
"""Celery configuration.
task_name: str = Field(..., description="A task name.")
All fields here (including those added from the session through the `get()` method,
as well as those added "anonymously") will be used as keyword arguments to the
`send_task()` method for the Celery App.
Note:
Using `alias` for the `name` field to favor populating it with `task_name`
arguments, since this is the "original" field name. I.e., this is done for
backwards compatibility.
Setting `allow_population_by_field_name=True` as pydantic model configuration in
order to allow populating it using `name` as well as `task_name`.
"""

name: str = Field(..., description="A task name.", alias="task_name")
args: list = Field(..., description="List of arguments for the task.")


class SessionUpdateCelery(SessionUpdate):
"""Class for returning values from XLSXParse."""
"""Class for returning values from a Celery task."""

data: Dict[str, list] = Field(
...,
description="A dict with column-name/column-value pairs. The values are lists.",
)
celery_task_id: str = Field(..., description="A Celery task identifier.")


class CeleryStrategyConfig(TransformationConfig):
Expand Down Expand Up @@ -74,21 +84,16 @@ class CeleryRemoteStrategy:

def get(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdateCelery:
"""Run a job, return a job ID."""
celery_kwargs = {}
if session:
self._use_session(session)
celery_kwargs = session.copy()
for field in CeleryConfig.__fields__:
celery_kwargs.pop(field, None)

result: "Union[AsyncResult, Any]" = CELERY_APP.send_task(
name=self.transformation_config.configuration.task_name,
args=self.transformation_config.configuration.args,
kwargs=celery_kwargs,
**self.transformation_config.configuration
)
return SessionUpdateCelery(data={"task_id": result.task_id})
return SessionUpdateCelery(celery_task_id=result.task_id)

def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
# pylint: disable=unused-argument
"""Initialize a job."""
return SessionUpdate()

Expand All @@ -98,11 +103,29 @@ def status(self, task_id: str) -> TransformationStatus:
return TransformationStatus(id=task_id, status=result.state)

def _use_session(self, session: "Dict[str, Any]") -> None:
    """Update the configuration with values from the session.

    Check all fields (non-aliased and aliased) in `CeleryConfig` if they exist in
    the session. Override the given field values in the current strategy-specific
    configuration (the `CeleryConfig` instance) with the values found in the
    session.

    If the session contains both the field name and its alias, the aliased key
    takes precedence.

    Parameters:
        session: The current OTE session.

    """
    configuration = self.transformation_config.configuration

    for field_name, field in CeleryConfig.__fields__.items():
        # Look up the field name first and the alias last, so the alias wins
        # deterministically. `dict.fromkeys` removes the duplicate key when a
        # field's alias equals its name, while keeping the order.
        for key in dict.fromkeys((field_name, field.alias)):
            if key in session:
                # Always assign to the real field name; the alias is only a
                # lookup key and is not an attribute of the model.
                setattr(configuration, field_name, session[key])
1 change: 1 addition & 0 deletions requirements_dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@ pre-commit>=2.21.0,<3; python_version<"3.8"
pre-commit~=3.1; python_version>="3.8"
pylint~=2.16
pytest~=7.2
pytest-celery
pytest-cov~=4.0
requests-mock~=1.10
102 changes: 102 additions & 0 deletions tests/strategies/transformation/test_celery_remote.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
"""Tests the transformation strategy for Celery."""
# pylint: disable=invalid-name
from typing import TYPE_CHECKING

import pytest

if TYPE_CHECKING:
from celery import Celery
from celery.contrib.testing.worker import TestWorkController


@pytest.fixture(scope="session")
def celery_config() -> dict[str, str]:
    """Provide the broker/backend settings for the Celery test fixtures.

    The redis host and port are read from the `OTEAPI_REDIS_HOST` and
    `OTEAPI_REDIS_PORT` environment variables (defaulting to `localhost:6379`).
    If redis does not answer a ping, the test fails when running in a Linux CI
    environment and is skipped everywhere else.
    """
    import os
    import platform

    import redis

    host = os.getenv("OTEAPI_REDIS_HOST", "localhost")
    port = int(os.getenv("OTEAPI_REDIS_PORT", "6379"))
    redis_url = f"redis://{host}:{port}"

    connection = redis.Redis(host=host, port=port)
    try:
        connection.ping()
    except redis.ConnectionError:
        # Starting services (like redis) is only supported in GH Actions for the
        # Linux OS, hence a missing redis is only an error there.
        in_linux_ci = bool(os.getenv("CI")) and platform.system() == "Linux"
        if in_linux_ci:
            pytest.fail("In CI environment - this test MUST run !")
        pytest.skip(f"No redis connection at {host}:{port} for testing celery.")

    return {"broker_url": redis_url, "result_backend": redis_url}


def test_celery_remote(
    celery_app: "Celery",
    celery_worker: "TestWorkController",
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Run an addition task through the `celery/remote` strategy.

    Registers a small `add` task on the test Celery app, sends it via the
    strategy, polls the strategy's `status()` for up to 5 seconds and finally
    compares the task result with a direct call of `add`.
    """
    from time import sleep, time

    from celery.result import AsyncResult

    from oteapi.models.transformationconfig import TransformationConfig
    from oteapi.strategies.transformation import celery_remote

    @celery_app.task
    def add(x: float, y: float) -> float:
        """Simple addition task to test Celery."""
        return x + y

    celery_worker.reload()

    # The strategy's own celery app has not registered the `add()` task, so
    # swap in the test celery app instead.
    monkeypatch.setattr(celery_remote, "CELERY_APP", celery_app)

    strategy = celery_remote.CeleryRemoteStrategy(
        TransformationConfig(
            transformationType="celery/remote",
            configuration={"task_name": add.name, "args": [1, 2]},
        )
    )

    session = strategy.initialize({})
    session = strategy.get(session)

    assert session.get("celery_task_id", "")

    def succeeded() -> bool:
        """Report whether the submitted task has reached the SUCCESS state."""
        return strategy.status(session.celery_task_id).status == "SUCCESS"

    deadline = time() + 5
    while not succeeded() and time() < deadline:
        sleep(1)

    if not succeeded():
        pytest.fail("Status never changed to 'SUCCESS' !")

    outcome = AsyncResult(id=session.celery_task_id, app=celery_app)
    assert outcome.result == add(1, 2)


def test_celery_config_name() -> None:
    """Check `CeleryConfig` compares equal whether built via `task_name` or `name`."""
    from oteapi.strategies.transformation.celery_remote import CeleryConfig

    task, arguments = "app.add", [1, 2]

    via_alias = CeleryConfig(task_name=task, args=arguments)
    via_field_name = CeleryConfig(name=task, args=arguments)

    assert via_alias == via_field_name

0 comments on commit 5ef872d

Please sign in to comment.