Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dagster-airlift] [federation-demo] dbt cloud side #25536

Merged
merged 1 commit into from
Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,11 @@ run_complete:
run_federated_airflow_defs: ## Run dagster dev against the federated Airflow definitions
	dagster dev -m dbt_example.dagster_defs.federated_airflow_defs -p 3333

run_dbt_cloud_defs: ## Run dagster dev against the dbt Cloud + federated Airflow definitions
	dagster dev -m dbt_example.dagster_defs.dbt_cloud_airflow -p 3333

wipe: ## Wipe out all the files created by the Makefile
rm -rf $(AIRFLOW_HOME) $(DAGSTER_HOME)
rm -rf $(AIRFLOW_HOME) $(FEDERATED_AIRFLOW_HOME) $(DAGSTER_HOME)

wipe_dagster: ## Wipe out all the files created by the Makefile
rm -rf $$DAGSTER_HOME
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
ASSETS_PATH = Path(__file__).parent / "defs"
PROXIED_STATE_PATH = Path(__file__).parent / "proxied_state"
# Asset key of the dbt dag on the main Airflow instance.
DBT_DAG_ASSET_KEY = AssetKey([AIRFLOW_INSTANCE_NAME, "dag", "dbt_dag"])
# Asset key of the upload_source_data dag on the federated Airflow instance.
UPLOAD_SOURCE_DATA_ASSET_KEY = AssetKey([FEDERATED_INSTANCE_NAME, "dag", "upload_source_data"])
# Maps a dbt Cloud model uniqueId to the Airflow dag asset it depends on.
DBT_MODEL_TO_DAG = {"model.test_environment.cleaned_corpus": UPLOAD_SOURCE_DATA_ASSET_KEY}


def dbt_project_path() -> Path:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from dagster import Definitions, ScheduleDefinition

from dbt_example.dagster_defs.lakehouse import lakehouse_assets_def, lakehouse_existence_check
from dbt_example.shared.load_iris import CSV_PATH, DB_PATH, IRIS_COLUMNS

from .dbt_cloud_assets import get_dbt_cloud_assets
from .federated_airflow import get_federated_airflow_assets, get_federated_airflow_sensor
from .jaffle_shop import jaffle_shop_assets, jaffle_shop_resource

# Daily schedule (cron "0 0 * * *"); target="*" selects every asset below.
daily_schedule = ScheduleDefinition(name="daily_schedule", cron_schedule="0 0 * * *", target="*")

# Both factories run at import time. NOTE(review): get_dbt_cloud_assets builds a
# dbt Cloud project environment from env-var credentials, so importing this
# module presumably requires the KS_DBT_CLOUD_* variables to be set — confirm.
federated_airflow_dags = get_federated_airflow_assets()
linked_dbt_cloud_assets = get_dbt_cloud_assets()
# Top-level Definitions combining lakehouse, jaffle shop, federated Airflow dag
# assets, and the dbt Cloud assets, plus the sensor that polls the federated
# Airflow instance.
defs = Definitions(
    assets=[
        lakehouse_assets_def(csv_path=CSV_PATH, duckdb_path=DB_PATH, columns=IRIS_COLUMNS),
        jaffle_shop_assets,
        *federated_airflow_dags,
        linked_dbt_cloud_assets,
    ],
    asset_checks=[lakehouse_existence_check(csv_path=CSV_PATH, duckdb_path=DB_PATH)],
    schedules=[daily_schedule],
    sensors=[get_federated_airflow_sensor()],
    resources={"dbt": jaffle_shop_resource()},
)
Original file line number Diff line number Diff line change
@@ -1 +1,34 @@
from dagster_dlift.project import DBTCloudProjectEnvironment, DbtCloudCredentials
from dagster import AssetsDefinition, multi_asset

from .constants import DBT_MODEL_TO_DAG
from .dbt_cloud_utils import (
EXPECTED_TAG,
add_deps,
eager,
filter_specs_by_tag,
get_project,
relevant_check_specs,
)


