Skip to content

Commit

Permalink
Implement datagram buffering
Browse files Browse the repository at this point in the history
  • Loading branch information
svsool committed Nov 3, 2024
1 parent 7db6882 commit 61c9c12
Show file tree
Hide file tree
Showing 5 changed files with 165 additions and 20 deletions.
40 changes: 36 additions & 4 deletions lib/telemetry_metrics_statsd.ex
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,18 @@ defmodule TelemetryMetricsStatsd do
end
end

def get_udp_worker(pool_id) do
case :ets.lookup(pool_id, :udp_worker) do
[] ->
Logger.error("Failed to publish metrics over UDP: no workers available.")
:error

udp_workers ->
{:udp_worker, udp_worker} = Enum.random(udp_workers)
{:ok, udp_worker}
end
end

@doc false
@spec get_pool_id(pid()) :: :ets.tid()
def get_pool_id(reporter) do
Expand Down Expand Up @@ -432,7 +444,22 @@ defmodule TelemetryMetricsStatsd do
end

pool_id = :ets.new(__MODULE__, [:bag, :protected, read_concurrency: true])

udp_workers =
for _ <- 1..options.pool_size do
{:ok, worker_pid} =
TelemetryMetricsStatsd.UDPWorker.start_link(
repoter: self(),
pool_id: pool_id,
max_datagram_size: options.mtu,
buffer_flush_ms: options.buffer_flush_ms
)

{:udp_worker, worker_pid}
end

:ets.insert(pool_id, udps)
:ets.insert(pool_id, udp_workers)

handler_ids =
EventHandler.attach(
Expand Down Expand Up @@ -567,10 +594,15 @@ defmodule TelemetryMetricsStatsd do
defp update_pool(pool_id, new_host, new_port) do
pool_id
|> :ets.tab2list()
|> Enum.each(fn {:udp, udp} ->
:ets.delete_object(pool_id, {:udp, udp})
updated_udp = UDP.update(udp, new_host, new_port)
:ets.insert(pool_id, {:udp, updated_udp})
|> Enum.each(fn item ->
case item do
{:udp, udp} ->
:ets.delete_object(pool_id, {:udp, udp})
updated_udp = UDP.update(udp, new_host, new_port)
:ets.insert(pool_id, {:udp, updated_udp})

_ -> :ok
end
end)
end
end
20 changes: 5 additions & 15 deletions lib/telemetry_metrics_statsd/event_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ defmodule TelemetryMetricsStatsd.EventHandler do
:ok

packets ->
publish_metrics(reporter, pool_id, Packet.build_packets(packets, mtu, "\n"))
schedule_metrics_publish(reporter, pool_id, packets)
end
end

Expand Down Expand Up @@ -128,20 +128,10 @@ defmodule TelemetryMetricsStatsd.EventHandler do
end
end

@spec publish_metrics(pid(), :ets.tid(), [binary()]) :: :ok
defp publish_metrics(reporter, pool_id, packets) do
case TelemetryMetricsStatsd.get_udp(pool_id) do
{:ok, udp} ->
Enum.reduce_while(packets, :cont, fn packet, :cont ->
case UDP.send(udp, packet) do
:ok ->
{:cont, :cont}

{:error, reason} ->
TelemetryMetricsStatsd.udp_error(reporter, udp, reason)
{:halt, :halt}
end
end)
defp schedule_metrics_publish(reporter, pool_id, packets) do
case TelemetryMetricsStatsd.get_udp_worker(pool_id) do
{:ok, udp_worker_pid} ->
TelemetryMetricsStatsd.UDPWorker.publish_datagrams(udp_worker_pid, packets)

:error ->
:ok
Expand Down
8 changes: 7 additions & 1 deletion lib/telemetry_metrics_statsd/options.ex
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,16 @@ defmodule TelemetryMetricsStatsd.Options do
],
mtu: [
type: :non_neg_integer,
default: 512,
default: 1432,
doc:
"Maximum Transmission Unit of the link between your application and the StastD server in bytes. " <>
"If this value is greater than the actual MTU of the link, UDP packets with published metrics will be dropped."
],
buffer_flush_ms: [
type: :non_neg_integer,
default: 0,
doc:
"The maximum time in milliseconds to wait before flushing the buffer. If the buffer is not full, it will be flushed after this time."
]
]

