[dagster-airlift] [tutorial] fix readme parsing
dpeng817 committed Oct 15, 2024
1 parent ffa77e7 commit efe6b1b
Showing 13 changed files with 88 additions and 64 deletions.
---
Makefile
@@ -44,11 +44,5 @@ dagster_run:


update_readme_snippets:
-python ../../scripts/update_readme_snippets.py \
-$(MAKEFILE_DIR)/README.md \
-$(MAKEFILE_DIR)/tutorial_example/dagster_defs/stages/peer.py \
-$(MAKEFILE_DIR)/tutorial_example/dagster_defs/stages/observe.py \
-$(MAKEFILE_DIR)/tutorial_example/dagster_defs/stages/migrate.py \
-$(MAKEFILE_DIR)/tutorial_example/dagster_defs/stages/standalone.py \
-$(MAKEFILE_DIR)/tutorial_example/dagster_defs/stages/migrate_with_check.py \
-$(MAKEFILE_DIR)/tutorial_example/dagster_defs/stages/peer_with_check.py
+python ../../scripts/update_readme_snippets.py $(MAKEFILE_DIR)/README.md

---
README.md
@@ -77,7 +77,7 @@ uv pip install 'dagster-airlift[core]' dagster-webserver dagster
Next, create a `Definitions` object using `build_defs_from_airflow_instance`. You can use the empty [`tutorial_example/dagster_defs/definitions.py`](./tutorial_example/dagster_defs/definitions.py) file as a starting point:

```python
-# peer.py
+# tutorial_example/dagster_defs/stages/peer.py
from dagster_airlift.core import AirflowInstance, BasicAuthBackend, build_defs_from_airflow_instance

defs = build_defs_from_airflow_instance(
@@ -91,7 +91,6 @@ defs = build_defs_from_airflow_instance(
        name="airflow_instance_one",
    )
)

```
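The collapsed lines above construct the `AirflowInstance` argument. For reference, here is a sketch of the full call shape (the webserver URL and credentials are placeholder assumptions, not part of this diff):

```python
from dagster_airlift.core import AirflowInstance, BasicAuthBackend, build_defs_from_airflow_instance

defs = build_defs_from_airflow_instance(
    airflow_instance=AirflowInstance(
        # Placeholder connection details; point these at your own Airflow webserver.
        auth_backend=BasicAuthBackend(
            webserver_url="http://localhost:8080",
            username="admin",
            password="admin",
        ),
        name="airflow_instance_one",
    )
)
```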

This function creates:
@@ -189,7 +188,7 @@ uv pip install 'dagster-airlift[dbt]'
Then, we will construct our assets:

```python
-# observe.py
+# tutorial_example/dagster_defs/stages/observe.py
import os
from pathlib import Path

@@ -241,7 +240,6 @@ defs = build_defs_from_airflow_instance(
        resources={"dbt": DbtCliResource(project_dir=dbt_project_path())},
    ),
)

```
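The collapsed middle of `observe.py` maps each Airflow task to the assets it produces. A sketch of that mapping's shape (the asset keys are assumptions based on the task names):

```python
from dagster import AssetSpec
from dagster_airlift.core import assets_with_task_mappings

# Hypothetical sketch: map tasks in the DAG to the assets they materialize.
mapped_assets = assets_with_task_mappings(
    dag_id="rebuild_customers_list",
    task_mappings={
        "load_raw_customers": [AssetSpec(key=["raw_data", "raw_customers"])],
        "export_customers": [AssetSpec(key="customers_csv")],
    },
)
```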

### Viewing observed assets
@@ -281,25 +279,26 @@ tasks:
Next, you will need to modify your Airflow DAG to make it aware of the proxied state. This is already done in the example DAG:
```python
-# tutorial_example/airflow_dags/dags.py
-from dagster_airlift.in_airflow import proxying_to_dagster
-from dagster_airlift.proxied_state import load_proxied_state_from_yaml
+# tutorial_example/snippets/dags_truncated.py
+# Dags file can be found at tutorial_example/airflow_dags/dags.py
+from pathlib import Path
+
+from airflow import DAG
+from dagster_airlift.in_airflow import proxying_to_dagster
+from dagster_airlift.in_airflow.proxied_state import load_proxied_state_from_yaml
+
+dag = DAG("rebuild_customers_list", ...)

-dag = DAG("rebuild_customers_list")
...

# Set this to True to begin the proxying process
PROXYING = False

if PROXYING:
-    mark_as_dagster_proxying(
-        global_vars=globals(),
-        proxied_state=load_proxied_state_from_yaml(
-            Path(__file__).parent / "proxied_state"
-        ),
-    )
+    proxying_to_dagster(
+        global_vars=globals(),
+        proxied_state=load_proxied_state_from_yaml(Path(__file__).parent / "proxied_state"),
+    )
```
Set `PROXYING` to `True` or eliminate the `if` statement.
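For example, enabling proxying is a one-line edit to the DAG file (a minimal sketch; the surrounding code is unchanged):

```python
# Set this to True to begin the proxying process
PROXYING = True  # was False; proxying_to_dagster now takes effect when the DAG file is parsed
```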
@@ -334,16 +333,13 @@ We can create a custom `BaseProxyToDagsterOperator` subclass which will retrieve
will be made using that api key.

```python
-# tutorial_example/custom_operator_examples/custom_proxy.py
+# tutorial_example/snippets/custom_operator_examples/custom_proxy.py
from pathlib import Path
import requests
from airflow import DAG
from airflow.utils.context import Context
-from dagster_airlift.in_airflow import (
-    BaseProxyTaskToDagsterOperator,
-    proxying_to_dagster,
-)
+from dagster_airlift.in_airflow import BaseProxyTaskToDagsterOperator, proxying_to_dagster
from dagster_airlift.in_airflow.proxied_state import load_proxied_state_from_yaml
@@ -370,7 +366,6 @@ proxying_to_dagster(
    proxied_state=load_proxied_state_from_yaml(Path(__file__).parent / "proxied_state"),
    build_from_task_fn=CustomProxyToDagsterOperator.build_from_task,
)
```
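The collapsed class body supplies the session and URL used for those requests. A sketch of what it plausibly contains, mirroring the `DagsterCloudProxyOperator` pattern below (the `my_api_key` variable name and URL are assumptions):

```python
# Hypothetical sketch of the collapsed class body above.
class CustomProxyToDagsterOperator(BaseProxyTaskToDagsterOperator):
    def get_dagster_session(self, context: Context) -> requests.Session:
        if "var" not in context:
            raise ValueError("No variables found in context")
        api_key = context["var"]["value"].get("my_api_key")
        session = requests.Session()
        session.headers.update({"Authorization": f"Bearer {api_key}"})
        return session

    def get_dagster_url(self, context: Context) -> str:
        return "https://dagster.example.com"  # assumed deployment URL
```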

#### Dagster Plus Authorization
@@ -379,13 +374,13 @@ You can use a custom proxy operator to establish a connection to a Dagster plu
Airflow Variables. To set a Dagster+ user token, follow this guide: https://docs.dagster.io/dagster-plus/account/managing-user-agent-tokens#managing-user-tokens.

```python
-# tutorial_example/custom_operator_examples/plus_proxy_operator.py
+# tutorial_example/snippets/custom_operator_examples/plus_proxy_operator.py
import requests
from airflow.utils.context import Context
-from dagster_airlift.in_airflow import BaseProxyToDagsterOperator
+from dagster_airlift.in_airflow import BaseProxyTaskToDagsterOperator
-class DagsterCloudProxyOperator(BaseProxyToDagsterOperator):
+class DagsterCloudProxyOperator(BaseProxyTaskToDagsterOperator):
    def get_variable(self, context: Context, var_name: str) -> str:
        if "var" not in context:
            raise ValueError("No variables found in context")
@@ -408,12 +403,11 @@ class DagsterCloudProxyOperator(BaseProxyToDagsterOperator):
For some common operator patterns, like our dbt operator, Dagster supplies factories to build software defined assets for our tasks. In fact, the `@dbt_assets` decorator used earlier already backs its assets with definitions, so we can toggle the proxied state of the `build_dbt_models` task to `proxied: True` in the proxied state file:

```yaml
-# tutorial_example/airflow_dags/proxied_state/rebuild_customers_list.yaml
+# tutorial_example/snippets/dbt_proxied.yaml
tasks:
  - id: load_raw_customers
    proxied: False
  - id: build_dbt_models
-    # change this to move execution to Dagster
    proxied: True
  - id: export_customers
    proxied: False
@@ -438,7 +432,7 @@ For all other operator types, we will need to build our own asset definitions. W
For example, our `load_raw_customers` task uses a custom `LoadCSVToDuckDB` operator. We'll define a factory function `load_csv_to_duckdb_defs` to build the corresponding software-defined assets, and similarly an `export_duckdb_to_csv_defs` factory for `export_customers`:

```python
-# migrate.py
+# tutorial_example/dagster_defs/stages/migrate.py
import os
from pathlib import Path
@@ -545,20 +539,16 @@ defs = build_defs_from_airflow_instance(
        resources={"dbt": DbtCliResource(project_dir=dbt_project_path())},
    ),
)
```
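The collapsed body of `migrate.py` contains the factory definitions named above. In rough shape, each factory returns a `Definitions` object wrapping an asset that re-implements the operator's work; a hypothetical sketch (everything beyond the names mentioned in the text is assumed):

```python
from dagster import AssetSpec, Definitions, multi_asset

def load_csv_to_duckdb_defs(spec: AssetSpec) -> Definitions:
    """Hypothetical sketch: build an asset performing the LoadCSVToDuckDB work."""

    @multi_asset(specs=[spec])
    def _load_csv_to_duckdb() -> None:
        ...  # load the CSV into DuckDB, as the Airflow operator did

    return Definitions(assets=[_load_csv_to_duckdb])
```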

We can then toggle the proxied state of the remaining tasks in the `proxied_state` file:

```yaml
-# tutorial_example/airflow_dags/proxied_state/rebuild_customers_list.yaml
+# tutorial_example/snippets/all_proxied.yaml
tasks:
  - id: load_raw_customers
    proxied: True
  - id: build_dbt_models
-    # change this to move execution to Dagster
    proxied: True
  - id: export_customers
    proxied: True
@@ -571,7 +561,7 @@ Once we are confident in our migrated versions of the tasks, we can decommission
Next, we can strip the task associations from our Dagster definitions. This can be done by removing the `assets_with_task_mappings` call. We can use this opportunity to attach our assets to a `ScheduleDefinition` so that Dagster's scheduler can manage their execution:

```python
-# standalone.py
+# tutorial_example/dagster_defs/stages/standalone.py
import os
from pathlib import Path
@@ -689,7 +679,6 @@ defs = Definitions(
    asset_checks=[validate_exported_csv],
    resources={"dbt": DbtCliResource(project_dir=dbt_project_path())},
)
```
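The scheduling piece mentioned above can be sketched as follows (the job name and daily cron cadence are assumptions):

```python
from dagster import AssetSelection, ScheduleDefinition, define_asset_job

rebuild_customers_job = define_asset_job(
    name="rebuild_customers_list_job",
    selection=AssetSelection.all(),
)

# Pass via Definitions(schedules=[...]) alongside the assets above.
rebuild_customers_schedule = ScheduleDefinition(
    job=rebuild_customers_job,
    cron_schedule="0 0 * * *",
)
```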

## Addendum: Adding asset checks
@@ -699,7 +688,7 @@ Once you have peered your Airflow DAGs in Dagster, regardless of migration progr
For example, given a peered version of our DAG, we can add an asset check to ensure that the final `customers` CSV output exists and has a non-zero number of rows:

```python
-# peer_with_check.py
+# tutorial_example/dagster_defs/stages/peer_with_check.py
import os
from pathlib import Path
@@ -745,7 +734,6 @@ defs = Definitions.merge(
    ),
    Definitions(asset_checks=[validate_exported_csv]),
)
```
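The collapsed portion defines `validate_exported_csv`. A minimal sketch of such a check (the CSV location and asset key are assumptions):

```python
from pathlib import Path

from dagster import AssetCheckResult, AssetKey, asset_check

# Hypothetical asset key and file path, for illustration only.
@asset_check(asset=AssetKey(["customers_csv"]))
def validate_exported_csv() -> AssetCheckResult:
    csv_path = Path("customers.csv")
    if not csv_path.exists():
        return AssetCheckResult(passed=False, description="customers.csv not found")
    rows = csv_path.read_text().splitlines()
    # Expect a header line plus at least one data row.
    return AssetCheckResult(passed=len(rows) > 1, metadata={"row_count": len(rows) - 1})
```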

Once we have introduced representations of the assets produced by our Airflow tasks, we can directly attach asset checks to these assets. These checks will run once the corresponding task completes, regardless of whether the task is executed in Airflow or Dagster.
@@ -756,7 +744,7 @@ Asset checks on an observed or migrated DAG
</summary>

```python
-# migrate_with_check.py
+# tutorial_example/dagster_defs/stages/migrate_with_check.py
import os
from pathlib import Path
@@ -890,9 +878,6 @@ defs = build_defs_from_airflow_instance(
        resources={"dbt": DbtCliResource(project_dir=dbt_project_path())},
    ),
)
```

</details>
---
tutorial_example/snippets/all_proxied.yaml (new file)
@@ -0,0 +1,7 @@
tasks:
  - id: load_raw_customers
    proxied: True
  - id: build_dbt_models
    proxied: True
  - id: export_customers
    proxied: True
---
tutorial_example/snippets/dags_truncated.py (new file)
@@ -0,0 +1,19 @@
# Dags file can be found at tutorial_example/airflow_dags/dags.py
from pathlib import Path

from airflow import DAG
from dagster_airlift.in_airflow import proxying_to_dagster
from dagster_airlift.in_airflow.proxied_state import load_proxied_state_from_yaml

dag = DAG("rebuild_customers_list", ...)

...

# Set this to True to begin the proxying process
PROXYING = False

if PROXYING:
    proxying_to_dagster(
        global_vars=globals(),
        proxied_state=load_proxied_state_from_yaml(Path(__file__).parent / "proxied_state"),
    )
---
tutorial_example/snippets/dbt_proxied.yaml (new file)
@@ -0,0 +1,7 @@
tasks:
  - id: load_raw_customers
    proxied: False
  - id: build_dbt_models
    proxied: True
  - id: export_customers
    proxied: False
---
@@ -1,5 +1,5 @@
def test_dagster_cloud_proxy_operator(mock_airflow_variable: None) -> None:
-    from tutorial_example.custom_operator_examples.plus_proxy_operator import (
+    from tutorial_example.snippets.custom_operator_examples.plus_proxy_operator import (
        DagsterCloudProxyOperator,
    )

@@ -27,7 +27,7 @@ def test_dagster_cloud_proxy_operator(mock_airflow_variable: None) -> None:


def test_custom_proxy_operator(mock_airflow_variable: None) -> None:
-    from tutorial_example.custom_operator_examples.custom_proxy import CustomProxyToDagsterOperator
+    from tutorial_example.snippets.custom_operator_examples.custom_proxy import CustomProxyToDagsterOperator

    operator = CustomProxyToDagsterOperator(task_id="test_task")
    assert (
---
scripts/update_readme_snippets.py
@@ -1,15 +1,23 @@
import re
import sys
from pathlib import Path
+from typing import Optional

from structlog import get_logger

logger = get_logger("update_readme_snippets")

MAIN = 'if __name__ == "__main__":'


-def _get_regex_match_snippet(file_name: str) -> re.Pattern[str]:
-    return re.compile(r"```python\n# " + file_name + "\n([\\S\n\t\v ]*?)\n```", re.MULTILINE)
+def _get_regex_match_snippet(file_name: Optional[str] = None) -> re.Pattern[str]:
+    file_name_capture = "(.+)" if not file_name else re.escape(file_name)
+    return re.compile(
+        r"```(python|yaml)\n# " + file_name_capture + "\n(?:[\\S\n\t\v ]*?)\n```", re.MULTILINE
+    )


-def update_readme_snippets(readme_filepath_raw: str, *snippet_files_raw: str):
+def update_readme_snippets(readme_filepath_raw: str):
"""Given a path to a README file and a list of paths to snippet files,
update any python code blocks in the README file that match the snippet file names
with the contents of the snippet files.
@@ -25,25 +33,29 @@ def update_readme_snippets(readme_filepath_raw: str, *snippet_files_raw: str):
    Will be replaced with the contents of `my_snippet.py`, if that file exists relative to the README.
    """
    readme_file = Path(readme_filepath_raw)
-    snippet_files = [Path(snippet_file_raw) for snippet_file_raw in snippet_files_raw]
+    base_dir = readme_file.parent

    readme_contents = readme_file.read_text()

-    for snippet_file in snippet_files:
+    matches = re.findall(_get_regex_match_snippet(), readme_contents)
+    for file_type, file_name in matches:
+        # Attempt to find the snippet file in the base dir
+        snippet_file: Path = base_dir / file_name
+        if not snippet_file.exists():
+            raise ValueError(f"Could not find snippet file {snippet_file}.")
        snippet_contents = snippet_file.read_text()
        # Remove the MAIN block if it exists
        if MAIN in snippet_contents:
            snippet_contents = snippet_contents[: snippet_contents.index(MAIN)]
        # Fix newline escaping
        snippet_contents = snippet_contents.replace("\\n", "\\\\n")

-        file_name = snippet_file.name
-        regex_match_snippet = _get_regex_match_snippet(file_name)
-
-        if re.search(regex_match_snippet, readme_contents):
-            readme_contents = re.sub(
-                regex_match_snippet,
-                f"```python\n# {file_name}\n{snippet_contents}\n```",
-                readme_contents,
-            )
+        # Strip whitespace at the beginning and end of the snippet
+        snippet_contents = snippet_contents.strip()
+        readme_contents = re.sub(
+            _get_regex_match_snippet(file_name),
+            f"```{file_type}\n# {file_name}\n{snippet_contents}\n```",
+            readme_contents,
+        )

    readme_file.write_text(readme_contents)

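For context on the script change: snippet discovery is now driven by the README contents rather than command-line arguments, which is why the Makefile target now passes only the README path. A small sketch of the matching behavior (the snippet name `my_snippet.py` is hypothetical):

```python
import re

# Same shape as _get_regex_match_snippet() with no file_name: group 1 captures the
# fence language, group 2 the snippet path; the body is non-capturing, so findall()
# yields (language, file_name) pairs.
SNIPPET_RE = re.compile(r"```(python|yaml)\n# (.+)\n(?:[\S\n\t\v ]*?)\n```", re.MULTILINE)

fence = "`" * 3  # avoid literal triple backticks inside this example
readme = f"{fence}python\n# my_snippet.py\nold_contents = 1\n{fence}\n"

print(SNIPPET_RE.findall(readme))  # -> [('python', 'my_snippet.py')]
```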
