feat(databricks): add hive metastore analyze profiling (datahub-proje…
mayurinehate authored Jan 3, 2024
1 parent 6d72640 commit 29f2142
Showing 12 changed files with 600 additions and 106 deletions.
4 changes: 1 addition & 3 deletions metadata-ingestion/setup.py
@@ -251,9 +251,7 @@

databricks = {
# 0.1.11 appears to have authentication issues with azure databricks
- # 0.16.0 added py.typed support which caused mypy to fail. The databricks sdk is pinned until we resolve mypy issues.
- # https://github.com/databricks/databricks-sdk-py/pull/483
- "databricks-sdk>=0.9.0,<0.16.0",
+ "databricks-sdk>=0.9.0",
"pyspark~=3.3.0",
"requests",
# Version 2.4.0 includes sqlalchemy dialect, 2.8.0 includes some bug fixes
@@ -112,7 +112,13 @@ def ensure_profiling_pattern_is_passed_to_profiling(
cls, values: Dict[str, Any]
) -> Dict[str, Any]:
profiling: Optional[GEProfilingConfig] = values.get("profiling")
- if profiling is not None and profiling.enabled:
+ # Note: isinstance() check is required here as unity-catalog source reuses
+ # SQLCommonConfig with different profiling config than GEProfilingConfig
+ if (
+     profiling is not None
+     and isinstance(profiling, GEProfilingConfig)
+     and profiling.enabled
+ ):
profiling._allow_deny_patterns = values["profile_pattern"]
return values

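Why the new isinstance() guard matters: the unity-catalog source reuses SQLCommonConfig but plugs in a profiling config that lacks the GE-specific _allow_deny_patterns attribute. A minimal standalone sketch of the failure mode (toy classes, not DataHub's actual ones):

class GEProfilingConfig:
    def __init__(self, enabled: bool = False):
        self.enabled = enabled
        self._allow_deny_patterns = None  # GE-specific attribute


class AnalyzeProfilerConfig:
    """Occupies the same `profiling` slot but is not a GEProfilingConfig."""

    def __init__(self, enabled: bool = False):
        self.enabled = enabled


def pass_pattern_to_profiling(values: dict) -> dict:
    profiling = values.get("profiling")
    # Without the isinstance() check, an enabled AnalyzeProfilerConfig would
    # be handed a GE-only attribute it never declares.
    if (
        profiling is not None
        and isinstance(profiling, GEProfilingConfig)
        and profiling.enabled
    ):
        profiling._allow_deny_patterns = values["profile_pattern"]
    return values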
16 changes: 7 additions & 9 deletions metadata-ingestion/src/datahub/ingestion/source/unity/config.py
@@ -95,14 +95,6 @@ class UnityCatalogAnalyzeProfilerConfig(UnityCatalogProfilerConfig):
description="Number of worker threads to use for profiling. Set to 1 to disable.",
)

- @pydantic.root_validator(skip_on_failure=True)
- def warehouse_id_required_for_profiling(
-     cls, values: Dict[str, Any]
- ) -> Dict[str, Any]:
-     if values.get("enabled") and not values.get("warehouse_id"):
-         raise ValueError("warehouse_id must be set when profiling is enabled.")
-     return values
-
@property
def include_columns(self):
return not self.profile_table_level_only
@@ -254,6 +246,7 @@ class UnityCatalogSourceConfig(
description="Generate usage statistics.",
)

+ # TODO: Remove `type:ignore` by refactoring config
profiling: Union[UnityCatalogGEProfilerConfig, UnityCatalogAnalyzeProfilerConfig] = Field( # type: ignore
default=UnityCatalogGEProfilerConfig(),
description="Data profiling configuration",
@@ -316,7 +309,9 @@ def include_metastore_warning(cls, v: bool) -> bool:

@pydantic.root_validator(skip_on_failure=True)
def set_warehouse_id_from_profiling(cls, values: Dict[str, Any]) -> Dict[str, Any]:
- profiling: Optional[UnityCatalogProfilerConfig] = values.get("profiling")
+ profiling: Optional[
+     Union[UnityCatalogGEProfilerConfig, UnityCatalogAnalyzeProfilerConfig]
+ ] = values.get("profiling")
if not values.get("warehouse_id") and profiling and profiling.warehouse_id:
values["warehouse_id"] = profiling.warehouse_id
if (
@@ -337,6 +332,9 @@ def set_warehouse_id_from_profiling(cls, values: Dict[str, Any]) -> Dict[str, Any]:
if values.get("warehouse_id") and profiling and not profiling.warehouse_id:
profiling.warehouse_id = values["warehouse_id"]

+ if profiling and profiling.enabled and not profiling.warehouse_id:
+     raise ValueError("warehouse_id must be set when profiling is enabled.")
+
return values

@pydantic.validator("schema_pattern", always=True)
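Taken together, the two hunks above replace the per-profiler-config check (deleted from UnityCatalogAnalyzeProfilerConfig earlier in this file) with a two-way handshake followed by the relocated check. A condensed standalone sketch of just the lines shown, assuming a simple stand-in profiling object:

from typing import Any, Dict, Optional


class _Profiling:
    """Hypothetical stand-in for either Unity Catalog profiling config."""

    def __init__(self, enabled: bool, warehouse_id: Optional[str] = None):
        self.enabled = enabled
        self.warehouse_id = warehouse_id


def sync_warehouse_id(values: Dict[str, Any]) -> Dict[str, Any]:
    profiling: Optional[_Profiling] = values.get("profiling")
    # A profiling-level warehouse_id fills in a missing top-level one ...
    if not values.get("warehouse_id") and profiling and profiling.warehouse_id:
        values["warehouse_id"] = profiling.warehouse_id
    # ... and a top-level warehouse_id fills in a missing profiling-level one.
    if values.get("warehouse_id") and profiling and not profiling.warehouse_id:
        profiling.warehouse_id = values["warehouse_id"]
    # The relocated check runs after both fallbacks, so setting the id in
    # either location now satisfies it.
    if profiling and profiling.enabled and not profiling.warehouse_id:
        raise ValueError("warehouse_id must be set when profiling is enabled.")
    return values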
@@ -12,11 +12,14 @@
from datahub.ingestion.source.unity.proxy_types import (
Catalog,
Column,
+ ColumnProfile,
CustomCatalogType,
HiveTableType,
Metastore,
Schema,
Table,
+ TableProfile,
+ TableReference,
)

logger = logging.getLogger(__name__)
@@ -38,6 +41,18 @@
"binary": ColumnTypeName.BINARY,
}

+ NUM_NULLS = "num_nulls"
+ DISTINCT_COUNT = "distinct_count"
+ MIN = "min"
+ MAX = "max"
+ AVG_COL_LEN = "avg_col_len"
+ MAX_COL_LEN = "max_col_len"
+ VERSION = "version"
+
+ ROWS = "rows"
+ BYTES = "bytes"
+ TABLE_STAT_LIST = {ROWS, BYTES}
+

class HiveMetastoreProxy(Closeable):
# TODO: Support for view lineage using SQL parsing
@@ -67,7 +82,7 @@ def get_inspector(sqlalchemy_url: str, options: dict) -> Inspector:

def hive_metastore_catalog(self, metastore: Optional[Metastore]) -> Catalog:
return Catalog(
- id=HIVE_METASTORE,
+ id=f"{metastore.id}.{HIVE_METASTORE}" if metastore else HIVE_METASTORE,
name=HIVE_METASTORE,
comment=None,
metastore=metastore,
@@ -95,9 +110,14 @@ def hive_metastore_tables(self, schema: Schema) -> Iterable[Table]:
continue
yield self._get_table(schema, table_name, False)

- def _get_table(self, schema: Schema, table_name: str, is_view: bool) -> Table:
+ def _get_table(
+     self,
+     schema: Schema,
+     table_name: str,
+     is_view: bool = False,
+ ) -> Table:
columns = self._get_columns(schema, table_name)
- detailed_info = self._get_table_info(schema, table_name)
+ detailed_info = self._get_table_info(schema.name, table_name)

comment = detailed_info.pop("Comment", None)
storage_location = detailed_info.pop("Location", None)
@@ -129,6 +149,74 @@ def _get_table(self, schema: Schema, table_name: str, is_view: bool) -> Table:
comment=comment,
)

+ def get_table_profile(
+     self, ref: TableReference, include_column_stats: bool = False
+ ) -> TableProfile:
+     columns = self._get_columns(
+         Schema(
+             id=ref.schema,
+             name=ref.schema,
+             # This is okay, as none of this is used in profiling
+             catalog=self.hive_metastore_catalog(None),
+             comment=None,
+             owner=None,
+         ),
+         ref.table,
+     )
+     detailed_info = self._get_table_info(ref.schema, ref.table)
+
+     table_stats = (
+         self._get_cached_table_statistics(detailed_info["Statistics"])
+         if detailed_info.get("Statistics")
+         else {}
+     )
+
+     return TableProfile(
+         num_rows=int(table_stats[ROWS])
+         if table_stats.get(ROWS) is not None
+         else None,
+         total_size=int(table_stats[BYTES])
+         if table_stats.get(BYTES) is not None
+         else None,
+         num_columns=len(columns),
+         column_profiles=[
+             self._get_column_profile(column.name, ref) for column in columns
+         ]
+         if include_column_stats
+         else [],
+     )
+
+ def _get_column_profile(self, column: str, ref: TableReference) -> ColumnProfile:
+     props = self._column_describe_extended(ref.schema, ref.table, column)
+     col_stats = {}
+     for prop in props:
+         col_stats[prop[0]] = prop[1]
+     return ColumnProfile(
+         name=column,
+         null_count=int(col_stats[NUM_NULLS])
+         if col_stats.get(NUM_NULLS) is not None
+         else None,
+         distinct_count=int(col_stats[DISTINCT_COUNT])
+         if col_stats.get(DISTINCT_COUNT) is not None
+         else None,
+         min=col_stats.get(MIN),
+         max=col_stats.get(MAX),
+         avg_len=col_stats.get(AVG_COL_LEN),
+         max_len=col_stats.get(MAX_COL_LEN),
+         version=col_stats.get(VERSION),
+     )
+
+ def _get_cached_table_statistics(self, statistics: str) -> dict:
+     # statistics is in format "xx bytes" OR "1382 bytes, 2 rows"
+     table_stats = dict()
+     for prop in statistics.split(","):
+         value_key_list = prop.strip().split(" ")  # value_key_list -> [value, key]
+         if len(value_key_list) == 2 and value_key_list[1] in TABLE_STAT_LIST:
+             table_stats[value_key_list[1]] = value_key_list[0]
+
+     return table_stats
+
def _get_created_at(self, created_at: Optional[str]) -> Optional[datetime]:
return (
datetime.strptime(created_at, "%a %b %d %H:%M:%S %Z %Y")
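To see what _get_cached_table_statistics extracts, here is a quick standalone re-run of its parsing logic against the two formats named in its comment:

def parse_table_statistics(statistics: str) -> dict:
    # Same split-and-filter approach as the method above, without the class.
    table_stats = {}
    for prop in statistics.split(","):
        value_key_list = prop.strip().split(" ")  # [value, key]
        if len(value_key_list) == 2 and value_key_list[1] in {"rows", "bytes"}:
            table_stats[value_key_list[1]] = value_key_list[0]
    return table_stats


assert parse_table_statistics("1382 bytes, 2 rows") == {"bytes": "1382", "rows": "2"}
assert parse_table_statistics("24 bytes") == {"bytes": "24"}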
@@ -171,8 +259,8 @@ def _get_table_type(self, type: Optional[str]) -> HiveTableType:
else:
return HiveTableType.UNKNOWN

- def _get_table_info(self, schema: Schema, table_name: str) -> dict:
-     rows = self._describe_extended(schema.name, table_name)
+ def _get_table_info(self, schema_name: str, table_name: str) -> dict:
+     rows = self._describe_extended(schema_name, table_name)

index = rows.index(("# Detailed Table Information", "", ""))
rows = rows[index + 1 :]
@@ -235,6 +323,17 @@ def _describe_extended(self, schema_name: str, table_name: str) -> List[Row]:
"""
return self._execute_sql(f"DESCRIBE EXTENDED `{schema_name}`.`{table_name}`")

+ def _column_describe_extended(
+     self, schema_name: str, table_name: str, column_name: str
+ ) -> List[Row]:
+     """
+     Rows are structured as shown in examples here
+     https://docs.databricks.com/en/sql/language-manual/sql-ref-syntax-aux-describe-table.html#examples
+     """
+     return self._execute_sql(
+         f"DESCRIBE EXTENDED `{schema_name}`.`{table_name}` {column_name}"
+     )
+
def _execute_sql(self, sql: str) -> List[Row]:
return self.inspector.bind.execute(sql).fetchall()

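For reference, per the Databricks documentation linked in the docstring above, DESCRIBE EXTENDED on a single column returns (info_name, info_value) rows roughly like the following, which _get_column_profile folds into a dict; the statistic rows are only populated after an ANALYZE TABLE ... COMPUTE STATISTICS FOR COLUMNS run (illustrative values, not captured output):

rows = [
    ("col_name", "age"),
    ("data_type", "int"),
    ("min", "18"),
    ("max", "92"),
    ("num_nulls", "0"),
    ("distinct_count", "57"),
    ("avg_col_len", "4"),
    ("max_col_len", "4"),
]
# Mirrors the prop[0]/prop[1] loop in _get_column_profile.
col_stats = {info_name: info_value for info_name, info_value in rows}
assert col_stats["num_nulls"] == "0" and col_stats["min"] == "18"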
(Diffs for the remaining 8 changed files are not shown.)
