Skip to content

Commit

Permalink
Merge pull request #1825 from Logflare/fix/buffer-counter-not-found-bug
Browse files Browse the repository at this point in the history
fix: :buffer_counter_not_found bug
  • Loading branch information
Ziinc authored Nov 15, 2023
2 parents c0a6243 + 2bc6de4 commit a8c4636
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 30 deletions.
51 changes: 28 additions & 23 deletions lib/logflare/source/recent_logs_server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -51,21 +51,6 @@ defmodule Logflare.Source.RecentLogsServer do
@broadcast_every 500
@pool_size Application.compile_env(:logflare, Logflare.PubSub)[:pool_size]

def start_link(%__MODULE__{source_id: source_id} = rls) when is_atom(source_id) do
GenServer.start_link(__MODULE__, rls, name: Source.Supervisor.via(__MODULE__, source_id))
end

## Client
@spec init(RLS.t()) :: {:ok, RLS.t(), {:continue, :boot}}
def init(rls) do
Process.flag(:trap_exit, true)

touch()
broadcast()

{:ok, rls, {:continue, :boot}}
end

@spec push(LE.t()) :: :ok
def push(%LE{source: %Source{token: source_id}} = log_event) do
case Source.Supervisor.lookup(__MODULE__, source_id) do
Expand Down Expand Up @@ -134,8 +119,15 @@ defmodule Logflare.Source.RecentLogsServer do

## Server

def handle_continue(:boot, %__MODULE__{source_id: source_id, source: source} = rls)
when is_atom(source_id) do
def start_link(%__MODULE__{source_id: source_id} = rls) when is_atom(source_id) do
GenServer.start_link(__MODULE__, rls, name: Source.Supervisor.via(__MODULE__, source_id))
end

## Client
@spec init(RLS.t()) :: {:ok, RLS.t(), {:continue, :boot}}
def init(%__MODULE__{source_id: _source_id, source: source} = rls) do
Process.flag(:trap_exit, true)

user =
source.user_id
|> Users.get()
Expand All @@ -153,24 +145,37 @@ defmodule Logflare.Source.RecentLogsServer do
notifications_every: source.notifications_every
}

# these go into separate supervisor that blocks
children = [
{BufferCounter, rls},
{Schema, rls},
{Pipeline, rls}
]

Supervisor.start_link(children, strategy: :one_for_one)

touch()
broadcast()

{:ok, rls, {:continue, :boot}}
end

def handle_continue(:boot, rls) do
children = [
{RCS, rls},
{EmailNotificationServer, rls},
{TextNotificationServer, rls},
{WebhookNotificationServer, rls},
{SlackHookServer, rls},
{BufferCounter, rls},
{Schema, rls},
{Pipeline, rls},
{SearchQueryExecutor, rls},
{BillingWriter, rls}
]

Supervisor.start_link(children, strategy: :one_for_one)

load_init_log_message(source_id)
load_init_log_message(rls.source_id)

Logger.info("RecentLogsServer started: #{source_id}")
Logger.info("RecentLogsServer started", source_id: rls.source_id)
{:noreply, rls}
end

Expand Down Expand Up @@ -237,7 +242,7 @@ defmodule Logflare.Source.RecentLogsServer do

def terminate(reason, state) do
# Do Shutdown Stuff
Logger.info("Going Down - #{inspect(reason)} - #{state.source_id}", %{
Logger.info("[#{__MODULE__}] Going Down - #{inspect(reason)} - #{state.source_id}", %{
source_id: state.source_id
})

Expand Down
12 changes: 6 additions & 6 deletions test/logflare/source/bigquery/buffer_test.exs
Original file line number Diff line number Diff line change
@@ -1,32 +1,27 @@
defmodule Logflare.Source.BigQuery.BufferTest do
@moduledoc false
use Logflare.DataCase
alias Logflare.Source.BigQuery
alias Logflare.Source.RecentLogsServer
alias Logflare.Sources.Counters
alias Logflare.Sources.RateCounters
alias Logflare.SystemMetrics.AllLogsLogged
alias Logflare.LogEvent

use Logflare.DataCase

doctest(BigQuery.BufferCounter)

setup do
Goth
|> stub(:fetch, fn _mod -> {:ok, %Goth.Token{token: "auth-token"}} end)

start_supervised!(AllLogsLogged)
start_supervised!(Counters)
start_supervised!(RateCounters)

insert(:plan)
user = insert(:user)

source = insert(:source, user: user)
rls = %RecentLogsServer{source: source, source_id: source.token}
start_supervised!({RecentLogsServer, rls}, id: :source)

:timer.sleep(250)
[source: source]
end

Expand All @@ -36,6 +31,7 @@ defmodule Logflare.Source.BigQuery.BufferTest do
BigQuery.BufferCounter.push(le)

assert 1 = BigQuery.BufferCounter.len(source)
:timer.sleep(2000)
end

test "ack a log event", %{source: source} do
Expand All @@ -46,6 +42,7 @@ defmodule Logflare.Source.BigQuery.BufferTest do
BigQuery.BufferCounter.ack(source.token, "some-uuid")

assert 0 = BigQuery.BufferCounter.len(source)
:timer.sleep(2000)
end

test "ack a batch of log events", %{source: source} do
Expand All @@ -61,6 +58,7 @@ defmodule Logflare.Source.BigQuery.BufferTest do
BigQuery.BufferCounter.ack_batch(source.token, [message])

assert 0 = BigQuery.BufferCounter.len(source)
:timer.sleep(2000)
end

test "push a batch of log events", %{source: source} do
Expand All @@ -80,6 +78,7 @@ defmodule Logflare.Source.BigQuery.BufferTest do
BigQuery.BufferCounter.push_batch(%{source: source, batch: batch, count: 2})

assert 2 = BigQuery.BufferCounter.len(source)
:timer.sleep(2000)
end

test "errors when buffer is full", %{source: source} do
Expand Down Expand Up @@ -108,5 +107,6 @@ defmodule Logflare.Source.BigQuery.BufferTest do
BigQuery.BufferCounter.push_batch(%{source: source, batch: batch, count: 2})

assert %{len: 4, discarded: 2} = BigQuery.BufferCounter.get_counts(source.token)
:timer.sleep(2000)
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,6 @@ defmodule LogflareWeb.EndpointsControllerTest do

# render as unix microsecond
assert inspect(timestamp) |> String.length() == 16
assert "16" <> _ = inspect(timestamp)
assert conn.halted == false

# test a logs ui query
Expand Down

0 comments on commit a8c4636

Please sign in to comment.