def get_dbt_cloud_assets() -> AssetsDefinition:
    """Build a multi-asset covering the tagged dbt Cloud models.

    Specs are narrowed to models carrying EXPECTED_TAG, wired to their
    upstream Airflow dag assets, and marked eager so they run whenever
    those dags complete.
    """
    project = get_project()
    tagged = filter_specs_by_tag(project.get_asset_specs(), EXPECTED_TAG)
    # Dags that are upstream of models, then eager automation on everything.
    final_specs = eager(add_deps(DBT_MODEL_TO_DAG, tagged))
    checks = relevant_check_specs(final_specs, project.get_check_specs())

    @multi_asset(specs=final_specs, check_specs=checks)
    def _dbt_cloud_assets():
        # Trigger a tag-scoped dbt Cloud build and stream its asset events.
        run = project.get_client().cli(["build", "--select", f"tag:{EXPECTED_TAG}"])
        run.wait_for_success()
        yield from run.get_asset_events()

    return _dbt_cloud_assets
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
from typing import Dict, Sequence

from dagster import (
AssetCheckSpec,
AssetKey,
AssetSpec,
_check as check,
)
from dagster._core.definitions.asset_dep import AssetDep
from dagster._core.definitions.declarative_automation.automation_condition import (
AutomationCondition,
)
from dagster_dlift.project import DbtCloudClient, DbtCloudCredentials, DBTCloudProjectEnvironment
from dagster_dlift.test.utils import get_env_var

# Name of the dbt Cloud environment this demo resolves to an id at runtime.
ENV_NAME = "test"
# Only dbt nodes carrying this tag are surfaced as Dagster assets / built.
EXPECTED_TAG = "demo"


def get_unscoped_client() -> DbtCloudClient:
    """Construct a dbt Cloud client from the KS_DBT_CLOUD_* env vars.

    "Unscoped" because it is not bound to a project or environment.
    """
    account_id = int(get_env_var("KS_DBT_CLOUD_ACCOUNT_ID"))
    token = get_env_var("KS_DBT_CLOUD_TOKEN")
    access_url = get_env_var("KS_DBT_CLOUD_ACCESS_URL")
    discovery_api_url = get_env_var("KS_DBT_CLOUD_DISCOVERY_API_URL")
    return DbtCloudClient(
        account_id=account_id,
        token=token,
        access_url=access_url,
        discovery_api_url=discovery_api_url,
    )


def get_environment_id() -> int:
    """Resolve ENV_NAME to its numeric dbt Cloud environment id."""
    client = get_unscoped_client()
    return client.get_environment_id_by_name(ENV_NAME)


def get_project_id() -> int:
    """Read the dbt Cloud project id from the environment."""
    raw_id = get_env_var("KS_DBT_CLOUD_PROJECT_ID")
    return int(raw_id)


def get_project() -> DBTCloudProjectEnvironment:
    """Build the project environment scoped to our project and environment.

    Credentials come from the same KS_DBT_CLOUD_* env vars used by
    get_unscoped_client.
    """
    credentials = DbtCloudCredentials(
        account_id=int(get_env_var("KS_DBT_CLOUD_ACCOUNT_ID")),
        token=get_env_var("KS_DBT_CLOUD_TOKEN"),
        access_url=get_env_var("KS_DBT_CLOUD_ACCESS_URL"),
        discovery_api_url=get_env_var("KS_DBT_CLOUD_DISCOVERY_API_URL"),
    )
    return DBTCloudProjectEnvironment(
        credentials=credentials,
        project_id=get_project_id(),
        environment_id=get_environment_id(),
    )


