Skip to content

Commit

Permalink
Add Partial to QueryState Enum (#125)
Browse files Browse the repository at this point in the history
Closes #124 (introducing the new execution state to the enum) and adds allow_partial_results=true by default on query executions. The point is so that long running queries don't fail to execute because of a parsing error. I am not sure if there were additional plans to make use of this (like showing progress bar), but generally I would say that this "closes #124".
  • Loading branch information
bh2smith authored Aug 26, 2024
1 parent 775549b commit f71755b
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 6 deletions.
2 changes: 2 additions & 0 deletions dune_client/api/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ def _build_parameters(
sort_by: Optional[List[str]] = None,
limit: Optional[int] = None,
offset: Optional[int] = None,
allow_partial_results: str = "true",
) -> Dict[str, Union[str, int]]:
"""
Utility function that builds a dictionary of parameters to be used
Expand All @@ -111,6 +112,7 @@ def _build_parameters(
), "sampling cannot be combined with filters or pagination"

params = params or {}
params["allow_partial_results"] = allow_partial_results
if columns is not None and len(columns) > 0:
params["columns"] = ",".join(columns)
if sample_count is not None:
Expand Down
15 changes: 11 additions & 4 deletions dune_client/api/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
ResultsResponse,
ExecutionResultCSV,
DuneError,
ExecutionState,
)
from dune_client.query import QueryBase

Expand Down Expand Up @@ -80,6 +81,7 @@ def get_execution_results(
sample_count: Optional[int] = None,
filters: Optional[str] = None,
sort_by: Optional[List[str]] = None,
allow_partial_results: str = "true",
) -> ResultsResponse:
"""GET results from Dune API for `job_id` (aka `execution_id`)"""
params = self._build_parameters(
Expand All @@ -89,6 +91,7 @@ def get_execution_results(
sort_by=sort_by,
limit=limit,
offset=offset,
allow_partial_results=allow_partial_results,
)

route = f"/execution/{job_id}/results"
Expand Down Expand Up @@ -126,9 +129,7 @@ def get_execution_results_csv(
return self._get_execution_results_csv_by_url(url=url, params=params)

def _get_execution_results_by_url(
self,
url: str,
params: Optional[Dict[str, Any]] = None,
self, url: str, params: Optional[Dict[str, Any]] = None
) -> ResultsResponse:
"""
GET results from Dune API with a given URL. This is particularly useful for pagination.
Expand All @@ -137,7 +138,13 @@ def _get_execution_results_by_url(

response_json = self._get(url=url, params=params)
try:
return ResultsResponse.from_dict(response_json)
result = ResultsResponse.from_dict(response_json)
if result.state == ExecutionState.PARTIAL:
self.logger.warning(
f"execution {result.execution_id} resulted in a partial "
f"result set (i.e. results too large)."
)
return result
except KeyError as err:
raise DuneError(response_json, "ResultsResponse", err) from err

Expand Down
5 changes: 4 additions & 1 deletion dune_client/api/extensions.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ def run_query(
sample_count: Optional[int] = None,
filters: Optional[str] = None,
sort_by: Optional[List[str]] = None,
allow_partial_results: str = "true",
) -> ResultsResponse:
"""
Executes a Dune `query`, waits until execution completes,
Expand Down Expand Up @@ -83,6 +84,7 @@ def run_query(
filters=filters,
sort_by=sort_by,
limit=limit,
allow_partial_results=allow_partial_results,
),
)

Expand Down Expand Up @@ -416,10 +418,11 @@ def _refresh(
)
time.sleep(ping_frequency)
status = self.get_execution_status(job_id)
if status.state == ExecutionState.PENDING:
self.logger.warning("Partial result set retrieved.")
if status.state == ExecutionState.FAILED:
self.logger.error(status)
raise QueryFailed(f"Error data: {status.error}")

return job_id

def _fetch_entire_result(
Expand Down
1 change: 1 addition & 0 deletions dune_client/client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ async def _get() -> Any:
headers=self.default_headers(),
params=params,
)
print(response)
if raw:
return response
return await self._handle_response(response)
Expand Down
4 changes: 3 additions & 1 deletion dune_client/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class ExecutionState(Enum):

COMPLETED = "QUERY_STATE_COMPLETED"
EXECUTING = "QUERY_STATE_EXECUTING"
PARTIAL = "QUERY_STATE_COMPLETED_PARTIAL"
PENDING = "QUERY_STATE_PENDING"
CANCELLED = "QUERY_STATE_CANCELLED"
FAILED = "QUERY_STATE_FAILED"
Expand All @@ -57,7 +58,7 @@ def terminal_states(cls) -> set[ExecutionState]:
"""
Returns the terminal states (i.e. when a query execution is no longer executing
"""
return {cls.COMPLETED, cls.CANCELLED, cls.FAILED, cls.EXPIRED}
return {cls.COMPLETED, cls.CANCELLED, cls.FAILED, cls.EXPIRED, cls.PARTIAL}

def is_complete(self) -> bool:
"""Returns True is state is completed, otherwise False."""
Expand Down Expand Up @@ -311,6 +312,7 @@ class ResultsResponse:
@classmethod
def from_dict(cls, data: dict[str, str | int | ResultData]) -> ResultsResponse:
"""Constructor from dictionary. See unit test for sample input."""
print(data)
assert isinstance(data["execution_id"], str)
assert isinstance(data["query_id"], int)
assert isinstance(data["state"], str)
Expand Down

0 comments on commit f71755b

Please sign in to comment.