From c82bba1cd649f1a520990d5da53dbb8472bb277a Mon Sep 17 00:00:00 2001 From: TzeYiing Date: Wed, 17 Jul 2024 10:03:35 +0800 Subject: [PATCH 1/6] chore: reorganize context cache modules to separate supervision tree --- config/config.exs | 2 +- lib/logflare/application.ex | 55 +------------- .../{ => context_cache}/cache_buster.ex | 2 +- lib/logflare/context_cache/supervisor.ex | 75 +++++++++++++++++++ .../20210729161959_subscribe_to_postgres.exs | 6 +- 5 files changed, 82 insertions(+), 58 deletions(-) rename lib/logflare/{ => context_cache}/cache_buster.ex (99%) create mode 100644 lib/logflare/context_cache/supervisor.ex diff --git a/config/config.exs b/config/config.exs index 351417f33..ce29e0633 100644 --- a/config/config.exs +++ b/config/config.exs @@ -109,7 +109,7 @@ config :scrivener_html, # If you use a single view style everywhere, you can configure it here. See View Styles below for more info. view_style: :bootstrap_v4 -config :logflare, Logflare.CacheBuster, +config :logflare, Logflare.ContextCache.CacheBuster, replication_slot: :temporary, publications: ["logflare_pub"], # remember to add an ALTER PUBLICATION ... migration when changing published tables! diff --git a/lib/logflare/application.ex b/lib/logflare/application.ex index 67e63e111..63e018251 100644 --- a/lib/logflare/application.ex +++ b/lib/logflare/application.ex @@ -3,17 +3,9 @@ defmodule Logflare.Application do use Application require Logger - alias Logflare.Billing alias Logflare.ContextCache - alias Logflare.Backends alias Logflare.Logs alias Logflare.SingleTenant - alias Logflare.Sources - alias Logflare.SourceSchemas - alias Logflare.Users - alias Logflare.TeamUsers - alias Logflare.Partners - alias Logflare.Auth alias Logflare.SystemMetricsSup alias Logflare.Sources.Counters alias Logflare.Sources.RateCounters @@ -47,16 +39,8 @@ defmodule Logflare.Application do [ Counters, RateCounters, - ContextCache, - TeamUsers.Cache, - Users.Cache, - Sources.Cache, - Partners.Cache, - Backends.Cache, - Billing.Cache, - SourceSchemas.Cache, - Auth.Cache, Logs.LogEvents.Cache, + ContextCache.Supervisor, {Phoenix.PubSub, name: Logflare.PubSub}, PubSubRates, Logs.RejectedLogEvents, @@ -79,15 +63,6 @@ defmodule Logflare.Application do end defp get_children(_) do - # Database options for Postgres notifications - hostname = ~c"#{Application.get_env(:logflare, Logflare.Repo)[:hostname]}" - username = Application.get_env(:logflare, Logflare.Repo)[:username] - password = Application.get_env(:logflare, Logflare.Repo)[:password] - database = Application.get_env(:logflare, Logflare.Repo)[:database] - - port = Application.get_env(:logflare, Logflare.Repo)[:port] - slot = Application.get_env(:logflare, Logflare.CacheBuster)[:replication_slot] - publications = Application.get_env(:logflare, Logflare.CacheBuster)[:publications] topologies = Application.get_env(:libcluster, :topologies, []) grpc_port = Application.get_env(:grpc, :port) ssl = Application.get_env(:logflare, :ssl) @@ -103,36 +78,10 @@ defmodule Logflare.Application do {Cluster.Supervisor, [topologies, [name: Logflare.ClusterSupervisor]]}, Logflare.Repo, {Phoenix.PubSub, name: Logflare.PubSub, pool_size: pool_size}, - # Context Caches - TeamUsers.Cache, - ContextCache, - Partners.Cache, - Users.Cache, - Backends.Cache, - Sources.Cache, - Billing.Cache, - SourceSchemas.Cache, - Auth.Cache, Logs.LogEvents.Cache, PubSubRates, + ContextCache.Supervisor, - # Follow Postgresql replication log and bust all our context caches - { - Cainophile.Adapters.Postgres, - register: Logflare.PgPublisher, - epgsql: %{ - host: hostname, - port: port, - username: username, - database: database, - password: password - }, - slot: slot, - wal_position: {"0", "0"}, - publications: publications - }, - Logflare.CacheBuster, - # Sources # v1 ingest pipline {Registry, name: Logflare.V1SourceRegistry, diff --git a/lib/logflare/cache_buster.ex b/lib/logflare/context_cache/cache_buster.ex similarity index 99% rename from lib/logflare/cache_buster.ex rename to lib/logflare/context_cache/cache_buster.ex index 4111c81fa..a983d8cc9 100644 --- a/lib/logflare/cache_buster.ex +++ b/lib/logflare/context_cache/cache_buster.ex @@ -1,4 +1,4 @@ -defmodule Logflare.CacheBuster do +defmodule Logflare.ContextCache.CacheBuster do @moduledoc """ Monitors our Postgres replication log and busts the cache accordingly. """ diff --git a/lib/logflare/context_cache/supervisor.ex b/lib/logflare/context_cache/supervisor.ex new file mode 100644 index 000000000..e74d90bbb --- /dev/null +++ b/lib/logflare/context_cache/supervisor.ex @@ -0,0 +1,75 @@ +defmodule Logflare.ContextCache.Supervisor do + @moduledoc false + + use Supervisor + + alias Logflare.Backends + alias Logflare.ContextCache.CacheBuster + alias Logflare.Billing + alias Logflare.ContextCache + alias Logflare.Backends + alias Logflare.Sources + alias Logflare.SourceSchemas + alias Logflare.Users + alias Logflare.TeamUsers + alias Logflare.Partners + alias Logflare.Auth + + alias Logflare.Repo + + def start_link(_) do + Supervisor.start_link(__MODULE__, []) + end + + @env Application.compile_env(:logflare, :env) + + @impl Supervisor + def init(_) do + Supervisor.init(get_children(@env), strategy: :one_for_one) + end + + defp get_children(:test) do + [ + ContextCache, + TeamUsers.Cache, + Partners.Cache, + Users.Cache, + Backends.Cache, + Sources.Cache, + Billing.Cache, + SourceSchemas.Cache, + Auth.Cache + ] + end + + defp get_children(_) do + hostname = ~c"#{Application.get_env(:logflare, Repo)[:hostname]}" + username = Application.get_env(:logflare, Repo)[:username] + password = Application.get_env(:logflare, Repo)[:password] + database = Application.get_env(:logflare, Repo)[:database] + port = Application.get_env(:logflare, Repo)[:port] + + slot = Application.get_env(:logflare, CacheBuster)[:replication_slot] + publications = Application.get_env(:logflare, CacheBuster)[:publications] + + get_children(:test) ++ + [ + # Follow Postgresql replication log and bust all our context caches + { + Cainophile.Adapters.Postgres, + register: Logflare.PgPublisher, + epgsql: %{ + host: hostname, + port: port, + username: username, + database: database, + password: password + }, + slot: slot, + wal_position: {"0", "0"}, + publications: publications + }, + ContextCache.CacheBuster + ] + end +end diff --git a/priv/repo/migrations/20210729161959_subscribe_to_postgres.exs b/priv/repo/migrations/20210729161959_subscribe_to_postgres.exs index c905d96ab..c3957dda4 100644 --- a/priv/repo/migrations/20210729161959_subscribe_to_postgres.exs +++ b/priv/repo/migrations/20210729161959_subscribe_to_postgres.exs @@ -17,10 +17,10 @@ defmodule Logflare.Repo.Migrations.SubscribeToPostgres do @disable_ddl_transaction true @disable_migration_lock true @username Application.get_env(:logflare, Logflare.Repo)[:username] - @slot Application.get_env(:logflare, Logflare.CacheBuster)[:replication_slot] + @slot Application.get_env(:logflare, Logflare.ContextCache.CacheBuster)[:replication_slot] @env Application.get_env(:logflare, :env) - @publications Application.get_env(:logflare, Logflare.CacheBuster)[:publications] - @publication_tables Application.get_env(:logflare, Logflare.CacheBuster)[:publication_tables] + @publications Application.get_env(:logflare, Logflare.ContextCache.CacheBuster)[:publications] + @publication_tables Application.get_env(:logflare, Logflare.ContextCache.CacheBuster)[:publication_tables] def up do if @env in [:dev, :test] do From 7aea58c924f8fc99aadb947d0d61786dab75cb65 Mon Sep 17 00:00:00 2001 From: TzeYiing Date: Wed, 17 Jul 2024 21:41:52 +0800 Subject: [PATCH 2/6] feat: initial cainophile syn implementation --- config/config.exs | 5 +- lib/logflare/context_cache/cache_buster.ex | 18 +++++- lib/logflare/context_cache/supervisor.ex | 59 +++++++++++------- .../context_cache/transaction_broadcaster.ex | 62 +++++++++++++++++++ lib/logflare/syn_event_handler.ex | 19 ++++++ mix.exs | 2 +- mix.lock | 2 +- 7 files changed, 141 insertions(+), 26 deletions(-) create mode 100644 lib/logflare/context_cache/transaction_broadcaster.ex create mode 100644 lib/logflare/syn_event_handler.ex diff --git a/config/config.exs b/config/config.exs index ce29e0633..4acae64dd 100644 --- a/config/config.exs +++ b/config/config.exs @@ -57,7 +57,10 @@ config :ueberauth, Ueberauth, config :phoenix, :json_library, Jason config :postgrex, :json_library, Jason -config :syn, scopes: [:endpoints] + +config :syn, + scopes: [:endpoints, :context_cache], + event_handler: Logflare.SynEventHandler oauth_common = [ repo: Logflare.Repo, diff --git a/lib/logflare/context_cache/cache_buster.ex b/lib/logflare/context_cache/cache_buster.ex index a983d8cc9..36c238997 100644 --- a/lib/logflare/context_cache/cache_buster.ex +++ b/lib/logflare/context_cache/cache_buster.ex @@ -16,8 +16,7 @@ defmodule Logflare.ContextCache.CacheBuster do def init(state) do Logger.put_process_level(self(), :error) - - Cainophile.Adapters.Postgres.subscribe(Logflare.PgPublisher, self()) + Phoenix.PubSub.subscribe(Logflare.PubSub, "wal") {:ok, state} end @@ -38,6 +37,21 @@ defmodule Logflare.ContextCache.CacheBuster do {:reply, :ok, state} end + def handle_info(:try_subscribe, state) do + try do + ContextCache.Supervisor.maybe_start_cainophile() + + ContextCache.Supervisor.publisher_name() + |> Cainophile.Adapters.Postgres.subscribe(self()) + catch + e -> + Logger.error("Error when trying to create cainophile subscription #{inspect(e)}") + end + + Process.send_after(self(), :try_subscribe, 5_000) + {:noreply, state} + end + def handle_info(%Transaction{changes: []}, state), do: {:noreply, state} def handle_info(%Transaction{changes: changes} = transaction, state) do diff --git a/lib/logflare/context_cache/supervisor.ex b/lib/logflare/context_cache/supervisor.ex index e74d90bbb..32e2b8ca2 100644 --- a/lib/logflare/context_cache/supervisor.ex +++ b/lib/logflare/context_cache/supervisor.ex @@ -18,14 +18,15 @@ defmodule Logflare.ContextCache.Supervisor do alias Logflare.Repo def start_link(_) do - Supervisor.start_link(__MODULE__, []) + Supervisor.start_link(__MODULE__, [], name: __MODULE__) end @env Application.compile_env(:logflare, :env) @impl Supervisor def init(_) do - Supervisor.init(get_children(@env), strategy: :one_for_one) + res = Supervisor.init(get_children(@env), strategy: :one_for_one) + res end defp get_children(:test) do @@ -43,6 +44,27 @@ defmodule Logflare.ContextCache.Supervisor do end defp get_children(_) do + get_children(:test) ++ + [ + ContextCache.TransactionBroadcaster, + ContextCache.CacheBuster + ] + end + + @doc """ + Returns the publisher :via name used for syn registry. + """ + def publisher_name do + {:via, :syn, {:context_cache, Logflare.PgPublisher}} + # {:global, Logflare.PgPublisher} + end + + def maybe_start_cainophile do + spec = cainophile_child_spec() + Supervisor.start_child(__MODULE__, spec) + end + + def cainophile_child_spec do hostname = ~c"#{Application.get_env(:logflare, Repo)[:hostname]}" username = Application.get_env(:logflare, Repo)[:username] password = Application.get_env(:logflare, Repo)[:password] @@ -52,24 +74,19 @@ defmodule Logflare.ContextCache.Supervisor do slot = Application.get_env(:logflare, CacheBuster)[:replication_slot] publications = Application.get_env(:logflare, CacheBuster)[:publications] - get_children(:test) ++ - [ - # Follow Postgresql replication log and bust all our context caches - { - Cainophile.Adapters.Postgres, - register: Logflare.PgPublisher, - epgsql: %{ - host: hostname, - port: port, - username: username, - database: database, - password: password - }, - slot: slot, - wal_position: {"0", "0"}, - publications: publications - }, - ContextCache.CacheBuster - ] + { + Cainophile.Adapters.Postgres, + register: publisher_name(), + epgsql: %{ + host: hostname, + port: port, + username: username, + database: database, + password: password + }, + slot: slot, + wal_position: {"0", "0"}, + publications: publications + } end end diff --git a/lib/logflare/context_cache/transaction_broadcaster.ex b/lib/logflare/context_cache/transaction_broadcaster.ex new file mode 100644 index 000000000..ab9b1a075 --- /dev/null +++ b/lib/logflare/context_cache/transaction_broadcaster.ex @@ -0,0 +1,62 @@ +defmodule Logflare.ContextCache.TransactionBroadcaster do + @moduledoc """ + Subscribes to cainophile and broadcasts all transactions + """ + use GenServer + + require Logger + + alias Logflare.ContextCache + alias Cainophile.Changes.{NewRecord, UpdatedRecord, DeletedRecord, Transaction} + + def start_link(init_args) do + GenServer.start_link(__MODULE__, init_args, name: __MODULE__) + end + + def init(state) do + Logger.put_process_level(self(), :error) + Process.send_after(self(), :try_subscribe, 1_000) + {:ok, state} + end + + @doc """ + Sets the Logger level for this process. It's started with level :error. + + To debug wal records set process to level :info and each transaction will be logged. + """ + + @spec set_log_level(Logger.levels()) :: :ok + def set_log_level(level) when is_atom(level) do + GenServer.call(__MODULE__, {:put_level, level}) + end + + def handle_call({:put_level, level}, _from, state) do + :ok = Logger.put_process_level(self(), level) + + {:reply, :ok, state} + end + + def handle_info(:try_subscribe, state) do + try do + ContextCache.Supervisor.maybe_start_cainophile() + + ContextCache.Supervisor.publisher_name() + |> Cainophile.Adapters.Postgres.subscribe(self()) + catch + e -> + Logger.error("Error when trying to create cainophile subscription #{inspect(e)}") + end + + Process.send_after(self(), :try_subscribe, 5_000) + {:noreply, state} + end + + def handle_info(%Transaction{changes: []}, state), do: {:noreply, state} + + def handle_info(%Transaction{changes: changes} = transaction, state) do + Logger.debug("WAL record received: #{inspect(transaction)}") + # broadcast it + Phoenix.PubSub.broadcast(Logflare.PubSub, "wal", transaction) + {:noreply, state} + end +end diff --git a/lib/logflare/syn_event_handler.ex b/lib/logflare/syn_event_handler.ex new file mode 100644 index 000000000..966902c57 --- /dev/null +++ b/lib/logflare/syn_event_handler.ex @@ -0,0 +1,19 @@ +defmodule Logflare.SynEventHandler do + @moduledoc """ + Event handler for syn + + Always keep oldest proces. + """ + + @behaviour :syn_event_handler + + require Logger + @impl true + def resolve_registry_conflict(scope, name, pid1, pid2) do + Logger.warning( + "Resolving registry conflict for #{scope}, #{inspect(name)}, #{inspect(pid1)} and #{inspect(pid2)}. Keeping #{inspect(pid1)}" + ) + + pid1 + end +end diff --git a/mix.exs b/mix.exs index 517ca5ea4..6cb46fd8e 100644 --- a/mix.exs +++ b/mix.exs @@ -193,7 +193,7 @@ defmodule Logflare.Mixfile do {:contex, "~> 0.3.0"}, # Postgres Subscribe - {:cainophile, github: "Logflare/cainophile"}, + {:cainophile, github: "Logflare/cainophile", ref: "cfe99f9"}, {:open_api_spex, "~> 3.16"}, {:grpc, "~> 0.5.0"}, {:protobuf, "~> 0.10"}, diff --git a/mix.lock b/mix.lock index f7296bf22..f53975061 100644 --- a/mix.lock +++ b/mix.lock @@ -8,7 +8,7 @@ "broadway": {:git, "https://github.com/Logflare/broadway.git", "092f4ab5891d37be4fb84f4ad91a329c05419db8", [ref: "092f4ab"]}, "bunt": {:hex, :bunt, "0.2.1", "e2d4792f7bc0ced7583ab54922808919518d0e57ee162901a16a1b6664ef3b14", [:mix], [], "hexpm", "a330bfb4245239787b15005e66ae6845c9cd524a288f0d141c148b02603777a5"}, "cachex": {:hex, :cachex, "3.6.0", "14a1bfbeee060dd9bec25a5b6f4e4691e3670ebda28c8ba2884b12fe30b36bf8", [:mix], [{:eternal, "~> 1.2", [hex: :eternal, repo: "hexpm", optional: false]}, {:jumper, "~> 1.0", [hex: :jumper, repo: "hexpm", optional: false]}, {:sleeplocks, "~> 1.1", [hex: :sleeplocks, repo: "hexpm", optional: false]}, {:unsafe, "~> 1.0", [hex: :unsafe, repo: "hexpm", optional: false]}], "hexpm", "ebf24e373883bc8e0c8d894a63bbe102ae13d918f790121f5cfe6e485cc8e2e2"}, - "cainophile": {:git, "https://github.com/Logflare/cainophile.git", "ea2e63639b3ccd6db66069a57e31341bdb863175", []}, + "cainophile": {:git, "https://github.com/Logflare/cainophile.git", "cfe99f9ac95917290f83631c53412da8ac717d7b", [ref: "cfe99f9"]}, "castore": {:hex, :castore, "0.1.22", "4127549e411bedd012ca3a308dede574f43819fe9394254ca55ab4895abfa1a2", [:mix], [], "hexpm", "c17576df47eb5aa1ee40cc4134316a99f5cad3e215d5c77b8dd3cfef12a22cac"}, "certifi": {:hex, :certifi, "2.12.0", "2d1cca2ec95f59643862af91f001478c9863c2ac9cb6e2f89780bfd8de987329", [:rebar3], [], "hexpm", "ee68d85df22e554040cdb4be100f33873ac6051387baf6a8f6ce82272340ff1c"}, "chatterbox": {:hex, :ts_chatterbox, "0.15.1", "5cac4d15dd7ad61fc3c4415ce4826fc563d4643dee897a558ec4ea0b1c835c9c", [:rebar3], [{:hpack, "~> 0.3.0", [hex: :hpack_erl, repo: "hexpm", optional: false]}], "hexpm", "4f75b91451338bc0da5f52f3480fa6ef6e3a2aeecfc33686d6b3d0a0948f31aa"}, From 7224bfb6b2b7f4f83a530527811a399afd033285 Mon Sep 17 00:00:00 2001 From: TzeYiing Date: Thu, 18 Jul 2024 14:20:19 +0800 Subject: [PATCH 3/6] feat: transaction broadcasting --- config/test.exs | 1 + lib/logflare/context_cache/cache_buster.ex | 6 +- lib/logflare/context_cache/supervisor.ex | 47 ++++--- .../context_cache/transaction_broadcaster.ex | 40 +++--- mix.exs | 2 +- mix.lock | 2 +- test/logflare/context_cache_test.exs | 119 +++++++++++++++--- 7 files changed, 163 insertions(+), 54 deletions(-) diff --git a/config/test.exs b/config/test.exs index e9a6560a6..9e9ff026a 100644 --- a/config/test.exs +++ b/config/test.exs @@ -28,6 +28,7 @@ config :logflare, Logflare.Repo, password: "postgres", database: "logflare_test", hostname: "localhost", + port: 5432, pool_size: 10, pool: Ecto.Adapters.SQL.Sandbox diff --git a/lib/logflare/context_cache/cache_buster.ex b/lib/logflare/context_cache/cache_buster.ex index 36c238997..496610726 100644 --- a/lib/logflare/context_cache/cache_buster.ex +++ b/lib/logflare/context_cache/cache_buster.ex @@ -16,10 +16,14 @@ defmodule Logflare.ContextCache.CacheBuster do def init(state) do Logger.put_process_level(self(), :error) - Phoenix.PubSub.subscribe(Logflare.PubSub, "wal") + subscribe_to_transactions() {:ok, state} end + def subscribe_to_transactions do + Phoenix.PubSub.subscribe(Logflare.PubSub, "wal_transactions") + end + @doc """ Sets the Logger level for this process. It's started with level :error. diff --git a/lib/logflare/context_cache/supervisor.ex b/lib/logflare/context_cache/supervisor.ex index 32e2b8ca2..1b9c13ff8 100644 --- a/lib/logflare/context_cache/supervisor.ex +++ b/lib/logflare/context_cache/supervisor.ex @@ -59,12 +59,23 @@ defmodule Logflare.ContextCache.Supervisor do # {:global, Logflare.PgPublisher} end + @doc """ + Attempts to start a cainophile child in the ContextCache.Supervisor. + If it already exists, it will return with an error tuple. + """ def maybe_start_cainophile do spec = cainophile_child_spec() + Supervisor.which_children(__MODULE__) Supervisor.start_child(__MODULE__, spec) end - def cainophile_child_spec do + def remove_cainophile do + Supervisor.terminate_child(__MODULE__, Cainophile.Adapters.Postgres) + Supervisor.delete_child(__MODULE__, Cainophile.Adapters.Postgres) + :ok + end + + defp cainophile_child_spec do hostname = ~c"#{Application.get_env(:logflare, Repo)[:hostname]}" username = Application.get_env(:logflare, Repo)[:username] password = Application.get_env(:logflare, Repo)[:password] @@ -74,19 +85,27 @@ defmodule Logflare.ContextCache.Supervisor do slot = Application.get_env(:logflare, CacheBuster)[:replication_slot] publications = Application.get_env(:logflare, CacheBuster)[:publications] - { - Cainophile.Adapters.Postgres, - register: publisher_name(), - epgsql: %{ - host: hostname, - port: port, - username: username, - database: database, - password: password - }, - slot: slot, - wal_position: {"0", "0"}, - publications: publications + %{ + id: Cainophile.Adapters.Postgres, + restart: :permanent, + type: :worker, + start: + {Cainophile.Adapters.Postgres, :start_link, + [ + [ + register: publisher_name(), + epgsql: %{ + host: hostname, + port: port, + username: username, + database: database, + password: password + }, + slot: slot, + wal_position: {"0", "0"}, + publications: publications + ] + ]} } end end diff --git a/lib/logflare/context_cache/transaction_broadcaster.ex b/lib/logflare/context_cache/transaction_broadcaster.ex index ab9b1a075..5081de362 100644 --- a/lib/logflare/context_cache/transaction_broadcaster.ex +++ b/lib/logflare/context_cache/transaction_broadcaster.ex @@ -7,15 +7,17 @@ defmodule Logflare.ContextCache.TransactionBroadcaster do require Logger alias Logflare.ContextCache - alias Cainophile.Changes.{NewRecord, UpdatedRecord, DeletedRecord, Transaction} + alias Cainophile.Changes.Transaction def start_link(init_args) do GenServer.start_link(__MODULE__, init_args, name: __MODULE__) end - def init(state) do + def init(args) do + state = %{interval: Keyword.get(args, :interval, 5_000)} Logger.put_process_level(self(), :error) - Process.send_after(self(), :try_subscribe, 1_000) + attempt_subscribe(self()) + Process.send_after(self(), :try_subscribe, min(state.interval, 5_000)) {:ok, state} end @@ -37,26 +39,30 @@ defmodule Logflare.ContextCache.TransactionBroadcaster do end def handle_info(:try_subscribe, state) do - try do - ContextCache.Supervisor.maybe_start_cainophile() + attempt_subscribe(self()) - ContextCache.Supervisor.publisher_name() - |> Cainophile.Adapters.Postgres.subscribe(self()) - catch - e -> - Logger.error("Error when trying to create cainophile subscription #{inspect(e)}") - end - - Process.send_after(self(), :try_subscribe, 5_000) + Process.send_after(self(), :try_subscribe, state.interval) {:noreply, state} end - def handle_info(%Transaction{changes: []}, state), do: {:noreply, state} + def handle_info(%Transaction{changes: []}, state), do: {:noreply, state |> dbg()} - def handle_info(%Transaction{changes: changes} = transaction, state) do - Logger.debug("WAL record received: #{inspect(transaction)}") + def handle_info(%Transaction{changes: _changes} = transaction, state) do + Logger.info("WAL record received: #{inspect(transaction)}") # broadcast it - Phoenix.PubSub.broadcast(Logflare.PubSub, "wal", transaction) + Phoenix.PubSub.broadcast(Logflare.PubSub, "wal_transactions", transaction) {:noreply, state} end + + defp attempt_subscribe(pid) do + try do + ContextCache.Supervisor.maybe_start_cainophile() + + ContextCache.Supervisor.publisher_name() + |> Cainophile.Adapters.Postgres.subscribe(pid) + rescue + e -> + Logger.warning("Could not subscribe to Cainophile, #{inspect(e)}") + end + end end diff --git a/mix.exs b/mix.exs index 6cb46fd8e..a50757ce0 100644 --- a/mix.exs +++ b/mix.exs @@ -193,7 +193,7 @@ defmodule Logflare.Mixfile do {:contex, "~> 0.3.0"}, # Postgres Subscribe - {:cainophile, github: "Logflare/cainophile", ref: "cfe99f9"}, + {:cainophile, github: "Logflare/cainophile", ref: "c310d69"}, {:open_api_spex, "~> 3.16"}, {:grpc, "~> 0.5.0"}, {:protobuf, "~> 0.10"}, diff --git a/mix.lock b/mix.lock index f53975061..82699a1e4 100644 --- a/mix.lock +++ b/mix.lock @@ -8,7 +8,7 @@ "broadway": {:git, "https://github.com/Logflare/broadway.git", "092f4ab5891d37be4fb84f4ad91a329c05419db8", [ref: "092f4ab"]}, "bunt": {:hex, :bunt, "0.2.1", "e2d4792f7bc0ced7583ab54922808919518d0e57ee162901a16a1b6664ef3b14", [:mix], [], "hexpm", "a330bfb4245239787b15005e66ae6845c9cd524a288f0d141c148b02603777a5"}, "cachex": {:hex, :cachex, "3.6.0", "14a1bfbeee060dd9bec25a5b6f4e4691e3670ebda28c8ba2884b12fe30b36bf8", [:mix], [{:eternal, "~> 1.2", [hex: :eternal, repo: "hexpm", optional: false]}, {:jumper, "~> 1.0", [hex: :jumper, repo: "hexpm", optional: false]}, {:sleeplocks, "~> 1.1", [hex: :sleeplocks, repo: "hexpm", optional: false]}, {:unsafe, "~> 1.0", [hex: :unsafe, repo: "hexpm", optional: false]}], "hexpm", "ebf24e373883bc8e0c8d894a63bbe102ae13d918f790121f5cfe6e485cc8e2e2"}, - "cainophile": {:git, "https://github.com/Logflare/cainophile.git", "cfe99f9ac95917290f83631c53412da8ac717d7b", [ref: "cfe99f9"]}, + "cainophile": {:git, "https://github.com/Logflare/cainophile.git", "c310d69d753a1d05c78237d2ab4d234b6d4b3bfb", [ref: "c310d69"]}, "castore": {:hex, :castore, "0.1.22", "4127549e411bedd012ca3a308dede574f43819fe9394254ca55ab4895abfa1a2", [:mix], [], "hexpm", "c17576df47eb5aa1ee40cc4134316a99f5cad3e215d5c77b8dd3cfef12a22cac"}, "certifi": {:hex, :certifi, "2.12.0", "2d1cca2ec95f59643862af91f001478c9863c2ac9cb6e2f89780bfd8de987329", [:rebar3], [], "hexpm", "ee68d85df22e554040cdb4be100f33873ac6051387baf6a8f6ce82272340ff1c"}, "chatterbox": {:hex, :ts_chatterbox, "0.15.1", "5cac4d15dd7ad61fc3c4415ce4826fc563d4643dee897a558ec4ea0b1c835c9c", [:rebar3], [{:hpack, "~> 0.3.0", [hex: :hpack_erl, repo: "hexpm", optional: false]}], "hexpm", "4f75b91451338bc0da5f52f3480fa6ef6e3a2aeecfc33686d6b3d0a0948f31aa"}, diff --git a/test/logflare/context_cache_test.exs b/test/logflare/context_cache_test.exs index 3f6fa1154..dde956489 100644 --- a/test/logflare/context_cache_test.exs +++ b/test/logflare/context_cache_test.exs @@ -3,31 +3,110 @@ defmodule Logflare.ContextCacheTest do alias Logflare.ContextCache alias Logflare.Sources - setup do - user = insert(:user) - insert(:plan, name: "Free") - source = insert(:source, user: user) - args = [token: source.token] - source = Sources.Cache.get_by(args) - fun = :get_by - cache_key = {fun, [args]} - %{source: source, cache_key: cache_key} + describe "ContextCache functions" do + setup do + user = insert(:user) + insert(:plan, name: "Free") + source = insert(:source, user: user) + args = [token: source.token] + source = Sources.Cache.get_by(args) + fun = :get_by + cache_key = {fun, [args]} + %{source: source, cache_key: cache_key} + end + + test "cache_name/1" do + assert Sources.Cache == ContextCache.cache_name(Sources) + end + + test "apply_fun/3", %{cache_key: cache_key} do + # apply_fun was called in the setup when we called `Sources.Cache.get_by/1` + # here's we're making sure it did get cached correctly + assert {:cached, %Logflare.Source{}} = Cachex.get!(Sources.Cache, cache_key) + end + + test "bust_keys/1", %{source: source, cache_key: cache_key} do + assert {:ok, :busted} = ContextCache.bust_keys([{Sources, source.id}]) + assert is_nil(Cachex.get!(Sources.Cache, cache_key)) + match = {:entry, {{Sources, source.id}, :_}, :_, :_, :"$1"} + assert [] = :ets.match(ContextCache, match) + end end - test "cache_name/1" do - assert Sources.Cache == ContextCache.cache_name(Sources) + describe "ContextCache.Supervisor" do + setup do + ContextCache.Supervisor.remove_cainophile() + + on_exit(fn -> + ContextCache.Supervisor.remove_cainophile() + end) + + :ok + end + + test "maybe_start_cainophile will attempt to start a cainophile child" do + assert {:ok, _pid} = ContextCache.Supervisor.maybe_start_cainophile() + + assert {:error, {:already_started, _}} = + ContextCache.Supervisor.maybe_start_cainophile() + + assert get_cainophile_child() + end + + test "remove_cainophile/1 will remove cainophile child from tree" do + refute get_cainophile_child() + assert {:ok, _pid} = ContextCache.Supervisor.maybe_start_cainophile() + assert get_cainophile_child() + assert :ok = ContextCache.Supervisor.remove_cainophile() + refute get_cainophile_child() + end + + test "TransactionBroadcaster will try to start cainophile " do + refute get_cainophile_child() + start_supervised!({ContextCache.TransactionBroadcaster, interval: 100}) + :timer.sleep(500) + assert get_cainophile_child() + end end - test "apply_fun/3", %{cache_key: cache_key} do - # apply_fun was called in the setup when we called `Sources.Cache.get_by/1` - # here's we're making sure it did get cached correctly - assert {:cached, %Logflare.Source{}} = Cachex.get!(Sources.Cache, cache_key) + describe "unboxed transaction" do + setup do + ContextCache.Supervisor.remove_cainophile() + + on_exit(fn -> + ContextCache.Supervisor.remove_cainophile() + + Ecto.Adapters.SQL.Sandbox.unboxed_run(Logflare.Repo, fn -> + for u <- Logflare.Repo.all(Logflare.User) do + Logflare.Repo.delete(u) + end + end) + end) + + :ok + end + + test "TransactionBroadcaster subscribes to wal and broadcasts transactions" do + ContextCache.CacheBuster.subscribe_to_transactions() + start_supervised!({ContextCache.TransactionBroadcaster, interval: 100}) + :timer.sleep(200) + + Ecto.Adapters.SQL.Sandbox.unboxed_run(Logflare.Repo, fn -> + insert(:user) + end) + + :timer.sleep(500) + assert_received %Cainophile.Changes.Transaction{} + end end - test "bust_keys/1", %{source: source, cache_key: cache_key} do - assert {:ok, :busted} = ContextCache.bust_keys([{Sources, source.id}]) - assert is_nil(Cachex.get!(Sources.Cache, cache_key)) - match = {:entry, {{Sources, source.id}, :_}, :_, :_, :"$1"} - assert [] = :ets.match(ContextCache, match) + # describe "ContextCache" + + defp get_cainophile_child() do + for {Cainophile.Adapters.Postgres, _, _, _} = child <- + Supervisor.which_children(ContextCache.Supervisor) do + child + end + |> List.first() end end From bcf3d6a1300309645de1fdc844af665af2358d88 Mon Sep 17 00:00:00 2001 From: TzeYiing Date: Thu, 18 Jul 2024 15:12:05 +0800 Subject: [PATCH 4/6] fix: don't try to call Supervisor in init/1 --- lib/logflare/context_cache/transaction_broadcaster.ex | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/logflare/context_cache/transaction_broadcaster.ex b/lib/logflare/context_cache/transaction_broadcaster.ex index 5081de362..7a3ea813d 100644 --- a/lib/logflare/context_cache/transaction_broadcaster.ex +++ b/lib/logflare/context_cache/transaction_broadcaster.ex @@ -16,8 +16,7 @@ defmodule Logflare.ContextCache.TransactionBroadcaster do def init(args) do state = %{interval: Keyword.get(args, :interval, 5_000)} Logger.put_process_level(self(), :error) - attempt_subscribe(self()) - Process.send_after(self(), :try_subscribe, min(state.interval, 5_000)) + Process.send_after(self(), :try_subscribe, min(state.interval, 1_000)) {:ok, state} end @@ -39,7 +38,8 @@ defmodule Logflare.ContextCache.TransactionBroadcaster do end def handle_info(:try_subscribe, state) do - attempt_subscribe(self()) + pid = self() + attempt_subscribe(pid) Process.send_after(self(), :try_subscribe, state.interval) {:noreply, state} From 2b86c060f892893e729eb915d2e1dd55d0e4b9b2 Mon Sep 17 00:00:00 2001 From: TzeYiing Date: Thu, 18 Jul 2024 15:33:55 +0800 Subject: [PATCH 5/6] feat: add logging --- lib/logflare/context_cache/cache_buster.ex | 22 ++----------------- .../context_cache/transaction_broadcaster.ex | 14 +++++++++--- 2 files changed, 13 insertions(+), 23 deletions(-) diff --git a/lib/logflare/context_cache/cache_buster.ex b/lib/logflare/context_cache/cache_buster.ex index 496610726..0da81dd9e 100644 --- a/lib/logflare/context_cache/cache_buster.ex +++ b/lib/logflare/context_cache/cache_buster.ex @@ -15,7 +15,6 @@ defmodule Logflare.ContextCache.CacheBuster do end def init(state) do - Logger.put_process_level(self(), :error) subscribe_to_transactions() {:ok, state} end @@ -27,7 +26,7 @@ defmodule Logflare.ContextCache.CacheBuster do @doc """ Sets the Logger level for this process. It's started with level :error. - To debug wal records set process to level :info and each transaction will be logged. + To debug wal records set process to level :debug and each transaction will be logged. """ @spec set_log_level(Logger.levels()) :: :ok @@ -41,25 +40,8 @@ defmodule Logflare.ContextCache.CacheBuster do {:reply, :ok, state} end - def handle_info(:try_subscribe, state) do - try do - ContextCache.Supervisor.maybe_start_cainophile() - - ContextCache.Supervisor.publisher_name() - |> Cainophile.Adapters.Postgres.subscribe(self()) - catch - e -> - Logger.error("Error when trying to create cainophile subscription #{inspect(e)}") - end - - Process.send_after(self(), :try_subscribe, 5_000) - {:noreply, state} - end - - def handle_info(%Transaction{changes: []}, state), do: {:noreply, state} - def handle_info(%Transaction{changes: changes} = transaction, state) do - Logger.info("WAL record received: #{inspect(transaction)}") + Logger.debug("WAL record received from pubsub: #{inspect(transaction)}") for record <- changes, record = handle_record(record), diff --git a/lib/logflare/context_cache/transaction_broadcaster.ex b/lib/logflare/context_cache/transaction_broadcaster.ex index 7a3ea813d..b7b349e61 100644 --- a/lib/logflare/context_cache/transaction_broadcaster.ex +++ b/lib/logflare/context_cache/transaction_broadcaster.ex @@ -15,7 +15,6 @@ defmodule Logflare.ContextCache.TransactionBroadcaster do def init(args) do state = %{interval: Keyword.get(args, :interval, 5_000)} - Logger.put_process_level(self(), :error) Process.send_after(self(), :try_subscribe, min(state.interval, 1_000)) {:ok, state} end @@ -23,7 +22,7 @@ defmodule Logflare.ContextCache.TransactionBroadcaster do @doc """ Sets the Logger level for this process. It's started with level :error. - To debug wal records set process to level :info and each transaction will be logged. + To debug wal records set process to level :debug and each transaction will be logged. """ @spec set_log_level(Logger.levels()) :: :ok @@ -48,7 +47,7 @@ defmodule Logflare.ContextCache.TransactionBroadcaster do def handle_info(%Transaction{changes: []}, state), do: {:noreply, state |> dbg()} def handle_info(%Transaction{changes: _changes} = transaction, state) do - Logger.info("WAL record received: #{inspect(transaction)}") + Logger.debug("WAL record received from cainophile: #{inspect(transaction)}") # broadcast it Phoenix.PubSub.broadcast(Logflare.PubSub, "wal_transactions", transaction) {:noreply, state} @@ -57,6 +56,15 @@ defmodule Logflare.ContextCache.TransactionBroadcaster do defp attempt_subscribe(pid) do try do ContextCache.Supervisor.maybe_start_cainophile() + |> case do + {:ok, pid} -> + Logger.info( + "Successfully started cainophile on #{inspect(Node.self())}, pid: #{inspect(pid)}" + ) + + {:error, _} -> + :noop + end ContextCache.Supervisor.publisher_name() |> Cainophile.Adapters.Postgres.subscribe(pid) From 74fc0f69620a85ed9b76589e6b082f51d00c7e48 Mon Sep 17 00:00:00 2001 From: TzeYiing Date: Thu, 18 Jul 2024 15:34:19 +0800 Subject: [PATCH 6/6] chore: version bump --- VERSION | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/VERSION b/VERSION index cb1ad9b47..310e23359 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -1.7.9 \ No newline at end of file +1.7.10 \ No newline at end of file