Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: single cainophile per cluster #2145

Merged
merged 6 commits into from
Jul 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.7.9
1.7.10
7 changes: 5 additions & 2 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,10 @@ config :ueberauth, Ueberauth,

config :phoenix, :json_library, Jason
config :postgrex, :json_library, Jason
config :syn, scopes: [:endpoints]

config :syn,
scopes: [:endpoints, :context_cache],
event_handler: Logflare.SynEventHandler

oauth_common = [
repo: Logflare.Repo,
Expand Down Expand Up @@ -109,7 +112,7 @@ config :scrivener_html,
# If you use a single view style everywhere, you can configure it here. See View Styles below for more info.
view_style: :bootstrap_v4

config :logflare, Logflare.CacheBuster,
config :logflare, Logflare.ContextCache.CacheBuster,
replication_slot: :temporary,
publications: ["logflare_pub"],
# remember to add an ALTER PUBLICATION ... migration when changing published tables!
Expand Down
1 change: 1 addition & 0 deletions config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ config :logflare, Logflare.Repo,
password: "postgres",
database: "logflare_test",
hostname: "localhost",
port: 5432,
pool_size: 10,
pool: Ecto.Adapters.SQL.Sandbox

Expand Down
55 changes: 2 additions & 53 deletions lib/logflare/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,9 @@ defmodule Logflare.Application do
use Application
require Logger

alias Logflare.Billing
alias Logflare.ContextCache
alias Logflare.Backends
alias Logflare.Logs
alias Logflare.SingleTenant
alias Logflare.Sources
alias Logflare.SourceSchemas
alias Logflare.Users
alias Logflare.TeamUsers
alias Logflare.Partners
alias Logflare.Auth
alias Logflare.SystemMetricsSup
alias Logflare.Sources.Counters
alias Logflare.Sources.RateCounters
Expand Down Expand Up @@ -47,16 +39,8 @@ defmodule Logflare.Application do
[
Counters,
RateCounters,
ContextCache,
TeamUsers.Cache,
Users.Cache,
Sources.Cache,
Partners.Cache,
Backends.Cache,
Billing.Cache,
SourceSchemas.Cache,
Auth.Cache,
Logs.LogEvents.Cache,
ContextCache.Supervisor,
{Phoenix.PubSub, name: Logflare.PubSub},
PubSubRates,
Logs.RejectedLogEvents,
Expand All @@ -79,15 +63,6 @@ defmodule Logflare.Application do
end

defp get_children(_) do
# Database options for Postgres notifications
hostname = ~c"#{Application.get_env(:logflare, Logflare.Repo)[:hostname]}"
username = Application.get_env(:logflare, Logflare.Repo)[:username]
password = Application.get_env(:logflare, Logflare.Repo)[:password]
database = Application.get_env(:logflare, Logflare.Repo)[:database]

port = Application.get_env(:logflare, Logflare.Repo)[:port]
slot = Application.get_env(:logflare, Logflare.CacheBuster)[:replication_slot]
publications = Application.get_env(:logflare, Logflare.CacheBuster)[:publications]
topologies = Application.get_env(:libcluster, :topologies, [])
grpc_port = Application.get_env(:grpc, :port)
ssl = Application.get_env(:logflare, :ssl)
Expand All @@ -103,36 +78,10 @@ defmodule Logflare.Application do
{Cluster.Supervisor, [topologies, [name: Logflare.ClusterSupervisor]]},
Logflare.Repo,
{Phoenix.PubSub, name: Logflare.PubSub, pool_size: pool_size},
# Context Caches
TeamUsers.Cache,
ContextCache,
Partners.Cache,
Users.Cache,
Backends.Cache,
Sources.Cache,
Billing.Cache,
SourceSchemas.Cache,
Auth.Cache,
Logs.LogEvents.Cache,
PubSubRates,
ContextCache.Supervisor,

# Follow Postgresql replication log and bust all our context caches
{
Cainophile.Adapters.Postgres,
register: Logflare.PgPublisher,
epgsql: %{
host: hostname,
port: port,
username: username,
database: database,
password: password
},
slot: slot,
wal_position: {"0", "0"},
publications: publications
},
Logflare.CacheBuster,
# Sources
# v1 ingest pipline
{Registry,
name: Logflare.V1SourceRegistry,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
defmodule Logflare.CacheBuster do
defmodule Logflare.ContextCache.CacheBuster do
@moduledoc """
Monitors our Postgres replication log and busts the cache accordingly.
"""
Expand All @@ -15,16 +15,18 @@ defmodule Logflare.CacheBuster do
end

def init(state) do
Logger.put_process_level(self(), :error)

Cainophile.Adapters.Postgres.subscribe(Logflare.PgPublisher, self())
subscribe_to_transactions()
{:ok, state}
end

def subscribe_to_transactions do
Phoenix.PubSub.subscribe(Logflare.PubSub, "wal_transactions")
end

@doc """
Sets the Logger level for this process. It's started with level :error.

To debug wal records set process to level :info and each transaction will be logged.
To debug wal records set process to level :debug and each transaction will be logged.
"""

@spec set_log_level(Logger.levels()) :: :ok
Expand All @@ -38,10 +40,8 @@ defmodule Logflare.CacheBuster do
{:reply, :ok, state}
end

def handle_info(%Transaction{changes: []}, state), do: {:noreply, state}

def handle_info(%Transaction{changes: changes} = transaction, state) do
Logger.info("WAL record received: #{inspect(transaction)}")
Logger.debug("WAL record received from pubsub: #{inspect(transaction)}")

for record <- changes,
record = handle_record(record),
Expand Down
111 changes: 111 additions & 0 deletions lib/logflare/context_cache/supervisor.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
defmodule Logflare.ContextCache.Supervisor do
@moduledoc false

use Supervisor

alias Logflare.Backends
alias Logflare.ContextCache.CacheBuster
alias Logflare.Billing
alias Logflare.ContextCache
alias Logflare.Backends
alias Logflare.Sources
alias Logflare.SourceSchemas
alias Logflare.Users
alias Logflare.TeamUsers
alias Logflare.Partners
alias Logflare.Auth

alias Logflare.Repo

def start_link(_) do
Supervisor.start_link(__MODULE__, [], name: __MODULE__)
end

@env Application.compile_env(:logflare, :env)

@impl Supervisor
def init(_) do
res = Supervisor.init(get_children(@env), strategy: :one_for_one)
res
end

defp get_children(:test) do
[
ContextCache,
TeamUsers.Cache,
Partners.Cache,
Users.Cache,
Backends.Cache,
Sources.Cache,
Billing.Cache,
SourceSchemas.Cache,
Auth.Cache
]
end

defp get_children(_) do
get_children(:test) ++
[
ContextCache.TransactionBroadcaster,
ContextCache.CacheBuster
]
end

@doc """
Returns the publisher :via name used for syn registry.
"""
def publisher_name do
{:via, :syn, {:context_cache, Logflare.PgPublisher}}
# {:global, Logflare.PgPublisher}
end

@doc """
Attempts to start a cainophile child in the ContextCache.Supervisor.
If it already exists, it will return with an error tuple.
"""
def maybe_start_cainophile do
spec = cainophile_child_spec()
Supervisor.which_children(__MODULE__)
Supervisor.start_child(__MODULE__, spec)
end

def remove_cainophile do
Supervisor.terminate_child(__MODULE__, Cainophile.Adapters.Postgres)
Supervisor.delete_child(__MODULE__, Cainophile.Adapters.Postgres)
:ok
end

defp cainophile_child_spec do
hostname = ~c"#{Application.get_env(:logflare, Repo)[:hostname]}"
username = Application.get_env(:logflare, Repo)[:username]
password = Application.get_env(:logflare, Repo)[:password]
database = Application.get_env(:logflare, Repo)[:database]
port = Application.get_env(:logflare, Repo)[:port]

slot = Application.get_env(:logflare, CacheBuster)[:replication_slot]
publications = Application.get_env(:logflare, CacheBuster)[:publications]

%{
id: Cainophile.Adapters.Postgres,
restart: :permanent,
type: :worker,
start:
{Cainophile.Adapters.Postgres, :start_link,
[
[
register: publisher_name(),
epgsql: %{
host: hostname,
port: port,
username: username,
database: database,
password: password
},
slot: slot,
wal_position: {"0", "0"},
publications: publications
]
]}
}
end
end
76 changes: 76 additions & 0 deletions lib/logflare/context_cache/transaction_broadcaster.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
defmodule Logflare.ContextCache.TransactionBroadcaster do
@moduledoc """
Subscribes to cainophile and broadcasts all transactions
"""
use GenServer

require Logger

alias Logflare.ContextCache
alias Cainophile.Changes.Transaction

def start_link(init_args) do
GenServer.start_link(__MODULE__, init_args, name: __MODULE__)
end

def init(args) do
state = %{interval: Keyword.get(args, :interval, 5_000)}
Process.send_after(self(), :try_subscribe, min(state.interval, 1_000))
{:ok, state}
end

@doc """
Sets the Logger level for this process. It's started with level :error.

To debug wal records set process to level :debug and each transaction will be logged.
"""

@spec set_log_level(Logger.levels()) :: :ok
def set_log_level(level) when is_atom(level) do
GenServer.call(__MODULE__, {:put_level, level})
end

def handle_call({:put_level, level}, _from, state) do
:ok = Logger.put_process_level(self(), level)

{:reply, :ok, state}
end

def handle_info(:try_subscribe, state) do
pid = self()
attempt_subscribe(pid)

Process.send_after(self(), :try_subscribe, state.interval)
{:noreply, state}
end

def handle_info(%Transaction{changes: []}, state), do: {:noreply, state |> dbg()}

def handle_info(%Transaction{changes: _changes} = transaction, state) do
Logger.debug("WAL record received from cainophile: #{inspect(transaction)}")
# broadcast it
Phoenix.PubSub.broadcast(Logflare.PubSub, "wal_transactions", transaction)
{:noreply, state}
end

defp attempt_subscribe(pid) do
try do
ContextCache.Supervisor.maybe_start_cainophile()
|> case do
{:ok, pid} ->
Logger.info(
"Successfully started cainophile on #{inspect(Node.self())}, pid: #{inspect(pid)}"
)

{:error, _} ->
:noop
end

ContextCache.Supervisor.publisher_name()
|> Cainophile.Adapters.Postgres.subscribe(pid)
rescue
e ->
Logger.warning("Could not subscribe to Cainophile, #{inspect(e)}")
end
end
end
19 changes: 19 additions & 0 deletions lib/logflare/syn_event_handler.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
defmodule Logflare.SynEventHandler do
@moduledoc """
Event handler for syn

Always keep oldest proces.
"""

@behaviour :syn_event_handler

require Logger
@impl true
def resolve_registry_conflict(scope, name, pid1, pid2) do
Logger.warning(
"Resolving registry conflict for #{scope}, #{inspect(name)}, #{inspect(pid1)} and #{inspect(pid2)}. Keeping #{inspect(pid1)}"
)

pid1
end
end
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
"test.only": :test,
"test.format": :test,
"test.compile": :test,
"test.security": :test,

Check warning on line 20 in mix.exs

View workflow job for this annotation

GitHub Actions / Build and test

~R/.../ is deprecated, use ~r/.../ instead

Check warning on line 20 in mix.exs

View workflow job for this annotation

GitHub Actions / Build and test

~R/.../ is deprecated, use ~r/.../ instead
"test.typings": :test,
coveralls: :test,
"coveralls.detail": :test,
Expand Down Expand Up @@ -193,7 +193,7 @@
{:contex, "~> 0.3.0"},

# Postgres Subscribe
{:cainophile, github: "Logflare/cainophile"},
{:cainophile, github: "Logflare/cainophile", ref: "c310d69"},
{:open_api_spex, "~> 3.16"},
{:grpc, "~> 0.5.0"},
{:protobuf, "~> 0.10"},
Expand Down
2 changes: 1 addition & 1 deletion mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
"broadway": {:git, "https://github.com/Logflare/broadway.git", "092f4ab5891d37be4fb84f4ad91a329c05419db8", [ref: "092f4ab"]},
"bunt": {:hex, :bunt, "0.2.1", "e2d4792f7bc0ced7583ab54922808919518d0e57ee162901a16a1b6664ef3b14", [:mix], [], "hexpm", "a330bfb4245239787b15005e66ae6845c9cd524a288f0d141c148b02603777a5"},
"cachex": {:hex, :cachex, "3.6.0", "14a1bfbeee060dd9bec25a5b6f4e4691e3670ebda28c8ba2884b12fe30b36bf8", [:mix], [{:eternal, "~> 1.2", [hex: :eternal, repo: "hexpm", optional: false]}, {:jumper, "~> 1.0", [hex: :jumper, repo: "hexpm", optional: false]}, {:sleeplocks, "~> 1.1", [hex: :sleeplocks, repo: "hexpm", optional: false]}, {:unsafe, "~> 1.0", [hex: :unsafe, repo: "hexpm", optional: false]}], "hexpm", "ebf24e373883bc8e0c8d894a63bbe102ae13d918f790121f5cfe6e485cc8e2e2"},
"cainophile": {:git, "https://github.com/Logflare/cainophile.git", "ea2e63639b3ccd6db66069a57e31341bdb863175", []},
"cainophile": {:git, "https://github.com/Logflare/cainophile.git", "c310d69d753a1d05c78237d2ab4d234b6d4b3bfb", [ref: "c310d69"]},
"castore": {:hex, :castore, "0.1.22", "4127549e411bedd012ca3a308dede574f43819fe9394254ca55ab4895abfa1a2", [:mix], [], "hexpm", "c17576df47eb5aa1ee40cc4134316a99f5cad3e215d5c77b8dd3cfef12a22cac"},
"certifi": {:hex, :certifi, "2.12.0", "2d1cca2ec95f59643862af91f001478c9863c2ac9cb6e2f89780bfd8de987329", [:rebar3], [], "hexpm", "ee68d85df22e554040cdb4be100f33873ac6051387baf6a8f6ce82272340ff1c"},
"chatterbox": {:hex, :ts_chatterbox, "0.15.1", "5cac4d15dd7ad61fc3c4415ce4826fc563d4643dee897a558ec4ea0b1c835c9c", [:rebar3], [{:hpack, "~> 0.3.0", [hex: :hpack_erl, repo: "hexpm", optional: false]}], "hexpm", "4f75b91451338bc0da5f52f3480fa6ef6e3a2aeecfc33686d6b3d0a0948f31aa"},
Expand Down
Loading
Loading