From ca7b2e7c161ca39e4653c526858cd3980e294455 Mon Sep 17 00:00:00 2001 From: Chris DeCarolis Date: Thu, 17 Oct 2024 10:14:18 -0700 Subject: [PATCH] [dagster-airlift] Multi code locations working --- .../dagster_airlift/core/load_defs.py | 23 +++-- .../core/serialization/compute.py | 18 ++-- .../core_tests/test_airflow_asset_mapping.py | 1 + .../examples/kitchen-sink/Makefile | 4 + .../airflow_dags/multi_location_dags.py | 41 +++++++++ .../dag_first_code_location.yaml | 3 + .../dag_second_code_location.yaml | 3 + .../{dagster_defs => }/airflow_instance.py | 0 .../{dagster_defs => }/constants.py | 0 .../dagster_defs/automapped_defs.py | 2 +- .../kitchen_sink/dagster_defs/mapped_defs.py | 2 +- .../dagster_defs/observation_defs.py | 2 +- .../dagster_multi_code_locations/__init__.py | 0 .../first_dag_defs.py | 17 ++++ .../second_dag_defs.py | 17 ++++ .../workspace.yaml | 7 ++ .../integration_tests/conftest.py | 85 ++++++++++++++++++- .../integration_tests/test_e2e_automapped.py | 54 +++--------- .../integration_tests/test_e2e_mapped.py | 84 +++--------------- .../test_e2e_multi_code_location.py | 37 ++++++++ .../integration_tests/test_e2e_observation.py | 2 +- 21 files changed, 273 insertions(+), 129 deletions(-) create mode 100644 examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink/airflow_dags/multi_location_dags.py create mode 100644 examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink/airflow_dags/proxied_state/dag_first_code_location.yaml create mode 100644 examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink/airflow_dags/proxied_state/dag_second_code_location.yaml rename examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink/{dagster_defs => }/airflow_instance.py (100%) rename examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink/{dagster_defs => }/constants.py (100%) create mode 100644 examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink/dagster_multi_code_locations/__init__.py create mode 100644 examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink/dagster_multi_code_locations/first_dag_defs.py create mode 100644 examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink/dagster_multi_code_locations/second_dag_defs.py create mode 100644 examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink/dagster_multi_code_locations/workspace.yaml create mode 100644 examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink_tests/integration_tests/test_e2e_multi_code_location.py diff --git a/examples/experimental/dagster-airlift/dagster_airlift/core/load_defs.py b/examples/experimental/dagster-airlift/dagster_airlift/core/load_defs.py index 86b816fbef4c7..d6467082ecd8c 100644 --- a/examples/experimental/dagster-airlift/dagster_airlift/core/load_defs.py +++ b/examples/experimental/dagster-airlift/dagster_airlift/core/load_defs.py @@ -1,5 +1,5 @@ from dataclasses import dataclass -from typing import Iterable, Iterator, Optional +from typing import Callable, Iterable, Iterator, Optional from dagster import ( AssetsDefinition, @@ -20,13 +20,16 @@ DEFAULT_AIRFLOW_SENSOR_INTERVAL_SECONDS, build_airflow_polling_sensor_defs, ) -from dagster_airlift.core.serialization.compute import compute_serialized_data +from dagster_airlift.core.serialization.compute import DagSelectorFn, compute_serialized_data from dagster_airlift.core.serialization.defs_construction import ( construct_automapped_dag_assets_defs, construct_dag_assets_defs, 
get_airflow_data_to_spec_mapper, ) -from dagster_airlift.core.serialization.serialized_data import SerializedAirflowDefinitionsData +from dagster_airlift.core.serialization.serialized_data import ( + DagInfo, + SerializedAirflowDefinitionsData, +) from dagster_airlift.core.utils import get_metadata_key @@ -35,6 +38,7 @@ class AirflowInstanceDefsLoader(StateBackedDefinitionsLoader[SerializedAirflowDe airflow_instance: AirflowInstance explicit_defs: Definitions sensor_minimum_interval_seconds: int = DEFAULT_AIRFLOW_SENSOR_INTERVAL_SECONDS + dag_selector_fn: Optional[Callable[[DagInfo], bool]] = None @property def defs_key(self) -> str: @@ -42,7 +46,9 @@ def defs_key(self) -> str: def fetch_state(self) -> SerializedAirflowDefinitionsData: return compute_serialized_data( - airflow_instance=self.airflow_instance, defs=self.explicit_defs + airflow_instance=self.airflow_instance, + defs=self.explicit_defs, + dag_selector_fn=self.dag_selector_fn, ) def defs_from_state( @@ -58,10 +64,12 @@ def build_airflow_mapped_defs( *, airflow_instance: AirflowInstance, defs: Optional[Definitions] = None, + dag_selector_fn: Optional[DagSelectorFn] = None, ) -> Definitions: return AirflowInstanceDefsLoader( airflow_instance=airflow_instance, explicit_defs=defs or Definitions(), + dag_selector_fn=dag_selector_fn, ).build_defs() @@ -72,8 +80,11 @@ def build_defs_from_airflow_instance( defs: Optional[Definitions] = None, sensor_minimum_interval_seconds: int = DEFAULT_AIRFLOW_SENSOR_INTERVAL_SECONDS, event_transformer_fn: DagsterEventTransformerFn = default_event_transformer, + dag_selector_fn: Optional[DagSelectorFn] = None, ) -> Definitions: - mapped_defs = build_airflow_mapped_defs(airflow_instance=airflow_instance, defs=defs) + mapped_defs = build_airflow_mapped_defs( + airflow_instance=airflow_instance, defs=defs, dag_selector_fn=dag_selector_fn + ) return Definitions.merge( mapped_defs, build_airflow_polling_sensor_defs( @@ -97,7 +108,7 @@ def defs_key(self) -> str: def fetch_state(self) -> SerializedAirflowDefinitionsData: return compute_serialized_data( - airflow_instance=self.airflow_instance, defs=self.explicit_defs + airflow_instance=self.airflow_instance, defs=self.explicit_defs, dag_selector_fn=None ) def defs_from_state( diff --git a/examples/experimental/dagster-airlift/dagster_airlift/core/serialization/compute.py b/examples/experimental/dagster-airlift/dagster_airlift/core/serialization/compute.py index 182ffa34b54b8..81b6666ab2636 100644 --- a/examples/experimental/dagster-airlift/dagster_airlift/core/serialization/compute.py +++ b/examples/experimental/dagster-airlift/dagster_airlift/core/serialization/compute.py @@ -1,6 +1,6 @@ from collections import defaultdict from functools import cached_property -from typing import AbstractSet, Dict, List, Set +from typing import AbstractSet, Callable, Dict, List, Optional, Set from dagster import AssetKey, AssetSpec, Definitions from dagster._record import record @@ -24,6 +24,8 @@ task_handles_for_spec, ) +DagSelectorFn = Callable[[DagInfo], bool] + @record class AirliftMetadataMappingInfo: @@ -130,9 +132,15 @@ def all_mapped_dags(self) -> Dict[AssetKey, AbstractSet[DagHandle]]: def fetch_all_airflow_data( - airflow_instance: AirflowInstance, mapping_info: AirliftMetadataMappingInfo + airflow_instance: AirflowInstance, + mapping_info: AirliftMetadataMappingInfo, + dag_selector_fn: Optional[DagSelectorFn], ) -> FetchedAirflowData: - dag_infos = {dag.dag_id: dag for dag in airflow_instance.list_dags()} + dag_infos = { + dag.dag_id: dag + for dag in 
airflow_instance.list_dags() + if dag_selector_fn is None or dag_selector_fn(dag) + } task_info_map = defaultdict(dict) for dag_id in dag_infos: task_info_map[dag_id] = { @@ -148,10 +156,10 @@ def fetch_all_airflow_data( def compute_serialized_data( - airflow_instance: AirflowInstance, defs: Definitions + airflow_instance: AirflowInstance, defs: Definitions, dag_selector_fn: Optional[DagSelectorFn] ) -> "SerializedAirflowDefinitionsData": mapping_info = build_airlift_metadata_mapping_info(defs) - fetched_airflow_data = fetch_all_airflow_data(airflow_instance, mapping_info) + fetched_airflow_data = fetch_all_airflow_data(airflow_instance, mapping_info, dag_selector_fn) return SerializedAirflowDefinitionsData( instance_name=airflow_instance.name, key_scoped_task_handles=[ diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/core_tests/test_airflow_asset_mapping.py b/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/core_tests/test_airflow_asset_mapping.py index 60d8ad74502d1..57eb1a0256976 100644 --- a/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/core_tests/test_airflow_asset_mapping.py +++ b/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/core_tests/test_airflow_asset_mapping.py @@ -192,6 +192,7 @@ def test_produce_fetched_airflow_data() -> None: fetched_airflow_data = fetch_all_airflow_data( airflow_instance=instance, mapping_info=mapping_info, + dag_selector_fn=None, ) assert len(fetched_airflow_data.mapping_info.mapped_task_asset_specs) == 1 diff --git a/examples/experimental/dagster-airlift/examples/kitchen-sink/Makefile b/examples/experimental/dagster-airlift/examples/kitchen-sink/Makefile index e035388dc33ff..5681cf376d943 100644 --- a/examples/experimental/dagster-airlift/examples/kitchen-sink/Makefile +++ b/examples/experimental/dagster-airlift/examples/kitchen-sink/Makefile @@ -36,6 +36,10 @@ run_dagster_automapped: run_observation_defs: dagster dev -m kitchen_sink.dagster_defs.observation_defs -p 3333 +# Command to point at a workspace.yaml +run_dagster_multi_code_locations: + dagster dev -w $(MAKEFILE_DIR)/kitchen_sink/dagster_multi_code_locations/workspace.yaml -p 3333 + wipe: ## Wipe out all the files created by the Makefile rm -rf $(AIRFLOW_HOME) $(DAGSTER_HOME) diff --git a/examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink/airflow_dags/multi_location_dags.py b/examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink/airflow_dags/multi_location_dags.py new file mode 100644 index 0000000000000..d122797a9923b --- /dev/null +++ b/examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink/airflow_dags/multi_location_dags.py @@ -0,0 +1,41 @@ +from datetime import datetime +from pathlib import Path + +from airflow import DAG +from airflow.operators.python import PythonOperator +from dagster_airlift.in_airflow import proxying_to_dagster +from dagster_airlift.in_airflow.proxied_state import load_proxied_state_from_yaml + + +def print_hello() -> None: + print("Hello") # noqa: T201 + + +default_args = { + "owner": "airflow", + "depends_on_past": False, + "start_date": datetime(2023, 1, 1), + "retries": 0, +} + +with DAG( + "dag_first_code_location", + default_args=default_args, + schedule_interval=None, + is_paused_upon_creation=False, +) as first_dag: + PythonOperator(task_id="task", python_callable=print_hello) + +with DAG( + "dag_second_code_location", + default_args=default_args, + schedule_interval=None, + 
is_paused_upon_creation=False, +) as second_dag: + PythonOperator(task_id="task", python_callable=print_hello) + + +proxying_to_dagster( + proxied_state=load_proxied_state_from_yaml(Path(__file__).parent / "proxied_state"), + global_vars=globals(), +) diff --git a/examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink/airflow_dags/proxied_state/dag_first_code_location.yaml b/examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink/airflow_dags/proxied_state/dag_first_code_location.yaml new file mode 100644 index 0000000000000..03068f4fcf047 --- /dev/null +++ b/examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink/airflow_dags/proxied_state/dag_first_code_location.yaml @@ -0,0 +1,3 @@ +tasks: + - id: task + proxied: False \ No newline at end of file diff --git a/examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink/airflow_dags/proxied_state/dag_second_code_location.yaml b/examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink/airflow_dags/proxied_state/dag_second_code_location.yaml new file mode 100644 index 0000000000000..03068f4fcf047 --- /dev/null +++ b/examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink/airflow_dags/proxied_state/dag_second_code_location.yaml @@ -0,0 +1,3 @@ +tasks: + - id: task + proxied: False \ No newline at end of file diff --git a/examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink/dagster_defs/airflow_instance.py b/examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink/airflow_instance.py similarity index 100% rename from examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink/dagster_defs/airflow_instance.py rename to examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink/airflow_instance.py diff --git a/examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink/dagster_defs/constants.py b/examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink/constants.py similarity index 100% rename from examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink/dagster_defs/constants.py rename to examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink/constants.py diff --git a/examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink/dagster_defs/automapped_defs.py b/examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink/dagster_defs/automapped_defs.py index f74c71c7399b3..467f3849118d4 100644 --- a/examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink/dagster_defs/automapped_defs.py +++ b/examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink/dagster_defs/automapped_defs.py @@ -2,7 +2,7 @@ from dagster_airlift.core import dag_defs, task_defs from dagster_airlift.core.load_defs import build_full_automapped_dags_from_airflow_instance -from .airflow_instance import local_airflow_instance +from ..airflow_instance import local_airflow_instance @asset diff --git a/examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink/dagster_defs/mapped_defs.py b/examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink/dagster_defs/mapped_defs.py index c0bec5c60ebf5..724b0bd204238 100644 --- a/examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink/dagster_defs/mapped_defs.py +++ b/examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink/dagster_defs/mapped_defs.py @@ -14,7 +14,7 @@ ) from 
dagster_airlift.core.multiple_tasks import targeted_by_multiple_tasks -from .airflow_instance import local_airflow_instance +from ..airflow_instance import local_airflow_instance def make_print_asset(key: str) -> AssetsDefinition: diff --git a/examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink/dagster_defs/observation_defs.py b/examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink/dagster_defs/observation_defs.py index 1e6ec86afbac9..45f5733cc9e29 100644 --- a/examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink/dagster_defs/observation_defs.py +++ b/examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink/dagster_defs/observation_defs.py @@ -15,7 +15,7 @@ task_defs, ) -from .airflow_instance import local_airflow_instance +from ..airflow_instance import local_airflow_instance def observations_from_materializations( diff --git a/examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink/dagster_multi_code_locations/__init__.py b/examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink/dagster_multi_code_locations/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink/dagster_multi_code_locations/first_dag_defs.py b/examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink/dagster_multi_code_locations/first_dag_defs.py new file mode 100644 index 0000000000000..50842e707859c --- /dev/null +++ b/examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink/dagster_multi_code_locations/first_dag_defs.py @@ -0,0 +1,17 @@ +from dagster import AssetSpec, Definitions +from dagster_airlift.core import assets_with_task_mappings, build_defs_from_airflow_instance + +from kitchen_sink.airflow_instance import local_airflow_instance + +defs = build_defs_from_airflow_instance( + airflow_instance=local_airflow_instance(), + defs=Definitions( + assets=assets_with_task_mappings( + dag_id="dag_first_code_location", + task_mappings={ + "task": [AssetSpec(key="dag_first_code_location__asset")], + }, + ), + ), + dag_selector_fn=lambda dag_info: dag_info.dag_id == "dag_first_code_location", +) diff --git a/examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink/dagster_multi_code_locations/second_dag_defs.py b/examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink/dagster_multi_code_locations/second_dag_defs.py new file mode 100644 index 0000000000000..aa8b12a422669 --- /dev/null +++ b/examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink/dagster_multi_code_locations/second_dag_defs.py @@ -0,0 +1,17 @@ +from dagster import AssetSpec, Definitions +from dagster_airlift.core import assets_with_task_mappings, build_defs_from_airflow_instance + +from kitchen_sink.airflow_instance import local_airflow_instance + +defs = build_defs_from_airflow_instance( + airflow_instance=local_airflow_instance(), + defs=Definitions( + assets=assets_with_task_mappings( + dag_id="dag_second_code_location", + task_mappings={ + "task": [AssetSpec(key="dag_second_code_location__asset")], + }, + ), + ), + dag_selector_fn=lambda dag_info: dag_info.dag_id == "dag_second_code_location", +) diff --git a/examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink/dagster_multi_code_locations/workspace.yaml b/examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink/dagster_multi_code_locations/workspace.yaml new file mode 100644 index 
0000000000000..e73a9a5b8b630 --- /dev/null +++ b/examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink/dagster_multi_code_locations/workspace.yaml @@ -0,0 +1,7 @@ +load_from: + - python_module: + module_name: kitchen_sink.dagster_multi_code_locations.first_dag_defs + location_name: first_dag_location + - python_module: + module_name: kitchen_sink.dagster_multi_code_locations.second_dag_defs + location_name: second_dag_location \ No newline at end of file diff --git a/examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink_tests/integration_tests/conftest.py b/examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink_tests/integration_tests/conftest.py index c7b2234e3af45..c27a4c35b1ca7 100644 --- a/examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink_tests/integration_tests/conftest.py +++ b/examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink_tests/integration_tests/conftest.py @@ -1,11 +1,16 @@ import os import subprocess import time +from datetime import timedelta from pathlib import Path -from typing import Generator +from typing import Generator, List, Mapping, NamedTuple, Sequence, Union import pytest +from dagster import AssetKey, DagsterInstance +from dagster._core.events.log import EventLogEntry from dagster._core.test_utils import environ +from dagster._time import get_current_datetime +from dagster_airlift.constants import DAG_RUN_ID_TAG_KEY from dagster_airlift.core.airflow_instance import AirflowInstance from dagster_airlift.test.shared_fixtures import stand_up_airflow @@ -37,6 +42,11 @@ def airflow_home_fixture(local_env: None) -> Path: return Path(os.environ["AIRFLOW_HOME"]) +@pytest.fixture(name="dagster_home") +def dagster_home_fixture(local_env: None) -> str: + return os.environ["DAGSTER_HOME"] + + @pytest.fixture(name="airflow_instance") def airflow_instance_fixture(local_env: None) -> Generator[subprocess.Popen, None, None]: with stand_up_airflow( @@ -59,3 +69,76 @@ def poll_for_airflow_run_existence_and_completion( except Exception: time.sleep(0.1) continue + + +class ExpectedMat(NamedTuple): + asset_key: AssetKey + runs_in_dagster: bool + + +def poll_for_expected_mats( + af_instance: AirflowInstance, + expected_mats_per_dag: Mapping[str, Sequence[Union[ExpectedMat, AssetKey]]], +) -> None: + resolved_expected_mats_per_dag: Mapping[str, List[ExpectedMat]] = { + dag_id: [ + expected_mat + if isinstance(expected_mat, ExpectedMat) + else ExpectedMat(expected_mat, True) + for expected_mat in expected_mats + ] + for dag_id, expected_mats in expected_mats_per_dag.items() + } + for dag_id, expected_mats in resolved_expected_mats_per_dag.items(): + airflow_run_id = af_instance.trigger_dag(dag_id=dag_id) + af_instance.wait_for_run_completion(dag_id=dag_id, run_id=airflow_run_id, timeout=60) + dagster_instance = DagsterInstance.get() + + dag_asset_key = AssetKey([af_instance.name, "dag", dag_id]) + assert poll_for_materialization(dagster_instance, dag_asset_key) + + for expected_mat in expected_mats: + mat_event_log_entry = poll_for_materialization(dagster_instance, expected_mat.asset_key) + assert mat_event_log_entry.asset_materialization + assert mat_event_log_entry.asset_materialization.asset_key == expected_mat.asset_key + + assert mat_event_log_entry.asset_materialization + dagster_run_id = mat_event_log_entry.run_id + + all_materializations = dagster_instance.fetch_materializations( + records_filter=expected_mat.asset_key, limit=10 + ) + + assert all_materializations + + if 
expected_mat.runs_in_dagster: + assert dagster_run_id + dagster_run = dagster_instance.get_run_by_id(dagster_run_id) + assert dagster_run + run_ids = dagster_instance.get_run_ids() + assert ( + dagster_run + ), f"Could not find dagster run {dagster_run_id} All run_ids {run_ids}" + assert ( + DAG_RUN_ID_TAG_KEY in dagster_run.tags + ), f"Could not find dagster run tag: dagster_run.tags {dagster_run.tags}" + assert ( + dagster_run.tags[DAG_RUN_ID_TAG_KEY] == airflow_run_id + ), "dagster run tag does not match dag run id" + + +def poll_for_materialization( + dagster_instance: DagsterInstance, + asset_key: AssetKey, +) -> EventLogEntry: + start_time = get_current_datetime() + while get_current_datetime() - start_time < timedelta(seconds=30): + asset_materialization = dagster_instance.get_latest_materialization_event( + asset_key=asset_key + ) + + time.sleep(0.1) + if asset_materialization: + return asset_materialization + + raise Exception(f"Timeout waiting for materialization event on {asset_key}") diff --git a/examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink_tests/integration_tests/test_e2e_automapped.py b/examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink_tests/integration_tests/test_e2e_automapped.py index a639f449bef15..b2068aecb32c6 100644 --- a/examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink_tests/integration_tests/test_e2e_automapped.py +++ b/examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink_tests/integration_tests/test_e2e_automapped.py @@ -1,17 +1,13 @@ -import os -from datetime import timedelta from typing import List import pytest -from dagster import AssetKey, DagsterInstance -from dagster._time import get_current_datetime +from dagster import AssetKey -from kitchen_sink_tests.integration_tests.conftest import makefile_dir - - -@pytest.fixture(name="dagster_home") -def dagster_home_fixture(local_env: None) -> str: - return os.environ["DAGSTER_HOME"] +from kitchen_sink_tests.integration_tests.conftest import ( + ExpectedMat, + makefile_dir, + poll_for_expected_mats, +) @pytest.fixture(name="dagster_dev_cmd") @@ -25,9 +21,12 @@ def ak(key: str) -> AssetKey: expected_mats_per_dag = { "print_dag": [ - AssetKey("the_print_asset"), - ak("my_airflow_instance/dag/print_dag/task/downstream_print_task"), - ak("my_airflow_instance/dag/print_dag/task/print_task"), + ExpectedMat(AssetKey("the_print_asset"), runs_in_dagster=False), + ExpectedMat( + ak("my_airflow_instance/dag/print_dag/task/downstream_print_task"), + runs_in_dagster=False, + ), + ExpectedMat(ak("my_airflow_instance/dag/print_dag/task/print_task"), runs_in_dagster=False), ], } @@ -38,32 +37,7 @@ def test_dagster_materializes( dagster_home: str, ) -> None: """Test that assets can load properly, and that materializations register.""" - from kitchen_sink.dagster_defs.airflow_instance import local_airflow_instance + from kitchen_sink.airflow_instance import local_airflow_instance af_instance = local_airflow_instance() - - for dag_id, expected_asset_keys in expected_mats_per_dag.items(): - run_id = af_instance.trigger_dag(dag_id=dag_id) - af_instance.wait_for_run_completion(dag_id=dag_id, run_id=run_id, timeout=60) - dagster_instance = DagsterInstance.get() - start_time = get_current_datetime() - # First check to see that the dag asset materialization occured - dag_asset_key = AssetKey(["my_airflow_instance", "dag", dag_id]) - while get_current_datetime() - start_time < timedelta(seconds=30): - asset_materialization = 
dagster_instance.get_latest_materialization_event( - asset_key=dag_asset_key - ) - if asset_materialization: - break - - assert ( - asset_materialization - ), f"Timeout waiting for materialization event on {dag_id} with asset key {dag_asset_key}" - - # Then check that there are materialiations for the print_asset as well as each of the - # automapped tasks - for expected_asset_key in expected_asset_keys: - asset_materialization = dagster_instance.get_latest_materialization_event( - asset_key=expected_asset_key - ) - assert asset_materialization, f"Add did not materialize: {expected_asset_key}" + poll_for_expected_mats(af_instance, expected_mats_per_dag) diff --git a/examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink_tests/integration_tests/test_e2e_mapped.py b/examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink_tests/integration_tests/test_e2e_mapped.py index a0943d9c238a3..1c758fae3d2d5 100644 --- a/examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink_tests/integration_tests/test_e2e_mapped.py +++ b/examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink_tests/integration_tests/test_e2e_mapped.py @@ -1,95 +1,33 @@ -import os import time from datetime import timedelta -from typing import List, Mapping +from typing import List import pytest from dagster import AssetKey, DagsterInstance from dagster._core.definitions.metadata.metadata_value import JsonMetadataValue from dagster._core.events.log import EventLogEntry from dagster._time import get_current_datetime, get_current_datetime_midnight -from dagster_airlift.constants import DAG_RUN_ID_TAG_KEY -from dagster_airlift.core.airflow_instance import AirflowInstance from kitchen_sink_tests.integration_tests.conftest import ( makefile_dir, poll_for_airflow_run_existence_and_completion, + poll_for_expected_mats, + poll_for_materialization, ) -def poll_for_materialization( - dagster_instance: DagsterInstance, - asset_key: AssetKey, -) -> EventLogEntry: - start_time = get_current_datetime() - while get_current_datetime() - start_time < timedelta(seconds=30): - asset_materialization = dagster_instance.get_latest_materialization_event( - asset_key=asset_key - ) - - time.sleep(0.1) - if asset_materialization: - return asset_materialization - - raise Exception(f"Timeout waiting for materialization event on {asset_key}") - - -@pytest.fixture(name="dagster_home") -def dagster_home_fixture(local_env: None) -> str: - return os.environ["DAGSTER_HOME"] - - @pytest.fixture(name="dagster_dev_cmd") def dagster_dev_cmd_fixture() -> List[str]: return ["make", "run_dagster_mapped", "-C", str(makefile_dir())] -def poll_for_expected_mats( - af_instance: AirflowInstance, - expected_mats_per_dag: Mapping[str, List[AssetKey]], -) -> None: - for dag_id, expected_asset_keys in expected_mats_per_dag.items(): - airflow_run_id = af_instance.trigger_dag(dag_id=dag_id) - af_instance.wait_for_run_completion(dag_id=dag_id, run_id=airflow_run_id, timeout=60) - dagster_instance = DagsterInstance.get() - - dag_asset_key = AssetKey([af_instance.name, "dag", dag_id]) - assert poll_for_materialization(dagster_instance, dag_asset_key) - - for expected_asset_key in expected_asset_keys: - mat_event_log_entry = poll_for_materialization(dagster_instance, expected_asset_key) - assert mat_event_log_entry.asset_materialization - assert mat_event_log_entry.asset_materialization.asset_key == expected_asset_key - - assert mat_event_log_entry.asset_materialization - dagster_run_id = mat_event_log_entry.run_id - - 
all_materializations = dagster_instance.fetch_materializations( - records_filter=expected_asset_key, limit=10 - ) - - assert all_materializations - - assert dagster_run_id - dagster_run = dagster_instance.get_run_by_id(dagster_run_id) - assert dagster_run - run_ids = dagster_instance.get_run_ids() - assert dagster_run, f"Could not find dagster run {dagster_run_id} All run_ids {run_ids}" - assert ( - DAG_RUN_ID_TAG_KEY in dagster_run.tags - ), f"Could not find dagster run tag: dagster_run.tags {dagster_run.tags}" - assert ( - dagster_run.tags[DAG_RUN_ID_TAG_KEY] == airflow_run_id - ), "dagster run tag does not match dag run id" - - def test_migrated_dagster_print_materializes( airflow_instance: None, dagster_dev: None, dagster_home: str, ) -> None: """Test that assets can load properly, and that materializations register.""" - from kitchen_sink.dagster_defs.airflow_instance import local_airflow_instance + from kitchen_sink.airflow_instance import local_airflow_instance af_instance = local_airflow_instance() @@ -121,7 +59,7 @@ def test_dagster_weekly_daily_materializes( it triggers both dags that target it, and ensure that two materializations register. """ - from kitchen_sink.dagster_defs.airflow_instance import local_airflow_instance + from kitchen_sink.airflow_instance import local_airflow_instance af_instance = local_airflow_instance() @@ -166,7 +104,7 @@ def test_migrated_overridden_dag_materializes( dagster_home: str, ) -> None: """Test that assets are properly materialized from an overridden dag.""" - from kitchen_sink.dagster_defs.airflow_instance import local_airflow_instance + from kitchen_sink.airflow_instance import local_airflow_instance af_instance = local_airflow_instance() @@ -182,7 +120,7 @@ def test_custom_callback_behavior( dagster_home: str, ) -> None: """Test that custom callbacks to proxying_to_dagster are properly applied.""" - from kitchen_sink.dagster_defs.airflow_instance import local_airflow_instance + from kitchen_sink.airflow_instance import local_airflow_instance af_instance = local_airflow_instance() @@ -212,7 +150,7 @@ def test_migrated_overridden_dag_custom_operator_materializes( dagster_home: str, ) -> None: """Test that assets are properly materialized from an overridden dag, and that the proxied task retains attributes from the custom operator.""" - from kitchen_sink.dagster_defs.airflow_instance import local_airflow_instance + from kitchen_sink.airflow_instance import local_airflow_instance af_instance = local_airflow_instance() assert af_instance.get_task_info(dag_id="overridden_dag_custom_callback", task_id="OVERRIDDEN") @@ -229,7 +167,7 @@ def test_partitioned_observation( dagster_home: str, ) -> None: """Test that assets with time-window partitions get partitions mapped correctly onto their materializations.""" - from kitchen_sink.dagster_defs.airflow_instance import local_airflow_instance + from kitchen_sink.airflow_instance import local_airflow_instance af_instance = local_airflow_instance() af_run_id = af_instance.trigger_dag( @@ -252,7 +190,7 @@ def test_assets_multiple_jobs_same_task( dagster_home: str, ) -> None: """Test the case where multiple assets within the same task have different jobs. 
Ensure we still materialize them correctly.""" - from kitchen_sink.dagster_defs.airflow_instance import local_airflow_instance + from kitchen_sink.airflow_instance import local_airflow_instance af_instance = local_airflow_instance() assert af_instance.get_task_info(dag_id="overridden_dag_custom_callback", task_id="OVERRIDDEN") @@ -273,7 +211,7 @@ def test_partitioned_migrated( dagster_home: str, ) -> None: """Test that partitioned assets are properly materialized from a proxied task.""" - from kitchen_sink.dagster_defs.airflow_instance import local_airflow_instance + from kitchen_sink.airflow_instance import local_airflow_instance af_instance = local_airflow_instance() af_instance.unpause_dag(dag_id="migrated_daily_interval_dag") diff --git a/examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink_tests/integration_tests/test_e2e_multi_code_location.py b/examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink_tests/integration_tests/test_e2e_multi_code_location.py new file mode 100644 index 0000000000000..a4be50e1bedb8 --- /dev/null +++ b/examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink_tests/integration_tests/test_e2e_multi_code_location.py @@ -0,0 +1,37 @@ +from typing import List + +import pytest +from dagster import AssetKey + +from kitchen_sink_tests.integration_tests.conftest import ( + ExpectedMat, + makefile_dir, + poll_for_expected_mats, +) + + +@pytest.fixture(name="dagster_dev_cmd") +def dagster_dev_cmd_fixture() -> List[str]: + return ["make", "run_dagster_multi_code_locations", "-C", str(makefile_dir())] + + +def test_multiple_code_locations_materialize( + airflow_instance: None, + dagster_dev: None, + dagster_home: str, +) -> None: + """Test that assets can load properly, and that materializations register across multiple code locations.""" + from kitchen_sink.airflow_instance import local_airflow_instance + + af_instance = local_airflow_instance() + + expected_mats_per_dag = { + "dag_first_code_location": [ + ExpectedMat(AssetKey("dag_first_code_location__asset"), runs_in_dagster=False) + ], + "dag_second_code_location": [ + ExpectedMat(AssetKey("dag_second_code_location__asset"), runs_in_dagster=False) + ], + } + + poll_for_expected_mats(af_instance, expected_mats_per_dag) diff --git a/examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink_tests/integration_tests/test_e2e_observation.py b/examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink_tests/integration_tests/test_e2e_observation.py index 153fe4e5be14e..1dfb41f5771ef 100644 --- a/examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink_tests/integration_tests/test_e2e_observation.py +++ b/examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink_tests/integration_tests/test_e2e_observation.py @@ -53,7 +53,7 @@ def test_observation_defs_are_observed( dagster_home: str, ) -> None: """Test that assets can load properly, and that observations register.""" - from kitchen_sink.dagster_defs.airflow_instance import local_airflow_instance + from kitchen_sink.airflow_instance import local_airflow_instance af_instance = local_airflow_instance() dag_ids = [dag_info.dag_id for dag_info in af_instance.list_dags()]