Merge branch 'main' into feat/datasets/delay-connection
deepyaman authored Aug 22, 2023
2 parents 82d50eb + 1831245 commit 8b87b2c
Showing 64 changed files with 1,870 additions and 520 deletions.
28 changes: 28 additions & 0 deletions .github/workflows/check-plugin.yml
@@ -93,6 +93,34 @@ jobs:
       - name: Run linter
         run: make plugin=${{ inputs.plugin }} lint
 
+  RTD-build:
+    if: inputs.plugin == 'kedro-datasets'
+    defaults:
+      run:
+        shell: bash
+    runs-on: ubuntu-latest
+    steps:
+      - name: Checkout code
+        uses: actions/checkout@v3
+      - name: Set up Python 3.8
+        uses: actions/setup-python@v3
+        with:
+          python-version: "3.8"
+      - name: Cache python packages
+        uses: actions/cache@v3
+        with:
+          path: ~/.cache/pip
+          key: ${{inputs.plugin}}-ubuntu-latest-python-"3.8"
+          restore-keys: ${{inputs.plugin}}
+      - name: Install dependencies
+        run: |
+          cd ${{ inputs.plugin }}
+          pip install ".[docs]"
+          pip install ".[test]"
+      - name: RTD build for kedro-datasets
+        run: |
+          make rtd
+
   e2e-tests:
     if: inputs.plugin != 'kedro-datasets'
     defaults:
3 changes: 3 additions & 0 deletions .github/workflows/kedro-airflow.yml
@@ -15,6 +15,9 @@ on:
- "kedro-datasets/**"
- "kedro-docker/**"
- "kedro-telemetry/**"
schedule:
# Run every day at midnight (UTC time)
- cron: '0 0 * * *'

jobs:
airflow-test:
3 changes: 3 additions & 0 deletions .github/workflows/kedro-datasets.yml
@@ -15,6 +15,9 @@ on:
- "kedro-airflow/**"
- "kedro-docker/**"
- "kedro-telemetry/**"
schedule:
# Run every day at midnight (UTC time)
- cron: '0 0 * * *'

jobs:
datasets-test:
3 changes: 3 additions & 0 deletions .github/workflows/kedro-docker.yml
@@ -15,6 +15,9 @@ on:
- "kedro-airflow/**"
- "kedro-datasets/**"
- "kedro-telemetry/**"
schedule:
# Run every day at midnight (UTC time)
- cron: '0 0 * * *'

jobs:
docker-test:
3 changes: 3 additions & 0 deletions .github/workflows/kedro-telemetry.yml
@@ -15,6 +15,9 @@ on:
- "kedro-airflow/**"
- "kedro-datasets/**"
- "kedro-docker/**"
schedule:
# Run every day at midnight (UTC time)
- cron: '0 0 * * *'

jobs:
telemetry-test:
3 changes: 3 additions & 0 deletions Makefile
@@ -60,3 +60,6 @@ test-no-spark-sequential:
 # kedro-datasets/snowflake tests skipped from default scope
 test-snowflake-only:
 	cd kedro-datasets && pytest tests --no-cov --numprocesses 1 --dist loadfile -m snowflake
+
+rtd:
+	cd kedro-datasets && python -m sphinx -WETan -j auto -D language=en -b linkcheck -d _build/doctrees docs/source _build/linkcheck
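
The same build can be run locally from the repository root; a minimal usage sketch, assuming the `docs` and `test` extras are installed as in the RTD-build workflow job above:

```bash
# From the root of the kedro-plugins repository (paths assume this layout):
pip install "./kedro-datasets[docs]" "./kedro-datasets[test]"
make rtd
```
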
88 changes: 87 additions & 1 deletion kedro-airflow/README.md
@@ -46,7 +46,7 @@ Please visit the guide to [deploy Kedro as a Python package](https://kedro.readt

#### What if my DAG file is in a different directory to my project folder?

-By default the generated DAG file is configured to live in the same directory as your project as per this [template](https://github.com/kedro-org/kedro-plugins/blob/main/kedro-airflow/kedro_airflow/airflow_dag_template.j2#L44). If your DAG file is located in a different directory to your project, you will need to tweak this manually after running the `kedro airflow create` command.
+By default, the generated DAG file is configured to live in the same directory as your project as per this [template](https://github.com/kedro-org/kedro-plugins/blob/main/kedro-airflow/kedro_airflow/airflow_dag_template.j2#L44). If your DAG file is located in a different directory to your project, you will need to tweak this manually after running the `kedro airflow create` command.

#### What if I want to use a different Jinja2 template?

@@ -56,6 +56,92 @@ You can use the additional command line argument `--jinja-file` (alias `-j`) to
kedro airflow create --jinja-file=./custom/template.j2
```

#### How can I pass arguments to the Airflow DAGs dynamically?

`kedro-airflow` picks up configuration from an `airflow.yml` file in `conf/base` or `conf/local` of your Kedro project, or from any file in a folder whose name starts with `airflow`.
The [parameters](https://docs.kedro.org/en/stable/configuration/parameters.html) are read by Kedro.
Arguments can be specified globally, or per pipeline:

```yaml
# Global parameters
default:
    start_date: [2023, 1, 1]
    max_active_runs: 3
    # https://airflow.apache.org/docs/stable/scheduler.html#dag-runs
    schedule_interval: "@once"
    catchup: false
    # Default settings applied to all tasks
    owner: "airflow"
    depends_on_past: false
    email_on_failure: false
    email_on_retry: false
    retries: 1
    retry_delay: 5

# Arguments specific to the pipeline (overrides the parameters above)
data_science:
    owner: "airflow-ds"
```

Arguments can also be passed via `--params` in the command line:

```bash
kedro airflow create --params "schedule_interval='@weekly'"
```

These variables are passed to the Jinja2 template that creates an Airflow DAG from your pipeline.
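
For instance, a `schedule_interval` set in `airflow.yml` (or via `--params`) is consumed by the default template like this:

```
schedule_interval="{{ schedule_interval | default('@once') }}",
```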

#### What if I want to use a configuration pattern other than `airflow*` and `airflow/**`?

In order to configure the config loader, update the `settings.py` file in your Kedro project.
For instance, if you would like to use the name `scheduler`, then change the file as follows:

```python
CONFIG_LOADER_ARGS = {
    "config_patterns": {"airflow": ["scheduler*", "scheduler/**"]}
}
```

Follow [Kedro's official documentation](https://docs.kedro.org/en/stable/configuration/advanced_configuration.html#how-to-do-templating-with-the-omegaconfigloader) to see how to add templating, custom resolvers, etc.
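
With that pattern in place, the scheduling arguments shown in the `airflow.yml` example above move to a matching file; a minimal sketch (the file name is illustrative, anything matching `scheduler*` works):

```yaml
# conf/base/scheduler.yml
default:
    schedule_interval: "@weekly"
```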

#### What if I want to pass different arguments?

In order to pass arguments other than those specified in the default template, simply pass a custom template (see: _"What if I want to use a different Jinja2 template?"_).

The syntax for arguments is:

```
{{ argument_name }}
```

In order to make arguments optional, one can use:
```
{{ argument_name | default("default_value") }}
```

For examples, please have a look at the default template (`airflow_dag_template.j2`).
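
The default template uses exactly this pattern for its own arguments; for instance, it contains:

```
owner="{{ owner | default('airflow') }}",
retry_delay=timedelta(minutes={{ retry_delay | default(5) }})
```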

#### What if I want to use a configuration file other than `airflow.yml`?

The default configuration pattern is `["airflow*", "airflow/**"]`.
In order to configure the `OmegaConfigLoader`, update the `settings.py` file in your Kedro project as follows:

```python
from kedro.config import OmegaConfigLoader

CONFIG_LOADER_CLASS = OmegaConfigLoader
CONFIG_LOADER_ARGS = {
    # other args
    "config_patterns": {"airflow": ["airflow*", "airflow/**"]}  # configure the pattern for configuration files
}
```

Follow [Kedro's official documentation](https://docs.kedro.org/en/stable/configuration/advanced_configuration.html#how-to-do-templating-with-the-omegaconfigloader) to see how to add templating, custom resolvers, etc.

#### How can I use Airflow runtime parameters?

It is possible to pass parameters when triggering an Airflow DAG from the user interface.
In order to use this feature, create a custom template using the [Params syntax](https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/params.html).
See ["What if I want to use a different Jinja2 template?"](#what-if-i-want-to-use-a-different-jinja2-template) for instructions on using custom templates.

#### What if I want to use a different Airflow Operator?

Which Airflow Operator to use depends on the environment your project is running in.
13 changes: 12 additions & 1 deletion kedro-airflow/RELEASE.md
@@ -1,6 +1,17 @@
-# Upcoming release 0.5.2
+# Upcoming Release

# Release 0.6.0
* Changed all references to `kedro.pipeline.Pipeline` objects throughout the test suite to use the `kedro.modular_pipeline.pipeline` factory.
* Migrate all project metadata to static `pyproject.toml`.
* Configure DAG kwargs via `airflow.yml`.
* The generated DAG file now contains the pipeline name.
* Included help for CLI arguments (see `kedro airflow create --help`).
* Added additional CLI argument `--params` to pass configuration to the Jinja2 template.

## Community contributions
Many thanks to the following Kedroids for contributing PRs to this release:

* [sbrugman](https://github.com/sbrugman)

# Release 0.5.1
* Added additional CLI argument `--jinja-file` to provide a path to a custom Jinja2 template.
8 changes: 4 additions & 4 deletions kedro-airflow/features/steps/cli_steps.py
@@ -87,7 +87,7 @@ def install_kedro(context, version):
if version == "latest":
cmd = [context.pip, "install", "-U", "kedro[pandas]"]
else:
cmd = [context.pip, "install", "kedro[pandas]=={}".format(version)]
cmd = [context.pip, "install", f"kedro[pandas]=={version}"]
res = run(cmd, env=context.env)

if res.returncode != OK_EXIT_CODE:
@@ -121,7 +121,7 @@ def check_message_printed(context, msg):
     stdout = context.result.stdout
     assert msg in stdout, (
         "Expected the following message segment to be printed on stdout: "
-        "{exp_msg},\nbut got {actual_msg}".format(exp_msg=msg, actual_msg=stdout)
+        f"{msg},\nbut got {stdout}"
     )
 
 
@@ -187,6 +187,6 @@ def check_status_code(context):
     if context.result.returncode != OK_EXIT_CODE:
         print(context.result.stdout)
         print(context.result.stderr)
-        assert False, "Expected exit code {}" " but got {}".format(
-            OK_EXIT_CODE, context.result.returncode
+        raise AssertionError(
+            f"Expected exit code {OK_EXIT_CODE} but got {context.result.returncode}"
         )
2 changes: 1 addition & 1 deletion kedro-airflow/kedro_airflow/__init__.py
@@ -1,3 +1,3 @@
"""Kedro plugin for running a project with Airflow."""

__version__ = "0.5.1"
__version__ = "0.6.0"
61 changes: 29 additions & 32 deletions kedro-airflow/kedro_airflow/airflow_dag_template.j2
@@ -1,3 +1,4 @@
+from __future__ import annotations
 from datetime import datetime, timedelta
 from pathlib import Path
 
@@ -10,14 +11,13 @@ from kedro.framework.project import configure_project
 
 
 class KedroOperator(BaseOperator):
-
     @apply_defaults
     def __init__(
         self,
         package_name: str,
         pipeline_name: str,
         node_name: str,
-        project_path: str,
+        project_path: str | Path,
         env: str,
         *args, **kwargs
     ) -> None:
@@ -35,46 +35,43 @@ class KedroOperator(BaseOperator):
                                  env=self.env) as session:
             session.run(self.pipeline_name, node_names=[self.node_name])
 
 
 # Kedro settings required to run your pipeline
 env = "{{ env }}"
 pipeline_name = "{{ pipeline_name }}"
 project_path = Path.cwd()
 package_name = "{{ package_name }}"
 
-# Default settings applied to all tasks
-default_args = {
-    'owner': 'airflow',
-    'depends_on_past': False,
-    'email_on_failure': False,
-    'email_on_retry': False,
-    'retries': 1,
-    'retry_delay': timedelta(minutes=5)
-}
-
 # Using a DAG context manager, you don't have to specify the dag property of each task
 with DAG(
-    "{{ dag_name | safe | slugify }}",
-    start_date=datetime(2019, 1, 1),
-    max_active_runs=3,
-    schedule_interval=timedelta(minutes=30), # https://airflow.apache.org/docs/stable/scheduler.html#dag-runs
-    default_args=default_args,
-    catchup=False # enable if you don't want historical dag runs to run
-) as dag:
-
-    tasks = {}
-    {% for node in pipeline.nodes %}
-    tasks["{{ node.name | safe | slugify }}"] = KedroOperator(
-        task_id="{{ node.name | safe | slugify }}",
-        package_name=package_name,
-        pipeline_name=pipeline_name,
-        node_name="{{ node.name | safe }}",
-        project_path=project_path,
-        env=env,
-    )
-    {% endfor %}
+    dag_id="{{ dag_name | safe | slugify }}",
+    start_date=datetime({{ start_date | default([2023, 1, 1]) | join(",")}}),
+    max_active_runs={{ max_active_runs | default(3) }},
+    # https://airflow.apache.org/docs/stable/scheduler.html#dag-runs
+    schedule_interval="{{ schedule_interval | default('@once') }}",
+    catchup={{ catchup | default(False) }},
+    # Default settings applied to all tasks
+    default_args=dict(
+        owner="{{ owner | default('airflow') }}",
+        depends_on_past={{ depends_on_past | default(False) }},
+        email_on_failure={{ email_on_failure | default(False) }},
+        email_on_retry={{ email_on_retry | default(False) }},
+        retries={{ retries | default(1) }},
+        retry_delay=timedelta(minutes={{ retry_delay | default(5) }})
+    )
+) as dag:
+    tasks = {
+    {% for node in pipeline.nodes %}    "{{ node.name | safe | slugify }}": KedroOperator(
+        task_id="{{ node.name | safe | slugify }}",
+        package_name=package_name,
+        pipeline_name=pipeline_name,
+        node_name="{{ node.name | safe }}",
+        project_path=project_path,
+        env=env,
+    ),
+    {% endfor %}    }
 
     {% for parent_node, child_nodes in dependencies.items() -%}
-    {% for child in child_nodes %}
-    tasks["{{ parent_node.name | safe | slugify }}"] >> tasks["{{ child.name | safe | slugify }}"]
+    {% for child in child_nodes %}    tasks["{{ parent_node.name | safe | slugify }}"] >> tasks["{{ child.name | safe | slugify }}"]
     {% endfor %}
     {%- endfor %}