Skip to content

Commit

Permalink
feat: skeleton for PythonOperator, related to issue #12
Browse files Browse the repository at this point in the history
wrote few tests for testing builtin and importable with failure
- `__call__` is unimplemented
  • Loading branch information
YeungOnion committed Feb 25, 2024
1 parent 35d6fbc commit 171fd84
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 1 deletion.
3 changes: 2 additions & 1 deletion src/fasthep_flow/operators/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from .base import Operator
from .bash import BashOperator, LocalBashOperator
from .python import PythonOperator, LocalPythonOperator

# only Operator is exposed to the user, everything else is imported directly by the workflow
__all__ = ["BashOperator", "LocalBashOperator", "Operator"]
__all__ = ["BashOperator", "LocalBashOperator", "PythonOperator", "LocalPythonOperator", "Operator"]
49 changes: 49 additions & 0 deletions src/fasthep_flow/operators/python.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
from __future__ import annotations

from typing import Any

from .base import Operator

from importlib import import_module

class LocalPythonOperator(Operator):
"""A local python operator. This operator runs python callables on the local machine."""

python_callable: str
arguments: list[str]

def __init__(self, **kwargs: Any):
self.configure(**kwargs)

def configure(self, **kwargs: Any) -> None:
"""Configure the operator."""
self.python_callable = kwargs.pop("callable")
self.arguments = kwargs.pop("arguments")

for module, alias in kwargs.pop("aliases"):
if self.python_callable.startswith(alias + '.'):
self.python_callable = self.python_callable.replace(alias, module, 1)

# verify valid callable
try:
module_name, callable_name = self.python_callable.split('.', 1)
module = import_module(module_name) # could throw ImportError

if not callable(getattr(module, callable_name)): # could throw AttributeError
return # how to fail

except ImportError:
return # how should this fail?

except AttributeError:
return

return

def __call__(self, **kwargs: Any) -> dict[str, Any]:
stdout, stderr, exit_code = "", "", 0
return {"stdout": stdout, "stderr": stderr, "exit_code": exit_code}

def __repr__(self) -> str:
return f"LocalPythonOperator(callable={self.python_callable}, arguments={self.arguments})"

34 changes: 34 additions & 0 deletions tests/operators/test_py.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from __future__ import annotations

from fasthep_flow.operators.python import PythonOperator


def test_python_operator():
operator = PythonOperator(callable="print", arguments=["Hello World!"])
result = operator()
assert result["stdout"] == "Hello World!\n"
assert result["stderr"] == ""

def test_python_operator_aliased():
operator = PythonOperator(callable="lambda x: print(np.count_nonzero(x))", arguments=["[0,1,1]"], aliases={'numpy': 'np'})
result = operator()
assert result["stdout"] == "2\n"
assert result["stderr"] == ""

def test_python_operator_import_error():
operator = PythonOperator(callable="lambda x: print(np.count_nonzero(x))", arguments=["[0,1,1]"])
result = operator()
assert result["stdout"] == ""
assert result["stderr"] == "ImportError"

def test_python_operator_attrib_error():
operator = PythonOperator(callable="lambda x: print(np.nonimplemented(x))", arguments=["[0,1,1]"])
result = operator()
assert result["stdout"] == ""
assert result["stderr"] == "AttributeError"

def test_python_operator_callable_failure():
operator = PythonOperator(callable="print(\"Hello World!\n\")")
result = operator()
assert result["stdout"] == ""
assert result["stderr"] == "AttributeError, not callable"

0 comments on commit 171fd84

Please sign in to comment.