Skip to content

Commit

Permalink
Rename generatorS-params, remove leftover TEST_MODES from generators,…
Browse files Browse the repository at this point in the history
… improve documentation
  • Loading branch information
BAntonellini committed Oct 17, 2023
1 parent 0bf541a commit be4cf6f
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 58 deletions.
67 changes: 43 additions & 24 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,27 @@ The basic structure of these YMLs must consist of:
- `dependencies`: whether the task is dependent on another one(s)
- any `key:value` pair of [Operator arguments](https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/operators/index.html)

Basic YML DAG example:
#### Airflow DAG Generators

When a YML Dag `node` is of type `task_group`, **Generators** can be used instead of `Operators`.

They are custom classes that receive YML `key:value` pairs and return one or more tasks for the respective task group. Any pair specified other than `type: task_group` will be passed to the specified `generator`, and it has the responsibility of returning N amount of `task_name = Operator(params)`.

We provide some prebuilt Generators:

- `AirbyteGenerator` creates `AirbyteTriggerSyncOperator` tasks, which are used in Airflow to sync Airbyte connections

- It must receive Airbyte's `host` and `port` and a `connection_ids` list of Airbyte Connections to Sync

- `FivetranGenerator`: creates `FivetranOperator` tasks, which are used to trigger Fivetran sync operations from Airflow
- It must receive Fivetran's `api_key`, `api_secret` and a `connection_ids` list of Fivetran Connectors to Sync. It can optionally receive `wait_for_completion: true` and 2 tasks will be created for each sync: a `FivetranOperator` and it's respective `FivetranSensor` that monitors the status of the sync.
- `AirbyteDbtGenerator` and `FivetranDbtGenerator`: instead of passing them Airbyte or Fivetran connections, they use dbt to discover those IDs. Apart from their parent Generators mandatory fields, they can receive:
- `dbt_project_path`: dbt/project/folder
- `virtualenv_path`: path to a virtualenv in case dbt has to be ran with another Python executable
- `run_dbt_compile`: true/false
- `run_dbt_deps`: true/false

#### Basic YML DAG example:

```yaml
description: "dbt-coves DAG"
Expand All @@ -350,33 +370,21 @@ default_args:
start_date: 2023-01-01
catchup: false
nodes:
sync_airbyte:
type: task_group
tooltip: "Sync Airbyte connections"
generator: AirbyteGenerator
host: http://localhost
port: 8000
connection_ids:
- 28c7ff39-f675-456e-9620-71c4b6754f97
- 8f9e15b0-5f5f-4657-8191-162e28495ad9
task_1:
type: task
operator: airflow.operators.bash.BashOperator
bash_command: "echo 'Hello Task_1'"
task_2:
type: task
operator: airflow.operators.bash.BashOperator
bash_command: "echo 'Hello Task_2'"
dependencies: ["task_1"]
dependencies: ["sync_airbyte"]
```
#### Airflow DAG Generators
When a YML Dag `node` is of type `task_group`, **Generators** can be used instead of `Operators`.

They are custom classes that receive YML `key:value` pairs and return one or more tasks for the respective task group. Any pair specified other than `type: task_group` will be passed to the specified `generator`, and it has the responsibility of returning N amount of `task_name = Operator(params)`.

We provide some Generators:

- `AirbyteGenerator`: must receive Airbyte's `host` and `port` and optionally a `connection_ids` list of Airbyte Connections to Sync
- `FivetranGenerator`: must receive Fivetran's `api_key` and `api_secret` and optionally a `connection_ids` list of Fivetran Connectors to Sync
- `AirbyteDbtGenerator` and `FivetranDbtGenerator`: instead of passing them Airbyte or Fivetran connections, they use dbt to discover those IDs. Apart from their related Generators mandatory fields, they can receive:
- `dbt_project_path`: dbt/project/folder
- `virtualenv_path`: path to a virtualenv in case dbt has to be ran with another Python executable
- `run_dbt_compile`: true/false
- `run_dbt_deps`: true/false

##### Create your custom Generator
You can create your own DAG Generator. Any `key:value` specified in the YML DAG will be passed to it's constructor.
Expand All @@ -386,6 +394,17 @@ This Generator needs:
- a `imports` attribute: a list of _module.class_ Operator of the tasks it outputs
- a `generate_tasks` method that returns the set of `"task_name = Operator()"` strings to write as the task group tasks.

```python
class PostgresGenerator():
def __init__(self) -> None:
""" Any key:value pair in the YML Dag will get here """
self.imports = ["airflow.providers.postgres.operators.postgres.PostgresOperator"]
def generate_tasks(self):
""" Use your custom logic and return N `name = PostgresOperator()` strings """
raise NotImplementedError
```

### airflow-dags generation arguments

`dbt-coves generate airflow-dags` supports the following args:
Expand All @@ -401,7 +420,7 @@ This Generator needs:
--generators-folder
# Path to your Python module with custom Generators

--generator-params
--generators-params
# Object with default values for the desired Generator(s)
# For example: {"AirbyteGenerator": {"host": "http://localhost", "port": "8000"}}

