Skip to content

Commit

Permalink
[dagster-dlift] get_tests
Browse files: browse the repository at this point in the history
  • Loading branch information
dpeng817 committed Oct 24, 2024
1 parent ca2f8d0 commit 1de15bb
Show file tree
Hide file tree
Showing 8 changed files with 133 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from dagster_dlift.gql_queries import (
GET_DBT_MODELS_QUERY,
GET_DBT_SOURCES_QUERY,
GET_DBT_TESTS_QUERY,
VERIFICATION_QUERY,
)

Expand Down Expand Up @@ -139,3 +140,26 @@ def get_dbt_sources(self, environment_id: int) -> Sequence[Mapping[str, Any]]:
"endCursor"
]
return sources

def get_dbt_tests(self, environment_id: int) -> Sequence[Mapping[str, Any]]:
    """Collect the dbt test nodes defined in an environment.

    Sends ``GET_DBT_TESTS_QUERY`` through ``make_discovery_api_query`` one
    page at a time, passing the previous page's ``endCursor`` as the
    ``after`` variable of the next request. Stops when a page reports no
    ``hasNextPage`` or when a falsy response comes back.

    Args:
        environment_id: Sent as the ``environmentId`` query variable.

    Returns:
        The ``node`` mapping of every edge received, in arrival order.
    """
    # NOTE(review): the GET_DBT_TESTS_QUERY text visible in this change only
    # declares $environmentId and $first; confirm the `after` cursor sent
    # below is actually honored when more than one page exists.
    batch_size = 100
    cursor = 0
    collected = []
    while True:
        variables = {"environmentId": environment_id, "first": batch_size, "after": cursor}
        response = self.make_discovery_api_query(GET_DBT_TESTS_QUERY, variables)
        if not response:
            break
        tests_page = response["data"]["environment"]["definition"]["tests"]
        collected.extend([edge["node"] for edge in tests_page["edges"]])
        page_info = tests_page["pageInfo"]
        if not page_info["hasNextPage"]:
            break
        cursor = page_info["endCursor"]
    return collected
25 changes: 25 additions & 0 deletions examples/experimental/dagster-dlift/dagster_dlift/gql_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,31 @@
}
"""

# GraphQL query listing the tests in an environment's definition. Each edge's
# node carries the test's uniqueId and name plus the uniqueId/resourceType of
# its parents; pageInfo exposes hasNextPage and endCursor.
# NOTE(review): only $environmentId and $first are declared here, so an
# `after` cursor supplied by a caller is not referenced by this query --
# confirm whether fetching beyond the first page is expected to work.
GET_DBT_TESTS_QUERY = """
query GetTestsQuery($environmentId: BigInt!, $first: Int) {
environment(id: $environmentId) {
definition {
tests(first: $first) {
pageInfo {
hasNextPage
endCursor
}
edges {
node {
parents {
uniqueId
resourceType
}
uniqueId
name
}
}
}
}
}
}
"""

VERIFICATION_QUERY = """
query VerificationQuery($environmentId: BigInt!) {
environment(id: $environmentId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,3 +85,16 @@ def build_source_response(
}
}
)


def build_test_response(
    unique_id: str, parents: Sequence[str], has_next_page: bool = False, start_cursor: int = 0
) -> Mapping[str, Any]:
    """Build a fake definition response holding one page of tests.

    The page has a single edge built from ``unique_id`` and ``parents`` and
    page info built from ``has_next_page`` and ``start_cursor``.
    """
    page_info = build_page_info(has_next_page, start_cursor)
    single_edge = build_edge(unique_id, parents)
    tests_payload = {"pageInfo": page_info, "edges": [single_edge]}
    return build_definition_response({"tests": tests_payload})
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from dagster_dlift.gql_queries import (
GET_DBT_MODELS_QUERY,
GET_DBT_SOURCES_QUERY,
GET_DBT_TESTS_QUERY,
VERIFICATION_QUERY,
)
from dagster_dlift.test.instance_fake import (
Expand All @@ -11,6 +12,7 @@
ExpectedDiscoveryApiRequest,
build_model_response,
build_source_response,
build_test_response,
)


Expand Down Expand Up @@ -117,3 +119,30 @@ def test_get_sources() -> None:
assert {
source["uniqueId"] for source in fake_instance.get_dbt_sources(1)
} == expected_unique_id_sources


