diff --git a/VERSION b/VERSION index cb1ad9b47..310e23359 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -1.7.9 \ No newline at end of file +1.7.10 \ No newline at end of file diff --git a/config/config.exs b/config/config.exs index 351417f33..4acae64dd 100644 --- a/config/config.exs +++ b/config/config.exs @@ -57,7 +57,10 @@ config :ueberauth, Ueberauth, config :phoenix, :json_library, Jason config :postgrex, :json_library, Jason -config :syn, scopes: [:endpoints] + +config :syn, + scopes: [:endpoints, :context_cache], + event_handler: Logflare.SynEventHandler oauth_common = [ repo: Logflare.Repo, @@ -109,7 +112,7 @@ config :scrivener_html, # If you use a single view style everywhere, you can configure it here. See View Styles below for more info. view_style: :bootstrap_v4 -config :logflare, Logflare.CacheBuster, +config :logflare, Logflare.ContextCache.CacheBuster, replication_slot: :temporary, publications: ["logflare_pub"], # remember to add an ALTER PUBLICATION ... migration when changing published tables! diff --git a/config/test.exs b/config/test.exs index e9a6560a6..9e9ff026a 100644 --- a/config/test.exs +++ b/config/test.exs @@ -28,6 +28,7 @@ config :logflare, Logflare.Repo, password: "postgres", database: "logflare_test", hostname: "localhost", + port: 5432, pool_size: 10, pool: Ecto.Adapters.SQL.Sandbox diff --git a/lib/logflare/application.ex b/lib/logflare/application.ex index 67e63e111..63e018251 100644 --- a/lib/logflare/application.ex +++ b/lib/logflare/application.ex @@ -3,17 +3,9 @@ defmodule Logflare.Application do use Application require Logger - alias Logflare.Billing alias Logflare.ContextCache - alias Logflare.Backends alias Logflare.Logs alias Logflare.SingleTenant - alias Logflare.Sources - alias Logflare.SourceSchemas - alias Logflare.Users - alias Logflare.TeamUsers - alias Logflare.Partners - alias Logflare.Auth alias Logflare.SystemMetricsSup alias Logflare.Sources.Counters alias Logflare.Sources.RateCounters @@ -47,16 +39,8 @@ defmodule Logflare.Application do [ Counters, RateCounters, - ContextCache, - TeamUsers.Cache, - Users.Cache, - Sources.Cache, - Partners.Cache, - Backends.Cache, - Billing.Cache, - SourceSchemas.Cache, - Auth.Cache, Logs.LogEvents.Cache, + ContextCache.Supervisor, {Phoenix.PubSub, name: Logflare.PubSub}, PubSubRates, Logs.RejectedLogEvents, @@ -79,15 +63,6 @@ defmodule Logflare.Application do end defp get_children(_) do - # Database options for Postgres notifications - hostname = ~c"#{Application.get_env(:logflare, Logflare.Repo)[:hostname]}" - username = Application.get_env(:logflare, Logflare.Repo)[:username] - password = Application.get_env(:logflare, Logflare.Repo)[:password] - database = Application.get_env(:logflare, Logflare.Repo)[:database] - - port = Application.get_env(:logflare, Logflare.Repo)[:port] - slot = Application.get_env(:logflare, Logflare.CacheBuster)[:replication_slot] - publications = Application.get_env(:logflare, Logflare.CacheBuster)[:publications] topologies = Application.get_env(:libcluster, :topologies, []) grpc_port = Application.get_env(:grpc, :port) ssl = Application.get_env(:logflare, :ssl) @@ -103,36 +78,10 @@ defmodule Logflare.Application do {Cluster.Supervisor, [topologies, [name: Logflare.ClusterSupervisor]]}, Logflare.Repo, {Phoenix.PubSub, name: Logflare.PubSub, pool_size: pool_size}, - # Context Caches - TeamUsers.Cache, - ContextCache, - Partners.Cache, - Users.Cache, - Backends.Cache, - Sources.Cache, - Billing.Cache, - SourceSchemas.Cache, - Auth.Cache, Logs.LogEvents.Cache, PubSubRates, + ContextCache.Supervisor, - # Follow Postgresql replication log and bust all our context caches - { - Cainophile.Adapters.Postgres, - register: Logflare.PgPublisher, - epgsql: %{ - host: hostname, - port: port, - username: username, - database: database, - password: password - }, - slot: slot, - wal_position: {"0", "0"}, - publications: publications - }, - Logflare.CacheBuster, - # Sources # v1 ingest pipline {Registry, name: Logflare.V1SourceRegistry, diff --git a/lib/logflare/cache_buster.ex b/lib/logflare/context_cache/cache_buster.ex similarity index 94% rename from lib/logflare/cache_buster.ex rename to lib/logflare/context_cache/cache_buster.ex index 4111c81fa..0da81dd9e 100644 --- a/lib/logflare/cache_buster.ex +++ b/lib/logflare/context_cache/cache_buster.ex @@ -1,4 +1,4 @@ -defmodule Logflare.CacheBuster do +defmodule Logflare.ContextCache.CacheBuster do @moduledoc """ Monitors our Postgres replication log and busts the cache accordingly. """ @@ -15,16 +15,18 @@ defmodule Logflare.CacheBuster do end def init(state) do - Logger.put_process_level(self(), :error) - - Cainophile.Adapters.Postgres.subscribe(Logflare.PgPublisher, self()) + subscribe_to_transactions() {:ok, state} end + def subscribe_to_transactions do + Phoenix.PubSub.subscribe(Logflare.PubSub, "wal_transactions") + end + @doc """ Sets the Logger level for this process. It's started with level :error. - To debug wal records set process to level :info and each transaction will be logged. + To debug wal records set process to level :debug and each transaction will be logged. """ @spec set_log_level(Logger.levels()) :: :ok @@ -38,10 +40,8 @@ defmodule Logflare.CacheBuster do {:reply, :ok, state} end - def handle_info(%Transaction{changes: []}, state), do: {:noreply, state} - def handle_info(%Transaction{changes: changes} = transaction, state) do - Logger.info("WAL record received: #{inspect(transaction)}") + Logger.debug("WAL record received from pubsub: #{inspect(transaction)}") for record <- changes, record = handle_record(record), diff --git a/lib/logflare/context_cache/supervisor.ex b/lib/logflare/context_cache/supervisor.ex new file mode 100644 index 000000000..1b9c13ff8 --- /dev/null +++ b/lib/logflare/context_cache/supervisor.ex @@ -0,0 +1,111 @@ +defmodule Logflare.ContextCache.Supervisor do + @moduledoc false + + use Supervisor + + alias Logflare.Backends + alias Logflare.ContextCache.CacheBuster + alias Logflare.Billing + alias Logflare.ContextCache + alias Logflare.Backends + alias Logflare.Sources + alias Logflare.SourceSchemas + alias Logflare.Users + alias Logflare.TeamUsers + alias Logflare.Partners + alias Logflare.Auth + + alias Logflare.Repo + + def start_link(_) do + Supervisor.start_link(__MODULE__, [], name: __MODULE__) + end + + @env Application.compile_env(:logflare, :env) + + @impl Supervisor + def init(_) do + res = Supervisor.init(get_children(@env), strategy: :one_for_one) + res + end + + defp get_children(:test) do + [ + ContextCache, + TeamUsers.Cache, + Partners.Cache, + Users.Cache, + Backends.Cache, + Sources.Cache, + Billing.Cache, + SourceSchemas.Cache, + Auth.Cache + ] + end + + defp get_children(_) do + get_children(:test) ++ + [ + ContextCache.TransactionBroadcaster, + ContextCache.CacheBuster + ] + end + + @doc """ + Returns the publisher :via name used for syn registry. + """ + def publisher_name do + {:via, :syn, {:context_cache, Logflare.PgPublisher}} + # {:global, Logflare.PgPublisher} + end + + @doc """ + Attempts to start a cainophile child in the ContextCache.Supervisor. + If it already exists, it will return with an error tuple. + """ + def maybe_start_cainophile do + spec = cainophile_child_spec() + Supervisor.which_children(__MODULE__) + Supervisor.start_child(__MODULE__, spec) + end + + def remove_cainophile do + Supervisor.terminate_child(__MODULE__, Cainophile.Adapters.Postgres) + Supervisor.delete_child(__MODULE__, Cainophile.Adapters.Postgres) + :ok + end + + defp cainophile_child_spec do + hostname = ~c"#{Application.get_env(:logflare, Repo)[:hostname]}" + username = Application.get_env(:logflare, Repo)[:username] + password = Application.get_env(:logflare, Repo)[:password] + database = Application.get_env(:logflare, Repo)[:database] + port = Application.get_env(:logflare, Repo)[:port] + + slot = Application.get_env(:logflare, CacheBuster)[:replication_slot] + publications = Application.get_env(:logflare, CacheBuster)[:publications] + + %{ + id: Cainophile.Adapters.Postgres, + restart: :permanent, + type: :worker, + start: + {Cainophile.Adapters.Postgres, :start_link, + [ + [ + register: publisher_name(), + epgsql: %{ + host: hostname, + port: port, + username: username, + database: database, + password: password + }, + slot: slot, + wal_position: {"0", "0"}, + publications: publications + ] + ]} + } + end +end diff --git a/lib/logflare/context_cache/transaction_broadcaster.ex b/lib/logflare/context_cache/transaction_broadcaster.ex new file mode 100644 index 000000000..b7b349e61 --- /dev/null +++ b/lib/logflare/context_cache/transaction_broadcaster.ex @@ -0,0 +1,76 @@ +defmodule Logflare.ContextCache.TransactionBroadcaster do + @moduledoc """ + Subscribes to cainophile and broadcasts all transactions + """ + use GenServer + + require Logger + + alias Logflare.ContextCache + alias Cainophile.Changes.Transaction + + def start_link(init_args) do + GenServer.start_link(__MODULE__, init_args, name: __MODULE__) + end + + def init(args) do + state = %{interval: Keyword.get(args, :interval, 5_000)} + Process.send_after(self(), :try_subscribe, min(state.interval, 1_000)) + {:ok, state} + end + + @doc """ + Sets the Logger level for this process. It's started with level :error. + + To debug wal records set process to level :debug and each transaction will be logged. + """ + + @spec set_log_level(Logger.levels()) :: :ok + def set_log_level(level) when is_atom(level) do + GenServer.call(__MODULE__, {:put_level, level}) + end + + def handle_call({:put_level, level}, _from, state) do + :ok = Logger.put_process_level(self(), level) + + {:reply, :ok, state} + end + + def handle_info(:try_subscribe, state) do + pid = self() + attempt_subscribe(pid) + + Process.send_after(self(), :try_subscribe, state.interval) + {:noreply, state} + end + + def handle_info(%Transaction{changes: []}, state), do: {:noreply, state |> dbg()} + + def handle_info(%Transaction{changes: _changes} = transaction, state) do + Logger.debug("WAL record received from cainophile: #{inspect(transaction)}") + # broadcast it + Phoenix.PubSub.broadcast(Logflare.PubSub, "wal_transactions", transaction) + {:noreply, state} + end + + defp attempt_subscribe(pid) do + try do + ContextCache.Supervisor.maybe_start_cainophile() + |> case do + {:ok, pid} -> + Logger.info( + "Successfully started cainophile on #{inspect(Node.self())}, pid: #{inspect(pid)}" + ) + + {:error, _} -> + :noop + end + + ContextCache.Supervisor.publisher_name() + |> Cainophile.Adapters.Postgres.subscribe(pid) + rescue + e -> + Logger.warning("Could not subscribe to Cainophile, #{inspect(e)}") + end + end +end diff --git a/lib/logflare/syn_event_handler.ex b/lib/logflare/syn_event_handler.ex new file mode 100644 index 000000000..966902c57 --- /dev/null +++ b/lib/logflare/syn_event_handler.ex @@ -0,0 +1,19 @@ +defmodule Logflare.SynEventHandler do + @moduledoc """ + Event handler for syn + + Always keep oldest proces. + """ + + @behaviour :syn_event_handler + + require Logger + @impl true + def resolve_registry_conflict(scope, name, pid1, pid2) do + Logger.warning( + "Resolving registry conflict for #{scope}, #{inspect(name)}, #{inspect(pid1)} and #{inspect(pid2)}. Keeping #{inspect(pid1)}" + ) + + pid1 + end +end diff --git a/mix.exs b/mix.exs index 517ca5ea4..a50757ce0 100644 --- a/mix.exs +++ b/mix.exs @@ -193,7 +193,7 @@ defmodule Logflare.Mixfile do {:contex, "~> 0.3.0"}, # Postgres Subscribe - {:cainophile, github: "Logflare/cainophile"}, + {:cainophile, github: "Logflare/cainophile", ref: "c310d69"}, {:open_api_spex, "~> 3.16"}, {:grpc, "~> 0.5.0"}, {:protobuf, "~> 0.10"}, diff --git a/mix.lock b/mix.lock index f7296bf22..82699a1e4 100644 --- a/mix.lock +++ b/mix.lock @@ -8,7 +8,7 @@ "broadway": {:git, "https://github.com/Logflare/broadway.git", "092f4ab5891d37be4fb84f4ad91a329c05419db8", [ref: "092f4ab"]}, "bunt": {:hex, :bunt, "0.2.1", "e2d4792f7bc0ced7583ab54922808919518d0e57ee162901a16a1b6664ef3b14", [:mix], [], "hexpm", "a330bfb4245239787b15005e66ae6845c9cd524a288f0d141c148b02603777a5"}, "cachex": {:hex, :cachex, "3.6.0", "14a1bfbeee060dd9bec25a5b6f4e4691e3670ebda28c8ba2884b12fe30b36bf8", [:mix], [{:eternal, "~> 1.2", [hex: :eternal, repo: "hexpm", optional: false]}, {:jumper, "~> 1.0", [hex: :jumper, repo: "hexpm", optional: false]}, {:sleeplocks, "~> 1.1", [hex: :sleeplocks, repo: "hexpm", optional: false]}, {:unsafe, "~> 1.0", [hex: :unsafe, repo: "hexpm", optional: false]}], "hexpm", "ebf24e373883bc8e0c8d894a63bbe102ae13d918f790121f5cfe6e485cc8e2e2"}, - "cainophile": {:git, "https://github.com/Logflare/cainophile.git", "ea2e63639b3ccd6db66069a57e31341bdb863175", []}, + "cainophile": {:git, "https://github.com/Logflare/cainophile.git", "c310d69d753a1d05c78237d2ab4d234b6d4b3bfb", [ref: "c310d69"]}, "castore": {:hex, :castore, "0.1.22", "4127549e411bedd012ca3a308dede574f43819fe9394254ca55ab4895abfa1a2", [:mix], [], "hexpm", "c17576df47eb5aa1ee40cc4134316a99f5cad3e215d5c77b8dd3cfef12a22cac"}, "certifi": {:hex, :certifi, "2.12.0", "2d1cca2ec95f59643862af91f001478c9863c2ac9cb6e2f89780bfd8de987329", [:rebar3], [], "hexpm", "ee68d85df22e554040cdb4be100f33873ac6051387baf6a8f6ce82272340ff1c"}, "chatterbox": {:hex, :ts_chatterbox, "0.15.1", "5cac4d15dd7ad61fc3c4415ce4826fc563d4643dee897a558ec4ea0b1c835c9c", [:rebar3], [{:hpack, "~> 0.3.0", [hex: :hpack_erl, repo: "hexpm", optional: false]}], "hexpm", "4f75b91451338bc0da5f52f3480fa6ef6e3a2aeecfc33686d6b3d0a0948f31aa"}, diff --git a/priv/repo/migrations/20210729161959_subscribe_to_postgres.exs b/priv/repo/migrations/20210729161959_subscribe_to_postgres.exs index c905d96ab..c3957dda4 100644 --- a/priv/repo/migrations/20210729161959_subscribe_to_postgres.exs +++ b/priv/repo/migrations/20210729161959_subscribe_to_postgres.exs @@ -17,10 +17,10 @@ defmodule Logflare.Repo.Migrations.SubscribeToPostgres do @disable_ddl_transaction true @disable_migration_lock true @username Application.get_env(:logflare, Logflare.Repo)[:username] - @slot Application.get_env(:logflare, Logflare.CacheBuster)[:replication_slot] + @slot Application.get_env(:logflare, Logflare.ContextCache.CacheBuster)[:replication_slot] @env Application.get_env(:logflare, :env) - @publications Application.get_env(:logflare, Logflare.CacheBuster)[:publications] - @publication_tables Application.get_env(:logflare, Logflare.CacheBuster)[:publication_tables] + @publications Application.get_env(:logflare, Logflare.ContextCache.CacheBuster)[:publications] + @publication_tables Application.get_env(:logflare, Logflare.ContextCache.CacheBuster)[:publication_tables] def up do if @env in [:dev, :test] do diff --git a/test/logflare/context_cache_test.exs b/test/logflare/context_cache_test.exs index 3f6fa1154..dde956489 100644 --- a/test/logflare/context_cache_test.exs +++ b/test/logflare/context_cache_test.exs @@ -3,31 +3,110 @@ defmodule Logflare.ContextCacheTest do alias Logflare.ContextCache alias Logflare.Sources - setup do - user = insert(:user) - insert(:plan, name: "Free") - source = insert(:source, user: user) - args = [token: source.token] - source = Sources.Cache.get_by(args) - fun = :get_by - cache_key = {fun, [args]} - %{source: source, cache_key: cache_key} + describe "ContextCache functions" do + setup do + user = insert(:user) + insert(:plan, name: "Free") + source = insert(:source, user: user) + args = [token: source.token] + source = Sources.Cache.get_by(args) + fun = :get_by + cache_key = {fun, [args]} + %{source: source, cache_key: cache_key} + end + + test "cache_name/1" do + assert Sources.Cache == ContextCache.cache_name(Sources) + end + + test "apply_fun/3", %{cache_key: cache_key} do + # apply_fun was called in the setup when we called `Sources.Cache.get_by/1` + # here's we're making sure it did get cached correctly + assert {:cached, %Logflare.Source{}} = Cachex.get!(Sources.Cache, cache_key) + end + + test "bust_keys/1", %{source: source, cache_key: cache_key} do + assert {:ok, :busted} = ContextCache.bust_keys([{Sources, source.id}]) + assert is_nil(Cachex.get!(Sources.Cache, cache_key)) + match = {:entry, {{Sources, source.id}, :_}, :_, :_, :"$1"} + assert [] = :ets.match(ContextCache, match) + end end - test "cache_name/1" do - assert Sources.Cache == ContextCache.cache_name(Sources) + describe "ContextCache.Supervisor" do + setup do + ContextCache.Supervisor.remove_cainophile() + + on_exit(fn -> + ContextCache.Supervisor.remove_cainophile() + end) + + :ok + end + + test "maybe_start_cainophile will attempt to start a cainophile child" do + assert {:ok, _pid} = ContextCache.Supervisor.maybe_start_cainophile() + + assert {:error, {:already_started, _}} = + ContextCache.Supervisor.maybe_start_cainophile() + + assert get_cainophile_child() + end + + test "remove_cainophile/1 will remove cainophile child from tree" do + refute get_cainophile_child() + assert {:ok, _pid} = ContextCache.Supervisor.maybe_start_cainophile() + assert get_cainophile_child() + assert :ok = ContextCache.Supervisor.remove_cainophile() + refute get_cainophile_child() + end + + test "TransactionBroadcaster will try to start cainophile " do + refute get_cainophile_child() + start_supervised!({ContextCache.TransactionBroadcaster, interval: 100}) + :timer.sleep(500) + assert get_cainophile_child() + end end - test "apply_fun/3", %{cache_key: cache_key} do - # apply_fun was called in the setup when we called `Sources.Cache.get_by/1` - # here's we're making sure it did get cached correctly - assert {:cached, %Logflare.Source{}} = Cachex.get!(Sources.Cache, cache_key) + describe "unboxed transaction" do + setup do + ContextCache.Supervisor.remove_cainophile() + + on_exit(fn -> + ContextCache.Supervisor.remove_cainophile() + + Ecto.Adapters.SQL.Sandbox.unboxed_run(Logflare.Repo, fn -> + for u <- Logflare.Repo.all(Logflare.User) do + Logflare.Repo.delete(u) + end + end) + end) + + :ok + end + + test "TransactionBroadcaster subscribes to wal and broadcasts transactions" do + ContextCache.CacheBuster.subscribe_to_transactions() + start_supervised!({ContextCache.TransactionBroadcaster, interval: 100}) + :timer.sleep(200) + + Ecto.Adapters.SQL.Sandbox.unboxed_run(Logflare.Repo, fn -> + insert(:user) + end) + + :timer.sleep(500) + assert_received %Cainophile.Changes.Transaction{} + end end - test "bust_keys/1", %{source: source, cache_key: cache_key} do - assert {:ok, :busted} = ContextCache.bust_keys([{Sources, source.id}]) - assert is_nil(Cachex.get!(Sources.Cache, cache_key)) - match = {:entry, {{Sources, source.id}, :_}, :_, :_, :"$1"} - assert [] = :ets.match(ContextCache, match) + # describe "ContextCache" + + defp get_cainophile_child() do + for {Cainophile.Adapters.Postgres, _, _, _} = child <- + Supervisor.which_children(ContextCache.Supervisor) do + child + end + |> List.first() end end