Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add device extensions support #1479

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,6 @@ config :nerves_hub,
username: System.get_env("ADMIN_AUTH_USERNAME"),
password: System.get_env("ADMIN_AUTH_PASSWORD")
],
device_health_check_enabled: System.get_env("DEVICE_HEALTH_CHECK_ENABLED", "true") == "true",
device_health_check_interval_minutes:
String.to_integer(System.get_env("DEVICE_HEALTH_CHECK_INTERVAL_MINUTES", "60")),
device_health_days_to_retain:
String.to_integer(System.get_env("HEALTH_CHECK_DAYS_TO_RETAIN", "7")),
device_deployment_change_jitter_seconds:
Expand All @@ -37,7 +34,18 @@ config :nerves_hub,
deployment_calculator_interval_seconds:
String.to_integer(System.get_env("DEPLOYMENT_CALCULATOR_INTERVAL_SECONDS", "3600")),
mapbox_access_token: System.get_env("MAPBOX_ACCESS_TOKEN"),
dashboard_enabled: System.get_env("DASHBOARD_ENABLED", "false") == "true"
dashboard_enabled: System.get_env("DASHBOARD_ENABLED", "false") == "true",
extension_config: [
geo: [
# No interval, fetch geo on device connection by default
interval_minutes:
System.get_env("FEATURES_GEO_INTERVAL_MINUTES", "0") |> String.to_integer()
],
health: [
interval_minutes:
System.get_env("FEATURES_HEALTH_INTERVAL_MINUTES", "60") |> String.to_integer()
]
]

config :nerves_hub, :device_socket_drainer,
batch_size: String.to_integer(System.get_env("DEVICE_SOCKET_DRAINER_BATCH_SIZE", "1000")),
Expand Down
2 changes: 2 additions & 0 deletions config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ config :nerves_hub, NervesHub.SwooshMailer, adapter: Swoosh.Adapters.Test

config :nerves_hub, NervesHub.RateLimit, limit: 100

config :sentry, environment_name: :test
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does this do? Were we reporting errors to Sentry from tests before? 😨


config :phoenix_test, :endpoint, NervesHubWeb.Endpoint

# Initialize plugs at runtime for faster test compilation
Expand Down
29 changes: 29 additions & 0 deletions lib/nerves_hub/devices.ex
Original file line number Diff line number Diff line change
Expand Up @@ -1383,4 +1383,33 @@ defmodule NervesHub.Devices do
"pending"
end
end

def enable_extension_setting(%Device{} = device, extension_string) do
device = Repo.get(Device, device.id)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we fetching the device if we are requiring it as an arg?


Device.changeset(device, %{"extensions" => %{extension_string => true}})
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this actually work if I have multiple extensions? To me it looks like it will completely overwrite all extension settings with just this one?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does not, embeds_one raises by default on replace. I think an on_replace: :update will work here.

|> Repo.update()
|> tap(fn _ ->
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe we will need to check that the update was successful before broadcasting, otherwise we will be out of sync with the setting

topic = "device:#{device.id}:extensions"

NervesHubWeb.DeviceEndpoint.broadcast(topic, "attach", %{"extensions" => [extension_string]})
end)
end

def disable_extension_setting(%Device{} = device, extension_string) do
device = Repo.get(Device, device.id)

Device.changeset(device, %{"extensions" => %{extension_string => false}})
|> Repo.update()
|> tap(fn _ ->
topic = "device:#{device.id}:extensions"

NervesHubWeb.DeviceEndpoint.broadcast(topic, "detach", %{"extensions" => [extension_string]})
end)
end

def preload_product(%Device{} = device) do
device
|> Repo.preload(:product)
end
end
3 changes: 3 additions & 0 deletions lib/nerves_hub/devices/device.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ defmodule NervesHub.Devices.Device do
alias NervesHub.Devices.DeviceCertificate
alias NervesHub.Devices.DeviceConnection
alias NervesHub.Deployments.Deployment
alias NervesHub.Extensions.ExtensionsSetting
alias NervesHub.Firmwares.FirmwareMetadata
alias NervesHub.Products.Product

Expand Down Expand Up @@ -71,12 +72,14 @@ defmodule NervesHub.Devices.Device do
field(:connection_established_at, :utc_datetime)
field(:connection_disconnected_at, :utc_datetime)
field(:connection_last_seen_at, :utc_datetime)
embeds_one(:extensions, ExtensionsSetting)
end

def changeset(%Device{} = device, params) do
device
|> cast(params, @required_params ++ @optional_params)
|> cast_embed(:firmware_metadata)
|> cast_embed(:extensions)
|> validate_required(@required_params)
|> validate_length(:tags, min: 1)
|> unique_constraint(:identifier)
Expand Down
1 change: 1 addition & 0 deletions lib/nerves_hub/devices/metrics.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ defmodule NervesHub.Devices.Metrics do

