Skip to content

Commit

Permalink
[dagster-dlift] get models
Browse files Browse the repository at this point in the history
  • Loading branch information
dpeng817 committed Oct 24, 2024
1 parent 784c97b commit f3c3619
Show file tree
Hide file tree
Showing 7 changed files with 181 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import requests

from dagster_dlift.gql_queries import VERIFICATION_QUERY
from dagster_dlift.gql_queries import GET_DBT_MODELS_QUERY, VERIFICATION_QUERY

ENVIRONMENTS_SUBPATH = "environments/"

Expand Down Expand Up @@ -63,6 +63,17 @@ def list_environment_ids(self) -> Sequence[int]:
for environment in self.make_access_api_request(ENVIRONMENTS_SUBPATH)["data"]
]

def get_environment_id_by_name(self, environment_name: str) -> int:
    """Return the ID of the environment whose name equals ``environment_name``.

    Requests the environment listing from the access API at
    ``ENVIRONMENTS_SUBPATH`` and returns the ``id`` of the first entry in the
    response's ``data`` list whose ``name`` matches exactly.

    Args:
        environment_name: Exact name of the environment to look up.

    Returns:
        The ``id`` field of the first matching environment.

    Raises:
        ValueError: If no environment in the response has the given name.
    """
    environments = self.make_access_api_request(ENVIRONMENTS_SUBPATH)["data"]
    for environment in environments:
        if environment["name"] == environment_name:
            return environment["id"]
    # Fail with an explicit, descriptive error instead of letting a bare
    # StopIteration escape: StopIteration carries no context and is silently
    # converted or swallowed when it propagates through generator/iterator code.
    raise ValueError(f"No environment named {environment_name!r} was found.")

def verify_connections(self) -> None:
# Verifies connection to both the access and discovery APIs.
for environment_id in self.list_environment_ids():
Expand All @@ -78,3 +89,26 @@ def verify_connections(self) -> None:
raise Exception(
f"Failed to verify connection to environment {environment_id}. Response: {response}"
)

def get_dbt_models(self, environment_id: int) -> Sequence[Mapping[str, Any]]:
    """Collect the ``node`` of every model edge returned for an environment.

    Issues ``GET_DBT_MODELS_QUERY`` against the discovery API repeatedly,
    requesting 100 entries per call and starting with a cursor of ``0``.
    After each response the cursor is replaced with the reported
    ``endCursor``. Iteration stops when a response is falsy or when
    ``pageInfo.hasNextPage`` is false.

    Args:
        environment_id: ID sent as the ``environmentId`` query variable.

    Returns:
        The accumulated list of ``node`` mappings, in response order.
    """
    collected = []
    cursor = 0
    while True:
        response = self.make_discovery_api_query(
            GET_DBT_MODELS_QUERY,
            {"environmentId": environment_id, "first": 100, "after": cursor},
        )
        if not response:
            # A falsy response ends the walk with whatever was gathered so far.
            break
        models_page = response["data"]["environment"]["definition"]["models"]
        for edge in models_page["edges"]:
            collected.append(edge["node"])
        page_info = models_page["pageInfo"]
        if not page_info["hasNextPage"]:
            break
        cursor = page_info["endCursor"]
    return collected
25 changes: 25 additions & 0 deletions examples/experimental/dagster-dlift/dagster_dlift/gql_queries.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,28 @@
# GraphQL query that fetches one page of models for an environment. For each
# model edge it selects `schema`, the `uniqueId` of each parent, the model's own
# `uniqueId`, and `tags`; `pageInfo` exposes `hasNextPage` and `endCursor`.
# NOTE(review): only `$environmentId` and `$first` are declared here, and
# `models(...)` receives only `first`. No `after` cursor variable is declared or
# forwarded, so a caller-supplied "after" variable presumably has no effect on
# which page is returned -- confirm before relying on multi-page results.
GET_DBT_MODELS_QUERY = """
query GetModelsQuery($environmentId: BigInt!, $first: Int) {
environment(id: $environmentId) {
definition {
models(first: $first) {
pageInfo {
hasNextPage
endCursor
}
edges {
node {
schema
parents {
uniqueId
}
uniqueId
tags
}
}
}
}
}
}
"""

