Skip to content

Commit

Permalink
Merge branch 'datahub-project:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
treff7es authored Sep 18, 2024
2 parents 617c915 + dea2ef0 commit 49ffe5f
Show file tree
Hide file tree
Showing 120 changed files with 5,321 additions and 3,549 deletions.
2 changes: 1 addition & 1 deletion datahub-web-react/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@
"less": "^4.2.0",
"prettier": "^2.8.8",
"source-map-explorer": "^2.5.2",
"vite": "^4.5.3",
"vite": "^4.5.5",
"vite-plugin-babel-macros": "^1.0.6",
"vite-plugin-static-copy": "^0.17.0",
"vite-plugin-svgr": "^4.1.0",
Expand Down
8 changes: 4 additions & 4 deletions datahub-web-react/yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -10610,10 +10610,10 @@ vite-plugin-svgr@^4.1.0:
"@svgr/core" "^8.1.0"
"@svgr/plugin-jsx" "^8.1.0"

"vite@^3.0.0 || ^4.0.0 || ^5.0.0-0", "vite@^3.1.0 || ^4.0.0 || ^5.0.0-0", vite@^4.5.3:
version "4.5.3"
resolved "https://registry.yarnpkg.com/vite/-/vite-4.5.3.tgz#d88a4529ea58bae97294c7e2e6f0eab39a50fb1a"
integrity sha512-kQL23kMeX92v3ph7IauVkXkikdDRsYMGTVl5KY2E9OY4ONLvkHf04MDTbnfo6NKxZiDLWzVpP5oTa8hQD8U3dg==
"vite@^3.0.0 || ^4.0.0 || ^5.0.0-0", "vite@^3.1.0 || ^4.0.0 || ^5.0.0-0", vite@^4.5.5:
version "4.5.5"
resolved "https://registry.yarnpkg.com/vite/-/vite-4.5.5.tgz#639b9feca5c0a3bfe3c60cb630ef28bf219d742e"
integrity sha512-ifW3Lb2sMdX+WU91s3R0FyQlAyLxOzCSCP37ujw0+r5POeHPwe6udWVIElKQq8gk3t7b8rkmvqC6IHBpCff4GQ==
dependencies:
esbuild "^0.18.10"
postcss "^8.4.27"
Expand Down
1 change: 1 addition & 0 deletions docs/how/updating-datahub.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ This file documents any backwards-incompatible changes in DataHub and assists pe

- #11313 - `datahub get` will no longer return a key aspect for entities that don't exist.
- #11369 - The default datahub-rest sink mode has been changed to `ASYNC_BATCH`. This requires a server with version 0.14.0+.
- #11214 - The container properties aspect now produces an additional field, which requires a corresponding server upgrade. Otherwise, the server may reject the aspects.

### Potential Downtime

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from avrogen.dict_wrapper import DictWrapper
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent

