Skip to content

Commit

Permalink
Merge branch 'datahub-project:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 authored Nov 1, 2024
2 parents 3c721c6 + e609ff8 commit 0e13c2e
Show file tree
Hide file tree
Showing 13 changed files with 180 additions and 55 deletions.
5 changes: 4 additions & 1 deletion metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@

looker_common = {
# Looker Python SDK
"looker-sdk==23.0.0",
"looker-sdk>=23.0.0",
# This version of lkml contains a fix for parsing lists in
# LookML files with spaces between an item and the following comma.
# See https://github.com/joshtemple/lkml/issues/73.
Expand Down Expand Up @@ -278,6 +278,9 @@

threading_timeout_common = {
"stopit==1.1.2",
# stopit uses pkg_resources internally, which means there's an implied
# dependency on setuptools.
"setuptools",
}

abs_base = {
Expand Down
113 changes: 85 additions & 28 deletions metadata-ingestion/src/datahub/api/entities/forms/forms.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@
)
from datahub.configuration.common import ConfigModel
from datahub.emitter.mce_builder import (
make_container_urn,
make_data_platform_urn,
make_domain_urn,
make_group_urn,
make_tag_urn,
make_term_urn,
make_user_urn,
)
from datahub.emitter.mcp import MetadataChangeProposalWrapper
Expand All @@ -36,6 +40,16 @@
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

FILTER_CRITERION_TYPES = "_entityType"
FILTER_CRITERION_SUB_TYPES = "typeNames.keyword"
FILTER_CRITERION_PLATFORMS = "platform.keyword"
FILTER_CRITERION_PLATFORM_INSTANCES = "dataPlatformInstance.keyword"
FILTER_CRITERION_DOMAINS = "domains.keyword"
FILTER_CRITERION_CONTAINERS = "container.keyword"
FILTER_CRITERION_OWNERS = "owners.keyword"
FILTER_CRITERION_TAGS = "tags.keyword"
FILTER_CRITERION_GLOSSARY_TERMS = "glossaryTerms.keyword"


class PromptType(Enum):
STRUCTURED_PROPERTY = "STRUCTURED_PROPERTY"
Expand Down Expand Up @@ -73,9 +87,14 @@ def has_value(cls, value):

class Filters(ConfigModel):
types: Optional[List[str]] = None
sub_types: Optional[List[str]] = None
platforms: Optional[List[str]] = None
platform_instances: Optional[List[str]] = None
domains: Optional[List[str]] = None
containers: Optional[List[str]] = None
owners: Optional[List[str]] = None
tags: Optional[List[str]] = None
terms: Optional[List[str]] = None


class Entities(ConfigModel):
Expand Down Expand Up @@ -105,7 +124,8 @@ class Forms(ConfigModel):
@validator("urn", pre=True, always=True)
def urn_must_be_present(cls, v, values):
if not v:
assert values.get("id") is not None, "Form id must be present if urn is not"
if values.get("id") is None:
raise ValueError("Form id must be present if urn is not")
return f"urn:li:form:{values['id']}"
return v

Expand Down Expand Up @@ -249,36 +269,78 @@ def create_form_filters(self, emitter: DataHubGraph) -> Union[None, Exception]:
# Loop through each entity and assign a filter for it
if self.entities and self.entities.filters:
filters = self.entities.filters

if filters.types:
filters_raw.append(
Forms.format_form_filter("_entityType", filters.types)
Forms.format_form_filter(FILTER_CRITERION_TYPES, filters.types)
)

if filters.sub_types:
filters_raw.append(
Forms.format_form_filter(
FILTER_CRITERION_SUB_TYPES, filters.sub_types
)
)

if filters.platforms:
urns = [
make_data_platform_urn(platform) for platform in filters.platforms
]
filters_raw.append(Forms.format_form_filter("platform", urns))
if filters.domains:
filters_raw.append(
Forms.format_form_filter(FILTER_CRITERION_PLATFORMS, urns)
)

if filters.platform_instances:
urns = []
for domain in filters.domains:
domain_urn = Forms.validate_domain_urn(domain)
if domain_urn:
urns.append(domain_urn)
filters_raw.append(Forms.format_form_filter("domains", urns))
for platform_instance in filters.platform_instances:
platform_instance_urn = Forms.validate_platform_instance_urn(
platform_instance
)
if platform_instance_urn:
urns.append(platform_instance_urn)
filters_raw.append(
Forms.format_form_filter(FILTER_CRITERION_PLATFORM_INSTANCES, urns)
)

if filters.domains:
urns = [make_domain_urn(domain) for domain in filters.domains]
filters_raw.append(
Forms.format_form_filter(FILTER_CRITERION_DOMAINS, urns)
)

if filters.containers:
urns = []
for container in filters.containers:
container_urn = Forms.validate_container_urn(container)
if container_urn:
urns.append(container_urn)
filters_raw.append(Forms.format_form_filter("container", urns))
urns = [
make_container_urn(container) for container in filters.containers
]
filters_raw.append(
Forms.format_form_filter(FILTER_CRITERION_CONTAINERS, urns)
)

if filters.owners:
urns = [make_user_urn(owner) for owner in filters.owners]
filters_raw.append(
Forms.format_form_filter(FILTER_CRITERION_OWNERS, urns)
)

if filters.tags:
urns = [make_tag_urn(tag) for tag in filters.tags]
filters_raw.append(
Forms.format_form_filter(FILTER_CRITERION_TAGS, urns)
)

if filters.terms:
urns = [make_term_urn(term) for term in filters.terms]
filters_raw.append(
Forms.format_form_filter(FILTER_CRITERION_GLOSSARY_TERMS, urns)
)

filters_str = ", ".join(item for item in filters_raw)
result = emitter.execute_graphql(
query=CREATE_DYNAMIC_FORM_ASSIGNMENT.format(
form_urn=self.urn, filters=filters_str
)
)

if not result:
return Exception(
f"Could not bulk upload urns or filters for form {self.urn}."
Expand Down Expand Up @@ -314,25 +376,20 @@ def format_form_filter(field: str, urns: List[str]) -> str:
return FIELD_FILTER_TEMPLATE.format(field=field, values=formatted_urns)

@staticmethod
def validate_domain_urn(domain: str) -> Union[str, None]:
if domain.startswith("urn:li:domain:"):
return domain
def validate_platform_instance_urn(instance: str) -> Union[str, None]:
if instance.startswith("urn:li:dataPlatformInstance:"):
return instance

logger.warning(f"{domain} is not an urn. Unable to create domain filter.")
return None

@staticmethod
def validate_container_urn(container: str) -> Union[str, None]:
if container.startswith("urn:li:container:"):
return container

logger.warning(f"{container} is not an urn. Unable to create container filter.")
logger.warning(
f"{instance} is not an urn. Unable to create platform instance filter."
)
return None

@staticmethod
def from_datahub(graph: DataHubGraph, urn: str) -> "Forms":
form: Optional[FormInfoClass] = graph.get_aspect(urn, FormInfoClass)
assert form is not None
if form is None:
raise Exception("FormInfo aspect is None. Unable to create form.")
prompts = []
for prompt_raw in form.prompts:
prompts.append(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ def fqn(self) -> str:
@validator("urn", pre=True, always=True)
def urn_must_be_present(cls, v, values):
if not v:
assert "id" in values, "id must be present if urn is not"
if "id" not in values:
raise ValueError("id must be present if urn is not")
return f"urn:li:structuredProperty:{values['id']}"
return v

Expand Down Expand Up @@ -154,7 +155,10 @@ def from_datahub(cls, graph: DataHubGraph, urn: str) -> "StructuredProperties":
structured_property: Optional[
StructuredPropertyDefinitionClass
] = graph.get_aspect(urn, StructuredPropertyDefinitionClass)
assert structured_property is not None
if structured_property is None:
raise Exception(
"StructuredPropertyDefinition aspect is None. Unable to create structured property."
)
return StructuredProperties(
urn=urn,
qualified_name=structured_property.qualifiedName,
Expand Down
12 changes: 8 additions & 4 deletions metadata-ingestion/src/datahub/emitter/mce_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,11 +163,15 @@ def dataset_key_to_urn(key: DatasetKeyClass) -> str:


def make_container_urn(guid: Union[str, "DatahubKey"]) -> str:
from datahub.emitter.mcp_builder import DatahubKey
if isinstance(guid, str) and guid.startswith("urn:li:container"):
return guid
else:
from datahub.emitter.mcp_builder import DatahubKey

if isinstance(guid, DatahubKey):
guid = guid.guid()

if isinstance(guid, DatahubKey):
guid = guid.guid()
return f"urn:li:container:{guid}"
return f"urn:li:container:{guid}"


def container_urn_to_key(guid: str) -> Optional[ContainerKeyClass]:
Expand Down
10 changes: 10 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
StatefulIngestionConfigBase,
)
from datahub.utilities.lossy_collections import LossyList
from datahub.utilities.perf_timer import PerfTimer

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -190,6 +191,15 @@ class PowerBiDashboardSourceReport(StaleEntityRemovalSourceReport):
filtered_dashboards: List[str] = dataclass_field(default_factory=list)
filtered_charts: List[str] = dataclass_field(default_factory=list)

m_query_parse_timer: PerfTimer = dataclass_field(default_factory=PerfTimer)
m_query_parse_attempts: int = 0
m_query_parse_successes: int = 0
m_query_parse_timeouts: int = 0
m_query_parse_validation_errors: int = 0
m_query_parse_unexpected_character_errors: int = 0
m_query_parse_unknown_errors: int = 0
m_query_resolver_errors: int = 0

def report_dashboards_scanned(self, count: int = 1) -> None:
self.dashboards_scanned += count

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,9 @@ def get_upstream_tables(
)

try:
parse_tree: Tree = _parse_expression(table.expression)
with reporter.m_query_parse_timer:
reporter.m_query_parse_attempts += 1
parse_tree: Tree = _parse_expression(table.expression)

valid, message = validator.validate_parse_tree(
parse_tree, native_query_enabled=config.native_query_parsing
Expand All @@ -87,10 +89,12 @@ def get_upstream_tables(
message="DataAccess function is not present in M-Query expression",
context=f"table-full-name={table.full_name}, expression={table.expression}, message={message}",
)
reporter.m_query_parse_validation_errors += 1
return []
except KeyboardInterrupt:
raise
except TimeoutException:
reporter.m_query_parse_timeouts += 1
reporter.warning(
title="M-Query Parsing Timeout",
message=f"M-Query parsing timed out after {_M_QUERY_PARSE_TIMEOUT} seconds. Lineage for this table will not be extracted.",
Expand All @@ -102,8 +106,10 @@ def get_upstream_tables(
) as e: # TODO: Debug why BaseException is needed here and below.
if isinstance(e, lark.exceptions.UnexpectedCharacters):
error_type = "Unexpected Character Error"
reporter.m_query_parse_unexpected_character_errors += 1
else:
error_type = "Unknown Parsing Error"
reporter.m_query_parse_unknown_errors += 1

reporter.warning(
title="Unable to extract lineage from M-Query expression",
Expand All @@ -112,10 +118,10 @@ def get_upstream_tables(
exc=e,
)
return []
reporter.m_query_parse_successes += 1

lineage: List[resolver.Lineage] = []
try:
lineage = resolver.MQueryResolver(
lineage: List[resolver.Lineage] = resolver.MQueryResolver(
table=table,
parse_tree=parse_tree,
reporter=reporter,
Expand All @@ -126,14 +132,14 @@ def get_upstream_tables(
platform_instance_resolver=platform_instance_resolver,
)

return lineage

except BaseException as e:
reporter.m_query_resolver_errors += 1
reporter.warning(
title="Unknown M-Query Pattern",
message="Encountered a unknown M-Query Expression",
context=f"table-full-name={table.full_name}, expression={table.expression}, message={e}",
exc=e,
)

logger.debug(f"Stack trace for {table.full_name}:", exc_info=e)

return lineage
return []
Original file line number Diff line number Diff line change
Expand Up @@ -356,8 +356,9 @@ def _process_invoke_expression(
)
if arg_list is None:
self.reporter.report_warning(
f"{self.table.full_name}-arg-list",
f"Argument list not found for data-access-function {data_access_func}",
title="M-Query Resolver Error",
message="Unable to extract lineage from parsed M-Query expression (missing argument list)",
context=f"{self.table.full_name}: argument list not found for data-access-function {data_access_func}",
)
return None

Expand All @@ -377,8 +378,9 @@ def _process_invoke_expression(
f"Function invocation without argument in expression = {invoke_expression.pretty()}"
)
self.reporter.report_warning(
f"{self.table.full_name}-variable-statement",
"Function invocation without argument",
title="M-Query Resolver Error",
message="Unable to extract lineage from parsed M-Query expression (function invocation without argument)",
context=f"{self.table.full_name}: function invocation without argument",
)
return None

Expand All @@ -403,8 +405,9 @@ def _process_invoke_expression(
f"Either list_expression or type_expression is not found = {invoke_expression.pretty()}"
)
self.reporter.report_warning(
f"{self.table.full_name}-variable-statement",
"Function argument expression is not supported",
title="M-Query Resolver Error",
message="Unable to extract lineage from parsed M-Query expression (function argument expression is not supported)",
context=f"{self.table.full_name}: function argument expression is not supported",
)
return None

Expand Down
Loading

0 comments on commit 0e13c2e

Please sign in to comment.