Merge pull request #407 from datacoves/DCV-1929-implement-dbt-coves-generate-airflow-dags-from-orchestrate-dags

feat/Dcv 1929 implement dbt coves generate airflow dags from orchestrate dags
mprunell authored Oct 30, 2023
2 parents 9be3641 + a63d382 commit cd9043b
Showing 17 changed files with 1,052 additions and 49 deletions.
114 changes: 113 additions & 1 deletion README.md
@@ -95,7 +95,7 @@ dbt-coves setup precommit
dbt-coves generate <resource>
```

- Where _\<resource\>_ could be _sources_, _properties_, _metadata_ or _docs_.
+ Where _\<resource\>_ could be _sources_, _properties_, _metadata_, _docs_ or _airflow-dags_.

```console
dbt-coves generate sources
@@ -319,6 +319,118 @@ You can use dbt-coves to improve the standard dbt docs generation process. It ge
# Mandatory when using --merge-deferred
```

### Generate airflow-dags

```console
dbt-coves generate airflow-dags
```

Translates YML files into their Airflow Python code equivalents. With this, DAGs can be written with just a few `key:value` pairs.

The basic structure of these YMLs must consist of:

- Global configurations (description, schedule_interval, tags, catchup, etc.)
- `default_args`
- `nodes`: where tasks and task groups are defined
  - each node is a nested object, with its `name` as key and its configuration as values
  - this configuration must cover:
    - `type`: 'task' or 'task_group'
    - `operator`: Airflow operator that will run the tasks (full _module.class_ naming)
    - `dependencies`: the task(s) this one depends on, if any
    - any `key:value` pair of [Operator arguments](https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/operators/index.html)

#### Airflow DAG Generators

When a YML DAG `node` is of type `task_group`, **Generators** can be used instead of `Operators`.

They are custom classes that receive the YML `key:value` pairs and return one or more tasks for the respective task group. Every pair other than `type: task_group` is passed to the specified `generator`, which is responsible for returning any number of `task_name = Operator(params)` strings.

We provide some prebuilt Generators:

- `AirbyteGenerator` creates `AirbyteTriggerSyncOperator` tasks (one per Airbyte connection)
  - It must receive Airbyte's `host` and `port`, `airbyte_conn_id` (Airbyte's connection name on Airflow) and a `connection_ids` list of Airbyte connections to sync
- `FivetranGenerator`: creates `FivetranOperator` tasks (one per Fivetran connection)
  - It must receive Fivetran's `api_key`, `api_secret` and a `connection_ids` list of Fivetran connectors to sync. It can optionally receive `wait_for_completion: true`, in which case two tasks will be created for each sync: a `FivetranOperator` and its respective `FivetranSensor` that monitors the status of the sync.
- `AirbyteDbtGenerator` and `FivetranDbtGenerator`: instead of passing them Airbyte or Fivetran connections, they use dbt to discover those IDs. Apart from their parent Generators' mandatory fields, they can receive (see the sketch after this list):
  - `dbt_project_path`: dbt/project/folder
  - `virtualenv_path`: path to a virtualenv, in case dbt has to be run with another Python executable
  - `run_dbt_compile`: true/false
  - `run_dbt_deps`: true/false
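
For instance, a `task_group` node (under the YML DAG's `nodes:` key) backed by `FivetranDbtGenerator` could look like the following sketch; the credential values and paths are placeholders:

```yaml
fivetran_dbt:
  type: task_group
  tooltip: "Sync dbt-related Fivetran connectors"
  generator: FivetranDbtGenerator
  api_key: my_fivetran_api_key # placeholder
  api_secret: my_fivetran_api_secret # placeholder
  wait_for_completion: true
  dbt_project_path: /path/to/dbt_project
  run_dbt_compile: true
  run_dbt_deps: false
```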

#### Basic YML DAG example

```yaml
description: "dbt-coves DAG"
schedule_interval: "@hourly"
tags:
- version_01
default_args:
start_date: 2023-01-01
catchup: false
nodes:
airbyte_dbt:
type: task_group
tooltip: "Sync dbt-related Airbyte connections"
generator: AirbyteDbtGenerator
host: http://localhost
port: 8000
dbt_project_path: /path/to/dbt_project
virtualenv_path: /virtualenvs/dbt_160
run_dbt_compile: true
run_dbt_deps: false
airbyte_conn_id: airbyte_connection
task_1:
operator: airflow.operators.bash.BashOperator
bash_command: "echo 'This runs after airbyte tasks'"
dependencies: ["airbyte_dbt"]
```
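
For intuition, the `task_1` node above would translate to roughly the following Airflow code. This is a hand-written illustration rather than the tool's verbatim output; the DAG id and the task-group placeholder are assumptions:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    "dbt_coves_example",  # assumed DAG id, e.g. derived from the YML file name
    description="dbt-coves DAG",
    schedule_interval="@hourly",
    tags=["version_01"],
    catchup=False,
    default_args={"start_date": datetime(2023, 1, 1)},
) as dag:
    # ...the `airbyte_dbt` task group emitted by AirbyteDbtGenerator would go here...

    task_1 = BashOperator(
        task_id="task_1",
        bash_command="echo 'This runs after airbyte tasks'",
    )
    # `dependencies: ["airbyte_dbt"]` becomes an upstream relationship:
    # airbyte_dbt >> task_1
```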
##### Create your custom Generator

You can create your own DAG Generator. Any `key:value` specified in the YML DAG will be passed to its constructor.

This Generator needs:

- an `imports` attribute: a list of the _module.class_ Operators of the tasks it outputs
- a `generate_tasks` method that returns the set of `"task_name = Operator()"` strings to write as the task group's tasks.

```python
class PostgresGenerator:
    def __init__(self, **kwargs) -> None:
        """Any key:value pair in the YML DAG node gets passed in here."""
        self.kwargs = kwargs
        self.imports = ["airflow.providers.postgres.operators.postgres.PostgresOperator"]

    def generate_tasks(self):
        """Use your custom logic and return N `name = PostgresOperator()` strings."""
        raise NotImplementedError
```
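
As a hedged sketch, a completed Generator might look like this; the `tables` and `postgres_conn_id` keys are hypothetical YML parameters, not part of dbt-coves:

```python
class PostgresGenerator:
    def __init__(self, **kwargs) -> None:
        # Hypothetical params taken from the YML DAG node's key:value pairs
        self.tables = kwargs.get("tables", [])
        self.postgres_conn_id = kwargs.get("postgres_conn_id", "postgres_default")
        self.imports = ["airflow.providers.postgres.operators.postgres.PostgresOperator"]

    def generate_tasks(self):
        # Emit one `name = PostgresOperator()` string per table
        return [
            f'vacuum_{table} = PostgresOperator('
            f'task_id="vacuum_{table}", '
            f'postgres_conn_id="{self.postgres_conn_id}", '
            f'sql="VACUUM {table};")'
            for table in self.tables
        ]
```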

### airflow-dags generation arguments

`dbt-coves generate airflow-dags` supports the following args:

```console
--from-path
# Path to the folder containing YML DAGs, or to a single YML file, to generate from.

--validate-operators
# Ensure Airflow operators are installed by trying to import them before writing the Python DAG.
# Flag: no value required

--generators-folder
# Path to your Python module with custom Generators

--generators-params
# Object with default values for the desired Generator(s)
# For example: {"AirbyteGenerator": {"host": "http://localhost", "port": "8000"}}

--secrets-path
# Secret files location for DAG configuration, e.g. 'yml_path/secrets/'
# Secret content must match the YML dag spec of `nodes -> node_name -> config`
```
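
For example, a typical invocation combining these arguments (paths are placeholders) might be:

```console
dbt-coves generate airflow-dags \
  --from-path orchestrate/dags \
  --validate-operators \
  --generators-folder orchestrate/generators \
  --secrets-path orchestrate/dags/secrets
```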

## Extract configuration from Airbyte

```console
```
30 changes: 28 additions & 2 deletions dbt_coves/config/config.py
@@ -1,7 +1,7 @@
"""Holds config for dbt-coves."""

from pathlib import Path
- from typing import List, Optional
+ from typing import Any, Dict, List, Optional

from pydantic import BaseModel

@@ -34,7 +34,7 @@ class GenerateSourcesModel(BaseModel):
templates_folder: Optional[str] = ".dbt_coves/templates"
metadata: Optional[str] = ""
no_prompt: Optional[bool] = False
- flatten_json_fields: Optional[bool] = False
+ flatten_json_fields: Optional[str] = "ask"
overwrite_staging_models: Optional[bool] = False
skip_model_props: Optional[bool] = False

@@ -53,11 +53,26 @@ class GenerateDocsModel(BaseModel):
state: Optional[str] = ""


class GenerateAirflowDagsModel(BaseModel):
from_path: Optional[str] = ""
validate_operators: Optional[bool] = False
generators_folder: Optional[str] = "dbt_coves.tasks.generate.airflow_generators"
generators_params: Optional[Dict[str, Any]] = {}
secrets_path: Optional[str] = ""
secrets_manager: Optional[str] = ""
secrets_url: Optional[str] = ""
secrets_token: Optional[str] = ""
secrets_project: Optional[str] = ""
secrets_tags: Optional[str] = ""
secrets_key: Optional[str] = ""


class GenerateModel(BaseModel):
sources: Optional[GenerateSourcesModel] = GenerateSourcesModel()
properties: Optional[GeneratePropertiesModel] = GeneratePropertiesModel()
metadata: Optional[GenerateMetadataModel] = GenerateMetadataModel()
docs: Optional[GenerateDocsModel] = GenerateDocsModel()
airflow_dags: Optional[GenerateAirflowDagsModel] = GenerateAirflowDagsModel()
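
Presumably, these fields map onto the dbt-coves config file roughly as in the sketch below; the file location and the sample values are assumptions for illustration:

```yaml
# .dbt_coves/config.yml (assumed location)
generate:
  airflow_dags:
    from_path: orchestrate/dags
    validate_operators: true
    generators_folder: dbt_coves.tasks.generate.airflow_generators
    generators_params:
      AirbyteGenerator:
        host: http://localhost
        port: "8000"
    secrets_path: orchestrate/dags/secrets
```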


class ExtractAirbyteModel(BaseModel):
@@ -173,6 +188,17 @@ class DbtCovesConfig:
"generate.metadata.no_prompt",
"generate.docs.merge_deferred",
"generate.docs.state",
"generate.airflow_dags.from_path",
"generate.airflow_dags.validate_operators",
"generate.airflow_dags.generators_folder",
"generate.airflow_dags.generators_params",
"generate.airflow_dags.secrets_path",
"generate.airflow_dags.secrets_manager",
"generate.airflow_dags.secrets_url",
"generate.airflow_dags.secrets_token",
"generate.airflow_dags.secrets_project",
"generate.airflow_dags.secrets_tags",
"generate.airflow_dags.secrets_key",
"extract.airbyte.path",
"extract.airbyte.host",
"extract.airbyte.port",