From dd2a9f73e5429168ecce9b44ccb24220d9bfbe38 Mon Sep 17 00:00:00 2001 From: TzeYiing Date: Wed, 15 Nov 2023 02:34:18 +0800 Subject: [PATCH 1/4] fix: :buffer_counter_not_found bug --- lib/logflare/source/recent_logs_server.ex | 49 ++++++++++--------- test/logflare/source/bigquery/buffer_test.exs | 12 ++--- 2 files changed, 32 insertions(+), 29 deletions(-) diff --git a/lib/logflare/source/recent_logs_server.ex b/lib/logflare/source/recent_logs_server.ex index ee45ffc9a..3e0f52135 100644 --- a/lib/logflare/source/recent_logs_server.ex +++ b/lib/logflare/source/recent_logs_server.ex @@ -51,21 +51,6 @@ defmodule Logflare.Source.RecentLogsServer do @broadcast_every 500 @pool_size Application.compile_env(:logflare, Logflare.PubSub)[:pool_size] - def start_link(%__MODULE__{source_id: source_id} = rls) when is_atom(source_id) do - GenServer.start_link(__MODULE__, rls, name: Source.Supervisor.via(__MODULE__, source_id)) - end - - ## Client - @spec init(RLS.t()) :: {:ok, RLS.t(), {:continue, :boot}} - def init(rls) do - Process.flag(:trap_exit, true) - - touch() - broadcast() - - {:ok, rls, {:continue, :boot}} - end - @spec push(LE.t()) :: :ok def push(%LE{source: %Source{token: source_id}} = log_event) do case Source.Supervisor.lookup(__MODULE__, source_id) do @@ -134,8 +119,13 @@ defmodule Logflare.Source.RecentLogsServer do ## Server - def handle_continue(:boot, %__MODULE__{source_id: source_id, source: source} = rls) - when is_atom(source_id) do + def start_link(%__MODULE__{source_id: source_id} = rls) when is_atom(source_id) do + GenServer.start_link(__MODULE__, rls, name: Source.Supervisor.via(__MODULE__, source_id)) + end + + ## Client + @spec init(RLS.t()) :: {:ok, RLS.t(), {:continue, :boot}} + def init(%__MODULE__{source_id: _source_id, source: source} = rls) do user = source.user_id |> Users.get() @@ -153,24 +143,37 @@ defmodule Logflare.Source.RecentLogsServer do notifications_every: source.notifications_every } + # these go into separate supervisor that blocks + children = [ + {BufferCounter, rls}, + {Schema, rls}, + {Pipeline, rls} + ] + + Supervisor.start_link(children, strategy: :one_for_one) + + touch() + broadcast() + + {:ok, rls, {:continue, :boot}} + end + + def handle_continue(:boot, rls) do children = [ {RCS, rls}, {EmailNotificationServer, rls}, {TextNotificationServer, rls}, {WebhookNotificationServer, rls}, {SlackHookServer, rls}, - {BufferCounter, rls}, - {Schema, rls}, - {Pipeline, rls}, {SearchQueryExecutor, rls}, {BillingWriter, rls} ] Supervisor.start_link(children, strategy: :one_for_one) - load_init_log_message(source_id) + load_init_log_message(rls.source_id) - Logger.info("RecentLogsServer started: #{source_id}") + Logger.info("RecentLogsServer started", source_id: rls.source_id) {:noreply, rls} end @@ -237,7 +240,7 @@ defmodule Logflare.Source.RecentLogsServer do def terminate(reason, state) do # Do Shutdown Stuff - Logger.info("Going Down - #{inspect(reason)} - #{state.source_id}", %{ + Logger.error("Going Down - #{inspect(reason)} - #{state.source_id}", %{ source_id: state.source_id }) diff --git a/test/logflare/source/bigquery/buffer_test.exs b/test/logflare/source/bigquery/buffer_test.exs index 8549ff9db..c793408f2 100644 --- a/test/logflare/source/bigquery/buffer_test.exs +++ b/test/logflare/source/bigquery/buffer_test.exs @@ -1,5 +1,6 @@ defmodule Logflare.Source.BigQuery.BufferTest do @moduledoc false + use Logflare.DataCase alias Logflare.Source.BigQuery alias Logflare.Source.RecentLogsServer alias Logflare.Sources.Counters @@ -7,10 +8,6 @@ defmodule Logflare.Source.BigQuery.BufferTest do alias Logflare.SystemMetrics.AllLogsLogged alias Logflare.LogEvent - use Logflare.DataCase - - doctest(BigQuery.BufferCounter) - setup do Goth |> stub(:fetch, fn _mod -> {:ok, %Goth.Token{token: "auth-token"}} end) @@ -18,7 +15,6 @@ defmodule Logflare.Source.BigQuery.BufferTest do start_supervised!(AllLogsLogged) start_supervised!(Counters) start_supervised!(RateCounters) - insert(:plan) user = insert(:user) @@ -26,7 +22,6 @@ defmodule Logflare.Source.BigQuery.BufferTest do rls = %RecentLogsServer{source: source, source_id: source.token} start_supervised!({RecentLogsServer, rls}, id: :source) - :timer.sleep(250) [source: source] end @@ -36,6 +31,7 @@ defmodule Logflare.Source.BigQuery.BufferTest do BigQuery.BufferCounter.push(le) assert 1 = BigQuery.BufferCounter.len(source) + :timer.sleep(2000) end test "ack a log event", %{source: source} do @@ -46,6 +42,7 @@ defmodule Logflare.Source.BigQuery.BufferTest do BigQuery.BufferCounter.ack(source.token, "some-uuid") assert 0 = BigQuery.BufferCounter.len(source) + :timer.sleep(2000) end test "ack a batch of log events", %{source: source} do @@ -61,6 +58,7 @@ defmodule Logflare.Source.BigQuery.BufferTest do BigQuery.BufferCounter.ack_batch(source.token, [message]) assert 0 = BigQuery.BufferCounter.len(source) + :timer.sleep(2000) end test "push a batch of log events", %{source: source} do @@ -80,6 +78,7 @@ defmodule Logflare.Source.BigQuery.BufferTest do BigQuery.BufferCounter.push_batch(%{source: source, batch: batch, count: 2}) assert 2 = BigQuery.BufferCounter.len(source) + :timer.sleep(2000) end test "errors when buffer is full", %{source: source} do @@ -108,5 +107,6 @@ defmodule Logflare.Source.BigQuery.BufferTest do BigQuery.BufferCounter.push_batch(%{source: source, batch: batch, count: 2}) assert %{len: 4, discarded: 2} = BigQuery.BufferCounter.get_counts(source.token) + :timer.sleep(2000) end end From 45c0f4f1d61e8ba904a8e130a966ed18fe394867 Mon Sep 17 00:00:00 2001 From: TzeYiing Date: Wed, 15 Nov 2023 14:37:55 +0800 Subject: [PATCH 2/4] chore: add back exit trapping --- lib/logflare/source/recent_logs_server.ex | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/logflare/source/recent_logs_server.ex b/lib/logflare/source/recent_logs_server.ex index 3e0f52135..e3996bfd4 100644 --- a/lib/logflare/source/recent_logs_server.ex +++ b/lib/logflare/source/recent_logs_server.ex @@ -126,6 +126,8 @@ defmodule Logflare.Source.RecentLogsServer do ## Client @spec init(RLS.t()) :: {:ok, RLS.t(), {:continue, :boot}} def init(%__MODULE__{source_id: _source_id, source: source} = rls) do + Process.flag(:trap_exit, true) + user = source.user_id |> Users.get() From 476dcb975dea19a64a0a3c993610b193cc6acf1e Mon Sep 17 00:00:00 2001 From: TzeYiing Date: Wed, 15 Nov 2023 14:39:24 +0800 Subject: [PATCH 3/4] chore: Logger.info when going down --- lib/logflare/source/recent_logs_server.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/logflare/source/recent_logs_server.ex b/lib/logflare/source/recent_logs_server.ex index e3996bfd4..545329b12 100644 --- a/lib/logflare/source/recent_logs_server.ex +++ b/lib/logflare/source/recent_logs_server.ex @@ -242,7 +242,7 @@ defmodule Logflare.Source.RecentLogsServer do def terminate(reason, state) do # Do Shutdown Stuff - Logger.error("Going Down - #{inspect(reason)} - #{state.source_id}", %{ + Logger.info("[#{__MODULE__}] Going Down - #{inspect(reason)} - #{state.source_id}", %{ source_id: state.source_id }) From 2bc6de450e615106110c707c87c6930554553af8 Mon Sep 17 00:00:00 2001 From: TzeYiing Date: Wed, 15 Nov 2023 19:55:06 +0800 Subject: [PATCH 4/4] chore: fix flaky test --- test/logflare_web/controllers/endpoints_controller_test.exs | 1 - 1 file changed, 1 deletion(-) diff --git a/test/logflare_web/controllers/endpoints_controller_test.exs b/test/logflare_web/controllers/endpoints_controller_test.exs index e22455f0a..4fcd21e81 100644 --- a/test/logflare_web/controllers/endpoints_controller_test.exs +++ b/test/logflare_web/controllers/endpoints_controller_test.exs @@ -206,7 +206,6 @@ defmodule LogflareWeb.EndpointsControllerTest do # render as unix microsecond assert inspect(timestamp) |> String.length() == 16 - assert "16" <> _ = inspect(timestamp) assert conn.halted == false # test a logs ui query