Skip to content

Commit

Permalink
fix: make sure we aren't returning empty results when updating
Browse files Browse the repository at this point in the history
When we were only doing big updates, this was okay because we were not
making updates frequently. With the MqttMediator, we're more frequently
making smaller updates.
  • Loading branch information
paulswartz committed Sep 25, 2023
1 parent db9a98e commit ac7347d
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 71 deletions.
138 changes: 74 additions & 64 deletions apps/state/lib/state/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -187,9 +187,6 @@ defmodule State.Server do
@impl State.Server
def post_load_hook(structs), do: structs

@impl State.Server
def pre_insert_hook(item), do: [item]

@impl Events.Server
def handle_event({:fetch, unquote(opts[:fetched_filename])}, body, _, state) do
case handle_call({:new_state, body}, nil, state) do
Expand Down Expand Up @@ -227,7 +224,6 @@ defmodule State.Server do
new_state: 2,
post_commit_hook: 0,
post_load_hook: 1,
pre_insert_hook: 1,
select: 1,
select: 2,
select_limit: 2,
Expand Down Expand Up @@ -345,26 +341,40 @@ defmodule State.Server do
)

if enum do
with {:atomic, result} <- :mnesia.transaction(&create_children/2, [enum, module], 0) do
{:ok, result}
{keys_to_delete, updates} =
case enum do
{:partial, updates} ->
key_index = module.key_index()
{Enum.map(updates, &Map.get(&1, key_index)), updates}

_ ->
{:all, enum}
end

recordable = module.recordable()

updates =
if function_exported?(module, :pre_insert_hook, 1) do
Stream.flat_map(updates, &module.pre_insert_hook/1)
else
updates
end

updates = Enum.map(updates, &recordable.to_record/1)

with {:atomic, :ok} <-
:mnesia.transaction(&create_children/3, [keys_to_delete, updates, module], 10) do
{:ok, length(updates)}
end
else
{:ok, 0}
end
end

defp create_children(enum, module) do
:mnesia.write_lock_table(module)

{keys_to_delete, updates} =
case enum do
{:partial, updates} ->
key_index = module.key_index()
{Enum.map(updates, &Map.get(&1, key_index)), updates}

_ ->
{:all, enum}
end
defp create_children(keys_to_delete, updates, module) do
if keys_to_delete == :all do
:mnesia.write_lock_table(module)
end

delete_all = fn ->
all_keys =
Expand All @@ -378,14 +388,7 @@ defmodule State.Server do
end

write_new = fn ->
recordable = module.recordable()

updates
|> Stream.flat_map(&module.pre_insert_hook/1)
|> Enum.reduce(0, fn item, sum ->
:mnesia.write(module, recordable.to_record(item), :write)
sum + 1
end)
:lists.foreach(&:mnesia.write(module, &1, :write), updates)
end

:ok =
Expand All @@ -396,59 +399,66 @@ defmodule State.Server do
end
)

write_count =
debug_time(
write_new,
fn milliseconds ->
"write_new #{module} #{inspect(self())} took #{milliseconds}ms"
end
)
debug_time(
write_new,
fn milliseconds ->
"write_new #{module} #{inspect(self())} took #{milliseconds}ms"
end
)

write_count
:ok
end

# Number of records currently stored in `module`'s mnesia table.
def size(module), do: :mnesia.table_info(module, :size)

# Returns every record in `module`'s mnesia table as post-processed structs.
#
# The scan runs inside an mnesia transaction (`:mnesia.foldl/3`) rather than a
# raw `:ets.tab2list/1`, so a concurrent partial update cannot make callers
# observe an empty or half-written table (see commit message).
def all(module, opts) do
  {:atomic, results} =
    :mnesia.transaction(fn ->
      :mnesia.foldl(
        fn record, acc -> [record | acc] end,
        [],
        module
      )
    end)

  to_structs(results, module, opts)
rescue
  ArgumentError ->
    # if the table is being rebuilt, we re-try to get the data
    all(module, opts)
end

# Returns all primary keys of `module`'s mnesia table.
#
# Wrapped in a transaction (not a dirty/ETS read) so the key set is consistent
# even while another process is applying an update.
def all_keys(module) do
  {:atomic, results} = :mnesia.transaction(&:mnesia.all_keys/1, [module])

  results
end

# Looks up `values` against the given index pair and returns matching structs.
#
# All reads happen inside a single mnesia transaction, so a concurrent update
# can never surface an empty intermediate state; this also removes the old
# catch-and-retry loop that dirty reads required.
def by_index(values, module, indices, opts) do
  read_fun = build_read_fun(indices, module)

  {:atomic, results} = :mnesia.transaction(&:lists.flatmap/2, [read_fun, values])

  to_structs(results, module, opts)
