Skip to content

Commit

Permalink
[rfc] replace_attributes(spec, ...) and merge_attributes(spec, ...) (#…
Browse files Browse the repository at this point in the history
…25094)

## Summary

Adds two new type-hinted functions that let users replace `AssetSpec`
attributes or merge in new metadata, tag, owner, dep, and kind values. Shipped as
standalone functions for us to use internally, at first.

These methods pass through `dagster_internal_init`, so they correctly
post-process any coercible inputs (e.g. `AssetKey` to `AssetDep`).

```python
spec = AssetSpec(key="foo", owners=["owner@example.com"])

spec_with_replaced_owner = replace_attributes(spec, owners=["new-owner@example.com"])

spec_with_additional_owner = merge_attributes(spec, owners=["new-owner@example.com"])
```

## Changelog

`NOCHANGELOG`
  • Loading branch information
benpankow authored Oct 18, 2024
1 parent f7d0b30 commit f7a57f3
Show file tree
Hide file tree
Showing 2 changed files with 185 additions and 0 deletions.
79 changes: 79 additions & 0 deletions python_modules/dagster/dagster/_core/definitions/asset_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,3 +281,82 @@ def with_io_manager_key(self, io_manager_key: str) -> "AssetSpec":
return self._replace(
metadata={**self.metadata, SYSTEM_METADATA_KEY_IO_MANAGER_KEY: io_manager_key}
)


def replace_attributes(
    spec: AssetSpec,
    *,
    key: CoercibleToAssetKey = ...,
    deps: Optional[Iterable["CoercibleToAssetDep"]] = ...,
    description: Optional[str] = ...,
    metadata: Optional[Mapping[str, Any]] = ...,
    skippable: bool = ...,
    group_name: Optional[str] = ...,
    code_version: Optional[str] = ...,
    freshness_policy: Optional[FreshnessPolicy] = ...,
    automation_condition: Optional[AutomationCondition] = ...,
    owners: Optional[Sequence[str]] = ...,
    tags: Optional[Mapping[str, str]] = ...,
    kinds: Optional[Set[str]] = ...,
    auto_materialize_policy: Optional[AutoMaterializePolicy] = ...,
    partitions_def: Optional[PartitionsDefinition] = ...,
) -> "AssetSpec":
    """Build a copy of ``spec`` in which every supplied attribute is swapped out.

    ``...`` (Ellipsis) is the "not supplied" sentinel for every keyword, so an
    explicit ``None`` is treated as a real replacement value. Attributes left
    at the sentinel are carried over from ``spec``.

    The result is produced by ``spec.dagster_internal_init``, so the values go
    through whatever processing that constructor applies.
    """
    # Tags carried over from the existing spec exclude the kind-prefixed
    # entries; kinds travel separately through the ``kinds`` argument.
    carried_over_tags = {}
    for tag_name, tag_value in spec.tags.items():
        if tag_name.startswith(KIND_PREFIX):
            continue
        carried_over_tags[tag_name] = tag_value

    def _resolve(candidate: Any, attr_name: str) -> Any:
        # Read the existing attribute only when the caller did not supply one.
        if candidate is ...:
            return getattr(spec, attr_name)
        return candidate

    return spec.dagster_internal_init(
        key=_resolve(key, "key"),
        deps=_resolve(deps, "deps"),
        description=_resolve(description, "description"),
        metadata=_resolve(metadata, "metadata"),
        skippable=_resolve(skippable, "skippable"),
        group_name=_resolve(group_name, "group_name"),
        code_version=_resolve(code_version, "code_version"),
        freshness_policy=_resolve(freshness_policy, "freshness_policy"),
        automation_condition=_resolve(automation_condition, "automation_condition"),
        owners=_resolve(owners, "owners"),
        tags=carried_over_tags if tags is ... else tags,
        kinds=_resolve(kinds, "kinds"),
        auto_materialize_policy=_resolve(auto_materialize_policy, "auto_materialize_policy"),
        partitions_def=_resolve(partitions_def, "partitions_def"),
    )


