
Commit

Merge branch 'datahub-project:master' into master
anshbansal authored Sep 26, 2024
2 parents 9322be8 + 68fb97f commit 9acc15c
Showing 9 changed files with 506 additions and 281 deletions.
3 changes: 1 addition & 2 deletions docker/datahub-ingestion-base/Dockerfile
@@ -8,8 +8,6 @@ ARG DEBIAN_REPO_URL=https://deb.debian.org/debian
ARG UBUNTU_REPO_URL=http://ports.ubuntu.com/ubuntu-ports
ARG PIP_MIRROR_URL=https://pypi.python.org/simple

FROM powerman/dockerize:0.19 AS dockerize-binary

FROM ubuntu:22.04 AS base

ARG GITHUB_REPO_URL
@@ -44,6 +42,7 @@ RUN apt-get update && apt-get upgrade -y \
krb5-config \
libkrb5-dev \
librdkafka-dev \
git \
wget \
curl \
zip \
18 changes: 14 additions & 4 deletions metadata-ingestion/docs/sources/openapi/openapi_recipe.yml
@@ -4,11 +4,21 @@ source:
name: test_endpoint # this name will appear in DataHub
url: https://test_endpoint.com/
swagger_file: classicapi/doc/swagger.json # where to search for the OpenAPI definitions
get_token: # optional, if you need to get an authentication token beforehand

# option 1: bearer token
bearer_token: "<token>"

# option 2: dynamically generated tokens, username/password are mandatory
get_token:
request_type: get
url: api/authentication/login?username={username}&password={password}
username: your_username # optional
password: your_password # optional
url_complement: api/authentication/login?username={username}&password={password}
username: your_username
password: your_password

# option 3: using basic auth
username: your_username
password: your_password

forced_examples: # optional
/accounts/groupname/{name}: ['test']
/accounts/username/{name}: ['test']
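The updated recipe now documents three authentication options: a static bearer token, a token generated at runtime via get_token (whose url field is renamed to url_complement), and plain basic auth via top-level username/password. As a rough illustration of option 1, the sketch below wires the recipe into DataHub's programmatic Pipeline API; the server address, token value, and the assumption that the recipe fields map 1:1 onto the config dict are placeholders of mine, not part of this commit.

# Minimal sketch: running the openapi source with a static bearer token (option 1).
# The GMS address and token are placeholders.
from datahub.ingestion.run.pipeline import Pipeline

pipeline = Pipeline.create(
    {
        "source": {
            "type": "openapi",
            "config": {
                "name": "test_endpoint",  # this name will appear in DataHub
                "url": "https://test_endpoint.com/",
                "swagger_file": "classicapi/doc/swagger.json",
                "bearer_token": "<token>",
                "forced_examples": {"/accounts/groupname/{name}": ["test"]},
            },
        },
        "sink": {
            "type": "datahub-rest",
            "config": {"server": "http://localhost:8080"},  # placeholder server
        },
    }
)
pipeline.run()
pipeline.raise_from_status()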
162 changes: 111 additions & 51 deletions metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py
Expand Up @@ -5,7 +5,7 @@
from dataclasses import dataclass, field
from datetime import datetime
from enum import auto
from typing import Any, Dict, Iterable, List, Optional, Tuple
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple

import more_itertools
import pydantic
@@ -46,6 +46,7 @@
from datahub.ingestion.api.source import MetadataWorkUnitProcessor
from datahub.ingestion.api.source_helpers import auto_workunit
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.graph.client import DataHubGraph
from datahub.ingestion.source.dbt.dbt_tests import (
DBTTest,
DBTTestResult,
@@ -233,6 +234,10 @@ def can_emit_node_type(self, node_type: str) -> bool:

return allowed == EmitDirective.YES

@property
def can_emit_test_definitions(self) -> bool:
return self.test_definitions == EmitDirective.YES

@property
def can_emit_test_results(self) -> bool:
return self.test_results == EmitDirective.YES
@@ -735,8 +740,8 @@ def get_upstreams_for_test(
all_nodes_map: Dict[str, DBTNode],
platform_instance: Optional[str],
environment: str,
) -> List[str]:
upstream_urns = []
) -> Dict[str, str]:
upstreams = {}

for upstream in test_node.upstream_nodes:
if upstream not in all_nodes_map:
@@ -747,15 +752,13 @@

upstream_manifest_node = all_nodes_map[upstream]

upstream_urns.append(
upstream_manifest_node.get_urn(
target_platform=DBT_PLATFORM,
data_platform_instance=platform_instance,
env=environment,
)
upstreams[upstream] = upstream_manifest_node.get_urn(
target_platform=DBT_PLATFORM,
data_platform_instance=platform_instance,
env=environment,
)

return upstream_urns
return upstreams


def make_mapping_upstream_lineage(
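get_upstreams_for_test now returns a mapping from each upstream dbt node name to its DBT-platform urn rather than a bare list of urns, which lets the test-emission code further down mint a separate assertion per upstream table. A condensed sketch of the guid scheme that consumes this mapping (hypothetical node names; the real logic lives in create_test_entity_mcps below):

# Sketch of the per-upstream assertion guid scheme; inputs are made up.
from datahub.emitter import mce_builder

upstreams = {
    "model.jaffle_shop.orders": "urn:li:dataset:(urn:li:dataPlatform:dbt,jaffle_shop.orders,PROD)",
    "model.jaffle_shop.customers": "urn:li:dataset:(urn:li:dataPlatform:dbt,jaffle_shop.customers,PROD)",
}

for upstream_node_name, upstream_urn in upstreams.items():
    guid_parts = {"platform": "dbt", "name": "test.jaffle_shop.relationships_orders"}
    if len(upstreams) > 1:
        # Only disambiguate when the test touches multiple tables, so tests with a
        # single upstream keep their pre-existing assertion urn (backwards compatible).
        guid_parts["on_dbt_upstream"] = upstream_node_name
    assertion_urn = mce_builder.make_assertion_urn(mce_builder.datahub_guid(guid_parts))
    print(upstream_urn, "->", assertion_urn)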
@@ -892,53 +895,63 @@ def __init__(self, config: DBTCommonConfig, ctx: PipelineContext, platform: str)
def create_test_entity_mcps(
self,
test_nodes: List[DBTNode],
custom_props: Dict[str, str],
extra_custom_props: Dict[str, str],
all_nodes_map: Dict[str, DBTNode],
) -> Iterable[MetadataWorkUnit]:
for node in sorted(test_nodes, key=lambda n: n.dbt_name):
assertion_urn = mce_builder.make_assertion_urn(
mce_builder.datahub_guid(
{
k: v
for k, v in {
"platform": DBT_PLATFORM,
"name": node.dbt_name,
"instance": self.config.platform_instance,
**(
# Ideally we'd include the env unconditionally. However, we started out
# not including env in the guid, so we need to maintain backwards compatibility
# with existing PROD assertions.
{"env": self.config.env}
if self.config.env != mce_builder.DEFAULT_ENV
and self.config.include_env_in_assertion_guid
else {}
),
}.items()
if v is not None
}
)
)

if self.config.entities_enabled.can_emit_node_type("test"):
yield MetadataChangeProposalWrapper(
entityUrn=assertion_urn,
aspect=self._make_data_platform_instance_aspect(),
).as_workunit()

upstream_urns = get_upstreams_for_test(
upstreams = get_upstreams_for_test(
test_node=node,
all_nodes_map=all_nodes_map,
platform_instance=self.config.platform_instance,
environment=self.config.env,
)

# In case a dbt test depends on multiple tables, we create separate assertions for each.
# TODO: This logic doesn't actually work properly, since we're reusing the same assertion_urn
# across multiple upstream tables, so we're actually only creating one assertion and the last
# upstream_urn gets used. Luckily, most dbt tests are associated with a single table, so this
# doesn't cause major issues in practice.
for upstream_urn in sorted(upstream_urns):
if self.config.entities_enabled.can_emit_node_type("test"):
for upstream_node_name, upstream_urn in upstreams.items():
guid_upstream_part = {}
if len(upstreams) > 1:
# If we depend on multiple upstreams, we need to generate a unique guid for each assertion.
# If there was only one upstream, we want to maintain the original assertion for backwards compatibility.
guid_upstream_part = {
"on_dbt_upstream": upstream_node_name,
}

assertion_urn = mce_builder.make_assertion_urn(
mce_builder.datahub_guid(
{
k: v
for k, v in {
"platform": DBT_PLATFORM,
"name": node.dbt_name,
"instance": self.config.platform_instance,
**(
# Ideally we'd include the env unconditionally. However, we started out
# not including env in the guid, so we need to maintain backwards compatibility
# with existing PROD assertions.
{"env": self.config.env}
if self.config.env != mce_builder.DEFAULT_ENV
and self.config.include_env_in_assertion_guid
else {}
),
**guid_upstream_part,
}.items()
if v is not None
}
)
)

custom_props = {
"dbt_unique_id": node.dbt_name,
"dbt_test_upstream_unique_id": upstream_node_name,
**extra_custom_props,
}

if self.config.entities_enabled.can_emit_test_definitions:
yield MetadataChangeProposalWrapper(
entityUrn=assertion_urn,
aspect=self._make_data_platform_instance_aspect(),
).as_workunit()

yield make_assertion_from_test(
custom_props,
node,
@@ -1024,12 +1037,15 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
all_nodes_map,
)

def _is_allowed_node(self, key: str) -> bool:
return self.config.node_name_pattern.allowed(key)

def _filter_nodes(self, all_nodes: List[DBTNode]) -> List[DBTNode]:
nodes = []
for node in all_nodes:
key = node.dbt_name

if not self.config.node_name_pattern.allowed(key):
if not self._is_allowed_node(key):
self.report.nodes_filtered.append(key)
continue

@@ -1041,6 +1057,36 @@ def _filter_nodes(self, all_nodes: List[DBTNode]) -> List[DBTNode]:
def _to_schema_info(schema_fields: List[SchemaField]) -> SchemaInfo:
return {column.fieldPath: column.nativeDataType for column in schema_fields}

def _determine_cll_required_nodes(
self, all_nodes_map: Dict[str, DBTNode]
) -> Tuple[Set[str], Set[str]]:
# Based on the filter patterns, we only need to do schema inference and CLL
# for a subset of nodes.
# If a node depends on an ephemeral model, the ephemeral model should also be in the CLL list.
# Invariant: If it's in the CLL list, it will also be in the schema list.
# Invariant: The upstream of any node in the CLL list will be in the schema list.
schema_nodes: Set[str] = set()
cll_nodes: Set[str] = set()

def add_node_to_cll_list(dbt_name: str) -> None:
if dbt_name in cll_nodes:
return
for upstream in all_nodes_map[dbt_name].upstream_nodes:
schema_nodes.add(upstream)

upstream_node = all_nodes_map[upstream]
if upstream_node.is_ephemeral_model():
add_node_to_cll_list(upstream)

cll_nodes.add(dbt_name)
schema_nodes.add(dbt_name)

for dbt_name in all_nodes_map.keys():
if self._is_allowed_node(dbt_name):
add_node_to_cll_list(dbt_name)

return schema_nodes, cll_nodes

def _infer_schemas_and_update_cll( # noqa: C901
self, all_nodes_map: Dict[str, DBTNode]
) -> None:
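To make the selection invariants concrete, here is a toy, self-contained re-implementation of the _determine_cll_required_nodes traversal over a hand-made graph (plain dicts instead of DBTNode objects; all node names are invented). Every allowed node and all of its upstreams land in the schema set, while the node itself plus any chain of ephemeral upstreams also lands in the CLL set:

# Toy illustration of the schema/CLL node selection (not the DBTNode-based code).
upstreams = {
    "model.demo.final": ["model.demo.stg_orders", "source.demo.raw_orders"],
    "model.demo.stg_orders": ["source.demo.raw_orders"],
    "source.demo.raw_orders": [],
}
ephemeral = {"model.demo.stg_orders"}
allowed = {"model.demo.final"}  # pretend node_name_pattern only allows this node

schema_nodes, cll_nodes = set(), set()

def add_node_to_cll_list(name: str) -> None:
    if name in cll_nodes:
        return
    for upstream in upstreams[name]:
        schema_nodes.add(upstream)          # upstreams only need their schemas...
        if upstream in ephemeral:
            add_node_to_cll_list(upstream)  # ...unless they are ephemeral models
    cll_nodes.add(name)
    schema_nodes.add(name)

for name in upstreams:
    if name in allowed:
        add_node_to_cll_list(name)

print(sorted(schema_nodes))  # all three nodes
print(sorted(cll_nodes))     # the allowed model plus its ephemeral upstream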
Expand All @@ -1067,7 +1113,7 @@ def _infer_schemas_and_update_cll( # noqa: C901
)
return

graph = self.ctx.graph
graph: Optional[DataHubGraph] = self.ctx.graph

schema_resolver = SchemaResolver(
platform=self.config.target_platform,
Expand All @@ -1079,7 +1125,7 @@ def _infer_schemas_and_update_cll( # noqa: C901

# Iterate over the dbt nodes in topological order.
# This ensures that we process upstream nodes before downstream nodes.
node_order = topological_sort(
all_node_order = topological_sort(
list(all_nodes_map.keys()),
edges=list(
(upstream, node.dbt_name)
Expand All @@ -1088,7 +1134,17 @@ def _infer_schemas_and_update_cll( # noqa: C901
if upstream in all_nodes_map
),
)
for dbt_name in node_order:
schema_required_nodes, cll_required_nodes = self._determine_cll_required_nodes(
all_nodes_map
)

for dbt_name in all_node_order:
if dbt_name not in schema_required_nodes:
logger.debug(
f"Skipping {dbt_name} because it is filtered out by patterns"
)
continue

node = all_nodes_map[dbt_name]
logger.debug(f"Processing CLL/schemas for {node.dbt_name}")

@@ -1163,6 +1219,10 @@ def _infer_schemas_and_update_cll( # noqa: C901
# For sources, we generate CLL as a 1:1 mapping.
# We don't support CLL for tests (assertions) or seeds.
pass
elif node.dbt_name not in cll_required_nodes:
logger.debug(
f"Not generating CLL for {node.dbt_name} because we don't need it."
)
elif node.compiled_code:
# Add CTE stops based on the upstreams list.
cte_mapping = {
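The reworked pass still visits nodes in upstream-before-downstream order (node_order is renamed to all_node_order), but nodes outside schema_required_nodes are now skipped before any schema inference or CLL work. A rough, self-contained sketch of that idea using a Kahn-style sort over a toy dependency list; this stands in for DataHub's topological_sort helper and uses invented node names:

# Sketch: upstream-before-downstream ordering for a toy dbt graph, then skipping
# nodes that were not selected for schema inference.
from collections import defaultdict, deque

nodes = ["model.demo.final", "model.demo.stg_orders", "source.demo.raw_orders"]
edges = [  # (upstream, downstream) pairs, mirroring how the diff builds its edge list
    ("source.demo.raw_orders", "model.demo.stg_orders"),
    ("model.demo.stg_orders", "model.demo.final"),
]
schema_required_nodes = {"model.demo.stg_orders", "model.demo.final"}

indegree = {n: 0 for n in nodes}
downstreams = defaultdict(list)
for upstream, downstream in edges:
    indegree[downstream] += 1
    downstreams[upstream].append(downstream)

queue = deque(n for n in nodes if indegree[n] == 0)
while queue:
    name = queue.popleft()
    for nxt in downstreams[name]:
        indegree[nxt] -= 1
        if indegree[nxt] == 0:
            queue.append(nxt)
    if name not in schema_required_nodes:
        continue  # filtered out by patterns; no schema inference or CLL needed
    print(f"processing CLL/schemas for {name}")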
