From b0ca997c3d4563f654b318225b5759e673f21d1b Mon Sep 17 00:00:00 2001
From: Simon Brugman
Date: Tue, 1 Aug 2023 12:27:37 +0200
Subject: [PATCH] feat: kedro-airflow DAG kwarg configuration (#233)

* feature: kedro-airflow DAG kwarg configuration

Signed-off-by: Simon Brugman

* feat: `kedro-airflow` DAG kwarg configuration

Signed-off-by: Simon Brugman

* Consistent use of the __future__ annotations

Signed-off-by: Nok

* Apply suggestions from code review

Signed-off-by: Ankita Katiyar
Co-authored-by: Merel Theisen <49397448+merelcht@users.noreply.github.com>

---------

Signed-off-by: Simon Brugman
Signed-off-by: Nok
Co-authored-by: Ankita Katiyar
Co-authored-by: Nok
Co-authored-by: Ankita Katiyar <110245118+ankatiyar@users.noreply.github.com>
Co-authored-by: Merel Theisen <49397448+merelcht@users.noreply.github.com>
---
 kedro-airflow/README.md                       |  89 ++++++-
 kedro-airflow/RELEASE.md                      |  11 +-
 kedro-airflow/features/steps/cli_steps.py     |   8 +-
 .../kedro_airflow/airflow_dag_template.j2     |  61 +++--
 kedro-airflow/kedro_airflow/plugin.py         |  76 +++++-
 kedro-airflow/tests/conftest.py               |  82 +++++-
 kedro-airflow/tests/test_plugin.py            | 234 +++++++++++++++---
 7 files changed, 485 insertions(+), 76 deletions(-)

diff --git a/kedro-airflow/README.md b/kedro-airflow/README.md
index 6b1d59815..b61ed141d 100644
--- a/kedro-airflow/README.md
+++ b/kedro-airflow/README.md
@@ -46,7 +46,7 @@ Please visit the guide to [deploy Kedro as a Python package](https://kedro.readt
 
 #### What if my DAG file is in a different directory to my project folder?
 
-By default the generated DAG file is configured to live in the same directory as your project as per this [template](https://github.com/kedro-org/kedro-plugins/blob/main/kedro-airflow/kedro_airflow/airflow_dag_template.j2#L44). If your DAG file is located in a different directory to your project, you will need to tweak this manually after running the `kedro airflow create` command.
+By default, the generated DAG file is configured to live in the same directory as your project as per this [template](https://github.com/kedro-org/kedro-plugins/blob/main/kedro-airflow/kedro_airflow/airflow_dag_template.j2#L44). If your DAG file is located in a different directory to your project, you will need to tweak this manually after running the `kedro airflow create` command.
 
 #### What if I want to use a different Jinja2 template?
 
@@ -56,6 +56,93 @@ You can use the additional command line argument `--jinja-file` (alias `-j`) to
 kedro airflow create --jinja-file=./custom/template.j2
 ```
 
+#### How can I pass arguments to the Airflow DAGs dynamically?
+
+`kedro-airflow` picks up configuration from `airflow.yml` in `conf/base` or `conf/local` of your Kedro project.
+The configuration can also live in any configuration folder whose name starts with `airflow`.
+The [parameters](https://docs.kedro.org/en/stable/configuration/parameters.html) are read with Kedro's configuration loader.
+Arguments can be specified globally, or per pipeline:
+
+```yaml
+# Global parameters
+default:
+    start_date: [2023, 1, 1]
+    max_active_runs: 3
+    # https://airflow.apache.org/docs/stable/scheduler.html#dag-runs
+    schedule_interval: "@once"
+    catchup: false
+    # Default settings applied to all tasks
+    owner: "airflow"
+    depends_on_past: false
+    email_on_failure: false
+    email_on_retry: false
+    retries: 1
+    retry_delay: 5
+
+# Arguments specific to the pipeline (overrides the parameters above)
+data_science:
+    owner: "airflow-ds"
+```
+
+Arguments can also be passed via `--params` on the command line:
+
+```bash
+kedro airflow create --params "schedule_interval='@weekly'"
+```
+
+These variables are passed to the Jinja2 template that creates an Airflow DAG from your pipeline.
+
+#### What if I want to use a configuration pattern other than `airflow*` and `airflow/**`?
+
+In order to configure the config loader, update the `settings.py` file in your Kedro project.
+For instance, if you would like to use the name `scheduler`, then change the file as follows:
+
+```python
+CONFIG_LOADER_ARGS = {
+    "config_patterns": {"airflow": ["scheduler*", "scheduler/**"]}
+}
+```
+
+Follow [Kedro's official documentation](https://docs.kedro.org/en/stable/configuration/advanced_configuration.html#how-to-do-templating-with-the-omegaconfigloader) to see how to add templating, custom resolvers, etc.
+
+#### What if I want to pass different arguments?
+
+In order to pass arguments other than those specified in the default template, simply pass a custom template (see _"What if I want to use a different Jinja2 template?"_).
+
+The syntax for arguments is:
+```
+{{ argument_name }}
+```
+
+In order to make arguments optional, one can use:
+```
+{{ argument_name | default("default_value") }}
+```
+
+For examples, please have a look at the default template (`airflow_dag_template.j2`).
+
+#### What if I want to use a configuration file other than `airflow.yml`?
+
+The default configuration pattern is `["airflow*", "airflow/**"]`.
+In order to configure the `OmegaConfigLoader`, update the `settings.py` file in your Kedro project as follows:
+
+```python
+from kedro.config import OmegaConfigLoader
+CONFIG_LOADER_CLASS = OmegaConfigLoader
+CONFIG_LOADER_ARGS = {
+    # other args
+    "config_patterns": {"airflow": ["airflow*", "airflow/**"]}  # configure the pattern for configuration files
+}
+```
+
+Follow [Kedro's official documentation](https://docs.kedro.org/en/stable/configuration/advanced_configuration.html#how-to-do-templating-with-the-omegaconfigloader) to see how to add templating, custom resolvers, etc.
+
+#### How can I use Airflow runtime parameters?
+
+It is possible to pass parameters when triggering an Airflow DAG from the user interface.
+In order to use this feature, create a custom template using the [Params syntax](https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/params.html).
+See ["What if I want to use a different Jinja2 template?"](#what-if-i-want-to-use-a-different-jinja2-template) for instructions on using custom templates.
+
 #### What if I want to use a different Airflow Operator?
 
 Which Airflow Operator to use depends on the environment your project is running in.
diff --git a/kedro-airflow/RELEASE.md b/kedro-airflow/RELEASE.md index c2e0615b4..a752fbed9 100755 --- a/kedro-airflow/RELEASE.md +++ b/kedro-airflow/RELEASE.md @@ -1,6 +1,15 @@ -# Upcoming release 0.5.2 +# Upcoming release 0.6.0 * Change reference to `kedro.pipeline.Pipeline` object throughout test suite with `kedro.modular_pipeline.pipeline` factory. * Migrate all project metadata to static `pyproject.toml`. +* Configure DAG kwargs via `airflow.yml`. +* The generated DAG file now contains the pipeline name. +* Included help for CLI arguments (see `kedro airflow create --help`). +* Added additional CLI argument `--params` to pass configuration to the Jinja2 template. + +## Community contributions +Many thanks to the following Kedroids for contributing PRs to this release: + +* [sbrugman](https://github.com/sbrugman) # Release 0.5.1 * Added additional CLI argument `--jinja-file` to provide a path to a custom Jinja2 template. diff --git a/kedro-airflow/features/steps/cli_steps.py b/kedro-airflow/features/steps/cli_steps.py index 9fbde1df2..c4d23ad24 100644 --- a/kedro-airflow/features/steps/cli_steps.py +++ b/kedro-airflow/features/steps/cli_steps.py @@ -87,7 +87,7 @@ def install_kedro(context, version): if version == "latest": cmd = [context.pip, "install", "-U", "kedro[pandas]"] else: - cmd = [context.pip, "install", "kedro[pandas]=={}".format(version)] + cmd = [context.pip, "install", f"kedro[pandas]=={version}"] res = run(cmd, env=context.env) if res.returncode != OK_EXIT_CODE: @@ -121,7 +121,7 @@ def check_message_printed(context, msg): stdout = context.result.stdout assert msg in stdout, ( "Expected the following message segment to be printed on stdout: " - "{exp_msg},\nbut got {actual_msg}".format(exp_msg=msg, actual_msg=stdout) + f"{msg},\nbut got {stdout}" ) @@ -187,6 +187,6 @@ def check_status_code(context): if context.result.returncode != OK_EXIT_CODE: print(context.result.stdout) print(context.result.stderr) - assert False, "Expected exit code {}" " but got {}".format( - OK_EXIT_CODE, context.result.returncode + raise AssertionError( + f"Expected exit code {OK_EXIT_CODE} but got {context.result.returncode}" ) diff --git a/kedro-airflow/kedro_airflow/airflow_dag_template.j2 b/kedro-airflow/kedro_airflow/airflow_dag_template.j2 index 92c6296e1..7c2f2706e 100644 --- a/kedro-airflow/kedro_airflow/airflow_dag_template.j2 +++ b/kedro-airflow/kedro_airflow/airflow_dag_template.j2 @@ -1,3 +1,4 @@ +from __future__ import annotations from datetime import datetime, timedelta from pathlib import Path @@ -10,14 +11,13 @@ from kedro.framework.project import configure_project class KedroOperator(BaseOperator): - @apply_defaults def __init__( self, package_name: str, pipeline_name: str, node_name: str, - project_path: str, + project_path: str | Path, env: str, *args, **kwargs ) -> None: @@ -35,46 +35,43 @@ class KedroOperator(BaseOperator): env=self.env) as session: session.run(self.pipeline_name, node_names=[self.node_name]) + # Kedro settings required to run your pipeline env = "{{ env }}" pipeline_name = "{{ pipeline_name }}" project_path = Path.cwd() package_name = "{{ package_name }}" -# Default settings applied to all tasks -default_args = { - 'owner': 'airflow', - 'depends_on_past': False, - 'email_on_failure': False, - 'email_on_retry': False, - 'retries': 1, - 'retry_delay': timedelta(minutes=5) -} - # Using a DAG context manager, you don't have to specify the dag property of each task with DAG( - "{{ dag_name | safe | slugify }}", - start_date=datetime(2019, 1, 1), - 
max_active_runs=3, - schedule_interval=timedelta(minutes=30), # https://airflow.apache.org/docs/stable/scheduler.html#dag-runs - default_args=default_args, - catchup=False # enable if you don't want historical dag runs to run -) as dag: - - tasks = {} - {% for node in pipeline.nodes %} - tasks["{{ node.name | safe | slugify }}"] = KedroOperator( - task_id="{{ node.name | safe | slugify }}", - package_name=package_name, - pipeline_name=pipeline_name, - node_name="{{ node.name | safe }}", - project_path=project_path, - env=env, + dag_id="{{ dag_name | safe | slugify }}", + start_date=datetime({{ start_date | default([2023, 1, 1]) | join(",")}}), + max_active_runs={{ max_active_runs | default(3) }}, + # https://airflow.apache.org/docs/stable/scheduler.html#dag-runs + schedule_interval="{{ schedule_interval | default('@once') }}", + catchup={{ catchup | default(False) }}, + # Default settings applied to all tasks + default_args=dict( + owner="{{ owner | default('airflow') }}", + depends_on_past={{ depends_on_past | default(False) }}, + email_on_failure={{ email_on_failure | default(False) }}, + email_on_retry={{ email_on_retry | default(False) }}, + retries={{ retries | default(1) }}, + retry_delay=timedelta(minutes={{ retry_delay | default(5) }}) ) - {% endfor %} +) as dag: + tasks = { + {% for node in pipeline.nodes %} "{{ node.name | safe | slugify }}": KedroOperator( + task_id="{{ node.name | safe | slugify }}", + package_name=package_name, + pipeline_name=pipeline_name, + node_name="{{ node.name | safe }}", + project_path=project_path, + env=env, + ), +{% endfor %} } {% for parent_node, child_nodes in dependencies.items() -%} - {% for child in child_nodes %} - tasks["{{ parent_node.name | safe | slugify }}"] >> tasks["{{ child.name | safe | slugify }}"] + {% for child in child_nodes %} tasks["{{ parent_node.name | safe | slugify }}"] >> tasks["{{ child.name | safe | slugify }}"] {% endfor %} {%- endfor %} diff --git a/kedro-airflow/kedro_airflow/plugin.py b/kedro-airflow/kedro_airflow/plugin.py index c1a62b0f3..569e91be2 100644 --- a/kedro-airflow/kedro_airflow/plugin.py +++ b/kedro-airflow/kedro_airflow/plugin.py @@ -1,15 +1,25 @@ """ Kedro plugin for running a project with Airflow """ +from __future__ import annotations from collections import defaultdict from pathlib import Path +from typing import Any import click import jinja2 from click import secho +from kedro.config import MissingConfigException +from kedro.framework.cli.project import PARAMS_ARG_HELP +from kedro.framework.cli.utils import ENV_HELP, KedroCliError, _split_params +from kedro.framework.context import KedroContext from kedro.framework.project import pipelines -from kedro.framework.startup import ProjectMetadata +from kedro.framework.session import KedroSession +from kedro.framework.startup import ProjectMetadata, bootstrap_project from slugify import slugify +PIPELINE_ARG_HELP = """Name of the registered pipeline to convert. 
+If not set, the '__default__' pipeline is used.""" + @click.group(name="Kedro-Airflow") def commands(): # pylint: disable=missing-function-docstring @@ -22,15 +32,44 @@ def airflow_commands(): pass +def _load_config(context: KedroContext, pipeline_name: str) -> dict[str, Any]: + # Set the default pattern for `airflow` if not provided in `settings.py` + if "airflow" not in context.config_loader.config_patterns.keys(): + context.config_loader.config_patterns.update( # pragma: no cover + {"airflow": ["airflow*", "airflow/**"]} + ) + + assert "airflow" in context.config_loader.config_patterns.keys() + + # Load the config + try: + config_airflow = context.config_loader["airflow"] + except MissingConfigException: + # File does not exist + return {} + + dag_config = {} + # Load the default config if specified + if "default" in config_airflow: + dag_config.update(config_airflow["default"]) + # Update with pipeline-specific config if present + if pipeline_name in config_airflow: + dag_config.update(config_airflow[pipeline_name]) + return dag_config + + @airflow_commands.command() -@click.option("-p", "--pipeline", "pipeline_name", default="__default__") -@click.option("-e", "--env", default="local") +@click.option( + "-p", "--pipeline", "pipeline_name", default="__default__", help=PIPELINE_ARG_HELP +) +@click.option("-e", "--env", default="local", help=ENV_HELP) @click.option( "-t", "--target-dir", "target_path", type=click.Path(writable=True, resolve_path=True, file_okay=False), default="./airflow_dags/", + help="The directory path to store the generated Airflow dags", ) @click.option( "-j", @@ -39,6 +78,14 @@ def airflow_commands(): exists=True, readable=True, resolve_path=True, file_okay=True, dir_okay=False ), default=Path(__file__).parent / "airflow_dag_template.j2", + help="The template file for the generated Airflow dags", +) +@click.option( + "--params", + type=click.UNPROCESSED, + default="", + help=PARAMS_ARG_HELP, + callback=_split_params, ) @click.pass_obj def create( @@ -47,8 +94,18 @@ def create( env, target_path, jinja_file, + params, ): # pylint: disable=too-many-locals,too-many-arguments """Create an Airflow DAG for a project""" + project_path = Path.cwd().resolve() + bootstrap_project(project_path) + with KedroSession.create(project_path=project_path, env=env) as session: + context = session.load_context() + dag_config = _load_config(context, pipeline_name) + + # Update with params if provided + dag_config.update(params) + jinja_file = Path(jinja_file).resolve() loader = jinja2.FileSystemLoader(jinja_file.parent) jinja_env = jinja2.Environment(autoescape=True, loader=loader, lstrip_blocks=True) @@ -56,7 +113,11 @@ def create( template = jinja_env.get_template(jinja_file.name) package_name = metadata.package_name - dag_filename = f"{package_name}_dag.py" + dag_filename = ( + f"{package_name}_dag.py" + if pipeline_name == "__default__" + else f"{package_name}_{pipeline_name}_dag.py" + ) target_path = Path(target_path) target_path = target_path / dag_filename @@ -64,6 +125,8 @@ def create( target_path.parent.mkdir(parents=True, exist_ok=True) pipeline = pipelines.get(pipeline_name) + if pipeline is None: + raise KedroCliError(f"Pipeline {pipeline_name} not found.") dependencies = defaultdict(list) for node, parent_nodes in pipeline.node_dependencies.items(): @@ -77,6 +140,7 @@ def create( pipeline_name=pipeline_name, package_name=package_name, pipeline=pipeline, + **dag_config, ).dump(str(target_path)) secho("") @@ -84,7 +148,8 @@ def create( secho(str(target_path)) secho("This file 
should be copied to your Airflow DAG folder.", fg="yellow") secho( - "The Airflow configuration can be customized by editing this file.", fg="green" + "The Airflow configuration can be customized by editing this file.", + fg="green", ) secho("") secho( @@ -101,4 +166,3 @@ def create( "And all local paths in both the data catalog and log config must be absolute paths.", fg="yellow", ) - secho("") diff --git a/kedro-airflow/tests/conftest.py b/kedro-airflow/tests/conftest.py index c23cc5916..ea285bb2c 100644 --- a/kedro-airflow/tests/conftest.py +++ b/kedro-airflow/tests/conftest.py @@ -4,16 +4,21 @@ discover them automatically. More info here: https://docs.pytest.org/en/latest/fixture.html """ +from __future__ import annotations + +import os from pathlib import Path from shutil import copyfile from click.testing import CliRunner +from cookiecutter.main import cookiecutter from kedro import __version__ as kedro_version +from kedro.framework.cli.starters import TEMPLATE_PATH from kedro.framework.startup import ProjectMetadata from pytest import fixture -@fixture(name="cli_runner") +@fixture(name="cli_runner", scope="session") def cli_runner(): runner = CliRunner() cwd = Path.cwd() @@ -23,10 +28,79 @@ def cli_runner(): yield runner -@fixture -def metadata(cli_runner): # pylint: disable=unused-argument +def _create_kedro_settings_py(file_name: Path, patterns: list[str]): + patterns = ", ".join([f'"{p}"' for p in patterns]) + content = f"""from kedro.config import OmegaConfigLoader +CONFIG_LOADER_CLASS = OmegaConfigLoader +CONFIG_LOADER_ARGS = {{ + "config_patterns": {{ + "airflow": [{patterns}], # configure the pattern for configuration files + }} +}} +""" + file_name.write_text(content) + + +@fixture(scope="session") +def kedro_project(cli_runner): # pylint: disable=unused-argument + tmp_path = Path().cwd() + # From `kedro-mlflow.tests.conftest.py` + config = { + "output_dir": tmp_path, + "kedro_version": kedro_version, + "project_name": "This is a fake project", + "repo_name": "fake-project", + "python_package": "fake_project", + "include_example": True, + } + + cookiecutter( + str(TEMPLATE_PATH), + output_dir=config["output_dir"], + no_input=True, + extra_context=config, + ) + + pipeline_registry_py = """ +from kedro.pipeline import Pipeline, node + + +def identity(arg): + return arg + + +def register_pipelines(): + pipeline = Pipeline( + [ + node(identity, ["input"], ["intermediate"], name="node0"), + node(identity, ["intermediate"], ["output"], name="node1"), + ], + tags="pipeline0", + ) + return { + "__default__": pipeline, + "ds": pipeline, + } + """ + + project_path = tmp_path / "fake-project" + (project_path / "src" / "fake_project" / "pipeline_registry.py").write_text( + pipeline_registry_py + ) + + settings_file = project_path / "src" / "fake_project" / "settings.py" + _create_kedro_settings_py( + settings_file, ["airflow*", "airflow/**", "scheduler*", "scheduler/**"] + ) + + os.chdir(project_path) + return project_path + + +@fixture(scope="session") +def metadata(kedro_project): # pylint: disable=unused-argument # cwd() depends on ^ the isolated filesystem, created by CliRunner() - project_path = Path.cwd() + project_path = kedro_project return ProjectMetadata( project_path / "pyproject.toml", "hello_world", diff --git a/kedro-airflow/tests/test_plugin.py b/kedro-airflow/tests/test_plugin.py index 77c051ff5..4b67ff840 100644 --- a/kedro-airflow/tests/test_plugin.py +++ b/kedro-airflow/tests/test_plugin.py @@ -1,50 +1,228 @@ +from __future__ import annotations + from pathlib import 
Path +from typing import Any import pytest -from kedro.framework.project import pipelines -from kedro.pipeline import node -from kedro.pipeline.modular_pipeline import pipeline as modular_pipeline +import yaml from kedro_airflow.plugin import commands -def identity(arg): - return arg - - @pytest.mark.parametrize( "dag_name,pipeline_name,command", [ # Test normal execution - ("hello_world_dag", "__default__", ["airflow", "create"]), + ("hello_world", "__default__", ["airflow", "create"]), # Test execution with alternate pipeline name - ("hello_world_dag", "ds", ["airflow", "create", "--pipeline", "ds"]), - # Test execution with different dir and filename for Jinja2 Template - ( - "hello_world_dag", - "__default__", - ["airflow", "create", "-j", "airflow_dag.j2"], - ), + ("hello_world", "ds", ["airflow", "create", "--pipeline", "ds"]), ], ) -def test_create_airflow_dag( - dag_name, pipeline_name, command, mocker, cli_runner, metadata -): +def test_create_airflow_dag(dag_name, pipeline_name, command, cli_runner, metadata): """Check the generation and validity of a simple Airflow DAG.""" - dag_file = Path.cwd() / "airflow_dags" / f"{dag_name}.py" - mock_pipeline = modular_pipeline( - [ - node(identity, ["input"], ["intermediate"], name="node0"), - node(identity, ["intermediate"], ["output"], name="node1"), - ], - tags="pipeline0", + dag_file = ( + Path.cwd() + / "airflow_dags" + / ( + f"{dag_name}_dag.py" + if pipeline_name == "__default__" + else f"{dag_name}_{pipeline_name}_dag.py" + ) + ) + result = cli_runner.invoke(commands, command, obj=metadata) + + assert result.exit_code == 0, (result.exit_code, result.stdout) + assert dag_file.exists() + + expected_airflow_dag = 'tasks["node0"] >> tasks["node1"]' + with dag_file.open(encoding="utf-8") as f: + dag_code = [line.strip() for line in f.read().splitlines()] + assert expected_airflow_dag in dag_code + dag_file.unlink() + + +def _create_kedro_airflow_yml(file_name: Path, content: dict[str, Any]): + file_name.parent.mkdir(parents=True, exist_ok=True) + with file_name.open("w") as fp: + yaml.dump(content, fp) + + +def test_airflow_config_params( + cli_runner, metadata +): # pylint: disable=too-many-statements + """Check if config variables are picked up""" + dag_name = "hello_world" + template_name = "airflow_params.j2" + content = "{{ owner | default('hello')}}" + + _create_kedro_airflow_jinja_template(Path.cwd(), template_name, content) + + # default + default_content = "hello" + command = ["airflow", "create", "-j", template_name] + dag_file = Path.cwd() / "airflow_dags" / f"{dag_name}_dag.py" + result = cli_runner.invoke(commands, command, obj=metadata) + + assert result.exit_code == 0, (result.exit_code, result.stdout) + assert dag_file.exists() + assert dag_file.read_text() == default_content + dag_file.unlink() + + # "--params" + expected_content = "testme" + command = ["airflow", "create", "--params", "owner=testme", "-j", template_name] + dag_file = Path.cwd() / "airflow_dags" / f"{dag_name}_dag.py" + result = cli_runner.invoke(commands, command, obj=metadata) + + assert result.exit_code == 0, (result.exit_code, result.stdout) + assert dag_file.exists() + assert dag_file.read_text() == expected_content + dag_file.unlink() + + # airflow.yml + expected_content = "someone else" + file_name = Path.cwd() / "conf" / "base" / "airflow.yml" + _create_kedro_airflow_yml(file_name, {"default": {"owner": expected_content}}) + command = ["airflow", "create", "-j", template_name] + dag_file = Path.cwd() / "airflow_dags" / f"{dag_name}_dag.py" + 
result = cli_runner.invoke(commands, command, obj=metadata) + + assert result.exit_code == 0, (result.exit_code, result.stdout) + assert dag_file.exists() + assert dag_file.read_text() == expected_content + file_name.unlink() + dag_file.unlink() + + # ../airflow.yml + expected_content = "yet someone else" + file_name = Path.cwd() / "conf" / "base" / "airflow" / "default.yml" + _create_kedro_airflow_yml(file_name, {"default": {"owner": expected_content}}) + command = ["airflow", "create", "-j", template_name] + dag_file = Path.cwd() / "airflow_dags" / f"{dag_name}_dag.py" + result = cli_runner.invoke(commands, command, obj=metadata) + + assert result.exit_code == 0, (result.exit_code, result.stdout) + assert dag_file.exists() + assert dag_file.read_text() == expected_content + file_name.unlink() + + # random.yml + expected_content = "yet someone else again" + file_name = Path.cwd() / "conf" / "base" / "random.yml" + _create_kedro_airflow_yml(file_name, {"default": {"owner": expected_content}}) + command = ["airflow", "create", "-j", template_name] + dag_file = Path.cwd() / "airflow_dags" / f"{dag_name}_dag.py" + result = cli_runner.invoke(commands, command, obj=metadata) + + assert result.exit_code == 0, (result.exit_code, result.stdout) + assert dag_file.exists() + assert dag_file.read_text() == default_content + dag_file.unlink() + + # scheduler.yml + file_name = Path.cwd() / "conf" / "base" / "scheduler.yml" + _create_kedro_airflow_yml(file_name, {"default": {"owner": expected_content}}) + command = ["airflow", "create", "-j", template_name] + dag_file = Path.cwd() / "airflow_dags" / f"{dag_name}_dag.py" + result = cli_runner.invoke(commands, command, obj=metadata) + assert result.exit_code == 0, (result.exit_code, result.stdout) + assert dag_file.exists() + assert dag_file.read_text() == expected_content + dag_file.unlink() + file_name.unlink() + + # env + expected_content = "again someone else" + file_name = Path.cwd() / "conf" / "local" / "airflow.yml" + _create_kedro_airflow_yml(file_name, {"default": {"owner": expected_content}}) + command = ["airflow", "create", "-j", template_name, "-e", "local"] + dag_file = Path.cwd() / "airflow_dags" / f"{dag_name}_dag.py" + result = cli_runner.invoke(commands, command, obj=metadata) + + assert result.exit_code == 0, (result.exit_code, result.stdout) + assert dag_file.exists() + assert dag_file.read_text() == expected_content + dag_file.unlink() + + # custom pipeline name + expected_content = "finally someone else" + file_name = Path.cwd() / "conf" / "base" / "airflow.yml" + _create_kedro_airflow_yml( + file_name, {"default": {"owner": "foobar"}, "ds": {"owner": expected_content}} ) - mocker.patch.dict(pipelines, {pipeline_name: mock_pipeline}) + command = ["airflow", "create", "-j", template_name, "-p", "ds"] + dag_file = Path.cwd() / "airflow_dags" / f"{dag_name}_ds_dag.py" + result = cli_runner.invoke(commands, command, obj=metadata) + + assert result.exit_code == 0, (result.exit_code, result.stdout) + assert dag_file.exists() + assert dag_file.read_text() == expected_content + dag_file.unlink() + + +def _create_kedro_airflow_jinja_template(path: Path, name: str, content: str): + (path / name).write_text(content) + + +def test_custom_template_exists(cli_runner, metadata): + """Test execution with different dir and filename for Jinja2 Template""" + dag_name = "hello_world" + template_name = "custom_template.j2" + command = ["airflow", "create", "-j", template_name] + content = "print('my custom dag')" + # because there are no jinja variables 
+ expected_content = content + + _create_kedro_airflow_jinja_template(Path.cwd(), template_name, content) + + dag_file = Path.cwd() / "airflow_dags" / f"{dag_name}_dag.py" result = cli_runner.invoke(commands, command, obj=metadata) - assert result.exit_code == 0 + assert result.exit_code == 0, (result.exit_code, result.stdout) + assert dag_file.exists() + assert dag_file.read_text() == expected_content + + +def test_custom_template_nonexistent(cli_runner, metadata): + """Test execution with different dir and filename for Jinja2 Template""" + template_name = "non_existent_custom_template.j2" + command = ["airflow", "create", "-j", template_name] + result = cli_runner.invoke(commands, command, obj=metadata) + assert result.exit_code == 2 + assert ( + f"Error: Invalid value for '-j' / '--jinja-file': File '{template_name}' does not exist." + in result.stdout + ) + + +def _kedro_create_env(project_root: Path): + (project_root / "conf" / "remote").mkdir(parents=True) + + +def test_create_airflow_dag_env_parameter_exists(cli_runner, metadata): + """Test the `env` parameter""" + dag_name = "hello_world" + command = ["airflow", "create", "--env", "remote"] + + _kedro_create_env(Path.cwd()) + + dag_file = Path.cwd() / "airflow_dags" / f"{dag_name}_dag.py" + result = cli_runner.invoke(commands, command, obj=metadata) + + assert result.exit_code == 0, (result.exit_code, result.stdout) assert dag_file.exists() expected_airflow_dag = 'tasks["node0"] >> tasks["node1"]' - with open(dag_file, "r", encoding="utf-8") as f: + with dag_file.open(encoding="utf-8") as f: dag_code = [line.strip() for line in f.read().splitlines()] assert expected_airflow_dag in dag_code + + +def test_create_airflow_dag_nonexistent_pipeline(cli_runner, metadata): + """Test executing with a non-existing pipeline""" + command = ["airflow", "create", "--pipeline", "de"] + result = cli_runner.invoke(commands, command, obj=metadata) + assert result.exit_code == 1 + assert ( + "kedro.framework.cli.utils.KedroCliError: Pipeline de not found." + in result.stdout + )
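For reference, this is roughly the DAG header that the updated `airflow_dag_template.j2` renders when no `airflow.yml` or `--params` overrides are supplied, i.e. with every Jinja2 `default(...)` filter falling through. The `dag_id` below is a hypothetical slugified project name, and the generated `KedroOperator` tasks and dependency wiring are omitted.

```python
from datetime import datetime, timedelta

from airflow import DAG

with DAG(
    dag_id="new-kedro-project",  # hypothetical slugified project name
    start_date=datetime(2023, 1, 1),
    max_active_runs=3,
    # https://airflow.apache.org/docs/stable/scheduler.html#dag-runs
    schedule_interval="@once",
    catchup=False,
    # Default settings applied to all tasks
    default_args=dict(
        owner="airflow",
        depends_on_past=False,
        email_on_failure=False,
        email_on_retry=False,
        retries=1,
        retry_delay=timedelta(minutes=5),
    ),
) as dag:
    pass  # one KedroOperator per pipeline node, plus the dependency wiring, is emitted here
```

Any value set under `default:` in `airflow.yml`, under a pipeline-specific key, or passed via `--params` replaces the corresponding default above the next time `kedro airflow create` runs.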