[dagster-airlift] dagster-airlift proxy scaffold cli
dpeng817 committed Oct 14, 2024
1 parent fcf5c5d commit bc2aaae
Showing 8 changed files with 240 additions and 0 deletions.
30 changes: 30 additions & 0 deletions examples/experimental/dagster-airlift/dagster_airlift/cli.py
@@ -0,0 +1,30 @@
import click


@click.group()
def cli():
"""Dagster Airlift CLI. Commands for interacting with the Dagster-Airlift package."""
pass


@cli.group()
def proxy() -> None:
"""Commands for working with the Dagster-Airlift proxied state. Requires the `dagster-airlift[in-airflow]` package."""
    try:
        import dagster_airlift.in_airflow  # noqa
    except ImportError as e:
        raise Exception(
            "dagster-airlift[in-airflow] must be installed in the environment to use any `dagster-airlift proxy` commands."
        ) from e


@proxy.command()
def scaffold():
"""Scaffolds a proxied state folder for the current Airflow installation. Goes in the airflow dags folder."""
from dagster_airlift.in_airflow.scaffolding import scaffold_proxied_state

scaffold_proxied_state()


if __name__ == "__main__":
cli()
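
The new command group can be exercised without installing the console script by using click's built-in test runner. The snippet below is a minimal sketch, assuming `dagster-airlift[in-airflow]` is importable in the current environment:

from click.testing import CliRunner

from dagster_airlift.cli import cli

# Invoke `dagster-airlift proxy scaffold` in-process and check it succeeded.
runner = CliRunner()
result = runner.invoke(cli, ["proxy", "scaffold"])
assert result.exit_code == 0, result.output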
52 changes: 52 additions & 0 deletions examples/experimental/dagster-airlift/dagster_airlift/in_airflow/scaffolding.py
@@ -0,0 +1,52 @@
import os
from pathlib import Path
from typing import TYPE_CHECKING, Mapping

import yaml

if TYPE_CHECKING:
from airflow import DAG


def verify_airflow_home_set() -> None:
if not os.getenv("AIRFLOW_HOME"):
raise Exception(
"AIRFLOW_HOME not set. Please set AIRFLOW_HOME to the root of your Airflow installation."
)


def get_airflow_dags_folder() -> Path:
    # Keep the import inside the function so that the necessary verification steps run first.
from airflow.configuration import conf

return Path(conf.get("core", "dags_folder"))


def get_all_dags() -> Mapping[str, "DAG"]:
from airflow.models import DagBag

return DagBag().dags


def scaffold_proxied_state() -> None:
"""Scaffolds a proxied state folder for the current Airflow installation.
    Each task's proxied state is initialized to False.
"""
verify_airflow_home_set()

proxied_state_dir = get_airflow_dags_folder() / "proxied_state"
if proxied_state_dir.exists():
raise Exception(
f"Proxied state directory already exists at {proxied_state_dir}. Please remove this directory before scaffolding."
)
for dag_id, dag in get_all_dags().items():
proxied_state_file = proxied_state_dir / f"{dag_id}.yaml"
proxied_state_file.parent.mkdir(parents=True, exist_ok=True)
tasks_in_alphabetical_order = sorted(dag.tasks, key=lambda task: task.task_id)
proxied_state = {
"tasks": [
{"id": task.task_id, "proxied": False} for task in tasks_in_alphabetical_order
]
}
with open(proxied_state_file, "w") as f:
yaml.dump(proxied_state, f)
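
For a DAG with two tasks, the scaffolded file is a small YAML document with every task marked as not proxied. The sketch below reproduces the structure that `scaffold_proxied_state` writes, using the DAG and task names from the test fixtures further down:

import yaml

# What <dags_folder>/proxied_state/print_dag.yaml would contain for a DAG
# with tasks "downstream_print_task" and "print_task":
proxied_state = {
    "tasks": [
        {"id": "downstream_print_task", "proxied": False},
        {"id": "print_task", "proxied": False},
    ]
}
print(yaml.dump(proxied_state))
# tasks:
# - id: downstream_print_task
#   proxied: false
# - id: print_task
#   proxied: false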
5 changes: 5 additions & 0 deletions examples/experimental/dagster-airlift/dagster_airlift/test/__init__.py
@@ -7,3 +7,8 @@
make_task_info as make_task_info,
make_task_instance as make_task_instance,
)
from .test_utils import (
airlift_root as airlift_root,
configured_airflow_home as configured_airflow_home,
remove_airflow_home_remnants as remove_airflow_home_remnants,
)
43 changes: 43 additions & 0 deletions examples/experimental/dagster-airlift/dagster_airlift/test/test_utils.py
@@ -0,0 +1,43 @@
import subprocess
from contextlib import contextmanager
from pathlib import Path
from typing import Generator

from dagster._core.test_utils import environ


def airlift_root() -> Path:
return Path(__file__).parent.parent.parent


def remove_airflow_home_remnants(airflow_home: Path) -> None:
logs_path = airflow_home / "logs"
cfg_path = airflow_home / "airflow.cfg"
db_path = airflow_home / "airflow.db"
proxied_state_path = airflow_home / "airflow_dags" / "proxied_state"

subprocess.check_output(
["rm", "-rf", str(logs_path), str(cfg_path), str(db_path), str(proxied_state_path)]
)


def airflow_cfg_script_path() -> Path:
return airlift_root() / "scripts" / "airflow_setup.sh"


def chmod_script(script_path: Path) -> None:
subprocess.check_output(["chmod", "+x", str(script_path)])


@contextmanager
def configured_airflow_home(airflow_home: Path) -> Generator[None, None, None]:
path_to_dags = airflow_home / "airflow_dags"
with environ({"AIRFLOW_HOME": str(airflow_home)}):
# Start by removing old cruft if it exists
remove_airflow_home_remnants(airflow_home)
# Scaffold the airflow configuration file.
chmod_script(airflow_cfg_script_path())
subprocess.check_output([str(airflow_cfg_script_path()), str(path_to_dags)])
yield
# Clean up after ourselves.
remove_airflow_home_remnants(airflow_home)
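
A test that needs a configured Airflow environment can wrap its body in the context manager above. This is a rough usage sketch; the test name and airflow_home directory are hypothetical:

from pathlib import Path

from dagster_airlift.test import configured_airflow_home

def test_with_airflow_home() -> None:  # hypothetical test
    airflow_home = Path(__file__).parent / "my_airflow_home"  # hypothetical fixture directory
    with configured_airflow_home(airflow_home):
        # AIRFLOW_HOME now points at airflow_home, airflow.cfg has been scaffolded by the
        # setup script, and DAGs are loaded from airflow_home / "airflow_dags".
        ...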
@@ -0,0 +1,33 @@
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def print_hello() -> None:
print("Hello") # noqa: T201


default_args = {
"owner": "airflow",
"depends_on_past": False,
"start_date": datetime(2023, 1, 1),
"retries": 0,
}


def make_print_dag(dag_id: str) -> DAG:
with DAG(
dag_id,
default_args=default_args,
schedule_interval=None,
is_paused_upon_creation=False,
) as dag:
PythonOperator(task_id="print_task", python_callable=print_hello) >> PythonOperator(
task_id="downstream_print_task", python_callable=print_hello
) # type: ignore
return dag


print_dag = make_print_dag("print_dag")
other_print_dag = make_print_dag("other_print_dag")
@@ -0,0 +1,71 @@
import subprocess
from pathlib import Path

import yaml
from dagster_airlift.in_airflow.proxied_state import load_proxied_state_from_yaml
from dagster_airlift.test import airlift_root, configured_airflow_home


def update_script_path() -> Path:
return airlift_root() / "scripts" / "airflow_setup.sh"


def path_to_test_proj() -> Path:
return Path(__file__).parent / "scaffold_test_airflow_home"


def run_scaffold_script() -> None:
subprocess.check_output(["dagster-airlift", "proxy", "scaffold"])


def expected_yaml() -> str:
return """
tasks:
- id: downstream_print_task
proxied: false
- id: print_task
proxied: false
"""


def test_scaffold_proxied_state() -> None:
"""Test scaffold proxied state under different scenarios."""
# No AIRFLOW_HOME set.

# Airflow home set but a proxied_state directory already exists.

# Airflow home set and no proxied_state directory exists.
with configured_airflow_home(path_to_test_proj()):
run_scaffold_script()
path_to_proxied_state = path_to_test_proj() / "airflow_dags" / "proxied_state"
assert path_to_proxied_state.exists()
symbols = list(path_to_proxied_state.iterdir())
# Check that path contents are two files, one for each dag.
        assert len(symbols) == 2
        assert all(symbol.is_file() for symbol in symbols)
        assert all(symbol.suffix == ".yaml" for symbol in symbols)
assert {symbol.name for symbol in symbols} == {"print_dag.yaml", "other_print_dag.yaml"}
print_dag_yaml = path_to_proxied_state / "print_dag.yaml"
assert yaml.safe_load(print_dag_yaml.read_text()) == yaml.safe_load(expected_yaml())
other_print_dag_yaml = path_to_proxied_state / "other_print_dag.yaml"
assert yaml.safe_load(other_print_dag_yaml.read_text()) == yaml.safe_load(expected_yaml())
proxied_state = load_proxied_state_from_yaml(path_to_proxied_state)
assert (
proxied_state.get_task_proxied_state(dag_id="print_dag", task_id="print_task") is False
)
assert (
proxied_state.get_task_proxied_state(
dag_id="print_dag", task_id="downstream_print_task"
)
is False
)
assert (
proxied_state.get_task_proxied_state(dag_id="other_print_dag", task_id="print_task")
is False
)
assert (
proxied_state.get_task_proxied_state(
dag_id="other_print_dag", task_id="downstream_print_task"
)
is False
)
6 changes: 6 additions & 0 deletions examples/experimental/dagster-airlift/setup.py
@@ -55,6 +55,7 @@ def get_version() -> str:
"Operating System :: OS Independent",
],
packages=find_packages(exclude=["dagster_airlift_tests*", "examples*"]),
requires=["click"],
extras_require={
"core": [
f"dagster{pin}",
@@ -81,5 +82,10 @@ def get_version() -> str:
*AIRFLOW_REQUIREMENTS,
],
},
entry_points={
"console_scripts": [
"dagster-airlift = dagster_airlift.cli:cli",
]
},
zip_safe=False,
)
