Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Polish celery/remote transformation strategy #240

Merged
merged 10 commits into from
Mar 7, 2023
78 changes: 64 additions & 14 deletions .github/workflows/ci_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -55,34 +55,84 @@ jobs:
(oteapi/models/transformationconfig.py),[oteapi.models.transformationconfig.TransformationConfig.transformationType]
warnings_as_errors: true

pytest:
name: pytest (${{ matrix.os[1] }}-py${{ matrix.python-version }})
runs-on: ${{ matrix.os[0] }}
pytest-linux:
name: pytest (linux-py${{ matrix.python-version }})
runs-on: ubuntu-latest

strategy:
fail-fast: false
matrix:
os:
- ["ubuntu-latest", "linux"]
- ["windows-latest", "windows"]
python-version: ["3.9", "3.10"]

services:
redis:
image: redis:latest
volumes:
- redis-persist:/data
ports:
- "6379:6379"

env:
  # These names must match the variables read through os.getenv() by
  # oteapi/strategies/transformation/celery_remote.py and by the Celery tests:
  # OTEAPI_REDIS_HOST and OTEAPI_REDIS_PORT.
  OTEAPI_REDIS_HOST: localhost
  OTEAPI_REDIS_PORT: 6379

steps:
- uses: actions/checkout@v3
with:
fetch-depth: 2

- name: Set up Python ${{ matrix.python-version}}
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version}}

- name: Install linux system dependencies
if: matrix.os[1] == 'linux'
- name: Install system dependencies
run: sudo apt update && sudo apt install -y ghostscript

- name: Install windows system dependencies
if: matrix.os[1] == 'windows'
- name: Install Python dependencies
run: |
python -m pip install -U pip
pip install -U setuptools wheel
pip install -e .[dev]

- name: Test with pytest
run: pytest -vvv --cov-report=xml

- name: Upload coverage to Codecov
if: matrix.python-version == 3.9 && github.repository == 'EMMC-ASBL/oteapi-core'
uses: codecov/codecov-action@v3
with:
files: coverage.xml
flags: linux

- name: Test with optional libs
run: |
pip install ase numpy
pytest -vvv --cov-report=xml

- name: Upload coverage to Codecov
if: matrix.python-version == 3.9 && github.repository == 'EMMC-ASBL/oteapi-core'
uses: codecov/codecov-action@v3
with:
files: coverage.xml
flags: linux-extra_libs

pytest-win:
name: pytest (windows-py${{ matrix.python-version }})
runs-on: windows-latest

strategy:
fail-fast: false
matrix:
python-version: ["3.9", "3.10"]

steps:
- uses: actions/checkout@v3

- name: Set up Python ${{ matrix.python-version}}
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version}}

- name: Install system dependencies
run: |
$url = "https://github.com/ArtifexSoftware/ghostpdl-downloads/releases/download/gs9550/gs9550w64.exe"
$outpath = "${{ github.workspace }}\ghostscript.exe"
Expand All @@ -105,7 +155,7 @@ jobs:
uses: codecov/codecov-action@v3
with:
files: coverage.xml
flags: ${{ matrix.os[1] }}
flags: windows

- name: Test with optional libs
run: |
Expand All @@ -117,4 +167,4 @@ jobs:
uses: codecov/codecov-action@v3
with:
files: coverage.xml
flags: ${{ matrix.os[1] }}-extra_libs
flags: windows-extra_libs
67 changes: 45 additions & 22 deletions oteapi/strategies/transformation/celery_remote.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
"""Transformation Plugin that uses the Celery framework to call remote workers."""
# pylint: disable=unused-argument
import os
from typing import TYPE_CHECKING, Dict

Expand All @@ -20,29 +19,40 @@

# Connect Celery to the currently running Redis instance

REDIS_HOST = os.environ.get("OTEAPI_REDIS_HOST", "redis")
REDIS_PORT = os.environ.get("OTEAPI_REDIS_PORT", 6379)
REDIS_HOST = os.getenv("OTEAPI_REDIS_HOST", "redis")
REDIS_PORT = int(os.getenv("OTEAPI_REDIS_PORT", "6379"))

CELERY_APP = Celery(
broker=f"redis://{REDIS_HOST}:{REDIS_PORT}",
backend=f"redis://{REDIS_HOST}:{REDIS_PORT}",
)


class CeleryConfig(AttrDict):
"""Celery configuration."""
class CeleryConfig(AttrDict, allow_population_by_field_name=True):
"""Celery configuration.

task_name: str = Field(..., description="A task name.")
All fields here (including those added from the session through the `get()` method,
as well as those added "anonymously") will be used as keyword arguments to the
`send_task()` method for the Celery App.

Note:
Using `alias` for the `name` field to favor populating it with `task_name`
arguments, since this is the "original" field name. I.e., this is done for
backwards compatibility.

Setting `allow_population_by_field_name=True` as pydantic model configuration in
order to allow populating it using `name` as well as `task_name`.

"""

name: str = Field(..., description="A task name.", alias="task_name")
args: list = Field(..., description="List of arguments for the task.")


class SessionUpdateCelery(SessionUpdate):
"""Class for returning values from XLSXParse."""
"""Class for returning values from a Celery task."""

data: Dict[str, list] = Field(
...,
description="A dict with column-name/column-value pairs. The values are lists.",
)
celery_task_id: str = Field(..., description="A Celery task identifier.")


class CeleryStrategyConfig(TransformationConfig):
Expand Down Expand Up @@ -74,21 +84,16 @@ class CeleryRemoteStrategy:

