Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: query composition #1778

Merged
merged 7 commits into from
Nov 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions lib/logflare/alerting.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ defmodule Logflare.Alerting do
alias Logflare.Backends.Adaptor.SlackAdaptor
alias Logflare.Alerting.AlertQuery
alias Logflare.User
alias Logflare.Endpoints

@doc """
Returns the list of alert_queries.
Expand All @@ -21,7 +22,12 @@ defmodule Logflare.Alerting do
[%AlertQuery{}, ...]

"""

def list_alert_queries(%User{} = user) do
  # Delegate to the id-based variant so both entry points share one query path.
  list_alert_queries_by_user_id(user.id)
end

@doc """
Returns all `AlertQuery` records owned by the user with the given id.

Accepts a raw id so callers that do not hold a loaded `%User{}` (e.g.
query composition) can still enumerate a user's alerts.
"""
def list_alert_queries_by_user_id(user_id) do
  query = from(aq in AlertQuery, where: aq.user_id == ^user_id)
  Repo.all(query)
end
Expand Down Expand Up @@ -228,8 +234,20 @@ defmodule Logflare.Alerting do
def execute_alert_query(%AlertQuery{user: %User{}} = alert_query) do
Logger.info("Executing AlertQuery | #{alert_query.name} | #{alert_query.id}")

with {:ok, transformed_query} <-
Logflare.Sql.transform(:bq_sql, alert_query.query, alert_query.user_id),
endpoints = Endpoints.list_endpoints_by(user_id: alert_query.user_id)

alerts =
list_alert_queries_by_user_id(alert_query.user_id)
|> Enum.filter(&(&1.id != alert_query.id))

