Skip to content

Commit

Permalink
fix(ingest/dbt): Prevent lineage cycles when parsing sql of dbt models (
Browse files Browse the repository at this point in the history
  • Loading branch information
asikowitz authored Oct 18, 2024
1 parent 011e5b9 commit dcf4793
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1989,6 +1989,11 @@ def _translate_dbt_name_to_upstream_urn(dbt_name: str) -> str:
time=mce_builder.get_sys_time(),
actor=_DEFAULT_ACTOR,
)
sibling_urn = node.get_urn(
self.config.target_platform,
self.config.env,
self.config.target_platform_instance,
)
return UpstreamLineageClass(
upstreams=[
UpstreamClass(
Expand All @@ -1997,6 +2002,7 @@ def _translate_dbt_name_to_upstream_urn(dbt_name: str) -> str:
auditStamp=auditStamp,
)
for upstream in upstream_urns
if not (node.node_type == "model" and upstream == sibling_urn)
],
fineGrainedLineages=(
(cll or None) if self.config.include_column_lineage else None
Expand Down
42 changes: 42 additions & 0 deletions metadata-ingestion/tests/unit/test_dbt_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.dbt import dbt_cloud
from datahub.ingestion.source.dbt.dbt_cloud import DBTCloudConfig
from datahub.ingestion.source.dbt.dbt_common import DBTNode
from datahub.ingestion.source.dbt.dbt_core import (
DBTCoreConfig,
DBTCoreSource,
Expand Down Expand Up @@ -253,6 +254,47 @@ def test_dbt_config_prefer_sql_parser_lineage():
assert config.prefer_sql_parser_lineage is True


def test_dbt_prefer_sql_parser_lineage_no_self_reference():
ctx = PipelineContext(run_id="test-run-id")
config = DBTCoreConfig.parse_obj(
{
**create_base_dbt_config(),
"skip_sources_in_lineage": True,
"prefer_sql_parser_lineage": True,
}
)
source: DBTCoreSource = DBTCoreSource(config, ctx, "dbt")
all_nodes_map = {
"model1": DBTNode(
name="model1",
database=None,
schema=None,
alias=None,
comment="",
description="",
language=None,
raw_code=None,
dbt_adapter="postgres",
dbt_name="model1",
dbt_file_path=None,
dbt_package_name=None,
node_type="model",
materialization="table",
max_loaded_at=None,
catalog_type=None,
missing_from_catalog=False,
owner=None,
compiled_code="SELECT d FROM results WHERE d > (SELECT MAX(d) FROM model1)",
),
}
source._infer_schemas_and_update_cll(all_nodes_map)
upstream_lineage = source._create_lineage_aspect_for_dbt_node(
all_nodes_map["model1"], all_nodes_map
)
assert upstream_lineage is not None
assert len(upstream_lineage.upstreams) == 1


def test_dbt_s3_config():
# test missing aws config
config_dict: dict = {
Expand Down

0 comments on commit dcf4793

Please sign in to comment.