Skip to content

Commit

Permalink
fix: Prevent match error on GenCounter (#1243)
Browse files Browse the repository at this point in the history
* Handles match error on GenCounter in a more grafecul way
* Adds Logger metadata on multiple processes
  • Loading branch information
filipecabaco authored Dec 9, 2024
1 parent 22c1ba8 commit 378b030
Show file tree
Hide file tree
Showing 12 changed files with 117 additions and 83 deletions.
1 change: 1 addition & 0 deletions lib/extensions/postgres_cdc_rls/worker_supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ defmodule Extensions.PostgresCdcRls.WorkerSupervisor do

@impl true
def init(%{"id" => tenant} = args) when is_binary(tenant) do
Logger.metadata(external_id: tenant, project: tenant)
unless Api.get_tenant_by_external_id(tenant, :primary), do: raise(Exception)

tid_args = Map.merge(args, %{"subscribers_tid" => :ets.new(__MODULE__, [:public, :bag])})
Expand Down
4 changes: 3 additions & 1 deletion lib/extensions/postgres_cdc_stream/replication.ex
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ defmodule Extensions.PostgresCdcStream.Replication do
def stop(pid), do: GenServer.stop(pid)

@impl true
def init(args) do
def init(%{tenant: id} = args) do
Logger.metadata(external_id: id, project: id)

tid = :ets.new(__MODULE__, [:public, :set])
state = %{tid: tid, step: nil, ts: nil}
{:ok, Map.merge(args, state)}
Expand Down
3 changes: 2 additions & 1 deletion lib/extensions/postgres_cdc_stream/worker_supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ defmodule Extensions.PostgresCdcStream.WorkerSupervisor do

@impl true
def init(%{"id" => tenant} = args) when is_binary(tenant) do
unless Api.get_tenant_by_external_id(tenant, :primary), do: raise(Exception)
if !Api.get_tenant_by_external_id(tenant, :primary), do: raise(Exception)
Logger.metadata(external_id: tenant, project: tenant)

children = [
%{
Expand Down
37 changes: 19 additions & 18 deletions lib/realtime/gen_counter/gen_counter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ defmodule Realtime.GenCounter do
@doc """
Creates a new counter from any Erlang term.
"""
@spec new(term) :: {:ok, {:write_concurrency, reference()}} | {:error, term()}
@spec new({atom(), atom(), term()}) ::
{:ok, {:write_concurrency, reference()}} | {:error, term()}
def new(term) do
id = :erlang.phash2(term)

Expand Down Expand Up @@ -70,7 +71,7 @@ defmodule Realtime.GenCounter do
@spec add(term(), integer()) :: :ok | :error
def add(term, count) when is_integer(count) do
case find_counter(term) do
{:ok, counter_ref} ->
{:ok, counter_ref, _pid} ->
:counters.add(counter_ref, 1, count)

err ->
Expand All @@ -95,7 +96,7 @@ defmodule Realtime.GenCounter do
@spec sub(term(), integer()) :: :ok | :error
def sub(term, count) when is_integer(count) do
case find_counter(term) do
{:ok, counter_ref} ->
{:ok, counter_ref, _pid} ->
:counters.sub(counter_ref, 1, count)

err ->
Expand All @@ -111,7 +112,7 @@ defmodule Realtime.GenCounter do
@spec put(term(), integer()) :: :ok | :error
def put(term, count) when is_integer(count) do
case find_counter(term) do
{:ok, counter_ref} ->
{:ok, counter_ref, _pid} ->
:counters.put(counter_ref, 1, count)

err ->
Expand All @@ -127,7 +128,7 @@ defmodule Realtime.GenCounter do
@spec info(term()) :: %{memory: integer(), size: integer()} | :error
def info(term) do
case find_counter(term) do
{:ok, counter_ref} ->
{:ok, counter_ref, _pid} ->
:counters.info(counter_ref)

_err ->
Expand All @@ -144,7 +145,7 @@ defmodule Realtime.GenCounter do
{:ok, integer()} | {:error, :counter_not_found}
def get(term) do
case find_counter(term) do
{:ok, counter_ref} ->
{:ok, counter_ref, _pid} ->
count = :counters.get(counter_ref, 1)
{:ok, count}

Expand All @@ -162,14 +163,23 @@ defmodule Realtime.GenCounter do
end
end

@spec find_counter(term) ::
{:ok, :counters.counters_ref(), pid()} | {:error, :counter_not_found}
def find_counter(term) do
id = :erlang.phash2(term)

case Registry.lookup(Realtime.Registry.Unique, {__MODULE__, :counter, id}) do
[{pid, counter_ref}] -> {:ok, counter_ref, pid}
_error -> {:error, :counter_not_found}
end
end

# Callbacks

@impl true
def init(args) do
id = Keyword.get(args, :id)

id = Keyword.fetch!(args, :id)
state = %__MODULE__{id: id, counters: []}

{:ok, state}
end

Expand All @@ -192,13 +202,4 @@ defmodule Realtime.GenCounter do
_error -> {:error, :worker_not_found}
end
end

defp find_counter(term) do
id = :erlang.phash2(term)

case Registry.lookup(Realtime.Registry.Unique, {__MODULE__, :counter, id}) do
[{_pid, counter_ref}] -> {:ok, counter_ref}
_error -> {:error, :counter_not_found}
end
end
end
89 changes: 52 additions & 37 deletions lib/realtime/rate_counter/rate_counter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ defmodule Realtime.RateCounter do
event_name: [@app_name] ++ [:rate_counter],
measurements: %{sum: 0},
metadata: %{}
}
},
counter_pid: nil

@type t :: %__MODULE__{
id: term(),
Expand All @@ -51,13 +52,14 @@ defmodule Realtime.RateCounter do
event_name: :telemetry.event_name(),
measurements: :telemetry.event_measurements(),
metadata: :telemetry.event_metadata()
}
},
counter_pid: pid()
}