with {:ok, expanded_query} <-
Logflare.Sql.expand_subqueries(
alert_query.language,
alert_query.query,
endpoints ++ alerts
),
{:ok, transformed_query} <-
Logflare.Sql.transform(alert_query.language, expanded_query, alert_query.user_id),
{:ok, %{rows: rows}} <-
Logflare.BqRepo.query_with_sql_and_params(
alert_query.user,
Expand Down
16 changes: 14 additions & 2 deletions lib/logflare/endpoints.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ defmodule Logflare.Endpoints do
alias Logflare.Backends
alias Logflare.Backends.Adaptor.PostgresAdaptor
alias Logflare.SingleTenant
alias Logflare.Alerting

import Ecto.Query
@typep run_query_return :: {:ok, %{rows: [map()]}} | {:error, String.t()}
Expand Down Expand Up @@ -159,10 +160,21 @@ defmodule Logflare.Endpoints do
%Query{query: query_string, user_id: user_id, sandboxable: sandboxable} = endpoint_query
sql_param = Map.get(params, "sql")

transform_input =
if(sandboxable && sql_param, do: {query_string, sql_param}, else: query_string)
endpoints =
list_endpoints_by(user_id: endpoint_query.user_id)
|> Enum.filter(&(&1.id != endpoint_query.id))

alerts = Alerting.list_alert_queries_by_user_id(endpoint_query.user_id)

with {:ok, declared_params} <- Logflare.Sql.parameters(query_string),
{:ok, expanded_query} <-
Logflare.Sql.expand_subqueries(
endpoint_query.language,
query_string,
endpoints ++ alerts
),
transform_input =
if(sandboxable && sql_param, do: {expanded_query, sql_param}, else: expanded_query),
{:ok, transformed_query} <-
Logflare.Sql.transform(endpoint_query.language, transform_input, user_id) do
{endpoint, query_string} =
Expand Down
27 changes: 26 additions & 1 deletion lib/logflare/endpoints/query.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ defmodule Logflare.Endpoints.Query do
require Logger

alias Logflare.Endpoints.Query
alias Logflare.Endpoints
alias Logflare.Alerting

@derive {Jason.Encoder,
only: [
Expand Down Expand Up @@ -117,9 +119,32 @@ defmodule Logflare.Endpoints.Query do

def validate_query(changeset, field) when is_atom(field) do
language = Ecto.Changeset.get_field(changeset, :language, :bq_sql)
user = get_field(changeset, :user)
endpoint_name = get_field(changeset, :name)

# TODO abstract out to separate Query context

queries =
if user do
endpoints =
Endpoints.list_endpoints_by(user_id: user.id)
|> Enum.filter(&(&1.id != endpoint_name))

alerts = Alerting.list_alert_queries_by_user_id(user.id)
endpoints ++ alerts
else
[]
end

validate_change(changeset, field, fn field, value ->
case Logflare.Sql.transform(language, value, get_field(changeset, :user)) do
{:ok, expanded_query} =
Logflare.Sql.expand_subqueries(
language,
value,
queries
)

case Logflare.Sql.transform(language, expanded_query, user) do
{:ok, _} -> []
{:error, error} -> [{field, error}]
end
Expand Down
76 changes: 75 additions & 1 deletion lib/logflare/sql.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,81 @@ defmodule Logflare.Sql do
alias Logflare.SingleTenant
alias Logflare.Sql.Parser
alias Logflare.Backends.Adaptor.PostgresAdaptor
alias Logflare.Endpoints
alias Logflare.Alerts.Alert

@type language :: :pg_sql | :bq_sql

@doc """
Expands entity names that match query names into a subquery.

Any relation in `input` whose name equals the `name` of one of `queries`
(restricted to queries written in the same `language`) is replaced inline
by that query's body as a derived subquery. With an empty query list the
input is returned unchanged without parsing.

Returns `{:ok, sql}` on success, or the `{:error, _}` produced by the
parser when `input` cannot be parsed.
"""
@spec expand_subqueries(language(), String.t(), [Alert.t() | Endpoints.Query.t()]) ::
        {:ok, String.t()} | {:error, any()}
def expand_subqueries(_language, input, []), do: {:ok, input}

def expand_subqueries(language, input, queries)
    when is_atom(language) and is_list(queries) and is_binary(input) do
  parser_dialect =
    case language do
      :bq_sql -> "bigquery"
      :pg_sql -> "postgres"
    end

  # Only same-language queries are eligible: splicing a different dialect's
  # body into the AST would produce an invalid statement. Enum.filter never
  # fails, so bind with `=` rather than a `with` clause.
  eligible_queries = Enum.filter(queries, &(&1.language == language))

  with {:ok, statements} <- Parser.parse(parser_dialect, input) do
    statements
    |> replace_names_with_subqueries(%{
      language: language,
      queries: eligible_queries
    })
    |> Parser.to_string()
  end
end

# Walks the parsed SQL AST (nested string-keyed maps and lists) and replaces
# any table relation whose name matches one of `data.queries` with that
# query's body as a derived (subquery) relation. All other nodes are rebuilt
# unchanged. Clause order matters: the "relation" clause must be tried
# before the generic tuple/list/map recursion clauses.
defp replace_names_with_subqueries(
       {"relation" = k,
        %{"Table" => %{"alias" => table_alias, "name" => [%{"value" => table_name}]}} = v},
       data
     ) do
  # Match queries by exact name; first match wins.
  query = Enum.find(data.queries, &(&1.name == table_name))

  if query do
    parser_language =
      case data.language do
        :pg_sql -> "postgres"
        :bq_sql -> "bigquery"
      end

    # Assumes the referenced query parses to exactly one SELECT statement;
    # anything else crashes the match (let-it-crash on bad stored queries).
    {:ok, [%{"Query" => body}]} = Parser.parse(parser_language, query.query)

    # Swap the plain table reference for a derived relation, keeping the
    # original alias so outer column references keep resolving.
    {k,
     %{
       "Derived" => %{
         "alias" => table_alias,
         "lateral" => false,
         "subquery" => body
       }
     }}
  else
    # No matching query name: leave the relation untouched.
    {k, v}
  end
end

# Recurse into a key's container value (map or list).
defp replace_names_with_subqueries({k, v}, data) when is_list(v) or is_map(v) do
  {k, replace_names_with_subqueries(v, data)}
end

# Recurse over every element of a list node.
defp replace_names_with_subqueries(kv, data) when is_list(kv) do
  Enum.map(kv, fn kv -> replace_names_with_subqueries(kv, data) end)
end

# Recurse over every entry of a map node.
defp replace_names_with_subqueries(kv, data) when is_map(kv) do
  Map.new(kv, &replace_names_with_subqueries(&1, data))
end

# Scalars (strings, numbers, booleans, nil) pass through unchanged.
defp replace_names_with_subqueries(kv, _data), do: kv

# replaces all table names with subqueries

@doc """
Transforms and validates an SQL query for querying with bigquery.any()
Expand All @@ -29,7 +104,6 @@ defmodule Logflare.Sql do
{:ok, "..."}
"""
@typep input :: String.t() | {String.t(), String.t()}
@typep language :: :pg_sql | :bq_sql
@spec transform(language(), input(), User.t() | pos_integer()) :: {:ok, String.t()}
def transform(lang, input, user_id) when is_integer(user_id) do
user = Logflare.Users.get(user_id)
Expand Down
21 changes: 21 additions & 0 deletions test/logflare/alerting_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,27 @@ defmodule Logflare.AlertingTest do
assert {:ok, [%{"testing" => "123"}]} = Alerting.execute_alert_query(alert_query)
end

test "execute_alert_query with query composition" do
  expect(GoogleApi.BigQuery.V2.Api.Jobs, :bigquery_jobs_query, 1, fn _conn, _proj_id, req ->
    # The referenced endpoint body must have been inlined before execution.
    assert req[:body].query =~ "current_datetime"
    {:ok, TestUtils.gen_bq_response([%{"testing" => "123"}])}
  end)

  user = insert(:user)

  insert(:endpoint, user: user, name: "my.date", query: "select current_datetime() as testing")

  alert =
    :alert
    |> insert(user: user, query: "select testing from `my.date`")
    |> Logflare.Repo.preload([:user])

  assert {:ok, [%{"testing" => "123"}]} = Alerting.execute_alert_query(alert)
end

test "run_alert_query/1 runs the entire alert", %{user: user} do
alert_query = insert(:alert, user: user)

Expand Down
40 changes: 40 additions & 0 deletions test/logflare/endpoints_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,27 @@ defmodule Logflare.EndpointsTest do
assert stored_sql =~ "myproject"
end

test "create an endpoint query with query composition" do
  insert(:plan)
  user = insert(:user)

  insert(:endpoint, user: user, name: "my.date", query: "select current_datetime() as testing")

  attrs = %{
    name: "fully-qualified.name",
    query: "select testing from `my.date`",
    language: :bq_sql
  }

  # Composition must not be baked into the stored query: the reference to
  # `my.date` is kept verbatim and no source mapping is recorded.
  assert {:ok, created} = Endpoints.create_query(user, attrs)
  assert created.source_mapping == %{}
  assert created.query =~ "my.date"
end

describe "running queries in bigquery backends" do
setup do
# mock goth behaviour
Expand All @@ -138,6 +159,25 @@ defmodule Logflare.EndpointsTest do
assert {:ok, %{rows: [%{"testing" => _}]}} = Endpoints.run_query(endpoint)
end

test "run an endpoint query with query composition" do
  expect(GoogleApi.BigQuery.V2.Api.Jobs, :bigquery_jobs_query, 1, fn _conn, _proj_id, req ->
    # Composition should have inlined the `my.date` endpoint body.
    assert req[:body].query =~ "current_datetime"
    {:ok, TestUtils.gen_bq_response([%{"testing" => "123"}])}
  end)

  insert(:plan)
  user = insert(:user)

  insert(:endpoint, user: user, name: "my.date", query: "select current_datetime() as testing")

  composed = insert(:endpoint, user: user, query: "select testing from `my.date`")

  assert {:ok, %{rows: [%{"testing" => _}]}} = Endpoints.run_query(composed)
end

test "run_query_string/3" do
expect(GoogleApi.BigQuery.V2.Api.Jobs, :bigquery_jobs_query, 1, fn _conn, _proj_id, _opts ->
{:ok, TestUtils.gen_bq_response([%{"testing" => "123"}])}
Expand Down
36 changes: 36 additions & 0 deletions test/logflare/sql_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,42 @@ defmodule Logflare.SqlTest do
"`#{project_id}.#{dataset_id}.#{token}`"
end

test "expand_subqueries/2 for :bq_sql will expand an alert/endpoint query into a subquery" do
  alert = build(:alert, name: "my.alert", query: "select 'id' as id", language: :bq_sql)
  endpoint = build(:endpoint, name: "my.endpoint", query: "select 'val' as val", language: :bq_sql)

  # Same expansion behavior for both alert and endpoint queries.
  cases = [
    {alert, "select test from `my.alert` as tester", "from (select 'id' as id) as tester"},
    {endpoint, "select test from `my.endpoint` as tester", "from (select 'val' as val) as tester"}
  ]

  for {query, input, expected} <- cases do
    assert {:ok, result} = Sql.expand_subqueries(:bq_sql, input, [query])
    assert String.downcase(result) =~ expected
  end
end

test "expand_subqueries/2 for :pg_sql will expand an alert/endpoint query into a subquery" do
  alert = build(:alert, name: "my.alert", query: "select 'id' as id", language: :pg_sql)
  endpoint = build(:endpoint, name: "my.endpoint", query: "select 'val' as val", language: :pg_sql)

  # Postgres dialect quotes identifiers with double quotes instead of backticks.
  cases = [
    {alert, ~s(select test from "my.alert" as tester), "from (select 'id' as id) as tester"},
    {endpoint, ~s(select test from "my.endpoint" as tester), "from (select 'val' as val) as tester"}
  ]

  for {query, input, expected} <- cases do
    assert {:ok, result} = Sql.expand_subqueries(:pg_sql, input, [query])
    assert String.downcase(result) =~ expected
  end
end

describe "transform/3 for :postgres backends" do
setup do
user = insert(:user)
Expand Down
Loading