Skip to content

Commit

Permalink
feat(Server): accept partial updates
Browse files Browse the repository at this point in the history
  • Loading branch information
paulswartz committed Sep 27, 2023
1 parent a501f8b commit c993525
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 22 deletions.
84 changes: 62 additions & 22 deletions apps/state/lib/state/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -187,9 +187,6 @@ defmodule State.Server do
@impl State.Server
def post_load_hook(structs), do: structs

@impl State.Server
def pre_insert_hook(item), do: [item]

@impl Events.Server
def handle_event({:fetch, unquote(opts[:fetched_filename])}, body, _, state) do
case handle_call({:new_state, body}, nil, state) do
Expand Down Expand Up @@ -227,7 +224,6 @@ defmodule State.Server do
new_state: 2,
post_commit_hook: 0,
post_load_hook: 1,
pre_insert_hook: 1,
select: 1,
select: 2,
select_limit: 2,
Expand Down Expand Up @@ -307,8 +303,8 @@ defmodule State.Server do
recordable = module.recordable()
indices = module.indices()
attributes = recordable.fields()
id_field = List.first(attributes)
index = Enum.reject(indices, &Kernel.==(&1, id_field))
key_index = module.key_index()
index = Enum.reject(indices, &Kernel.==(&1, key_index))
recreate_table(module, attributes: attributes, index: index, record_name: recordable)
end

Expand Down Expand Up @@ -345,28 +341,47 @@ defmodule State.Server do
)

if enum do
with {:atomic, :ok} <- :mnesia.transaction(&create_children/2, [enum, module], 0) do
:ok
has_hook? = function_exported?(module, :pre_insert_hook, 1)

update =
case enum do
{:partial, enum} when has_hook? ->
{:partial, Stream.flat_map(enum, &module.pre_insert_hook/1)}

{:partial, _enum} = update ->
update

enum when has_hook? ->
{:all, Stream.flat_map(enum, &module.pre_insert_hook/1)}

enum ->
{:all, enum}
end

with {:atomic, count} <-
:mnesia.transaction(&create_children/2, [update, module], 10) do
{:ok, count}
end
else
:ok
{:ok, 0}
end
end

defp create_children(enum, module) do
defp create_children({:all, enum}, module) do
recordable = module.recordable()
:mnesia.write_lock_table(module)

delete_all = fn ->
all_keys = :mnesia.all_keys(module)

:lists.foreach(&:mnesia.delete(module, &1, :write), all_keys)
end

write_new = fn ->
recordable = module.recordable()

enum
|> Stream.flat_map(&module.pre_insert_hook/1)
|> Enum.each(&:mnesia.write(module, recordable.to_record(&1), :write))
Enum.reduce(enum, 0, fn item, count ->
:mnesia.write(module, recordable.to_record(item), :write)
count + 1
end)
end

:ok =
Expand All @@ -377,13 +392,40 @@ defmodule State.Server do
end
)

:ok =
count =
debug_time(
write_new,
fn milliseconds ->
"write_new #{module} #{inspect(self())} took #{milliseconds}ms"
end
)

count
end

defp create_children({:partial, enum}, module) do
recordable = module.recordable()
key_index = module.key_index()

update = fn ->
enum
|> Enum.group_by(&Map.get(&1, key_index))
|> Enum.reduce(0, fn {key, items}, count ->
:mnesia.delete(module, key, :write)
:lists.foreach(&:mnesia.write(module, recordable.to_record(&1), :write), items)
length(items) + count
end)
end

count =
debug_time(
update,
fn milliseconds ->
"partial_update #{module} #{inspect(self())} took #{milliseconds}ms"
end
)

count
end

def size(module) do
Expand Down Expand Up @@ -481,7 +523,7 @@ defmodule State.Server do
recordable = module.recordable()

records
|> Enum.map(&recordable.from_record(&1))
|> Enum.map(&recordable.from_record/1)
|> module.post_load_hook()
|> State.all(opts)
end
Expand Down Expand Up @@ -524,7 +566,7 @@ defmodule State.Server do
end

defp do_handle_new_state(module, func) do
:ok =
{:ok, update_count} =
debug_time(
fn -> create!(func, module) end,
fn milliseconds ->
Expand All @@ -542,15 +584,13 @@ defmodule State.Server do
end
)

new_size = module.size()

_ =
Logger.info(fn ->
"Update #{module} #{inspect(self())}: #{new_size} items"
"Update #{module} #{inspect(self())}: #{update_count} items"
end)

module.update_metadata()
Events.publish({:new_state, module}, new_size)
Events.publish({:new_state, module}, update_count)

:ok
end
Expand Down
17 changes: 17 additions & 0 deletions apps/state/test/state/vehicle_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,23 @@ defmodule State.VehicleTest do
assert [_, _] = all()
end

test "it can accept a partial update which only modifies some vehicles" do
new_state([
%Vehicle{id: "one", trip_id: "1", route_id: "1"},
%Vehicle{id: "two", trip_id: "2", route_id: "1"}
])

new_state(
{:partial,
[
%Vehicle{id: "one", trip_id: "3", route_id: "1"}
]}
)

assert %Vehicle{id: "one", trip_id: "3"} = by_id("one")
assert %Vehicle{id: "two", trip_id: "2"} = by_id("two")
end

@tag :capture_log
test "an invalid state doesn't crash the server" do
vehicle = %Vehicle{id: "1", trip_id: "2"}
Expand Down

0 comments on commit c993525

Please sign in to comment.