diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index afc32e702..96f0d5564 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -116,6 +116,7 @@ jobs: - uses: actions/checkout@v3 with: ref: ${{ github.event.pull_request.head.sha || github.ref }} + - uses: actions/cache@v3 with: path: | @@ -139,6 +140,7 @@ jobs: hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}:test-integration-setup hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}:test-integration env: + AIRFLOW__COSMOS__ENABLE_CACHE_DBT_LS: 0 AIRFLOW_HOME: /home/runner/work/astronomer-cosmos/astronomer-cosmos/ AIRFLOW_CONN_EXAMPLE_CONN: postgres://postgres:postgres@0.0.0.0:5432/postgres DATABRICKS_HOST: mock diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 86bfdbc74..a95bc2bdf 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -82,6 +82,7 @@ repos: types-PyYAML, types-attrs, attrs, + types-pytz, types-requests, types-python-dateutil, apache-airflow, diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 7757d5fb5..065209c99 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,8 +1,83 @@ Changelog ========= +1.5.0a9 (2024-06-25) +-------------------- + +New Features + +* Speed up ``LoadMode.DBT_LS`` by caching dbt ls output in Airflow Variable by @tatiana in #1014 +* Support for running dbt tasks in AWS EKS in #944 by @VolkerSchiewe +* Add Clickhouse profile mapping by @roadan and @pankajastro in #353 and #1016 +* Add node config to TaskInstance Context by @linchun3 in #1044 + +Bug fixes + +* Fix disk permission error in restricted env by @pankajastro in #1051 +* Add CSP header to iframe contents by @dwreeves in #1055 +* Stop attaching log adaptors to root logger to reduce logging costs by @glebkrapivin in #1047 + +Enhancements + +* Support ``static_index.html`` docs by @dwreeves in #999 +* Support deep linking dbt docs via Airflow UI by @dwreeves in #1038 +* Add ability to specify host/port for Snowflake connection by @whummer in #1063 + +Others + +* Update documentation for DbtDocs generator by @arjunanan6 in #1043 +* Use uv in CI by @dwreeves in #1013 +* Cache hatch folder in the CI by @tatiana in #1056 +* Change example DAGs to use ``example_conn`` as opposed to ``airflow_db`` by @tatiana in #1054 +* Mark plugin integration tests as integration by @tatiana in #1057 +* Ensure compliance with linting rule D300 by using triple quotes for docstrings by @pankajastro in #1049 +* Pre-commit hook updates in #1039, #1050, #1064 + + 1.4.3 (2024-06-07) ------------------ +------------------ + +Bug fixes + +* Bring back ``dataset`` as a required field for BigQuery profile by @pankajkoti in #1033 + +Enhancements + +* Only run ``dbt deps`` when there are dependencies by @tatiana and @AlgirdasDubickas in #1030 + +Docs + +* Fix docs so it does not reference non-existing ``get_dbt_dataset`` by @tatiana in #1034 + + +v1.4.2 (2024-06-06) +------------------- + +Bug fixes + +* Fix the invocation mode for ``ExecutionMode.VIRTUALENV`` by @marco9663 in #1023 +* Fix Cosmos ``enable_cache`` setting by @tatiana in #1025 +* Make ``GoogleCloudServiceAccountDictProfileMapping`` dataset profile arg optional by @oliverrmaa and @pankajastro in #839 and #1017 +* Athena profile mapping set ``aws_session_token`` in profile only if it exists by @pankajastro in #1022 + +Others + +* Update dbt and Airflow conflicts matrix by @tatiana in #1026 +* Enable Python 3.12 unittest by @pankajastro in #1018 +* Improve error logging in 
``DbtLocalBaseOperator`` by @davidsteinar in #1004 +* Add GitHub issue templates for bug reports and feature request by @pankajkoti in #1009 +* Add more fields in bug template to reduce turnaround in issue triaging by @pankajkoti in #1027 +* Fix ``dev/Dockerfile`` + Add ``uv pip install`` for faster build time by @dwreeves in #997 +* Drop support for Airflow 2.3 by @pankajkoti in #994 +* Update Astro Runtime image by @RNHTTR in #988 and #989 +* Enable ruff F linting by @pankajastro in #985 +* Move Cosmos Airflow configuration to settings.py by @pankajastro in #975 +* Fix CI Issues by @tatiana in #1005 +* Pre-commit hook updates in #1000, #1019 + + +1.4.1 (2024-05-17) +------------------ Bug fixes diff --git a/cosmos/__init__.py b/cosmos/__init__.py index 555f97e06..ee860228a 100644 --- a/cosmos/__init__.py +++ b/cosmos/__init__.py @@ -5,7 +5,7 @@ Contains dags, task groups, and operators. """ -__version__ = "1.4.3" +__version__ = "1.5.0a9" from cosmos.airflow.dag import DbtDag diff --git a/cosmos/cache.py b/cosmos/cache.py index b101366a0..fd1dd53f4 100644 --- a/cosmos/cache.py +++ b/cosmos/cache.py @@ -1,11 +1,22 @@ from __future__ import annotations +import functools +import hashlib +import json +import os import shutil +import time +from collections import defaultdict +from datetime import datetime, timedelta, timezone from pathlib import Path import msgpack +from airflow.models import DagRun, Variable from airflow.models.dag import DAG +from airflow.utils.session import provide_session from airflow.utils.task_group import TaskGroup +from sqlalchemy import select +from sqlalchemy.orm import Session from cosmos import settings from cosmos.constants import DBT_MANIFEST_FILE_NAME, DBT_TARGET_DIR_NAME @@ -13,6 +24,24 @@ from cosmos.log import get_logger logger = get_logger(__name__) +VAR_KEY_CACHE_PREFIX = "cosmos_cache__" + + +def _get_airflow_metadata(dag: DAG, task_group: TaskGroup | None) -> dict[str, str | None]: + dag_id = None + task_group_id = None + cosmos_type = "DbtDag" + + if task_group: + if task_group.dag_id is not None: + dag_id = task_group.dag_id + if task_group.group_id is not None: + task_group_id = task_group.group_id + cosmos_type = "DbtTaskGroup" + else: + dag_id = dag.dag_id + + return {"cosmos_type": cosmos_type, "dag_id": dag_id, "task_group_id": task_group_id} # It was considered to create a cache identifier based on the dbt project path, as opposed @@ -28,16 +57,21 @@ def _create_cache_identifier(dag: DAG, task_group: TaskGroup | None) -> str: :param task_group_name: (optional) Name of the Cosmos DbtTaskGroup being cached :return: Unique identifier representing the cache """ - if task_group: - if task_group.dag_id is not None: - cache_identifiers_list = [task_group.dag_id] - if task_group.group_id is not None: - cache_identifiers_list.extend([task_group.group_id.replace(".", "__")]) - cache_identifier = "__".join(cache_identifiers_list) - else: - cache_identifier = dag.dag_id + metadata = _get_airflow_metadata(dag, task_group) + cache_identifiers_list = [] + dag_id = metadata.get("dag_id") + task_group_id = metadata.get("task_group_id") + + if dag_id: + cache_identifiers_list.append(dag_id) + if task_group_id: + cache_identifiers_list.append(task_group_id.replace(".", "__")) - return cache_identifier + return "__".join(cache_identifiers_list) + + +def create_cache_key(cache_identifier: str) -> str: + return f"{VAR_KEY_CACHE_PREFIX}{cache_identifier}" def _obtain_cache_dir_path(cache_identifier: str, base_dir: Path = settings.cache_dir) -> Path: @@ -171,3 +205,144 
@@ def _copy_partial_parse_to_project(partial_parse_filepath: Path, project_path: P
     if source_manifest_filepath.exists():
         shutil.copy(str(source_manifest_filepath), str(target_manifest_filepath))
+
+
+def _create_folder_version_hash(dir_path: Path) -> str:
+    """
+    Given a directory, iterate through its content and create a hash that will change in case the
+    contents of the directory change. The value should not change if the contents of the directory do not change, even if
+    the command is run from different Airflow instances.
+
+    This method's output must be concise, and it currently changes based on the operating system.
+    """
+    # This approach is less efficient than using modified time
+    # sum([path.stat().st_mtime for path in dir_path.glob("**/*")])
+    # unfortunately, the modified time approach does not work well for dag-only deployments
+    # where DAGs are constantly synced to the deployed Airflow
+    # for 5k files, this seems to take 0.14s
+    hasher = hashlib.md5()
+    filepaths = []
+
+    for root_dir, dirs, files in os.walk(dir_path):
+        paths = [os.path.join(root_dir, filepath) for filepath in files]
+        filepaths.extend(paths)
+
+    for filepath in sorted(filepaths):
+        with open(str(filepath), "rb") as fp:
+            buf = fp.read()
+            hasher.update(buf)
+
+    return hasher.hexdigest()
+
+
+def _calculate_dbt_ls_cache_current_version(cache_identifier: str, project_dir: Path, cmd_args: list[str]) -> str:
+    """
+    Taking into account the project directory contents and the command arguments, calculate the
+    hash that represents the "dbt ls" command version - to be used to decide if the cache should be refreshed or not.
+
+    :param cache_identifier: Unique identifier of the cache (may include DbtDag or DbtTaskGroup information)
+    :param project_dir: Path to the target dbt project directory
+    :param cmd_args: List containing the arguments passed to the dbt ls command that would affect its output
+    """
+    start_time = time.perf_counter()
+
+    # Combined value for when the dbt project directory files were last modified
+    # This is fast (e.g. 0.01s for jaffle shop, 0.135s for a 5k models dbt folder)
+    dbt_project_hash = _create_folder_version_hash(project_dir)
+
+    # The performance for the following will depend on the user's configuration
+    hash_args = hashlib.md5("".join(cmd_args).encode()).hexdigest()
+
+    elapsed_time = time.perf_counter() - start_time
+    logger.info(
+        f"Cosmos performance: time to calculate cache identifier {cache_identifier} for current version: {elapsed_time}"
+    )
+    return f"{dbt_project_hash},{hash_args}"
+
+
+@functools.lru_cache
+def was_project_modified(previous_version: str, current_version: str) -> bool:
+    """
+    Given the cache version of a project and the latest version of the project,
+    decides if the project was modified or not.
+    """
+    return previous_version != current_version
+
+
+@provide_session
+def delete_unused_dbt_ls_cache(
+    max_age_last_usage: timedelta = timedelta(days=30), session: Session | None = None
+) -> int:
+    """
+    Delete Cosmos cache stored in Airflow Variables based on the last execution of their associated DAGs.
+
+    Example usage:
+
+    There are three Cosmos cache Airflow Variables:
+    1. ``cosmos_cache__basic_cosmos_dag``
+    2. ``cosmos_cache__basic_cosmos_task_group__orders``
+    3. ``cosmos_cache__basic_cosmos_task_group__customers``
+
+    The first relates to the ``DbtDag`` ``basic_cosmos_dag`` and the last two relate to the DAG
+    ``basic_cosmos_task_group`` that has two ``DbtTaskGroups``: ``orders`` and ``customers``.
+
+    Let's assume the last DAG run of ``basic_cosmos_dag`` was a week ago and the last DAG run of
+    ``basic_cosmos_task_group`` was an hour ago.
+
+    To delete the cache related to ``DbtDags`` and ``DbtTaskGroups`` whose last run was more than 5 days ago:
+
+    .. code-block:: python
+
+        >>> delete_unused_dbt_ls_cache(max_age_last_usage=timedelta(days=5))
+        INFO - Removing the dbt ls cache cosmos_cache__basic_cosmos_dag
+
+    To delete the cache related to ``DbtDags`` and ``DbtTaskGroups`` whose last run was more than 10 minutes ago:
+
+    .. code-block:: python
+
+        >>> delete_unused_dbt_ls_cache(max_age_last_usage=timedelta(minutes=10))
+        INFO - Removing the dbt ls cache cosmos_cache__basic_cosmos_dag
+        INFO - Removing the dbt ls cache cosmos_cache__basic_cosmos_task_group__orders
+        INFO - Removing the dbt ls cache cosmos_cache__basic_cosmos_task_group__customers
+
+    To delete the cache related to ``DbtDags`` and ``DbtTaskGroups`` whose last run was more than 10 days ago:
+
+    .. code-block:: python
+
+        >>> delete_unused_dbt_ls_cache(max_age_last_usage=timedelta(days=10))
+
+    In this last example, nothing is deleted.
+    """
+    if session is None:
+        return 0
+
+    logger.info(f"Delete the Cosmos cache stored in Airflow Variables that hasn't been used for {max_age_last_usage}")
+    cosmos_dags_ids = defaultdict(list)
+    all_variables = session.scalars(select(Variable)).all()
+    total_cosmos_variables = 0
+    deleted_cosmos_variables = 0
+
+    # Identify Cosmos-related cache in Airflow variables
+    for var in all_variables:
+        if var.key.startswith(VAR_KEY_CACHE_PREFIX):
+            var_value = json.loads(var.val)
+            cosmos_dags_ids[var_value["dag_id"]].append(var.key)
+            total_cosmos_variables += 1
+
+    # Delete the cache of DAGs that have not been run within the given time window
+    for dag_id, vars_keys in cosmos_dags_ids.items():
+        last_dag_run = (
+            session.query(DagRun)
+            .filter(
+                DagRun.dag_id == dag_id,
+            )
+            .order_by(DagRun.execution_date.desc())
+            .first()
+        )
+        if last_dag_run and last_dag_run.execution_date < (datetime.now(timezone.utc) - max_age_last_usage):
+            for var_key in vars_keys:
+                logger.info(f"Removing the dbt ls cache {var_key}")
+                Variable.delete(var_key)
+                deleted_cosmos_variables += 1
+
+    logger.info(
+        f"Deleted {deleted_cosmos_variables}/{total_cosmos_variables} Airflow Variables used to store Cosmos cache. 
" + ) + return deleted_cosmos_variables diff --git a/cosmos/config.py b/cosmos/config.py index 13622563e..5ca21709d 100644 --- a/cosmos/config.py +++ b/cosmos/config.py @@ -71,6 +71,7 @@ class RenderConfig: dbt_ls_path: Path | None = None project_path: Path | None = field(init=False) enable_mock_profile: bool = True + airflow_vars_to_purge_dbt_ls_cache: list[str] = field(default_factory=list) def __post_init__(self, dbt_project_path: str | Path | None) -> None: if self.env_vars: diff --git a/cosmos/constants.py b/cosmos/constants.py index 92bf883b2..2a1abb20e 100644 --- a/cosmos/constants.py +++ b/cosmos/constants.py @@ -36,6 +36,7 @@ class LoadMode(Enum): CUSTOM = "custom" DBT_LS = "dbt_ls" DBT_LS_FILE = "dbt_ls_file" + DBT_LS_CACHE = "dbt_ls_cache" DBT_MANIFEST = "dbt_manifest" diff --git a/cosmos/converter.py b/cosmos/converter.py index 5e415486e..40929ef55 100644 --- a/cosmos/converter.py +++ b/cosmos/converter.py @@ -5,6 +5,9 @@ import copy import inspect +import os +import platform +import time from typing import Any, Callable from warnings import warn @@ -225,19 +228,31 @@ def __init__( dbt_vars = project_config.dbt_vars or operator_args.get("vars") cache_dir = None + cache_identifier = None if settings.enable_cache: - cache_dir = cache._obtain_cache_dir_path(cache_identifier=cache._create_cache_identifier(dag, task_group)) + cache_identifier = cache._create_cache_identifier(dag, task_group) + cache_dir = cache._obtain_cache_dir_path(cache_identifier=cache_identifier) + previous_time = time.perf_counter() self.dbt_graph = DbtGraph( project=project_config, render_config=render_config, execution_config=execution_config, profile_config=profile_config, cache_dir=cache_dir, + cache_identifier=cache_identifier, dbt_vars=dbt_vars, + airflow_metadata=cache._get_airflow_metadata(dag, task_group), ) self.dbt_graph.load(method=render_config.load_method, execution_mode=execution_config.execution_mode) + current_time = time.perf_counter() + elapsed_time = current_time - previous_time + logger.info( + f"Cosmos performance ({cache_identifier}) - [{platform.node()}|{os.getpid()}]: It took {elapsed_time:.3}s to parse the dbt project for DAG using {self.dbt_graph.load_method}" + ) + previous_time = current_time + task_args = { **operator_args, "project_dir": execution_config.project_path, @@ -272,3 +287,9 @@ def __init__( on_warning_callback=on_warning_callback, render_config=render_config, ) + + current_time = time.perf_counter() + elapsed_time = current_time - previous_time + logger.info( + f"Cosmos performance ({cache_identifier}) - [{platform.node()}|{os.getpid()}]: It took {elapsed_time:.3}s to build the Airflow DAG." 
+ ) diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py index bd3181a20..8dba0a92e 100644 --- a/cosmos/dbt/graph.py +++ b/cosmos/dbt/graph.py @@ -1,17 +1,23 @@ from __future__ import annotations +import base64 +import datetime +import functools import itertools import json import os +import platform import tempfile +import zlib from dataclasses import dataclass, field +from functools import cached_property from pathlib import Path from subprocess import PIPE, Popen from typing import Any -import yaml +from airflow.models import Variable -from cosmos import cache +from cosmos import cache, settings from cosmos.config import ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig from cosmos.constants import ( DBT_LOG_DIR_NAME, @@ -116,7 +122,7 @@ def run_command(command: list[str], tmp_dir: Path, env_vars: dict[str, str]) -> return stdout -def parse_dbt_ls_output(project_path: Path, ls_stdout: str) -> dict[str, DbtNode]: +def parse_dbt_ls_output(project_path: Path | None, ls_stdout: str) -> dict[str, DbtNode]: """Parses the output of `dbt ls` into a dictionary of `DbtNode` instances.""" nodes = {} for line in ls_stdout.split("\n"): @@ -148,6 +154,8 @@ class DbtGraph: nodes: dict[str, DbtNode] = dict() filtered_nodes: dict[str, DbtNode] = dict() + load_method: LoadMode = LoadMode.AUTOMATIC + current_version: str = "" def __init__( self, @@ -156,16 +164,135 @@ def __init__( execution_config: ExecutionConfig = ExecutionConfig(), profile_config: ProfileConfig | None = None, cache_dir: Path | None = None, - # dbt_vars only supported for LegacyDbtProject + cache_identifier: str = "", dbt_vars: dict[str, str] | None = None, + airflow_metadata: dict[str, str] | None = None, ): self.project = project self.render_config = render_config self.profile_config = profile_config self.execution_config = execution_config self.cache_dir = cache_dir + self.airflow_metadata = airflow_metadata or {} + if cache_identifier: + self.dbt_ls_cache_key = cache.create_cache_key(cache_identifier) + else: + self.dbt_ls_cache_key = "" self.dbt_vars = dbt_vars or {} + @cached_property + def env_vars(self) -> dict[str, str]: + """ + User-defined environment variables, relevant to running dbt ls. + """ + return self.render_config.env_vars or self.project.env_vars or {} + + @cached_property + def project_path(self) -> Path: + """ + Return the user-defined path to their dbt project. Tries to retrieve the configuration from render_config and + (legacy support) ExecutionConfig, where it was originally defined. + """ + # we're considering the execution_config only due to backwards compatibility + path = self.render_config.project_path or self.project.dbt_project_path or self.execution_config.project_path + if not path: + raise CosmosLoadDbtException( + "Unable to load project via dbt ls without RenderConfig.dbt_project_path, ProjectConfig.dbt_project_path or ExecutionConfig.dbt_project_path" + ) + return path.absolute() + + @cached_property + def dbt_ls_args(self) -> list[str]: + """ + Flags set while running dbt ls. This information is also used to define the dbt ls cache key. 
+ """ + ls_args = [] + if self.render_config.exclude: + ls_args.extend(["--exclude", *self.render_config.exclude]) + + if self.render_config.select: + ls_args.extend(["--select", *self.render_config.select]) + + if self.project.dbt_vars: + ls_args.extend(["--vars", json.dumps(self.project.dbt_vars, sort_keys=True)]) + + if self.render_config.selector: + ls_args.extend(["--selector", self.render_config.selector]) + + if not self.project.partial_parse: + ls_args.append("--no-partial-parse") + + return ls_args + + @cached_property + def dbt_ls_cache_key_args(self) -> list[str]: + """ + Values that are used to represent the dbt ls cache key. If any parts are changed, the dbt ls command will be + executed and the new value will be stored. + """ + # if dbt deps, we can consider the md5 of the packages or deps file + cache_args = list(self.dbt_ls_args) + env_vars = self.env_vars + if env_vars: + envvars_str = json.dumps(env_vars, sort_keys=True) + cache_args.append(envvars_str) + if self.render_config.airflow_vars_to_purge_dbt_ls_cache: + for var_name in self.render_config.airflow_vars_to_purge_dbt_ls_cache: + airflow_vars = [var_name, Variable.get(var_name, "")] + cache_args.extend(airflow_vars) + + logger.debug(f"Value of `dbt_ls_cache_key_args` for <{self.dbt_ls_cache_key}>: {cache_args}") + return cache_args + + def save_dbt_ls_cache(self, dbt_ls_output: str) -> None: + """ + Store compressed dbt ls output into an Airflow Variable. + + Stores: + { + "version": "cache-version", + "dbt_ls_compressed": "compressed dbt ls output", + "last_modified": "Isoformat timestamp" + } + """ + # This compression reduces the dbt ls output to 10% of the original size + compressed_data = zlib.compress(dbt_ls_output.encode("utf-8")) + encoded_data = base64.b64encode(compressed_data) + dbt_ls_compressed = encoded_data.decode("utf-8") + cache_dict = { + "version": cache._calculate_dbt_ls_cache_current_version( + self.dbt_ls_cache_key, self.project_path, self.dbt_ls_cache_key_args + ), + "dbt_ls_compressed": dbt_ls_compressed, + "last_modified": datetime.datetime.now(datetime.timezone.utc).isoformat(), + **self.airflow_metadata, + } + Variable.set(self.dbt_ls_cache_key, cache_dict, serialize_json=True) + + def get_dbt_ls_cache(self) -> dict[str, str]: + """ + Retrieve previously saved dbt ls cache from an Airflow Variable, decompressing the dbt ls output. + + Outputs: + { + "version": "cache-version", + "dbt_ls": "uncompressed dbt ls output", + "last_modified": "Isoformat timestamp" + } + """ + cache_dict: dict[str, str] = {} + try: + cache_dict = Variable.get(self.dbt_ls_cache_key, deserialize_json=True) + except (json.decoder.JSONDecodeError, KeyError): + return cache_dict + else: + dbt_ls_compressed = cache_dict.pop("dbt_ls_compressed", None) + if dbt_ls_compressed: + encoded_data = base64.b64decode(dbt_ls_compressed.encode()) + cache_dict["dbt_ls"] = zlib.decompress(encoded_data).decode() + + return cache_dict + def load( self, method: LoadMode = LoadMode.AUTOMATIC, @@ -181,11 +308,11 @@ def load( Fundamentally, there are two different execution paths There is automatic, and manual. 
""" - load_method = { LoadMode.CUSTOM: self.load_via_custom_parser, LoadMode.DBT_LS: self.load_via_dbt_ls, LoadMode.DBT_LS_FILE: self.load_via_dbt_ls_file, + LoadMode.DBT_LS_CACHE: self.load_via_dbt_ls_cache, LoadMode.DBT_MANIFEST: self.load_from_dbt_manifest, } @@ -214,22 +341,9 @@ def run_dbt_ls( """Runs dbt ls command and returns the parsed nodes.""" ls_command = [dbt_cmd, "ls", "--output", "json"] - if self.render_config.exclude: - ls_command.extend(["--exclude", *self.render_config.exclude]) - - if self.render_config.select: - ls_command.extend(["--select", *self.render_config.select]) - - if self.project.dbt_vars: - ls_command.extend(["--vars", yaml.dump(self.project.dbt_vars)]) - - if self.render_config.selector: - ls_command.extend(["--selector", self.render_config.selector]) - - if not self.project.partial_parse: - ls_command.append("--no-partial-parse") - + ls_args = self.dbt_ls_args ls_command.extend(self.local_flags) + ls_command.extend(ls_args) stdout = run_command(ls_command, tmp_dir, env_vars) @@ -241,10 +355,56 @@ def run_dbt_ls( for line in logfile: logger.debug(line.strip()) + if self.should_use_dbt_ls_cache(): + self.save_dbt_ls_cache(stdout) + nodes = parse_dbt_ls_output(project_path, stdout) return nodes def load_via_dbt_ls(self) -> None: + """Retrieve the dbt ls cache if enabled and available or run dbt ls""" + if not self.load_via_dbt_ls_cache(): + self.load_via_dbt_ls_without_cache() + + @functools.lru_cache + def should_use_dbt_ls_cache(self) -> bool: + """Identify if Cosmos should use/store dbt ls cache or not.""" + return settings.enable_cache and settings.enable_cache_dbt_ls and bool(self.dbt_ls_cache_key) + + def load_via_dbt_ls_cache(self) -> bool: + """(Try to) load dbt ls cache from an Airflow Variable""" + + logger.info(f"Trying to parse the dbt project using dbt ls cache {self.dbt_ls_cache_key}...") + if self.should_use_dbt_ls_cache(): + project_path = self.project_path + + cache_dict = self.get_dbt_ls_cache() + if not cache_dict: + logger.info(f"Cosmos performance: Cache miss for {self.dbt_ls_cache_key}") + return False + + cache_version = cache_dict.get("version") + dbt_ls_cache = cache_dict.get("dbt_ls") + + current_version = cache._calculate_dbt_ls_cache_current_version( + self.dbt_ls_cache_key, project_path, self.dbt_ls_cache_key_args + ) + + if dbt_ls_cache and not cache.was_project_modified(cache_version, current_version): + logger.info( + f"Cosmos performance [{platform.node()}|{os.getpid()}]: The cache size for {self.dbt_ls_cache_key} is {len(dbt_ls_cache)}" + ) + self.load_method = LoadMode.DBT_LS_CACHE + + nodes = parse_dbt_ls_output(project_path=project_path, ls_stdout=dbt_ls_cache) + self.nodes = nodes + self.filtered_nodes = nodes + logger.info(f"Cosmos performance: Cache hit for {self.dbt_ls_cache_key} - {current_version}") + return True + logger.info(f"Cosmos performance: Cache miss for {self.dbt_ls_cache_key} - skipped") + return False + + def load_via_dbt_ls_without_cache(self) -> None: """ This is the most accurate way of loading `dbt` projects and filtering them out, since it uses the `dbt` command line for both parsing and filtering the nodes. 
@@ -253,37 +413,33 @@ def load_via_dbt_ls(self) -> None: * self.nodes * self.filtered_nodes """ + self.load_method = LoadMode.DBT_LS self.render_config.validate_dbt_command(fallback_cmd=self.execution_config.dbt_executable_path) dbt_cmd = self.render_config.dbt_executable_path dbt_cmd = dbt_cmd.as_posix() if isinstance(dbt_cmd, Path) else dbt_cmd logger.info(f"Trying to parse the dbt project in `{self.render_config.project_path}` using dbt ls...") - if not self.render_config.project_path or not self.execution_config.project_path: - raise CosmosLoadDbtException( - "Unable to load project via dbt ls without RenderConfig.dbt_project_path and ExecutionConfig.dbt_project_path" - ) - + project_path = self.project_path if not self.profile_config: raise CosmosLoadDbtException("Unable to load project via dbt ls without a profile config.") with tempfile.TemporaryDirectory() as tmpdir: - logger.info( + logger.debug( f"Content of the dbt project dir {self.render_config.project_path}: `{os.listdir(self.render_config.project_path)}`" ) tmpdir_path = Path(tmpdir) - abs_project_path = self.render_config.project_path.absolute() - create_symlinks(abs_project_path, tmpdir_path, self.render_config.dbt_deps) + create_symlinks(project_path, tmpdir_path, self.render_config.dbt_deps) if self.project.partial_parse and self.cache_dir: - latest_partial_parse = cache._get_latest_partial_parse(abs_project_path, self.cache_dir) + latest_partial_parse = cache._get_latest_partial_parse(project_path, self.cache_dir) logger.info("Partial parse is enabled and the latest partial parse file is %s", latest_partial_parse) if latest_partial_parse is not None: cache._copy_partial_parse_to_project(latest_partial_parse, tmpdir_path) with self.profile_config.ensure_profile( use_mock_values=self.render_config.enable_mock_profile - ) as profile_values, environ(self.project.env_vars or self.render_config.env_vars or {}): + ) as profile_values, environ(self.env_vars): (profile_path, env_vars) = profile_values env = os.environ.copy() env.update(env_vars) @@ -303,15 +459,13 @@ def load_via_dbt_ls(self) -> None: env[DBT_LOG_PATH_ENVVAR] = str(self.log_dir) env[DBT_TARGET_PATH_ENVVAR] = str(self.target_dir) - if self.render_config.dbt_deps and has_non_empty_dependencies_file( - Path(self.render_config.project_path) - ): + if self.render_config.dbt_deps and has_non_empty_dependencies_file(self.project_path): deps_command = [dbt_cmd, "deps"] deps_command.extend(self.local_flags) stdout = run_command(deps_command, tmpdir_path, env) logger.debug("dbt deps output: %s", stdout) - nodes = self.run_dbt_ls(dbt_cmd, self.execution_config.project_path, tmpdir_path, env) + nodes = self.run_dbt_ls(dbt_cmd, self.project_path, tmpdir_path, env) self.nodes = nodes self.filtered_nodes = nodes @@ -330,6 +484,7 @@ def load_via_dbt_ls_file(self) -> None: This technically should increase performance and also removes the necessity to have your whole dbt project copied to the airflow image. 
""" + self.load_method = LoadMode.DBT_LS_FILE logger.info("Trying to parse the dbt project `%s` using a dbt ls output file...", self.project.project_name) if not self.render_config.is_dbt_ls_file_available(): @@ -357,6 +512,7 @@ def load_via_custom_parser(self) -> None: * self.nodes * self.filtered_nodes """ + self.load_method = LoadMode.CUSTOM logger.info("Trying to parse the dbt project `%s` using a custom Cosmos method...", self.project.project_name) if self.render_config.selector: @@ -415,6 +571,7 @@ def load_from_dbt_manifest(self) -> None: * self.nodes * self.filtered_nodes """ + self.load_method = LoadMode.DBT_MANIFEST logger.info("Trying to parse the dbt project `%s` using a dbt manifest...", self.project.project_name) if self.render_config.selector: diff --git a/cosmos/settings.py b/cosmos/settings.py index fc5954131..68ed8758f 100644 --- a/cosmos/settings.py +++ b/cosmos/settings.py @@ -11,6 +11,8 @@ DEFAULT_CACHE_DIR = Path(tempfile.gettempdir(), DEFAULT_COSMOS_CACHE_DIR_NAME) cache_dir = Path(conf.get("cosmos", "cache_dir", fallback=DEFAULT_CACHE_DIR) or DEFAULT_CACHE_DIR) enable_cache = conf.getboolean("cosmos", "enable_cache", fallback=True) +enable_cache_partial_parse = conf.getboolean("cosmos", "enable_cache_partial_parse", fallback=True) +enable_cache_dbt_ls = conf.getboolean("cosmos", "enable_cache_dbt_ls", fallback=True) propagate_logs = conf.getboolean("cosmos", "propagate_logs", fallback=True) dbt_docs_dir = conf.get("cosmos", "dbt_docs_dir", fallback=None) dbt_docs_conn_id = conf.get("cosmos", "dbt_docs_conn_id", fallback=None) diff --git a/dev/dags/basic_cosmos_task_group.py b/dev/dags/basic_cosmos_task_group.py index 4221e3019..d63cf2c92 100644 --- a/dev/dags/basic_cosmos_task_group.py +++ b/dev/dags/basic_cosmos_task_group.py @@ -43,10 +43,13 @@ def basic_cosmos_task_group() -> None: customers = DbtTaskGroup( group_id="customers", - project_config=ProjectConfig( - (DBT_ROOT_PATH / "jaffle_shop").as_posix(), + project_config=ProjectConfig((DBT_ROOT_PATH / "jaffle_shop").as_posix(), dbt_vars={"var": "2"}), + render_config=RenderConfig( + select=["path:seeds/raw_customers.csv"], + enable_mock_profile=False, + env_vars={"PURGE": os.getenv("PURGE", "0")}, + airflow_vars_to_purge_dbt_ls_cache=["purge"], ), - render_config=RenderConfig(select=["path:seeds/raw_customers.csv"], enable_mock_profile=False), execution_config=shared_execution_config, operator_args={"install_deps": True}, profile_config=profile_config, diff --git a/dev/dags/example_cosmos_cleanup_dag.py b/dev/dags/example_cosmos_cleanup_dag.py new file mode 100644 index 000000000..c93bdf002 --- /dev/null +++ b/dev/dags/example_cosmos_cleanup_dag.py @@ -0,0 +1,34 @@ +""" +Example of cleanup DAG that can be used to clear cache originated from running the dbt ls command while +parsing the DbtDag or DbtTaskGroup since Cosmos 1.5. +""" + +# [START cache_example] +from datetime import datetime, timedelta + +from airflow.decorators import dag, task + +from cosmos.cache import delete_unused_dbt_ls_cache + + +@dag( + schedule_interval="0 0 * * 0", # Runs every Sunday + start_date=datetime(2023, 1, 1), + catchup=False, + tags=["example"], +) +def example_cosmos_cleanup_dag(): + + @task() + def clear_db_ls_cache(session=None): + """ + Delete the dbt ls cache that has not been used for the last five days. 
+        """
+        delete_unused_dbt_ls_cache(max_age_last_usage=timedelta(days=5))
+
+    clear_db_ls_cache()
+
+
+# [END cache_example]
+
+example_cosmos_cleanup_dag()
diff --git a/docs/configuration/caching.rst
new file mode 100644
index 000000000..b5ec155da
--- /dev/null
+++ b/docs/configuration/caching.rst
@@ -0,0 +1,118 @@
+.. _caching:
+
+Caching
+=======
+
+This page explains the caching strategies used by Astronomer Cosmos (``astronomer-cosmos``).
+
+All Cosmos caching mechanisms can be enabled or turned off in the ``airflow.cfg`` file or using environment variables.
+
+.. note::
+    For more information, see `configuring a Cosmos project <./project-config.html>`_.
+
+Depending on the Cosmos version, it creates a cache for two types of data:
+
+- The ``dbt ls`` output
+- The dbt ``partial_parse.msgpack`` file
+
+It is possible to turn off any cache in Cosmos by exporting the environment variable ``AIRFLOW__COSMOS__ENABLE_CACHE=0``.
+Disabling individual types of cache in Cosmos is also possible, as explained below.
+
+Caching the dbt ls output
+~~~~~~~~~~~~~~~~~~~~~~~~~
+
+(Introduced in Cosmos 1.5)
+
+While parsing a dbt project using `LoadMode.DBT_LS <./parsing-methods.html#dbt-ls>`_, Cosmos runs ``dbt ls`` in a subprocess.
+This operation can be very costly; it can increase the DAG parsing times and affect not only the scheduler DAG processing but
+also the tasks' queueing time.
+
+Cosmos 1.5 introduced a feature to mitigate the performance issue associated with ``LoadMode.DBT_LS`` by caching the output
+of this command as an `Airflow Variable `_.
+Based on an initial `analysis `_, enabling this setting reduced some DAGs' task queueing from 30s to 0s. Additionally, some users `reported improvements of 84% `_ in the DAG run time.
+
+This feature is on by default. To turn it off, export the following environment variable: ``AIRFLOW__COSMOS__ENABLE_CACHE_DBT_LS=0``.
+
+**How the cache is refreshed**
+
+Users can purge or delete the cache via the Airflow UI by identifying and deleting the cache key.
+
+Cosmos will refresh the cache in a few circumstances:
+
+* if any files of the dbt project change
+* if one of the arguments that affect the dbt ls command execution changes
+
+To evaluate whether the dbt project changed, Cosmos calculates an MD5 hash of all the files in the project directory.
+
+Additionally, if any of the following DAG configurations are changed, Cosmos will automatically purge the cache of the DAGs that use that specific configuration:
+
+* ``ProjectConfig.dbt_vars``
+* ``ProjectConfig.env_vars``
+* ``ProjectConfig.partial_parse``
+* ``RenderConfig.env_vars``
+* ``RenderConfig.exclude``
+* ``RenderConfig.select``
+* ``RenderConfig.selector``
+
+Finally, if users would like to define specific Airflow variables that, if changed, will cause the recreation of the cache, they can specify those by using:
+
+* ``RenderConfig.airflow_vars_to_purge_dbt_ls_cache``
+
+Example:
+
+.. code-block:: python
+
+    RenderConfig(airflow_vars_to_purge_dbt_ls_cache=["refresh_cache"])
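+
+For illustration, here is a minimal sketch of a ``DbtDag`` wired to this option (the DAG id, paths, profile details
+and the ``refresh_cache`` variable name below are placeholders, not values shipped with Cosmos):
+
+.. code-block:: python
+
+    from datetime import datetime
+
+    from cosmos import DbtDag, ProfileConfig, ProjectConfig, RenderConfig
+
+    example_purge_dag = DbtDag(
+        dag_id="example_purge_cache_dag",
+        project_config=ProjectConfig("/usr/local/airflow/dbt/jaffle_shop"),
+        profile_config=ProfileConfig(
+            profile_name="default",
+            target_name="dev",
+            profiles_yml_filepath="/usr/local/airflow/dbt/jaffle_shop/profiles.yml",
+        ),
+        render_config=RenderConfig(
+            # when the value of the Airflow variable "refresh_cache" changes, the dbt ls cache is recreated
+            airflow_vars_to_purge_dbt_ls_cache=["refresh_cache"],
+        ),
+        schedule_interval="@daily",
+        start_date=datetime(2023, 1, 1),
+    )
+
+With this configuration, updating the ``refresh_cache`` Airflow Variable (for example, via the Airflow UI) forces the
+``dbt ls`` cache for this DAG to be rebuilt on the next parse.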
+
+**Cleaning up stale cache**
+
+Over time, Cosmos ``DbtDags`` and ``DbtTaskGroups`` may be renamed or deleted. In those cases, to clean up the Airflow metadata database, it is possible to use the method ``delete_unused_dbt_ls_cache``.
+
+The method deletes the Cosmos cache stored in Airflow Variables based on the last execution of their associated DAGs.
+
+As an example, the following clean-up DAG will delete any cache associated with Cosmos that has not been used for the last five days:
+
+.. literalinclude:: ../../dev/dags/example_cosmos_cleanup_dag.py
+    :language: python
+    :start-after: [START cache_example]
+    :end-before: [END cache_example]
+
+**Cache key**
+
+The Airflow variables that represent the dbt ls cache are prefixed by ``cosmos_cache``.
+When using ``DbtDag``, the keys use the DAG name. When using ``DbtTaskGroup``, they contain the ``TaskGroup`` name, its parent task groups, and the DAG name.
+
+Examples:
+
+* The ``DbtDag`` "basic_cosmos_dag" will have the cache represented by "cosmos_cache__basic_cosmos_dag".
+* The ``DbtTaskGroup`` "customers" declared inside the DAG "basic_cosmos_task_group" will have the cache key "cosmos_cache__basic_cosmos_task_group__customers".
+
+**Cache value**
+
+The cache values contain a few properties:
+
+* ``last_modified`` timestamp, represented using the ISO 8601 format.
+* ``version`` is a hash that represents the version of the dbt project and the arguments used to run dbt ls at the time Cosmos created the cache.
+* ``dbt_ls_compressed`` represents the dbt ls output compressed using zlib and encoded to base64 so Cosmos can record the value as a compressed string in the Airflow metadata database.
+* ``dag_id`` is the DAG associated with this cache.
+* ``task_group_id`` is the TaskGroup associated with this cache.
+* ``cosmos_type`` is either ``DbtDag`` or ``DbtTaskGroup``.
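+
+If you ever need to inspect one of these entries manually (for example, while debugging), the stored value can be
+decoded with the Python standard library. A rough sketch, assuming a cache key such as
+``cosmos_cache__basic_cosmos_dag`` already exists in your Airflow instance:
+
+.. code-block:: python
+
+    import base64
+    import zlib
+
+    from airflow.models import Variable
+
+    cache_dict = Variable.get("cosmos_cache__basic_cosmos_dag", deserialize_json=True)
+
+    # "dbt_ls_compressed" holds the dbt ls output compressed with zlib and encoded as base64
+    compressed = base64.b64decode(cache_dict["dbt_ls_compressed"].encode())
+    dbt_ls_output = zlib.decompress(compressed).decode()
+    print(dbt_ls_output)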
+
+
+Caching the partial parse file
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+(Introduced in Cosmos 1.4)
+
+After parsing the dbt project, dbt stores an internal project manifest in a file called ``partial_parse.msgpack`` (`official docs `_).
+This file contributes significantly to the performance of running dbt commands when the dbt project did not change.
+
+Cosmos 1.4 introduced `support for partial parse files `_. The file can either be
+provided by the user or be generated by Cosmos itself: after running dbt commands, Cosmos stores the resulting file in a
+temporary folder on the disk of the Airflow scheduler and worker nodes.
+
+Users can customize where to store the cache using the setting ``AIRFLOW__COSMOS__CACHE_DIR``.
+
+It is possible to switch off this feature by exporting the environment variable ``AIRFLOW__COSMOS__ENABLE_CACHE_PARTIAL_PARSE=0``.
+
+For more information, read the `Cosmos partial parsing documentation <./partial-parsing.html>`_.
diff --git a/docs/configuration/cosmos-conf.rst b/docs/configuration/cosmos-conf.rst
index 1d334884f..9c1b56c89 100644
--- a/docs/configuration/cosmos-conf.rst
+++ b/docs/configuration/cosmos-conf.rst
@@ -30,6 +30,22 @@ This page lists all available Airflow configurations that affect ``astronomer-co
     - Default: ``True``
     - Environment Variable: ``AIRFLOW__COSMOS__ENABLE_CACHE``
+.. _enable_cache_dbt_ls:
+
+`enable_cache_dbt_ls`_:
+    Enable or disable caching of the ``dbt ls`` command output in an Airflow Variable when using ``LoadMode.DBT_LS``.
+
+    - Default: ``True``
+    - Environment Variable: ``AIRFLOW__COSMOS__ENABLE_CACHE_DBT_LS``
+
+.. _enable_cache_partial_parse:
+
+`enable_cache_partial_parse`_:
+    Enable or disable caching of dbt partial parse files on the local disk.
+
+    - Default: ``True``
+    - Environment Variable: ``AIRFLOW__COSMOS__ENABLE_CACHE_PARTIAL_PARSE``
+
 .. _propagate_logs:

 `propagate_logs`_:
diff --git a/docs/configuration/index.rst b/docs/configuration/index.rst
index fc34b993e..90f195938 100644
--- a/docs/configuration/index.rst
+++ b/docs/configuration/index.rst
@@ -25,3 +25,4 @@ Cosmos offers a number of configuration options to customize its behavior.
 For m
     Operator Args
     Compiled SQL
     Logging
+    Caching
diff --git a/docs/configuration/parsing-methods.rst b/docs/configuration/parsing-methods.rst
index 14dafb021..ebd6030e6 100644
--- a/docs/configuration/parsing-methods.rst
+++ b/docs/configuration/parsing-methods.rst
@@ -66,6 +66,9 @@ If you don't have a ``manifest.json`` file, Cosmos will attempt to generate one
 When Cosmos runs ``dbt ls``, it also passes your ``select`` and ``exclude`` arguments to the command. This means that Cosmos will only generate a manifest for the models you want to run.
+Starting in Cosmos 1.5, Cosmos will cache the output of the ``dbt ls`` command to improve the performance of this
+parsing method. Learn more `here <./caching.html>`_.
+
 To use this:

 .. code-block:: python
diff --git a/docs/configuration/render-config.rst b/docs/configuration/render-config.rst
index f3e216712..4b2535e07 100644
--- a/docs/configuration/render-config.rst
+++ b/docs/configuration/render-config.rst
@@ -17,6 +17,7 @@ The ``RenderConfig`` class takes the following arguments:
 - ``dbt_executable_path``: The path to the dbt executable for dag generation. Defaults to dbt if available on the path.
 - ``env_vars``: (available in v1.2.5, use``ProjectConfig.env_vars`` for v1.3.0 onwards) A dictionary of environment variables for rendering. Only supported when using ``load_method=LoadMode.DBT_LS``.
 - ``dbt_project_path``: Configures the DBT project location accessible on their airflow controller for DAG rendering - Required when using ``load_method=LoadMode.DBT_LS`` or ``load_method=LoadMode.CUSTOM``
+- ``airflow_vars_to_purge_dbt_ls_cache``: (new in v1.5) Specify Airflow variables that will affect the ``LoadMode.DBT_LS`` cache. See `Caching <./caching.html>`_ for more information.

 Customizing how nodes are rendered (experimental)
 -------------------------------------------------
diff --git a/pyproject.toml b/pyproject.toml
index c8bee0b20..6c518613b 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -81,6 +81,7 @@ tests = [
     "pytest-cov",
     "pytest-describe",
     "sqlalchemy-stubs", # Change when sqlalchemy is upgraded https://docs.sqlalchemy.org/en/14/orm/extensions/mypy.html
+    "types-pytz",
     "types-requests",
     "sqlalchemy-stubs", # Change when sqlalchemy is upgraded https://docs.sqlalchemy.org/en/14/orm/extensions/mypy.html
     "pre-commit",
@@ -136,7 +137,7 @@ dependencies = [
     "types-requests",
     "types-python-dateutil",
     "Werkzeug<3.0.0",
-    "apache-airflow=={matrix:airflow}.0",
+    "apache-airflow~={matrix:airflow}.0,!=2.9.0,!=2.9.1",  # https://github.com/apache/airflow/pull/39670
 ]
 pre-install-commands = ["sh scripts/test/pre-install-airflow.sh {matrix:airflow} {matrix:python}"]
diff --git a/scripts/test/integration-setup.sh b/scripts/test/integration-setup.sh
index eba4f1513..c6e106fd5 100644
--- a/scripts/test/integration-setup.sh
+++ b/scripts/test/integration-setup.sh
@@ -1,6 +1,14 @@
+#!/bin/bash
+
+set -v
+set -x
+set -e
+
 # we install using the following workaround to overcome installation conflicts, such as:
 # apache-airflow 2.3.0 and dbt-core [0.13.0 - 1.5.2] and jinja2>=3.0.0 because these package versions have conflicting dependencies
-pip uninstall -y dbt-postgres dbt-databricks dbt-vertica; \
-rm -rf airflow.*; \
-airflow db init; \
+pip uninstall -y dbt-postgres dbt-databricks dbt-vertica
+rm -rf airflow.*
+pip freeze | grep airflow
+airflow db reset -y
+airflow db init
 pip install 'dbt-core' 'dbt-databricks' 'dbt-postgres' 'dbt-vertica' 'openlineage-airflow'
diff --git a/scripts/test/integration.sh b/scripts/test/integration.sh
index
823f70a7e..1d8264768 100644 --- a/scripts/test/integration.sh +++ b/scripts/test/integration.sh @@ -1,3 +1,15 @@ +#!/bin/bash + +set -x +set -e + +pip freeze | grep airflow +echo $AIRFLOW_HOME +ls $AIRFLOW_HOME + +airflow db check + + rm -rf dbt/jaffle_shop/dbt_packages; pytest -vv \ --cov=cosmos \ diff --git a/tests/dbt/test_graph.py b/tests/dbt/test_graph.py index 652a81482..9e931ba8c 100644 --- a/tests/dbt/test_graph.py +++ b/tests/dbt/test_graph.py @@ -1,12 +1,17 @@ +import importlib +import os import shutil +import sys import tempfile +from datetime import datetime from pathlib import Path from subprocess import PIPE, Popen from unittest.mock import MagicMock, patch import pytest -import yaml +from airflow.models import Variable +from cosmos import settings from cosmos.config import CosmosConfigException, ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig from cosmos.constants import DBT_TARGET_DIR_NAME, DbtResourceType, ExecutionMode from cosmos.dbt.graph import ( @@ -587,7 +592,7 @@ def test_load_via_dbt_ls_without_dbt_deps(postgres_profile_config): ) with pytest.raises(CosmosLoadDbtException) as err_info: - dbt_graph.load_via_dbt_ls() + dbt_graph.load_via_dbt_ls_without_cache() expected = "Unable to run dbt ls command due to missing dbt_packages. Set RenderConfig.dbt_deps=True." assert err_info.value.args[0] == expected @@ -658,12 +663,12 @@ def test_load_via_dbt_ls_caching_partial_parsing(tmp_dbt_project_dir, postgres_p (tmp_path / DBT_TARGET_DIR_NAME).mkdir(parents=True, exist_ok=True) # First time dbt ls is run, partial parsing was not cached, so we don't benefit from this - dbt_graph.load_via_dbt_ls() + dbt_graph.load_via_dbt_ls_without_cache() assert "Unable to do partial parsing" in caplog.text # From the second time we run dbt ls onwards, we benefit from partial parsing caplog.clear() - dbt_graph.load_via_dbt_ls() # should not not raise exception + dbt_graph.load_via_dbt_ls_without_cache() # should not not raise exception assert not "Unable to do partial parsing" in caplog.text @@ -978,10 +983,13 @@ def test_parse_dbt_ls_output_with_json_without_tags_or_config(): assert expected_nodes == nodes +@patch("cosmos.dbt.graph.DbtGraph.should_use_dbt_ls_cache", return_value=False) @patch("cosmos.dbt.graph.Popen") @patch("cosmos.dbt.graph.DbtGraph.update_node_dependency") @patch("cosmos.config.RenderConfig.validate_dbt_command") -def test_load_via_dbt_ls_project_config_env_vars(mock_validate, mock_update_nodes, mock_popen, tmp_dbt_project_dir): +def test_load_via_dbt_ls_project_config_env_vars( + mock_validate, mock_update_nodes, mock_popen, mock_enable_cache, tmp_dbt_project_dir +): """Tests that the dbt ls command in the subprocess has the project config env vars set.""" mock_popen().communicate.return_value = ("", "") mock_popen().returncode = 0 @@ -1006,10 +1014,13 @@ def test_load_via_dbt_ls_project_config_env_vars(mock_validate, mock_update_node assert mock_popen.call_args.kwargs["env"]["MY_ENV_VAR"] == "my_value" +@patch("cosmos.dbt.graph.DbtGraph.should_use_dbt_ls_cache", return_value=False) @patch("cosmos.dbt.graph.Popen") @patch("cosmos.dbt.graph.DbtGraph.update_node_dependency") @patch("cosmos.config.RenderConfig.validate_dbt_command") -def test_load_via_dbt_ls_project_config_dbt_vars(mock_validate, mock_update_nodes, mock_popen, tmp_dbt_project_dir): +def test_load_via_dbt_ls_project_config_dbt_vars( + mock_validate, mock_update_nodes, mock_popen, mock_use_case, tmp_dbt_project_dir +): """Tests that the dbt ls command in the subprocess has "--vars" with the 
project config dbt_vars.""" mock_popen().communicate.return_value = ("", "") mock_popen().returncode = 0 @@ -1031,14 +1042,15 @@ def test_load_via_dbt_ls_project_config_dbt_vars(mock_validate, mock_update_node dbt_graph.load_via_dbt_ls() ls_command = mock_popen.call_args.args[0] assert "--vars" in ls_command - assert ls_command[ls_command.index("--vars") + 1] == yaml.dump(dbt_vars) + assert ls_command[ls_command.index("--vars") + 1] == '{"my_var1": "my_value1", "my_var2": "my_value2"}' +@patch("cosmos.dbt.graph.DbtGraph.should_use_dbt_ls_cache", return_value=False) @patch("cosmos.dbt.graph.Popen") @patch("cosmos.dbt.graph.DbtGraph.update_node_dependency") @patch("cosmos.config.RenderConfig.validate_dbt_command") def test_load_via_dbt_ls_render_config_selector_arg_is_used( - mock_validate, mock_update_nodes, mock_popen, tmp_dbt_project_dir + mock_validate, mock_update_nodes, mock_popen, mock_enable_cache, tmp_dbt_project_dir ): """Tests that the dbt ls command in the subprocess has "--selector" with the RenderConfig.selector.""" mock_popen().communicate.return_value = ("", "") @@ -1068,11 +1080,12 @@ def test_load_via_dbt_ls_render_config_selector_arg_is_used( assert ls_command[ls_command.index("--selector") + 1] == selector +@patch("cosmos.dbt.graph.DbtGraph.should_use_dbt_ls_cache", return_value=False) @patch("cosmos.dbt.graph.Popen") @patch("cosmos.dbt.graph.DbtGraph.update_node_dependency") @patch("cosmos.config.RenderConfig.validate_dbt_command") def test_load_via_dbt_ls_render_config_no_partial_parse( - mock_validate, mock_update_nodes, mock_popen, tmp_dbt_project_dir + mock_validate, mock_update_nodes, mock_popen, mock_enable_cache, tmp_dbt_project_dir ): """Tests that --no-partial-parse appears when partial_parse=False.""" mock_popen().communicate.return_value = ("", "") @@ -1180,3 +1193,219 @@ def test_load_via_dbt_ls_with_selector_arg(tmp_dbt_project_dir, postgres_profile assert "seed.jaffle_shop.raw_customers" in filtered_nodes # Two tests should be filtered assert sum(node.startswith("test.jaffle_shop") for node in filtered_nodes) == 2 + + +@pytest.mark.parametrize( + "render_config,project_config,expected_envvars", + [ + (RenderConfig(), ProjectConfig(), {}), + (RenderConfig(env_vars={"a": 1}), ProjectConfig(), {"a": 1}), + (RenderConfig(), ProjectConfig(env_vars={"b": 2}), {"b": 2}), + (RenderConfig(env_vars={"a": 1}), ProjectConfig(env_vars={"b": 2}), {"a": 1}), + ], +) +def test_env_vars(render_config, project_config, expected_envvars): + graph = DbtGraph( + project=project_config, + render_config=render_config, + ) + assert graph.env_vars == expected_envvars + + +def test_project_path_fails(): + graph = DbtGraph(project=ProjectConfig()) + with pytest.raises(CosmosLoadDbtException) as e: + graph.project_path + + expected = "Unable to load project via dbt ls without RenderConfig.dbt_project_path, ProjectConfig.dbt_project_path or ExecutionConfig.dbt_project_path" + assert e.value.args[0] == expected + + +@pytest.mark.parametrize( + "render_config,project_config,expected_dbt_ls_args", + [ + (RenderConfig(), ProjectConfig(), []), + (RenderConfig(exclude=["package:snowplow"]), ProjectConfig(), ["--exclude", "package:snowplow"]), + ( + RenderConfig(select=["tag:prod", "config.materialized:incremental"]), + ProjectConfig(), + ["--select", "tag:prod", "config.materialized:incremental"], + ), + (RenderConfig(selector="nightly"), ProjectConfig(), ["--selector", "nightly"]), + (RenderConfig(), ProjectConfig(dbt_vars={"a": 1}), ["--vars", '{"a": 1}']), + (RenderConfig(), 
ProjectConfig(partial_parse=False), ["--no-partial-parse"]), + ( + RenderConfig(exclude=["1", "2"], select=["a", "b"], selector="nightly"), + ProjectConfig(dbt_vars={"a": 1}, partial_parse=False), + [ + "--exclude", + "1", + "2", + "--select", + "a", + "b", + "--vars", + '{"a": 1}', + "--selector", + "nightly", + "--no-partial-parse", + ], + ), + ], +) +def test_dbt_ls_args(render_config, project_config, expected_dbt_ls_args): + graph = DbtGraph( + project=project_config, + render_config=render_config, + ) + assert graph.dbt_ls_args == expected_dbt_ls_args + + +def test_dbt_ls_cache_key_args_sorts_envvars(): + project_config = ProjectConfig(env_vars={11: "November", 12: "December", 5: "May"}) + graph = DbtGraph(project=project_config) + assert graph.dbt_ls_cache_key_args == ['{"5": "May", "11": "November", "12": "December"}'] + + +@pytest.fixture() +def airflow_variable(): + key = "cosmos_cache__undefined" + value = "some_value" + Variable.set(key, value) + + yield key, value + + Variable.delete(key) + + +@pytest.mark.integration +def test_dbt_ls_cache_key_args_uses_airflow_vars_to_purge_dbt_ls_cache(airflow_variable): + key, value = airflow_variable + graph = DbtGraph(project=ProjectConfig(), render_config=RenderConfig(airflow_vars_to_purge_dbt_ls_cache=[key])) + assert graph.dbt_ls_cache_key_args == [key, value] + + +@patch("cosmos.dbt.graph.datetime") +@patch("cosmos.dbt.graph.Variable.set") +def test_save_dbt_ls_cache(mock_variable_set, mock_datetime, tmp_dbt_project_dir): + mock_datetime.datetime.now.return_value = datetime(2022, 1, 1, 12, 0, 0) + graph = DbtGraph(cache_identifier="something", project=ProjectConfig(dbt_project_path=tmp_dbt_project_dir)) + dbt_ls_output = "some output" + graph.save_dbt_ls_cache(dbt_ls_output) + assert mock_variable_set.call_args[0][0] == "cosmos_cache__something" + assert mock_variable_set.call_args[0][1]["dbt_ls_compressed"] == "eJwrzs9NVcgvLSkoLQEAGpAEhg==" + assert mock_variable_set.call_args[0][1]["last_modified"] == "2022-01-01T12:00:00" + version = mock_variable_set.call_args[0][1].get("version") + hash_dir, hash_args = version.split(",") + assert hash_args == "d41d8cd98f00b204e9800998ecf8427e" + if sys.platform == "darwin": + assert hash_dir == "cdc6f0bec00f4edc616f3aa755a34330" + else: + assert hash_dir == "77d08d6da374330ac1b49438ff2873f7" + + +@pytest.mark.integration +def test_get_dbt_ls_cache_returns_empty_if_non_json_var(airflow_variable): + graph = DbtGraph(project=ProjectConfig()) + assert graph.get_dbt_ls_cache() == {} + + +@patch("cosmos.dbt.graph.Variable.get", return_value={"dbt_ls_compressed": "eJwrzs9NVcgvLSkoLQEAGpAEhg=="}) +def test_get_dbt_ls_cache_returns_decoded_and_decompressed_value(mock_variable_get): + graph = DbtGraph(project=ProjectConfig()) + assert graph.get_dbt_ls_cache() == {"dbt_ls": "some output"} + + +@patch("cosmos.dbt.graph.Variable.get", return_value={}) +def test_get_dbt_ls_cache_returns_empty_dict_if_empty_dict_var(mock_variable_get): + graph = DbtGraph(project=ProjectConfig()) + assert graph.get_dbt_ls_cache() == {} + + +@patch("cosmos.dbt.graph.DbtGraph.load_via_dbt_ls_without_cache") +@patch("cosmos.dbt.graph.DbtGraph.load_via_dbt_ls_cache", return_value=True) +def test_load_via_dbt_ls_does_not_call_without_cache(mock_cache, mock_without_cache): + graph = DbtGraph(project=ProjectConfig()) + graph.load_via_dbt_ls() + assert mock_cache.called + assert not mock_without_cache.called + + +@patch("cosmos.dbt.graph.DbtGraph.load_via_dbt_ls_without_cache") +@patch("cosmos.dbt.graph.DbtGraph.load_via_dbt_ls_cache", 
return_value=False) +def test_load_via_dbt_ls_calls_without_cache(mock_cache, mock_without_cache): + graph = DbtGraph(project=ProjectConfig()) + graph.load_via_dbt_ls() + assert mock_cache.called + assert mock_without_cache.called + + +@patch("cosmos.dbt.graph.DbtGraph.should_use_dbt_ls_cache", return_value=False) +def test_load_via_dbt_ls_cache_is_false_if_disabled(mock_should_use_dbt_ls_cache): + graph = DbtGraph(project=ProjectConfig()) + assert not graph.load_via_dbt_ls_cache() + assert mock_should_use_dbt_ls_cache.called + + +@patch("cosmos.dbt.graph.DbtGraph.get_dbt_ls_cache", return_value={}) +@patch("cosmos.dbt.graph.DbtGraph.should_use_dbt_ls_cache", return_value=True) +def test_load_via_dbt_ls_cache_is_false_if_no_cache(mock_should_use_dbt_ls_cache, mock_get_dbt_ls_cache): + graph = DbtGraph(project=ProjectConfig(dbt_project_path="/tmp")) + assert not graph.load_via_dbt_ls_cache() + assert mock_should_use_dbt_ls_cache.called + assert mock_get_dbt_ls_cache.called + + +@patch("cosmos.dbt.graph.cache._calculate_dbt_ls_cache_current_version", return_value=1) +@patch("cosmos.dbt.graph.DbtGraph.get_dbt_ls_cache", return_value={"version": 2, "dbt_ls": "output"}) +@patch("cosmos.dbt.graph.DbtGraph.should_use_dbt_ls_cache", return_value=True) +def test_load_via_dbt_ls_cache_is_false_if_cache_is_outdated( + mock_should_use_dbt_ls_cache, mock_get_dbt_ls_cache, mock_calculate_current_version +): + graph = DbtGraph(project=ProjectConfig(dbt_project_path="/tmp")) + assert not graph.load_via_dbt_ls_cache() + assert mock_should_use_dbt_ls_cache.called + assert mock_get_dbt_ls_cache.called + assert mock_calculate_current_version.called + + +@patch("cosmos.dbt.graph.parse_dbt_ls_output", return_value={"some-node": {}}) +@patch("cosmos.dbt.graph.cache._calculate_dbt_ls_cache_current_version", return_value=1) +@patch("cosmos.dbt.graph.DbtGraph.get_dbt_ls_cache", return_value={"version": 1, "dbt_ls": "output"}) +@patch("cosmos.dbt.graph.DbtGraph.should_use_dbt_ls_cache", return_value=True) +def test_load_via_dbt_ls_cache_is_true( + mock_should_use_dbt_ls_cache, mock_get_dbt_ls_cache, mock_calculate_current_version, mock_parse_dbt_ls_output +): + graph = DbtGraph(project=ProjectConfig(dbt_project_path="/tmp")) + assert graph.load_via_dbt_ls_cache() + assert graph.load_method == LoadMode.DBT_LS_CACHE + assert graph.nodes == {"some-node": {}} + assert graph.filtered_nodes == {"some-node": {}} + assert mock_should_use_dbt_ls_cache.called + assert mock_get_dbt_ls_cache.called + assert mock_calculate_current_version.called + assert mock_parse_dbt_ls_output.called + + +@pytest.mark.parametrize( + "enable_cache,enable_cache_dbt_ls,cache_id,should_use", + [ + (False, True, "id", False), + (True, False, "id", False), + (False, False, "id", False), + (True, True, "", False), + (True, True, "id", True), + ], +) +def test_should_use_dbt_ls_cache(enable_cache, enable_cache_dbt_ls, cache_id, should_use): + with patch.dict( + os.environ, + { + "AIRFLOW__COSMOS__ENABLE_CACHE": str(enable_cache), + "AIRFLOW__COSMOS__ENABLE_CACHE_DBT_LS": str(enable_cache_dbt_ls), + }, + ): + importlib.reload(settings) + graph = DbtGraph(cache_identifier=cache_id, project=ProjectConfig(dbt_project_path="/tmp")) + graph.should_use_dbt_ls_cache.cache_clear() + assert graph.should_use_dbt_ls_cache() == should_use diff --git a/tests/test_cache.py b/tests/test_cache.py index 7d6a2d36c..9cd216998 100644 --- a/tests/test_cache.py +++ b/tests/test_cache.py @@ -2,12 +2,14 @@ import shutil import tempfile import time -from datetime import 
datetime +from datetime import datetime, timedelta, timezone from pathlib import Path from unittest.mock import call, patch import pytest from airflow import DAG +from airflow.models import DagRun, Variable +from airflow.utils.db import create_session from airflow.utils.task_group import TaskGroup from cosmos.cache import ( @@ -15,6 +17,7 @@ _create_cache_identifier, _get_latest_partial_parse, _update_partial_parse_cache, + delete_unused_dbt_ls_cache, ) from cosmos.constants import DBT_PARTIAL_PARSE_FILE_NAME, DBT_TARGET_DIR_NAME @@ -112,3 +115,70 @@ def test_update_partial_parse_cache(mock_get_partial_parse_path, mock_copyfile): call(str(latest_partial_parse_filepath.parent / "manifest.json"), str(manifest_path)), ] mock_copyfile.assert_has_calls(calls) + + +@pytest.fixture +def vars_session(): + with create_session() as session: + var1 = Variable(key="cosmos_cache__dag_a", val='{"dag_id": "dag_a"}') + var2 = Variable(key="cosmos_cache__dag_b", val='{"dag_id": "dag_b"}') + var3 = Variable(key="cosmos_cache__dag_c__task_group_1", val='{"dag_id": "dag_c"}') + + dag_run_a = DagRun( + dag_id="dag_a", + run_id="dag_a_run_a_week_ago", + execution_date=datetime.now(timezone.utc) - timedelta(days=7), + state="success", + run_type="manual", + ) + dag_run_b = DagRun( + dag_id="dag_b", + run_id="dag_b_run_yesterday", + execution_date=datetime.now(timezone.utc) - timedelta(days=1), + state="failed", + run_type="manual", + ) + dag_run_c = DagRun( + dag_id="dag_c", + run_id="dag_c_run_on_hour_ago", + execution_date=datetime.now(timezone.utc) - timedelta(hours=1), + state="running", + run_type="manual", + ) + + session.add(var1) + session.add(var2) + session.add(var3) + session.add(dag_run_a) + session.add(dag_run_b) + session.add(dag_run_c) + session.commit() + + yield session + + session.query(Variable).filter_by(key="cosmos_cache__dag_a").delete() + session.query(Variable).filter_by(key="cosmos_cache__dag_b").delete() + session.query(Variable).filter_by(key="cosmos_cache__dag_c__task_group_1").delete() + + session.query(DagRun).filter_by(dag_id="dag_a", run_id="dag_a_run_a_week_ago").delete() + session.query(DagRun).filter_by(dag_id="dag_b", run_id="dag_b_run_yesterday").delete() + session.query(DagRun).filter_by(dag_id="dag_c", run_id="dag_c_run_on_hour_ago").delete() + session.commit() + + +@pytest.mark.integration +def test_delete_unused_dbt_ls_cache_deletes_a_week_ago_cache(vars_session): + assert vars_session.query(Variable).filter_by(key="cosmos_cache__dag_a").first() + assert delete_unused_dbt_ls_cache(max_age_last_usage=timedelta(days=5), session=vars_session) == 1 + assert not vars_session.query(Variable).filter_by(key="cosmos_cache__dag_a").first() + + +@pytest.mark.integration +def test_delete_unused_dbt_ls_cache_deletes_all_cache_five_minutes_ago(vars_session): + assert vars_session.query(Variable).filter_by(key="cosmos_cache__dag_a").first() + assert vars_session.query(Variable).filter_by(key="cosmos_cache__dag_b").first() + assert vars_session.query(Variable).filter_by(key="cosmos_cache__dag_c__task_group_1").first() + assert delete_unused_dbt_ls_cache(max_age_last_usage=timedelta(minutes=5), session=vars_session) == 3 + assert not vars_session.query(Variable).filter_by(key="cosmos_cache__dag_a").first() + assert not vars_session.query(Variable).filter_by(key="cosmos_cache__dag_b").first() + assert not vars_session.query(Variable).filter_by(key="cosmos_cache__dag_c__task_group_1").first()