from datahub_airflow_plugin.hooks.datahub import (
Expand Down Expand Up @@ -45,11 +47,13 @@ class DatahubEmitterOperator(DatahubBaseOperator):
:type datahub_conn_id: str
"""

template_fields = ["metadata"]

# See above for why these mypy type issues are ignored here.
@apply_defaults # type: ignore[misc]
def __init__( # type: ignore[no-untyped-def]
self,
mces: List[MetadataChangeEvent],
mces: List[Union[MetadataChangeEvent, MetadataChangeProposalWrapper]],
datahub_conn_id: str,
**kwargs,
):
Expand All @@ -59,5 +63,50 @@ def __init__( # type: ignore[no-untyped-def]
)
self.metadata = mces

def _render_template_fields(self, field_value, context, jinja_env):
    """Recursively render Jinja templates found inside ``field_value``.

    ``DictWrapper`` objects and lists are walked and updated in place so
    that templates nested inside them are reached; every other value is
    handed to the base class ``render_template``.

    :param field_value: the value to render (a ``DictWrapper``, a list,
        a string, or anything else the base ``render_template`` accepts).
    :param context: the template context passed through to ``render_template``.
    :param jinja_env: the Jinja environment passed through to ``render_template``.
    :return: the same ``DictWrapper``/list object after in-place rendering,
        or whatever ``render_template`` returns for other values.
    """
    if isinstance(field_value, DictWrapper):
        # Only values of existing keys are replaced, so the set of keys
        # does not change while iterating.
        for key, value in field_value.items():
            setattr(
                field_value,
                key,
                self._render_template_fields(value, context, jinja_env),
            )
        return field_value

    if isinstance(field_value, list):
        # Store each result back by index. Strings are immutable, so a
        # template string sitting directly in a list is only rendered if
        # the returned value replaces the original element.
        for index, item in enumerate(field_value):
            field_value[index] = self._render_template_fields(
                item, context, jinja_env
            )
        return field_value

    # Strings and all remaining types are rendered by the base class.
    return super().render_template(field_value, context, jinja_env)

def execute(self, context):
    """Render templated metadata against ``context`` and emit it.

    :param context: the task execution context; when it is falsy the
        rendering step is skipped.
    """
    # Rendering is done here, inside ``execute``, because the execution
    # context (runtime variables and other dynamic data) is only available
    # while the task is running.
    #
    # ``render_template`` itself is not overridden: the recursive
    # ``_render_template_fields`` helper walks ``DictWrapper`` objects and
    # lists and falls back to the base class for everything else, which
    # gives finer control over how each field is rendered.
    if context:
        env = self.get_template_env()

        for entry in self.metadata:
            if isinstance(entry, MetadataChangeProposalWrapper):
                # Replace every instance attribute with its rendered value.
                for attr_name in entry.__dict__.keys():
                    current = getattr(entry, attr_name)
                    rendered = self._render_template_fields(current, context, env)
                    setattr(entry, attr_name, rendered)
            if isinstance(entry, MetadataChangeEvent):
                # The helper updates the event in place; its return value
                # is not needed.
                self._render_template_fields(entry, context, env)

    # NOTE(review): nesting was reconstructed from a listing without
    # indentation; confirm that emit is meant to run even without a context.
    self.generic_hook.get_underlying_hook().emit(self.metadata)
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
from datetime import datetime, timedelta

from airflow import DAG
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
from datahub.metadata.schema_classes import (
BrowsePathEntryClass,
BrowsePathsV2Class,
DatasetPropertiesClass,
DatasetSnapshotClass,
)

from datahub_airflow_plugin.operators.datahub import DatahubEmitterOperator

# Default arguments handed to the example DAG below.
default_args = dict(
    owner="airflow",
    depends_on_past=False,
    start_date=datetime(2023, 1, 1),
    email=["[email protected]"],
    email_on_failure=False,
    execution_timeout=timedelta(minutes=5),
)


# Example DAG: a single DatahubEmitterOperator task whose metadata contains
# Jinja placeholders ({{ ds }}, {{ ts_nodash }}) in URNs, browse paths and
# custom properties.
with DAG(
    dag_id="datahub_emitter_operator_jinja_template_dag",
    default_args=default_args,
    description="An example dag with jinja template",
    schedule_interval=None,
    catchup=False,
    tags=["example_tag"],
    default_view="tree",
):
    # Two proposal wrappers followed by two change events; in each pair the
    # second entry also carries a placeholder inside its URN.
    _templated_metadata = [
        MetadataChangeProposalWrapper(
            entityUrn="urn:li:dataset:(urn:li:dataPlatform:hive,datahub.example.mcpw_example,DEV)",
            aspect=BrowsePathsV2Class(
                path=[BrowsePathEntryClass("mcpw_example {{ ds }}")],
            ),
        ),
        MetadataChangeProposalWrapper(
            entityUrn="urn:li:dataset:(urn:li:dataPlatform:hive,datahub.example.mcpw_example_{{ ts_nodash }},DEV)",
            aspect=BrowsePathsV2Class(
                path=[BrowsePathEntryClass("mcpw_example {{ ds }}")],
            ),
        ),
        MetadataChangeEvent(
            proposedSnapshot=DatasetSnapshotClass(
                urn="urn:li:dataset:(urn:li:dataPlatform:hive,datahub.example.lineage_example,DEV)",
                aspects=[
                    DatasetPropertiesClass(
                        customProperties={"jinjaTemplate": "{{ ds }}"}
                    )
                ],
            ),
        ),
        MetadataChangeEvent(
            proposedSnapshot=DatasetSnapshotClass(
                urn="urn:li:dataset:(urn:li:dataPlatform:hive,datahub.example.lineage_example_{{ ts_nodash }},DEV)",
                aspects=[
                    DatasetPropertiesClass(
                        customProperties={"jinjaTemplate": "{{ ds }}"}
                    )
                ],
            ),
        ),
    ]

    add_custom_properties = DatahubEmitterOperator(
        task_id="datahub_emitter_operator_jinja_template_dag_task",
        mces=_templated_metadata,
        datahub_conn_id="datahub_file_default",
    )
Loading

0 comments on commit 49ffe5f

Please sign in to comment.