end

# Builds the single-value read function used by `by_index/4`.
#
# When the requested index IS the key index, read by primary key; otherwise
# read via the secondary index. Transactional reads (not `dirty_read`/
# `dirty_index_read`) keep lookups consistent with in-flight updates.
defp build_read_fun({key_index, key_index}, module) do
  &:mnesia.read(module, &1)
end

defp build_read_fun({index, _key_index}, module) do
  &:mnesia.index_read(module, &1, index)
end

# Matches records against `match` on the given secondary `index` and returns
# the results as structs. Runs transactionally (read lock) instead of the old
# `dirty_index_match_object/3` so results are consistent during updates.
def by_index_match(match, module, index, opts) do
  {:atomic, results} =
    :mnesia.transaction(&:mnesia.index_match_object/4, [module, match, index, :read])

  to_structs(results, module, opts)
end

def matchers_to_selectors(module, matchers) do
Expand All @@ -470,15 +480,17 @@ defmodule State.Server do
end

# Runs `module.match/2` for each matcher against the given index and returns
# the flattened results. The whole flatmap executes inside one transaction so
# every matcher sees the same table snapshot.
def select(module, matchers, index) when is_list(matchers) and is_atom(index) do
  {:atomic, results} =
    :mnesia.transaction(fn ->
      :lists.flatmap(&module.match(&1, index), matchers)
    end)

  results
end

# Executes pre-built mnesia match-spec `selectors` and returns structs.
# Transactional (replaces the old `async_dirty` select) so partial updates
# never produce spuriously empty results.
def select_with_selectors(module, selectors) when is_atom(module) and is_list(selectors) do
  {:atomic, results} = :mnesia.transaction(&:mnesia.select/2, [module, selectors])
  to_structs(results, module, [])
end

def select_limit(module, matchers, num_objects) do
Expand All @@ -487,13 +499,11 @@ defmodule State.Server do
end

# Like `select_with_selectors/2` but returns at most `num_objects` records.
#
# `:mnesia.select/4` inside a transaction yields either the end-of-table atom
# (no matches) or `{records, continuation}`; the continuation is dropped since
# only the first page is wanted.
def select_limit_with_selectors(module, selectors, num_objects) do
  case :mnesia.transaction(&:mnesia.select/4, [module, selectors, num_objects, :read]) do
    {:atomic, :"$end_of_table"} ->
      []

    {:atomic, {records, _cont}} ->
      to_structs(records, module, [])
  end
end
Expand All @@ -502,7 +512,7 @@ defmodule State.Server do
recordable = module.recordable()

records
|> Enum.map(&recordable.from_record(&1))
|> Enum.map(&recordable.from_record/1)
|> module.post_load_hook()
|> State.all(opts)
end
Expand Down
57 changes: 50 additions & 7 deletions apps/state/test/state/server_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -148,20 +148,63 @@ defmodule State.ServerTest do
Server.new_state([value])
assert Server.by_id(1) == [value]

task =
Task.async(fn ->
[value]
|> Stream.cycle()
|> Server.new_state(:infinity)
end)
task = fn task ->
Server.new_state([value], :infinity)
task.(task)
end

task_pid = spawn(fn -> task.(task) end)

Process.sleep(10)

for _i <- Range.new(0, 1_000) do
assert Server.by_id(1) != []
end

for _i <- Range.new(0, 1_000) do
assert Server.all() != []
end

# wait for everything to shut down
Process.flag(:trap_exit, true)
Process.exit(task_pid, :brutal_kill)
example_pid = GenServer.whereis(Server)

case example_pid do
nil ->
:ok

pid ->
Process.exit(pid, :brutal_kill)

receive do
{:EXIT, ^pid, :brutal_kill} -> :ok
end
end

Process.flag(:trap_exit, false)
end

test "new_state does not show an empty state when sending partial updates" do
value = %Example{id: 1, data: 1}
Server.new_state([value, %Example{id: 2, data: 2}])

task = fn task ->
Server.new_state({:partial, [value]}, :infinity)
task.(task)
end

task_pid = spawn(fn -> task.(task) end)
Process.sleep(10)

for _i <- Range.new(0, 1_000) do
assert Server.by_id(1) != []
assert Server.by_id(2) != []
end

# wait for everything to shut down
Task.shutdown(task, :brutal_kill)
Process.flag(:trap_exit, true)
Process.exit(task_pid, :brutal_kill)
example_pid = GenServer.whereis(Server)

case example_pid do
Expand Down

0 comments on commit ac7347d

Please sign in to comment.