Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: accept MQTT connections for Vehicle Positions #673

Merged
merged 5 commits into from
Oct 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ RUN mix local.hex --force && \

RUN apk add --update git make build-base erlang-dev

ENV MIX_ENV=prod
ENV MIX_ENV=prod BUILD_WITHOUT_QUIC=1

ADD apps apps
ADD config config
Expand Down
4 changes: 3 additions & 1 deletion apps/api_web/mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,9 @@ defmodule ApiWeb.Mixfile do
{:recaptcha, git: "https://github.com/samueljseay/recaptcha.git", tag: "71cd746"},
{:sentry, "~> 8.0"},
{:qr_code, "~> 3.0"},
{:nimble_totp, "~> 1.0"}
{:nimble_totp, "~> 1.0"},
# address discrepancy between cowboy (in api_web) and gun (in state_mediator)
{:cowlib, "~> 2.11", override: true}
]
end
end
24 changes: 19 additions & 5 deletions apps/parse/lib/parse/vehicle_positions.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,30 @@ defmodule Parse.VehiclePositions do
alias Parse.Realtime.FeedMessage
import Parse.Helpers

# Gzip-compressed payloads begin with the magic bytes 0x1f 0x8b:
# decompress and re-dispatch to the matching clause.
def parse(<<31, 139, _::binary>> = blob) do
  blob
  |> :zlib.gunzip()
  |> parse()
end

# A payload starting with an opening brace is enhanced JSON.
def parse("{" <> _ = blob) do
  Parse.VehiclePositionsJson.parse(blob)
end

# Anything else is treated as a GTFS-realtime protobuf feed.
# Returns a stream of parsed vehicles, tagged `{:partial, stream}` when
# the feed header marks the payload as a differential update.
def parse(blob) do
  message = FeedMessage.decode(blob)

  vehicles =
    message.entity
    |> Stream.map(& &1.vehicle)
    |> Stream.map(&parse_vehicle_update/1)

  # A DIFFERENTIAL feed carries only the vehicles that changed, so it is
  # tagged as a partial update rather than a full state replacement.
  case message.header.incrementality do
    :DIFFERENTIAL -> {:partial, vehicles}
    _ -> vehicles
  end
end

def parse_vehicle_update(update) do
Expand Down
20 changes: 16 additions & 4 deletions apps/parse/lib/parse/vehicle_positions_json.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,18 @@ defmodule Parse.VehiclePositionsJson do
alias Model.Vehicle

# Decodes an enhanced-JSON vehicle positions feed. Returns the list of
# parsed vehicles, wrapped as `{:partial, vehicles}` when the feed header
# marks the payload as a differential (incremental) update.
def parse(body) do
  feed = Jason.decode!(body, strings: :copy)

  vehicles =
    feed
    |> Map.get("entity")
    |> Enum.flat_map(&parse_entity/1)

  # The JSON encoding of the GTFS-realtime incrementality enum may be the
  # string name or its numeric value, so accept both spellings.
  case feed["header"]["incrementality"] do
    value when value in ["DIFFERENTIAL", 1] -> {:partial, vehicles}
    _ -> vehicles
  end
end

