From 5875ac23a2fd17f5bc5cd34c0359afda5eea3b9c Mon Sep 17 00:00:00 2001 From: Chris DeCarolis Date: Thu, 24 Oct 2024 18:02:42 -0700 Subject: [PATCH] [dagster-airlift] [federation-demo] dbt cloud side --- .../examples/dbt-example/Makefile | 5 +- .../dbt_example/dagster_defs/constants.py | 2 + .../dagster_defs/dbt_cloud_airflow.py | 25 ++++++ .../dagster_defs/dbt_cloud_assets.py | 35 +++++++- .../dagster_defs/dbt_cloud_utils.py | 83 +++++++++++++++++++ .../federated_airflow_dags/dags.py | 2 +- .../integration_tests/test_federated.py | 27 +++++- .../examples/dbt-example/setup.py | 2 +- .../examples/dbt-example/tox.ini | 1 + .../dagster-dlift/dagster_dlift/translator.py | 3 + 10 files changed, 178 insertions(+), 7 deletions(-) create mode 100644 examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/dbt_cloud_airflow.py create mode 100644 examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/dbt_cloud_utils.py diff --git a/examples/experimental/dagster-airlift/examples/dbt-example/Makefile b/examples/experimental/dagster-airlift/examples/dbt-example/Makefile index a3fc6425a08f2..2ed9e935af2a7 100644 --- a/examples/experimental/dagster-airlift/examples/dbt-example/Makefile +++ b/examples/experimental/dagster-airlift/examples/dbt-example/Makefile @@ -65,8 +65,11 @@ run_complete: run_federated_airflow_defs: dagster dev -m dbt_example.dagster_defs.federated_airflow_defs -p 3333 +run_dbt_cloud_defs: + dagster dev -m dbt_example.dagster_defs.dbt_cloud_airflow -p 3333 + wipe: ## Wipe out all the files created by the Makefile - rm -rf $(AIRFLOW_HOME) $(DAGSTER_HOME) + rm -rf $(AIRFLOW_HOME) $(FEDERATED_AIRFLOW_HOME) $(DAGSTER_HOME) wipe_dagster: ## Wipe out all the files created by the Makefile rm -rf $$DAGSTER_HOME \ No newline at end of file diff --git a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/constants.py b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/constants.py index b2ab51316bfa3..3694534c8f7d0 100644 --- a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/constants.py +++ b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/constants.py @@ -19,6 +19,8 @@ ASSETS_PATH = Path(__file__).parent / "defs" PROXIED_STATE_PATH = Path(__file__).parent / "proxied_state" DBT_DAG_ASSET_KEY = AssetKey([AIRFLOW_INSTANCE_NAME, "dag", "dbt_dag"]) +UPLOAD_SOURCE_DATA_ASSET_KEY = AssetKey([FEDERATED_INSTANCE_NAME, "dag", "upload_source_data"]) +DBT_MODEL_TO_DAG = {"model.test_environment.cleaned_corpus": UPLOAD_SOURCE_DATA_ASSET_KEY} def dbt_project_path() -> Path: diff --git a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/dbt_cloud_airflow.py b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/dbt_cloud_airflow.py new file mode 100644 index 0000000000000..58a69e9f9765f --- /dev/null +++ b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/dbt_cloud_airflow.py @@ -0,0 +1,25 @@ +from dagster import Definitions, ScheduleDefinition + +from dbt_example.dagster_defs.lakehouse import lakehouse_assets_def, lakehouse_existence_check +from dbt_example.shared.load_iris import CSV_PATH, DB_PATH, IRIS_COLUMNS + +from .dbt_cloud_assets import get_dbt_cloud_assets +from .federated_airflow import get_federated_airflow_assets, get_federated_airflow_sensor +from .jaffle_shop import jaffle_shop_assets, jaffle_shop_resource 
+ +daily_schedule = ScheduleDefinition(name="daily_schedule", cron_schedule="0 0 * * *", target="*") + +federated_airflow_dags = get_federated_airflow_assets() +linked_dbt_cloud_assets = get_dbt_cloud_assets() +defs = Definitions( + assets=[ + lakehouse_assets_def(csv_path=CSV_PATH, duckdb_path=DB_PATH, columns=IRIS_COLUMNS), + jaffle_shop_assets, + *federated_airflow_dags, + linked_dbt_cloud_assets, + ], + asset_checks=[lakehouse_existence_check(csv_path=CSV_PATH, duckdb_path=DB_PATH)], + schedules=[daily_schedule], + sensors=[get_federated_airflow_sensor()], + resources={"dbt": jaffle_shop_resource()}, +) diff --git a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/dbt_cloud_assets.py b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/dbt_cloud_assets.py index 13f7f957fe1e7..9616dc516b750 100644 --- a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/dbt_cloud_assets.py +++ b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/dbt_cloud_assets.py @@ -1 +1,34 @@ -from dagster_dlift.project import DBTCloudProjectEnvironment, DbtCloudCredentials +from dagster import AssetsDefinition, multi_asset + +from .constants import DBT_MODEL_TO_DAG +from .dbt_cloud_utils import ( + EXPECTED_TAG, + add_deps, + eager, + filter_specs_by_tag, + get_project, + relevant_check_specs, +) + + +def get_dbt_cloud_assets() -> AssetsDefinition: + # dags that are upstream of models. + dbt_cloud_project = get_project() + filtered_specs = filter_specs_by_tag(dbt_cloud_project.get_asset_specs(), EXPECTED_TAG) + specs_with_airflow_deps = add_deps(DBT_MODEL_TO_DAG, filtered_specs) + # Dbt cloud assets will run every time the upstream dags run. + specs_with_eager_automation = eager(specs_with_airflow_deps) + + @multi_asset( + specs=specs_with_eager_automation, + check_specs=relevant_check_specs( + specs_with_eager_automation, dbt_cloud_project.get_check_specs() + ), + ) + def _dbt_cloud_assets(): + client = dbt_cloud_project.get_client() + run = client.cli(["build", "--select", f"tag:{EXPECTED_TAG}"]) + run.wait_for_success() + yield from run.get_asset_events() + + return _dbt_cloud_assets diff --git a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/dbt_cloud_utils.py b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/dbt_cloud_utils.py new file mode 100644 index 0000000000000..b99a65a903103 --- /dev/null +++ b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/dbt_cloud_utils.py @@ -0,0 +1,83 @@ +from typing import Dict, Sequence + +from dagster import ( + AssetCheckSpec, + AssetKey, + AssetSpec, + _check as check, +) +from dagster._core.definitions.asset_dep import AssetDep +from dagster._core.definitions.declarative_automation.automation_condition import ( + AutomationCondition, +) +from dagster_dlift.project import DbtCloudClient, DbtCloudCredentials, DBTCloudProjectEnvironment +from dagster_dlift.test.utils import get_env_var + +ENV_NAME = "test" +EXPECTED_TAG = "demo" + + +def get_unscoped_client() -> DbtCloudClient: + return DbtCloudClient( + account_id=int(get_env_var("KS_DBT_CLOUD_ACCOUNT_ID")), + token=get_env_var("KS_DBT_CLOUD_TOKEN"), + access_url=get_env_var("KS_DBT_CLOUD_ACCESS_URL"), + discovery_api_url=get_env_var("KS_DBT_CLOUD_DISCOVERY_API_URL"), + ) + + +def get_environment_id() -> int: + return get_unscoped_client().get_environment_id_by_name(ENV_NAME) + + +def 
get_project_id() -> int: + return int(get_env_var("KS_DBT_CLOUD_PROJECT_ID")) + + +def get_project() -> DBTCloudProjectEnvironment: + return DBTCloudProjectEnvironment( + credentials=DbtCloudCredentials( + account_id=int(get_env_var("KS_DBT_CLOUD_ACCOUNT_ID")), + token=get_env_var("KS_DBT_CLOUD_TOKEN"), + access_url=get_env_var("KS_DBT_CLOUD_ACCESS_URL"), + discovery_api_url=get_env_var("KS_DBT_CLOUD_DISCOVERY_API_URL"), + ), + project_id=get_project_id(), + environment_id=get_environment_id(), + ) + + +def filter_specs_by_tag(specs: Sequence[AssetSpec], tag: str) -> Dict[AssetKey, AssetSpec]: + return { + spec.key: spec for spec in specs if tag in check.not_none(spec.metadata)["raw_data"]["tags"] + } + + +def add_dep_to_spec(spec: AssetSpec, dep: AssetKey) -> AssetSpec: + return spec._replace(deps=[*spec.deps, AssetDep(dep)]) + + +def key_for_uid(specs: Sequence[AssetSpec], uid: str) -> AssetKey: + return next(spec.key for spec in specs if spec.metadata["raw_data"]["uniqueId"] == uid) + + +def relevant_check_specs( + specs: Sequence[AssetSpec], check_specs: Sequence[AssetCheckSpec] +) -> Sequence[AssetCheckSpec]: + spec_map = {spec.key: spec for spec in specs} + return [spec for spec in check_specs if spec.key.asset_key in spec_map] + + +def add_deps( + uid_to_dep_mapping: Dict[str, AssetKey], specs: Dict[AssetKey, AssetSpec] +) -> Sequence[AssetSpec]: + specs = dict(specs) + for uid, dep in uid_to_dep_mapping.items(): + specs[key_for_uid(list(specs.values()), uid)] = add_dep_to_spec( + specs[key_for_uid(list(specs.values()), uid)], dep + ) + return list(specs.values()) + + +def eager(specs: Sequence[AssetSpec]) -> Sequence[AssetSpec]: + return [spec._replace(automation_condition=AutomationCondition.eager()) for spec in specs] diff --git a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/federated_airflow_dags/dags.py b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/federated_airflow_dags/dags.py index f288e71040d44..cba5fe7d689af 100644 --- a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/federated_airflow_dags/dags.py +++ b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/federated_airflow_dags/dags.py @@ -10,7 +10,7 @@ } upload_seeds_dag = DAG( - dag_id="upload_seeds", + dag_id="upload_source_data", default_args=default_args, schedule_interval=None, is_paused_upon_creation=False, diff --git a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example_tests/integration_tests/test_federated.py b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example_tests/integration_tests/test_federated.py index 277df1b010cb6..b8e5076256001 100644 --- a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example_tests/integration_tests/test_federated.py +++ b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example_tests/integration_tests/test_federated.py @@ -5,6 +5,7 @@ import pytest from dagster import AssetKey, AssetsDefinition, DagsterInstance, materialize +from dagster._core.definitions.definitions_class import Definitions from dagster._core.test_utils import environ from dagster._time import get_current_datetime from dagster_airlift.core import AirflowInstance @@ -46,6 +47,8 @@ def dagster_dev_cmd_fixture(stage_and_fn: Tuple[str, Callable[[], AirflowInstanc dagster_dev_module = stage_and_fn[0] if dagster_dev_module.endswith("federated_airflow_defs"): cmd = ["make", "run_federated_airflow_defs"] + elif dagster_dev_module.endswith("dbt_cloud_airflow"): + 
cmd = ["make", "run_dbt_cloud_defs"] else: raise ValueError(f"Unknown stage: {dagster_dev_module}") return cmd + ["-C", str(makefile_dir())] @@ -73,12 +76,25 @@ def federated_airflow_instance() -> AirflowInstance: return af_instance +def get_federated_defs() -> Definitions: + from dbt_example.dagster_defs.federated_airflow_defs import defs + + return defs + + +def get_dbt_cloud_defs() -> Definitions: + from dbt_example.dagster_defs.dbt_cloud_airflow import defs + + return defs + + @pytest.mark.parametrize( "stage_and_fn", [ ("federated_airflow_defs", federated_airflow_instance), + ("dbt_cloud_airflow", federated_airflow_instance), ], - ids=["federated_airflow_defs"], + ids=["federated_airflow_defs", "dbt_cloud_airflow"], indirect=True, ) def test_dagster_materializes( @@ -89,7 +105,12 @@ def test_dagster_materializes( ) -> None: """Test that assets can load properly, and that materializations register.""" # Attempt to run all original completed assets. - from dbt_example.dagster_defs.federated_airflow_defs import defs + if stage_and_fn[0] == "federated_airflow_defs": + defs = get_federated_defs() + elif stage_and_fn[0] == "dbt_cloud_airflow": + defs = get_dbt_cloud_defs() + else: + raise ValueError(f"Unknown stage: {stage_and_fn[0]}") assert defs.assets materializable_assets = [ @@ -105,7 +126,7 @@ def test_dagster_materializes( assert instance.get_latest_materialization_event(asset_key=spec.key) dagster_dev_module, af_instance_fn = stage_and_fn af_instance = af_instance_fn() - for dag_id in ["upload_seeds", "run_scrapers_daily"]: + for dag_id in ["upload_source_data", "run_scrapers_daily"]: run_id = af_instance.trigger_dag(dag_id=dag_id) af_instance.wait_for_run_completion(dag_id=dag_id, run_id=run_id, timeout=60) dagster_instance = DagsterInstance.get() diff --git a/examples/experimental/dagster-airlift/examples/dbt-example/setup.py b/examples/experimental/dagster-airlift/examples/dbt-example/setup.py index c9f76ee4b0ec9..f50369549c451 100644 --- a/examples/experimental/dagster-airlift/examples/dbt-example/setup.py +++ b/examples/experimental/dagster-airlift/examples/dbt-example/setup.py @@ -25,7 +25,7 @@ def get_version() -> str: f"dagster{pin}", f"dagster-webserver{pin}", f"dagster-airlift[dbt,core,in-airflow]{pin}", - f"dagster-dlift", + "dagster-dlift", "dbt-duckdb", "pandas", ], diff --git a/examples/experimental/dagster-airlift/examples/dbt-example/tox.ini b/examples/experimental/dagster-airlift/examples/dbt-example/tox.ini index c7df0b31440ca..60302806eeb0e 100644 --- a/examples/experimental/dagster-airlift/examples/dbt-example/tox.ini +++ b/examples/experimental/dagster-airlift/examples/dbt-example/tox.ini @@ -7,6 +7,7 @@ passenv = CI_* COVERALLS_REPO_TOKEN BUILDKITE* + KS_DBT_CLOUD* install_command = uv pip install {opts} {packages} deps = -e ../../../../../python_modules/dagster[test] diff --git a/examples/experimental/dagster-dlift/dagster_dlift/translator.py b/examples/experimental/dagster-dlift/dagster_dlift/translator.py index 7a65e95ed111d..e4b08a380e6de 100644 --- a/examples/experimental/dagster-dlift/dagster_dlift/translator.py +++ b/examples/experimental/dagster-dlift/dagster_dlift/translator.py @@ -3,6 +3,7 @@ from typing import Any, Mapping, Sequence, Union, cast from dagster import AssetCheckResult, AssetCheckSpec, AssetMaterialization, AssetSpec +from dagster._core.storage.tags import KIND_PREFIX from dagster._record import record from dagster._serdes.serdes import whitelist_for_serdes @@ -68,6 +69,7 @@ def get_model_spec(self, data: DbtCloudContentData) -> 
AssetSpec: return AssetSpec( key=clean_asset_name(data.properties["uniqueId"]), metadata={"raw_data": data.properties}, + tags={f"{KIND_PREFIX}dbt": ""}, ) def get_source_spec(self, data: DbtCloudContentData) -> AssetSpec: @@ -75,6 +77,7 @@ def get_source_spec(self, data: DbtCloudContentData) -> AssetSpec: return AssetSpec( key=clean_asset_name(data.properties["uniqueId"]), metadata={"raw_data": data.properties}, + tags={f"{KIND_PREFIX}dbt": ""}, ) def get_test_spec(self, data: DbtCloudContentData) -> AssetCheckSpec:
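
Reviewer note: the wiring added above rewrites dbt Cloud asset specs in three steps -- filter by the demo tag, graft the federated Airflow DAG asset onto the matching model as an upstream dep (add_deps), and set AutomationCondition.eager() so the dbt Cloud assets rematerialize whenever upload_source_data lands. Below is a minimal, standalone sketch of that spec-rewriting pattern, not code from this patch: it assumes only `dagster` is installed, replaces the dagster_dlift project with hand-built AssetSpecs, and the helper names (keep_tagged, link_to_dag) and sample specs are hypothetical stand-ins.

    from typing import Dict, Sequence

    from dagster import AssetKey, AssetSpec
    from dagster._core.definitions.asset_dep import AssetDep
    from dagster._core.definitions.declarative_automation.automation_condition import (
        AutomationCondition,
    )

    UPLOAD_SOURCE_DATA = AssetKey(["federated_airflow", "dag", "upload_source_data"])

    # Stand-ins for what the project environment's get_asset_specs() returns: each spec
    # carries the raw discovery-API payload under metadata["raw_data"].
    raw_specs = [
        AssetSpec(
            key=AssetKey(["cleaned_corpus"]),
            metadata={
                "raw_data": {"uniqueId": "model.test_environment.cleaned_corpus", "tags": ["demo"]}
            },
        ),
        AssetSpec(
            key=AssetKey(["untagged_model"]),
            metadata={"raw_data": {"uniqueId": "model.test_environment.untagged_model", "tags": []}},
        ),
    ]


    def keep_tagged(specs: Sequence[AssetSpec], tag: str) -> Dict[AssetKey, AssetSpec]:
        # Same filter as filter_specs_by_tag: only models carrying the tag survive.
        return {spec.key: spec for spec in specs if tag in spec.metadata["raw_data"]["tags"]}


    def link_to_dag(
        specs: Dict[AssetKey, AssetSpec], uid_to_dag: Dict[str, AssetKey]
    ) -> Sequence[AssetSpec]:
        # Combines the add_deps + eager steps: graft the federated Airflow DAG asset onto
        # the matching model as an upstream dep, then mark every spec eager so the dbt
        # Cloud assets request a materialization whenever the DAG asset materializes.
        rewritten = []
        for spec in specs.values():
            uid = spec.metadata["raw_data"]["uniqueId"]
            deps = [*spec.deps, AssetDep(uid_to_dag[uid])] if uid in uid_to_dag else spec.deps
            rewritten.append(
                spec._replace(deps=deps, automation_condition=AutomationCondition.eager())
            )
        return rewritten


    linked = link_to_dag(
        keep_tagged(raw_specs, "demo"),
        {"model.test_environment.cleaned_corpus": UPLOAD_SOURCE_DATA},
    )
    assert [spec.key for spec in linked] == [AssetKey(["cleaned_corpus"])]
    assert len(linked[0].deps) == 1

In the demo itself the same rewrite is driven by DBT_MODEL_TO_DAG, and the resulting specs feed the @multi_asset in dbt_cloud_assets.py; `make run_dbt_cloud_defs` starts the Dagster dev server against those definitions.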