def get(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdateCelery:
"""Run a job, return a job ID."""
celery_kwargs = {}
if session:
self._use_session(session)
celery_kwargs = session.copy()
CasperWA marked this conversation as resolved.
Show resolved Hide resolved
for field in CeleryConfig.__fields__:
celery_kwargs.pop(field, None)

result: "Union[AsyncResult, Any]" = CELERY_APP.send_task(
name=self.transformation_config.configuration.task_name,
args=self.transformation_config.configuration.args,
kwargs=celery_kwargs,
**self.transformation_config.configuration
)
return SessionUpdateCelery(data={"task_id": result.task_id})
return SessionUpdateCelery(celery_task_id=result.task_id)

def initialize(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
# pylint: disable=unused-argument
"""Initialize a job."""
return SessionUpdate()

Expand All @@ -98,11 +103,29 @@ def status(self, task_id: str) -> TransformationStatus:
return TransformationStatus(id=task_id, status=result.state)

def _use_session(self, session: "Dict[str, Any]") -> None:
"""Update the configuration with values from the session."""
for field in CeleryConfig.__fields__:
"""Update the configuration with values from the session.

Check all fields (non-aliased and aliased) in `CeleryConfig` if they exist in
the session. Override the given field values in the current strategy-specific
configuration (the `CeleryConfig` instance) with the values found in the
session.

Parameters:
session: The current OTE session.

"""
alias_mapping: dict[str, str] = {
field.alias: field_name
for field_name, field in CeleryConfig.__fields__.items()
}

fields = set(CeleryConfig.__fields__)
fields |= {_.alias for _ in CeleryConfig.__fields__.values()}

for field in fields:
if field in session:
setattr(
self.transformation_config.configuration,
field,
alias_mapping[field],
session[field],
)
1 change: 1 addition & 0 deletions requirements_dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@ pre-commit>=2.21.0,<3; python_version<"3.8"
pre-commit~=3.1; python_version>="3.8"
pylint~=2.16
pytest~=7.2
pytest-celery
pytest-cov~=4.0
requests-mock~=1.10
102 changes: 102 additions & 0 deletions tests/strategies/transformation/test_celery_remote.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
"""Tests the transformation strategy for Celery."""
# pylint: disable=invalid-name
from typing import TYPE_CHECKING

import pytest

if TYPE_CHECKING:
from celery import Celery
from celery.contrib.testing.worker import TestWorkController


@pytest.fixture(scope="session")
def celery_config() -> dict[str, str]:
    """Provide broker and result-backend settings for the Celery test fixtures.

    The Redis location is read from `OTEAPI_REDIS_HOST` (default `localhost`) and
    `OTEAPI_REDIS_PORT` (default `6379`). The server is pinged first: if it cannot
    be reached, the test run fails when executed in a Linux CI environment and is
    skipped everywhere else.

    Returns:
        A mapping with the `broker_url` and `result_backend` Redis URLs.

    """
    import os
    import platform

    import redis

    redis_host = os.getenv("OTEAPI_REDIS_HOST", "localhost")
    redis_port = int(os.getenv("OTEAPI_REDIS_PORT", "6379"))
    connection = redis.Redis(host=redis_host, port=redis_port)

    try:
        connection.ping()
    except redis.ConnectionError:
        # Starting services (like redis) is only supported in GH Actions for the
        # Linux OS, hence a missing connection is only an error there.
        must_run = os.getenv("CI") and platform.system() == "Linux"
        if not must_run:
            pytest.skip(
                f"No redis connection at {redis_host}:{redis_port} for testing celery."
            )
        pytest.fail("In CI environment - this test MUST run !")

    redis_url = f"redis://{redis_host}:{redis_port}"
    return {"broker_url": redis_url, "result_backend": redis_url}


def test_celery_remote(
    celery_app: "Celery",
    celery_worker: "TestWorkController",
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Run a task through the `celery/remote` strategy and check its result.

    A small `add()` task is registered on the test Celery app, which is then
    patched into the strategy module. The task is submitted through the strategy
    and the strategy's status is polled (for about five seconds) until it reports
    `SUCCESS`. Finally the stored task result is compared with a direct call.
    """
    from time import sleep, time

    from celery.result import AsyncResult

    from oteapi.models.transformationconfig import TransformationConfig
    from oteapi.strategies.transformation import celery_remote

    @celery_app.task
    def add(x: float, y: float) -> float:
        """Simple addition task to test Celery."""
        return x + y

    # Reload the worker after registering the new task.
    celery_worker.reload()

    # The strategy's own celery app has not registered the `add()` task, so the
    # test celery app is swapped in for it.
    monkeypatch.setattr(celery_remote, "CELERY_APP", celery_app)

    strategy_config = TransformationConfig(
        transformationType="celery/remote",
        configuration={"task_name": add.name, "args": [1, 2]},
    )
    transformation = celery_remote.CeleryRemoteStrategy(strategy_config)

    session = transformation.get(transformation.initialize({}))

    assert session.get("celery_task_id", "")

    def task_succeeded() -> bool:
        """Ask the strategy whether the submitted task has finished successfully."""
        return transformation.status(session.celery_task_id).status == "SUCCESS"

    deadline = time() + 5
    while not task_succeeded() and time() < deadline:
        sleep(1)

    if not task_succeeded():
        pytest.fail("Status never changed to 'SUCCESS' !")

    outcome = AsyncResult(id=session.celery_task_id, app=celery_app)
    assert outcome.result == add(1, 2)


def test_celery_config_name() -> None:
    """Check `CeleryConfig` accepts both `task_name` (alias) and `name`.

    The same values are passed once under the aliased key and once under the
    plain field name; the two resulting models must compare equal.
    """
    from oteapi.strategies.transformation.celery_remote import CeleryConfig

    task, task_args = "app.add", [1, 2]

    via_alias = CeleryConfig(task_name=task, args=task_args)
    via_field_name = CeleryConfig(name=task, args=task_args)

    assert via_alias == via_field_name