
Commit

Merge branch 'datahub-project:master' into master
hsheth2 authored Sep 10, 2024
2 parents f251b08 + f403370 commit bf1e915
Showing 46 changed files with 7,861 additions and 3,377 deletions.
4 changes: 2 additions & 2 deletions metadata-ingestion/setup.py
@@ -101,7 +101,7 @@
sqlglot_lib = {
# Using an Acryl fork of sqlglot.
# https://github.com/tobymao/sqlglot/compare/main...hsheth2:sqlglot:main?expand=1
- "acryl-sqlglot[rs]==25.8.2.dev9",
+ "acryl-sqlglot[rs]==25.20.2.dev5",
}

classification_lib = {
@@ -722,7 +722,7 @@
"snowflake-summary = datahub.ingestion.source.snowflake.snowflake_summary:SnowflakeSummarySource",
"snowflake-queries = datahub.ingestion.source.snowflake.snowflake_queries:SnowflakeQueriesSource",
"superset = datahub.ingestion.source.superset:SupersetSource",
- "tableau = datahub.ingestion.source.tableau:TableauSource",
+ "tableau = datahub.ingestion.source.tableau.tableau:TableauSource",
"openapi = datahub.ingestion.source.openapi:OpenApiSource",
"metabase = datahub.ingestion.source.metabase:MetabaseSource",
"teradata = datahub.ingestion.source.sql.teradata:TeradataSource",
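These plugin entries use the standard `module:attribute` entry-point convention, so the tableau change simply points the registry at the source class's new location inside a `tableau` package. A minimal sketch of how such a string resolves, using plain importlib (DataHub's actual plugin registry is assumed to do something equivalent):

```python
import importlib


def load_entry_point(value: str):
    """Resolve a 'module:attribute' entry-point string to the object it names."""
    module_path, _, attr = value.partition(":")
    module = importlib.import_module(module_path)
    return getattr(module, attr)


# After this change the tableau source resolves from its new module path:
# load_entry_point("datahub.ingestion.source.tableau.tableau:TableauSource")
```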
2 changes: 1 addition & 1 deletion metadata-ingestion/src/datahub/cli/check_cli.py
@@ -68,7 +68,7 @@ def metadata_file(json_file: str, rewrite: bool, unpack_mces: bool) -> None:
"config": {"filename": out_file.name},
},
},
- no_default_report=True,
+ report_to=None,
)

pipeline.run()
2 changes: 1 addition & 1 deletion metadata-ingestion/src/datahub/cli/docker_cli.py
@@ -1001,7 +1001,7 @@ def ingest_sample_data(path: Optional[str], token: Optional[str]) -> None:
if token is not None:
recipe["sink"]["config"]["token"] = token

- pipeline = Pipeline.create(recipe, no_default_report=True)
+ pipeline = Pipeline.create(recipe, report_to=None)
pipeline.run()
ret = pipeline.pretty_print_summary()
sys.exit(ret)
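Both of these CLI call sites (check_cli.py above and docker_cli.py here) now express "skip the default reporter" through `report_to=None` rather than the removed `no_default_report=True` flag. A hedged usage sketch; the recipe is illustrative and its config field names may vary slightly between DataHub versions:

```python
from datahub.ingestion.run.pipeline import Pipeline

# Illustrative recipe, not the actual sample-data or check recipe.
recipe = {
    "source": {"type": "file", "config": {"path": "./sample_mces.json"}},
    "sink": {"type": "console", "config": {}},
}

# report_to=None disables the default "datahub" ingestion reporter,
# which is what no_default_report=True used to do.
pipeline = Pipeline.create(recipe, report_to=None)
pipeline.run()
exit_code = pipeline.pretty_print_summary()
```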
29 changes: 16 additions & 13 deletions metadata-ingestion/src/datahub/cli/ingest_cli.py
@@ -119,7 +119,7 @@ def run(
strict_warnings: bool,
preview_workunits: int,
test_source_connection: bool,
- report_to: str,
+ report_to: Optional[str],
no_default_report: bool,
no_spinner: bool,
no_progress: bool,
@@ -160,7 +160,11 @@ async def run_pipeline_to_completion(pipeline: Pipeline) -> int:
raw_pipeline_config = pipeline_config.pop("__raw_config")

if test_source_connection:
- _test_source_connection(report_to, pipeline_config)
+ sys.exit(_test_source_connection(report_to, pipeline_config))

+ if no_default_report:
+ # The default is "datahub" reporting. The extra flag will disable it.
+ report_to = None

async def run_ingestion_and_check_upgrade() -> int:
# TRICKY: We want to make sure that the Pipeline.create() call happens on the
@@ -171,13 +175,12 @@ async def run_ingestion_and_check_upgrade() -> int:
# logger.debug(f"Using config: {pipeline_config}")
pipeline = Pipeline.create(
pipeline_config,
- dry_run,
- preview,
- preview_workunits,
- report_to,
- no_default_report,
- no_progress,
- raw_pipeline_config,
+ dry_run=dry_run,
+ preview_mode=preview,
+ preview_workunits=preview_workunits,
+ report_to=report_to,
+ no_progress=no_progress,
+ raw_config=raw_pipeline_config,
)

version_stats_future = asyncio.ensure_future(
@@ -392,7 +395,7 @@ def deploy(
click.echo(response)


- def _test_source_connection(report_to: Optional[str], pipeline_config: dict) -> None:
+ def _test_source_connection(report_to: Optional[str], pipeline_config: dict) -> int:
connection_report = None
try:
connection_report = ConnectionManager().test_source_connection(pipeline_config)
@@ -401,12 +404,12 @@ def _test_source_connection(report_to: Optional[str], pipeline_config: dict) ->
with open(report_to, "w") as out_fp:
out_fp.write(connection_report.as_json())
logger.info(f"Wrote report successfully to {report_to}")
- sys.exit(0)
+ return 0
except Exception as e:
logger.error(f"Failed to test connection due to {e}")
if connection_report:
logger.error(connection_report.as_json())
- sys.exit(1)
+ return 1


def parse_restli_response(response):
@@ -447,7 +450,7 @@ def mcps(path: str) -> None:
},
}

- pipeline = Pipeline.create(recipe, no_default_report=True)
+ pipeline = Pipeline.create(recipe, report_to=None)
pipeline.run()
ret = pipeline.pretty_print_summary()
sys.exit(ret)
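Taken together, the ingest_cli.py changes mean `_test_source_connection` reports success or failure through its return value (with the CLI entry point owning `sys.exit`), and `Pipeline.create` is called with explicit keyword arguments. A simplified sketch of the resulting flow; `run_cli` and the stub helper are illustrative stand-ins for the real command and helper above:

```python
import sys

from datahub.ingestion.run.pipeline import Pipeline


def _test_source_connection(report_to, pipeline_config: dict) -> int:
    # Stand-in for the real helper: returns 0 on success, 1 on failure.
    return 0


def run_cli(pipeline_config: dict, report_to, dry_run: bool, test_source_connection: bool) -> None:
    if test_source_connection:
        # Only the CLI layer terminates the process.
        sys.exit(_test_source_connection(report_to, pipeline_config))

    pipeline = Pipeline.create(
        pipeline_config,
        dry_run=dry_run,      # keyword arguments make the mapping onto
        report_to=report_to,  # Pipeline.create's parameters explicit
    )
    pipeline.run()
```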
16 changes: 0 additions & 16 deletions metadata-ingestion/src/datahub/emitter/mce_builder.py
@@ -50,7 +50,6 @@
UpstreamLineageClass,
_Aspect as AspectAbstract,
)
- from datahub.metadata.urns import CorpGroupUrn, CorpUserUrn
from datahub.utilities.urn_encoder import UrnEncoder
from datahub.utilities.urns.data_flow_urn import DataFlowUrn
from datahub.utilities.urns.dataset_urn import DatasetUrn
@@ -225,21 +224,6 @@ def make_user_urn(username: str) -> str:
)


- def make_actor_urn(actor: str) -> Union[CorpUserUrn, CorpGroupUrn]:
- """
- Makes a user urn if the input is not a user or group urn already
- """
- return (
- CorpUserUrn(actor)
- if not actor.startswith(("urn:li:corpuser:", "urn:li:corpGroup:"))
- else (
- CorpUserUrn.from_string(actor)
- if actor.startswith("urn:li:corpuser:")
- else CorpGroupUrn.from_string(actor)
- )
- )


def make_group_urn(groupname: str) -> str:
"""
Makes a group urn if the input is not a user or group urn already
25 changes: 13 additions & 12 deletions metadata-ingestion/src/datahub/ingestion/run/pipeline.py
@@ -221,7 +220,6 @@ def __init__(
preview_mode: bool = False,
preview_workunits: int = 10,
report_to: Optional[str] = None,
- no_default_report: bool = False,
no_progress: bool = False,
):
self.config = config
@@ -279,7 +278,7 @@ def __init__(

with set_graph_context(self.graph):
with _add_init_error_context("configure reporters"):
- self._configure_reporting(report_to, no_default_report)
+ self._configure_reporting(report_to)

with _add_init_error_context(
f"find a registered source for type {self.source_type}"
@@ -326,15 +325,19 @@ def _configure_transforms(self) -> None:
# Add the system metadata transformer at the end of the list.
self.transformers.append(SystemMetadataTransformer(self.ctx))

- def _configure_reporting(
- self, report_to: Optional[str], no_default_report: bool
- ) -> None:
- if report_to == "datahub":
+ def _configure_reporting(self, report_to: Optional[str]) -> None:
+ if self.dry_run:
+ # In dry run mode, we don't want to report anything.
+ return

+ if not report_to:
+ # Reporting is disabled.
+ pass
+ elif report_to == "datahub":
# we add the default datahub reporter unless a datahub reporter is already configured
- if not no_default_report and (
- not self.config.reporting
- or "datahub" not in [x.type for x in self.config.reporting]
- ):
+ if not self.config.reporting or "datahub" not in [
+ reporter.type for reporter in self.config.reporting
+ ]:
self.config.reporting.append(
ReporterConfig.parse_obj({"type": "datahub"})
)
@@ -409,7 +412,6 @@ def create(
preview_mode: bool = False,
preview_workunits: int = 10,
report_to: Optional[str] = "datahub",
- no_default_report: bool = False,
no_progress: bool = False,
raw_config: Optional[dict] = None,
) -> "Pipeline":
@@ -420,7 +422,6 @@
preview_mode=preview_mode,
preview_workunits=preview_workunits,
report_to=report_to,
- no_default_report=no_default_report,
no_progress=no_progress,
)
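With `no_default_report` removed, reporting behavior is driven entirely by `report_to` (plus `dry_run`). A short sketch of the caller-visible modes, based only on the branches shown in this diff (other `report_to` values are handled by branches not shown here); the recipe contents are illustrative:

```python
from datahub.ingestion.run.pipeline import Pipeline

recipe = {
    "source": {"type": "file", "config": {"path": "./mces.json"}},
    "sink": {"type": "console", "config": {}},
}

# Default: report_to="datahub" attaches the default DataHub ingestion reporter,
# unless a "datahub" reporter is already configured in the recipe.
Pipeline.create(recipe)

# Disable reporting entirely (the replacement for no_default_report=True).
Pipeline.create(recipe, report_to=None)

# Dry runs return early from _configure_reporting, so nothing is reported.
Pipeline.create(recipe, dry_run=True)
```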

@@ -36,6 +36,7 @@
BigQueryIdentifierBuilder,
)
from datahub.ingestion.source.usage.usage_common import BaseUsageConfig
+ from datahub.metadata.urns import CorpUserUrn
from datahub.sql_parsing.schema_resolver import SchemaResolver
from datahub.sql_parsing.sql_parsing_aggregator import (
ObservedQuery,
@@ -363,7 +364,9 @@ def _parse_audit_log_row(self, row: BigQueryJob) -> ObservedQuery:
session_id=row["session_id"],
timestamp=row["creation_time"],
user=(
- self.identifiers.gen_user_urn(row["user_email"])
+ CorpUserUrn.from_string(
+ self.identifiers.gen_user_urn(row["user_email"])
+ )
if row["user_email"]
else None
),
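The `user` field is now a typed `CorpUserUrn` rather than a raw urn string, hence the `CorpUserUrn.from_string` wrapper around `gen_user_urn`. A minimal sketch of that conversion; the email value is illustrative:

```python
from datahub.metadata.urns import CorpUserUrn

# gen_user_urn produces a string of this shape from the audit-log email.
user_urn_str = "urn:li:corpuser:jdoe@example.com"

# from_string parses it into the typed urn object used by the query log.
user = CorpUserUrn.from_string(user_urn_str)
```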
@@ -130,7 +130,7 @@ def parse_alter_table_rename(default_schema: str, query: str) -> Tuple[str, str,
"""

parsed_query = parse_statement(query, dialect=get_dialect("redshift"))
- assert isinstance(parsed_query, sqlglot.exp.AlterTable)
+ assert isinstance(parsed_query, sqlglot.exp.Alter)
prev_name = parsed_query.this.name
rename_clause = parsed_query.args["actions"][0]
assert isinstance(rename_clause, sqlglot.exp.RenameTable)
@@ -875,7 +875,7 @@ def _process_table_renames(
default_schema=self.config.default_schema,
query=query_text,
)
- except ValueError as e:
+ except Exception as e:
logger.info(f"Failed to parse alter table rename: {e}")
self.report.num_alter_table_parse_errors += 1
continue
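Both edits here track the sqlglot upgrade pinned in setup.py: newer releases parse `ALTER TABLE` into `exp.Alter` (the old `exp.AlterTable` node was renamed), and the broader `except Exception` catches other parse-level failures as well. A small sketch of the parse shape, with two assumptions: plain upstream `sqlglot.parse_one` stands in for DataHub's `parse_statement`/`get_dialect` wrappers, and the acryl-sqlglot fork behaves the same way for this statement:

```python
import sqlglot
from sqlglot import exp

stmt = sqlglot.parse_one(
    "ALTER TABLE public.events RENAME TO events_backup",
    dialect="redshift",
)

assert isinstance(stmt, exp.Alter)  # was exp.AlterTable in older sqlglot releases
prev_name = stmt.this.name          # "events", the table being renamed

rename_clause = stmt.args["actions"][0]
assert isinstance(rename_clause, exp.RenameTable)
new_name = rename_clause.this.name  # "events_backup"
```

Node names can shift between sqlglot releases; the assertions here mirror the ones in the code above for the pinned version.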
@@ -31,6 +31,7 @@
from datahub.metadata.urns import DatasetUrn
from datahub.sql_parsing.sql_parsing_aggregator import (
KnownQueryLineageInfo,
+ ObservedQuery,
SqlParsingAggregator,
)
from datahub.utilities.perf_timer import PerfTimer
@@ -118,11 +119,13 @@ def build(
if self.config.resolve_temp_table_in_lineage:
for temp_row in self._lineage_v1.get_temp_tables(connection=connection):
self.aggregator.add_observed_query(
- query=temp_row.query_text,
- default_db=self.database,
- default_schema=self.config.default_schema,
- session_id=temp_row.session_id,
- query_timestamp=temp_row.start_time,
+ ObservedQuery(
+ query=temp_row.query_text,
+ default_db=self.database,
+ default_schema=self.config.default_schema,
+ session_id=temp_row.session_id,
+ timestamp=temp_row.start_time,
+ ),
# The "temp table" query actually returns all CREATE TABLE statements, even if they
# aren't explicitly a temp table. As such, setting is_known_temp_table=True
# would not be correct. We already have mechanisms to autodetect temp tables,
@@ -263,11 +266,13 @@ def _process_sql_parser_lineage(self, lineage_row: LineageRow) -> None:
# TODO actor

self.aggregator.add_observed_query(
- query=ddl,
- default_db=self.database,
- default_schema=self.config.default_schema,
- query_timestamp=lineage_row.timestamp,
- session_id=lineage_row.session_id,
+ ObservedQuery(
+ query=ddl,
+ default_db=self.database,
+ default_schema=self.config.default_schema,
+ timestamp=lineage_row.timestamp,
+ session_id=lineage_row.session_id,
+ )
)

def _make_filtered_target(self, lineage_row: LineageRow) -> Optional[DatasetUrn]:
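Both `add_observed_query` call sites now pass a single `ObservedQuery` object instead of loose keyword arguments, and `timestamp` replaces the old `query_timestamp` parameter. A hedged construction sketch with illustrative values:

```python
from datetime import datetime, timezone

from datahub.sql_parsing.sql_parsing_aggregator import ObservedQuery

observed = ObservedQuery(
    query="CREATE TABLE staging.tmp_orders AS SELECT * FROM public.orders",
    default_db="dev",
    default_schema="public",
    session_id="1073815629",  # illustrative session id
    timestamp=datetime(2024, 9, 10, tzinfo=timezone.utc),
)

# Then handed to the aggregator exactly as in the calls above:
# self.aggregator.add_observed_query(observed)
```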