def merge_attributes(
    spec: AssetSpec,
    *,
    deps: Iterable["CoercibleToAssetDep"] = ...,
    metadata: Mapping[str, Any] = ...,
    owners: Sequence[str] = ...,
    tags: Mapping[str, str] = ...,
    kinds: Set[str] = ...,
) -> "AssetSpec":
    """Build a copy of ``spec`` with the supplied collections added to its own.

    ``...`` (Ellipsis) marks a keyword as "not supplied", in which case the
    corresponding attribute of ``spec`` is carried over unchanged. ``deps``
    and ``owners`` are appended after the existing entries; ``metadata`` and
    ``tags`` are overlaid so that supplied keys win on conflict; ``kinds`` is
    unioned with the existing kinds. Every other attribute is copied from
    ``spec`` as-is.
    """
    # Kind-prefixed tags are left out of the base tag mapping; kinds travel
    # separately through the ``kinds`` argument.
    base_tags = {
        name: value for name, value in spec.tags.items() if not name.startswith(KIND_PREFIX)
    }

    combined_deps = list(spec.deps)
    if deps is not ...:
        combined_deps.extend(deps)

    if metadata is ...:
        combined_metadata = {**spec.metadata}
    else:
        combined_metadata = {**spec.metadata, **metadata}

    combined_owners = list(spec.owners)
    if owners is not ...:
        combined_owners.extend(owners)

    if tags is ...:
        combined_tags = {**base_tags}
    else:
        combined_tags = {**base_tags, **tags}

    combined_kinds = set(spec.kinds)
    if kinds is not ...:
        combined_kinds.update(kinds)

    return spec.dagster_internal_init(
        key=spec.key,
        deps=combined_deps,
        description=spec.description,
        metadata=combined_metadata,
        skippable=spec.skippable,
        group_name=spec.group_name,
        code_version=spec.code_version,
        freshness_policy=spec.freshness_policy,
        automation_condition=spec.automation_condition,
        owners=combined_owners,
        tags=combined_tags,
        kinds=combined_kinds,
        auto_materialize_policy=spec.auto_materialize_policy,
        partitions_def=spec.partitions_def,
    )
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import pytest
from dagster import AssetSpec, AutoMaterializePolicy, AutomationCondition
from dagster._core.definitions.asset_dep import AssetDep
from dagster._core.definitions.asset_key import AssetKey
from dagster._core.definitions.asset_spec import merge_attributes, replace_attributes
from dagster._core.errors import DagsterInvalidDefinitionError, DagsterInvariantViolationError


Expand Down Expand Up @@ -37,3 +40,106 @@ def test_resolve_automation_condition() -> None:
automation_condition=AutomationCondition.eager(),
auto_materialize_policy=AutoMaterializePolicy.eager(),
)


def test_replace_attributes_basic() -> None:
    """Replacing ``key`` or ``metadata`` yields a spec holding the new value."""
    base_spec = AssetSpec(key="foo")
    assert base_spec.key == AssetKey("foo")

    rekeyed_spec = replace_attributes(base_spec, key="bar")
    assert rekeyed_spec.key == AssetKey("bar")

    # Metadata is swapped wholesale: the old "foo" entry must not survive.
    base_with_metadata = AssetSpec(key="foo", metadata={"foo": "bar"})
    assert base_with_metadata.metadata == {"foo": "bar"}

    swapped_metadata_spec = replace_attributes(base_with_metadata, metadata={"bar": "baz"})
    assert swapped_metadata_spec.metadata == {"bar": "baz"}


def test_replace_attributes_kinds() -> None:
    """Replacing kinds and tags together swaps both the plain and kind tags."""
    base_spec = AssetSpec(key="foo", kinds={"foo"}, tags={"a": "b"})
    assert base_spec.kinds == {"foo"}
    # Each kind also shows up as a "dagster/kind/<name>" tag with an empty value.
    assert base_spec.tags == {"a": "b", "dagster/kind/foo": ""}

    replaced_spec = replace_attributes(base_spec, kinds={"bar"}, tags={"c": "d"})
    assert replaced_spec.kinds == {"bar"}
    assert replaced_spec.tags == {"c": "d", "dagster/kind/bar": ""}

    # Five kinds must be rejected (presumably over the allowed maximum -- confirm).
    too_many_kinds = {"a", "b", "c", "d", "e"}
    with pytest.raises(DagsterInvalidDefinitionError):
        replace_attributes(base_spec, kinds=too_many_kinds)


def test_replace_attributes_deps_coercion() -> None:
    """Deps given as bare ``AssetKey`` objects come back as ``AssetDep`` objects."""
    base_spec = AssetSpec(key="foo", deps={AssetKey("bar")})
    expected_before = [AssetDep(AssetKey("bar"))]
    assert base_spec.deps == expected_before

    replaced_spec = replace_attributes(base_spec, deps={AssetKey("baz")})
    expected_after = [AssetDep(AssetKey("baz"))]
    assert replaced_spec.deps == expected_after


