
Commit

Speed up LoadMode.DBT_LS by caching dbt ls output in Airflow Variable (#1014)

Significantly improve `LoadMode.DBT_LS` performance. In the example DAGs tested, task queueing time dropped from ~30s to ~0.5s, and the total DAG run time for Jaffle Shop fell from 1 min 25s to 40s (a reduction of more than 50%). Some users [reported improvements of 84%](#1014 (comment)) in DAG run time when trying out these changes. The difference can be even more significant for larger dbt projects.

The improvement was accomplished by caching the dbt ls output as an
Airflow Variable. This is an alternative to #992, which cached the
pickled DAG/TaskGroup in a local file on the Airflow node. Unlike
#992, this approach works well for distributed deployments of Airflow.

As with any caching solution, this strategy does not guarantee optimal
performance on every run: whenever the cache is regenerated, the
scheduler or DAG processor will experience a delay. It was also observed
that the cache version hash can differ across platforms (e.g., `Darwin`
and `Linux`), so deployments with heterogeneous operating systems may
regenerate the cache often.

Closes: #990
Closes: #1061

**Enabling/disabling this feature**

This feature is enabled by default.
Users can disable it by setting the environment variable
`AIRFLOW__COSMOS__ENABLE_CACHE_DBT_LS=0`.
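
If you want to toggle it from Python for local experimentation (in real deployments the variable is normally set in the environment), a sketch:
```
import os

# Disable the dbt ls cache; must be set before the Cosmos DAGs are parsed
os.environ["AIRFLOW__COSMOS__ENABLE_CACHE_DBT_LS"] = "0"
```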

**How the cache is refreshed**

Users can purge or delete the cache via the Airflow UI by identifying and
deleting the cache key.
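
This change also adds a `delete_unused_dbt_ls_cache` helper (see `cosmos/cache.py` below) to clean up cache entries whose DAGs have not run recently. A minimal usage sketch, to be run where the Airflow metadata database is reachable:
```
from datetime import timedelta

from cosmos.cache import delete_unused_dbt_ls_cache

# Delete cache Variables for DAGs that have not run in the last 30 days
delete_unused_dbt_ls_cache(max_age_last_usage=timedelta(days=30))
```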

The cache is automatically refreshed if any file in the dbt project
changes. Changes are detected by hashing all the files in the directory.
Initially, this feature was implemented using the files' modified
timestamps, but that did not work well for some Airflow deployments
(e.g., `astro --dags`, since the timestamps change during deployment).
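
A simplified sketch of how the project directory is hashed (mirroring `_create_folder_version_hash` in `cosmos/cache.py`, included in this change):
```
import hashlib
import os

def folder_version_hash(dir_path):
    # Hash every file's contents in a stable (sorted) order, so the value
    # only changes when the contents of the directory change
    hasher = hashlib.md5()
    filepaths = []
    for root, _dirs, files in os.walk(dir_path):
        filepaths.extend(os.path.join(root, name) for name in files)
    for filepath in sorted(filepaths):
        with open(filepath, "rb") as fp:
            hasher.update(fp.read())
    return hasher.hexdigest()
```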

Additionally, if any of the following DAG configurations change, the
cache of the DAGs that use that specific configuration is automatically
purged (a sketch of how this works follows the list):
* `ProjectConfig.dbt_vars`
* `ProjectConfig.env_vars`
* `ProjectConfig.partial_parse`
* `RenderConfig.env_vars`
* `RenderConfig.exclude`
* `RenderConfig.select`
* `RenderConfig.selector`
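
Conceptually, these settings feed into the cache "version": the stored version combines a hash of the project directory with a hash of the dbt ls arguments, so changing any of them invalidates the entry. A simplified sketch of the idea (the real implementation is `_calculate_dbt_ls_cache_current_version` in `cosmos/cache.py`):
```
import hashlib

def cache_version(project_dir_hash, dbt_ls_args):
    # dbt_ls_args includes select/exclude/selector and the vars/env vars
    # that affect the dbt ls output
    args_hash = hashlib.md5("".join(dbt_ls_args).encode()).hexdigest()
    return f"{project_dir_hash},{args_hash}"
```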

The following argument was introduced so users can define Airflow
Variables that can be used to refresh the cache (it expects a list of
Airflow Variable names):
* `RenderConfig.airflow_vars_to_purge_dbt_ls_cache`

Example:
```
RenderConfig(
    airflow_vars_to_purge_dbt_ls_cache=["refresh_cache"]
)
```

**Cache key**

The Airflow Variables that represent the dbt ls cache are prefixed with
`cosmos_cache__`. When using `DbtDag`, the key uses the DAG name. When
using `DbtTaskGroup`, the key takes into account the TaskGroup, its
parent task groups, and the DAG.

Examples:
1. The `DbtDag` "basic_cosmos_dag" will have its cache stored under the key
`"cosmos_cache__basic_cosmos_dag"`.
2. The `DbtTaskGroup` "customers" declared inside the DAG
"basic_cosmos_task_group" will have the cache key
`"cosmos_cache__basic_cosmos_task_group__customers"`.

**Cache value**

The cache values contain a few properties:
- `last_modified`: a timestamp in ISO 8601 format.
- `version`: a hash representing the version of the dbt project and the
arguments used to run dbt ls at the time the cache was created.
- `dbt_ls_compressed`: the dbt ls output compressed using zlib and encoded
in base64, so it can be stored as a string in the Airflow metadata
database.
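
For illustration, a cache Variable value looks roughly like the following (field names come from this change; the concrete values are made up, and metadata such as the `dag_id` is stored so unused entries can later be cleaned up):
```
{
    "dag_id": "basic_cosmos_dag",
    "last_modified": "2024-06-25T10:00:00+00:00",
    "version": "<hash of project files>,<hash of dbt ls args>",
    "dbt_ls_compressed": "<base64-encoded, zlib-compressed dbt ls output>"
}
```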

Steps used to compress:
```
compressed_data = zlib.compress(dbt_ls_output.encode("utf-8"))
encoded_data = base64.b64encode(compressed_data)
dbt_ls_compressed = encoded_data.decode("utf-8")
```
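
Reading the value back reverses these steps (a short sketch; `dbt_ls_compressed` is the stored string from above):
```
import base64
import zlib

# Decode from base64, then decompress, to recover the original dbt ls output
dbt_ls_output = zlib.decompress(base64.b64decode(dbt_ls_compressed)).decode("utf-8")
```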

We compress this value because it can be large for bigger dbt projects,
depending on the selectors used, and we wanted this approach to be safe
and not clutter the Airflow metadata database.

Some numbers on the compression (a rough way to reproduce them is sketched after this list):
* A dbt project with 100 models can lead to a dbt ls output of 257k
characters when using JSON; zlib compresses it by roughly 20x.
* Another [real-life dbt
project](https://gitlab.com/gitlab-data/analytics/-/tree/master/transform/snowflake-dbt?ref_type=heads)
with 9,285 models produced an uncompressed dbt ls output of 8.4 MB, which
shrinks to 489 KB (about 6% of the original size) after zlib compression
and base64 encoding.
* For comparison, the maximum cell size in Postgres is 20 MB.
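
A rough way to reproduce these numbers for your own project (an illustrative sketch; run it from the dbt project directory):
```
import base64
import subprocess
import zlib

# Capture the dbt ls output as JSON, then measure the compressed, encoded size
output = subprocess.run(["dbt", "ls", "--output", "json"], capture_output=True, text=True).stdout
encoded = base64.b64encode(zlib.compress(output.encode("utf-8")))
print(f"raw: {len(output)} chars, compressed+encoded: {len(encoded)} bytes")
```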

The compression latency is on the order of milliseconds, so it does not
noticeably affect the performance of this solution.

**Future work**

* Assess how this will affect the Airflow metadata database in the long term
* Compare the performance of this approach with `ObjectStorage`

**Example of results before and after this change**

Task queue times in Astro before the change:
<img width="1488" alt="Screenshot 2024-06-03 at 11 15 26"
src="https://github.com/astronomer/astronomer-cosmos/assets/272048/20f6ae8f-02e0-4974-b445-740925ab1b3c">

Task queue times in Astro after the change on the second run of the DAG:
<img width="1624" alt="Screenshot 2024-06-03 at 11 15 44"
src="https://github.com/astronomer/astronomer-cosmos/assets/272048/c7b8a821-8751-4d2c-8feb-1d0c9bbba97e">

This feature will be available in `astronomer-cosmos==1.5.0a8`.
tatiana authored Jun 25, 2024
1 parent f983783 commit 9ce6faf
Showing 22 changed files with 994 additions and 63 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/test.yml
@@ -116,6 +116,7 @@ jobs:
- uses: actions/checkout@v3
with:
ref: ${{ github.event.pull_request.head.sha || github.ref }}

- uses: actions/cache@v3
with:
path: |
@@ -139,6 +140,7 @@
hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}:test-integration-setup
hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}:test-integration
env:
AIRFLOW__COSMOS__ENABLE_CACHE_DBT_LS: 0
AIRFLOW_HOME: /home/runner/work/astronomer-cosmos/astronomer-cosmos/
AIRFLOW_CONN_EXAMPLE_CONN: postgres://postgres:[email protected]:5432/postgres
DATABRICKS_HOST: mock
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
@@ -82,6 +82,7 @@ repos:
types-PyYAML,
types-attrs,
attrs,
types-pytz,
types-requests,
types-python-dateutil,
apache-airflow,
77 changes: 76 additions & 1 deletion CHANGELOG.rst
@@ -1,8 +1,83 @@
Changelog
=========

1.5.0a9 (2024-06-25)
--------------------

New Features

* Speed up ``LoadMode.DBT_LS`` by caching dbt ls output in Airflow Variable by @tatiana in #1014
* Support for running dbt tasks in AWS EKS in #944 by @VolkerSchiewe
* Add Clickhouse profile mapping by @roadan and @pankajastro in #353 and #1016
* Add node config to TaskInstance Context by @linchun3 in #1044

Bug fixes

* Fix disk permission error in restricted env by @pankajastro in #1051
* Add CSP header to iframe contents by @dwreeves in #1055
* Stop attaching log adaptors to root logger to reduce logging costs by @glebkrapivin in #1047

Enhancements

* Support ``static_index.html`` docs by @dwreeves in #999
* Support deep linking dbt docs via Airflow UI by @dwreeves in #1038
* Add ability to specify host/port for Snowflake connection by @whummer in #1063

Others

* Update documentation for DbtDocs generator by @arjunanan6 in #1043
* Use uv in CI by @dwreeves in #1013
* Cache hatch folder in the CI by @tatiana in #1056
* Change example DAGs to use ``example_conn`` as opposed to ``airflow_db`` by @tatiana in #1054
* Mark plugin integration tests as integration by @tatiana in #1057
* Ensure compliance with linting rule D300 by using triple quotes for docstrings by @pankajastro in #1049
* Pre-commit hook updates in #1039, #1050, #1064


1.4.3 (2024-06-07)
-----------------
------------------

Bug fixes

* Bring back ``dataset`` as a required field for BigQuery profile by @pankajkoti in #1033

Enhancements

* Only run ``dbt deps`` when there are dependencies by @tatiana and @AlgirdasDubickas in #1030

Docs

* Fix docs so it does not reference non-existing ``get_dbt_dataset`` by @tatiana in #1034


v1.4.2 (2024-06-06)
-------------------

Bug fixes

* Fix the invocation mode for ``ExecutionMode.VIRTUALENV`` by @marco9663 in #1023
* Fix Cosmos ``enable_cache`` setting by @tatiana in #1025
* Make ``GoogleCloudServiceAccountDictProfileMapping`` dataset profile arg optional by @oliverrmaa and @pankajastro in #839 and #1017
* Athena profile mapping set ``aws_session_token`` in profile only if it exists by @pankajastro in #1022

Others

* Update dbt and Airflow conflicts matrix by @tatiana in #1026
* Enable Python 3.12 unittest by @pankajastro in #1018
* Improve error logging in ``DbtLocalBaseOperator`` by @davidsteinar in #1004
* Add GitHub issue templates for bug reports and feature request by @pankajkoti in #1009
* Add more fields in bug template to reduce turnaround in issue triaging by @pankajkoti in #1027
* Fix ``dev/Dockerfile`` + Add ``uv pip install`` for faster build time by @dwreeves in #997
* Drop support for Airflow 2.3 by @pankajkoti in #994
* Update Astro Runtime image by @RNHTTR in #988 and #989
* Enable ruff F linting by @pankajastro in #985
* Move Cosmos Airflow configuration to settings.py by @pankajastro in #975
* Fix CI Issues by @tatiana in #1005
* Pre-commit hook updates in #1000, #1019


1.4.1 (2024-05-17)
------------------

Bug fixes

2 changes: 1 addition & 1 deletion cosmos/__init__.py
@@ -5,7 +5,7 @@
Contains dags, task groups, and operators.
"""
__version__ = "1.4.3"
__version__ = "1.5.0a9"


from cosmos.airflow.dag import DbtDag
193 changes: 184 additions & 9 deletions cosmos/cache.py
@@ -1,18 +1,47 @@
from __future__ import annotations

import functools
import hashlib
import json
import os
import shutil
import time
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from pathlib import Path

import msgpack
from airflow.models import DagRun, Variable
from airflow.models.dag import DAG
from airflow.utils.session import provide_session
from airflow.utils.task_group import TaskGroup
from sqlalchemy import select
from sqlalchemy.orm import Session

from cosmos import settings
from cosmos.constants import DBT_MANIFEST_FILE_NAME, DBT_TARGET_DIR_NAME
from cosmos.dbt.project import get_partial_parse_path
from cosmos.log import get_logger

logger = get_logger(__name__)
VAR_KEY_CACHE_PREFIX = "cosmos_cache__"


def _get_airflow_metadata(dag: DAG, task_group: TaskGroup | None) -> dict[str, str | None]:
dag_id = None
task_group_id = None
cosmos_type = "DbtDag"

if task_group:
if task_group.dag_id is not None:
dag_id = task_group.dag_id
if task_group.group_id is not None:
task_group_id = task_group.group_id
cosmos_type = "DbtTaskGroup"
else:
dag_id = dag.dag_id

return {"cosmos_type": cosmos_type, "dag_id": dag_id, "task_group_id": task_group_id}


# It was considered to create a cache identifier based on the dbt project path, as opposed
@@ -28,16 +57,21 @@ def _create_cache_identifier(dag: DAG, task_group: TaskGroup | None) -> str:
:param task_group_name: (optional) Name of the Cosmos DbtTaskGroup being cached
:return: Unique identifier representing the cache
"""
if task_group:
if task_group.dag_id is not None:
cache_identifiers_list = [task_group.dag_id]
if task_group.group_id is not None:
cache_identifiers_list.extend([task_group.group_id.replace(".", "__")])
cache_identifier = "__".join(cache_identifiers_list)
else:
cache_identifier = dag.dag_id
metadata = _get_airflow_metadata(dag, task_group)
cache_identifiers_list = []
dag_id = metadata.get("dag_id")
task_group_id = metadata.get("task_group_id")

if dag_id:
cache_identifiers_list.append(dag_id)
if task_group_id:
cache_identifiers_list.append(task_group_id.replace(".", "__"))

return cache_identifier
return "__".join(cache_identifiers_list)


def create_cache_key(cache_identifier: str) -> str:
return f"{VAR_KEY_CACHE_PREFIX}{cache_identifier}"


def _obtain_cache_dir_path(cache_identifier: str, base_dir: Path = settings.cache_dir) -> Path:
@@ -171,3 +205,144 @@ def _copy_partial_parse_to_project(partial_parse_filepath: Path, project_path: P

if source_manifest_filepath.exists():
shutil.copy(str(source_manifest_filepath), str(target_manifest_filepath))


def _create_folder_version_hash(dir_path: Path) -> str:
"""
Given a directory, iterate through its content and create a hash that will change in case the
contents of the directory change. The value should not change if the values of the directory do not change, even if
the command is run from different Airflow instances.
This method output must be concise and it currently changes based on operating system.
"""
# This approach is less efficient than using modified time
# sum([path.stat().st_mtime for path in dir_path.glob("**/*")])
# unfortunately, the modified time approach does not work well for dag-only deployments
# where DAGs are constantly synced to the deployed Airflow
# for 5k files, this seems to take 0.14
hasher = hashlib.md5()
filepaths = []

for root_dir, dirs, files in os.walk(dir_path):
paths = [os.path.join(root_dir, filepath) for filepath in files]
filepaths.extend(paths)

for filepath in sorted(filepaths):
with open(str(filepath), "rb") as fp:
buf = fp.read()
hasher.update(buf)

return hasher.hexdigest()


def _calculate_dbt_ls_cache_current_version(cache_identifier: str, project_dir: Path, cmd_args: list[str]) -> str:
"""
Taking into account the project directory contents and the command arguments, calculate the
hash that represents the "dbt ls" command version - to be used to decide if the cache should be refreshed or not.
:param cache_identifier: Unique identifier of the cache (may include DbtDag or DbtTaskGroup information)
:param project_path: Path to the target dbt project directory
:param cmd_args: List containing the arguments passed to the dbt ls command that would affect its output
"""
start_time = time.perf_counter()

# Combined value for when the dbt project directory files were last modified
# This is fast (e.g. 0.01s for jaffle shop, 0.135s for a 5k models dbt folder)
dbt_project_hash = _create_folder_version_hash(project_dir)

# The performance for the following will depend on the user's configuration
hash_args = hashlib.md5("".join(cmd_args).encode()).hexdigest()

elapsed_time = time.perf_counter() - start_time
logger.info(
f"Cosmos performance: time to calculate cache identifier {cache_identifier} for current version: {elapsed_time}"
)
return f"{dbt_project_hash},{hash_args}"


@functools.lru_cache
def was_project_modified(previous_version: str, current_version: str) -> bool:
"""
Given the cache version of a project and the latest version of the project,
decides if the project was modified or not.
"""
return previous_version != current_version


@provide_session
def delete_unused_dbt_ls_cache(
max_age_last_usage: timedelta = timedelta(days=30), session: Session | None = None
) -> int:
"""
Delete Cosmos cache stored in Airflow Variables based on the last execution of their associated DAGs.
Example usage:
There are three Cosmos cache Airflow Variables:
1. ``cosmos_cache__basic_cosmos_dag``
2. ``cosmos_cache__basic_cosmos_task_group__orders``
3. ``cosmos_cache__basic_cosmos_task_group__customers``
The first relates to the ``DbtDag`` ``basic_cosmos_dag`` and the two last ones relate to the DAG
``basic_cosmos_task_group`` that has two ``DbtTaskGroups``: ``orders`` and ``customers``.
Let's assume the last DAG run of ``basic_cosmos_dag`` was a week ago and the last DAG run of
``basic_cosmos_task_group`` was an hour ago.
To delete the cache related to ``DbtDags`` and ``DbtTaskGroup`` that were run more than 5 days ago:
..code: python
>>> delete_unused_dbt_ls_cache(max_age_last_usage=timedelta(days=5))
INFO - Removing the dbt ls cache cosmos_cache__basic_cosmos_dag
To delete the cache related to ``DbtDags`` and ``DbtTaskGroup`` that were run more than 10 minutes ago:
..code: python
>>> delete_unused_dbt_ls_cache(max_age_last_usage=timedelta(minutes=10))
INFO - Removing the dbt ls cache cosmos_cache__basic_cosmos_dag
INFO - Removing the dbt ls cache cosmos_cache__basic_cosmos_task_group__orders
INFO - Removing the dbt ls cache cosmos_cache__basic_cosmos_task_group__customers
To delete the cache related to ``DbtDags`` and ``DbtTaskGroup`` that were run more than 10 days ago
..code: python
>>> delete_unused_dbt_ls_cache(max_age_last_usage=timedelta(days=10))
In this last example, nothing is deleted.
"""
if session is None:
return 0

logger.info(f"Delete the Cosmos cache stored in Airflow Variables that hasn't been used for {max_age_last_usage}")
cosmos_dags_ids = defaultdict(list)
all_variables = session.scalars(select(Variable)).all()
total_cosmos_variables = 0
deleted_cosmos_variables = 0

# Identify Cosmos-related cache in Airflow variables
for var in all_variables:
if var.key.startswith(VAR_KEY_CACHE_PREFIX):
var_value = json.loads(var.val)
cosmos_dags_ids[var_value["dag_id"]].append(var.key)
total_cosmos_variables += 1

# Delete DAGs that have not been run in the last X time
for dag_id, vars_keys in cosmos_dags_ids.items():
last_dag_run = (
session.query(DagRun)
.filter(
DagRun.dag_id == dag_id,
)
.order_by(DagRun.execution_date.desc())
.first()
)
if last_dag_run and last_dag_run.execution_date < (datetime.now(timezone.utc) - max_age_last_usage):
for var_key in vars_keys:
logger.info(f"Removing the dbt ls cache {var_key}")
Variable.delete(var_key)
deleted_cosmos_variables += 1

logger.info(
f"Deleted {deleted_cosmos_variables}/{total_cosmos_variables} Airflow Variables used to store Cosmos cache. "
)
return deleted_cosmos_variables
1 change: 1 addition & 0 deletions cosmos/config.py
@@ -71,6 +71,7 @@ class RenderConfig:
dbt_ls_path: Path | None = None
project_path: Path | None = field(init=False)
enable_mock_profile: bool = True
airflow_vars_to_purge_dbt_ls_cache: list[str] = field(default_factory=list)

def __post_init__(self, dbt_project_path: str | Path | None) -> None:
if self.env_vars:
1 change: 1 addition & 0 deletions cosmos/constants.py
@@ -36,6 +36,7 @@ class LoadMode(Enum):
CUSTOM = "custom"
DBT_LS = "dbt_ls"
DBT_LS_FILE = "dbt_ls_file"
DBT_LS_CACHE = "dbt_ls_cache"
DBT_MANIFEST = "dbt_manifest"