def filter_specs_by_tag(specs: Sequence[AssetSpec], tag: str) -> Dict[AssetKey, AssetSpec]:
    """Index specs by asset key, keeping only those whose raw dbt tags include *tag*."""
    selected: Dict[AssetKey, AssetSpec] = {}
    for spec in specs:
        raw_tags = check.not_none(spec.metadata)["raw_data"]["tags"]
        if tag in raw_tags:
            selected[spec.key] = spec
    return selected


def add_dep_to_spec(spec: AssetSpec, dep: AssetKey) -> AssetSpec:
    """Return a copy of *spec* with *dep* appended to its dependencies."""
    extended_deps = list(spec.deps)
    extended_deps.append(AssetDep(dep))
    return spec._replace(deps=extended_deps)


def key_for_uid(specs: Sequence[AssetSpec], uid: str) -> AssetKey:
    """Return the asset key of the spec whose dbt uniqueId equals *uid*.

    Raises:
        KeyError: if no spec matches *uid*. (The previous bare ``next()``
            raised an opaque ``StopIteration``, which is also hazardous if
            it ever escapes into a generator frame — PEP 479.)
    """
    for spec in specs:
        if spec.metadata["raw_data"]["uniqueId"] == uid:
            return spec.key
    raise KeyError(f"No asset spec found for dbt uniqueId {uid!r}")


def relevant_check_specs(
    specs: Sequence[AssetSpec], check_specs: Sequence[AssetCheckSpec]
) -> Sequence[AssetCheckSpec]:
    """Keep only the check specs that target one of the given asset specs."""
    covered_keys = {spec.key for spec in specs}
    return [cs for cs in check_specs if cs.key.asset_key in covered_keys]


def add_deps(
    uid_to_dep_mapping: Dict[str, AssetKey], specs: Dict[AssetKey, AssetSpec]
) -> Sequence[AssetSpec]:
    """Return the specs, with each mapped dbt uniqueId's spec gaining an extra dep.

    Args:
        uid_to_dep_mapping: dbt uniqueId -> upstream asset key to add as a dep.
        specs: asset specs indexed by key; not mutated.

    Returns:
        All specs (updated and untouched) in their original insertion order.
    """
    updated = dict(specs)
    for uid, dep in uid_to_dep_mapping.items():
        # Resolve the uid to a key once, instead of scanning the values twice
        # per entry as the previous implementation did.
        key = key_for_uid(list(updated.values()), uid)
        updated[key] = add_dep_to_spec(updated[key], dep)
    return list(updated.values())


def eager(specs: Sequence[AssetSpec]) -> Sequence[AssetSpec]:
    """Return copies of *specs* with an eager automation condition attached."""
    updated = []
    for spec in specs:
        updated.append(spec._replace(automation_condition=AutomationCondition.eager()))
    return updated
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
}