def test_replace_attributes_group() -> None:
    """``group_name`` can be replaced with a new name or cleared with ``None``."""
    base_spec = AssetSpec(key="foo", group_name="group1")
    assert base_spec.group_name == "group1"

    regrouped_spec = replace_attributes(base_spec, group_name="group2")
    assert regrouped_spec.group_name == "group2"

    # An explicit None is a real replacement value, not "leave unchanged".
    ungrouped_spec = replace_attributes(base_spec, group_name=None)
    assert ungrouped_spec.group_name is None


def test_merge_attributes_metadata() -> None:
    """Merged metadata adds new keys and overwrites keys that already exist."""
    base_spec = AssetSpec(key="foo")
    assert base_spec.key == AssetKey("foo")

    one_entry_spec = merge_attributes(base_spec, metadata={"bar": "baz"})
    assert one_entry_spec.key == AssetKey("foo")
    assert one_entry_spec.metadata == {"bar": "baz"}

    # A previously unseen key is added alongside the existing one.
    two_entry_spec = merge_attributes(one_entry_spec, metadata={"baz": "qux"})
    assert two_entry_spec.metadata == {"bar": "baz", "baz": "qux"}

    # A key that already exists takes the newly supplied value.
    overwritten_spec = merge_attributes(two_entry_spec, metadata={"bar": "qux"})
    assert overwritten_spec.metadata == {"bar": "qux", "baz": "qux"}


def test_merge_attributes_tags() -> None:
    """Merged tags add new keys and overwrite keys that already exist."""
    base_spec = AssetSpec(key="foo")
    assert base_spec.key == AssetKey("foo")

    one_tag_spec = merge_attributes(base_spec, tags={"bar": "baz"})
    assert one_tag_spec.key == AssetKey("foo")
    assert one_tag_spec.tags == {"bar": "baz"}

    # A previously unseen tag is added alongside the existing one.
    two_tag_spec = merge_attributes(one_tag_spec, tags={"baz": "qux"})
    assert two_tag_spec.tags == {"bar": "baz", "baz": "qux"}

    # A tag that already exists takes the newly supplied value.
    overwritten_spec = merge_attributes(two_tag_spec, tags={"bar": "qux"})
    assert overwritten_spec.tags == {"bar": "qux", "baz": "qux"}


def test_merge_attributes_owners() -> None:
    """Merging owners appends to the existing list and rejects invalid owners."""
    # NOTE(review): the owner literals in this copy of the test had been
    # replaced by an email-obfuscation placeholder, which is not an email
    # address and made the two owners identical. Distinct, well-formed example
    # addresses are used instead -- confirm against the upstream values.
    first_owner = "owner1@example.com"
    second_owner = "owner2@example.com"

    spec = AssetSpec(key="foo")
    assert spec.key == AssetKey("foo")

    new_spec = merge_attributes(spec, owners=[first_owner])
    assert new_spec.key == AssetKey("foo")
    assert new_spec.owners == [first_owner]

    # The newly supplied owner is appended after the existing one.
    spec_new_owner = merge_attributes(new_spec, owners=[second_owner])
    assert spec_new_owner.owners == [first_owner, second_owner]

    # The merged result is still validated, so a malformed owner is rejected.
    with pytest.raises(DagsterInvalidDefinitionError):
        merge_attributes(spec_new_owner, owners=["notvalid"])


def test_merge_attributes_deps() -> None:
    """Merged deps are coerced to ``AssetDep`` and appended after existing deps."""
    base_spec = AssetSpec(key="foo")
    assert base_spec.key == AssetKey("foo")

    bar_dep = AssetDep(AssetKey("bar"))
    baz_dep = AssetDep(AssetKey("baz"))

    one_dep_spec = merge_attributes(base_spec, deps={AssetKey("bar")})
    assert one_dep_spec.key == AssetKey("foo")
    assert one_dep_spec.deps == [bar_dep]

    two_dep_spec = merge_attributes(one_dep_spec, deps={AssetKey("baz")})
    assert two_dep_spec.deps == [bar_dep, baz_dep]

0 comments on commit f7a57f3

Please sign in to comment.