Skip to content

Commit

Permalink
Function Arguments for all Client Methods (#37)
Browse files Browse the repository at this point in the history
Cleaning up the function arguments for ergonomic and stylistic reasons.
  • Loading branch information
bh2smith authored Mar 25, 2024
1 parent 86d0349 commit 6e3b5dd
Show file tree
Hide file tree
Showing 9 changed files with 230 additions and 171 deletions.
208 changes: 83 additions & 125 deletions src/api/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,14 @@ import {
ExecutionResponseCSV,
SuccessResponse,
} from "../types";
import { ageInHours, sleep } from "../utils";
import { sleep } from "../utils";
import log from "loglevel";
import { logPrefix } from "../utils";
import { ExecutionAPI } from "./execution";
import {
MAX_NUM_ROWS_PER_BATCH,
POLL_FREQUENCY_SECONDS,
THREE_MONTHS_IN_HOURS,
} from "../constants";
import {
ExecutionParams,
ExecutionPerformance,
UploadCSVParams,
} from "../types/requestPayload";
import { POLL_FREQUENCY_SECONDS } from "../constants";
import { ExecutionParams, UploadCSVArgs } from "../types/requestPayload";
import { QueryAPI } from "./query";
import { RunQueryArgs, RunSqlArgs } from "../types/client";

/// Various states of query execution that are "terminal".
const TERMINAL_STATES = [
Expand All @@ -55,37 +48,27 @@ export class DuneClient {

/**
* Runs an existing query by ID via execute, await, return results.
* @param queryID id of the query to be executed
* @param params execution parameters (includes query parameters and execution performance)
* @param batchSize puts a limit on the number of results
* @param pingFrequency how frequently should we check execution status (default: 1s)
*
* @param {RunQueryArgs} args
* @returns Execution Results
*/
async runQuery(
queryID: number,
params?: ExecutionParams,
batchSize: number = MAX_NUM_ROWS_PER_BATCH,
pingFrequency: number = POLL_FREQUENCY_SECONDS,
): Promise<ResultsResponse> {
const { state, execution_id: jobID } = await this._runInner(
queryID,
async runQuery(args: RunQueryArgs): Promise<ResultsResponse> {
const { queryId, params, opts } = args;
const { state, execution_id } = await this._runInner(
queryId,
params,
pingFrequency,
opts?.pingFrequency,
);
if (state === ExecutionState.COMPLETED) {
const result = await this.getLatestResult(
queryID,
params?.query_parameters,
batchSize,
);
if (result.execution_id !== jobID) {
const result = await this.getLatestResult(args);
if (result.execution_id !== execution_id) {
throw new DuneError(
`invalid execution ID: expected ${jobID}, got ${result.execution_id}`,
`invalid execution ID: expected ${execution_id}, got ${result.execution_id}`,
);
}
return result;
} else {
const message = `refresh (execution ${jobID}) yields incomplete terminal state ${state}`;
const message = `refresh (execution ${execution_id}) yields incomplete terminal state ${state}`;
// TODO - log the error in constructor
log.error(logPrefix, message);
throw new DuneError(message);
Expand All @@ -94,28 +77,25 @@ export class DuneClient {

/**
* Runs an existing query by ID via execute, await, return Result CSV.
* @param queryID id of the query to be executed
* @param params execution parameters (includes query parameters and execution performance)
* @param pingFrequency how frequently should we check execution status (default: 1s)
*
* @param {RunQueryArgs} args
* @returns Execution Results as CSV
*/
async runQueryCSV(
queryID: number,
params?: ExecutionParams,
pingFrequency: number = POLL_FREQUENCY_SECONDS,
): Promise<ExecutionResponseCSV> {
const { state, execution_id: jobID } = await this._runInner(
queryID,
async runQueryCSV(args: RunQueryArgs): Promise<ExecutionResponseCSV> {
const { queryId, params, opts } = args;
const { state, execution_id } = await this._runInner(
queryId,
params,
pingFrequency,
opts?.pingFrequency,
);
if (state === ExecutionState.COMPLETED) {
// we can't assert that the execution ids agree here, so we use max age hours as a "safe guard"
return this.exec.getResultCSV(jobID, {
// we can't assert that the execution ids agree here,
// so we use max age hours as a "safe guard"
return this.exec.getResultCSV(execution_id, {
query_parameters: params?.query_parameters,
});
} else {
const message = `refresh (execution ${jobID}) yields incomplete terminal state ${state}`;
const message = `refresh (execution ${execution_id}) yields incomplete terminal state ${state}`;
// TODO - log the error in constructor
log.error(logPrefix, message);
throw new DuneError(message);
Expand All @@ -125,62 +105,55 @@ export class DuneClient {
/**
* Goes a bit beyond the internal call which returns that last execution results.
* Here contains additional logic to refresh the results if they are too old.
* @param queryId - query to get results of.
* @param parameters - parameters for which they were called.
* @param limit - the number of rows to retrieve
* @param maxAgeHours - oldest acceptable results (if expired results are refreshed)
*
* @param {RunQueryArgs} args
* @returns Latest execution results for the given parameters.
*/
async getLatestResult(
queryId: number,
parameters: QueryParameter[] = [],
batchSize: number = MAX_NUM_ROWS_PER_BATCH,
maxAgeHours: number = THREE_MONTHS_IN_HOURS,
): Promise<ResultsResponse> {
let results = await this.exec.getLastExecutionResults(queryId, {
query_parameters: parameters,
limit: batchSize,
});
const lastRun: Date = results.execution_ended_at!;
if (lastRun !== undefined && ageInHours(lastRun) > maxAgeHours) {
log.info(
logPrefix,
`results (from ${lastRun}) older than ${maxAgeHours} hours, re-running query.`,
);
results = await this.runQuery(queryId, { query_parameters: parameters }, batchSize);
async getLatestResult(args: RunQueryArgs): Promise<ResultsResponse> {
const { queryId, params, opts } = args;
const lastestResults = await this.exec.getLastExecutionResults(
queryId,
{
query_parameters: params?.query_parameters,
limit: opts?.batchSize,
},
opts?.maxAgeHours,
);
let results: ResultsResponse;
if (lastestResults.isExpired) {
log.info(logPrefix, `results expired, re-running query.`);
results = await this.runQuery({ queryId, params, opts });
} else {
results = lastestResults.results;
}
return results;
}

/**
* Get the lastest execution results in CSV format and saves to disk.
* @param queryId - query to get results of.
*
* @param {RunQueryArgs} args
* @param outFile - location to save CSV.
* @param parameters - parameters for which they were called.
* @param batchSize - the page size when retriving results.
* @param maxAgeHours - oldest acceptable results (if expired results are refreshed)
* @returns Latest execution results for the given parameters.
*/
async downloadCSV(
queryId: number,
outFile: string,
parameters: QueryParameter[] = [],
batchSize: number = MAX_NUM_ROWS_PER_BATCH,
maxAgeHours: number = THREE_MONTHS_IN_HOURS,
): Promise<void> {
const params = { query_parameters: parameters, limit: batchSize };
const lastResults = await this.exec.getLastExecutionResults(queryId, params);
const lastRun: Date = lastResults.execution_ended_at!;
async downloadCSV(args: RunQueryArgs, outFile: string): Promise<void> {
const { queryId, params, opts } = args;
const { isExpired } = await this.exec.getLastExecutionResults(
queryId,
{
query_parameters: params?.query_parameters,
limit: opts?.batchSize,
},
args.opts?.maxAgeHours,
);
let results: Promise<ExecutionResponseCSV>;
if (lastRun !== undefined && ageInHours(lastRun) > maxAgeHours) {
log.info(
logPrefix,
`results (from ${lastRun}) older than ${maxAgeHours} hours, re-running query.`,
);
results = this.runQueryCSV(queryId, { query_parameters: parameters }, batchSize);
if (isExpired) {
results = this.runQueryCSV(args);
} else {
// TODO (user cost savings): transform the lastResults into CSV instead of refetching
results = this.exec.getLastResultCSV(queryId, params);
results = this.exec.getLastResultCSV(args.queryId, {
query_parameters: args.params?.query_parameters,
limit: args.opts?.batchSize,
});
}
// Wait for the results promise to resolve and then write the CSV data to the specified outFile
const csvData = (await results).data;
Expand All @@ -193,59 +166,40 @@ export class DuneClient {
* - create, run, get results with optional archive/delete.
* - Query is by default made private and archived after execution.
* Requires Plus subscription!
* @param query_sql - raw sql of query to run
* @param params - query parameters
* @param isPrivate - whether the created query should be private
* @param archiveAfter - whether the created query should be archived immediately after execution
* @param performance - performance tier of execution engine
* @param batchSize - the page size when retriving results.
* @param pingFrequency - how frequently should we check execution status
* @param name - name of the query
* @returns
*
* @returns {Promise<ResultsResponse>}
*/
public async runSql(
query_sql: string,
params?: QueryParameter[],
isPrivate: boolean = true,
archiveAfter: boolean = true,
performance?: ExecutionPerformance,
batchSize: number = MAX_NUM_ROWS_PER_BATCH,
pingFrequency: number = POLL_FREQUENCY_SECONDS,
name: string = "API Query",
): Promise<ResultsResponse> {
const queryID = await this.query.createQuery({
name,
public async runSql(args: RunSqlArgs): Promise<ResultsResponse> {
const { name, query_sql, params, isPrivate, archiveAfter, opts } = args;
const queryId = await this.query.createQuery({
name: name ? name : "API Query",
query_sql,
query_parameters: params,
query_parameters: params?.query_parameters,
is_private: isPrivate,
});
let results: ResultsResponse;

try {
results = await this.runQuery(
queryID,
{ query_parameters: params, performance },
batchSize,
pingFrequency,
);
results = await this.runQuery({ queryId, params, opts });
} finally {
if (archiveAfter) {
this.query.archiveQuery(queryID);
this.query.archiveQuery(queryId);
}
}

return results;
}

/**
* Allows for anyone to upload a CSV as a table in Dune.
* The size limit per upload is currently 200MB.
* Allows for anyone to upload a CSV as a table in Dune.
* The size limit per upload is currently 200MB.
* Storage is limited by plan, 1MB on free, 15GB on plus, and 50GB on premium.
* @param params UploadCSVParams relevant fields related to dataset upload.
*
* @param args UploadCSVParams relevant fields related to dataset upload.
* @returns boolean representing if upload was successful.
*/
async uploadCsv(params: UploadCSVParams): Promise<boolean> {
const response = await this.exec.post<SuccessResponse>("table/upload/csv", params);
async uploadCsv(args: UploadCSVArgs): Promise<boolean> {
const response = await this.exec.post<SuccessResponse>("table/upload/csv", args);
try {
return Boolean(response.success);
} catch (err) {
Expand Down Expand Up @@ -283,8 +237,12 @@ export class DuneClient {
async refresh(
queryID: number,
parameters: QueryParameter[] = [],
pingFrequency: number = 1,
pingFrequency?: number,
): Promise<ResultsResponse> {
return this.runQuery(queryID, { query_parameters: parameters }, pingFrequency);
return this.runQuery({
queryId: queryID,
params: { query_parameters: parameters },
opts: { pingFrequency },
});
}
}
30 changes: 21 additions & 9 deletions src/api/execution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ import {
concatResultResponse,
concatResultCSV,
SuccessResponse,
LatestResultsResponse,
} from "../types";
import log from "loglevel";
import { logPrefix } from "../utils";
import { ageInHours, logPrefix, withDefaults } from "../utils";
import { Router } from "./router";
import {
ExecutionParams,
Expand All @@ -20,6 +21,8 @@ import {
DEFAULT_GET_PARAMS,
DUNE_CSV_NEXT_OFFSET_HEADER,
DUNE_CSV_NEXT_URI_HEADER,
MAX_NUM_ROWS_PER_BATCH,
THREE_MONTHS_IN_HOURS,
} from "../constants";

/**
Expand Down Expand Up @@ -85,7 +88,7 @@ export class ExecutionAPI extends Router {
* Retrieve results of a query execution by executionID:
* https://docs.dune.com/api-reference/executions/endpoint/get-execution-result
* @param {string} executionId string representig ID of query execution
* @param {GetResultParams} params including limit, offset and expectedID.
* @param {GetResultParams} params including limit, offset
* @returns {ResultsResponse} response containing execution results.
*/
async getExecutionResults(
Expand All @@ -94,7 +97,7 @@ export class ExecutionAPI extends Router {
): Promise<ResultsResponse> {
const response: ResultsResponse = await this._get(
`execution/${executionId}/results`,
params,
withDefaults(params, { limit: MAX_NUM_ROWS_PER_BATCH }),
);
log.debug(logPrefix, `get_result response ${JSON.stringify(response)}`);
return response as ResultsResponse;
Expand All @@ -104,7 +107,7 @@ export class ExecutionAPI extends Router {
* Retrieve results of a query execution (in CSV format) by executionID:
* https://docs.dune.com/api-reference/executions/endpoint/get-execution-result-csv
* @param {string} executionId string representig ID of query execution.
* @param {GetResultParams} params including limit, offset and expectedID.
* @param {GetResultParams} params including limit, offset
* @returns {ExecutionResponseCSV} execution results as CSV.
*/
async getResultCSV(
Expand All @@ -124,15 +127,24 @@ export class ExecutionAPI extends Router {
* Retrieves results from query's last execution
* @param {number} queryID id of query to get results for.
* @param {GetResultParams} params parameters for retrieval.
* @returns {ResultsResponse} response containing execution results.
* @param {number} expiryAgeHours What is considered to be an expired result set.
* @returns {LatestResultsResponse} response containing execution results and boolean field
*/
async getLastExecutionResults(
queryId: number,
params: GetResultParams = DEFAULT_GET_PARAMS,
): Promise<ResultsResponse> {
/// What is considered to be an expired result set.
expiryAgeHours: number = THREE_MONTHS_IN_HOURS,
): Promise<LatestResultsResponse> {
// The first bit might only return a page.
const results = await this._get<ResultsResponse>(`query/${queryId}/results`, params);
return this._fetchEntireResult(results);
const results = await this._get<ResultsResponse>(
`query/${queryId}/results`,
withDefaults(params, { limit: MAX_NUM_ROWS_PER_BATCH }),
);
const lastRun: Date = results.execution_ended_at!;
const maxAge = expiryAgeHours;
const isExpired = lastRun !== undefined && ageInHours(lastRun) > maxAge;
return { results: await this._fetchEntireResult(results), isExpired };
}

/**
Expand All @@ -147,7 +159,7 @@ export class ExecutionAPI extends Router {
): Promise<ExecutionResponseCSV> {
const response = await this._get<Response>(
`query/${queryId}/results/csv`,
params,
withDefaults(params, { limit: MAX_NUM_ROWS_PER_BATCH }),
true,
);
return this._fetchEntireResultCSV(await this.buildCSVResponse(response));
Expand Down
Loading

0 comments on commit 6e3b5dd

Please sign in to comment.