diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py
index 6579761cc5c14..b10143565efb4 100644
--- a/metadata-ingestion/setup.py
+++ b/metadata-ingestion/setup.py
@@ -278,6 +278,9 @@
 threading_timeout_common = {
     "stopit==1.1.2",
+    # stopit uses pkg_resources internally, which means there's an implied
+    # dependency on setuptools.
+    "setuptools",
 }
 
 abs_base = {
diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py
index 0aec9a589cf27..09ade4c905bea 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py
@@ -20,6 +20,7 @@
     StatefulIngestionConfigBase,
 )
 from datahub.utilities.lossy_collections import LossyList
+from datahub.utilities.perf_timer import PerfTimer
 
 logger = logging.getLogger(__name__)
 
@@ -190,6 +191,15 @@ class PowerBiDashboardSourceReport(StaleEntityRemovalSourceReport):
     filtered_dashboards: List[str] = dataclass_field(default_factory=list)
     filtered_charts: List[str] = dataclass_field(default_factory=list)
 
+    m_query_parse_timer: PerfTimer = dataclass_field(default_factory=PerfTimer)
+    m_query_parse_attempts: int = 0
+    m_query_parse_successes: int = 0
+    m_query_parse_timeouts: int = 0
+    m_query_parse_validation_errors: int = 0
+    m_query_parse_unexpected_character_errors: int = 0
+    m_query_parse_unknown_errors: int = 0
+    m_query_resolver_errors: int = 0
+
     def report_dashboards_scanned(self, count: int = 1) -> None:
         self.dashboards_scanned += count
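The counters and timer added to PowerBiDashboardSourceReport make M-Query parse health visible in the ingestion report: attempts, successes, and each failure mode are tracked separately, while m_query_parse_timer accumulates total parse time. As a rough sketch of how these fields compose into a summary (the helper below is hypothetical; only the attribute names and PerfTimer.elapsed_seconds() come from this diff):

def summarize_m_query_parsing(report: "PowerBiDashboardSourceReport") -> str:
    # Hypothetical helper; every attribute read here is a field added above.
    attempts = report.m_query_parse_attempts
    if attempts == 0:
        return "no M-Query expressions parsed"
    avg_secs = report.m_query_parse_timer.elapsed_seconds() / attempts
    return (
        f"{report.m_query_parse_successes}/{attempts} parsed, "
        f"avg {avg_secs:.3f}s/expression "
        f"({report.m_query_parse_timeouts} timeouts, "
        f"{report.m_query_parse_validation_errors} validation errors, "
        f"{report.m_query_resolver_errors} resolver errors)"
    )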
diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/parser.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/parser.py
index 086ce2c263b0c..a61ad8d289b9d 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/parser.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/parser.py
@@ -74,7 +74,9 @@ def get_upstream_tables(
     )
 
     try:
-        parse_tree: Tree = _parse_expression(table.expression)
+        with reporter.m_query_parse_timer:
+            reporter.m_query_parse_attempts += 1
+            parse_tree: Tree = _parse_expression(table.expression)
 
         valid, message = validator.validate_parse_tree(
             parse_tree, native_query_enabled=config.native_query_parsing
@@ -87,10 +89,12 @@ def get_upstream_tables(
                 message="DataAccess function is not present in M-Query expression",
                 context=f"table-full-name={table.full_name}, expression={table.expression}, message={message}",
             )
+            reporter.m_query_parse_validation_errors += 1
             return []
     except KeyboardInterrupt:
         raise
     except TimeoutException:
+        reporter.m_query_parse_timeouts += 1
         reporter.warning(
             title="M-Query Parsing Timeout",
             message=f"M-Query parsing timed out after {_M_QUERY_PARSE_TIMEOUT} seconds. Lineage for this table will not be extracted.",
@@ -102,8 +106,10 @@ def get_upstream_tables(
     ) as e:  # TODO: Debug why BaseException is needed here and below.
         if isinstance(e, lark.exceptions.UnexpectedCharacters):
             error_type = "Unexpected Character Error"
+            reporter.m_query_parse_unexpected_character_errors += 1
         else:
             error_type = "Unknown Parsing Error"
+            reporter.m_query_parse_unknown_errors += 1
 
         reporter.warning(
             title="Unable to extract lineage from M-Query expression",
@@ -112,10 +118,10 @@ def get_upstream_tables(
             exc=e,
         )
         return []
+    reporter.m_query_parse_successes += 1
 
-    lineage: List[resolver.Lineage] = []
     try:
-        lineage = resolver.MQueryResolver(
+        lineage: List[resolver.Lineage] = resolver.MQueryResolver(
             table=table,
             parse_tree=parse_tree,
             reporter=reporter,
@@ -126,14 +132,14 @@ def get_upstream_tables(
             platform_instance_resolver=platform_instance_resolver,
         )
 
+        return lineage
+
     except BaseException as e:
+        reporter.m_query_resolver_errors += 1
         reporter.warning(
             title="Unknown M-Query Pattern",
             message="Encountered an unknown M-Query Expression",
             context=f"table-full-name={table.full_name}, expression={table.expression}, message={e}",
             exc=e,
         )
-
-        logger.debug(f"Stack trace for {table.full_name}:", exc_info=e)
-
-        return lineage
+        return []
diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/resolver.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/resolver.py
index a5fb6fd2673ac..cc51fcee14104 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/resolver.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/resolver.py
@@ -356,8 +356,9 @@ def _process_invoke_expression(
         )
         if arg_list is None:
             self.reporter.report_warning(
-                f"{self.table.full_name}-arg-list",
-                f"Argument list not found for data-access-function {data_access_func}",
+                title="M-Query Resolver Error",
+                message="Unable to extract lineage from parsed M-Query expression (missing argument list)",
+                context=f"{self.table.full_name}: argument list not found for data-access-function {data_access_func}",
             )
             return None
 
@@ -377,8 +378,9 @@ def _process_invoke_expression(
                 f"Function invocation without argument in expression = {invoke_expression.pretty()}"
             )
             self.reporter.report_warning(
-                f"{self.table.full_name}-variable-statement",
-                "Function invocation without argument",
+                title="M-Query Resolver Error",
+                message="Unable to extract lineage from parsed M-Query expression (function invocation without argument)",
+                context=f"{self.table.full_name}: function invocation without argument",
             )
             return None
 
@@ -403,8 +405,9 @@ def _process_invoke_expression(
                 f"Either list_expression or type_expression is not found = {invoke_expression.pretty()}"
             )
             self.reporter.report_warning(
-                f"{self.table.full_name}-variable-statement",
-                "Function argument expression is not supported",
+                title="M-Query Resolver Error",
+                message="Unable to extract lineage from parsed M-Query expression (function argument expression is not supported)",
+                context=f"{self.table.full_name}: function argument expression is not supported",
             )
             return None
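Both files above converge on the same structured-warning convention: a stable title, a generic message, and per-table specifics pushed into context (plus the exception via exc where available). A minimal sketch of why that shape helps, assuming warnings sharing a (title, message) pair aggregate into one report entry (the Reporter below is an illustrative stand-in, not DataHub's actual report class):

from collections import defaultdict
from typing import DefaultDict, List, Tuple

class Reporter:
    def __init__(self) -> None:
        # Warnings sharing (title, message) collapse into one entry
        # that carries many contexts.
        self.warnings: DefaultDict[Tuple[str, str], List[str]] = defaultdict(list)

    def report_warning(self, title: str, message: str, context: str) -> None:
        self.warnings[(title, message)].append(context)

reporter = Reporter()
for table in ("sales.orders", "sales.users"):  # hypothetical table names
    reporter.report_warning(
        title="M-Query Resolver Error",
        message="Unable to extract lineage from parsed M-Query expression (missing argument list)",
        context=f"{table}: argument list not found",
    )
assert len(reporter.warnings) == 1  # one aggregated entry, two contexts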
diff --git a/metadata-ingestion/src/datahub/sql_parsing/schema_resolver.py b/metadata-ingestion/src/datahub/sql_parsing/schema_resolver.py
index c358bee6daae8..e7b0527d30d97 100644
--- a/metadata-ingestion/src/datahub/sql_parsing/schema_resolver.py
+++ b/metadata-ingestion/src/datahub/sql_parsing/schema_resolver.py
@@ -90,7 +90,9 @@ def schema_count(self) -> int:
             )[0][0]
         )
 
-    def get_urn_for_table(self, table: _TableName, lower: bool = False) -> str:
+    def get_urn_for_table(
+        self, table: _TableName, lower: bool = False, mixed: bool = False
+    ) -> str:
         # TODO: Validate that this is the correct 2/3 layer hierarchy for the platform.
         table_name = ".".join(
@@ -101,7 +103,10 @@ def get_urn_for_table(self, table: _TableName, lower: bool = False) -> str:
         if lower:
             table_name = table_name.lower()
-            platform_instance = platform_instance.lower() if platform_instance else None
+            if not mixed:
+                platform_instance = (
+                    platform_instance.lower() if platform_instance else None
+                )
 
         if self.platform == "bigquery":
             # Normalize shard numbers and other BigQuery weirdness.
@@ -131,6 +136,20 @@ def resolve_table(self, table: _TableName) -> Tuple[str, Optional[SchemaInfo]]:
             if schema_info:
                 return urn_lower, schema_info
 
+        # Our treatment of platform instances when lowercasing urns
+        # is inconsistent. In some places (e.g. Snowflake), we lowercase
+        # the table names but not the platform instance. In other places
+        # (e.g. Databricks), we lowercase everything because it happens
+        # via the automatic lowercasing helper.
+        # See https://github.com/datahub-project/datahub/pull/8928.
+        # While we have this sort of inconsistency, we should also
+        # check the mixed-case urn as a last resort.
+        urn_mixed = self.get_urn_for_table(table, lower=True, mixed=True)
+        if urn_mixed not in {urn, urn_lower}:
+            schema_info = self._resolve_schema_info(urn_mixed)
+            if schema_info:
+                return urn_mixed, schema_info
+
         if self._prefers_urn_lower():
             return urn_lower, None
         else:
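The comment in resolve_table spells out the lookup order: the exact-case urn first, then the fully lowercased urn, and now a third "mixed" candidate (lowercased table name, original-case platform instance) as a last resort. A runnable sketch of that fallback under a simplified urn format (make_urn and resolve are illustrative stand-ins for get_urn_for_table and resolve_table):

from typing import Dict, Optional, Tuple

def make_urn(instance: str, table: str, lower: bool, mixed: bool = False) -> str:
    # `mixed` lowercases the table name but preserves the platform
    # instance's casing, mirroring the new flag above.
    if lower:
        table = table.lower()
        if not mixed:
            instance = instance.lower()
    return f"{instance}.{table}"

def resolve(
    schemas: Dict[str, dict], instance: str, table: str
) -> Tuple[str, Optional[dict]]:
    urn = make_urn(instance, table, lower=False)
    urn_lower = make_urn(instance, table, lower=True)
    urn_mixed = make_urn(instance, table, lower=True, mixed=True)
    # Exact case first, then fully lowercased, then mixed as a last resort.
    for candidate in (urn, urn_lower, urn_mixed):
        if candidate in schemas:
            return candidate, schemas[candidate]
    return urn_lower, None

# A schema registered under a mixed-case urn is now found:
schemas = {"Uppercased-Instance.database.dataset.table": {"id": "int"}}
assert resolve(schemas, "Uppercased-Instance", "Database.Dataset.Table")[1] is not None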
- """ def __init__(self) -> None: diff --git a/metadata-ingestion/tests/unit/sql_parsing/test_schemaresolver.py b/metadata-ingestion/tests/unit/sql_parsing/test_schemaresolver.py index 5a33034f274dc..e5fa980bec452 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/test_schemaresolver.py +++ b/metadata-ingestion/tests/unit/sql_parsing/test_schemaresolver.py @@ -41,6 +41,11 @@ def test_get_urn_for_table_lowercase(): == "urn:li:dataset:(urn:li:dataPlatform:mssql,uppercased-instance.database.dataset.table,PROD)" ) + assert ( + schema_resolver.get_urn_for_table(table=table, lower=True, mixed=True) + == "urn:li:dataset:(urn:li:dataPlatform:mssql,Uppercased-Instance.database.dataset.table,PROD)" + ) + def test_get_urn_for_table_not_lower_should_keep_capital_letters(): schema_resolver = SchemaResolver( diff --git a/metadata-ingestion/tests/unit/utilities/test_perf_timer.py b/metadata-ingestion/tests/unit/utilities/test_perf_timer.py index 1de76a32fb708..166e40ef54308 100644 --- a/metadata-ingestion/tests/unit/utilities/test_perf_timer.py +++ b/metadata-ingestion/tests/unit/utilities/test_perf_timer.py @@ -8,7 +8,7 @@ approx = partial(pytest.approx, rel=2e-2) -def test_perf_timer_simple(): +def test_perf_timer_simple() -> None: with PerfTimer() as timer: time.sleep(0.4) assert approx(timer.elapsed_seconds()) == 0.4 @@ -16,7 +16,7 @@ def test_perf_timer_simple(): assert approx(timer.elapsed_seconds()) == 0.4 -def test_perf_timer_paused_timer(): +def test_perf_timer_paused_timer() -> None: with PerfTimer() as current_timer: time.sleep(0.5) assert approx(current_timer.elapsed_seconds()) == 0.5 @@ -29,7 +29,7 @@ def test_perf_timer_paused_timer(): assert approx(current_timer.elapsed_seconds()) == 0.7 -def test_generator_with_paused_timer(): +def test_generator_with_paused_timer() -> None: n = 4 def generator_function(): @@ -46,3 +46,15 @@ def generator_function(): seq = generator_function() list([i for i in seq]) assert approx(outer_timer.elapsed_seconds()) == 1 + 0.2 * n + 0.2 * n + + +def test_perf_timer_reuse() -> None: + timer = PerfTimer() + + with timer: + time.sleep(0.2) + + with timer: + time.sleep(0.3) + + assert approx(timer.elapsed_seconds()) == 0.5