Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dagster-airlift] [tutorial] Improve code snippet parsing in tutorial #25268

Merged
merged 2 commits into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
from .base_asset_operator import BaseDagsterAssetsOperator as BaseDagsterAssetsOperator
from .dag_proxy_operator import (
BaseProxyDAGToDagsterOperator as BaseProxyDAGToDagsterOperator,
DefaultProxyDAGToDagsterOperator as DefaultProxyDAGToDagsterOperator,
)
from .proxying_fn import proxying_to_dagster as proxying_to_dagster
from .task_proxy_operator import (
BaseProxyTaskToDagsterOperator as BaseProxyTaskToDagsterOperator,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,11 @@ def launch_runs_for_task(self, context: Context, dag_id: str, task_id: str) -> N
assets_to_trigger_per_job[_build_dagster_job_identifier(asset_node)].append(
asset_node["assetKey"]["path"]
)
logger.debug(f"Found {len(assets_to_trigger_per_job)} jobs to trigger")
logger.info(f"Found {len(assets_to_trigger_per_job)} jobs to trigger")

triggered_runs = []
for job_identifier, asset_key_paths in assets_to_trigger_per_job.items():
logger.debug(f"Triggering run for {job_identifier} with assets {asset_key_paths}")
logger.info(f"Triggering run for {job_identifier} with assets {asset_key_paths}")
run_id = self.launch_dagster_run(
context,
session,
Expand All @@ -168,14 +168,14 @@ def launch_runs_for_task(self, context: Context, dag_id: str, task_id: str) -> N
continue
run_status = self.get_dagster_run_status(session, dagster_url, run_id)
if run_status in ["SUCCESS", "FAILURE", "CANCELED"]:
logger.debug(f"Run {run_id} completed with status {run_status}")
logger.info(f"Run {run_id} completed with status {run_status}")
completed_runs[run_id] = run_status
non_successful_runs = [
run_id for run_id, status in completed_runs.items() if status != "SUCCESS"
]
if non_successful_runs:
raise Exception(f"Runs {non_successful_runs} did not complete successfully.")
logger.debug("All runs completed successfully.")
logger.info("All runs completed successfully.")
return None

def execute(self, context: Context) -> Any:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import json
import os
from abc import abstractmethod
from typing import Any, Iterable, Mapping, Sequence

import requests
Expand All @@ -20,6 +21,11 @@ def filter_asset_nodes(
if matched_dag_id(asset_node, self.get_airflow_dag_id(context)):
yield asset_node

@classmethod
@abstractmethod
def build_from_dag(cls, dag: DAG) -> "BaseProxyDAGToDagsterOperator":
    """Builds a proxy operator from a DAG.

    Abstract alternate constructor: each concrete subclass decides how to
    instantiate itself (task_id, extra operator kwargs) for the given dag.
    Called by ``proxying_to_dagster`` when a dag is proxied at the dag level.

    Args:
        dag: The Airflow DAG whose execution is being proxied to Dagster.

    Returns:
        A concrete proxy-operator instance attached to ``dag``.
    """

class DefaultProxyDAGToDagsterOperator(BaseProxyDAGToDagsterOperator):
"""The default task proxying operator - which opens a blank session and expects the dagster URL to be set in the environment.
Expand All @@ -32,12 +38,12 @@ def get_dagster_session(self, context: Context) -> requests.Session:
def get_dagster_url(self, context: Context) -> str:
return os.environ["DAGSTER_URL"]


def build_dag_level_proxied_task(dag: DAG) -> DefaultProxyDAGToDagsterOperator:
return DefaultProxyDAGToDagsterOperator(
task_id=f"DAGSTER_OVERRIDE_DAG_{dag.dag_id}",
dag=dag,
)
@classmethod
def build_from_dag(cls, dag: DAG) -> "DefaultProxyDAGToDagsterOperator":
return DefaultProxyDAGToDagsterOperator(
task_id=f"DAGSTER_OVERRIDE_DAG_{dag.dag_id}",
dag=dag,
)


def matched_dag_id(asset_node: Mapping[str, Any], dag_id: str) -> bool:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
from airflow.models import BaseOperator, Variable
from airflow.utils.session import create_session

from dagster_airlift.in_airflow.dag_proxy_operator import build_dag_level_proxied_task
from dagster_airlift.in_airflow.dag_proxy_operator import (
BaseProxyDAGToDagsterOperator,
DefaultProxyDAGToDagsterOperator,
)
from dagster_airlift.in_airflow.proxied_state import AirflowProxiedState, DagProxiedState
from dagster_airlift.in_airflow.task_proxy_operator import (
BaseProxyTaskToDagsterOperator,
Expand All @@ -23,6 +26,9 @@ def proxying_to_dagster(
build_from_task_fn: Callable[
[BaseOperator], BaseProxyTaskToDagsterOperator
] = DefaultProxyTaskToDagsterOperator.build_from_task,
build_from_dag_fn: Callable[
[DAG], BaseProxyDAGToDagsterOperator
] = DefaultProxyDAGToDagsterOperator.build_from_dag,
) -> None:
"""Uses passed-in dictionary to alter dags and tasks to proxy to dagster.
Uses a proxied dictionary to determine the proxied status for each task within each dag.
Expand Down Expand Up @@ -81,7 +87,7 @@ def proxying_to_dagster(
dag.tags = [*dag.tags, "Dag overriden to Dagster"]
dag.task_dict = {}
dag.task_group.children = {}
override_task = build_dag_level_proxied_task(dag)
override_task = build_from_dag_fn(dag)
dag.task_dict[override_task.task_id] = override_task
for dag in task_level_proxying_dags:
logger.debug(f"Tagging dag {dag.dag_id} as proxied.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ def scaffold_proxied_state(logger: Any) -> None:
)
logger.info(f"Scaffolding proxied state directory at {proxied_state_dir}")
for dag_id, dag in get_all_dags().items():
logger.info(f"Scaffolding proxied state for dag {dag_id}")
proxied_state_file = proxied_state_dir / f"{dag_id}.yaml"
proxied_state_file.parent.mkdir(parents=True, exist_ok=True)
tasks_in_alphabetical_order = sorted(dag.tasks, key=lambda task: task.task_id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,13 @@ def chmod_script(script_path: Path) -> None:
def configured_airflow_home(airflow_home: Path) -> Generator[None, None, None]:
    """Set up a scaffolded AIRFLOW_HOME for the duration of the context.

    Points the AIRFLOW_HOME env var at ``airflow_home``, scaffolds a fresh
    airflow configuration wired to the local ``airflow_dags`` folder, yields
    to the caller, and always removes the scaffolded state afterwards.

    Args:
        airflow_home: Directory to use as AIRFLOW_HOME; must contain (or be
            able to contain) an ``airflow_dags`` subdirectory.
    """
    path_to_dags = airflow_home / "airflow_dags"
    with environ({"AIRFLOW_HOME": str(airflow_home)}):
        try:
            # Start by removing old cruft if it exists.
            remove_airflow_home_remnants(airflow_home)
            # Scaffold the airflow configuration file.
            chmod_script(airflow_cfg_script_path())
            subprocess.check_output([str(airflow_cfg_script_path()), str(path_to_dags)])
            yield
        finally:
            # Clean up even if scaffolding or the caller's body raised, so a
            # failed test run cannot leave stale state for the next one.
            remove_airflow_home_remnants(airflow_home)
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def path_to_test_proj() -> Path:


def run_scaffold_script() -> None:
    """Run ``dagster-airlift proxy scaffold`` in the current environment.

    NOTE(review): ``check=False`` deliberately ignores the CLI's exit status;
    callers are expected to assert on the scaffolded output instead. Confirm
    that scaffold failures are surfaced by those downstream assertions.
    """
    subprocess.run(["dagster-airlift", "proxy", "scaffold"], check=False)


def expected_yaml() -> str:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from datetime import datetime
from pathlib import Path

from airflow import DAG
from airflow.operators.python import PythonOperator
from dagster_airlift.in_airflow import DefaultProxyDAGToDagsterOperator, proxying_to_dagster
from dagster_airlift.in_airflow.proxied_state import load_proxied_state_from_yaml


def print_hello() -> None:
    """Print a greeting; used as the python_callable for the tasks below."""
    print("Hello")  # noqa: T201


# Default arguments applied to every task in the DAG defined below.
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(2023, 1, 1),
    "retries": 1,
}


# Two-task DAG whose execution gets overridden at the dag level by a custom
# Dagster proxy operator; unscheduled and unpaused so tests can trigger it.
with DAG(
    "overridden_dag_custom_callback",
    default_args=default_args,
    schedule_interval=None,
    is_paused_upon_creation=False,
) as dag:
    # `<<` makes the left operator downstream of the right one.
    PythonOperator(task_id="print_task", python_callable=print_hello) << PythonOperator(
        task_id="downstream_print_task", python_callable=print_hello
    )  # type: ignore


class CustomProxyDagToDagsterOperator(DefaultProxyDAGToDagsterOperator):
    """Proxy operator that overrides the default proxied-task naming.

    Exercises the ``build_from_dag_fn`` hook of ``proxying_to_dagster``: the
    replacement task is created with the fixed task_id "OVERRIDDEN" instead
    of the default "DAGSTER_OVERRIDE_DAG_<dag_id>" name.
    """

    @classmethod
    def build_from_dag(cls, dag: DAG) -> "CustomProxyDagToDagsterOperator":
        """Build the proxy task for ``dag`` with a custom task_id."""
        # Instantiate via `cls` (not a hard-coded class name) so further
        # subclassing keeps working without re-overriding this constructor.
        return cls(dag=dag, task_id="OVERRIDDEN")


# Module-level side effect: rewrites the dags in this module's globals().
# Dags marked proxied in the YAML state have their tasks replaced with a
# single proxy task built by CustomProxyDagToDagsterOperator.build_from_dag.
proxying_to_dagster(
    proxied_state=load_proxied_state_from_yaml(Path(__file__).parent / "proxied_state"),
    global_vars=globals(),
    build_from_dag_fn=CustomProxyDagToDagsterOperator.build_from_dag,
)
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
proxied: True
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ def asset_two() -> None:
print("Materialized asset two")


# Asset mapped (via assets_with_dag_mappings below) to the dag-level-proxied
# "overridden_dag_custom_callback" dag.
@asset(description="Materialized by overridden_dag_custom_callback")
def asset_overridden_dag_custom_callback() -> None:
    print("Materialized by overridden_dag_custom_callback")


def build_mapped_defs() -> Definitions:
return build_defs_from_airflow_instance(
airflow_instance=local_airflow_instance(),
Expand Down Expand Up @@ -73,6 +78,11 @@ def build_mapped_defs() -> Definitions:
Definitions(assets=[make_print_asset("unaffected_dag__another_print_asset")]),
),
),
Definitions(
assets_with_dag_mappings(
{"overridden_dag_custom_callback": [asset_overridden_dag_custom_callback]}
)
),
),
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,3 +201,20 @@ def test_custom_callback_behavior(
assert affected_print_task.metadata["retries"] == 1
unaffected_print_task = af_instance.get_task_info(dag_id="unaffected_dag", task_id=task_id)
assert unaffected_print_task.metadata["retries"] == 0


def test_migrated_overridden_dag_custom_operator_materializes(
    airflow_instance: None,
    dagster_dev: None,
    dagster_home: str,
) -> None:
    """Test that assets are properly materialized from an overridden dag, and that the proxied task retains attributes from the custom operator."""
    from kitchen_sink.dagster_defs.airflow_instance import local_airflow_instance

    instance = local_airflow_instance()
    # The custom build_from_dag callback names the proxied task "OVERRIDDEN".
    assert instance.get_task_info(
        dag_id="overridden_dag_custom_callback", task_id="OVERRIDDEN"
    )

    poll_for_expected_mats(
        instance,
        {
            "overridden_dag_custom_callback": [
                AssetKey("asset_overridden_dag_custom_callback")
            ]
        },
    )
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,5 @@ dagster_run:


# Regenerate README code snippets; the script now discovers snippet sources
# itself, so only the README path is passed.
update_readme_snippets:
	python ../../scripts/update_readme_snippets.py $(MAKEFILE_DIR)/README.md

Loading