Skip to content

Commit

Permalink
Merge pull request #1832 from Logflare/fix/add-postgres-channel-prefix
Browse files Browse the repository at this point in the history
fix: add channel prefix for listen/notify channels
  • Loading branch information
Ziinc authored Nov 16, 2023
2 parents ff7688e + 25f8235 commit 29a1e24
Showing 1 changed file with 11 additions and 3 deletions.
14 changes: 11 additions & 3 deletions lib/logflare/cluster/postgres_strategy.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,15 @@ defmodule Logflare.Cluster.PostgresStrategy do
* `heartbeat_interval` - The interval at which to send heartbeat messages in milliseconds (optional; default: 5_000)
The magic cookie is used as the channel name.
The magic cookie is used as the channel name and will be prefixed with "cluster_"
References: https://github.com/supabase/supavisor/blob/main/lib/cluster/strategy/postgres.ex
"""
use GenServer

alias Cluster.Strategy
alias Cluster.Logger
alias Postgrex, as: P
@channel_prefix "cluster_"

def start_link(args), do: GenServer.start_link(__MODULE__, args)

Expand Down Expand Up @@ -50,7 +51,8 @@ defmodule Logflare.Cluster.PostgresStrategy do
def handle_continue(:connect, state) do
with {:ok, conn} <- P.start_link(state.meta.opts.()),
{:ok, conn_notif} <- P.Notifications.start_link(state.meta.opts.()),
{_, _} <- P.Notifications.listen(conn_notif, state.config[:channel_name]) do
{_, _} <-
P.Notifications.listen(conn_notif, @channel_prefix <> state.config[:channel_name]) do
Logger.info(state.topology, "Connected to Postgres database")

meta = %{
Expand All @@ -70,7 +72,13 @@ defmodule Logflare.Cluster.PostgresStrategy do

def handle_info(:heartbeat, state) do
Process.cancel_timer(state.meta.heartbeat_ref)
P.query(state.meta.conn, "NOTIFY #{state.config[:channel_name]}, '#{node()}'", [])

P.query(
state.meta.conn,
"NOTIFY #{@channel_prefix <> state.config[:channel_name]}, '#{node()}'",
[]
)

ref = heartbeat(state.config[:heartbeat_interval])
{:noreply, put_in(state.meta.heartbeat_ref, ref)}
end
Expand Down

0 comments on commit 29a1e24

Please sign in to comment.