From 25f823528d5bda0ccdb2b75dd2e7bd679ee8407f Mon Sep 17 00:00:00 2001 From: TzeYiing Date: Thu, 16 Nov 2023 12:23:59 +0800 Subject: [PATCH] fix: add channel prefix for listen/notify channels --- lib/logflare/cluster/postgres_strategy.ex | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/lib/logflare/cluster/postgres_strategy.ex b/lib/logflare/cluster/postgres_strategy.ex index 5e38b9dbe..88e4ef741 100644 --- a/lib/logflare/cluster/postgres_strategy.ex +++ b/lib/logflare/cluster/postgres_strategy.ex @@ -12,7 +12,7 @@ defmodule Logflare.Cluster.PostgresStrategy do * `heartbeat_interval` - The interval at which to send heartbeat messages in milliseconds (optional; default: 5_000) - The magic cookie is used as the channel name. + The magic cookie is used as the channel name and will be prefixed with "cluster_" References: https://github.com/supabase/supavisor/blob/main/lib/cluster/strategy/postgres.ex """ use GenServer @@ -20,6 +20,7 @@ defmodule Logflare.Cluster.PostgresStrategy do alias Cluster.Strategy alias Cluster.Logger alias Postgrex, as: P + @channel_prefix "cluster_" def start_link(args), do: GenServer.start_link(__MODULE__, args) @@ -50,7 +51,8 @@ defmodule Logflare.Cluster.PostgresStrategy do def handle_continue(:connect, state) do with {:ok, conn} <- P.start_link(state.meta.opts.()), {:ok, conn_notif} <- P.Notifications.start_link(state.meta.opts.()), - {_, _} <- P.Notifications.listen(conn_notif, state.config[:channel_name]) do + {_, _} <- + P.Notifications.listen(conn_notif, @channel_prefix <> state.config[:channel_name]) do Logger.info(state.topology, "Connected to Postgres database") meta = %{ @@ -70,7 +72,13 @@ defmodule Logflare.Cluster.PostgresStrategy do def handle_info(:heartbeat, state) do Process.cancel_timer(state.meta.heartbeat_ref) - P.query(state.meta.conn, "NOTIFY #{state.config[:channel_name]}, '#{node()}'", []) + + P.query( + state.meta.conn, + "NOTIFY #{@channel_prefix <> state.config[:channel_name]}, '#{node()}'", + [] + ) + ref = heartbeat(state.config[:heartbeat_interval]) {:noreply, put_in(state.meta.heartbeat_ref, ref)} end