Expand Down
4 changes: 2 additions & 2 deletions dbt_coves/config/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class GenerateAirflowDagsModel(BaseModel):
from_path: Optional[str] = ""
validate_operators: Optional[bool] = False
generators_folder: Optional[str] = "dbt_coves.tasks.generate.airflow_generators"
generator_params: Optional[Dict[str, Any]] = {}
generators_params: Optional[Dict[str, Any]] = {}
secrets_path: Optional[str] = ""
secrets_manager: Optional[str] = ""
secrets_url: Optional[str] = ""
Expand Down Expand Up @@ -191,7 +191,7 @@ class DbtCovesConfig:
"generate.airflow_dags.from_path",
"generate.airflow_dags.validate_operators",
"generate.airflow_dags.generators_folder",
"generate.airflow_dags.generator_params",
"generate.airflow_dags.generators_params",
"generate.airflow_dags.secrets_path",
"generate.airflow_dags.secrets_manager",
"generate.airflow_dags.secrets_url",
Expand Down
12 changes: 6 additions & 6 deletions dbt_coves/tasks/generate/airflow_dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def register_parser(cls, sub_parsers, base_subparser):
help="Custom DAG generators folder",
)
subparser.add_argument(
"--generator-params",
"--generators-params",
help="Object with default values for the desired Generator(s), i.e {'AirbyteDbtGenerator' "
"{'host': 'http://localhost'}}",
type=str,
Expand Down Expand Up @@ -206,16 +206,16 @@ def get_generator_class(self, generator: str):

def _merge_generator_configs(self, tg_conf: Dict[str, Any], generator: str) -> Dict[str, Any]:
"""
Merge the generator configs between YML Dag and dbt-coves `generator_params` config
Merge the generator configs between YML Dag and dbt-coves `generators_params` config
For example.
generator_params:
generators_params:
AirbyteDbtGenerator:
host: "http://localhost"
port: 8000
"""
generator_params = self.get_config_value("generator_params")
coves_config_generator_params = generator_params.get(generator, {})
return deep_merge(tg_conf, coves_config_generator_params)
generators_params = self.get_config_value("generators_params")
coves_config_generators_params = generators_params.get(generator, {})
return deep_merge(tg_conf, coves_config_generators_params)

def generate_task_group(self, tg_name: str, tg_conf: Dict[str, Any]):
"""
Expand Down
14 changes: 3 additions & 11 deletions dbt_coves/tasks/generate/airflow_generators/airbyte.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from os import environ
from typing import Any, Dict, List

from slugify import slugify
Expand All @@ -9,8 +8,6 @@
)
from dbt_coves.utils.api_caller import AirbyteApiCaller

TEST_MODE = bool(environ.get("TEST_MODE"))


class AirbyteGeneratorException(Exception):
pass
Expand All @@ -28,14 +25,9 @@ def __init__(
self.connection_ids = connection_ids
self.ignored_source_tables = []
self.imports = ["airflow.providers.airbyte.operators.airbyte.AirbyteTriggerSyncOperator"]

if TEST_MODE:
self.airbyte_connections = []
self.connections_should_exist = False
else:
self.api_caller = AirbyteApiCaller(self.host, self.port)
self.airbyte_connections = self.api_caller.airbyte_connections_list
self.connections_should_exist = True
self.api_caller = AirbyteApiCaller(self.host, self.port)
self.airbyte_connections = self.api_caller.airbyte_connections_list
self.connections_should_exist = True

def validate_ids_in_airbyte(self, connection_ids):
"""
Expand Down
16 changes: 4 additions & 12 deletions dbt_coves/tasks/generate/airflow_generators/fivetran.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from os import environ
from typing import Any, Dict, List

from slugify import slugify
Expand All @@ -7,8 +6,6 @@

from .base import BaseDbtCovesTaskGenerator, BaseDbtGenerator

TEST_MODE = bool(environ.get("TEST_MODE"))


class FivetranGeneratorException(Exception):
pass
Expand All @@ -29,15 +26,10 @@ def __init__(
self.connection_ids = connection_ids
self.wait_for_completion = wait_for_completion
self.ignored_source_tables = ["fivetran_audit", "fivetran_audit_warning"]
if TEST_MODE:
self.fivetran_data = {}
self.fivetran_groups = {}
self.connectors_should_exist = False
else:
self.fivetran_api = FivetranApiCaller(api_key, api_secret)
self.fivetran_data = self.fivetran_api.fivetran_data
self.fivetran_groups = self.fivetran_api.fivetran_groups
self.connectors_should_exist = True
self.fivetran_api = FivetranApiCaller(api_key, api_secret)
self.fivetran_data = self.fivetran_api.fivetran_data
self.fivetran_groups = self.fivetran_api.fivetran_groups
self.connectors_should_exist = True

def _get_fivetran_connector_name_for_id(self, connector_id):
"""
Expand Down
6 changes: 3 additions & 3 deletions dbt_coves/utils/flags.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def __init__(self, cli_parser: ArgumentParser) -> None:
"from_path": None,
"validate_operators": False,
"generators_folder": None,
"generator_params": None,
"generators_params": None,
"secrets_path": None,
"secrets_manager": None,
"secrets_url": None,
Expand Down Expand Up @@ -271,8 +271,8 @@ def parse_args(self, cli_args: List[str] = list()) -> None:
] = self.args.validate_operators
if self.args.generators_folder:
self.generate["airflow_dags"]["generators_folder"] = self.args.generators_folder
if self.args.generator_params:
self.generate["airflow_dags"]["generator_params"] = self.args.generator_params
if self.args.generators_params:
self.generate["airflow_dags"]["generators_params"] = self.args.generators_params
if self.args.secrets_path:
self.generate["airflow_dags"]["secrets_path"] = self.args.secrets_path
if self.args.secrets_manager:
Expand Down

0 comments on commit be4cf6f

Please sign in to comment.