Skip to content

Commit

Permalink
feat(ingest/powerbi): improve reporting around m-query parser (datahu…
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 authored Oct 31, 2024
1 parent 4634bbc commit e609ff8
Show file tree
Hide file tree
Showing 9 changed files with 79 additions and 19 deletions.
3 changes: 3 additions & 0 deletions metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,9 @@

threading_timeout_common = {
"stopit==1.1.2",
# stopit uses pkg_resources internally, which means there's an implied
# dependency on setuptools.
"setuptools",
}

abs_base = {
Expand Down
10 changes: 10 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
StatefulIngestionConfigBase,
)
from datahub.utilities.lossy_collections import LossyList
from datahub.utilities.perf_timer import PerfTimer

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -190,6 +191,15 @@ class PowerBiDashboardSourceReport(StaleEntityRemovalSourceReport):
filtered_dashboards: List[str] = dataclass_field(default_factory=list)
filtered_charts: List[str] = dataclass_field(default_factory=list)

m_query_parse_timer: PerfTimer = dataclass_field(default_factory=PerfTimer)
m_query_parse_attempts: int = 0
m_query_parse_successes: int = 0
m_query_parse_timeouts: int = 0
m_query_parse_validation_errors: int = 0
m_query_parse_unexpected_character_errors: int = 0
m_query_parse_unknown_errors: int = 0
m_query_resolver_errors: int = 0

def report_dashboards_scanned(self, count: int = 1) -> None:
    """Record that `count` additional dashboards were scanned (default one)."""
    self.dashboards_scanned = self.dashboards_scanned + count

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,9 @@ def get_upstream_tables(
)

try:
parse_tree: Tree = _parse_expression(table.expression)
with reporter.m_query_parse_timer:
reporter.m_query_parse_attempts += 1
parse_tree: Tree = _parse_expression(table.expression)

valid, message = validator.validate_parse_tree(
parse_tree, native_query_enabled=config.native_query_parsing
Expand All @@ -87,10 +89,12 @@ def get_upstream_tables(
message="DataAccess function is not present in M-Query expression",
context=f"table-full-name={table.full_name}, expression={table.expression}, message={message}",
)
reporter.m_query_parse_validation_errors += 1
return []
except KeyboardInterrupt:
raise
except TimeoutException:
reporter.m_query_parse_timeouts += 1
reporter.warning(
title="M-Query Parsing Timeout",
message=f"M-Query parsing timed out after {_M_QUERY_PARSE_TIMEOUT} seconds. Lineage for this table will not be extracted.",
Expand All @@ -102,8 +106,10 @@ def get_upstream_tables(
) as e: # TODO: Debug why BaseException is needed here and below.
if isinstance(e, lark.exceptions.UnexpectedCharacters):
error_type = "Unexpected Character Error"
reporter.m_query_parse_unexpected_character_errors += 1
else:
error_type = "Unknown Parsing Error"
reporter.m_query_parse_unknown_errors += 1

reporter.warning(
title="Unable to extract lineage from M-Query expression",
Expand All @@ -112,10 +118,10 @@ def get_upstream_tables(
exc=e,
)
return []
reporter.m_query_parse_successes += 1

lineage: List[resolver.Lineage] = []
try:
lineage = resolver.MQueryResolver(
lineage: List[resolver.Lineage] = resolver.MQueryResolver(
table=table,
parse_tree=parse_tree,
reporter=reporter,
Expand All @@ -126,14 +132,14 @@ def get_upstream_tables(
platform_instance_resolver=platform_instance_resolver,
)

return lineage

except BaseException as e:
reporter.m_query_resolver_errors += 1
reporter.warning(
title="Unknown M-Query Pattern",
message="Encountered a unknown M-Query Expression",
context=f"table-full-name={table.full_name}, expression={table.expression}, message={e}",
exc=e,
)

logger.debug(f"Stack trace for {table.full_name}:", exc_info=e)

return lineage
return []
Original file line number Diff line number Diff line change
Expand Up @@ -356,8 +356,9 @@ def _process_invoke_expression(
)
if arg_list is None:
self.reporter.report_warning(
f"{self.table.full_name}-arg-list",
f"Argument list not found for data-access-function {data_access_func}",
title="M-Query Resolver Error",
message="Unable to extract lineage from parsed M-Query expression (missing argument list)",
context=f"{self.table.full_name}: argument list not found for data-access-function {data_access_func}",
)
return None

Expand All @@ -377,8 +378,9 @@ def _process_invoke_expression(
f"Function invocation without argument in expression = {invoke_expression.pretty()}"
)
self.reporter.report_warning(
f"{self.table.full_name}-variable-statement",
"Function invocation without argument",
title="M-Query Resolver Error",
message="Unable to extract lineage from parsed M-Query expression (function invocation without argument)",
context=f"{self.table.full_name}: function invocation without argument",
)
return None

Expand All @@ -403,8 +405,9 @@ def _process_invoke_expression(
f"Either list_expression or type_expression is not found = {invoke_expression.pretty()}"
)
self.reporter.report_warning(
f"{self.table.full_name}-variable-statement",
"Function argument expression is not supported",
title="M-Query Resolver Error",
message="Unable to extract lineage from parsed M-Query expression (function argument expression is not supported)",
context=f"{self.table.full_name}: function argument expression is not supported",
)
return None

Expand Down
23 changes: 21 additions & 2 deletions metadata-ingestion/src/datahub/sql_parsing/schema_resolver.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,9 @@ def schema_count(self) -> int:
)[0][0]
)

def get_urn_for_table(self, table: _TableName, lower: bool = False) -> str:
def get_urn_for_table(
self, table: _TableName, lower: bool = False, mixed: bool = False
) -> str:
# TODO: Validate that this is the correct 2/3 layer hierarchy for the platform.

table_name = ".".join(
Expand All @@ -101,7 +103,10 @@ def get_urn_for_table(self, table: _TableName, lower: bool = False) -> str:

if lower:
table_name = table_name.lower()
platform_instance = platform_instance.lower() if platform_instance else None
if not mixed:
platform_instance = (
platform_instance.lower() if platform_instance else None
)

if self.platform == "bigquery":
# Normalize shard numbers and other BigQuery weirdness.
Expand Down Expand Up @@ -131,6 +136,20 @@ def resolve_table(self, table: _TableName) -> Tuple[str, Optional[SchemaInfo]]:
if schema_info:
return urn_lower, schema_info

# Our treatment of platform instances when lowercasing urns
# is inconsistent. In some places (e.g. Snowflake), we lowercase
# the table names but not the platform instance. In other places
# (e.g. Databricks), we lowercase everything because it happens
# via the automatic lowercasing helper.
# See https://github.com/datahub-project/datahub/pull/8928.
# While we have this sort of inconsistency, we should also
# check the mixed case urn, as a last resort.
urn_mixed = self.get_urn_for_table(table, lower=True, mixed=True)
if urn_mixed not in {urn, urn_lower}:
schema_info = self._resolve_schema_info(urn_mixed)
if schema_info:
return urn_mixed, schema_info

if self._prefers_urn_lower():
return urn_lower, None
else:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,9 @@ def _clearinghouse_worker(self) -> None: # noqa: C901
def _handle_batch_completion(
batch: List[_BatchPartitionWorkItem], future: Future
) -> None:
nonlocal workers_available
workers_available += 1

with clearinghouse_state_lock:
for item in batch:
keys_no_longer_in_flight.add(item.key)
Expand Down
1 change: 0 additions & 1 deletion metadata-ingestion/src/datahub/utilities/perf_timer.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
class PerfTimer(AbstractContextManager):
"""
A context manager that gives easy access to elapsed time for performance measurement.
"""

def __init__(self) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ def test_get_urn_for_table_lowercase():
== "urn:li:dataset:(urn:li:dataPlatform:mssql,uppercased-instance.database.dataset.table,PROD)"
)

assert (
schema_resolver.get_urn_for_table(table=table, lower=True, mixed=True)
== "urn:li:dataset:(urn:li:dataPlatform:mssql,Uppercased-Instance.database.dataset.table,PROD)"
)


def test_get_urn_for_table_not_lower_should_keep_capital_letters():
schema_resolver = SchemaResolver(
Expand Down
18 changes: 15 additions & 3 deletions metadata-ingestion/tests/unit/utilities/test_perf_timer.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@
approx = partial(pytest.approx, rel=2e-2)


def test_perf_timer_simple() -> None:
    """elapsed_seconds() is accurate both while the timer is running and after exit."""
    with PerfTimer() as timer:
        time.sleep(0.4)
        # Mid-flight reading reflects time elapsed so far.
        assert approx(timer.elapsed_seconds()) == 0.4

    # After leaving the context, the reading stays at the total elapsed time.
    assert approx(timer.elapsed_seconds()) == 0.4


def test_perf_timer_paused_timer():
def test_perf_timer_paused_timer() -> None:
with PerfTimer() as current_timer:
time.sleep(0.5)
assert approx(current_timer.elapsed_seconds()) == 0.5
Expand All @@ -29,7 +29,7 @@ def test_perf_timer_paused_timer():
assert approx(current_timer.elapsed_seconds()) == 0.7


def test_generator_with_paused_timer():
def test_generator_with_paused_timer() -> None:
n = 4

def generator_function():
Expand All @@ -46,3 +46,15 @@ def generator_function():
seq = generator_function()
list([i for i in seq])
assert approx(outer_timer.elapsed_seconds()) == 1 + 0.2 * n + 0.2 * n


def test_perf_timer_reuse() -> None:
    """A single PerfTimer accumulates elapsed time across repeated context entries."""
    shared_timer = PerfTimer()

    # Enter the same timer twice; the pauses should sum rather than reset.
    for pause in (0.2, 0.3):
        with shared_timer:
            time.sleep(pause)

    assert approx(shared_timer.elapsed_seconds()) == 0.5

0 comments on commit e609ff8

Please sign in to comment.