From d08f53d8d6c5ca1013edc522561904877858c6d9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20G=C3=B6m=C3=B6ri?= Date: Thu, 23 May 2024 17:50:04 +0200 Subject: [PATCH] Support message containers introduced in RabbitMQ 3.13.0 This commit makes the plugin depend on the `mc` module and hence 3.13.0+ at runtime. Fixes #108 --- .github/workflows/action.yml | 8 ++-- README.md | 6 +++ lib/rabbitmq_message_deduplication/common.ex | 40 +++++++++---------- .../rabbit_message_deduplication_exchange.ex | 8 +--- .../rabbit_message_deduplication_queue.ex | 27 ++++--------- mix.exs | 3 +- 6 files changed, 41 insertions(+), 51 deletions(-) diff --git a/.github/workflows/action.yml b/.github/workflows/action.yml index 5e47b26..9d037ce 100644 --- a/.github/workflows/action.yml +++ b/.github/workflows/action.yml @@ -2,15 +2,15 @@ name: build on: [push, pull_request] jobs: build: - runs-on: ubuntu-20.04 + runs-on: ubuntu-22.04 strategy: matrix: erlang: - - 1:25.3-1 + - 1:26.2-1 elixir: - - 1.13.4-1 + - 1.14.5-1 rmqref: - - v3.12.x + - v3.13.x steps: - uses: actions/checkout@v2 - name: Install Erlang and Elixir diff --git a/README.md b/README.md index 05cb3b8..a549eb7 100644 --- a/README.md +++ b/README.md @@ -36,6 +36,12 @@ Then copy all the *.ez files inside the plugins folder to the [RabbitMQ plugins [sudo] rabbitmq-plugins enable rabbitmq_message_deduplication ``` +## Version requirements + +The latest version of the plugin requires RabbitMQ 3.13.0. + +Earlier RabbitMQ versions are supported by 0.6.2. + ## Exchange level deduplication The exchange type `x-message-deduplication` allows to filter message duplicates before any routing rule is applied. diff --git a/lib/rabbitmq_message_deduplication/common.ex b/lib/rabbitmq_message_deduplication/common.ex index ed235b4..715ec87 100644 --- a/lib/rabbitmq_message_deduplication/common.ex +++ b/lib/rabbitmq_message_deduplication/common.ex @@ -16,19 +16,9 @@ defmodule RabbitMQMessageDeduplication.Common do require RabbitMQMessageDeduplication.Cache - alias :rabbit_binary_parser, as: RabbitBinaryParser + alias :mc, as: MC alias RabbitMQMessageDeduplication.Cache, as: Cache - defrecord :content, extract( - :content, from_lib: "rabbit_common/include/rabbit.hrl") - - @type basic_message :: record(:basic_message) - defrecord :basic_message, extract( - :basic_message, from_lib: "rabbit_common/include/rabbit.hrl") - - defrecord :basic_properties, :P_basic, extract( - :P_basic, from_lib: "rabbit_common/include/rabbit_framing.hrl") - @default_arguments %{type: nil, default: nil} @doc """ @@ -55,14 +45,24 @@ defmodule RabbitMQMessageDeduplication.Common do @doc """ Retrieve the given header from the message. """ - @spec message_header(basic_message, String.t) :: String.t | nil - def message_header(basic_message(content: message_content), header) do - message_content = RabbitBinaryParser.ensure_content_decoded(message_content) - - case content(message_content, :properties) do - basic_properties(headers: headers) when is_list(headers) -> - rabbit_keyfind(headers, header) - basic_properties(headers: :undefined) -> nil + @spec message_header(MC.state, String.t) :: String.t | integer() | float() | boolean() | :undefined | nil + def message_header(message, header) do + case MC.x_header(header, message) do + {_type, value} when not is_list(value) and not is_tuple(value) -> + # list and tuple values have type-tagged elements + # that would need to be untagged recursively + # we don't expect to use such headers, so those cases are not handled + value + :null -> + # header value in AMQP message was {:void, :undefined} + + # pre-3.13 version of this function used rabbit_keyfind/2 + # which returned :undefined instead of nil or :void. We have to + # keep this value as this is used in keys to cache the message + # and is preserved during a rolling upgrade in a replicated + # Mnesia table + :undefined + :undefined -> nil end end @@ -71,7 +71,7 @@ defmodule RabbitMQMessageDeduplication.Common do If not, it adds it to the cache with the corresponding name. """ - @spec duplicate?(tuple, basic_message, integer | nil) :: boolean + @spec duplicate?(tuple, MC.state, integer | nil) :: boolean def duplicate?(name, message, ttl \\ nil) do cache = cache_name(name) diff --git a/lib/rabbitmq_message_deduplication/rabbit_message_deduplication_exchange.ex b/lib/rabbitmq_message_deduplication/rabbit_message_deduplication_exchange.ex index a034cc1..c51fa05 100644 --- a/lib/rabbitmq_message_deduplication/rabbit_message_deduplication_exchange.ex +++ b/lib/rabbitmq_message_deduplication/rabbit_message_deduplication_exchange.ex @@ -53,12 +53,6 @@ defmodule RabbitMQMessageDeduplication.Exchange do defrecord :exchange, extract( :exchange, from_lib: "rabbit_common/include/rabbit.hrl") - defrecord :delivery, extract( - :delivery, from_lib: "rabbit_common/include/rabbit.hrl") - - defrecord :basic_message, extract( - :basic_message, from_lib: "rabbit_common/include/rabbit.hrl") - @doc """ Register the exchange type within the Broker. """ @@ -90,7 +84,7 @@ defmodule RabbitMQMessageDeduplication.Exchange do end @impl :rabbit_exchange_type - def route(exchange(name: name), delivery(message: msg = basic_message())) do + def route(exchange(name: name), msg, _opts) do if route?(name, msg) do RabbitRouter.match_routing_key(name, [:_]) else diff --git a/lib/rabbitmq_message_deduplication/rabbit_message_deduplication_queue.ex b/lib/rabbitmq_message_deduplication/rabbit_message_deduplication_queue.ex index 4d7709e..e73870a 100644 --- a/lib/rabbitmq_message_deduplication/rabbit_message_deduplication_queue.ex +++ b/lib/rabbitmq_message_deduplication/rabbit_message_deduplication_queue.ex @@ -22,7 +22,7 @@ defmodule RabbitMQMessageDeduplication.Queue do """ - import Record, only: [defrecord: 2, defrecord: 3, extract: 2] + import Record, only: [defrecord: 2] require RabbitMQMessageDeduplication.Cache require RabbitMQMessageDeduplication.Common @@ -30,6 +30,7 @@ defmodule RabbitMQMessageDeduplication.Queue do alias :amqqueue, as: AMQQueue alias :rabbit_log, as: RabbitLog alias :rabbit_amqqueue, as: RabbitQueue + alias :mc, as: MC alias RabbitMQMessageDeduplication.Common, as: Common alias RabbitMQMessageDeduplication.Cache, as: Cache alias RabbitMQMessageDeduplication.CacheManager, as: CacheManager @@ -46,15 +47,6 @@ defmodule RabbitMQMessageDeduplication.Queue do {:requires, :kernel_ready}, {:enables, :core_initialized}]} - defrecord :content, extract( - :content, from_lib: "rabbit_common/include/rabbit.hrl") - - defrecord :basic_message, extract( - :basic_message, from_lib: "rabbit_common/include/rabbit.hrl") - - defrecord :basic_properties, :P_basic, extract( - :P_basic, from_lib: "rabbit_common/include/rabbit_framing.hrl") - defrecord :dqack, [:tag, :header] defrecord :dqstate, [:queue, :queue_state, dedup_enabled: false] @@ -286,8 +278,9 @@ defmodule RabbitMQMessageDeduplication.Queue do if dedup_queue?(state) do case fetch(need_ack, state) do {:empty, state} -> {:empty, state} - {{message = basic_message(id: id), _, ack_tag}, state} -> + {{message, _, ack_tag}, state} -> maybe_delete_cache_entry(queue, message) + id = MC.get_annotation(:id, message) {{id, ack_tag}, state} end @@ -521,7 +514,7 @@ defmodule RabbitMQMessageDeduplication.Queue do end # Returns true if the message is a duplicate. - defp duplicate?(queue, message = basic_message()) do + defp duplicate?(queue, message) do name = AMQQueue.get_name(queue) if Common.duplicate?(name, message, message_expiration(message)) do @@ -533,18 +526,14 @@ defmodule RabbitMQMessageDeduplication.Queue do # Returns the expiration property of the given message defp message_expiration(message) do - basic_message(content: content(properties: properties)) = message - - case properties do - basic_properties(expiration: ttl) when is_bitstring(ttl) -> - String.to_integer(ttl) - basic_properties(expiration: :undefined) -> nil + case MC.ttl(message) do :undefined -> nil + ttl -> ttl end end # Removes the message deduplication header from the cache - defp maybe_delete_cache_entry(queue, msg = basic_message()) do + defp maybe_delete_cache_entry(queue, msg) when is_tuple(msg) do header = Common.message_header(msg, "x-deduplication-header") maybe_delete_cache_entry(queue, header) end diff --git a/mix.exs b/mix.exs index 05ac6b1..66d8ca8 100644 --- a/mix.exs +++ b/mix.exs @@ -18,7 +18,8 @@ defmodule RabbitMQ.MessageDeduplicationPlugin.Mixfile do applications: [:mnesia], extra_applications: [:rabbit], mod: {RabbitMQMessageDeduplication, []}, - registered: [RabbitMQMessageDeduplication] + registered: [RabbitMQMessageDeduplication], + broker_version_requirements: ["3.13.0"] ] end