diff --git a/apps/state/lib/state/server.ex b/apps/state/lib/state/server.ex index 3d99652d3..c0aaae812 100644 --- a/apps/state/lib/state/server.ex +++ b/apps/state/lib/state/server.ex @@ -187,9 +187,6 @@ defmodule State.Server do @impl State.Server def post_load_hook(structs), do: structs - @impl State.Server - def pre_insert_hook(item), do: [item] - @impl Events.Server def handle_event({:fetch, unquote(opts[:fetched_filename])}, body, _, state) do case handle_call({:new_state, body}, nil, state) do @@ -227,7 +224,6 @@ defmodule State.Server do new_state: 2, post_commit_hook: 0, post_load_hook: 1, - pre_insert_hook: 1, select: 1, select: 2, select_limit: 2, @@ -345,26 +341,40 @@ defmodule State.Server do ) if enum do - with {:atomic, result} <- :mnesia.transaction(&create_children/2, [enum, module], 0) do - {:ok, result} + {keys_to_delete, updates} = + case enum do + {:partial, updates} -> + key_index = module.key_index() + {Enum.map(updates, &Map.get(&1, key_index)), updates} + + _ -> + {:all, enum} + end + + recordable = module.recordable() + + updates = + if function_exported?(module, :pre_insert_hook, 1) do + Stream.flat_map(updates, &module.pre_insert_hook/1) + else + updates + end + + updates = Enum.map(updates, &recordable.to_record/1) + + with {:atomic, :ok} <- + :mnesia.transaction(&create_children/3, [keys_to_delete, updates, module], 10) do + {:ok, length(updates)} end else {:ok, 0} end end - defp create_children(enum, module) do - :mnesia.write_lock_table(module) - - {keys_to_delete, updates} = - case enum do - {:partial, updates} -> - key_index = module.key_index() - {Enum.map(updates, &Map.get(&1, key_index)), updates} - - _ -> - {:all, enum} - end + defp create_children(keys_to_delete, updates, module) do + if keys_to_delete == :all do + :mnesia.write_lock_table(module) + end delete_all = fn -> all_keys = @@ -378,14 +388,7 @@ defmodule State.Server do end write_new = fn -> - recordable = module.recordable() - - updates - |> Stream.flat_map(&module.pre_insert_hook/1) - |> Enum.reduce(0, fn item, sum -> - :mnesia.write(module, recordable.to_record(item), :write) - sum + 1 - end) + :lists.foreach(&:mnesia.write(module, &1, :write), updates) end :ok = @@ -396,15 +399,14 @@ defmodule State.Server do end ) - write_count = - debug_time( - write_new, - fn milliseconds -> - "write_new #{module} #{inspect(self())} took #{milliseconds}ms" - end - ) + debug_time( + write_new, + fn milliseconds -> + "write_new #{module} #{inspect(self())} took #{milliseconds}ms" + end + ) - write_count + :ok end def size(module) do @@ -412,9 +414,18 @@ defmodule State.Server do end def all(module, opts) do - module - |> :ets.tab2list() - |> to_structs(module, opts) + {:atomic, results} = + :mnesia.transaction(fn -> + :mnesia.foldl( + fn result, acc -> + [result | acc] + end, + [], + module + ) + end) + + to_structs(results, module, opts) rescue ArgumentError -> # if the table is being rebuilt, we re-try to get the data @@ -422,33 +433,32 @@ defmodule State.Server do end def all_keys(module) do - :mnesia.ets(fn -> - :mnesia.all_keys(module) - end) + {:atomic, results} = :mnesia.transaction(&:mnesia.all_keys/1, [module]) + + results end def by_index(values, module, indices, opts) do - indices - |> build_read_fun(module) - |> :lists.flatmap(values) - |> to_structs(module, opts) - catch - :exit, {:aborted, {_, [^module | _]}} -> - by_index(values, module, indices, opts) + read_fun = build_read_fun(indices, module) + + {:atomic, results} = :mnesia.transaction(&:lists.flatmap/2, [read_fun, values]) + + to_structs(results, module, opts) end defp build_read_fun({key_index, key_index}, module) do - &:mnesia.dirty_read(module, &1) + &:mnesia.read(module, &1) end defp build_read_fun({index, _key_index}, module) do - &:mnesia.dirty_index_read(module, &1, index) + &:mnesia.index_read(module, &1, index) end def by_index_match(match, module, index, opts) do - module - |> :mnesia.dirty_index_match_object(match, index) - |> to_structs(module, opts) + {:atomic, results} = + :mnesia.transaction(&:mnesia.index_match_object/4, [module, match, index, :read]) + + to_structs(results, module, opts) end def matchers_to_selectors(module, matchers) do @@ -470,15 +480,17 @@ defmodule State.Server do end def select(module, matchers, index) when is_list(matchers) and is_atom(index) do - :lists.flatmap(&module.match(&1, index), matchers) + {:atomic, results} = + :mnesia.transaction(fn -> + :lists.flatmap(&module.match(&1, index), matchers) + end) + + results end def select_with_selectors(module, selectors) when is_atom(module) and is_list(selectors) do - fn -> - :mnesia.select(module, selectors) - end - |> :mnesia.async_dirty() - |> to_structs(module, []) + {:atomic, results} = :mnesia.transaction(&:mnesia.select/2, [module, selectors]) + to_structs(results, module, []) end def select_limit(module, matchers, num_objects) do @@ -487,13 +499,11 @@ defmodule State.Server do end def select_limit_with_selectors(module, selectors, num_objects) do - case :mnesia.async_dirty(fn -> - :mnesia.select(module, selectors, num_objects, :read) - end) do - :"$end_of_table" -> + case :mnesia.transaction(&:mnesia.select/4, [module, selectors, num_objects, :read]) do + {:atomic, :"$end_of_table"} -> [] - {records, _cont} -> + {:atomic, {records, _cont}} -> to_structs(records, module, []) end end @@ -502,7 +512,7 @@ defmodule State.Server do recordable = module.recordable() records - |> Enum.map(&recordable.from_record(&1)) + |> Enum.map(&recordable.from_record/1) |> module.post_load_hook() |> State.all(opts) end diff --git a/apps/state/test/state/server_test.exs b/apps/state/test/state/server_test.exs index cd9f62aca..87d1ba9b6 100644 --- a/apps/state/test/state/server_test.exs +++ b/apps/state/test/state/server_test.exs @@ -148,20 +148,63 @@ defmodule State.ServerTest do Server.new_state([value]) assert Server.by_id(1) == [value] - task = - Task.async(fn -> - [value] - |> Stream.cycle() - |> Server.new_state(:infinity) - end) + task = fn task -> + Server.new_state([value], :infinity) + task.(task) + end + + task_pid = spawn(fn -> task.(task) end) + + Process.sleep(10) + + for _i <- Range.new(0, 1_000) do + assert Server.by_id(1) != [] + end + + for _i <- Range.new(0, 1_000) do + assert Server.all() != [] + end + + # wait for everything to shut down + Process.flag(:trap_exit, true) + Process.exit(task_pid, :brutal_kill) + example_pid = GenServer.whereis(Server) + + case example_pid do + nil -> + :ok + + pid -> + Process.exit(pid, :brutal_kill) + + receive do + {:EXIT, ^pid, :brutal_kill} -> :ok + end + end + + Process.flag(:trap_exit, false) + end + + test "new_state does not show an empty state when sending partial updates" do + value = %Example{id: 1, data: 1} + Server.new_state([value, %Example{id: 2, data: 2}]) + + task = fn task -> + Server.new_state({:partial, [value]}, :infinity) + task.(task) + end + + task_pid = spawn(fn -> task.(task) end) + Process.sleep(10) for _i <- Range.new(0, 1_000) do assert Server.by_id(1) != [] + assert Server.by_id(2) != [] end # wait for everything to shut down - Task.shutdown(task, :brutal_kill) Process.flag(:trap_exit, true) + Process.exit(task_pid, :brutal_kill) example_pid = GenServer.whereis(Server) case example_pid do