From 29f2142a2c128f7f165f9011eff3bc647ae92185 Mon Sep 17 00:00:00 2001 From: Mayuri Nehate <33225191+mayurinehate@users.noreply.github.com> Date: Wed, 3 Jan 2024 09:48:43 +0530 Subject: [PATCH] feat(databricks): add hive metastore analyze profiling (#9511) --- metadata-ingestion/setup.py | 4 +- .../ingestion/source/sql/sql_config.py | 8 +- .../datahub/ingestion/source/unity/config.py | 16 +- .../source/unity/hive_metastore_proxy.py | 109 ++++++++- .../datahub/ingestion/source/unity/proxy.py | 125 +++++++--- .../ingestion/source/unity/proxy_profiling.py | 50 ++-- .../ingestion/source/unity/proxy_types.py | 24 +- .../datahub/ingestion/source/unity/report.py | 6 + .../datahub/ingestion/source/unity/source.py | 27 ++- .../datahub/ingestion/source/unity/usage.py | 5 +- .../unity/test_unity_catalog_ingest.py | 104 +++++++- .../unity/unity_catalog_mces_golden.json | 228 ++++++++++++++++-- 12 files changed, 600 insertions(+), 106 deletions(-) diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index 8e4791e253c7c..10db019b51381 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -251,9 +251,7 @@ databricks = { # 0.1.11 appears to have authentication issues with azure databricks - # 0.16.0 added py.typed support which caused mypy to fail. The databricks sdk is pinned until we resolve mypy issues. - # https://github.com/databricks/databricks-sdk-py/pull/483 - "databricks-sdk>=0.9.0,<0.16.0", + "databricks-sdk>=0.9.0", "pyspark~=3.3.0", "requests", # Version 2.4.0 includes sqlalchemy dialect, 2.8.0 includes some bug fixes diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_config.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_config.py index 54edab6f3b84b..c0dc70301ba34 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_config.py @@ -112,7 +112,13 @@ def ensure_profiling_pattern_is_passed_to_profiling( cls, values: Dict[str, Any] ) -> Dict[str, Any]: profiling: Optional[GEProfilingConfig] = values.get("profiling") - if profiling is not None and profiling.enabled: + # Note: isinstance() check is required here as unity-catalog source reuses + # SQLCommonConfig with different profiling config than GEProfilingConfig + if ( + profiling is not None + and isinstance(profiling, GEProfilingConfig) + and profiling.enabled + ): profiling._allow_deny_patterns = values["profile_pattern"] return values diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/config.py b/metadata-ingestion/src/datahub/ingestion/source/unity/config.py index 96971faeea69f..df36153af9d83 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/config.py @@ -95,14 +95,6 @@ class UnityCatalogAnalyzeProfilerConfig(UnityCatalogProfilerConfig): description="Number of worker threads to use for profiling. 
Set to 1 to disable.", ) - @pydantic.root_validator(skip_on_failure=True) - def warehouse_id_required_for_profiling( - cls, values: Dict[str, Any] - ) -> Dict[str, Any]: - if values.get("enabled") and not values.get("warehouse_id"): - raise ValueError("warehouse_id must be set when profiling is enabled.") - return values - @property def include_columns(self): return not self.profile_table_level_only @@ -254,6 +246,7 @@ class UnityCatalogSourceConfig( description="Generate usage statistics.", ) + # TODO: Remove `type:ignore` by refactoring config profiling: Union[UnityCatalogGEProfilerConfig, UnityCatalogAnalyzeProfilerConfig] = Field( # type: ignore default=UnityCatalogGEProfilerConfig(), description="Data profiling configuration", @@ -316,7 +309,9 @@ def include_metastore_warning(cls, v: bool) -> bool: @pydantic.root_validator(skip_on_failure=True) def set_warehouse_id_from_profiling(cls, values: Dict[str, Any]) -> Dict[str, Any]: - profiling: Optional[UnityCatalogProfilerConfig] = values.get("profiling") + profiling: Optional[ + Union[UnityCatalogGEProfilerConfig, UnityCatalogAnalyzeProfilerConfig] + ] = values.get("profiling") if not values.get("warehouse_id") and profiling and profiling.warehouse_id: values["warehouse_id"] = profiling.warehouse_id if ( @@ -337,6 +332,9 @@ def set_warehouse_id_from_profiling(cls, values: Dict[str, Any]) -> Dict[str, An if values.get("warehouse_id") and profiling and not profiling.warehouse_id: profiling.warehouse_id = values["warehouse_id"] + if profiling and profiling.enabled and not profiling.warehouse_id: + raise ValueError("warehouse_id must be set when profiling is enabled.") + return values @pydantic.validator("schema_pattern", always=True) diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/hive_metastore_proxy.py b/metadata-ingestion/src/datahub/ingestion/source/unity/hive_metastore_proxy.py index 99b2ff998662c..814d86a2f3234 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/hive_metastore_proxy.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/hive_metastore_proxy.py @@ -12,11 +12,14 @@ from datahub.ingestion.source.unity.proxy_types import ( Catalog, Column, + ColumnProfile, CustomCatalogType, HiveTableType, Metastore, Schema, Table, + TableProfile, + TableReference, ) logger = logging.getLogger(__name__) @@ -38,6 +41,18 @@ "binary": ColumnTypeName.BINARY, } +NUM_NULLS = "num_nulls" +DISTINCT_COUNT = "distinct_count" +MIN = "min" +MAX = "max" +AVG_COL_LEN = "avg_col_len" +MAX_COL_LEN = "max_col_len" +VERSION = "version" + +ROWS = "rows" +BYTES = "bytes" +TABLE_STAT_LIST = {ROWS, BYTES} + class HiveMetastoreProxy(Closeable): # TODO: Support for view lineage using SQL parsing @@ -67,7 +82,7 @@ def get_inspector(sqlalchemy_url: str, options: dict) -> Inspector: def hive_metastore_catalog(self, metastore: Optional[Metastore]) -> Catalog: return Catalog( - id=HIVE_METASTORE, + id=f"{metastore.id}.{HIVE_METASTORE}" if metastore else HIVE_METASTORE, name=HIVE_METASTORE, comment=None, metastore=metastore, @@ -95,9 +110,14 @@ def hive_metastore_tables(self, schema: Schema) -> Iterable[Table]: continue yield self._get_table(schema, table_name, False) - def _get_table(self, schema: Schema, table_name: str, is_view: bool) -> Table: + def _get_table( + self, + schema: Schema, + table_name: str, + is_view: bool = False, + ) -> Table: columns = self._get_columns(schema, table_name) - detailed_info = self._get_table_info(schema, table_name) + detailed_info = self._get_table_info(schema.name, table_name) comment = 
detailed_info.pop("Comment", None) storage_location = detailed_info.pop("Location", None) @@ -129,6 +149,74 @@ def _get_table(self, schema: Schema, table_name: str, is_view: bool) -> Table: comment=comment, ) + def get_table_profile( + self, ref: TableReference, include_column_stats: bool = False + ) -> TableProfile: + columns = self._get_columns( + Schema( + id=ref.schema, + name=ref.schema, + # This is okay, as none of this is used in profiling + catalog=self.hive_metastore_catalog(None), + comment=None, + owner=None, + ), + ref.table, + ) + detailed_info = self._get_table_info(ref.schema, ref.table) + + table_stats = ( + self._get_cached_table_statistics(detailed_info["Statistics"]) + if detailed_info.get("Statistics") + else {} + ) + + return TableProfile( + num_rows=int(table_stats[ROWS]) + if table_stats.get(ROWS) is not None + else None, + total_size=int(table_stats[BYTES]) + if table_stats.get(BYTES) is not None + else None, + num_columns=len(columns), + column_profiles=[ + self._get_column_profile(column.name, ref) for column in columns + ] + if include_column_stats + else [], + ) + + def _get_column_profile(self, column: str, ref: TableReference) -> ColumnProfile: + + props = self._column_describe_extended(ref.schema, ref.table, column) + col_stats = {} + for prop in props: + col_stats[prop[0]] = prop[1] + return ColumnProfile( + name=column, + null_count=int(col_stats[NUM_NULLS]) + if col_stats.get(NUM_NULLS) is not None + else None, + distinct_count=int(col_stats[DISTINCT_COUNT]) + if col_stats.get(DISTINCT_COUNT) is not None + else None, + min=col_stats.get(MIN), + max=col_stats.get(MAX), + avg_len=col_stats.get(AVG_COL_LEN), + max_len=col_stats.get(MAX_COL_LEN), + version=col_stats.get(VERSION), + ) + + def _get_cached_table_statistics(self, statistics: str) -> dict: + # statistics is in format "xx bytes" OR "1382 bytes, 2 rows" + table_stats = dict() + for prop in statistics.split(","): + value_key_list = prop.strip().split(" ") # value_key_list -> [value, key] + if len(value_key_list) == 2 and value_key_list[1] in TABLE_STAT_LIST: + table_stats[value_key_list[1]] = value_key_list[0] + + return table_stats + def _get_created_at(self, created_at: Optional[str]) -> Optional[datetime]: return ( datetime.strptime(created_at, "%a %b %d %H:%M:%S %Z %Y") @@ -171,8 +259,8 @@ def _get_table_type(self, type: Optional[str]) -> HiveTableType: else: return HiveTableType.UNKNOWN - def _get_table_info(self, schema: Schema, table_name: str) -> dict: - rows = self._describe_extended(schema.name, table_name) + def _get_table_info(self, schema_name: str, table_name: str) -> dict: + rows = self._describe_extended(schema_name, table_name) index = rows.index(("# Detailed Table Information", "", "")) rows = rows[index + 1 :] @@ -235,6 +323,17 @@ def _describe_extended(self, schema_name: str, table_name: str) -> List[Row]: """ return self._execute_sql(f"DESCRIBE EXTENDED `{schema_name}`.`{table_name}`") + def _column_describe_extended( + self, schema_name: str, table_name: str, column_name: str + ) -> List[Row]: + """ + Rows are structured as shown in examples here + https://docs.databricks.com/en/sql/language-manual/sql-ref-syntax-aux-describe-table.html#examples + """ + return self._execute_sql( + f"DESCRIBE EXTENDED `{schema_name}`.`{table_name}` {column_name}" + ) + def _execute_sql(self, sql: str) -> List[Row]: return self.inspector.bind.execute(sql).fetchall() diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/proxy.py 
b/metadata-ingestion/src/datahub/ingestion/source/unity/proxy.py index 13baa8b57a639..b414f3f188c23 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/proxy.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/proxy.py @@ -4,7 +4,7 @@ import dataclasses import logging from datetime import datetime, timezone -from typing import Any, Dict, Iterable, List, Optional, Union +from typing import Any, Dict, Iterable, List, Optional, Union, cast from unittest.mock import patch from databricks.sdk import WorkspaceClient @@ -49,16 +49,19 @@ logger: logging.Logger = logging.getLogger(__name__) +@dataclasses.dataclass class TableInfoWithGeneration(TableInfo): generation: Optional[int] = None - @classmethod def as_dict(self) -> dict: return {**super().as_dict(), "generation": self.generation} @classmethod def from_dict(cls, d: Dict[str, Any]) -> "TableInfoWithGeneration": - table_info = super().from_dict(d) + table_info: TableInfoWithGeneration = cast( + TableInfoWithGeneration, + super().from_dict(d), + ) table_info.generation = d.get("generation") return table_info @@ -72,7 +75,10 @@ def as_dict(self) -> dict: @classmethod def from_dict(cls, d: Dict[str, Any]) -> "QueryFilterWithStatementTypes": - v = super().from_dict(d) + v: QueryFilterWithStatementTypes = cast( + QueryFilterWithStatementTypes, + super().from_dict(d), + ) v.statement_types = d["statement_types"] return v @@ -104,7 +110,7 @@ def __init__( def check_basic_connectivity(self) -> bool: return bool(self._workspace_client.catalogs.list()) - def assigned_metastore(self) -> Metastore: + def assigned_metastore(self) -> Optional[Metastore]: response = self._workspace_client.metastores.summary() return self._create_metastore(response) @@ -117,7 +123,9 @@ def catalogs(self, metastore: Optional[Metastore]) -> Iterable[Catalog]: logger.info("Catalogs not found") return [] for catalog in response: - yield self._create_catalog(metastore, catalog) + optional_catalog = self._create_catalog(metastore, catalog) + if optional_catalog: + yield optional_catalog def catalog( self, catalog_name: str, metastore: Optional[Metastore] @@ -126,7 +134,11 @@ def catalog( if not response: logger.info(f"Catalog {catalog_name} not found") return None - return self._create_catalog(metastore, response) + optional_catalog = self._create_catalog(metastore, response) + if optional_catalog: + return optional_catalog + + return None def schemas(self, catalog: Catalog) -> Iterable[Schema]: if ( @@ -140,7 +152,9 @@ def schemas(self, catalog: Catalog) -> Iterable[Schema]: logger.info(f"Schemas not found for catalog {catalog.id}") return [] for schema in response: - yield self._create_schema(catalog, schema) + optional_schema = self._create_schema(catalog, schema) + if optional_schema: + yield optional_schema def tables(self, schema: Schema) -> Iterable[Table]: if ( @@ -158,28 +172,38 @@ def tables(self, schema: Schema) -> Iterable[Table]: return [] for table in response: try: - yield self._create_table(schema, table) + optional_table = self._create_table( + schema, cast(TableInfoWithGeneration, table) + ) + if optional_table: + yield optional_table except Exception as e: logger.warning(f"Error parsing table: {e}") self.report.report_warning("table-parse", str(e)) def service_principals(self) -> Iterable[ServicePrincipal]: for principal in self._workspace_client.service_principals.list(): - yield self._create_service_principal(principal) + optional_sp = self._create_service_principal(principal) + if optional_sp: + yield optional_sp def 
workspace_notebooks(self) -> Iterable[Notebook]: for obj in self._workspace_client.workspace.list("/", recursive=True): - if obj.object_type == ObjectType.NOTEBOOK: + if obj.object_type == ObjectType.NOTEBOOK and obj.object_id and obj.path: yield Notebook( id=obj.object_id, path=obj.path, language=obj.language, created_at=datetime.fromtimestamp( obj.created_at / 1000, tz=timezone.utc - ), + ) + if obj.created_at + else None, modified_at=datetime.fromtimestamp( obj.modified_at / 1000, tz=timezone.utc - ), + ) + if obj.modified_at + else None, ) def query_history( @@ -204,7 +228,9 @@ def query_history( ) for query_info in self._query_history(filter_by=filter_by): try: - yield self._create_query(query_info) + optional_query = self._create_query(query_info) + if optional_query: + yield optional_query except Exception as e: logger.warning(f"Error parsing query: {e}") self.report.report_warning("query-parse", str(e)) @@ -229,15 +255,16 @@ def _query_history( "max_results": max_results, # Max batch size } - response: dict = self._workspace_client.api_client.do( + response: dict = self._workspace_client.api_client.do( # type: ignore method, path, body={**body, "filter_by": filter_by.as_dict()} ) + # we use default raw=False in above request, therefore will always get dict while True: if "res" not in response or not response["res"]: return for v in response["res"]: yield QueryInfo.from_dict(v) - response = self._workspace_client.api_client.do( + response = self._workspace_client.api_client.do( # type: ignore method, path, body={**body, "page_token": response["next_page_token"]} ) @@ -245,7 +272,7 @@ def list_lineages_by_table( self, table_name: str, include_entity_lineage: bool ) -> dict: """List table lineage by table name.""" - return self._workspace_client.api_client.do( + return self._workspace_client.api_client.do( # type: ignore method="GET", path="/api/2.0/lineage-tracking/table-lineage", body={ @@ -256,7 +283,7 @@ def list_lineages_by_table( def list_lineages_by_column(self, table_name: str, column_name: str) -> dict: """List column lineage by table name and column name.""" - return self._workspace_client.api_client.do( + return self._workspace_client.api_client.do( # type: ignore "GET", "/api/2.0/lineage-tracking/column-lineage", body={"table_name": table_name, "column_name": column_name}, @@ -325,7 +352,9 @@ def _escape_sequence(value: str) -> str: @staticmethod def _create_metastore( obj: Union[GetMetastoreSummaryResponse, MetastoreInfo] - ) -> Metastore: + ) -> Optional[Metastore]: + if not obj.name: + return None return Metastore( name=obj.name, id=UnityCatalogApiProxy._escape_sequence(obj.name), @@ -339,7 +368,10 @@ def _create_metastore( def _create_catalog( self, metastore: Optional[Metastore], obj: CatalogInfo - ) -> Catalog: + ) -> Optional[Catalog]: + if not obj.name: + self.report.num_catalogs_missing_name += 1 + return None catalog_name = self._escape_sequence(obj.name) return Catalog( name=obj.name, @@ -350,7 +382,10 @@ def _create_catalog( type=obj.catalog_type, ) - def _create_schema(self, catalog: Catalog, obj: SchemaInfo) -> Schema: + def _create_schema(self, catalog: Catalog, obj: SchemaInfo) -> Optional[Schema]: + if not obj.name: + self.report.num_schemas_missing_name += 1 + return None return Schema( name=obj.name, id=f"{catalog.id}.{self._escape_sequence(obj.name)}", @@ -359,11 +394,14 @@ def _create_schema(self, catalog: Catalog, obj: SchemaInfo) -> Schema: owner=obj.owner, ) - def _create_column(self, table_id: str, obj: ColumnInfo) -> Column: + def 
_create_column(self, table_id: str, obj: ColumnInfo) -> Optional[Column]:
+        if not obj.name:
+            self.report.num_columns_missing_name += 1
+            return None
         return Column(
             name=obj.name,
             id=f"{table_id}.{self._escape_sequence(obj.name)}",
-            type_text=obj.type_text,
+            type_text=obj.type_text or "",
             type_name=obj.type_name,
             type_scale=obj.type_scale,
             type_precision=obj.type_precision,
@@ -372,7 +410,12 @@ def _create_column(self, table_id: str, obj: ColumnInfo) -> Column:
             comment=obj.comment,
         )
 
-    def _create_table(self, schema: Schema, obj: TableInfoWithGeneration) -> Table:
+    def _create_table(
+        self, schema: Schema, obj: TableInfoWithGeneration
+    ) -> Optional[Table]:
+        if not obj.name:
+            self.report.num_tables_missing_name += 1
+            return None
         table_id = f"{schema.id}.{self._escape_sequence(obj.name)}"
         return Table(
             name=obj.name,
@@ -381,26 +424,38 @@ def _create_table(self, schema: Schema, obj: TableInfoWithGeneration) -> Table:
             schema=schema,
             storage_location=obj.storage_location,
             data_source_format=obj.data_source_format,
-            columns=[
-                self._create_column(table_id, column) for column in obj.columns or []
-            ],
+            columns=list(self._extract_columns(obj.columns, table_id))
+            if obj.columns
+            else [],
             view_definition=obj.view_definition or None,
             properties=obj.properties or {},
             owner=obj.owner,
             generation=obj.generation,
-            created_at=datetime.fromtimestamp(obj.created_at / 1000, tz=timezone.utc),
+            created_at=datetime.fromtimestamp(obj.created_at / 1000, tz=timezone.utc)
+            if obj.created_at
+            else None,
             created_by=obj.created_by,
             updated_at=datetime.fromtimestamp(obj.updated_at / 1000, tz=timezone.utc)
             if obj.updated_at
             else None,
             updated_by=obj.updated_by,
             table_id=obj.table_id,
             comment=obj.comment,
         )
 
+    def _extract_columns(
+        self, columns: List[ColumnInfo], table_id: str
+    ) -> Iterable[Column]:
+        for column in columns:
+            optional_column = self._create_column(table_id, column)
+            if optional_column:
+                yield optional_column
+
     def _create_service_principal(
         self, obj: DatabricksServicePrincipal
-    ) -> ServicePrincipal:
+    ) -> Optional[ServicePrincipal]:
+        if not obj.display_name or not obj.application_id:
+            return None
         return ServicePrincipal(
             id=f"{obj.id}.{self._escape_sequence(obj.display_name)}",
             display_name=obj.display_name,
@@ -408,8 +463,14 @@ def _create_service_principal(
             active=obj.active,
         )
 
-    @staticmethod
-    def _create_query(info: QueryInfo) -> Query:
+    def _create_query(self, info: QueryInfo) -> Optional[Query]:
+        if (
+            not info.query_text
+            or not info.query_start_time_ms
+            or not info.query_end_time_ms
+        ):
+            self.report.num_queries_missing_info += 1
+            return None
         return Query(
             query_id=info.query_id,
             query_text=info.query_text,
diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/proxy_profiling.py b/metadata-ingestion/src/datahub/ingestion/source/unity/proxy_profiling.py
index ab38119d01a9b..5992f103ccac3 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/unity/proxy_profiling.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/unity/proxy_profiling.py
@@ -14,6 +14,10 @@
     StatementStatus,
 )
 
+from datahub.ingestion.source.unity.hive_metastore_proxy import (
+    HIVE_METASTORE,
+    HiveMetastoreProxy,
+)
 from datahub.ingestion.source.unity.proxy_types import (
     ColumnProfile,
     TableProfile,
@@ -30,6 +34,7 @@ class UnityCatalogProxyProfilingMixin:
     _workspace_client: WorkspaceClient
     report: UnityCatalogReport
     warehouse_id: str
+    hive_metastore_proxy: Optional[HiveMetastoreProxy]
 
     def check_profiling_connectivity(self):
self._workspace_client.warehouses.get(self.warehouse_id) @@ -136,6 +141,8 @@ def _analyze_table( def _check_analyze_table_statement_status( self, execute_response: ExecuteStatementResponse, max_wait_secs: int ) -> bool: + if not execute_response.statement_id or not execute_response.status: + return False statement_id: str = execute_response.statement_id status: StatementStatus = execute_response.status @@ -152,13 +159,15 @@ def _check_analyze_table_statement_status( statement_id ) self._raise_if_error(response, "get-statement") - status = response.status + status = response.status # type: ignore return status.state == StatementState.SUCCEEDED def _get_table_profile( self, ref: TableReference, include_columns: bool ) -> TableProfile: + if self.hive_metastore_proxy and ref.catalog == HIVE_METASTORE: + return self.hive_metastore_proxy.get_table_profile(ref, include_columns) table_info = self._workspace_client.tables.get(ref.qualified_table_name) return self._create_table_profile(table_info, include_columns=include_columns) @@ -166,7 +175,12 @@ def _create_table_profile( self, table_info: TableInfo, include_columns: bool ) -> TableProfile: # Warning: this implementation is brittle -- dependent on properties that can change - columns_names = [column.name for column in table_info.columns] + columns_names = ( + [column.name for column in table_info.columns if column.name] + if table_info.columns + else [] + ) + return TableProfile( num_rows=self._get_int(table_info, "spark.sql.statistics.numRows"), total_size=self._get_int(table_info, "spark.sql.statistics.totalSize"), @@ -182,6 +196,7 @@ def _create_table_profile( def _create_column_profile( self, column: str, table_info: TableInfo ) -> ColumnProfile: + tblproperties = table_info.properties or {} return ColumnProfile( name=column, null_count=self._get_int( @@ -190,25 +205,18 @@ def _create_column_profile( distinct_count=self._get_int( table_info, f"spark.sql.statistics.colStats.{column}.distinctCount" ), - min=table_info.properties.get( - f"spark.sql.statistics.colStats.{column}.min" - ), - max=table_info.properties.get( - f"spark.sql.statistics.colStats.{column}.max" - ), - avg_len=table_info.properties.get( - f"spark.sql.statistics.colStats.{column}.avgLen" - ), - max_len=table_info.properties.get( - f"spark.sql.statistics.colStats.{column}.maxLen" - ), - version=table_info.properties.get( + min=tblproperties.get(f"spark.sql.statistics.colStats.{column}.min"), + max=tblproperties.get(f"spark.sql.statistics.colStats.{column}.max"), + avg_len=tblproperties.get(f"spark.sql.statistics.colStats.{column}.avgLen"), + max_len=tblproperties.get(f"spark.sql.statistics.colStats.{column}.maxLen"), + version=tblproperties.get( f"spark.sql.statistics.colStats.{column}.version" ), ) def _get_int(self, table_info: TableInfo, field: str) -> Optional[int]: - value = table_info.properties.get(field) + tblproperties = table_info.properties or {} + value = tblproperties.get(field) if value is not None: try: return int(value) @@ -223,14 +231,18 @@ def _get_int(self, table_info: TableInfo, field: str) -> Optional[int]: def _raise_if_error( response: Union[ExecuteStatementResponse, GetStatementResponse], key: str ) -> None: - if response.status.state in [ + if response.status and response.status.state in [ StatementState.FAILED, StatementState.CANCELED, StatementState.CLOSED, ]: raise DatabricksError( - response.status.error.message, - error_code=response.status.error.error_code.value, + response.status.error.message + if response.status.error and 
response.status.error.message + else "Unknown Error", + error_code=response.status.error.error_code.value + if response.status.error and response.status.error.error_code + else "Unknown Error Code", status=response.status.state.value, context=key, ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/proxy_types.py b/metadata-ingestion/src/datahub/ingestion/source/unity/proxy_types.py index e5951cb0fa4ff..c66189d99f738 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/proxy_types.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/proxy_types.py @@ -96,8 +96,8 @@ class CommonProperty: @dataclass class Metastore(CommonProperty): - global_metastore_id: str # Global across clouds and regions - metastore_id: str + global_metastore_id: Optional[str] # Global across clouds and regions + metastore_id: Optional[str] owner: Optional[str] cloud: Optional[str] region: Optional[str] @@ -107,7 +107,7 @@ class Metastore(CommonProperty): class Catalog(CommonProperty): metastore: Optional[Metastore] owner: Optional[str] - type: Union[CatalogType, CustomCatalogType] + type: Optional[Union[CatalogType, CustomCatalogType]] @dataclass @@ -224,14 +224,14 @@ class Table(CommonProperty): columns: List[Column] storage_location: Optional[str] data_source_format: Optional[DataSourceFormat] - table_type: Union[TableType, HiveTableType] + table_type: Optional[Union[TableType, HiveTableType]] owner: Optional[str] generation: Optional[int] created_at: Optional[datetime] created_by: Optional[str] updated_at: Optional[datetime] updated_by: Optional[str] - table_id: str + table_id: Optional[str] view_definition: Optional[str] properties: Dict[str, str] upstreams: Dict[TableReference, Dict[str, List[str]]] = field(default_factory=dict) @@ -252,16 +252,16 @@ def __post_init__(self): @dataclass class Query: - query_id: str + query_id: Optional[str] query_text: str - statement_type: QueryStatementType + statement_type: Optional[QueryStatementType] start_time: datetime end_time: datetime # User who ran the query - user_id: int + user_id: Optional[int] user_name: Optional[str] # Email or username # User whose credentials were used to run the query - executed_as_user_id: int + executed_as_user_id: Optional[int] executed_as_user_name: Optional[str] @@ -310,9 +310,9 @@ def __bool__(self): class Notebook: id: NotebookId path: str - language: Language - created_at: datetime - modified_at: datetime + language: Optional[Language] + created_at: Optional[datetime] + modified_at: Optional[datetime] upstreams: FrozenSet[TableReference] = field(default_factory=frozenset) diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/report.py b/metadata-ingestion/src/datahub/ingestion/source/unity/report.py index 0770d9d27055c..02eedb67f4cc2 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/report.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/report.py @@ -39,3 +39,9 @@ class UnityCatalogReport(IngestionStageReport, ProfilingSqlReport): num_profile_missing_size_in_bytes: int = 0 num_profile_failed_unsupported_column_type: int = 0 num_profile_failed_int_casts: int = 0 + + num_catalogs_missing_name: int = 0 + num_schemas_missing_name: int = 0 + num_tables_missing_name: int = 0 + num_columns_missing_name: int = 0 + num_queries_missing_info: int = 0 diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/source.py b/metadata-ingestion/src/datahub/ingestion/source/unity/source.py index 43c5e24439377..1bc47c6307849 100644 --- 
a/metadata-ingestion/src/datahub/ingestion/source/unity/source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/source.py @@ -304,22 +304,28 @@ def process_notebooks(self) -> Iterable[MetadataWorkUnit]: yield from self._gen_notebook_workunits(notebook) def _gen_notebook_workunits(self, notebook: Notebook) -> Iterable[MetadataWorkUnit]: + + properties = {"path": notebook.path} + if notebook.language: + properties["language"] = notebook.language.value + mcps = MetadataChangeProposalWrapper.construct_many( entityUrn=self.gen_notebook_urn(notebook), aspects=[ DatasetPropertiesClass( name=notebook.path.rsplit("/", 1)[-1], - customProperties={ - "path": notebook.path, - "language": notebook.language.value, - }, + customProperties=properties, externalUrl=urljoin( self.config.workspace_url, f"#notebook/{notebook.id}" ), - created=TimeStampClass(int(notebook.created_at.timestamp() * 1000)), + created=TimeStampClass(int(notebook.created_at.timestamp() * 1000)) + if notebook.created_at + else None, lastModified=TimeStampClass( int(notebook.modified_at.timestamp() * 1000) - ), + ) + if notebook.modified_at + else None, ), SubTypesClass(typeNames=[DatasetSubTypes.NOTEBOOK]), BrowsePathsClass(paths=notebook.path.split("/")), @@ -352,6 +358,9 @@ def process_metastores(self) -> Iterable[MetadataWorkUnit]: metastore: Optional[Metastore] = None if self.config.include_metastore: metastore = self.unity_catalog_api_proxy.assigned_metastore() + if not metastore: + self.report.report_failure("Metastore", "Not found") + return yield from self.gen_metastore_containers(metastore) yield from self.process_catalogs(metastore) if metastore and self.config.include_metastore: @@ -705,13 +714,15 @@ def _create_table_property_aspect(self, table: Table) -> DatasetPropertiesClass: if table.generation is not None: custom_properties["generation"] = str(table.generation) - custom_properties["table_type"] = table.table_type.value + if table.table_type: + custom_properties["table_type"] = table.table_type.value if table.created_by: custom_properties["created_by"] = table.created_by if table.properties: custom_properties.update({k: str(v) for k, v in table.properties.items()}) - custom_properties["table_id"] = table.table_id + if table.table_id: + custom_properties["table_id"] = table.table_id if table.owner: custom_properties["owner"] = table.owner if table.updated_by: diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/usage.py b/metadata-ingestion/src/datahub/ingestion/source/unity/usage.py index ab21c1a318659..f07e7a92d8762 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/usage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/usage.py @@ -117,7 +117,10 @@ def _get_workunits_internal( def _generate_operation_workunit( self, query: Query, table_info: QueryTableInfo ) -> Iterable[MetadataWorkUnit]: - if query.statement_type not in OPERATION_STATEMENT_TYPES: + if ( + not query.statement_type + or query.statement_type not in OPERATION_STATEMENT_TYPES + ): return None # Not sure about behavior when there are multiple target tables. This is a best attempt. 
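A note on how the analyze profiler added above obtains its numbers: ANALYZE TABLE only refreshes statistics; the profiler then reads them back through DESCRIBE EXTENDED. Table-level stats come from a single "Statistics" row (e.g. "1382 bytes, 2 rows") that _get_cached_table_statistics splits into byte and row counts, and column-level stats come from the (key, value) rows returned by DESCRIBE EXTENDED `schema`.`table` column. A minimal standalone sketch of that parsing logic, with hypothetical sample rows standing in for a live cursor:

    from typing import Dict, List, Optional, Tuple

    TABLE_STAT_KEYS = {"rows", "bytes"}

    def parse_table_statistics(statistics: str) -> Dict[str, int]:
        # "1382 bytes, 2 rows" -> {"bytes": 1382, "rows": 2}
        stats: Dict[str, int] = {}
        for part in statistics.split(","):
            tokens = part.strip().split(" ")  # [value, key]
            if len(tokens) == 2 and tokens[1] in TABLE_STAT_KEYS:
                stats[tokens[1]] = int(tokens[0])
        return stats

    def parse_column_stats(
        rows: List[Tuple[str, Optional[str]]]
    ) -> Dict[str, Optional[str]]:
        # (key, value) rows as mocked in the tests below, e.g.
        # ("num_nulls", "0"), ("distinct_count", "1"), ("avg_col_len", "8")
        return dict(rows)

    assert parse_table_statistics("1024 bytes, 3 rows") == {"bytes": 1024, "rows": 3}
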
diff --git a/metadata-ingestion/tests/integration/unity/test_unity_catalog_ingest.py b/metadata-ingestion/tests/integration/unity/test_unity_catalog_ingest.py index aab7630d57f46..05f1db0b932f8 100644 --- a/metadata-ingestion/tests/integration/unity/test_unity_catalog_ingest.py +++ b/metadata-ingestion/tests/integration/unity/test_unity_catalog_ingest.py @@ -186,6 +186,8 @@ def register_mock_data(workspace_client): "delta.lastUpdateVersion": "1", "delta.minReaderVersion": "1", "delta.minWriterVersion": "2", + "spark.sql.statistics.numRows": "10", + "spark.sql.statistics.totalSize": "512", }, "generation": 2, "metastore_id": "2c983545-d403-4f87-9063-5b7e3b6d3736", @@ -200,6 +202,57 @@ def register_mock_data(workspace_client): ) ] + workspace_client.tables.get = lambda *args, **kwargs: databricks.sdk.service.catalog.TableInfo.from_dict( + { + "name": "quickstart_table", + "catalog_name": "quickstart_catalog", + "schema_name": "quickstart_schema", + "table_type": "MANAGED", + "data_source_format": "DELTA", + "columns": [ + { + "name": "columnA", + "type_text": "int", + "type_json": '{"name":"columnA","type":"integer","nullable":true,"metadata":{}}', + "type_name": "INT", + "type_precision": 0, + "type_scale": 0, + "position": 0, + "nullable": True, + }, + { + "name": "columnB", + "type_text": "string", + "type_json": '{"name":"columnB","type":"string","nullable":true,"metadata":{}}', + "type_name": "STRING", + "type_precision": 0, + "type_scale": 0, + "position": 1, + "nullable": True, + }, + ], + "storage_location": "s3://db-02eec1f70bfe4115445be9fdb1aac6ac-s3-root-bucket/metastore/2c983545-d403-4f87-9063-5b7e3b6d3736/tables/cff27aa1-1c6a-4d78-b713-562c660c2896", + "owner": "account users", + "properties": { + "delta.lastCommitTimestamp": "1666185711000", + "delta.lastUpdateVersion": "1", + "delta.minReaderVersion": "1", + "delta.minWriterVersion": "2", + "spark.sql.statistics.numRows": "10", + "spark.sql.statistics.totalSize": "512", + }, + "generation": 2, + "metastore_id": "2c983545-d403-4f87-9063-5b7e3b6d3736", + "full_name": "quickstart_catalog.quickstart_schema.quickstart_table", + "data_access_configuration_id": "00000000-0000-0000-0000-000000000000", + "created_at": 1666185698688, + "created_by": "abc@acryl.io", + "updated_at": 1666186049633, + "updated_by": "abc@acryl.io", + "table_id": "cff27aa1-1c6a-4d78-b713-562c660c2896", + } + ) + workspace_client.service_principals.list.return_value = [ ServicePrincipal.from_dict(d) for d in [ @@ -220,7 +273,50 @@ def register_mock_data(workspace_client): def mock_hive_sql(query): - if query == "DESCRIBE EXTENDED `bronze_kambi`.`bet`": + + if query == "DESCRIBE EXTENDED `bronze_kambi`.`bet` betStatusId": + return [ + ("col_name", "betStatusId"), + ("data_type", "bigint"), + ("comment", None), + ("min", None), + ("max", None), + ("num_nulls", 0), + ("distinct_count", 1), + ("avg_col_len", 8), + ("max_col_len", 8), + ("histogram", None), + ] + elif query == "DESCRIBE EXTENDED `bronze_kambi`.`bet` channelId": + return [ + ("col_name", "channelId"), + ("data_type", "bigint"), + ("comment", None), + ("min", None), + ("max", None), + ("num_nulls", 0), + ("distinct_count", 1), + ("avg_col_len", 8), + ("max_col_len", 8), + ("histogram", None), + ] + elif query == "DESCRIBE EXTENDED `bronze_kambi`.`bet` combination": + return [ + ("col_name", "combination"), + ( + "data_type", + 
"struct>,eventId:bigint,eventName:string,eventStartDate:string,live:boolean,odds:double,outcomeIds:array,outcomeLabel:string,sportId:string,status:string,voidReason:string>>,payout:double,rewardExtraPayout:double,stake:double>", + ), + ("comment", None), + ("min", None), + ("max", None), + ("num_nulls", None), + ("distinct_count", None), + ("avg_col_len", None), + ("max_col_len", None), + ("histogram", None), + ] + elif query == "DESCRIBE EXTENDED `bronze_kambi`.`bet`": return [ ("betStatusId", "bigint", None), ("channelId", "bigint", None), @@ -237,6 +333,7 @@ def mock_hive_sql(query): ("Created Time", "Wed Jun 22 05:14:56 UTC 2022", ""), ("Last Access", "UNKNOWN", ""), ("Created By", "Spark 3.2.1", ""), + ("Statistics", "1024 bytes, 3 rows", ""), ("Type", "MANAGED", ""), ("Location", "dbfs:/user/hive/warehouse/bronze_kambi.db/bet", ""), ("Provider", "delta", ""), @@ -312,6 +409,11 @@ def test_ingestion(pytestconfig, tmp_path, requests_mock): "include_ownership": True, "include_hive_metastore": True, "warehouse_id": "test", + "profiling": { + "enabled": True, + "method": "analyze", + "call_analyze": False, + }, }, }, "sink": { diff --git a/metadata-ingestion/tests/integration/unity/unity_catalog_mces_golden.json b/metadata-ingestion/tests/integration/unity/unity_catalog_mces_golden.json index 98a6615dd2b52..383f94144ffdc 100644 --- a/metadata-ingestion/tests/integration/unity/unity_catalog_mces_golden.json +++ b/metadata-ingestion/tests/integration/unity/unity_catalog_mces_golden.json @@ -504,7 +504,7 @@ "Last Access": "UNKNOWN", "Created By": "Spark 3.2.1", "Owner": "root", - "table_id": "hive_metastore.bronze_kambi.view1", + "table_id": "acryl_metastore.hive_metastore.bronze_kambi.view1", "created_at": "2022-06-22 05:14:56" }, "externalUrl": "https://dummy.cloud.databricks.com/explore/data/hive_metastore/bronze_kambi/view1", @@ -638,7 +638,7 @@ "aspectName": "schemaMetadata", "aspect": { "json": { - "schemaName": "hive_metastore.bronze_kambi.view1", + "schemaName": "acryl_metastore.hive_metastore.bronze_kambi.view1", "platform": "urn:li:dataPlatform:databricks", "version": 0, "created": { @@ -1172,10 +1172,11 @@ "Table": "bet", "Last Access": "UNKNOWN", "Created By": "Spark 3.2.1", + "Statistics": "1024 bytes, 3 rows", "Owner": "root", "Is_managed_location": "true", "Table Properties": "[delta.autoOptimize.autoCompact=true,delta.autoOptimize.optimizeWrite=true,delta.minReaderVersion=1,delta.minWriterVersion=2]", - "table_id": "hive_metastore.bronze_kambi.bet", + "table_id": "acryl_metastore.hive_metastore.bronze_kambi.bet", "created_at": "2022-06-22 05:14:56" }, "externalUrl": "https://dummy.cloud.databricks.com/explore/data/hive_metastore/bronze_kambi/bet", @@ -1275,7 +1276,7 @@ "aspectName": "schemaMetadata", "aspect": { "json": { - "schemaName": "hive_metastore.bronze_kambi.bet", + "schemaName": "acryl_metastore.hive_metastore.bronze_kambi.bet", "platform": "urn:li:dataPlatform:databricks", "version": 0, "created": { @@ -1731,15 +1732,17 @@ "generation": "2", "table_type": "MANAGED", "created_by": "abc@acryl.io", - "created_at": "2022-10-19 13:21:38.688000+00:00", "delta.lastCommitTimestamp": "1666185711000", "delta.lastUpdateVersion": "1", "delta.minReaderVersion": "1", "delta.minWriterVersion": "2", + "spark.sql.statistics.numRows": "10", + "spark.sql.statistics.totalSize": "512", "table_id": "cff27aa1-1c6a-4d78-b713-562c660c2896", "owner": "account users", "updated_by": "abc@acryl.io", - "updated_at": "2022-10-19 13:27:29.633000+00:00" + "updated_at": "2022-10-19 
13:27:29.633000+00:00", + "created_at": "2022-10-19 13:21:38.688000+00:00" }, "externalUrl": "https://dummy.cloud.databricks.com/explore/data/main/default/quickstart_table", "name": "quickstart_table", @@ -2061,15 +2064,17 @@ "generation": "2", "table_type": "MANAGED", "created_by": "abc@acryl.io", - "created_at": "2022-10-19 13:21:38.688000+00:00", "delta.lastCommitTimestamp": "1666185711000", "delta.lastUpdateVersion": "1", "delta.minReaderVersion": "1", "delta.minWriterVersion": "2", + "spark.sql.statistics.numRows": "10", + "spark.sql.statistics.totalSize": "512", "table_id": "cff27aa1-1c6a-4d78-b713-562c660c2896", "owner": "account users", "updated_by": "abc@acryl.io", - "updated_at": "2022-10-19 13:27:29.633000+00:00" + "updated_at": "2022-10-19 13:27:29.633000+00:00", + "created_at": "2022-10-19 13:21:38.688000+00:00" }, "externalUrl": "https://dummy.cloud.databricks.com/explore/data/main/quickstart_schema/quickstart_table", "name": "quickstart_table", @@ -2527,15 +2532,17 @@ "generation": "2", "table_type": "MANAGED", "created_by": "abc@acryl.io", - "created_at": "2022-10-19 13:21:38.688000+00:00", "delta.lastCommitTimestamp": "1666185711000", "delta.lastUpdateVersion": "1", "delta.minReaderVersion": "1", "delta.minWriterVersion": "2", + "spark.sql.statistics.numRows": "10", + "spark.sql.statistics.totalSize": "512", "table_id": "cff27aa1-1c6a-4d78-b713-562c660c2896", "owner": "account users", "updated_by": "abc@acryl.io", - "updated_at": "2022-10-19 13:27:29.633000+00:00" + "updated_at": "2022-10-19 13:27:29.633000+00:00", + "created_at": "2022-10-19 13:21:38.688000+00:00" }, "externalUrl": "https://dummy.cloud.databricks.com/explore/data/quickstart_catalog/default/quickstart_table", "name": "quickstart_table", @@ -2857,15 +2864,17 @@ "generation": "2", "table_type": "MANAGED", "created_by": "abc@acryl.io", - "created_at": "2022-10-19 13:21:38.688000+00:00", "delta.lastCommitTimestamp": "1666185711000", "delta.lastUpdateVersion": "1", "delta.minReaderVersion": "1", "delta.minWriterVersion": "2", + "spark.sql.statistics.numRows": "10", + "spark.sql.statistics.totalSize": "512", "table_id": "cff27aa1-1c6a-4d78-b713-562c660c2896", "owner": "account users", "updated_by": "abc@acryl.io", - "updated_at": "2022-10-19 13:27:29.633000+00:00" + "updated_at": "2022-10-19 13:27:29.633000+00:00", + "created_at": "2022-10-19 13:21:38.688000+00:00" }, "externalUrl": "https://dummy.cloud.databricks.com/explore/data/quickstart_catalog/quickstart_schema/quickstart_table", "name": "quickstart_table", @@ -3323,15 +3332,17 @@ "generation": "2", "table_type": "MANAGED", "created_by": "abc@acryl.io", - "created_at": "2022-10-19 13:21:38.688000+00:00", "delta.lastCommitTimestamp": "1666185711000", "delta.lastUpdateVersion": "1", "delta.minReaderVersion": "1", "delta.minWriterVersion": "2", + "spark.sql.statistics.numRows": "10", + "spark.sql.statistics.totalSize": "512", "table_id": "cff27aa1-1c6a-4d78-b713-562c660c2896", "owner": "account users", "updated_by": "abc@acryl.io", - "updated_at": "2022-10-19 13:27:29.633000+00:00" + "updated_at": "2022-10-19 13:27:29.633000+00:00", + "created_at": "2022-10-19 13:21:38.688000+00:00" }, "externalUrl": "https://dummy.cloud.databricks.com/explore/data/system/default/quickstart_table", "name": "quickstart_table", @@ -3653,15 +3664,17 @@ "generation": "2", "table_type": "MANAGED", "created_by": "abc@acryl.io", - "created_at": "2022-10-19 13:21:38.688000+00:00", "delta.lastCommitTimestamp": "1666185711000", "delta.lastUpdateVersion": "1", "delta.minReaderVersion": 
"1", "delta.minWriterVersion": "2", + "spark.sql.statistics.numRows": "10", + "spark.sql.statistics.totalSize": "512", "table_id": "cff27aa1-1c6a-4d78-b713-562c660c2896", "owner": "account users", "updated_by": "abc@acryl.io", - "updated_at": "2022-10-19 13:27:29.633000+00:00" + "updated_at": "2022-10-19 13:27:29.633000+00:00", + "created_at": "2022-10-19 13:21:38.688000+00:00" }, "externalUrl": "https://dummy.cloud.databricks.com/explore/data/system/quickstart_schema/quickstart_table", "name": "quickstart_table", @@ -3813,6 +3826,69 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:databricks,dummy.acryl_metastore.system.default.quickstart_table,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetProfile", + "aspect": { + "json": { + "timestampMillis": 1703580920011, + "partitionSpec": { + "type": "FULL_TABLE", + "partition": "FULL_TABLE_SNAPSHOT" + }, + "rowCount": 10, + "columnCount": 2, + "fieldProfiles": [], + "sizeInBytes": 512 + } + }, + "systemMetadata": { + "lastObserved": 1638860400000, + "runId": "unity-catalog-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:databricks,dummy.acryl_metastore.hive_metastore.bronze_kambi.bet,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetProfile", + "aspect": { + "json": { + "timestampMillis": 1703581191932, + "partitionSpec": { + "type": "FULL_TABLE", + "partition": "FULL_TABLE_SNAPSHOT" + }, + "rowCount": 3, + "columnCount": 3, + "fieldProfiles": [ + { + "fieldPath": "betStatusId", + "uniqueCount": 1, + "uniqueProportion": 0.3333333333333333, + "nullCount": 0, + "nullProportion": 0.0 + }, + { + "fieldPath": "channelId", + "uniqueCount": 1, + "uniqueProportion": 0.3333333333333333, + "nullCount": 0, + "nullProportion": 0.0 + } + ], + "sizeInBytes": 1024 + } + }, + "systemMetadata": { + "lastObserved": 1638860400000, + "runId": "unity-catalog-test", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:databricks,dummy.acryl_metastore.main.default.quickstart_table,PROD)", @@ -3829,6 +3905,30 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:databricks,dummy.acryl_metastore.quickstart_catalog.default.quickstart_table,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetProfile", + "aspect": { + "json": { + "timestampMillis": 1703580406273, + "partitionSpec": { + "type": "FULL_TABLE", + "partition": "FULL_TABLE_SNAPSHOT" + }, + "rowCount": 10, + "columnCount": 2, + "fieldProfiles": [], + "sizeInBytes": 512 + } + }, + "systemMetadata": { + "lastObserved": 1638860400000, + "runId": "unity-catalog-test", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:databricks,dummy.acryl_metastore.main.quickstart_schema.quickstart_table,PROD)", @@ -3845,6 +3945,78 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:databricks,dummy.acryl_metastore.quickstart_catalog.quickstart_schema.quickstart_table,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetProfile", + "aspect": { + "json": { + "timestampMillis": 1703580920008, + "partitionSpec": { + "type": "FULL_TABLE", + "partition": "FULL_TABLE_SNAPSHOT" + }, + "rowCount": 10, + "columnCount": 2, + "fieldProfiles": [], + "sizeInBytes": 512 + } + }, + 
"systemMetadata": { + "lastObserved": 1638860400000, + "runId": "unity-catalog-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:databricks,dummy.acryl_metastore.main.quickstart_schema.quickstart_table,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetProfile", + "aspect": { + "json": { + "timestampMillis": 1703580920011, + "partitionSpec": { + "type": "FULL_TABLE", + "partition": "FULL_TABLE_SNAPSHOT" + }, + "rowCount": 10, + "columnCount": 2, + "fieldProfiles": [], + "sizeInBytes": 512 + } + }, + "systemMetadata": { + "lastObserved": 1638860400000, + "runId": "unity-catalog-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:databricks,dummy.acryl_metastore.main.default.quickstart_table,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetProfile", + "aspect": { + "json": { + "timestampMillis": 1703580920012, + "partitionSpec": { + "type": "FULL_TABLE", + "partition": "FULL_TABLE_SNAPSHOT" + }, + "rowCount": 10, + "columnCount": 2, + "fieldProfiles": [], + "sizeInBytes": 512 + } + }, + "systemMetadata": { + "lastObserved": 1638860400000, + "runId": "unity-catalog-test", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:databricks,dummy.acryl_metastore.quickstart_catalog.default.quickstart_table,PROD)", @@ -3877,6 +4049,30 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:databricks,dummy.acryl_metastore.system.quickstart_schema.quickstart_table,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetProfile", + "aspect": { + "json": { + "timestampMillis": 1703580920010, + "partitionSpec": { + "type": "FULL_TABLE", + "partition": "FULL_TABLE_SNAPSHOT" + }, + "rowCount": 10, + "columnCount": 2, + "fieldProfiles": [], + "sizeInBytes": 512 + } + }, + "systemMetadata": { + "lastObserved": 1638860400000, + "runId": "unity-catalog-test", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:databricks,dummy.acryl_metastore.system.default.quickstart_table,PROD)",