def parse_entity(
Expand Down Expand Up @@ -109,6 +117,10 @@ defmodule Parse.VehiclePositionsJson do
Parse.Timezone.unix_to_local(timestamp)
end

# Fractional unix timestamps are truncated toward zero before conversion.
defp unix_to_local(timestamp) when is_float(timestamp) do
  timestamp
  |> trunc()
  |> Parse.Timezone.unix_to_local()
end

# A missing timestamp falls back to the current UTC time.
defp unix_to_local(nil), do: DateTime.utc_now()
Expand Down
6 changes: 6 additions & 0 deletions apps/parse/test/parse/vehicle_positions_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,12 @@ defmodule Parse.VehiclePositionsTest do
actual = parse(body)
assert actual == expected
end

test "can parse gzip-encoded JSON" do
  # Compressing a JSON body exercises the gzip clause, which must
  # re-dispatch to the JSON parser after decompressing.
  compressed = :zlib.gzip(Jason.encode!(%{entity: [@vehicle]}))
  assert [_vehicle] = parse(compressed)
end
end

describe "parse_vehicle_update/1" do
Expand Down
84 changes: 62 additions & 22 deletions apps/state/lib/state/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -187,9 +187,6 @@ defmodule State.Server do
@impl State.Server
def post_load_hook(structs), do: structs

@impl State.Server
def pre_insert_hook(item), do: [item]

@impl Events.Server
def handle_event({:fetch, unquote(opts[:fetched_filename])}, body, _, state) do
case handle_call({:new_state, body}, nil, state) do
Expand Down Expand Up @@ -227,7 +224,6 @@ defmodule State.Server do
new_state: 2,
post_commit_hook: 0,
post_load_hook: 1,
pre_insert_hook: 1,
select: 1,
select: 2,
select_limit: 2,
Expand Down Expand Up @@ -307,8 +303,8 @@ defmodule State.Server do
recordable = module.recordable()
indices = module.indices()
attributes = recordable.fields()
id_field = List.first(attributes)
index = Enum.reject(indices, &Kernel.==(&1, id_field))
key_index = module.key_index()
index = Enum.reject(indices, &Kernel.==(&1, key_index))
recreate_table(module, attributes: attributes, index: index, record_name: recordable)
end

Expand Down Expand Up @@ -345,28 +341,47 @@ defmodule State.Server do
)

if enum do
with {:atomic, :ok} <- :mnesia.transaction(&create_children/2, [enum, module], 0) do
:ok
has_hook? = function_exported?(module, :pre_insert_hook, 1)

update =
case enum do
{:partial, enum} when has_hook? ->
{:partial, Stream.flat_map(enum, &module.pre_insert_hook/1)}

{:partial, _enum} = update ->
update

enum when has_hook? ->
{:all, Stream.flat_map(enum, &module.pre_insert_hook/1)}

enum ->
{:all, enum}
end

with {:atomic, count} <-
:mnesia.transaction(&create_children/2, [update, module], 10) do
{:ok, count}
end
else
:ok
{:ok, 0}
end
end

defp create_children(enum, module) do
defp create_children({:all, enum}, module) do
recordable = module.recordable()
:mnesia.write_lock_table(module)

delete_all = fn ->
all_keys = :mnesia.all_keys(module)

:lists.foreach(&:mnesia.delete(module, &1, :write), all_keys)
end

write_new = fn ->
recordable = module.recordable()

enum
|> Stream.flat_map(&module.pre_insert_hook/1)
|> Enum.each(&:mnesia.write(module, recordable.to_record(&1), :write))
Enum.reduce(enum, 0, fn item, count ->
:mnesia.write(module, recordable.to_record(item), :write)
count + 1
end)
end

:ok =
Expand All @@ -377,13 +392,40 @@ defmodule State.Server do
end
)

:ok =
count =
debug_time(
write_new,
fn milliseconds ->
"write_new #{module} #{inspect(self())} took #{milliseconds}ms"
end
)

count
end

# Applies a differential update inside an mnesia transaction: for every
# key present in `enum`, the rows stored under that key are replaced by
# the new items; keys absent from the update are left untouched.
# Returns the number of items written.
defp create_children({:partial, enum}, module) do
  recordable = module.recordable()
  key_index = module.key_index()

  apply_partial = fn ->
    grouped = Enum.group_by(enum, &Map.get(&1, key_index))

    Enum.reduce(grouped, 0, fn {key, items}, written ->
      # Drop everything stored under this key, then write the fresh items.
      :mnesia.delete(module, key, :write)

      Enum.each(items, fn item ->
        :mnesia.write(module, recordable.to_record(item), :write)
      end)

      written + length(items)
    end)
  end

  debug_time(
    apply_partial,
    fn milliseconds ->
      "partial_update #{module} #{inspect(self())} took #{milliseconds}ms"
    end
  )
end

def size(module) do
Expand Down Expand Up @@ -481,7 +523,7 @@ defmodule State.Server do
recordable = module.recordable()

records
|> Enum.map(&recordable.from_record(&1))
|> Enum.map(&recordable.from_record/1)
|> module.post_load_hook()
|> State.all(opts)
end
Expand Down Expand Up @@ -524,7 +566,7 @@ defmodule State.Server do
end

