-
Notifications
You must be signed in to change notification settings - Fork 69
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add device extensions support #1479
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1383,4 +1383,33 @@ defmodule NervesHub.Devices do | |
"pending" | ||
end | ||
end | ||
|
||
def enable_extension_setting(%Device{} = device, extension_string) do | ||
device = Repo.get(Device, device.id) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why are we fetching the device if we are requiring it as an arg? |
||
|
||
Device.changeset(device, %{"extensions" => %{extension_string => true}}) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does this actually work if I have multiple extensions? To me it looks like it will completely overwrite all extension settings with just this one? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It does not, |
||
|> Repo.update() | ||
|> tap(fn _ -> | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I believe we will need to check that the update was successful before broadcasting, otherwise we will be out of sync with the setting |
||
topic = "device:#{device.id}:extensions" | ||
|
||
NervesHubWeb.DeviceEndpoint.broadcast(topic, "attach", %{"extensions" => [extension_string]}) | ||
end) | ||
end | ||
|
||
def disable_extension_setting(%Device{} = device, extension_string) do | ||
device = Repo.get(Device, device.id) | ||
|
||
Device.changeset(device, %{"extensions" => %{extension_string => false}}) | ||
|> Repo.update() | ||
|> tap(fn _ -> | ||
topic = "device:#{device.id}:extensions" | ||
|
||
NervesHubWeb.DeviceEndpoint.broadcast(topic, "detach", %{"extensions" => [extension_string]}) | ||
end) | ||
end | ||
|
||
def preload_product(%Device{} = device) do | ||
device | ||
|> Repo.preload(:product) | ||
end | ||
end |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
defmodule NervesHub.Extensions do | ||
@moduledoc """ | ||
An "extension" is an additional piece of functionality that we add onto the | ||
existing connection between the device and the NervesHub service. They are | ||
designed to be less important than firmware updates and requires both client | ||
to report support and the server to enable support. | ||
This is intended to ensure that: | ||
- The service decides when activity should be taken by the device meaning | ||
the fleet of devices will not inadvertently swarm the service with data. | ||
- The service can turn off extensions in various ways to ensure that disruptive | ||
extensions stop being enabled on subsequent connections. | ||
- Use of extensions should have very little chance to disrupt the flow of a | ||
critical firmware update. | ||
""" | ||
|
||
@callback handle_in(event :: String.t(), Phoenix.Channel.payload(), Phoenix.Socket.t()) :: | ||
{:noreply, Phoenix.Socket.t()} | ||
| {:noreply, Phoenix.Socket.t(), timeout() | :hibernate} | ||
| {:reply, Phoenix.Channel.reply(), Phoenix.Socket.t()} | ||
| {:stop, reason :: term(), Phoenix.Socket.t()} | ||
| {:stop, reason :: term(), Phoenix.Channel.reply(), Phoenix.Socket.t()} | ||
|
||
@callback handle_info(msg :: term(), Phoenix.Socket.t()) :: | ||
{:noreply, Phoenix.Socket.t()} | {:stop, reason :: term(), Phoenix.Socket.t()} | ||
|
||
@callback attach(Phoenix.Socket.t()) :: {:noreply, Phoenix.Socket.t()} | ||
@callback detach(Phoenix.Socket.t()) :: {:noreply, Phoenix.Socket.t()} | ||
|
||
require Logger | ||
|
||
@supported_extensions %{ | ||
health: """ | ||
Reporting of fundamental device metrics, metadata, alarms and more. | ||
Also supports custom metrics. Alarms require an alarm handler to be set. | ||
""", | ||
geo: """ | ||
Reporting of GeoIP information or custom geo-location information sources | ||
you've set up for your device. | ||
""" | ||
} | ||
Comment on lines
+33
to
+42
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since this is a behavior module, I feel like we should keep as much of this in the extension as possible. Maybe we need a |
||
@type extension() :: :health | :geo | ||
|
||
@doc """ | ||
Get list of supported extensions as atoms with descriptive text. | ||
""" | ||
@spec list() :: %{extension() => String.t()} | ||
def list do | ||
@supported_extensions | ||
end | ||
end |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
defmodule NervesHub.Extensions.ExtensionsSetting do | ||
use Ecto.Schema | ||
import Ecto.Changeset | ||
|
||
@primary_key false | ||
embedded_schema do | ||
field(:health, :boolean, default: nil) | ||
field(:geo, :boolean, default: nil) | ||
end | ||
|
||
def changeset(setting, params) do | ||
setting | ||
|> cast(params, [:health, :geo]) | ||
end | ||
end |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
defmodule NervesHub.Extensions.Geo do | ||
@behaviour NervesHub.Extensions | ||
|
||
alias NervesHub.Devices | ||
|
||
@impl NervesHub.Extensions | ||
def attach(socket) do | ||
extension_config = Application.get_env(:nerves_hub, :extension_config) | ||
geo_interval = get_in(extension_config, [:geo, :interval_minutes]) || 0 | ||
|
||
send(self(), {__MODULE__, :location_request}) | ||
|
||
socket = | ||
if geo_interval > 0 do | ||
timer = | ||
geo_interval | ||
|> :timer.minutes() | ||
|> :timer.send_interval({__MODULE__, :location_request}) | ||
|
||
socket | ||
|> Phoenix.Socket.assign(:geo_timer, timer) | ||
|> Phoenix.Socket.assign(:geo_interval, geo_interval) | ||
else | ||
socket | ||
end | ||
|
||
{:noreply, socket} | ||
end | ||
|
||
@impl NervesHub.Extensions | ||
def detach(socket) do | ||
_ = if socket.assigns[:geo_timer], do: :timer.cancel(socket.assigns.geo_timer) | ||
{:noreply, Phoenix.Socket.assign(socket, :geo_timer, nil)} | ||
end | ||
|
||
@impl NervesHub.Extensions | ||
def handle_in("location:update", location, %{assigns: %{device: device}} = socket) do | ||
metadata = Map.put(device.connection_metadata, "location", location) | ||
|
||
{:ok, device} = Devices.update_device(device, %{connection_metadata: metadata}) | ||
|
||
_ = | ||
NervesHubWeb.DeviceEndpoint.broadcast( | ||
"device:#{device.identifier}:internal", | ||
"location:updated", | ||
location | ||
) | ||
|
||
{:noreply, Phoenix.Socket.assign(socket, :device, device)} | ||
end | ||
|
||
@impl NervesHub.Extensions | ||
def handle_info(:location_request, socket) do | ||
Phoenix.Channel.push(socket, "geo:location:request", %{}) | ||
{:noreply, socket} | ||
end | ||
end |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,113 @@ | ||
defmodule NervesHub.Extensions.Health do | ||
@behaviour NervesHub.Extensions | ||
|
||
alias NervesHub.Devices | ||
alias NervesHub.Devices.Metrics | ||
|
||
require Logger | ||
|
||
# @impl NervesHub.Extensions | ||
def init(socket) do | ||
# Allow DB settings? | ||
socket | ||
end | ||
Comment on lines
+9
to
+13
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Cruft? |
||
|
||
@impl NervesHub.Extensions | ||
def attach(socket) do | ||
extension_config = Application.get_env(:nerves_hub, :extension_config, []) | ||
|
||
health_interval = | ||
case get_in(extension_config, [:health, :interval_minutes]) do | ||
i when is_integer(i) -> i | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this needed if we're calling |
||
_ -> 60 | ||
end | ||
|
||
send(self(), {__MODULE__, :check}) | ||
|
||
socket = | ||
if health_interval > 0 do | ||
timer = | ||
health_interval | ||
|> :timer.minutes() | ||
|> :timer.send_interval({__MODULE__, :check}) | ||
|
||
socket | ||
|> Phoenix.Socket.assign(:health_interval, health_interval) | ||
|> Phoenix.Socket.assign(:health_timer, timer) | ||
else | ||
socket | ||
end | ||
|
||
{:noreply, socket} | ||
end | ||
|
||
@impl NervesHub.Extensions | ||
def detach(socket) do | ||
_ = if socket.assigns[:health_timer], do: :timer.cancel(socket.assigns.health_timer) | ||
{:noreply, Phoenix.Socket.assign(socket, :health_timer, nil)} | ||
end | ||
|
||
@impl NervesHub.Extensions | ||
def handle_in("report", %{"value" => device_status}, socket) do | ||
device_meta = | ||
for {key, val} <- Map.from_struct(socket.assigns.device.firmware_metadata), | ||
into: %{}, | ||
do: {to_string(key), to_string(val)} | ||
|
||
# Separate metrics from health report to store in metrics table | ||
metrics = device_status["metrics"] | ||
|
||
health_report = | ||
device_status | ||
|> Map.delete("metrics") | ||
|> Map.put("metadata", Map.merge(device_status["metadata"], device_meta)) | ||
|
||
device_health = %{"device_id" => socket.assigns.device.id, "data" => health_report} | ||
|
||
with {:health_report, {:ok, _}} <- | ||
{:health_report, Devices.save_device_health(device_health)}, | ||
{:metrics_report, {count, _}} when count >= 0 <- | ||
{:metrics_report, Metrics.save_metrics(socket.assigns.device.id, metrics)} do | ||
device_internal_broadcast!(socket.assigns.device, "health_check_report", %{}) | ||
else | ||
{:health_report, {:error, err}} -> | ||
Logger.warning("Failed to save health check data: #{inspect(err)}") | ||
log_to_sentry(socket.assigns.device, "[DeviceChannel] Failed to save health check data.") | ||
|
||
{:metrics_report, {:error, err}} -> | ||
Logger.warning("Failed to save metrics: #{inspect(err)}") | ||
log_to_sentry(socket.assigns.device, "[DeviceChannel] Failed to save metrics.") | ||
end | ||
|
||
{:noreply, socket} | ||
end | ||
|
||
@impl NervesHub.Extensions | ||
def handle_info(:check, socket) do | ||
Phoenix.Channel.push(socket, "health:check", %{}) | ||
{:noreply, socket} | ||
end | ||
|
||
defp device_internal_broadcast!(device, event, payload) do | ||
topic = "device:#{device.identifier}:extensions" | ||
NervesHubWeb.DeviceEndpoint.broadcast_from!(self(), topic, event, payload) | ||
end | ||
|
||
defp log_to_sentry(device, msg_or_ex, extra \\ %{}) do | ||
Sentry.Context.set_tags_context(%{ | ||
device_identifier: device.identifier, | ||
device_id: device.id, | ||
product_id: device.product_id, | ||
org_id: device.org_id | ||
}) | ||
|
||
_ = | ||
if is_exception(msg_or_ex) do | ||
Sentry.capture_exception(msg_or_ex, extra: extra, result: :none) | ||
else | ||
Sentry.capture_message(msg_or_ex, extra: extra, result: :none) | ||
end | ||
|
||
:ok | ||
end | ||
end |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -269,4 +269,28 @@ defmodule NervesHub.Products do | |
end | ||
end | ||
end | ||
|
||
def enable_extension_setting(%Product{} = product, extension_string) do | ||
product = Repo.get(Product, product.id) | ||
|
||
Product.changeset(product, %{"extensions" => %{extension_string => true}}) | ||
|> Repo.update() | ||
|> tap(fn _ -> | ||
topic = "product:#{product.id}:extensions" | ||
|
||
NervesHubWeb.DeviceEndpoint.broadcast(topic, "attach", %{"extensions" => [extension_string]}) | ||
end) | ||
end | ||
|
||
def disable_extension_setting(%Product{} = product, extension_string) do | ||
product = Repo.get(Product, product.id) | ||
|
||
Product.changeset(product, %{"extensions" => %{extension_string => false}}) | ||
|> Repo.update() | ||
|> tap(fn _ -> | ||
topic = "product:#{product.id}:extensions" | ||
|
||
NervesHubWeb.DeviceEndpoint.broadcast(topic, "detach", %{"extensions" => [extension_string]}) | ||
end) | ||
Comment on lines
+273
to
+294
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thoughts on refactoring this into a |
||
end | ||
end |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What does this do? Were we reporting errors to Sentry from tests before? 😨