Skip to content

Commit

Permalink
Add a way to extend DeviceChannel functionality via Extensions
Browse files Browse the repository at this point in the history
Allows for specialized extensions on device to report data and
interactions safely outside the without affecting the
firmware update mechanism.

- Geo and Health adapted to the new mechanism.
- Product-level settings to enable/disable
- Device-level settings to mostly disable
  • Loading branch information
jjcarstens authored and lawik committed Nov 19, 2024
1 parent 3843e0b commit ede38df
Show file tree
Hide file tree
Showing 29 changed files with 781 additions and 115 deletions.
16 changes: 12 additions & 4 deletions config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,6 @@ config :nerves_hub,
username: System.get_env("ADMIN_AUTH_USERNAME"),
password: System.get_env("ADMIN_AUTH_PASSWORD")
],
device_health_check_enabled: System.get_env("DEVICE_HEALTH_CHECK_ENABLED", "true") == "true",
device_health_check_interval_minutes:
String.to_integer(System.get_env("DEVICE_HEALTH_CHECK_INTERVAL_MINUTES", "60")),
device_health_days_to_retain:
String.to_integer(System.get_env("HEALTH_CHECK_DAYS_TO_RETAIN", "7")),
device_deployment_change_jitter_seconds:
Expand All @@ -37,7 +34,18 @@ config :nerves_hub,
deployment_calculator_interval_seconds:
String.to_integer(System.get_env("DEPLOYMENT_CALCULATOR_INTERVAL_SECONDS", "3600")),
mapbox_access_token: System.get_env("MAPBOX_ACCESS_TOKEN"),
dashboard_enabled: System.get_env("DASHBOARD_ENABLED", "false") == "true"
dashboard_enabled: System.get_env("DASHBOARD_ENABLED", "false") == "true",
extension_config: [
geo: [
# No interval, fetch geo on device connection by default
interval_minutes:
System.get_env("FEATURES_GEO_INTERVAL_MINUTES", "0") |> String.to_integer()
],
health: [
interval_minutes:
System.get_env("FEATURES_HEALTH_INTERVAL_MINUTES", "60") |> String.to_integer()
]
]

config :nerves_hub, :device_socket_drainer,
batch_size: String.to_integer(System.get_env("DEVICE_SOCKET_DRAINER_BATCH_SIZE", "1000")),
Expand Down
2 changes: 2 additions & 0 deletions config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ config :nerves_hub, NervesHub.SwooshMailer, adapter: Swoosh.Adapters.Test

config :nerves_hub, NervesHub.RateLimit, limit: 100

config :sentry, environment_name: :test

config :phoenix_test, :endpoint, NervesHubWeb.Endpoint

# Initialize plugs at runtime for faster test compilation
Expand Down
29 changes: 29 additions & 0 deletions lib/nerves_hub/devices.ex
Original file line number Diff line number Diff line change
Expand Up @@ -1383,4 +1383,33 @@ defmodule NervesHub.Devices do
"pending"
end
end

def enable_extension_setting(%Device{} = device, extension_string) do
device = Repo.get(Device, device.id)

Device.changeset(device, %{"extensions" => %{extension_string => true}})
|> Repo.update()
|> tap(fn _ ->
topic = "device:#{device.id}:extensions"

NervesHubWeb.DeviceEndpoint.broadcast(topic, "attach", %{"extensions" => [extension_string]})
end)
end

def disable_extension_setting(%Device{} = device, extension_string) do
device = Repo.get(Device, device.id)

Device.changeset(device, %{"extensions" => %{extension_string => false}})
|> Repo.update()
|> tap(fn _ ->
topic = "device:#{device.id}:extensions"

NervesHubWeb.DeviceEndpoint.broadcast(topic, "detach", %{"extensions" => [extension_string]})
end)
end

def preload_product(%Device{} = device) do
device
|> Repo.preload(:product)
end
end
3 changes: 3 additions & 0 deletions lib/nerves_hub/devices/device.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ defmodule NervesHub.Devices.Device do
alias NervesHub.Devices.DeviceCertificate
alias NervesHub.Devices.DeviceConnection
alias NervesHub.Deployments.Deployment
alias NervesHub.Extensions.ExtensionsSetting
alias NervesHub.Firmwares.FirmwareMetadata
alias NervesHub.Products.Product

Expand Down Expand Up @@ -71,12 +72,14 @@ defmodule NervesHub.Devices.Device do
field(:connection_established_at, :utc_datetime)
field(:connection_disconnected_at, :utc_datetime)
field(:connection_last_seen_at, :utc_datetime)
embeds_one(:extensions, ExtensionsSetting)
end

def changeset(%Device{} = device, params) do
device
|> cast(params, @required_params ++ @optional_params)
|> cast_embed(:firmware_metadata)
|> cast_embed(:extensions)
|> validate_required(@required_params)
|> validate_length(:tags, min: 1)
|> unique_constraint(:identifier)
Expand Down
1 change: 1 addition & 0 deletions lib/nerves_hub/devices/metrics.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ defmodule NervesHub.Devices.Metrics do

