From 61c9c12e8106468fa4130e7c929fc44e8593c15c Mon Sep 17 00:00:00 2001
From: Svyat Sobol
Date: Sun, 3 Nov 2024 16:24:35 +0200
Subject: [PATCH] Implement datagram buffering

---
 lib/telemetry_metrics_statsd.ex               |  40 ++++++-
 lib/telemetry_metrics_statsd/event_handler.ex |  20 +---
 lib/telemetry_metrics_statsd/options.ex       |   8 +-
 lib/telemetry_metrics_statsd/udp_worker.ex    | 114 ++++++++++++++++++
 test/telemetry_metrics_statsd_test.exs        |  13 +++
 5 files changed, 175 insertions(+), 20 deletions(-)
 create mode 100644 lib/telemetry_metrics_statsd/udp_worker.ex

diff --git a/lib/telemetry_metrics_statsd.ex b/lib/telemetry_metrics_statsd.ex
index 9ca8821..12cc755 100644
--- a/lib/telemetry_metrics_statsd.ex
+++ b/lib/telemetry_metrics_statsd.ex
@@ -402,6 +402,18 @@ defmodule TelemetryMetricsStatsd do
     end
   end
 
+  def get_udp_worker(pool_id) do
+    case :ets.lookup(pool_id, :udp_worker) do
+      [] ->
+        Logger.error("Failed to publish metrics over UDP: no workers available.")
+        :error
+
+      udp_workers ->
+        {:udp_worker, udp_worker} = Enum.random(udp_workers)
+        {:ok, udp_worker}
+    end
+  end
+
   @doc false
   @spec get_pool_id(pid()) :: :ets.tid()
   def get_pool_id(reporter) do
@@ -432,7 +444,22 @@ defmodule TelemetryMetricsStatsd do
     end
 
     pool_id = :ets.new(__MODULE__, [:bag, :protected, read_concurrency: true])
+
+    udp_workers =
+      for _ <- 1..options.pool_size do
+        {:ok, worker_pid} =
+          TelemetryMetricsStatsd.UDPWorker.start_link(
+            reporter: self(),
+            pool_id: pool_id,
+            max_datagram_size: options.mtu,
+            buffer_flush_ms: options.buffer_flush_ms
+          )
+
+        {:udp_worker, worker_pid}
+      end
+
     :ets.insert(pool_id, udps)
+    :ets.insert(pool_id, udp_workers)
 
     handler_ids =
       EventHandler.attach(
@@ -567,10 +594,15 @@ defmodule TelemetryMetricsStatsd do
   defp update_pool(pool_id, new_host, new_port) do
     pool_id
     |> :ets.tab2list()
-    |> Enum.each(fn {:udp, udp} ->
-      :ets.delete_object(pool_id, {:udp, udp})
-      updated_udp = UDP.update(udp, new_host, new_port)
-      :ets.insert(pool_id, {:udp, updated_udp})
+    |> Enum.each(fn item ->
+      case item do
+        {:udp, udp} ->
+          :ets.delete_object(pool_id, {:udp, udp})
+          updated_udp = UDP.update(udp, new_host, new_port)
+          :ets.insert(pool_id, {:udp, updated_udp})
+
+        _ -> :ok
+      end
     end)
   end
 end
diff --git a/lib/telemetry_metrics_statsd/event_handler.ex b/lib/telemetry_metrics_statsd/event_handler.ex
index f642c7b..0311bf6 100644
--- a/lib/telemetry_metrics_statsd/event_handler.ex
+++ b/lib/telemetry_metrics_statsd/event_handler.ex
@@ -79,7 +79,7 @@ defmodule TelemetryMetricsStatsd.EventHandler do
         :ok
 
       packets ->
-        publish_metrics(reporter, pool_id, Packet.build_packets(packets, mtu, "\n"))
+        schedule_metrics_publish(reporter, pool_id, packets)
     end
   end
 
@@ -128,20 +128,10 @@ defmodule TelemetryMetricsStatsd.EventHandler do
     end
   end
 
-  @spec publish_metrics(pid(), :ets.tid(), [binary()]) :: :ok
-  defp publish_metrics(reporter, pool_id, packets) do
-    case TelemetryMetricsStatsd.get_udp(pool_id) do
-      {:ok, udp} ->
-        Enum.reduce_while(packets, :cont, fn packet, :cont ->
-          case UDP.send(udp, packet) do
-            :ok ->
-              {:cont, :cont}
-
-            {:error, reason} ->
-              TelemetryMetricsStatsd.udp_error(reporter, udp, reason)
-              {:halt, :halt}
-          end
-        end)
+  defp schedule_metrics_publish(_reporter, pool_id, packets) do
+    case TelemetryMetricsStatsd.get_udp_worker(pool_id) do
+      {:ok, udp_worker_pid} ->
+        TelemetryMetricsStatsd.UDPWorker.publish_datagrams(udp_worker_pid, packets)
 
       :error ->
         :ok
diff --git a/lib/telemetry_metrics_statsd/options.ex b/lib/telemetry_metrics_statsd/options.ex
index b950355..8484edf 100644
--- a/lib/telemetry_metrics_statsd/options.ex
+++ b/lib/telemetry_metrics_statsd/options.ex
@@ -61,10 +61,16 @@ defmodule TelemetryMetricsStatsd.Options do
     ],
     mtu: [
       type: :non_neg_integer,
-      default: 512,
+      default: 1432,
       doc:
         "Maximum Transmission Unit of the link between your application and the StastD server in bytes. " <>
           "If this value is greater than the actual MTU of the link, UDP packets with published metrics will be dropped."
+    ],
+    buffer_flush_ms: [
+      type: :non_neg_integer,
+      default: 0,
+      doc:
+        "The maximum time in milliseconds to wait before flushing the buffer. If the buffer is not full, it will be flushed after this time."
     ]
   ]
 