@default_metric_types [
:cpu_temp,
:cpu_usage_percent,
:load_15min,
:load_1min,
:load_5min,
Expand Down
52 changes: 52 additions & 0 deletions lib/nerves_hub/extensions.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
defmodule NervesHub.Extensions do
@moduledoc """
An "extension" is an additional piece of functionality that we add onto the
existing connection between the device and the NervesHub service. They are
designed to be less important than firmware updates and requires both client
to report support and the server to enable support.

This is intended to ensure that:

- The service decides when activity should be taken by the device meaning
the fleet of devices will not inadvertently swarm the service with data.
- The service can turn off extensions in various ways to ensure that disruptive
extensions stop being enabled on subsequent connections.
- Use of extensions should have very little chance to disrupt the flow of a
critical firmware update.
"""

@callback handle_in(event :: String.t(), Phoenix.Channel.payload(), Phoenix.Socket.t()) ::
{:noreply, Phoenix.Socket.t()}
| {:noreply, Phoenix.Socket.t(), timeout() | :hibernate}
| {:reply, Phoenix.Channel.reply(), Phoenix.Socket.t()}
| {:stop, reason :: term(), Phoenix.Socket.t()}
| {:stop, reason :: term(), Phoenix.Channel.reply(), Phoenix.Socket.t()}

@callback handle_info(msg :: term(), Phoenix.Socket.t()) ::
{:noreply, Phoenix.Socket.t()} | {:stop, reason :: term(), Phoenix.Socket.t()}

@callback attach(Phoenix.Socket.t()) :: {:noreply, Phoenix.Socket.t()}
@callback detach(Phoenix.Socket.t()) :: {:noreply, Phoenix.Socket.t()}

require Logger

@supported_extensions %{
health: """
Reporting of fundamental device metrics, metadata, alarms and more.
Also supports custom metrics. Alarms require an alarm handler to be set.
""",
geo: """
Reporting of GeoIP information or custom geo-location information sources
you've set up for your device.
"""
}
Comment on lines +33 to +42
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is a behavior module, I feel like we should keep as much of this in the extension as possible. Maybe we need a description callback for the module to define it?

@type extension() :: :health | :geo

@doc """
Get list of supported extensions as atoms with descriptive text.
"""
@spec list() :: %{extension() => String.t()}
def list do
@supported_extensions
end
end
15 changes: 15 additions & 0 deletions lib/nerves_hub/extensions/extensions_setting.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
defmodule NervesHub.Extensions.ExtensionsSetting do
use Ecto.Schema
import Ecto.Changeset

@primary_key false
embedded_schema do
field(:health, :boolean, default: nil)
field(:geo, :boolean, default: nil)
end

def changeset(setting, params) do
setting
|> cast(params, [:health, :geo])
end
end
57 changes: 57 additions & 0 deletions lib/nerves_hub/extensions/geo.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
defmodule NervesHub.Extensions.Geo do
@behaviour NervesHub.Extensions

alias NervesHub.Devices

@impl NervesHub.Extensions
def attach(socket) do
extension_config = Application.get_env(:nerves_hub, :extension_config)
geo_interval = get_in(extension_config, [:geo, :interval_minutes]) || 0

send(self(), {__MODULE__, :location_request})

socket =
if geo_interval > 0 do
timer =
geo_interval
|> :timer.minutes()
|> :timer.send_interval({__MODULE__, :location_request})

socket
|> Phoenix.Socket.assign(:geo_timer, timer)
|> Phoenix.Socket.assign(:geo_interval, geo_interval)
else
socket
end

{:noreply, socket}
end

@impl NervesHub.Extensions
def detach(socket) do
_ = if socket.assigns[:geo_timer], do: :timer.cancel(socket.assigns.geo_timer)
{:noreply, Phoenix.Socket.assign(socket, :geo_timer, nil)}
end

@impl NervesHub.Extensions
def handle_in("location:update", location, %{assigns: %{device: device}} = socket) do
metadata = Map.put(device.connection_metadata, "location", location)

{:ok, device} = Devices.update_device(device, %{connection_metadata: metadata})

_ =
NervesHubWeb.DeviceEndpoint.broadcast(
"device:#{device.identifier}:internal",
"location:updated",
location
)

{:noreply, Phoenix.Socket.assign(socket, :device, device)}
end

@impl NervesHub.Extensions
def handle_info(:location_request, socket) do
Phoenix.Channel.push(socket, "geo:location:request", %{})
{:noreply, socket}
end
end
113 changes: 113 additions & 0 deletions lib/nerves_hub/extensions/health.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
defmodule NervesHub.Extensions.Health do
@behaviour NervesHub.Extensions

alias NervesHub.Devices
alias NervesHub.Devices.Metrics

require Logger

# @impl NervesHub.Extensions
def init(socket) do
# Allow DB settings?
socket
end
Comment on lines +9 to +13
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cruft?


@impl NervesHub.Extensions
def attach(socket) do
extension_config = Application.get_env(:nerves_hub, :extension_config, [])

health_interval =
case get_in(extension_config, [:health, :interval_minutes]) do
i when is_integer(i) -> i
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this needed if we're calling String.to_integer() in runtime.exs?

_ -> 60
end

send(self(), {__MODULE__, :check})

socket =
if health_interval > 0 do
timer =
health_interval
|> :timer.minutes()
|> :timer.send_interval({__MODULE__, :check})

socket
|> Phoenix.Socket.assign(:health_interval, health_interval)
|> Phoenix.Socket.assign(:health_timer, timer)
else
socket
end

{:noreply, socket}
end

@impl NervesHub.Extensions
def detach(socket) do
_ = if socket.assigns[:health_timer], do: :timer.cancel(socket.assigns.health_timer)
{:noreply, Phoenix.Socket.assign(socket, :health_timer, nil)}
end

@impl NervesHub.Extensions
def handle_in("report", %{"value" => device_status}, socket) do
device_meta =
for {key, val} <- Map.from_struct(socket.assigns.device.firmware_metadata),
into: %{},
do: {to_string(key), to_string(val)}

# Separate metrics from health report to store in metrics table
metrics = device_status["metrics"]

health_report =
device_status
|> Map.delete("metrics")
|> Map.put("metadata", Map.merge(device_status["metadata"], device_meta))

device_health = %{"device_id" => socket.assigns.device.id, "data" => health_report}

with {:health_report, {:ok, _}} <-
{:health_report, Devices.save_device_health(device_health)},
{:metrics_report, {count, _}} when count >= 0 <-
{:metrics_report, Metrics.save_metrics(socket.assigns.device.id, metrics)} do
device_internal_broadcast!(socket.assigns.device, "health_check_report", %{})
else
{:health_report, {:error, err}} ->
Logger.warning("Failed to save health check data: #{inspect(err)}")
log_to_sentry(socket.assigns.device, "[DeviceChannel] Failed to save health check data.")

{:metrics_report, {:error, err}} ->
Logger.warning("Failed to save metrics: #{inspect(err)}")
log_to_sentry(socket.assigns.device, "[DeviceChannel] Failed to save metrics.")
end

{:noreply, socket}
end

@impl NervesHub.Extensions
def handle_info(:check, socket) do
Phoenix.Channel.push(socket, "health:check", %{})
{:noreply, socket}
end

defp device_internal_broadcast!(device, event, payload) do
topic = "device:#{device.identifier}:extensions"
NervesHubWeb.DeviceEndpoint.broadcast_from!(self(), topic, event, payload)
end

defp log_to_sentry(device, msg_or_ex, extra \\ %{}) do
Sentry.Context.set_tags_context(%{
device_identifier: device.identifier,
device_id: device.id,
product_id: device.product_id,
org_id: device.org_id
})

_ =
if is_exception(msg_or_ex) do
Sentry.capture_exception(msg_or_ex, extra: extra, result: :none)
else
Sentry.capture_message(msg_or_ex, extra: extra, result: :none)
end

:ok
end
end
24 changes: 24 additions & 0 deletions lib/nerves_hub/products.ex
Original file line number Diff line number Diff line change
Expand Up @@ -269,4 +269,28 @@ defmodule NervesHub.Products do
end
end
end

def enable_extension_setting(%Product{} = product, extension_string) do
product = Repo.get(Product, product.id)

Product.changeset(product, %{"extensions" => %{extension_string => true}})
|> Repo.update()
|> tap(fn _ ->
topic = "product:#{product.id}:extensions"

NervesHubWeb.DeviceEndpoint.broadcast(topic, "attach", %{"extensions" => [extension_string]})
end)
end

def disable_extension_setting(%Product{} = product, extension_string) do
product = Repo.get(Product, product.id)

Product.changeset(product, %{"extensions" => %{extension_string => false}})
|> Repo.update()
|> tap(fn _ ->
topic = "product:#{product.id}:extensions"

NervesHubWeb.DeviceEndpoint.broadcast(topic, "detach", %{"extensions" => [extension_string]})
end)
Comment on lines +273 to +294
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thoughts on refactoring this into a toggle_extension_setting vs handling each condition explicitly?

end
end
3 changes: 3 additions & 0 deletions lib/nerves_hub/products/product.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ defmodule NervesHub.Products.Product do
alias NervesHub.Scripts.Script
alias NervesHub.Devices.CACertificate
alias NervesHub.Devices.Device
alias NervesHub.Extensions.ExtensionsSetting
alias NervesHub.Firmwares.Firmware
alias NervesHub.Products.SharedSecretAuth

Expand All @@ -31,6 +32,7 @@ defmodule NervesHub.Products.Product do
field(:name, :string)
field(:deleted_at, :utc_datetime)
field(:delta_updatable, :boolean, default: false)
embeds_one(:extensions, ExtensionsSetting, on_replace: :update)

timestamps()
end
Expand All @@ -44,6 +46,7 @@ defmodule NervesHub.Products.Product do
def changeset(product, params) do
product
|> cast(params, @required_params ++ @optional_params)
|> cast_embed(:extensions)
|> update_change(:name, &trim/1)
|> validate_required(@required_params)
|> unique_constraint(:name, name: :products_org_id_name_index)
Expand Down
Loading
Loading