diff --git a/apps/state/lib/state/server.ex b/apps/state/lib/state/server.ex index 3dd43aa80..91ba242d7 100644 --- a/apps/state/lib/state/server.ex +++ b/apps/state/lib/state/server.ex @@ -187,9 +187,6 @@ defmodule State.Server do @impl State.Server def post_load_hook(structs), do: structs - @impl State.Server - def pre_insert_hook(item), do: [item] - @impl Events.Server def handle_event({:fetch, unquote(opts[:fetched_filename])}, body, _, state) do case handle_call({:new_state, body}, nil, state) do @@ -227,7 +224,6 @@ defmodule State.Server do new_state: 2, post_commit_hook: 0, post_load_hook: 1, - pre_insert_hook: 1, select: 1, select: 2, select_limit: 2, @@ -307,8 +303,8 @@ defmodule State.Server do recordable = module.recordable() indices = module.indices() attributes = recordable.fields() - id_field = List.first(attributes) - index = Enum.reject(indices, &Kernel.==(&1, id_field)) + key_index = module.key_index() + index = Enum.reject(indices, &Kernel.==(&1, key_index)) recreate_table(module, attributes: attributes, index: index, record_name: recordable) end @@ -345,28 +341,47 @@ defmodule State.Server do ) if enum do - with {:atomic, :ok} <- :mnesia.transaction(&create_children/2, [enum, module], 0) do - :ok + has_hook? = function_exported?(module, :pre_insert_hook, 1) + + update = + case enum do + {:partial, enum} when has_hook? -> + {:partial, Stream.flat_map(enum, &module.pre_insert_hook/1)} + + {:partial, _enum} = update -> + update + + enum when has_hook? -> + {:all, Stream.flat_map(enum, &module.pre_insert_hook/1)} + + enum -> + {:all, enum} + end + + with {:atomic, count} <- + :mnesia.transaction(&create_children/2, [update, module], 10) do + {:ok, count} end else - :ok + {:ok, 0} end end - defp create_children(enum, module) do + defp create_children({:all, enum}, module) do + recordable = module.recordable() :mnesia.write_lock_table(module) delete_all = fn -> all_keys = :mnesia.all_keys(module) + :lists.foreach(&:mnesia.delete(module, &1, :write), all_keys) end write_new = fn -> - recordable = module.recordable() - - enum - |> Stream.flat_map(&module.pre_insert_hook/1) - |> Enum.each(&:mnesia.write(module, recordable.to_record(&1), :write)) + Enum.reduce(enum, 0, fn item, count -> + :mnesia.write(module, recordable.to_record(item), :write) + count + 1 + end) end :ok = @@ -377,13 +392,40 @@ defmodule State.Server do end ) - :ok = + count = debug_time( write_new, fn milliseconds -> "write_new #{module} #{inspect(self())} took #{milliseconds}ms" end ) + + count + end + + defp create_children({:partial, enum}, module) do + recordable = module.recordable() + key_index = module.key_index() + + update = fn -> + enum + |> Enum.group_by(&Map.get(&1, key_index)) + |> Enum.reduce(0, fn {key, items}, count -> + :mnesia.delete(module, key, :write) + :lists.foreach(&:mnesia.write(module, recordable.to_record(&1), :write), items) + length(items) + count + end) + end + + count = + debug_time( + update, + fn milliseconds -> + "partial_update #{module} #{inspect(self())} took #{milliseconds}ms" + end + ) + + count end def size(module) do @@ -481,7 +523,7 @@ defmodule State.Server do recordable = module.recordable() records - |> Enum.map(&recordable.from_record(&1)) + |> Enum.map(&recordable.from_record/1) |> module.post_load_hook() |> State.all(opts) end @@ -524,7 +566,7 @@ defmodule State.Server do end defp do_handle_new_state(module, func) do - :ok = + {:ok, update_count} = debug_time( fn -> create!(func, module) end, fn milliseconds -> @@ -542,15 +584,13 @@ defmodule State.Server do end ) - new_size = module.size() - _ = Logger.info(fn -> - "Update #{module} #{inspect(self())}: #{new_size} items" + "Update #{module} #{inspect(self())}: #{update_count} items" end) module.update_metadata() - Events.publish({:new_state, module}, new_size) + Events.publish({:new_state, module}, update_count) :ok end diff --git a/apps/state/test/state/vehicle_test.exs b/apps/state/test/state/vehicle_test.exs index ab4ef1c01..3f867573f 100644 --- a/apps/state/test/state/vehicle_test.exs +++ b/apps/state/test/state/vehicle_test.exs @@ -88,6 +88,23 @@ defmodule State.VehicleTest do assert [_, _] = all() end + test "it can accept a partial update which only modifies some vehicles" do + new_state([ + %Vehicle{id: "one", trip_id: "1", route_id: "1"}, + %Vehicle{id: "two", trip_id: "2", route_id: "1"} + ]) + + new_state( + {:partial, + [ + %Vehicle{id: "one", trip_id: "3", route_id: "1"} + ]} + ) + + assert %Vehicle{id: "one", trip_id: "3"} = by_id("one") + assert %Vehicle{id: "two", trip_id: "2"} = by_id("two") + end + @tag :capture_log test "an invalid state doesn't crash the server" do vehicle = %Vehicle{id: "1", trip_id: "2"}