From ac7347d5ce4a5f54fa6da43dc3ec205a19856ffb Mon Sep 17 00:00:00 2001 From: Paul Swartz Date: Wed, 20 Sep 2023 13:32:03 -0400 Subject: [PATCH] fix: make sure we aren't returning empty results when updating When we were only doing big updates, this was okay because we were not making updates frequently. With the MqttMediator, we're more frequently making smaller updates. --- apps/state/lib/state/server.ex | 138 ++++++++++++++------------ apps/state/test/state/server_test.exs | 57 +++++++++-- 2 files changed, 124 insertions(+), 71 deletions(-) diff --git a/apps/state/lib/state/server.ex b/apps/state/lib/state/server.ex index 3d99652d3..c0aaae812 100644 --- a/apps/state/lib/state/server.ex +++ b/apps/state/lib/state/server.ex @@ -187,9 +187,6 @@ defmodule State.Server do @impl State.Server def post_load_hook(structs), do: structs - @impl State.Server - def pre_insert_hook(item), do: [item] - @impl Events.Server def handle_event({:fetch, unquote(opts[:fetched_filename])}, body, _, state) do case handle_call({:new_state, body}, nil, state) do @@ -227,7 +224,6 @@ defmodule State.Server do new_state: 2, post_commit_hook: 0, post_load_hook: 1, - pre_insert_hook: 1, select: 1, select: 2, select_limit: 2, @@ -345,26 +341,40 @@ defmodule State.Server do ) if enum do - with {:atomic, result} <- :mnesia.transaction(&create_children/2, [enum, module], 0) do - {:ok, result} + {keys_to_delete, updates} = + case enum do + {:partial, updates} -> + key_index = module.key_index() + {Enum.map(updates, &Map.get(&1, key_index)), updates} + + _ -> + {:all, enum} + end + + recordable = module.recordable() + + updates = + if function_exported?(module, :pre_insert_hook, 1) do + Stream.flat_map(updates, &module.pre_insert_hook/1) + else + updates + end + + updates = Enum.map(updates, &recordable.to_record/1) + + with {:atomic, :ok} <- + :mnesia.transaction(&create_children/3, [keys_to_delete, updates, module], 10) do + {:ok, length(updates)} end else {:ok, 0} end end - defp create_children(enum, module) do - :mnesia.write_lock_table(module) - - {keys_to_delete, updates} = - case enum do - {:partial, updates} -> - key_index = module.key_index() - {Enum.map(updates, &Map.get(&1, key_index)), updates} - - _ -> - {:all, enum} - end + defp create_children(keys_to_delete, updates, module) do + if keys_to_delete == :all do + :mnesia.write_lock_table(module) + end delete_all = fn -> all_keys = @@ -378,14 +388,7 @@ defmodule State.Server do end write_new = fn -> - recordable = module.recordable() - - updates - |> Stream.flat_map(&module.pre_insert_hook/1) - |> Enum.reduce(0, fn item, sum -> - :mnesia.write(module, recordable.to_record(item), :write) - sum + 1 - end) + :lists.foreach(&:mnesia.write(module, &1, :write), updates) end :ok = @@ -396,15 +399,14 @@ defmodule State.Server do end ) - write_count = - debug_time( - write_new, - fn milliseconds -> - "write_new #{module} #{inspect(self())} took #{milliseconds}ms" - end - ) + debug_time( + write_new, + fn milliseconds -> + "write_new #{module} #{inspect(self())} took #{milliseconds}ms" + end + ) - write_count + :ok end def size(module) do @@ -412,9 +414,18 @@ defmodule State.Server do end def all(module, opts) do - module - |> :ets.tab2list() - |> to_structs(module, opts) + {:atomic, results} = + :mnesia.transaction(fn -> + :mnesia.foldl( + fn result, acc -> + [result | acc] + end, + [], + module + ) + end) + + to_structs(results, module, opts) rescue ArgumentError -> # if the table is being rebuilt, we re-try to get the data @@ -422,33 +433,32 @@ defmodule State.Server do end def all_keys(module) do - :mnesia.ets(fn -> - :mnesia.all_keys(module) - end) + {:atomic, results} = :mnesia.transaction(&:mnesia.all_keys/1, [module]) + + results end def by_index(values, module, indices, opts) do - indices - |> build_read_fun(module) - |> :lists.flatmap(values) - |> to_structs(module, opts) - catch - :exit, {:aborted, {_, [^module | _]}} -> - by_index(values, module, indices, opts) + read_fun = build_read_fun(indices, module) + + {:atomic, results} = :mnesia.transaction(&:lists.flatmap/2, [read_fun, values]) + + to_structs(results, module, opts) end defp build_read_fun({key_index, key_index}, module) do - &:mnesia.dirty_read(module, &1) + &:mnesia.read(module, &1) end defp build_read_fun({index, _key_index}, module) do - &:mnesia.dirty_index_read(module, &1, index) + &:mnesia.index_read(module, &1, index) end def by_index_match(match, module, index, opts) do - module - |> :mnesia.dirty_index_match_object(match, index) - |> to_structs(module, opts) + {:atomic, results} = + :mnesia.transaction(&:mnesia.index_match_object/4, [module, match, index, :read]) + + to_structs(results, module, opts) end def matchers_to_selectors(module, matchers) do @@ -470,15 +480,17 @@ defmodule State.Server do end def select(module, matchers, index) when is_list(matchers) and is_atom(index) do - :lists.flatmap(&module.match(&1, index), matchers) + {:atomic, results} = + :mnesia.transaction(fn -> + :lists.flatmap(&module.match(&1, index), matchers) + end) + + results end def select_with_selectors(module, selectors) when is_atom(module) and is_list(selectors) do - fn -> - :mnesia.select(module, selectors) - end - |> :mnesia.async_dirty() - |> to_structs(module, []) + {:atomic, results} = :mnesia.transaction(&:mnesia.select/2, [module, selectors]) + to_structs(results, module, []) end def select_limit(module, matchers, num_objects) do @@ -487,13 +499,11 @@ defmodule State.Server do end def select_limit_with_selectors(module, selectors, num_objects) do - case :mnesia.async_dirty(fn -> - :mnesia.select(module, selectors, num_objects, :read) - end) do - :"$end_of_table" -> + case :mnesia.transaction(&:mnesia.select/4, [module, selectors, num_objects, :read]) do + {:atomic, :"$end_of_table"} -> [] - {records, _cont} -> + {:atomic, {records, _cont}} -> to_structs(records, module, []) end end @@ -502,7 +512,7 @@ defmodule State.Server do recordable = module.recordable() records - |> Enum.map(&recordable.from_record(&1)) + |> Enum.map(&recordable.from_record/1) |> module.post_load_hook() |> State.all(opts) end diff --git a/apps/state/test/state/server_test.exs b/apps/state/test/state/server_test.exs index cd9f62aca..87d1ba9b6 100644 --- a/apps/state/test/state/server_test.exs +++ b/apps/state/test/state/server_test.exs @@ -148,20 +148,63 @@ defmodule State.ServerTest do Server.new_state([value]) assert Server.by_id(1) == [value] - task = - Task.async(fn -> - [value] - |> Stream.cycle() - |> Server.new_state(:infinity) - end) + task = fn task -> + Server.new_state([value], :infinity) + task.(task) + end + + task_pid = spawn(fn -> task.(task) end) + + Process.sleep(10) + + for _i <- Range.new(0, 1_000) do + assert Server.by_id(1) != [] + end + + for _i <- Range.new(0, 1_000) do + assert Server.all() != [] + end + + # wait for everything to shut down + Process.flag(:trap_exit, true) + Process.exit(task_pid, :brutal_kill) + example_pid = GenServer.whereis(Server) + + case example_pid do + nil -> + :ok + + pid -> + Process.exit(pid, :brutal_kill) + + receive do + {:EXIT, ^pid, :brutal_kill} -> :ok + end + end + + Process.flag(:trap_exit, false) + end + + test "new_state does not show an empty state when sending partial updates" do + value = %Example{id: 1, data: 1} + Server.new_state([value, %Example{id: 2, data: 2}]) + + task = fn task -> + Server.new_state({:partial, [value]}, :infinity) + task.(task) + end + + task_pid = spawn(fn -> task.(task) end) + Process.sleep(10) for _i <- Range.new(0, 1_000) do assert Server.by_id(1) != [] + assert Server.by_id(2) != [] end # wait for everything to shut down - Task.shutdown(task, :brutal_kill) Process.flag(:trap_exit, true) + Process.exit(task_pid, :brutal_kill) example_pid = GenServer.whereis(Server) case example_pid do