Merge branch 'datahub-project:master' into master
anshbansal authored Oct 23, 2024
2 parents c54cfd6 + e96323a commit ad6eb84
Showing 8 changed files with 77 additions and 56 deletions.
4 changes: 2 additions & 2 deletions metadata-ingestion/docs/sources/fivetran/fivetran_recipe.yml
@@ -25,7 +25,7 @@ source:
client_email: "client_email"
client_id: "client_id"
private_key: "private_key"
-          dataset: "fivetran_log_dataset"
+        dataset: "fivetran_log_dataset"

# Optional - filter for certain connector names instead of ingesting everything.
# connector_patterns:
@@ -35,7 +35,7 @@ source:
# Optional -- A mapping of all the connector's sources to its database.
# sources_to_database:
# connector_id: source_db

# Optional -- Only required to configure platform-instance for sources
# A mapping of Fivetran connector id to data platform instance
# sources_to_platform_instance:
55 changes: 28 additions & 27 deletions metadata-ingestion/src/datahub/ingestion/source/aws/glue.py
@@ -678,12 +678,19 @@ def get_all_databases(self) -> Iterable[Mapping[str, Any]]:
         else:
             paginator_response = paginator.paginate()

-        for page in paginator_response:
-            yield from page["DatabaseList"]
+        pattern = "DatabaseList"
+        if self.source_config.ignore_resource_links:
+            # exclude resource links by using a JMESPath conditional query against the TargetDatabase struct key
+            pattern += "[?!TargetDatabase]"
+
+        for database in paginator_response.search(pattern):
+            if self.source_config.database_pattern.allowed(database["Name"]):
+                yield database

-    def get_tables_from_database(self, database_name: str) -> Iterable[Dict]:
+    def get_tables_from_database(self, database: Mapping[str, Any]) -> Iterable[Dict]:
         # see https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue/paginator/GetTables.html
         paginator = self.glue_client.get_paginator("get_tables")
+        database_name = database["Name"]

         if self.source_config.catalog_id:
             paginator_response = paginator.paginate(
@@ -692,34 +699,28 @@ def get_tables_from_database(self, database_name: str) -> Iterable[Dict]:
         else:
             paginator_response = paginator.paginate(DatabaseName=database_name)

-        for page in paginator_response:
-            yield from page["TableList"]
+        for table in paginator_response.search("TableList"):
+            # If this database is a resource link, reuse the database name from the
+            # current catalog; otherwise the external name is picked up instead of the
+            # aliased one, producing inconsistent full table names later.
+            # Note: the explicit source_config check is redundant here (resource links
+            # have already been filtered out when ignore_resource_links is set) but is
+            # kept for clarity.
+            if (
+                not self.source_config.ignore_resource_links
+                and "TargetDatabase" in database
+            ):
+                table["DatabaseName"] = database["Name"]
+            yield table

     def get_all_databases_and_tables(
         self,
-    ) -> Tuple[Dict, List[Dict]]:
-        all_databases = self.get_all_databases()
-
-        if self.source_config.ignore_resource_links:
-            all_databases = [
-                database
-                for database in all_databases
-                if "TargetDatabase" not in database
-            ]
-
-        allowed_databases = {
-            database["Name"]: database
-            for database in all_databases
-            if self.source_config.database_pattern.allowed(database["Name"])
-        }
-
+    ) -> Tuple[List[Mapping[str, Any]], List[Dict]]:
+        all_databases = [*self.get_all_databases()]
         all_tables = [
-            table
-            for database_name in allowed_databases
-            for table in self.get_tables_from_database(database_name)
+            tables
+            for database in all_databases
+            for tables in self.get_tables_from_database(database)
         ]

-        return allowed_databases, all_tables
+        return all_databases, all_tables

     def get_lineage_if_enabled(
         self, mce: MetadataChangeEventClass
@@ -1039,7 +1040,7 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
     def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
         databases, tables = self.get_all_databases_and_tables()

-        for database in databases.values():
+        for database in databases:
             yield from self.gen_database_containers(database)

         for table in tables:
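The new `get_all_databases` body above delegates pagination and resource-link filtering to boto3's built-in JMESPath support instead of iterating pages by hand. A minimal standalone sketch of the pattern, outside the source class (the client setup and printed field are illustrative, not part of this diff):

```python
import boto3

glue = boto3.client("glue")  # assumes credentials/region come from the environment
paginator = glue.get_paginator("get_databases")

# "DatabaseList" alone yields every database across all result pages; appending
# the JMESPath filter "[?!TargetDatabase]" keeps only entries WITHOUT a
# TargetDatabase key, i.e. it drops resource links.
for database in paginator.paginate().search("DatabaseList[?!TargetDatabase]"):
    print(database["Name"])
```

Because `search()` folds pagination and filtering into a single pass, the explicit resource-link post-filter in `get_all_databases_and_tables` becomes unnecessary, which is why that method collapses to a plain list build in this diff.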
@@ -160,8 +160,7 @@ class FivetranSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin):
     )
     connector_patterns: AllowDenyPattern = Field(
         default=AllowDenyPattern.allow_all(),
-        description="Filtering regex patterns for connector ids. "
-        "They're visible in the Fivetran UI under Connectors -> Setup -> Fivetran Connector ID.",
+        description="Filtering regex patterns for connector names.",
     )
     destination_patterns: AllowDenyPattern = Field(
         default=AllowDenyPattern.allow_all(),
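With the corrected description, `connector_patterns` filters on connector names rather than the IDs shown in the Fivetran UI. A hedged recipe fragment in the style of the fivetran_recipe.yml above (the pattern values are illustrative):

```yml
source:
  type: fivetran
  config:
    # Matched against connector names, not Fivetran connector IDs
    connector_patterns:
      allow:
        - "postgres_prod.*"
      deny:
        - ".*_staging"
```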
@@ -84,17 +84,21 @@ def _query(self, query: str) -> List[Dict]:
         query = sqlglot.parse_one(query, dialect="snowflake").sql(
             dialect=self.fivetran_log_config.destination_platform, pretty=True
         )
-        logger.debug(f"Query : {query}")
+        logger.info(f"Executing query: {query}")
         resp = self.engine.execute(query)
         return [row for row in resp]

-    def _get_column_lineage_metadata(self) -> Dict[Tuple[str, str], List]:
+    def _get_column_lineage_metadata(
+        self, connector_ids: List[str]
+    ) -> Dict[Tuple[str, str], List]:
         """
         Returns dict of column lineage metadata with key as (<SOURCE_TABLE_ID>, <DESTINATION_TABLE_ID>)
         """
         all_column_lineage = defaultdict(list)
         column_lineage_result = self._query(
-            self.fivetran_log_query.get_column_lineage_query()
+            self.fivetran_log_query.get_column_lineage_query(
+                connector_ids=connector_ids
+            )
         )
         for column_lineage in column_lineage_result:
             key = (
@@ -104,13 +108,13 @@ def _get_column_lineage_metadata(self) -> Dict[Tuple[str, str], List]:
             all_column_lineage[key].append(column_lineage)
         return dict(all_column_lineage)

-    def _get_table_lineage_metadata(self) -> Dict[str, List]:
+    def _get_table_lineage_metadata(self, connector_ids: List[str]) -> Dict[str, List]:
         """
         Returns dict of table lineage metadata with key as 'CONNECTOR_ID'
         """
         connectors_table_lineage_metadata = defaultdict(list)
         table_lineage_result = self._query(
-            self.fivetran_log_query.get_table_lineage_query()
+            self.fivetran_log_query.get_table_lineage_query(connector_ids=connector_ids)
         )
         for table_lineage in table_lineage_result:
             connectors_table_lineage_metadata[
@@ -224,8 +228,9 @@ def get_user_email(self, user_id: str) -> Optional[str]:
         return self._get_users().get(user_id)

     def _fill_connectors_lineage(self, connectors: List[Connector]) -> None:
-        table_lineage_metadata = self._get_table_lineage_metadata()
-        column_lineage_metadata = self._get_column_lineage_metadata()
+        connector_ids = [connector.connector_id for connector in connectors]
+        table_lineage_metadata = self._get_table_lineage_metadata(connector_ids)
+        column_lineage_metadata = self._get_column_lineage_metadata(connector_ids)
         for connector in connectors:
             connector.lineage = self._extract_connector_lineage(
                 table_lineage_result=table_lineage_metadata.get(connector.connector_id),
@@ -254,20 +259,25 @@ def get_allowed_connectors_list(
         logger.info("Fetching connector list")
         connector_list = self._query(self.fivetran_log_query.get_connectors_query())
         for connector in connector_list:
-            if not connector_patterns.allowed(connector[Constant.CONNECTOR_NAME]):
-                report.report_connectors_dropped(connector[Constant.CONNECTOR_NAME])
+            connector_name = connector[Constant.CONNECTOR_NAME]
+            if not connector_patterns.allowed(connector_name):
+                report.report_connectors_dropped(connector_name)
                 continue
-            if not destination_patterns.allowed(connector[Constant.DESTINATION_ID]):
-                report.report_connectors_dropped(connector[Constant.CONNECTOR_NAME])
+            if not destination_patterns.allowed(
+                destination_id := connector[Constant.DESTINATION_ID]
+            ):
+                report.report_connectors_dropped(
+                    f"{connector_name} (destination_id: {destination_id})"
+                )
                 continue
             connectors.append(
                 Connector(
                     connector_id=connector[Constant.CONNECTOR_ID],
-                    connector_name=connector[Constant.CONNECTOR_NAME],
+                    connector_name=connector_name,
                     connector_type=connector[Constant.CONNECTOR_TYPE_ID],
                     paused=connector[Constant.PAUSED],
                     sync_frequency=connector[Constant.SYNC_FREQUENCY],
-                    destination_id=connector[Constant.DESTINATION_ID],
+                    destination_id=destination_id,
                     user_id=connector[Constant.CONNECTING_USER_ID],
                     lineage=[],  # filled later
                     jobs=[],  # filled later
@@ -279,6 +289,7 @@
             # we push down connector id filters.
             logger.info("No allowed connectors found")
             return []
+        logger.info(f"Found {len(connectors)} allowed connectors")

         with report.metadata_extraction_perf.connectors_lineage_extraction_sec:
             logger.info("Fetching connector lineage")
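For context on the `_query` helper that the logging change above touches: lineage queries are written once in Snowflake syntax and transpiled to the destination warehouse's dialect with sqlglot before execution. A standalone sketch of that mechanism (the sample query and target dialect are illustrative):

```python
import sqlglot

snowflake_sql = """\
SELECT connector_id, created_at
FROM table_lineage
QUALIFY ROW_NUMBER() OVER (PARTITION BY connector_id ORDER BY created_at DESC) <= 30
"""

# parse_one() builds an AST using Snowflake parsing rules; .sql() re-renders it
# for the destination platform. Dialects without native QUALIFY support get an
# equivalent subquery rewrite.
print(
    sqlglot.parse_one(snowflake_sql, dialect="snowflake").sql(
        dialect="postgres", pretty=True
    )
)
```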
@@ -80,7 +80,10 @@ def get_sync_logs_query(
             ORDER BY connector_id, end_time DESC
         """

-    def get_table_lineage_query(self) -> str:
+    def get_table_lineage_query(self, connector_ids: List[str]) -> str:
+        # Format connector_ids as a comma-separated string of quoted IDs
+        formatted_connector_ids = ", ".join(f"'{id}'" for id in connector_ids)
+
         return f"""\
         SELECT
             stm.connector_id as connector_id,
@@ -95,11 +98,15 @@ def get_table_lineage_query(self) -> str:
         JOIN {self.db_clause}destination_table_metadata as dtm on tl.destination_table_id = dtm.id
         JOIN {self.db_clause}source_schema_metadata as ssm on stm.schema_id = ssm.id
         JOIN {self.db_clause}destination_schema_metadata as dsm on dtm.schema_id = dsm.id
+        WHERE stm.connector_id IN ({formatted_connector_ids})
         QUALIFY ROW_NUMBER() OVER (PARTITION BY stm.connector_id ORDER BY tl.created_at DESC) <= {MAX_TABLE_LINEAGE_PER_CONNECTOR}
         ORDER BY stm.connector_id, tl.created_at DESC
         """

-    def get_column_lineage_query(self) -> str:
+    def get_column_lineage_query(self, connector_ids: List[str]) -> str:
+        # Format connector_ids as a comma-separated string of quoted IDs
+        formatted_connector_ids = ", ".join(f"'{id}'" for id in connector_ids)
+
         return f"""\
         SELECT
             scm.table_id as source_table_id,
@@ -114,6 +121,7 @@ def get_column_lineage_query(self) -> str:
         -- Only joining source_table_metadata to get the connector_id.
         JOIN {self.db_clause}source_table_metadata as stm
             ON scm.table_id = stm.id
+        WHERE stm.connector_id IN ({formatted_connector_ids})
         QUALIFY ROW_NUMBER() OVER (PARTITION BY stm.connector_id ORDER BY cl.created_at DESC) <= {MAX_COLUMN_LINEAGE_PER_CONNECTOR}
         ORDER BY stm.connector_id, cl.created_at DESC
         """
@@ -43,7 +43,9 @@ def default_query_results(
         return []
     elif query == fivetran_log_query.get_connectors_query():
         return connector_query_results
-    elif query == fivetran_log_query.get_table_lineage_query():
+    elif query == fivetran_log_query.get_table_lineage_query(
+        connector_ids=["calendar_elected"]
+    ):
         return [
             {
                 "connector_id": "calendar_elected",
@@ -64,7 +66,9 @@
"destination_schema_name": "postgres_public",
},
]
elif query == fivetran_log_query.get_column_lineage_query():
elif query == fivetran_log_query.get_column_lineage_query(
connector_ids=["calendar_elected"]
):
return [
{
"source_table_id": "10040",
6 changes: 3 additions & 3 deletions metadata-ingestion/tests/unit/glue/test_glue_source.py
@@ -267,8 +267,8 @@ def test_platform_config():
 @pytest.mark.parametrize(
     "ignore_resource_links, all_databases_and_tables_result",
     [
-        (True, ({}, [])),
-        (False, ({"test-database": resource_link_database}, target_database_tables)),
+        (True, ([], [])),
+        (False, ([resource_link_database], target_database_tables)),
     ],
 )
 def test_ignore_resource_links(ignore_resource_links, all_databases_and_tables_result):
@@ -289,7 +289,7 @@ def test_ignore_resource_links(ignore_resource_links, all_databases_and_tables_result):
     glue_stubber.add_response(
         "get_tables",
         get_tables_response_for_target_database,
-        {"DatabaseName": "test-database"},
+        {"DatabaseName": "resource-link-test-database"},
     )

     assert source.get_all_databases_and_tables() == all_databases_and_tables_result
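These tests drive the Glue source against a stubbed client via botocore's Stubber, so the expected request parameters must track the code change: `get_tables_from_database` now passes `database["Name"]` straight through, which for a resource link is the link's own name. A reduced sketch of the stubbing pattern (the region and empty response body are illustrative):

```python
import boto3
from botocore.stub import Stubber

glue_client = boto3.client("glue", region_name="us-east-1")
stubber = Stubber(glue_client)

# The stubbed call now expects the resource link's own name, since the source
# reads database["Name"] directly when calling GetTables.
stubber.add_response(
    "get_tables",
    {"TableList": []},  # illustrative empty response
    {"DatabaseName": "resource-link-test-database"},
)

with stubber:
    tables = glue_client.get_tables(DatabaseName="resource-link-test-database")
    assert tables["TableList"] == []
```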
8 changes: 3 additions & 5 deletions metadata-ingestion/tests/unit/glue/test_glue_source_stubs.py
@@ -5,7 +5,7 @@
 from botocore.response import StreamingBody

 resource_link_database = {
-    "Name": "test-database",
+    "Name": "resource-link-test-database",
     "CreateTime": datetime.datetime(2021, 6, 9, 14, 14, 19),
     "CreateTableDefaultPermissions": [],
     "TargetDatabase": {"CatalogId": "432143214321", "DatabaseName": "test-database"},
@@ -92,10 +92,8 @@
         },
     ]
 }
-databases_1 = {
-    "flights-database": {"Name": "flights-database", "CatalogId": "123412341234"}
-}
-databases_2 = {"test-database": {"Name": "test-database", "CatalogId": "123412341234"}}
+databases_1 = [{"Name": "flights-database", "CatalogId": "123412341234"}]
+databases_2 = [{"Name": "test-database", "CatalogId": "123412341234"}]
 tables_1 = [
     {
         "Name": "avro",