@spec start_link([keyword()]) :: {:ok, pid()} | {:error, {:already_started, pid()}}
def start_link(args) do
id = Keyword.get(args, :id)
unless id, do: raise("Supply an identifier to start a counter!")
if !id, do: raise("Supply an identifier to start a counter!")

GenServer.start_link(__MODULE__, args,
name: {:via, Registry, {Realtime.Registry.Unique, {__MODULE__, :rate_counter, id}}}
Expand Down Expand Up @@ -93,51 +95,59 @@ defmodule Realtime.RateCounter do

@impl true
def init(args) do
id = Keyword.get(args, :id)
id = Keyword.fetch!(args, :id)
telem_opts = Keyword.get(args, :telemetry)
every = Keyword.get(args, :tick, @tick)
max_bucket_len = Keyword.get(args, :max_bucket_len, @max_bucket_len)
idle_shutdown_ms = Keyword.get(args, :idle_shutdown, @idle_shutdown)

telem_opts = Keyword.get(args, :telemetry)

telemetry =
if telem_opts,
do: %{
emit: true,
event_name: [@app_name] ++ [:rate_counter] ++ telem_opts.event_name,
measurements: Map.merge(%{sum: 0}, telem_opts.measurements),
metadata: Map.merge(%{id: id}, telem_opts.metadata)
},
else: %{emit: false}

Logger.info("Starting #{__MODULE__} for: #{inspect(id)}")

ensure_counter_started(id)
case ensure_counter_started(id) do
{:ok, _ref, pid} ->
Process.monitor(pid)

ticker = tick(0)
telemetry =
if telem_opts do
Logger.metadata(telem_opts.metadata)

idle_shutdown_ref =
unless idle_shutdown_ms == :infinity, do: shutdown_after(idle_shutdown_ms), else: nil
%{
emit: true,
event_name: [@app_name] ++ [:rate_counter] ++ telem_opts.event_name,
measurements: Map.merge(%{sum: 0}, telem_opts.measurements),
metadata: Map.merge(%{id: id}, telem_opts.metadata)
}
else
%{emit: false}
end

ticker = tick(0)

idle_shutdown_ref =
if idle_shutdown_ms != :infinity, do: shutdown_after(idle_shutdown_ms), else: nil

state = %__MODULE__{
id: id,
tick: every,
tick_ref: ticker,
max_bucket_len: max_bucket_len,
idle_shutdown: idle_shutdown_ms,
idle_shutdown_ref: idle_shutdown_ref,
telemetry: telemetry,
counter_pid: pid
}

state = %__MODULE__{
id: id,
tick: every,
tick_ref: ticker,
max_bucket_len: max_bucket_len,
idle_shutdown: idle_shutdown_ms,
idle_shutdown_ref: idle_shutdown_ref,
telemetry: telemetry
}
Cachex.put!(@cache, id, state)

Cachex.put!(@cache, id, state)
{:ok, state}

{:ok, state}
_ ->
{:shutdown, :kill, %{}}
end
end

@impl true
def handle_info(:tick, state) do
Process.cancel_timer(state.tick_ref)

{:ok, count} = GenCounter.get(state.id)
:ok = GenCounter.put(state.id, 0)

Expand Down Expand Up @@ -178,6 +188,13 @@ defmodule Realtime.RateCounter do
{:stop, :normal, state}
end

def handle_info(
{:DOWN, _, :process, counter_pid, _},
%{counter_pid: counter_pid} = state
) do
{:stop, :shutdown, state}
end

defp tick(every) do
Process.send_after(self(), :tick, every)
end
Expand All @@ -187,9 +204,7 @@ defmodule Realtime.RateCounter do
end

defp ensure_counter_started(id) do
case GenCounter.get(id) do
{:ok, _count} -> :ok
{:error, :counter_not_found} -> GenCounter.new(id)
end
GenCounter.new(id)
GenCounter.find_counter(id)
end
end
12 changes: 9 additions & 3 deletions lib/realtime/tenants/connect.ex
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,15 @@ defmodule Realtime.Tenants.Connect do
spec = {__MODULE__, [tenant_id: tenant_id] ++ opts}

case DynamicSupervisor.start_child(supervisor, spec) do
{:ok, _} -> get_status(tenant_id)
{:error, {:already_started, _}} -> get_status(tenant_id)
_ -> {:error, :tenant_database_unavailable}
{:ok, _} ->
get_status(tenant_id)

{:error, {:already_started, _}} ->
get_status(tenant_id)

{:error, error} ->
log_error("UnableToConnectToTenantDatabase", error)
{:error, :tenant_database_unavailable}
end
end

Expand Down
2 changes: 2 additions & 0 deletions lib/realtime_web/plugs/assign_tenant.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ defmodule RealtimeWeb.Plugs.AssignTenant do
def call(%Plug.Conn{host: host} = conn, _opts) do
with {:ok, external_id} <- Database.get_external_id(host),
%Tenant{} = tenant <- Api.get_tenant_by_external_id(external_id) do
Logger.metadata(external_id: external_id, project: external_id)

tenant =
tenant
|> tap(&initialize_counters/1)
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
def project do
[
app: :realtime,
version: "2.33.65",
version: "2.33.66",
elixir: "~> 1.17.3",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down
7 changes: 1 addition & 6 deletions test/e2e/tests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import {
createClient,
SupabaseClient,
RealtimeChannel,
} from "npm:@supabase/supabase-js@latest";
} from "npm:@supabase/supabase-js@2.47.3";
import {
assert,
assertEquals,
Expand Down Expand Up @@ -125,7 +125,6 @@ describe("postgres changes extension", () => {
"[email protected]",
"test_test"
);
await supabase.realtime.setAuth(accessToken);

let result: Array<any> = [];
let topic = crypto.randomUUID();
Expand Down Expand Up @@ -164,7 +163,6 @@ describe("postgres changes extension", () => {
"[email protected]",
"test_test"
);
await supabase.realtime.setAuth(accessToken);

let result: Array<any> = [];
let topic = crypto.randomUUID();
Expand Down Expand Up @@ -207,7 +205,6 @@ describe("postgres changes extension", () => {
"[email protected]",
"test_test"
);
await supabase.realtime.setAuth(accessToken);

let result: Array<any> = [];
let topic = crypto.randomUUID();
Expand Down Expand Up @@ -275,7 +272,6 @@ describe("authorization check", () => {
"[email protected]",
"test_test"
);
await supabase.realtime.setAuth(accessToken);

const channel = supabase
.channel(crypto.randomUUID(), { config: { ...config, private: true } })
Expand Down Expand Up @@ -303,7 +299,6 @@ describe("broadcast changes", () => {
"[email protected]",
"test_test"
);
await supabase.realtime.setAuth(accessToken);

const channel = supabase
.channel(`event:${id}`, { config: { ...config, private: true } })
Expand Down
2 changes: 1 addition & 1 deletion test/realtime/database_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ defmodule Realtime.DatabaseTest do
assert {:error, %DBConnection.ConnectionError{reason: :queue_timeout}} =
Task.async(fn ->
Database.transaction(db_conn, fn conn ->
Postgrex.query!(conn, "SELECT pg_sleep(5)", [])
Postgrex.query!(conn, "SELECT pg_sleep(6)", [])
end)
end)
|> Task.await(15000)
Expand Down
Loading

0 comments on commit 378b030

Please sign in to comment.