upload_seeds_dag = DAG(
dag_id="upload_seeds",
dag_id="upload_source_data",
default_args=default_args,
schedule_interval=None,
is_paused_upon_creation=False,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import pytest
from dagster import AssetKey, AssetsDefinition, DagsterInstance, materialize
from dagster._core.definitions.definitions_class import Definitions
from dagster._core.test_utils import environ
from dagster._time import get_current_datetime
from dagster_airlift.core import AirflowInstance
Expand Down Expand Up @@ -46,6 +47,8 @@ def dagster_dev_cmd_fixture(stage_and_fn: Tuple[str, Callable[[], AirflowInstanc
dagster_dev_module = stage_and_fn[0]
if dagster_dev_module.endswith("federated_airflow_defs"):
cmd = ["make", "run_federated_airflow_defs"]
elif dagster_dev_module.endswith("dbt_cloud_airflow"):
cmd = ["make", "run_dbt_cloud_defs"]
else:
raise ValueError(f"Unknown stage: {dagster_dev_module}")
return cmd + ["-C", str(makefile_dir())]
Expand Down Expand Up @@ -73,12 +76,25 @@ def federated_airflow_instance() -> AirflowInstance:
return af_instance


def get_federated_defs() -> Definitions:
    """Return the federated-airflow Definitions.

    Imported lazily so merely loading this test module does not trigger the
    definitions module's import-time work.
    """
    import dbt_example.dagster_defs.federated_airflow_defs as federated_module

    return federated_module.defs


def get_dbt_cloud_defs() -> Definitions:
    """Return the dbt Cloud + federated-airflow Definitions.

    Imported lazily: the target module builds dbt Cloud assets at import
    time, which requires dbt Cloud credentials in the environment.
    """
    import dbt_example.dagster_defs.dbt_cloud_airflow as dbt_cloud_module

    return dbt_cloud_module.defs


@pytest.mark.parametrize(
"stage_and_fn",
[
("federated_airflow_defs", federated_airflow_instance),
("dbt_cloud_airflow", federated_airflow_instance),
],
ids=["federated_airflow_defs"],
ids=["federated_airflow_defs", "dbt_cloud_airflow"],
indirect=True,
)
def test_dagster_materializes(
Expand All @@ -89,7 +105,12 @@ def test_dagster_materializes(
) -> None:
"""Test that assets can load properly, and that materializations register."""
# Attempt to run all original completed assets.
from dbt_example.dagster_defs.federated_airflow_defs import defs
if stage_and_fn[0] == "federated_airflow_defs":
defs = get_federated_defs()
elif stage_and_fn[0] == "dbt_cloud_airflow":
defs = get_dbt_cloud_defs()
else:
raise ValueError(f"Unknown stage: {stage_and_fn[0]}")

assert defs.assets
materializable_assets = [
Expand All @@ -105,7 +126,7 @@ def test_dagster_materializes(
assert instance.get_latest_materialization_event(asset_key=spec.key)
dagster_dev_module, af_instance_fn = stage_and_fn
af_instance = af_instance_fn()
for dag_id in ["upload_seeds", "run_scrapers_daily"]:
for dag_id in ["upload_source_data", "run_scrapers_daily"]:
run_id = af_instance.trigger_dag(dag_id=dag_id)
af_instance.wait_for_run_completion(dag_id=dag_id, run_id=run_id, timeout=60)
dagster_instance = DagsterInstance.get()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def get_version() -> str:
f"dagster{pin}",
f"dagster-webserver{pin}",
f"dagster-airlift[dbt,core,in-airflow]{pin}",
f"dagster-dlift",
"dagster-dlift",
"dbt-duckdb",
"pandas",
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ passenv =
CI_*
COVERALLS_REPO_TOKEN
BUILDKITE*
KS_DBT_CLOUD*
install_command = uv pip install {opts} {packages}
deps =
-e ../../../../../python_modules/dagster[test]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from typing import Any, Mapping, Sequence, Union, cast

from dagster import AssetCheckResult, AssetCheckSpec, AssetMaterialization, AssetSpec
from dagster._core.storage.tags import KIND_PREFIX
from dagster._record import record
from dagster._serdes.serdes import whitelist_for_serdes

Expand Down Expand Up @@ -68,13 +69,15 @@ def get_model_spec(self, data: DbtCloudContentData) -> AssetSpec:
return AssetSpec(
key=clean_asset_name(data.properties["uniqueId"]),
metadata={"raw_data": data.properties},
tags={f"{KIND_PREFIX}dbt": ""},
)

    def get_source_spec(self, data: DbtCloudContentData) -> AssetSpec:
        """Build an AssetSpec for a dbt Cloud source node.

        Mirrors get_model_spec: key derived from the node's uniqueId, the raw
        node properties stashed in metadata, and a dbt "kind" tag attached.
        """
        # This is obviously a placeholder implementation
        return AssetSpec(
            key=clean_asset_name(data.properties["uniqueId"]),
            metadata={"raw_data": data.properties},
            tags={f"{KIND_PREFIX}dbt": ""},
        )

def get_test_spec(self, data: DbtCloudContentData) -> AssetCheckSpec:
Expand Down