An implementation of the dagster-pipes protocol that uses pgrx to report an asset materialization from a PostgreSQL user-defined function. Inspired by:
- PGRX bad ideas
- Dagster Pipes rust example
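The `pg_pipes` extension exposes a `pipes_session` Postgres UDF that takes a query, runs `EXPLAIN ANALYZE` on it, and reports an `AssetMaterialization` with the query plan and execution details as metadata. Because `EXPLAIN ANALYZE` actually executes the query, data-modifying statements such as `INSERT` take effect.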
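As a rough illustration of what such a report involves, the Python sketch below turns the output of `EXPLAIN (ANALYZE, FORMAT JSON)` into one Dagster Pipes `report_asset_materialization` message. It assumes the Pipes message envelope (`__dagster_pipes_version`, `method`, `params`) and metadata entries shaped as `{"raw_value": ..., "type": ...}`. The helper name `explain_to_pipes_message` and the metadata names (`query`, `plan`, `planning_time_ms`, `execution_time_ms`) are hypothetical and may not match what the Rust extension actually emits. In a real session the asset key would come from the context file Dagster provides.

```python
import json
from typing import Any, Dict, List


def explain_to_pipes_message(
    query: str, explain_output: List[Dict[str, Any]], asset_key: str
) -> str:
    # EXPLAIN (ANALYZE, FORMAT JSON) yields a one-element JSON array whose
    # entry holds the plan tree plus planning and execution times in ms.
    result = explain_output[0]
    # Hypothetical metadata names; each entry uses the Pipes
    # {"raw_value": ..., "type": ...} shape.
    metadata = {
        "query": {"raw_value": query, "type": "text"},
        "plan": {"raw_value": result["Plan"], "type": "json"},
        "planning_time_ms": {"raw_value": result["Planning Time"], "type": "float"},
        "execution_time_ms": {"raw_value": result["Execution Time"], "type": "float"},
    }
    message = {
        "__dagster_pipes_version": "0.1",
        "method": "report_asset_materialization",
        "params": {"asset_key": asset_key, "data_version": None, "metadata": metadata},
    }
    # Serialize to a single line: the message stream is one JSON object per line.
    return json.dumps(message)


# Usage with a trimmed, made-up EXPLAIN result.
sample = [{"Plan": {"Node Type": "ModifyTable"}, "Planning Time": 0.05, "Execution Time": 0.2}]
print(explain_to_pipes_message("INSERT INTO example (value) VALUES ('1')", sample, "pg_pipes_asset"))
```

Emitting plain JSON lines keeps the producer side independent of any Dagster library, which suits code running inside the database server.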
The `pipes_session` UDF can be invoked from a Dagster orchestration process like this:
```python
from pathlib import Path
from typing import Iterator, Union

import psycopg2
from dagster import (
    AssetCheckResult,
    AssetExecutionContext,
    MaterializeResult,
    PipesTempFileContextInjector,
    PipesTempFileMessageReader,
    asset,
    open_pipes_session,
)


@asset
def pg_pipes_asset(
    context: AssetExecutionContext,
) -> Iterator[Union[MaterializeResult, AssetCheckResult]]:
    with open_pipes_session(
        context=context,
        context_injector=PipesTempFileContextInjector(),
        message_reader=PipesTempFileMessageReader(),
    ) as pipes_session:
        # temp file paths the UDF reads the context from and writes messages to
        context_path = pipes_session.context_injector_params["path"]
        messages_path = pipes_session.message_reader_params["path"]

        # connect to the local pgrx-managed instance via its socket directory
        conn = psycopg2.connect(
            database="pg_pipes", host=str(Path.home() / ".pgrx"), port=28813
        )
        try:
            cursor = conn.cursor()
            # make sure the extension is enabled
            cursor.execute("drop extension if exists pg_pipes; create extension pg_pipes;")
            query = "INSERT INTO example (value) VALUES ('1'), ('2'), ('3')"
            # start the pipes_session in postgres; psycopg2 quotes the
            # parameters, so the query needs no manual escaping
            cursor.execute(
                "select pipes_session(%s, %s, %s);",
                (context_path, messages_path, query),
            )
            # commit so the extension setup and the inserted rows persist
            conn.commit()
        finally:
            conn.close()

    # yield after the session closes so every message has been read
    yield from pipes_session.get_results()
```
After the asset is materialized, the Dagster event log contains the `ASSET_MATERIALIZATION` event, with the query plan and execution information attached as metadata.
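Because the context injector and message reader used above are file-based, the exchange amounts to Dagster writing a JSON context file that the UDF reads, and the UDF appending one JSON message per line to a messages file that Dagster reads back. Both paths therefore have to be reachable by the PostgreSQL server process, which fits a local pgrx instance like the one used here. The stdlib-only sketch below simulates that exchange in a single process. The helper functions, file names, and context payload are hypothetical and heavily trimmed, and the `opened`/`closed` params are simplified to empty objects; it is not the extension's code.

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Dict, List


def write_context(path: Path, asset_key: str) -> None:
    # Orchestrator side: a trimmed, hypothetical context payload.
    path.write_text(json.dumps({"asset_keys": [asset_key], "extras": {}}))


def external_process(context_path: Path, messages_path: Path) -> None:
    # External side (the role the UDF plays): read the context, then
    # append one JSON message per line to the messages file.
    context = json.loads(context_path.read_text())
    asset_key = context["asset_keys"][0]
    messages = [
        ("opened", {}),  # params simplified for illustration
        ("report_asset_materialization",
         {"asset_key": asset_key, "data_version": None, "metadata": {}}),
        ("closed", {}),  # params simplified for illustration
    ]
    with messages_path.open("a") as f:
        for method, params in messages:
            msg = {"__dagster_pipes_version": "0.1", "method": method, "params": params}
            f.write(json.dumps(msg) + "\n")


def read_messages(messages_path: Path) -> List[Dict[str, Any]]:
    # Orchestrator side: parse each non-empty line as one message.
    with messages_path.open() as f:
        return [json.loads(line) for line in f if line.strip()]


with tempfile.TemporaryDirectory() as tmp:
    context_path = Path(tmp) / "context.json"
    messages_path = Path(tmp) / "messages.jsonl"
    write_context(context_path, "pg_pipes_asset")
    external_process(context_path, messages_path)
    methods = [m["method"] for m in read_messages(messages_path)]

print(methods)  # ['opened', 'report_asset_materialization', 'closed']
```

Appending line-delimited JSON lets the reader consume messages incrementally without the writer having to finish a single JSON document first.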
To set up the Postgres side, follow the system requirements in the pgrx installation docs. Then install `cargo-pgrx` and use it to download a Postgres 13 instance that the extension can be loaded into:
```shell
cd pg_pipes
cargo install --locked cargo-pgrx
cargo pgrx init --pg13=download
```
Now we can start the instance with our extension:
```shell
cargo pgrx run
```
For the Dagster side, follow the pdm installation instructions, then install the dependencies and activate the virtual environment:
```shell
cd orchestration
pdm install
eval $(pdm venv activate)
```
Now start Dagster with:
```shell
dagster dev -m orchestration
```