Skip to content

Commit

Permalink
Merge branch 'main' into tv/fix-ipv6-and-listening
Browse files Browse the repository at this point in the history
  • Loading branch information
chasers authored Oct 21, 2024
2 parents bc0e375 + 40db716 commit 12593bb
Show file tree
Hide file tree
Showing 15 changed files with 256 additions and 34 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.8.11
1.8.12
2 changes: 1 addition & 1 deletion cloudbuild/staging/deploy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ substitutions:
_CLUSTER: main
_COOKIE: default-${_CLUSTER}
_NORMALIZED_IMAGE_TAG: ${_IMAGE_TAG}
_INSTANCE_TYPE: c2d-highcpu-2
_INSTANCE_TYPE: c2d-highcpu-4
_INSTANCE_GROUP: instance-group-staging-${_CLUSTER}
_IMAGE_TAG: $SHORT_SHA
_TEMPLATE_NAME: logflare-staging-${_CLUSTER}-cluster-${_NORMALIZED_IMAGE_TAG}
Expand Down
8 changes: 2 additions & 6 deletions lib/logflare/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ defmodule Logflare.Application do
name: Logflare.V1SourceRegistry,
keys: :unique,
partitions: max(round(System.schedulers_online() / 8), 1)},
{Task.Supervisor, name: Logflare.TaskSupervisor},
{PartitionSupervisor, child_spec: Task.Supervisor, name: Logflare.TaskSupervisors},
{DynamicSupervisor, strategy: :one_for_one, name: Logflare.Endpoints.Cache},
{DynamicSupervisor,
strategy: :one_for_one,
Expand All @@ -75,11 +75,7 @@ defmodule Logflare.Application do
conditional_children() ++
[
Logflare.ErlSysMon,
{Task.Supervisor,
name: Logflare.TaskSupervisor,
spawn_opt: [
fullsweep_after: 1_000
]},
{PartitionSupervisor, child_spec: Task.Supervisor, name: Logflare.TaskSupervisors},
{Cluster.Supervisor, [topologies, [name: Logflare.ClusterSupervisor]]},
Logflare.Repo,
Logflare.Vault,
Expand Down
22 changes: 10 additions & 12 deletions lib/logflare/backends.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
defmodule Logflare.Backends do
@moduledoc false

alias Logflare.Utils.Tasks
alias Logflare.Backends.Adaptor
alias Logflare.Backends.Backend
alias Logflare.Backends.SourceRegistry
Expand All @@ -18,7 +19,6 @@ defmodule Logflare.Backends do
alias Logflare.SystemMetrics
alias Logflare.PubSubRates
alias Logflare.Cluster
alias Logflare.TaskSupervisor
alias Logflare.Source.RecentLogsServer
import Ecto.Query

Expand Down Expand Up @@ -322,13 +322,11 @@ defmodule Logflare.Backends do
end

defp maybe_broadcast_and_route(source, log_events) do
Logflare.Utils.Tasks.start_child(fn ->
if source.metrics.avg < 5 do
Source.ChannelTopics.broadcast_new(log_events)
end
if source.metrics.avg < 5 do
Source.ChannelTopics.broadcast_new(log_events)
end

SourceRouting.route_to_sinks_and_ingest(log_events)
end)
SourceRouting.route_to_sinks_and_ingest(log_events)

:ok
end
Expand Down Expand Up @@ -517,16 +515,16 @@ defmodule Logflare.Backends do
nodes = Cluster.Utils.node_list_all()

task =
Task.async(fn ->
Tasks.async(fn ->
nodes
|> Enum.map(
&Task.Supervisor.async({TaskSupervisor, &1}, __MODULE__, :list_recent_logs_local, [
source
])
&Tasks.async(fn ->
:erpc.call(&1, __MODULE__, :list_recent_logs_local, [source], 10_000)
end)
)
|> Task.yield_many()
|> Enum.map(fn {%Task{pid: pid}, res} ->
res || Task.Supervisor.terminate_child(TaskSupervisor, pid)
res || Task.shutdown(pid)
end)
end)

Expand Down
3 changes: 2 additions & 1 deletion lib/logflare/google/bigquery/bigquery.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ defmodule Logflare.Google.BigQuery do
alias Logflare.Billing.Plan
alias Logflare.TeamUsers
alias Logflare.Source.BigQuery.SchemaBuilder
alias Logflare.Utils.Tasks

@type ok_err_tup :: {:ok, term} | {:error, term}

Expand Down Expand Up @@ -285,7 +286,7 @@ defmodule Logflare.Google.BigQuery do
for x <- [user | team_users], x.provider == "google", do: x.email

if Enum.count(user.sources) > 0 do
Task.Supervisor.start_child(Logflare.TaskSupervisor, fn ->
Tasks.start_child(fn ->
patch(dataset_id, emails, project_id, user.id)
end)

Expand Down
3 changes: 2 additions & 1 deletion lib/logflare/google/resource_manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ defmodule Logflare.Google.CloudResourceManager do
alias Logflare.User
alias Logflare.TeamUsers
alias Logflare.Billing
alias Logflare.Utils.Tasks

def list_projects() do
conn = GenUtils.get_conn()
Expand All @@ -32,7 +33,7 @@ defmodule Logflare.Google.CloudResourceManager do
def set_iam_policy(opts \\ [async: true])

def set_iam_policy(async: true) do
Task.Supervisor.start_child(Logflare.TaskSupervisor, fn -> set_iam_policy(async: false) end)
Tasks.start_child(fn -> set_iam_policy(async: false) end)
end

def set_iam_policy(async: false) do
Expand Down
7 changes: 4 additions & 3 deletions lib/logflare/source/email_notification_server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ defmodule Logflare.Source.EmailNotificationServer do
alias Logflare.AccountEmail
alias Logflare.Mailer
alias Logflare.Backends
alias Logflare.Utils.Tasks

def start_link(args) do
source = Keyword.get(args, :source)
Expand Down Expand Up @@ -40,7 +41,7 @@ defmodule Logflare.Source.EmailNotificationServer do
user = Users.Cache.get_by(id: source.user_id)

if source.notifications.user_email_notifications do
Task.Supervisor.start_child(Logflare.TaskSupervisor, fn ->
Tasks.start_child(fn ->
AccountEmail.source_notification(user, rate, source) |> Mailer.deliver()
end)
end
Expand All @@ -51,7 +52,7 @@ defmodule Logflare.Source.EmailNotificationServer do
other_emails = String.split(stranger_emails, ",")

for email <- other_emails do
Task.Supervisor.start_child(Logflare.TaskSupervisor, fn ->
Tasks.start_child(fn ->
AccountEmail.source_notification_for_others(String.trim(email), rate, source)
|> Mailer.deliver()
end)
Expand All @@ -63,7 +64,7 @@ defmodule Logflare.Source.EmailNotificationServer do
team_user = TeamUsers.Cache.get_team_user(x)

if team_user do
Task.Supervisor.start_child(Logflare.TaskSupervisor, fn ->
Tasks.start_child(fn ->
AccountEmail.source_notification(team_user, rate, source) |> Mailer.deliver()
end)
end
Expand Down
5 changes: 3 additions & 2 deletions lib/logflare/source/text_notification_server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ defmodule Logflare.Source.TextNotificationServer do
alias LogflareWeb.Router.Helpers, as: Routes
alias LogflareWeb.Endpoint
alias Logflare.Backends
alias Logflare.Utils.Tasks

@twilio_phone "+16026006731"

Expand Down Expand Up @@ -50,7 +51,7 @@ defmodule Logflare.Source.TextNotificationServer do
body = "#{source.name} has #{rate} new event(s). See: #{source_link} "

if source.notifications.user_text_notifications == true do
Task.Supervisor.start_child(Logflare.TaskSupervisor, fn ->
Tasks.start_child(fn ->
ExTwilio.Message.create(to: user.phone, from: @twilio_phone, body: body)
end)
end
Expand All @@ -61,7 +62,7 @@ defmodule Logflare.Source.TextNotificationServer do
body = "#{source.name} has #{rate} new event(s). See: #{source_link} "

if team_user do
Task.Supervisor.start_child(Logflare.TaskSupervisor, fn ->
Tasks.start_child(fn ->
ExTwilio.Message.create(to: team_user.phone, from: @twilio_phone, body: body)
end)
end
Expand Down
58 changes: 58 additions & 0 deletions lib/logflare/source_schemas.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ defmodule Logflare.SourceSchemas do

alias Logflare.Repo
alias Logflare.SourceSchemas.SourceSchema
alias Logflare.Google.BigQuery.SchemaUtils

require Logger

Expand Down Expand Up @@ -101,4 +102,61 @@ defmodule Logflare.SourceSchemas do
def change_source_schema(%SourceSchema{} = source_schema, attrs \\ %{}) do
SourceSchema.changeset(source_schema, attrs)
end

def format_schema(bq_schema, variant, to_merge \\ %{})

def format_schema(%SourceSchema{bigquery_schema: bq_schema}, :dot, to_merge) do
bq_schema
|> SchemaUtils.bq_schema_to_flat_typemap()
|> Enum.filter(fn
{_k, :map} -> false
_ -> true
end)
|> Enum.map(fn
{k, {:list, type}} -> {k, "#{type}[]"}
{k, v} -> {k, Atom.to_string(v)}
end)
|> Map.new()
|> Map.merge(to_merge)
end

def format_schema(%SourceSchema{bigquery_schema: bq_schema}, :json_schema, to_merge) do
bq_schema
|> SchemaUtils.to_typemap()
|> typemap_to_json_schema()
|> Map.merge(to_merge)
|> Map.put(
"$schema",
"https://json-schema.org/draft/2020-12/schema"
)
end

defp typemap_to_json_schema(map) when is_map(map) do
properties = Enum.map(map, &typemap_to_json_schema/1) |> Map.new()

%{
"properties" => properties,
"type" => "object"
}
end

defp typemap_to_json_schema({key, %{fields: fields, t: :map}}) do
{Atom.to_string(key), typemap_to_json_schema(fields)}
end

defp typemap_to_json_schema({key, %{t: {:list, type}}}) do
{Atom.to_string(key), %{"type" => "array", "items" => %{"type" => Atom.to_string(type)}}}
end

defp typemap_to_json_schema({key, %{t: :datetime}}),
do: {Atom.to_string(key), %{"type" => "number"}}

defp typemap_to_json_schema({key, %{t: :integer}}),
do: {Atom.to_string(key), %{"type" => "number"}}

defp typemap_to_json_schema({key, %{t: :float}}),
do: {Atom.to_string(key), %{"type" => "number"}}

defp typemap_to_json_schema({key, %{t: type}}),
do: {Atom.to_string(key), %{"type" => Atom.to_string(type)}}
end
22 changes: 16 additions & 6 deletions lib/logflare/utils/tasks.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
defmodule Logflare.Utils.Tasks do
@moduledoc """
Utility functions for spawning supervised tasks with `Logflare.TaskSupervisor`
Utility functions for spawning supervised tasks with `Logflare.TaskSupervisors`
https://hexdocs.pm/elixir/1.14/Task.Supervisor.html
Expand All @@ -10,23 +10,33 @@ defmodule Logflare.Utils.Tasks do
Linked to caller, linked to supervisor
"""
def async(func, opts \\ []) do
Task.Supervisor.async(Logflare.TaskSupervisor, func, opts)
Task.Supervisor.async(
{:via, PartitionSupervisor, {Logflare.TaskSupervisors, self()}},
func,
opts
)
end

@doc """
Not linked to caller, only to supervisor.
"""
def start_child(func, opts \\ []) do
Task.Supervisor.start_child(Logflare.TaskSupervisor, func, opts)
Task.Supervisor.start_child(
{:via, PartitionSupervisor, {Logflare.TaskSupervisors, self()}},
func,
opts
)
end

@doc """
Kills all tasks under the supervisor.
Used for test teardown, to prevent ecto sandbox checkout errors.
"""
def kill_all_tasks do
Logflare.TaskSupervisor
|> Task.Supervisor.children()
|> Enum.map(&Task.Supervisor.terminate_child(Logflare.TaskSupervisor, &1))
Logflare.TaskSupervisors
|> PartitionSupervisor.which_children()
|> Enum.map(fn {_, pid, _, _} ->
pid |> Task.Supervisor.children() |> Enum.map(&Task.Supervisor.terminate_child(pid, &1))
end)
end
end
30 changes: 29 additions & 1 deletion lib/logflare_web/controllers/api/source_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ defmodule LogflareWeb.Api.SourceController do
use OpenApiSpex.ControllerSpecs

alias Logflare.Sources
alias Logflare.SourceSchemas
alias Logflare.Backends
alias LogflareWeb.OpenApi.Accepted
alias LogflareWeb.OpenApi.Created
Expand All @@ -11,6 +12,7 @@ defmodule LogflareWeb.Api.SourceController do
alias LogflareWeb.OpenApiSchemas.Event

alias LogflareWeb.OpenApiSchemas.Source
alias LogflareWeb.OpenApiSchemas

action_fallback(LogflareWeb.Api.FallbackController)

Expand Down Expand Up @@ -151,7 +153,7 @@ defmodule LogflareWeb.Api.SourceController do
end
end

operation(:removebackend,
operation(:remove_backend,
summary: "Remove source backend",
parameters: [
source_token: [in: :path, description: "Source Token", type: :string],
Expand All @@ -173,4 +175,30 @@ defmodule LogflareWeb.Api.SourceController do
|> json(source)
end
end

operation(:show_schema,
summary: "Show source schema",
parameters: [token: [in: :path, description: "Source Token", type: :string]],
responses: %{
200 => OpenApiSchemas.SourceSchema.response(),
404 => NotFound.response()
}
)

def show_schema(%{assigns: %{user: user}} = conn, %{"source_token" => token} = params) do
with source when not is_nil(source) <- Sources.get_by(token: token, user_id: user.id),
schema = SourceSchemas.get_source_schema_by(source_id: source.id) do
data =
if Map.get(params, "variant") == "dot" do
SourceSchemas.format_schema(schema, :dot)
else
SourceSchemas.format_schema(schema, :json_schema, %{
:title => source.name,
:"$id" => ~p"/sources/#{source.token}/schema"
})
end

json(conn, data)
end
end
end
6 changes: 6 additions & 0 deletions lib/logflare_web/open_api_schemas.ex
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,12 @@ defmodule LogflareWeb.OpenApiSchemas do
use LogflareWeb.OpenApi, properties: @properties, required: [:name]
end

defmodule SourceSchema do
@properties %{}

use LogflareWeb.OpenApi, properties: @properties, required: []
end

defmodule RuleApiSchema do
@properties %{
id: %Schema{type: :integer},
Expand Down
1 change: 1 addition & 0 deletions lib/logflare_web/router.ex
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,7 @@ defmodule LogflareWeb.Router do
param: "token",
only: [:index, :show, :create, :update, :delete]
) do
get "/schema", Api.SourceController, :show_schema
get "/recent", Api.SourceController, :recent
post "/backends/:backend_token", Api.SourceController, :add_backend
delete "/backends/:backend_token", Api.SourceController, :remove_backend
Expand Down
Loading

0 comments on commit 12593bb

Please sign in to comment.