def test_get_tests() -> None:
    """Tests that we can get tests from the instance, even if they are paginated."""
    parents_by_test_id = {
        "test.jaffle_shop.stg_customers.test_stg_customers": {"model.jaffle_shop.stg_customers"},
        "test.jaffle_shop.stg_orders.test_stg_orders": {"model.jaffle_shop.stg_orders"},
    }
    last_page = len(parents_by_test_id) - 1

    # One canned response per test: the request for page N carries after=N,
    # and every page except the last advertises a following page.
    canned_responses = {}
    for page, (test_id, parent_ids) in enumerate(parents_by_test_id.items()):
        request = ExpectedDiscoveryApiRequest(
            query=GET_DBT_TESTS_QUERY,
            variables={"environmentId": 1, "first": 100, "after": page},
        )
        canned_responses[request] = build_test_response(
            unique_id=test_id,
            has_next_page=page < last_page,
            start_cursor=page,
            parents=list(parent_ids),
        )

    fake_instance = DbtCloudInstanceFake(
        access_api_responses={},
        discovery_api_responses=canned_responses,
    )
    fetched_ids = {test["uniqueId"] for test in fake_instance.get_dbt_tests(1)}
    assert fetched_ids == set(parents_by_test_id)
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from dagster_dlift.cloud_instance import DbtCloudInstance
from dagster_dlift.test.utils import get_env_var

from dlift_kitchen_sink.constants import TEST_ENV_NAME


def get_instance() -> DbtCloudInstance:
return DbtCloudInstance(
Expand All @@ -9,3 +11,7 @@ def get_instance() -> DbtCloudInstance:
access_url=get_env_var("KS_DBT_CLOUD_ACCESS_URL"),
discovery_api_url=get_env_var("KS_DBT_CLOUD_DISCOVERY_API_URL"),
)


def get_environment_id() -> int:
    """Look up the ID of the environment named ``TEST_ENV_NAME`` via ``get_instance()``."""
    cloud_instance = get_instance()
    return cloud_instance.get_environment_id_by_name(TEST_ENV_NAME)
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import pytest
from dagster_dlift.cloud_instance import DbtCloudInstance
from dlift_kitchen_sink.instance import get_environment_id, get_instance


@pytest.fixture
def instance() -> DbtCloudInstance:
    """Provide the object returned by ``get_instance`` to tests."""
    cloud_instance = get_instance()
    return cloud_instance


@pytest.fixture
def environment_id() -> int:
    """Provide the value returned by ``get_environment_id`` to tests."""
    resolved_id = get_environment_id()
    return resolved_id
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from dagster_dlift.cloud_instance import DbtCloudInstance
from dlift_kitchen_sink.constants import TEST_ENV_NAME
from dlift_kitchen_sink.instance import get_instance
from dlift_kitchen_sink.instance import get_environment_id, get_instance


def test_cloud_instance() -> None:
Expand All @@ -13,5 +12,4 @@ def test_cloud_instance() -> None:

def test_get_test_env() -> None:
"""Test that we can get the test environment ID."""
instance = get_instance()
assert instance.get_environment_id_by_name(TEST_ENV_NAME)
assert get_environment_id()
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
from dlift_kitchen_sink.constants import TEST_ENV_NAME
from dlift_kitchen_sink.instance import get_instance
from dagster_dlift.cloud_instance import DbtCloudInstance


def test_get_models() -> None:
env_id = get_instance().get_environment_id_by_name(TEST_ENV_NAME)
models_response = get_instance().get_dbt_models(env_id)
def test_get_models(instance: DbtCloudInstance, environment_id: int) -> None:
models_response = instance.get_dbt_models(environment_id)
assert len(models_response) == 3
customers = next(
iter(
Expand Down Expand Up @@ -48,12 +46,27 @@ def test_get_models() -> None:
}


def test_get_sources() -> None:
def test_get_sources(instance: DbtCloudInstance, environment_id: int) -> None:
"""Test that we can get sources from the instance."""
env_id = get_instance().get_environment_id_by_name(TEST_ENV_NAME)
sources_response = get_instance().get_dbt_sources(env_id)
sources_response = instance.get_dbt_sources(environment_id)
assert len(sources_response) == 2
assert {source["uniqueId"] for source in sources_response} == {
"source.jaffle_shop.jaffle_shop.customers_raw",
"source.jaffle_shop.jaffle_shop.orders_raw",
}


def test_get_tests(instance: DbtCloudInstance, environment_id: int) -> None:
    """Test that we can get tests from the instance."""
    fetched_names = set()
    for test in instance.get_dbt_tests(environment_id):
        fetched_names.add(test["name"])

    expected_names = {
        "accepted_values_stg_orders_status__placed__shipped__completed__return_pending__returned",
        "not_null_customers_customer_id",
        "not_null_stg_customers_customer_id",
        "not_null_stg_orders_customer_id",
        "not_null_stg_orders_order_id",
        "relationships_stg_orders_customer_id__customer_id__ref_stg_customers_",
        "unique_customers_customer_id",
        "unique_stg_customers_customer_id",
        "unique_stg_orders_order_id",
    }
    assert fetched_names == expected_names

0 comments on commit 1de15bb

Please sign in to comment.