defp do_handle_new_state(module, func) do
:ok =
{:ok, update_count} =
debug_time(
fn -> create!(func, module) end,
fn milliseconds ->
Expand All @@ -542,15 +584,13 @@ defmodule State.Server do
end
)

new_size = module.size()

_ =
Logger.info(fn ->
"Update #{module} #{inspect(self())}: #{new_size} items"
"Update #{module} #{inspect(self())}: #{update_count} items"
end)

module.update_metadata()
Events.publish({:new_state, module}, new_size)
Events.publish({:new_state, module}, update_count)

:ok
end
Expand Down
17 changes: 17 additions & 0 deletions apps/state/test/state/vehicle_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,23 @@ defmodule State.VehicleTest do
assert [_, _] = all()
end

test "it can accept a partial update which only modifies some vehicles" do
  initial = [
    %Vehicle{id: "one", trip_id: "1", route_id: "1"},
    %Vehicle{id: "two", trip_id: "2", route_id: "1"}
  ]

  new_state(initial)

  # The differential update only mentions vehicle "one".
  new_state({:partial, [%Vehicle{id: "one", trip_id: "3", route_id: "1"}]})

  # "one" reflects the update; "two" is untouched.
  assert %Vehicle{id: "one", trip_id: "3"} = by_id("one")
  assert %Vehicle{id: "two", trip_id: "2"} = by_id("two")
end

@tag :capture_log
test "an invalid state doesn't crash the server" do
vehicle = %Vehicle{id: "1", trip_id: "2"}
Expand Down
6 changes: 5 additions & 1 deletion apps/state_mediator/config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@ config :state_mediator, State.Vehicle,
:system,
"MBTA_VEHICLE_SOURCE",
"https://cdn.mbta.com/realtime/VehiclePositions_enhanced.json"
}
},
broker: {:system, "MBTA_VEHICLE_BROKER", nil},
topic: {:system, "MBTA_VEHICLE_TOPIC", nil},
username: {:system, "MBTA_VEHICLE_USERNAME", nil},
password: {:system, "MBTA_VEHICLE_PASSWORD", nil}

# This configuration is loaded before any dependency and is restricted
# to this project. If another project depends on this project, this
Expand Down
41 changes: 30 additions & 11 deletions apps/state_mediator/lib/state_mediator.ex
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,7 @@ defmodule StateMediator do
interval: 10_000
]
},
{
StateMediator.Mediator,
[
spec_id: :vehicle_mediator,
state: State.Vehicle,
url: source_url(State.Vehicle),
opts: [timeout: 10_000],
sync_timeout: 30_000,
interval: 1_000
]
},
vehicle_mediator_child(app_value(State.Vehicle, :broker), source_url(State.Vehicle)),
{
StateMediator.Mediator,
[
Expand Down Expand Up @@ -75,6 +65,35 @@ defmodule StateMediator do
[]
end

# With no MQTT broker configured (nil or empty string), vehicle positions
# are fetched over HTTP by the polling mediator.
defp vehicle_mediator_child(broker, url) when broker in ["", nil] do
  {StateMediator.Mediator,
   [
     spec_id: :vehicle_mediator,
     state: State.Vehicle,
     url: url,
     opts: [timeout: 10_000],
     sync_timeout: 30_000,
     interval: 1_000
   ]}
end

# When a broker URL is configured, subscribe over MQTT instead of polling.
defp vehicle_mediator_child(broker, _url) do
  {StateMediator.MqttMediator,
   [
     spec_id: :vehicle_mediator,
     state: State.Vehicle,
     url: broker,
     topic: app_value(State.Vehicle, :topic),
     username: app_value(State.Vehicle, :username),
     password: app_value(State.Vehicle, :password),
     sync_timeout: 30_000
   ]}
end

@spec crowding_children(boolean()) :: [:supervisor.child_spec() | {module(), term()} | module()]
defp crowding_children(true) do
Logger.info("#{__MODULE__} CR_CROWDING_ENABLED=true")
Expand Down
Loading
Loading