Skip to content

Commit

Permalink
[dagster-airlift] Remove airflow variable parsing
Browse files Browse the repository at this point in the history
  • Loading branch information
dpeng817 committed Oct 16, 2024
1 parent f221567 commit dde0981
Show file tree
Hide file tree
Showing 10 changed files with 14 additions and 148 deletions.
Original file line number Diff line number Diff line change
@@ -1,21 +1,18 @@
import json
import logging
from typing import Any, Callable, Dict, List, Optional, Set

from airflow import DAG
from airflow.models import BaseOperator, Variable
from airflow.utils.session import create_session
from airflow.models import BaseOperator

from dagster_airlift.in_airflow.dag_proxy_operator import (
BaseProxyDAGToDagsterOperator,
DefaultProxyDAGToDagsterOperator,
)
from dagster_airlift.in_airflow.proxied_state import AirflowProxiedState, DagProxiedState
from dagster_airlift.in_airflow.proxied_state import AirflowProxiedState
from dagster_airlift.in_airflow.task_proxy_operator import (
BaseProxyTaskToDagsterOperator,
DefaultProxyTaskToDagsterOperator,
)
from dagster_airlift.utils import get_local_proxied_state_dir


def proxying_to_dagster(
Expand Down Expand Up @@ -91,7 +88,6 @@ def proxying_to_dagster(
dag.task_dict[override_task.task_id] = override_task
for dag in task_level_proxying_dags:
logger.debug(f"Tagging dag {dag.dag_id} as proxied.")
set_proxied_state_for_dag_if_changed(dag.dag_id, proxied_state.dags[dag.dag_id], logger)
proxied_state_for_dag = proxied_state.dags[dag.dag_id]
num_proxied_tasks = len(
[
Expand Down Expand Up @@ -132,36 +128,3 @@ def proxying_to_dagster(
logger.debug(f"Proxied tasks {proxied_tasks} in dag {dag.dag_id}.")
logging.debug(f"Proxied {len(task_level_proxying_dags)}.")
logging.debug(f"Completed switching proxied tasks to dagster{suffix}.")


def set_proxied_state_for_dag_if_changed(
dag_id: str, proxied_state: DagProxiedState, logger: logging.Logger
) -> None:
if get_local_proxied_state_dir():
logger.info(
"Executing in local mode. Not setting proxied state in airflow metadata database, and instead expect dagster to be pointed at proxied state via DAGSTER_AIRLIFT_PROXIED_STATE_DIR env var."
)
return
else:
prev_proxied_state = get_proxied_state_var_for_dag(dag_id)
if prev_proxied_state is None or prev_proxied_state != proxied_state:
logger.info(
f"Migration state for dag {dag_id} has changed. Setting proxied state in airflow metadata database via Variable."
)
set_proxied_state_var_for_dag(dag_id, proxied_state)


def get_proxied_state_var_for_dag(dag_id: str) -> Optional[DagProxiedState]:
proxied_var = Variable.get(f"{dag_id}_dagster_proxied_state", None)
if not proxied_var:
return None
return DagProxiedState.from_dict(json.loads(proxied_var))


def set_proxied_state_var_for_dag(dag_id: str, proxied_state: DagProxiedState) -> None:
with create_session() as session:
Variable.set(
key=f"{dag_id}_dagster_proxied_state",
value=json.dumps(proxied_state.to_dict()),
session=session,
)
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from tempfile import TemporaryDirectory
from typing import Any, Callable, Generator, List, Optional

import mock
import pytest
import requests
from dagster._core.test_utils import environ
Expand Down Expand Up @@ -159,26 +158,3 @@ def setup_dagster(dagster_home: str, dagster_dev_cmd: List[str]) -> Generator[An
# MISCELLANEOUS FIXTURES
# Fixtures that are useful across contexts.
####################################################################################################

VAR_DICT = {}


def dummy_get_var(key: str, default: Any) -> Optional[str]:
return VAR_DICT.get(key, default)


def dummy_set_var(key: str, value: str, session: Any) -> None:
return VAR_DICT.update({key: value})


@pytest.fixture
def sqlite_backend():
with environ({"AIRFLOW__CORE__SQL_ALCHEMY_CONN": "sqlite://"}):
yield


@pytest.fixture
def mock_airflow_variable():
with mock.patch("airflow.models.Variable.get", side_effect=dummy_get_var):
with mock.patch("airflow.models.Variable.set", side_effect=dummy_set_var):
yield
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def configured_airflow_home(airflow_home: Path) -> Generator[None, None, None]:
remove_airflow_home_remnants(airflow_home)
# Scaffold the airflow configuration file.
chmod_script(airflow_cfg_script_path())
subprocess.check_output([str(airflow_cfg_script_path()), str(path_to_dags)])
subprocess.run([str(airflow_cfg_script_path()), str(path_to_dags)], check=False)
yield
finally:
# Clean up after ourselves.
Expand Down
10 changes: 0 additions & 10 deletions examples/experimental/dagster-airlift/dagster_airlift/utils.py

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from pathlib import Path
from typing import cast

import mock
Expand All @@ -17,7 +16,6 @@
sensor,
)
from dagster._core.definitions.asset_dep import AssetDep
from dagster._core.test_utils import environ
from dagster._serdes.serdes import deserialize_value
from dagster._utils.test.definitions import (
scoped_reconstruction_metadata,
Expand Down Expand Up @@ -46,7 +44,6 @@
from dagster_airlift.core.top_level_dag_def_api import assets_with_task_mappings
from dagster_airlift.core.utils import is_task_mapped_asset_spec, metadata_for_task_mapping
from dagster_airlift.test import make_instance
from dagster_airlift.utils import DAGSTER_AIRLIFT_PROXIED_STATE_DIR_ENV_VAR

from dagster_airlift_tests.unit_tests.conftest import (
assert_dependency_structure_in_assets,
Expand Down Expand Up @@ -313,23 +310,16 @@ def test_local_airflow_instance() -> None:
assert defs.assets
repo_def = defs.get_repository_def()

with environ(
{
DAGSTER_AIRLIFT_PROXIED_STATE_DIR_ENV_VAR: str(
Path(__file__).parent / "proxied_state_for_sqlite_test"
),
}
):
defs = load_definitions_airflow_asset_graph(
assets_per_task={
"dag": {"task": [("a", [])]},
},
create_assets_defs=True,
)
repo_def = defs.get_repository_def()
assert defs.assets
repo_def = defs.get_repository_def()
assert len(repo_def.assets_defs_by_key) == 2
defs = load_definitions_airflow_asset_graph(
assets_per_task={
"dag": {"task": [("a", [])]},
},
create_assets_defs=True,
)
repo_def = defs.get_repository_def()
assert defs.assets
repo_def = defs.get_repository_def()
assert len(repo_def.assets_defs_by_key) == 2


def test_cached_loading() -> None:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,46 +1,17 @@
import copy
import json

import pytest
from airflow.operators.python import PythonOperator
from dagster._core.test_utils import environ
from dagster_airlift.in_airflow import proxying_to_dagster
from dagster_airlift.in_airflow.base_asset_operator import BaseDagsterAssetsOperator
from dagster_airlift.in_airflow.proxied_state import AirflowProxiedState
from dagster_airlift.test.shared_fixtures import VAR_DICT
from dagster_airlift.utils import DAGSTER_AIRLIFT_PROXIED_STATE_DIR_ENV_VAR

from dagster_airlift_tests.unit_tests.in_airflow_tests.conftest import (
build_dags_dict_given_structure,
)


def test_proxying_to_dagster_local_dir_set(mock_airflow_variable: None, caplog) -> None:
"""Test that when a local directory variable is set, we don't use the variable."""
with environ({DAGSTER_AIRLIFT_PROXIED_STATE_DIR_ENV_VAR: "/tmp"}):
globals_fake = build_dags_dict_given_structure(
{
"dag": {"task": []},
}
)

proxying_to_dagster(
global_vars=globals_fake,
proxied_state=AirflowProxiedState.from_dict(
{
"dag": {"tasks": [{"id": "task", "proxied": True}]},
}
),
)
assert any(
"Executing in local mode" in record.message and record.levelname == "INFO"
for record in caplog.records
)

assert VAR_DICT == {}


def test_proxying_to_dagster(mock_airflow_variable: None) -> None:
def test_proxying_to_dagster() -> None:
"""Test that we can proxy a set of dags, and as a result, operators are replaced and tags are added."""
globals_fake = build_dags_dict_given_structure(
{
Expand Down Expand Up @@ -71,16 +42,6 @@ def test_proxying_to_dagster(mock_airflow_variable: None) -> None:
assert isinstance(globals_fake["initially_not_proxied"].task_dict["task"], PythonOperator)
assert isinstance(globals_fake["should_be_ignored"].task_dict["task"], PythonOperator)

# Check the variable store
assert VAR_DICT == {
"task_is_proxied_dagster_proxied_state": json.dumps(
{"tasks": [{"id": "task", "proxied": True}]}
),
"initially_not_proxied_dagster_proxied_state": json.dumps(
{"tasks": [{"id": "task", "proxied": False}]}
),
}

# Change initially_not_proxied to be proxied
proxying_to_dagster(
global_vars=original_globals,
Expand Down Expand Up @@ -108,16 +69,6 @@ def test_proxying_to_dagster(mock_airflow_variable: None) -> None:
)
assert isinstance(original_globals["should_be_ignored"].task_dict["task"], PythonOperator)

# Check the variable store
assert VAR_DICT == {
"task_is_proxied_dagster_proxied_state": json.dumps(
{"tasks": [{"id": "task", "proxied": True}]}
),
"initially_not_proxied_dagster_proxied_state": json.dumps(
{"tasks": [{"id": "task", "proxied": True}]}
),
}


def test_proxying_to_dagster_no_dags() -> None:
"""Ensure that we error when no dags are found in the current context."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ endef
export MAKEFILE_DIR := $(GET_MAKEFILE_DIR)
export DAGSTER_HOME := $(MAKEFILE_DIR)/.dagster_home
export AIRFLOW_HOME := $(MAKEFILE_DIR)/.airflow_home
export DAGSTER_AIRLIFT_PROXIED_STATE_DIR := $(MAKEFILE_DIR)/dbt_example/airflow_dags/proxied_state
export DBT_PROJECT_DIR := $(MAKEFILE_DIR)/dbt_example/shared/dbt
export DBT_PROFILES_DIR := $(MAKEFILE_DIR)/dbt_example/shared/dbt
export DAGSTER_URL := http://localhost:3333
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ endef
export MAKEFILE_DIR := $(GET_MAKEFILE_DIR)
export DAGSTER_HOME := $(MAKEFILE_DIR)/.dagster_home
export AIRFLOW_HOME := $(MAKEFILE_DIR)/.airflow_home
export DAGSTER_AIRLIFT_PROXIED_STATE_DIR := $(MAKEFILE_DIR)/kitchen_sink/airflow_dags/proxied_state
export DAGSTER_URL := http://localhost:3333

help:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ endef

MAKEFILE_DIR := $(GET_MAKEFILE_DIR)
export DAGSTER_HOME := $(MAKEFILE_DIR)/.dagster_home
export DAGSTER_AIRLIFT_PROXIED_STATE_DIR := $(MAKEFILE_DIR)/perf_harness/airflow_dags/proxied_state
export AIRFLOW_HOME := $(MAKEFILE_DIR)/.airflow_home
export DAGSTER_URL := http://localhost:3333

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ export TUTORIAL_EXAMPLE_DIR := $(MAKEFILE_DIR)
export DAGSTER_HOME := $(MAKEFILE_DIR)/.dagster_home
export AIRFLOW_HOME := $(MAKEFILE_DIR)/.airflow_home
export TUTORIAL_DBT_PROJECT_DIR := $(MAKEFILE_DIR)/tutorial_example/shared/dbt
export DAGSTER_AIRLIFT_PROXIED_STATE_DIR := $(MAKEFILE_DIR)/tutorial_example/airflow_dags/proxied_state
export DBT_PROFILES_DIR := $(MAKEFILE_DIR)/tutorial_example/shared/dbt
export DAGSTER_URL := http://localhost:3000

Expand Down

0 comments on commit dde0981

Please sign in to comment.