@default_metric_types [
:cpu_temp,
:cpu_usage_percent,
:load_15min,
:load_1min,
:load_5min,
Expand Down
52 changes: 52 additions & 0 deletions lib/nerves_hub/extensions.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
defmodule NervesHub.Extensions do
@moduledoc """
An "extension" is an additional piece of functionality that we add onto the
existing connection between the device and the NervesHub service. They are
designed to be less important than firmware updates and requires both client
to report support and the server to enable support.
This is intended to ensure that:
- The service decides when activity should be taken by the device meaning
the fleet of devices will not inadvertently swarm the service with data.
- The service can turn off extensions in various ways to ensure that disruptive
extensions stop being enabled on subsequent connections.
- Use of extensions should have very little chance to disrupt the flow of a
critical firmware update.
"""

@callback handle_in(event :: String.t(), Phoenix.Channel.payload(), Phoenix.Socket.t()) ::
{:noreply, Phoenix.Socket.t()}
| {:noreply, Phoenix.Socket.t(), timeout() | :hibernate}
| {:reply, Phoenix.Channel.reply(), Phoenix.Socket.t()}
| {:stop, reason :: term(), Phoenix.Socket.t()}
| {:stop, reason :: term(), Phoenix.Channel.reply(), Phoenix.Socket.t()}

@callback handle_info(msg :: term(), Phoenix.Socket.t()) ::
{:noreply, Phoenix.Socket.t()} | {:stop, reason :: term(), Phoenix.Socket.t()}

@callback attach(Phoenix.Socket.t()) :: {:noreply, Phoenix.Socket.t()}
@callback detach(Phoenix.Socket.t()) :: {:noreply, Phoenix.Socket.t()}

require Logger

@supported_extensions %{
health: """
Reporting of fundamental device metrics, metadata, alarms and more.
Also supports custom metrics. Alarms require an alarm handler to be set.
""",
geo: """
Reporting of GeoIP information or custom geo-location information sources
you've set up for your device.
"""
}
@type extension() :: :health | :geo

@doc """
Get list of supported extensions as atoms with descriptive text.
"""
@spec list() :: %{extension() => String.t()}
def list do
@supported_extensions
end
end
15 changes: 15 additions & 0 deletions lib/nerves_hub/extensions/extensions_setting.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
defmodule NervesHub.Extensions.ExtensionsSetting do
use Ecto.Schema
import Ecto.Changeset

@primary_key false
embedded_schema do
field(:health, :boolean, default: nil)
field(:geo, :boolean, default: nil)
end

def changeset(setting, params) do
setting
|> cast(params, [:health, :geo])
end
end
57 changes: 57 additions & 0 deletions lib/nerves_hub/extensions/geo.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
defmodule NervesHub.Extensions.Geo do
@behaviour NervesHub.Extensions

alias NervesHub.Devices

@impl NervesHub.Extensions
def attach(socket) do
extension_config = Application.get_env(:nerves_hub, :extension_config)
geo_interval = get_in(extension_config, [:geo, :interval_minutes]) || 0

send(self(), {__MODULE__, :location_request})

socket =
if geo_interval > 0 do
timer =
geo_interval
|> :timer.minutes()
|> :timer.send_interval({__MODULE__, :location_request})

socket
|> Phoenix.Socket.assign(:geo_timer, timer)
|> Phoenix.Socket.assign(:geo_interval, geo_interval)
else
socket
end

{:noreply, socket}
end

@impl NervesHub.Extensions
def detach(socket) do
_ = if socket.assigns[:geo_timer], do: :timer.cancel(socket.assigns.geo_timer)
{:noreply, Phoenix.Socket.assign(socket, :geo_timer, nil)}
end

@impl NervesHub.Extensions
def handle_in("location:update", location, %{assigns: %{device: device}} = socket) do
metadata = Map.put(device.connection_metadata, "location", location)

{:ok, device} = Devices.update_device(device, %{connection_metadata: metadata})

_ =
NervesHubWeb.DeviceEndpoint.broadcast(
"device:#{device.identifier}:internal",
"location:updated",
location
)

{:noreply, Phoenix.Socket.assign(socket, :device, device)}
end

@impl NervesHub.Extensions
def handle_info(:location_request, socket) do
Phoenix.Channel.push(socket, "geo:location:request", %{})
{:noreply, socket}
end
end
113 changes: 113 additions & 0 deletions lib/nerves_hub/extensions/health.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
defmodule NervesHub.Extensions.Health do
@behaviour NervesHub.Extensions

alias NervesHub.Devices
alias NervesHub.Devices.Metrics

require Logger

# @impl NervesHub.Extensions
def init(socket) do
# Allow DB settings?
socket
end

@impl NervesHub.Extensions
def attach(socket) do
extension_config = Application.get_env(:nerves_hub, :extension_config, [])

health_interval =
case get_in(extension_config, [:health, :interval_minutes]) do
i when is_integer(i) -> i
_ -> 60
end

send(self(), {__MODULE__, :check})

socket =
if health_interval > 0 do
timer =
health_interval
|> :timer.minutes()
|> :timer.send_interval({__MODULE__, :check})

socket
|> Phoenix.Socket.assign(:health_interval, health_interval)
|> Phoenix.Socket.assign(:health_timer, timer)
else
socket
end

{:noreply, socket}
end

@impl NervesHub.Extensions
def detach(socket) do
_ = if socket.assigns[:health_timer], do: :timer.cancel(socket.assigns.health_timer)
{:noreply, Phoenix.Socket.assign(socket, :health_timer, nil)}
end

@impl NervesHub.Extensions
def handle_in("report", %{"value" => device_status}, socket) do
device_meta =
for {key, val} <- Map.from_struct(socket.assigns.device.firmware_metadata),
into: %{},
do: {to_string(key), to_string(val)}

# Separate metrics from health report to store in metrics table
metrics = device_status["metrics"]

health_report =
device_status
|> Map.delete("metrics")
|> Map.put("metadata", Map.merge(device_status["metadata"], device_meta))

device_health = %{"device_id" => socket.assigns.device.id, "data" => health_report}

with {:health_report, {:ok, _}} <-
{:health_report, Devices.save_device_health(device_health)},
{:metrics_report, {count, _}} when count >= 0 <-
{:metrics_report, Metrics.save_metrics(socket.assigns.device.id, metrics)} do
device_internal_broadcast!(socket.assigns.device, "health_check_report", %{})
else
{:health_report, {:error, err}} ->
Logger.warning("Failed to save health check data: #{inspect(err)}")
log_to_sentry(socket.assigns.device, "[DeviceChannel] Failed to save health check data.")

{:metrics_report, {:error, err}} ->
Logger.warning("Failed to save metrics: #{inspect(err)}")
log_to_sentry(socket.assigns.device, "[DeviceChannel] Failed to save metrics.")
end

{:noreply, socket}
end

@impl NervesHub.Extensions
def handle_info(:check, socket) do
Phoenix.Channel.push(socket, "health:check", %{})
{:noreply, socket}
end

defp device_internal_broadcast!(device, event, payload) do
topic = "device:#{device.identifier}:extensions"
NervesHubWeb.DeviceEndpoint.broadcast_from!(self(), topic, event, payload)
end

defp log_to_sentry(device, msg_or_ex, extra \\ %{}) do
Sentry.Context.set_tags_context(%{
device_identifier: device.identifier,
device_id: device.id,
product_id: device.product_id,
org_id: device.org_id
})

_ =
if is_exception(msg_or_ex) do
Sentry.capture_exception(msg_or_ex, extra: extra, result: :none)
else
Sentry.capture_message(msg_or_ex, extra: extra, result: :none)
end

:ok
end
end
24 changes: 24 additions & 0 deletions lib/nerves_hub/products.ex
Original file line number Diff line number Diff line change
Expand Up @@ -269,4 +269,28 @@ defmodule NervesHub.Products do
end
end
end

def enable_extension_setting(%Product{} = product, extension_string) do
product = Repo.get(Product, product.id)

Product.changeset(product, %{"extensions" => %{extension_string => true}})
|> Repo.update()
|> tap(fn _ ->
topic = "product:#{product.id}:extensions"

NervesHubWeb.DeviceEndpoint.broadcast(topic, "attach", %{"extensions" => [extension_string]})
end)
end

def disable_extension_setting(%Product{} = product, extension_string) do
product = Repo.get(Product, product.id)

Product.changeset(product, %{"extensions" => %{extension_string => false}})
|> Repo.update()
|> tap(fn _ ->
topic = "product:#{product.id}:extensions"

NervesHubWeb.DeviceEndpoint.broadcast(topic, "detach", %{"extensions" => [extension_string]})
end)
end
end
3 changes: 3 additions & 0 deletions lib/nerves_hub/products/product.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ defmodule NervesHub.Products.Product do
alias NervesHub.Scripts.Script
alias NervesHub.Devices.CACertificate
alias NervesHub.Devices.Device
alias NervesHub.Extensions.ExtensionsSetting
alias NervesHub.Firmwares.Firmware
alias NervesHub.Products.SharedSecretAuth

Expand All @@ -31,6 +32,7 @@ defmodule NervesHub.Products.Product do
field(:name, :string)
field(:deleted_at, :utc_datetime)
field(:delta_updatable, :boolean, default: false)
embeds_one(:extensions, ExtensionsSetting, on_replace: :update)

timestamps()
end
Expand All @@ -44,6 +46,7 @@ defmodule NervesHub.Products.Product do
def changeset(product, params) do
product
|> cast(params, @required_params ++ @optional_params)
|> cast_embed(:extensions)
|> update_change(:name, &trim/1)
|> validate_required(@required_params)
|> unique_constraint(:name, name: :products_org_id_name_index)
Expand Down
Loading

0 comments on commit ede38df

Please sign in to comment.