Skip to content

Commit

Permalink
init: partition PubSubRates.Buffers and PubSubRates.Inserts
Browse files Browse the repository at this point in the history
  • Loading branch information
chasers committed Oct 23, 2024
1 parent 19e58e0 commit 80d3307
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 34 deletions.
45 changes: 42 additions & 3 deletions lib/logflare/pubsub_rates.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,25 @@ defmodule Logflare.PubSubRates do
PubSubRates.Cache,
{PartitionSupervisor,
child_spec: PubSubRates.Rates,
name: PubSubRates.Supervisors,
name: PubSubRates.Rates.Supervisors,
partitions: partitions(),
with_arguments: fn [opts], partition ->
[Keyword.put(opts, :partition, Integer.to_string(partition))]
end},
PubSubRates.Buffers,
PubSubRates.Inserts
{PartitionSupervisor,
child_spec: PubSubRates.Buffers,
name: PubSubRates.Buffers.Supervisors,
partitions: partitions(),
with_arguments: fn [opts], partition ->
[Keyword.put(opts, :partition, Integer.to_string(partition))]
end},
{PartitionSupervisor,
child_spec: PubSubRates.Inserts,
name: PubSubRates.Inserts.Supervisors,
partitions: partitions(),
with_arguments: fn [opts], partition ->
[Keyword.put(opts, :partition, Integer.to_string(partition))]
end}
]

Supervisor.init(children, strategy: :one_for_one)
Expand All @@ -47,6 +59,12 @@ defmodule Logflare.PubSubRates do
def subscribe("rates" <> _partition = topic),
do: PubSub.subscribe(Logflare.PubSub, topic)

def subscribe("buffers" <> _partition = topic),
do: PubSub.subscribe(Logflare.PubSub, topic)

def subscribe("inserts" <> _partition = topic),
do: PubSub.subscribe(Logflare.PubSub, topic)

def subscribe(topics) when is_list(topics), do: Enum.map(topics, &subscribe/1)

def subscribe(topic) when topic in @topics do
Expand All @@ -61,6 +79,13 @@ defmodule Logflare.PubSubRates do
| {binary(), integer(), any(), any()}
) :: :ok | {:error, any()}

def global_broadcast_rate({"buffers" = topic, source_id, backend_id, _payload} = data)
when topic in @topics do
partitioned_topic = partitioned_topic(topic, {source_id, backend_id})

Phoenix.PubSub.broadcast(Logflare.PubSub, partitioned_topic, data)
end

def global_broadcast_rate({topic, source_id, _backend_id, _payload} = data)
when topic in @topics and is_integer(source_id) do
Phoenix.PubSub.broadcast(Logflare.PubSub, topic, data)
Expand All @@ -73,6 +98,20 @@ defmodule Logflare.PubSubRates do
Phoenix.PubSub.broadcast(Logflare.PubSub, partitioned_topic, data)
end

def global_broadcast_rate({"buffers" = topic, source_token, _payload} = data)
when topic in @topics do
partitioned_topic = partitioned_topic(topic, source_token)

Phoenix.PubSub.broadcast(Logflare.PubSub, partitioned_topic, data)
end

def global_broadcast_rate({"inserts" = topic, source_token, _payload} = data)
when topic in @topics do
partitioned_topic = partitioned_topic(topic, source_token)

Phoenix.PubSub.broadcast(Logflare.PubSub, partitioned_topic, data)
end

def global_broadcast_rate({topic, _source_token, _payload} = data) when topic in @topics do
Phoenix.PubSub.broadcast(Logflare.PubSub, topic, data)
end
Expand Down
41 changes: 28 additions & 13 deletions lib/logflare/pubsub_rates/buffers.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,42 @@ defmodule Logflare.PubSubRates.Buffers do

use GenServer

@topic "buffers"

def child_spec(args) do
%{
id: __MODULE__,
start: {__MODULE__, :start_link, [args]},
type: :worker,
restart: :permanent,
shutdown: 500
}
end

def start_link(args \\ []) do
GenServer.start_link(__MODULE__, args, name: __MODULE__)
partition = get_partition_opt(args)
name = :"#{__MODULE__}#{partition}"