VERIFICATION_QUERY = """
query VerificationQuery($environmentId: BigInt!) {
environment(id: $environmentId) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Any, Mapping, NamedTuple
from typing import Any, Mapping, NamedTuple, Sequence

from dagster_dlift.cloud_instance import DbtCloudInstance

Expand Down Expand Up @@ -44,3 +44,27 @@ def make_discovery_api_query(
f"ExpectedDiscoveryApiRequest({query}, {variables}) not found in discovery_api_responses"
)
return self.discovery_api_responses[ExpectedDiscoveryApiRequest(query, variables)]


def build_model_response(
    unique_id: str, parents: Sequence[str], has_next_page: bool = False, start_cursor: int = 0
) -> Mapping[str, Any]:
    """Build a fake single-model response shaped like a models-query result.

    The payload nests ``data -> environment -> definition -> models`` and
    contains exactly one edge whose node carries ``unique_id`` and one
    ``{"uniqueId": ...}`` entry per parent.

    Args:
        unique_id: Value for the node's ``uniqueId``.
        parents: Unique IDs to list under the node's ``parents``.
        has_next_page: Value reported as ``pageInfo.hasNextPage``.
        start_cursor: Cursor of this page; ``pageInfo.endCursor`` is reported
            as ``start_cursor + 1``.

    Returns:
        The nested response mapping.
    """
    page_info = {"hasNextPage": has_next_page, "endCursor": start_cursor + 1}
    parent_refs = [{"uniqueId": parent_id} for parent_id in parents]
    node = {"uniqueId": unique_id, "parents": parent_refs}
    models = {"pageInfo": page_info, "edges": [{"node": node}]}
    return {"data": {"environment": {"definition": {"models": models}}}}
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import pytest
from dagster_dlift.cloud_instance import ENVIRONMENTS_SUBPATH
from dagster_dlift.gql_queries import VERIFICATION_QUERY
from dagster_dlift.gql_queries import GET_DBT_MODELS_QUERY, VERIFICATION_QUERY
from dagster_dlift.test.instance_fake import (
DbtCloudInstanceFake,
ExpectedAccessApiRequest,
ExpectedDiscoveryApiRequest,
build_model_response,
)


Expand Down Expand Up @@ -52,3 +53,36 @@ def test_verification() -> None:
},
)
fake_instance.verify_connections()


def test_get_models() -> None:
    """Test that we can get models from the instance, even if they are paginated."""
    # Maps each model's unique ID to the set of unique IDs of its parents.
    dep_graph = {
        "model.jaffle_shop.customers": {
            "model.jaffle_shop.stg_customers",
            "model.jaffle_shop.stg_orders",
        },
        "model.jaffle_shop.stg_customers": set(),
        "model.jaffle_shop.stg_orders": set(),
    }

    # Register one fake page per model; every page except the last reports
    # that another page follows.
    last_page_idx = len(dep_graph) - 1
    paged_responses = {}
    for page_idx, (model_id, parent_ids) in enumerate(dep_graph.items()):
        request = ExpectedDiscoveryApiRequest(
            query=GET_DBT_MODELS_QUERY,
            variables={"environmentId": 1, "first": 100, "after": page_idx},
        )
        paged_responses[request] = build_model_response(
            unique_id=model_id,
            parents=parent_ids,
            has_next_page=page_idx < last_page_idx,
            start_cursor=page_idx,
        )

    fake_instance = DbtCloudInstanceFake(
        access_api_responses={},
        discovery_api_responses=paged_responses,
    )
    models = fake_instance.get_dbt_models(1)

    assert len(models) == len(dep_graph)
    assert {model["uniqueId"] for model in models} == set(dep_graph.keys())
    observed_graph = {
        model["uniqueId"]: {parent["uniqueId"] for parent in model["parents"]}
        for model in models
    }
    assert observed_graph == dep_graph
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Name of the environment that the kitchen-sink tests look up by name.
TEST_ENV_NAME = "test"
# Tag the kitchen-sink tests use to select the models they assert on.
EXPECTED_TAG = "test"
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from dagster_dlift.cloud_instance import DbtCloudInstance
from dlift_kitchen_sink.constants import TEST_ENV_NAME
from dlift_kitchen_sink.instance import get_instance


Expand All @@ -8,3 +9,9 @@ def test_cloud_instance() -> None:
assert isinstance(instance, DbtCloudInstance)

instance.verify_connections()


def test_get_test_env() -> None:
    """Check that looking up the test environment by name yields a truthy ID."""
    env_id = get_instance().get_environment_id_by_name(TEST_ENV_NAME)
    assert env_id
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
from dlift_kitchen_sink.constants import EXPECTED_TAG, TEST_ENV_NAME
from dlift_kitchen_sink.instance import get_instance


def test_get_models() -> None:
    """Check the tagged test models and their parents as returned by the instance."""
    env_id = get_instance().get_environment_id_by_name(TEST_ENV_NAME)
    # Filter to only the models that we use for testing.
    tagged_models = []
    for candidate in get_instance().get_dbt_models(env_id):
        if EXPECTED_TAG in candidate["tags"]:
            tagged_models.append(candidate)

    assert len(tagged_models) == 3

    def first_with_id(unique_id):
        # First tagged model carrying the given unique ID.
        return next(model for model in tagged_models if model["uniqueId"] == unique_id)

    def parent_ids(model):
        # Set of unique IDs of the model's parents.
        return {parent["uniqueId"] for parent in model["parents"]}

    customers = first_with_id("model.test_environment.customers")
    assert len(customers["parents"]) == 2
    assert parent_ids(customers) == {
        "model.test_environment.stg_customers",
        "model.test_environment.stg_orders",
    }

    stg_customers = first_with_id("model.test_environment.stg_customers")
    assert len(stg_customers["parents"]) == 1
    assert parent_ids(stg_customers) == {
        "source.test_environment.jaffle_shop.customers_raw"
    }

    stg_orders = first_with_id("model.test_environment.stg_orders")
    assert len(stg_orders["parents"]) == 1
    assert parent_ids(stg_orders) == {
        "source.test_environment.jaffle_shop.orders_raw"
    }

0 comments on commit f3c3619

Please sign in to comment.