diff --git a/lib/telemetry_metrics_statsd/udp_worker.ex b/lib/telemetry_metrics_statsd/udp_worker.ex
new file mode 100644
index 0000000..8a0cf3e
--- /dev/null
+++ b/lib/telemetry_metrics_statsd/udp_worker.ex
@@ -0,0 +1,114 @@
+defmodule TelemetryMetricsStatsd.UDPWorker do
+  @moduledoc false
+
+  # Sends StatsD datagrams over UDP. With `buffer_flush_ms: 0` every publish is
+  # sent right away; otherwise datagrams are buffered until adding more would
+  # exceed `max_datagram_size` or the periodic `:buffer_flush` timer fires.
+
+  use GenServer
+
+  alias TelemetryMetricsStatsd.{UDP, Packet}
+
+  @default_buffer_flush_ms 1000
+  @default_max_datagram_size 1432
+
+  defstruct [
+    :reporter,
+    :pool_id,
+    :buffered_datagram,
+    :buffer_flush_ms,
+    :max_datagram_size
+  ]
+
+  def start_link(opts) do
+    GenServer.start_link(__MODULE__, opts)
+  end
+
+  @impl true
+  def init(opts) do
+    buffer_flush_ms = opts[:buffer_flush_ms] || @default_buffer_flush_ms
+
+    state = %__MODULE__{
+      reporter: opts[:reporter],
+      pool_id: opts[:pool_id],
+      buffered_datagram: [],
+      buffer_flush_ms: buffer_flush_ms,
+      max_datagram_size: opts[:max_datagram_size] || @default_max_datagram_size
+    }
+
+    schedule_flush(state)
+
+    {:ok, state}
+  end
+
+  def publish_datagrams(pid, datagrams) do
+    GenServer.call(pid, {:publish_datagrams, datagrams})
+  end
+
+  @impl true
+  def handle_call({:publish_datagrams, datagrams}, _from, state) do
+    new_buffered_datagrams =
+      Enum.reduce(datagrams, state.buffered_datagram, &do_append_datagram/2)
+
+    cond do
+      state.buffer_flush_ms == 0 ->
+        for packet <- Packet.build_packets(datagrams, state.max_datagram_size, "\n") do
+          maybe_send_udp_datagrams(state, packet)
+        end
+
+        {:reply, :ok, %{state | buffered_datagram: []}}
+
+      Enum.any?(datagrams, fn datagram -> IO.iodata_length(datagram) > state.max_datagram_size end) ->
+        {:reply,
+         {:error, "Payload is too big (more than #{state.max_datagram_size} bytes), dropped."},
+         state}
+
+      IO.iodata_length(new_buffered_datagrams) > state.max_datagram_size ->
+        # Flush what is already buffered, then start a fresh buffer that holds
+        # only the new datagrams, framed like any other buffered data.
+        maybe_send_udp_datagrams(state)
+        fresh_buffer = Enum.reduce(datagrams, [], &do_append_datagram/2)
+
+        {:reply, :ok, %{state | buffered_datagram: fresh_buffer}}
+
+      true ->
+        {:reply, :ok, %{state | buffered_datagram: new_buffered_datagrams}}
+    end
+  end
+
+  defp maybe_send_udp_datagrams(state, datagrams \\ nil) do
+    case TelemetryMetricsStatsd.get_udp(state.pool_id) do
+      {:ok, udp} ->
+        case UDP.send(udp, datagrams || state.buffered_datagram) do
+          :ok -> :ok
+
+          {:error, reason} ->
+            TelemetryMetricsStatsd.udp_error(state.reporter, udp, reason)
+        end
+
+      :error ->
+        :ok
+    end
+  end
+
+  @impl true
+  def handle_info(:buffer_flush, state) do
+    if state.buffered_datagram != [] do
+      maybe_send_udp_datagrams(state)
+    end
+
+    schedule_flush(state)
+
+    {:noreply, %{state | buffered_datagram: []}}
+  end
+
+  defp schedule_flush(%{buffer_flush_ms: buffer_flush_ms}) when buffer_flush_ms > 0,
+    do: Process.send_after(self(), :buffer_flush, buffer_flush_ms)
+
+  defp schedule_flush(_), do: :ok
+
+  # Invoked through Enum.reduce/3, which passes the element first and the
+  # accumulator second. Appending keeps datagrams in arrival order; each one is
+  # terminated with "\n".
+  defp do_append_datagram(datagram, buffer), do: [buffer, datagram, "\n"]
+end
diff --git a/test/telemetry_metrics_statsd_test.exs b/test/telemetry_metrics_statsd_test.exs
index cda5a5b..3dce238 100644
--- a/test/telemetry_metrics_statsd_test.exs
+++ b/test/telemetry_metrics_statsd_test.exs
@@ -742,6 +742,19 @@ defmodule TelemetryMetricsStatsdTest do
         assert udp.host == {127, 0, 0, 1}
       end)
     end
+
+    test "it buffers packets when buffer_flush_ms is non-zero" do
+      {socket, port} = given_udp_port_opened()
+      counter = given_counter("http.requests", event_name: "http.request")
+
+      start_reporter(metrics: [counter], port: port, buffer_flush_ms: 10, pool_size: 1)
+
+      :telemetry.execute([:http, :request], %{latency: 211})
+      :telemetry.execute([:http, :request], %{latency: 200})
+      :telemetry.execute([:http, :request], %{latency: 198})
+
+      assert_reported(socket, "http.requests:1|c\nhttp.requests:1|c\nhttp.requests:1|c\n")
+    end
   end
 
   defp given_udp_port_opened(inet_address_family \\ :inet) do