GenServer.start_link(__MODULE__, args, name: name)
end

@impl GenServer
def init(_state) do
PubSubRates.subscribe("buffers")
{:ok, %{}}
def init(args) do
partition = get_partition_opt(args)
topic = @topic <> partition
PubSubRates.subscribe(topic)
{:ok, args}
end

@impl GenServer
def handle_info({"buffers", source_id, backend_id, buffers}, state)
when is_integer(source_id) and is_map(buffers) do
Cache.cache_buffers(source_id, backend_id, buffers)
def handle_info({@topic, _source_token, _buffers} = msg, state) do
{:noreply, state}
end

# TODO: remove in >v1.8.x
@impl GenServer
def handle_info(_, state) do
# don't handle old format of 3-elem or 4-elem tuples.
def handle_info({@topic, source_id, backend_id, buffers}, state) do
Cache.cache_buffers(source_id, backend_id, buffers)
{:noreply, state}
end

defp get_partition_opt(args) do
Keyword.get(args, :partition, "0")
end
end
35 changes: 26 additions & 9 deletions lib/logflare/pubsub_rates/inserts.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,38 @@ defmodule Logflare.PubSubRates.Inserts do

use GenServer

@topic "inserts"

def child_spec(args) do
%{
id: __MODULE__,
start: {__MODULE__, :start_link, [args]},
type: :worker,
restart: :permanent,
shutdown: 500
}
end

def start_link(args \\ []) do
GenServer.start_link(
__MODULE__,
args,
name: __MODULE__
)
partition = get_partition_opt(args)
name = :"#{__MODULE__}#{partition}"

GenServer.start_link(__MODULE__, args, name: name)
end

def init(state) do
PubSubRates.subscribe("inserts")
{:ok, state}
def init(args) do
partition = get_partition_opt(args)
topic = @topic <> partition
PubSubRates.subscribe(topic)
{:ok, args}
end

def handle_info({"inserts", source_token, inserts}, state) do
def handle_info({@topic, source_token, inserts}, state) do
Cache.cache_inserts(source_token, inserts)
{:noreply, state}
end

defp get_partition_opt(args) do
Keyword.get(args, :partition, "0")
end
end
20 changes: 11 additions & 9 deletions test/logflare/cluster_pubsub_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ defmodule Logflare.ClusterPubSubTest do
end

test "subscribe/1 inserts", %{source: %{token: source_token}} do
PubSubRates.subscribe("inserts")
PubSubRates.partitioned_topic("inserts", source_token)
|> PubSubRates.subscribe()

TestUtils.retry_assert(fn ->
PubSubRates.global_broadcast_rate({"inserts", source_token, %{data: "some val"}})
Expand All @@ -39,24 +40,25 @@ defmodule Logflare.ClusterPubSubTest do
end

test "subscribe/1 buffers", %{source: %{id: source_id}} do
PubSubRates.subscribe("buffers")
backend_id = 1

PubSubRates.partitioned_topic("buffers", {source_id, backend_id})
|> PubSubRates.subscribe()

TestUtils.retry_assert(fn ->
PubSubRates.global_broadcast_rate({"buffers", source_id, nil, %{data: "some val"}})
assert_received {"buffers", ^source_id, nil, %{data: "some val"}}
PubSubRates.global_broadcast_rate({"buffers", source_id, backend_id, %{data: "some val"}})
assert_received {"buffers", ^source_id, backend_id, %{data: "some val"}}
end)
end

test "buffers 3-elem tuple is no op", %{source: source} do
Phoenix.PubSub.broadcast(
Logflare.PubSub,
"buffers",
{"buffers", source.token, %{Node.self() => %{len: 5}}}
)
PubSubRates.global_broadcast_rate({"buffers", source.token, %{Node.self() => %{len: 5}}})

:timer.sleep(100)
assert PubSubRates.Cache.get_cluster_buffers(source.id, nil) == 0

PubSubRates.global_broadcast_rate({"buffers", source.id, nil, %{Node.self() => %{len: 5}}})

:timer.sleep(100)
assert PubSubRates.Cache.get_cluster_buffers(source.id, nil) == 5
end
Expand Down

0 comments on commit 80d3307

Please sign in to comment.