Expand Down
104 changes: 104 additions & 0 deletions lib/telemetry_metrics_statsd/udp_worker.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
defmodule TelemetryMetricsStatsd.UDPWorker do
@moduledoc false

use GenServer

alias TelemetryMetricsStatsd.{UDP, Packet}

@default_buffer_flush_ms 1000
@default_max_datagram_size 1432

defstruct [
:repoter,
:pool_id,
:buffered_datagram,
:buffer_flush_ms,
:max_datagram_size
]

def start_link(opts) do
GenServer.start_link(__MODULE__, opts)
end

@impl true
def init(opts) do
buffer_flush_ms = opts[:buffer_flush_ms] || @default_buffer_flush_ms

state = %__MODULE__{
repoter: opts[:repoter],
pool_id: opts[:pool_id],
buffered_datagram: [],
buffer_flush_ms: buffer_flush_ms,
max_datagram_size: opts[:max_datagram_size] || @default_max_datagram_size
}

schedule_flush(state)

{:ok, state}
end

def publish_datagrams(pid, datagrams) do
GenServer.call(pid, {:publish_datagrams, datagrams})
end

def handle_call({:publish_datagrams, datagrams}, from, state) do
new_buffered_datagrams =
Enum.reduce(datagrams, state.buffered_datagram, &do_append_datagram/2)

cond do
state.buffer_flush_ms == 0 ->
for packet <- Packet.build_packets(datagrams, state.max_datagram_size, "\n") do
maybe_send_udp_datagrams(state, packet)
end

{:reply, :ok, %{state | buffered_datagram: []}}

Enum.any?(datagrams, fn datagram -> IO.iodata_length(datagram) > state.max_datagram_size end) ->
{:reply,
{:error, "Payload is too big (more than #{state.max_datagram_size} bytes), dropped."},
state}

IO.iodata_length(new_buffered_datagrams) > state.max_datagram_size ->
maybe_send_udp_datagrams(state)

{:reply, :ok, %{state | buffered_datagram: datagrams}}

true ->
{:reply, :ok, %{state | buffered_datagram: new_buffered_datagrams}}
end
end

defp maybe_send_udp_datagrams(state, datagrams \\ nil) do
case TelemetryMetricsStatsd.get_udp(state.pool_id) do
{:ok, udp} ->
case UDP.send(udp, datagrams || state.buffered_datagram) do
:ok -> :ok

{:error, reason} ->
TelemetryMetricsStatsd.udp_error(state.reporter, udp, reason)
end

:error ->
:ok
end
end

@impl true
def handle_info(:buffer_flush, state) do
if state.buffered_datagram != [] do
maybe_send_udp_datagrams(state)
end

schedule_flush(state)

{:noreply, %{state | buffered_datagram: []}}
end

defp schedule_flush(%{buffer_flush_ms: buffer_flush_ms}) when buffer_flush_ms > 0,
do: Process.send_after(self(), :buffer_flush, buffer_flush_ms)

defp schedule_flush(_), do: :ok

defp do_append_datagram([], first_datagram), do: first_datagram
defp do_append_datagram(current_buffer, datagram), do: [current_buffer, "\n", datagram]
end
13 changes: 13 additions & 0 deletions test/telemetry_metrics_statsd_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -742,6 +742,19 @@ defmodule TelemetryMetricsStatsdTest do
assert udp.host == {127, 0, 0, 1}
end)
end

test "it buffers packets when buffer_flush_ms is non-zero" do
{socket, port} = given_udp_port_opened()
counter = given_counter("http.requests", event_name: "http.request")

start_reporter(metrics: [counter], port: port, buffer_flush_ms: 10, pool_size: 1)

:telemetry.execute([:http, :request], %{latency: 211})
:telemetry.execute([:http, :request], %{latency: 200})
:telemetry.execute([:http, :request], %{latency: 198})

assert_reported(socket, "http.requests:1|c\nhttp.requests:1|c\nhttp.requests:1|c\n")
end
end

defp given_udp_port_opened(inet_address_family \\ :inet) do
Expand Down

0 comments on commit 61c9c12

Please sign in to comment.