Merge branch 'datahub-project:master' into master
anshbansal authored Oct 10, 2024
2 parents 38250ed + 771cc9f commit d018b95
Showing 32 changed files with 1,379 additions and 52 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/airflow-plugin.yml
@@ -54,6 +54,9 @@ jobs:
- python-version: "3.11"
extra_pip_requirements: "apache-airflow~=2.9.3 -c https://raw.githubusercontent.com/apache/airflow/constraints-2.9.3/constraints-3.11.txt"
extra_pip_extras: plugin-v2
- python-version: "3.11"
extra_pip_requirements: "apache-airflow~=2.10.2 -c https://raw.githubusercontent.com/apache/airflow/constraints-2.10.2/constraints-3.11.txt"
extra_pip_extras: plugin-v2
fail-fast: false
steps:
- name: Set up JDK 17
@@ -323,7 +323,13 @@ export const EntityProfile = <T, U>({
{showBrowseBar && <EntityProfileNavBar urn={urn} entityType={entityType} />}
{entityData?.status?.removed === true && (
<Alert
message="This entity is not discoverable via search or lineage graph. Contact your DataHub admin for more information."
message={
<>
This entity is marked as soft-deleted, likely due to stateful ingestion or a manual
deletion command, and will not appear in search or lineage graphs. Contact your DataHub
admin for more information.
</>
}
banner
/>
)}
1 change: 1 addition & 0 deletions docs/lineage/airflow.md
@@ -141,6 +141,7 @@ conn_id = datahub_rest_default # or datahub_kafka_default
| capture_tags_info | true | If true, the tags field of the DAG will be captured as DataHub tags. |
| capture_executions | true | If true, we'll capture task runs in DataHub in addition to DAG definitions. |
| materialize_iolets | true | Create or un-soft-delete all entities referenced in lineage. |
| render_templates | true | If true, jinja-templated fields will be automatically rendered to improve the accuracy of SQL statement extraction. |
| datajob_url_link | taskinstance | If taskinstance, the datajob url will be taskinstance link on airflow. It can also be grid. |
| graceful_exceptions | true | If set to true, most runtime errors in the lineage backend will be suppressed and will not cause the overall task to fail. Note that configuration issues will still throw exceptions. |
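These options live under the [datahub] section of airflow.cfg. A minimal sketch of toggling the new render_templates flag through Airflow's standard AIRFLOW__{SECTION}__{KEY} environment-variable override (the option names come from the table above; using the env var or the ini file is an operational choice):

import os

# Equivalent to setting `render_templates = false` under [datahub] in airflow.cfg.
# Export these before the scheduler and workers start so the plugin sees them.
os.environ["AIRFLOW__DATAHUB__RENDER_TEMPLATES"] = "false"
os.environ["AIRFLOW__DATAHUB__CAPTURE_EXECUTIONS"] = "true"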
2 changes: 1 addition & 1 deletion metadata-ingestion-modules/airflow-plugin/setup.py
@@ -44,7 +44,7 @@ def get_long_description():
# We remain restrictive on the versions allowed here to prevent
# us from being broken by backwards-incompatible changes in the
# underlying package.
"openlineage-airflow>=1.2.0,<=1.18.0",
"openlineage-airflow>=1.2.0,<=1.22.0",
},
}

@@ -43,19 +43,24 @@ class DatahubLineageConfig(ConfigModel):

capture_executions: bool = False

datajob_url_link: DatajobUrl = DatajobUrl.TASKINSTANCE

# Note that this field is only respected by the lineage backend.
# The Airflow plugin v2 behaves as if it were set to True.
graceful_exceptions: bool = True

# The remaining config fields are only relevant for the v2 plugin.
enable_extractors: bool = True

# If true, ti.render_templates() will be called in the listener.
# Makes extraction of jinja-templated fields more accurate.
render_templates: bool = True

log_level: Optional[str] = None
debug_emitter: bool = False

disable_openlineage_plugin: bool = True

# Note that this field is only respected by the lineage backend.
# The Airflow plugin behaves as if it were set to True.
graceful_exceptions: bool = True

datajob_url_link: DatajobUrl = DatajobUrl.TASKINSTANCE

def make_emitter_hook(self) -> "DatahubGenericHook":
# This is necessary to avoid issues with circular imports.
from datahub_airflow_plugin.hooks.datahub import DatahubGenericHook
@@ -84,6 +89,7 @@ def get_lineage_config() -> DatahubLineageConfig:
disable_openlineage_plugin = conf.get(
"datahub", "disable_openlineage_plugin", fallback=True
)
render_templates = conf.get("datahub", "render_templates", fallback=True)
datajob_url_link = conf.get(
"datahub", "datajob_url_link", fallback=DatajobUrl.TASKINSTANCE.value
)
@@ -102,4 +108,5 @@ def get_lineage_config() -> DatahubLineageConfig:
debug_emitter=debug_emitter,
disable_openlineage_plugin=disable_openlineage_plugin,
datajob_url_link=datajob_url_link,
render_templates=render_templates,
)
@@ -386,7 +386,8 @@ def on_task_instance_running(
f"DataHub listener got notification about task instance start for {task_instance.task_id}"
)

task_instance = _render_templates(task_instance)
if self.config.render_templates:
task_instance = _render_templates(task_instance)

# The type ignore is to placate mypy on Airflow 2.1.x.
dagrun: "DagRun" = task_instance.dag_run # type: ignore[attr-defined]
@@ -478,7 +479,8 @@ def on_task_instance_finish(
) -> None:
dagrun: "DagRun" = task_instance.dag_run # type: ignore[attr-defined]

task_instance = _render_templates(task_instance)
if self.config.render_templates:
task_instance = _render_templates(task_instance)

# We must prefer the task attribute, in case modifications to the task's inlets/outlets
# were made by the execute() method.
6 changes: 4 additions & 2 deletions metadata-ingestion-modules/airflow-plugin/tox.ini
@@ -4,7 +4,7 @@
# and then run "tox" from this directory.

[tox]
envlist = py38-airflow21, py38-airflow22, py310-airflow24, py310-airflow26, py310-airflow27, py310-airflow28, py311-airflow29
envlist = py38-airflow21, py38-airflow22, py310-airflow24, py310-airflow26, py310-airflow27, py310-airflow28, py311-airflow29, py311-airflow210

[testenv]
use_develop = true
@@ -20,6 +20,7 @@ deps =
airflow27: apache-airflow~=2.7.0
airflow28: apache-airflow~=2.8.0
airflow29: apache-airflow~=2.9.0
airflow210: apache-airflow~=2.10.0

# Respect the Airflow constraints files.
# We can't make ourselves work with the constraints of Airflow < 2.3.
@@ -30,6 +31,7 @@ deps =
py310-airflow27: -c https://raw.githubusercontent.com/apache/airflow/constraints-2.7.3/constraints-3.10.txt
py310-airflow28: -c https://raw.githubusercontent.com/apache/airflow/constraints-2.8.1/constraints-3.10.txt
py311-airflow29: -c https://raw.githubusercontent.com/apache/airflow/constraints-2.9.3/constraints-3.11.txt
py311-airflow210: -c https://raw.githubusercontent.com/apache/airflow/constraints-2.10.2/constraints-3.11.txt

# Before pinning to the constraint files, we previously left the dependencies
# more open. There were a number of packages for which this caused issues.
@@ -57,6 +59,6 @@ commands =
[testenv:py310-airflow24]
extras = dev,integration-tests,plugin-v2,test-airflow24

[testenv:py310-airflow{26,27,28},py311-airflow{29}]
[testenv:py310-airflow{26,27,28},py311-airflow{29,210}]
extras = dev,integration-tests,plugin-v2

1 change: 1 addition & 0 deletions metadata-ingestion/setup.py
@@ -722,6 +722,7 @@
"snowflake-summary = datahub.ingestion.source.snowflake.snowflake_summary:SnowflakeSummarySource",
"snowflake-queries = datahub.ingestion.source.snowflake.snowflake_queries:SnowflakeQueriesSource",
"superset = datahub.ingestion.source.superset:SupersetSource",
"preset = datahub.ingestion.source.preset:PresetSource",
"tableau = datahub.ingestion.source.tableau.tableau:TableauSource",
"openapi = datahub.ingestion.source.openapi:OpenApiSource",
"metabase = datahub.ingestion.source.metabase:MetabaseSource",
7 changes: 7 additions & 0 deletions metadata-ingestion/src/datahub/cli/config_utils.py
@@ -84,6 +84,13 @@ def _get_config_from_env() -> Tuple[Optional[str], Optional[str]]:
return url or host, token


def require_config_from_env() -> Tuple[str, Optional[str]]:
host, token = _get_config_from_env()
if host is None:
raise MissingConfigError("No GMS host was provided in env variables.")
return host, token


def load_client_config() -> DatahubClientConfig:
gms_host_env, gms_token_env = _get_config_from_env()
if gms_host_env:
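A hedged usage sketch for the new helper; the environment variable names below are an assumption based on DataHub's CLI conventions and are not shown in this diff:

import os

from datahub.cli import config_utils

# Assumed variable names; require_config_from_env() raises MissingConfigError
# when no GMS host can be resolved from the environment.
os.environ["DATAHUB_GMS_URL"] = "http://localhost:8080"
os.environ["DATAHUB_GMS_TOKEN"] = "<personal-access-token>"

host, token = config_utils.require_config_from_env()
assert host == "http://localhost:8080"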
6 changes: 6 additions & 0 deletions metadata-ingestion/src/datahub/emitter/rest_emitter.py
@@ -76,6 +76,12 @@ def __init__(
):
if not gms_server:
raise ConfigurationError("gms server is required")
if gms_server == "__from_env__" and token is None:
# HACK: similar to what we do with system auth, we transparently
# inject the config in here. Ideally this should be done in the
# config loader or by the caller, but it gets the job done for now.
gms_server, token = config_utils.require_config_from_env()

self._gms_server = fixup_gms_url(gms_server)
self._token = token
self.server_config: Dict[str, Any] = {}
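A minimal sketch of the sentinel added above: constructing the emitter with the literal server value "__from_env__" and no token makes it resolve both from the environment (via require_config_from_env) before the URL is fixed up.

from datahub.emitter.rest_emitter import DatahubRestEmitter

# Assumes DATAHUB_GMS_URL / DATAHUB_GMS_TOKEN (names as in the sketch above)
# are already exported; otherwise MissingConfigError is raised here.
emitter = DatahubRestEmitter(gms_server="__from_env__")
emitter.test_connection()  # sanity check against the resolved GMS endpoint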
17 changes: 9 additions & 8 deletions metadata-ingestion/src/datahub/ingestion/graph/client.py
@@ -214,27 +214,28 @@ def _get_generic(self, url: str, params: Optional[Dict] = None) -> Dict:
def _post_generic(self, url: str, payload_dict: Dict) -> Dict:
return self._send_restli_request("POST", url, json=payload_dict)

def _make_rest_sink_config(self) -> "DatahubRestSinkConfig":
from datahub.ingestion.sink.datahub_rest import (
DatahubRestSinkConfig,
RestSinkMode,
)
def _make_rest_sink_config(
self, extra_config: Optional[Dict] = None
) -> "DatahubRestSinkConfig":
from datahub.ingestion.sink.datahub_rest import DatahubRestSinkConfig

# This is a bit convoluted - this DataHubGraph class is a subclass of DatahubRestEmitter,
# but initializing the rest sink creates another rest emitter.
# TODO: We should refactor out the multithreading functionality of the sink
# into a separate class that can be used by both the sink and the graph client
# e.g. a DatahubBulkRestEmitter that both the sink and the graph client use.
return DatahubRestSinkConfig(**self.config.dict(), mode=RestSinkMode.ASYNC)
return DatahubRestSinkConfig(**self.config.dict(), **(extra_config or {}))

@contextlib.contextmanager
def make_rest_sink(
self, run_id: str = _GRAPH_DUMMY_RUN_ID
self,
run_id: str = _GRAPH_DUMMY_RUN_ID,
extra_sink_config: Optional[Dict] = None,
) -> Iterator["DatahubRestSink"]:
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.sink.datahub_rest import DatahubRestSink

sink_config = self._make_rest_sink_config()
sink_config = self._make_rest_sink_config(extra_config=extra_sink_config)
with DatahubRestSink(PipelineContext(run_id=run_id), sink_config) as sink:
yield sink
if sink.report.failures:
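A hedged sketch of the new pass-through parameter. The override key and value ("mode": "SYNC") are an assumption about DatahubRestSinkConfig, chosen only to show that extra_sink_config is merged on top of the connection settings taken from the graph itself:

from datahub.ingestion.graph.client import DataHubGraph, DatahubClientConfig

graph = DataHubGraph(DatahubClientConfig(server="http://localhost:8080"))

# extra_sink_config flows through _make_rest_sink_config() and is expanded
# into DatahubRestSinkConfig alongside the graph's own config.
with graph.make_rest_sink(extra_sink_config={"mode": "SYNC"}) as sink:
    ...  # write MetadataChangeProposalWrappers through `sink` here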
2 changes: 2 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py
@@ -79,6 +79,7 @@ class DataHubRestSinkReport(SinkReport):
gms_version: Optional[str] = None
pending_requests: int = 0

async_batches_prepared: int = 0
async_batches_split: int = 0

main_thread_blocking_timer: PerfTimer = dataclasses.field(default_factory=PerfTimer)
@@ -260,6 +261,7 @@ def _emit_batch_wrapper(
events.append(event)

chunks = self.emitter.emit_mcps(events)
self.report.async_batches_prepared += 1
if chunks > 1:
self.report.async_batches_split += chunks
logger.info(
@@ -196,11 +196,25 @@ def folder_ancestors(
fields: Union[str, List[str]] = ["id", "name", "parent_id"],
) -> Sequence[Folder]:
self.client_stats.folder_calls += 1
return self.client.folder_ancestors(
folder_id,
self.__fields_mapper(fields),
transport_options=self.transport_options,
)
try:
return self.client.folder_ancestors(
folder_id,
self.__fields_mapper(fields),
transport_options=self.transport_options,
)
except SDKError as e:
if "Looker Not Found (404)" in str(e):
# Folder ancestors not found
logger.info(
f"Could not find ancestors for folder with id {folder_id}: 404 error"
)
else:
logger.warning(
f"Could not find ancestors for folder with id {folder_id}"
)
logger.warning(f"Failure was {e}")
# Folder ancestors not found
return []

def all_connections(self):
self.client_stats.all_connections_calls += 1
114 changes: 114 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/preset.py
@@ -0,0 +1,114 @@
import logging
from typing import Dict, Optional

import requests
from pydantic.class_validators import root_validator, validator
from pydantic.fields import Field

from datahub.emitter.mce_builder import DEFAULT_ENV
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
SourceCapability,
SupportStatus,
capability,
config_class,
platform_name,
support_status,
)
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StaleEntityRemovalSourceReport,
StatefulStaleMetadataRemovalConfig,
)
from datahub.ingestion.source.superset import SupersetConfig, SupersetSource
from datahub.utilities import config_clean

logger = logging.getLogger(__name__)


class PresetConfig(SupersetConfig):
manager_uri: str = Field(
default="https://api.app.preset.io", description="Preset.io API URL"
)
connect_uri: str = Field(default="", description="Preset workspace URL.")
display_uri: Optional[str] = Field(
default=None,
description="optional URL to use in links (if `connect_uri` is only for ingestion)",
)
api_key: Optional[str] = Field(default=None, description="Preset.io API key.")
api_secret: Optional[str] = Field(default=None, description="Preset.io API secret.")

# Configuration for stateful ingestion
stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = Field(
default=None, description="Preset Stateful Ingestion Config."
)

options: Dict = Field(default={}, description="")
env: str = Field(
default=DEFAULT_ENV,
description="Environment to use in namespace when constructing URNs",
)
database_alias: Dict[str, str] = Field(
default={},
description="Can be used to change mapping for database names in superset to what you have in datahub",
)

@validator("connect_uri", "display_uri")
def remove_trailing_slash(cls, v):
return config_clean.remove_trailing_slashes(v)

@root_validator
def default_display_uri_to_connect_uri(cls, values):
base = values.get("display_uri")
if base is None:
values["display_uri"] = values.get("connect_uri")
return values


@platform_name("Preset")
@config_class(PresetConfig)
@support_status(SupportStatus.TESTING)
@capability(
SourceCapability.DELETION_DETECTION, "Optionally enabled via stateful_ingestion"
)
class PresetSource(SupersetSource):
"""
Variation of the Superset plugin that works with Preset.io (Apache Superset SaaS).
"""

config: PresetConfig
report: StaleEntityRemovalSourceReport
platform = "preset"

def __init__(self, ctx: PipelineContext, config: PresetConfig):
logger.info(f"ctx is {ctx}")

super().__init__(ctx, config)
self.config = config
self.report = StaleEntityRemovalSourceReport()

def login(self):
try:
login_response = requests.post(
f"{self.config.manager_uri}/v1/auth/",
json={"name": self.config.api_key, "secret": self.config.api_secret},
)
except requests.exceptions.RequestException as e:
logger.error(f"Failed to authenticate with Preset: {e}")
raise e

self.access_token = login_response.json()["payload"]["access_token"]
logger.debug("Got access token from Preset")

requests_session = requests.Session()
requests_session.headers.update(
{
"Authorization": f"Bearer {self.access_token}",
"Content-Type": "application/json",
"Accept": "*/*",
}
)
# Test the connection
test_response = requests_session.get(f"{self.config.connect_uri}/version")
if not test_response.ok:
logger.error("Unable to connect to workspace")
return requests_session
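For context, a hedged recipe sketch wiring the new source into a pipeline. The workspace URL and credentials are placeholders; the config shape otherwise mirrors the Superset source that PresetConfig extends.

from datahub.ingestion.run.pipeline import Pipeline

pipeline = Pipeline.create(
    {
        "source": {
            "type": "preset",  # entry point registered in metadata-ingestion/setup.py above
            "config": {
                "connect_uri": "https://my-workspace.preset.io",  # placeholder workspace URL
                "api_key": "<preset-api-key>",
                "api_secret": "<preset-api-secret>",
            },
        },
        "sink": {
            "type": "datahub-rest",
            "config": {"server": "http://localhost:8080"},
        },
    }
)
pipeline.run()
pipeline.raise_from_status()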
@@ -29,6 +29,7 @@

STATEFUL_INGESTION_IGNORED_ENTITY_TYPES = {
"dataProcessInstance",
"query",
}


@@ -75,7 +76,10 @@ def auto_stale_entity_removal(

if wu.is_primary_source:
entity_type = guess_entity_type(urn)
if entity_type is not None:
if (
entity_type is not None
and entity_type not in STATEFUL_INGESTION_IGNORED_ENTITY_TYPES
):
stale_entity_removal_handler.add_entity_to_state(entity_type, urn)
else:
stale_entity_removal_handler.add_urn_to_skip(urn)
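An illustrative check of what the new entry means (the guess_entity_type import path is assumed from the DataHub codebase): query URNs are now kept out of the stale-entity state, just like dataProcessInstance URNs, so stateful ingestion will not try to soft-delete them on later runs.

from datahub.utilities.urns.urn import guess_entity_type  # import path assumed

STATEFUL_INGESTION_IGNORED_ENTITY_TYPES = {"dataProcessInstance", "query"}

urn = "urn:li:query:some-query-id"
assert guess_entity_type(urn) in STATEFUL_INGESTION_IGNORED_ENTITY_TYPES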