From 52e261a07bb1a2e8e98ccb223758c8d726bd2be4 Mon Sep 17 00:00:00 2001 From: Magnus Landerblom Date: Wed, 18 Sep 2024 15:14:46 +0200 Subject: [PATCH] exchange.publish --- spec/api/bindings_spec.cr | 4 +- spec/api/definitions_spec.cr | 38 ++++-- spec/exchange_spec.cr | 46 +++++++ spec/message_routing_spec.cr | 126 +++++++++++++++++- spec/queue_spec.cr | 13 +- spec/upstream_spec.cr | 6 +- spec/vhost_spec.cr | 2 +- src/lavinmq/amqp/client.cr | 24 ++-- src/lavinmq/exchange/consistent_hash.cr | 72 +++++----- src/lavinmq/exchange/default.cr | 22 +-- src/lavinmq/exchange/direct.cr | 50 ++++--- src/lavinmq/exchange/exchange.cr | 170 +++++++++++++++--------- src/lavinmq/exchange/fanout.cr | 46 +++---- src/lavinmq/exchange/headers.cr | 79 +++++------ src/lavinmq/exchange/topic.cr | 78 +++++------ src/lavinmq/http/binding_helpers.cr | 16 +-- src/lavinmq/http/controller/bindings.cr | 41 +++--- src/lavinmq/http/controller/queues.cr | 4 +- src/lavinmq/vhost.cr | 128 ++++-------------- src/stdlib/iterator.cr | 13 ++ 20 files changed, 554 insertions(+), 424 deletions(-) create mode 100644 src/stdlib/iterator.cr diff --git a/spec/api/bindings_spec.cr b/spec/api/bindings_spec.cr index c5e9e9624..40dff3ded 100644 --- a/spec/api/bindings_spec.cr +++ b/spec/api/bindings_spec.cr @@ -65,7 +65,7 @@ describe LavinMQ::HTTP::BindingsController do response = http.post("/api/bindings/%2f/e/be1/q/bindings_q1", body: body) response.status_code.should eq 201 response.headers["Location"].should eq "bindings_q1/rk" - s.vhosts["/"].exchanges["be1"].queue_bindings.last_key.first.should eq "rk" + s.vhosts["/"].exchanges["be1"].bindings_details.first.routing_key.should eq "rk" end end @@ -127,7 +127,7 @@ describe LavinMQ::HTTP::BindingsController do props = binding[0]["properties_key"].as_s response = http.delete("/api/bindings/%2f/e/be1/q/bindings_q1/#{props}") response.status_code.should eq 204 - s.vhosts["/"].exchanges["be1"].queue_bindings.empty?.should be_true + s.vhosts["/"].exchanges["be1"].bindings_details.empty?.should be_true end end end diff --git a/spec/api/definitions_spec.cr b/spec/api/definitions_spec.cr index 079e8ed5b..de5a09bfc 100644 --- a/spec/api/definitions_spec.cr +++ b/spec/api/definitions_spec.cr @@ -106,13 +106,20 @@ describe LavinMQ::HTTP::Server do ]}) response = http.post("/api/definitions", body: body) response.status_code.should eq 200 - matches = [] of String ex = s.vhosts["/"].exchanges["import_x1"] - ex.do_exchange_matches("r.k2", nil) { |e| matches << e.name } - matches.includes?("import_x2").should be_true - matches.clear - ex.do_queue_matches("rk", nil) { |e| matches << e.name } - matches.includes?("import_q1").should be_true + qs = Set(LavinMQ::Queue).new + es = Set(LavinMQ::Exchange).new + ex.find_queues("r.k2", nil, qs, es) + res = Set(LavinMQ::Exchange).new + res << s.vhosts["/"].exchanges["import_x1"] + res << s.vhosts["/"].exchanges["import_x2"] + es.should eq res + qs = Set(LavinMQ::Queue).new + es = Set(LavinMQ::Exchange).new + ex.find_queues("rk", nil, qs, es) + res = Set(LavinMQ::Queue).new + res << s.vhosts["/"].queues["import_q1"] + qs.should eq res end end @@ -470,13 +477,20 @@ describe LavinMQ::HTTP::Server do ]}) response = http.post("/api/definitions/%2f", body: body) response.status_code.should eq 200 - matches = [] of String ex = s.vhosts["/"].exchanges["import_x1"] - ex.do_exchange_matches("r.k2", nil) { |e| matches << e.name } - matches.includes?("import_x2").should be_true - matches.clear - ex.do_queue_matches("rk", nil) { |e| matches << e.name } - matches.includes?("import_q1").should be_true + qs = Set(LavinMQ::Queue).new + es = Set(LavinMQ::Exchange).new + ex.find_queues("r.k2", nil, qs, es) + res = Set(LavinMQ::Exchange).new + res << s.vhosts["/"].exchanges["import_x1"] + res << s.vhosts["/"].exchanges["import_x2"] + es.should eq res + qs = Set(LavinMQ::Queue).new + es = Set(LavinMQ::Exchange).new + ex.find_queues("rk", nil, qs, es) + res = Set(LavinMQ::Queue).new + res << s.vhosts["/"].queues["import_q1"] + qs.should eq res end end diff --git a/spec/exchange_spec.cr b/spec/exchange_spec.cr index 7c9abe486..78cce2945 100644 --- a/spec/exchange_spec.cr +++ b/spec/exchange_spec.cr @@ -124,4 +124,50 @@ describe LavinMQ::Exchange do end end end + + describe "in_use?" do + it "should not be in use when just created" do + with_amqp_server do |s| + with_channel(s) do |ch| + ch.exchange("e1", "topic", auto_delete: true) + ch.exchange("e2", "topic", auto_delete: true) + s.vhosts["/"].exchanges["e1"].in_use?.should be_false + s.vhosts["/"].exchanges["e2"].in_use?.should be_false + end + end + end + it "should be in use when it has bindings" do + with_amqp_server do |s| + with_channel(s) do |ch| + x1 = ch.exchange("e1", "topic", auto_delete: true) + x2 = ch.exchange("e2", "topic", auto_delete: true) + x2.bind(x1.name, "#") + s.vhosts["/"].exchanges["e2"].in_use?.should be_true + end + end + end + it "should be in use when other exchange has binding to it" do + with_amqp_server do |s| + with_channel(s) do |ch| + x1 = ch.exchange("e1", "topic", auto_delete: true) + x2 = ch.exchange("e2", "topic", auto_delete: true) + x2.bind(x1.name, "#") + s.vhosts["/"].exchanges["e1"].in_use?.should be_true + end + end + end + + it "should be in use when it has bindings" do + with_amqp_server do |s| + with_channel(s) do |ch| + x1 = ch.exchange("e1", "topic") + x2 = ch.exchange("e2", "topic", auto_delete: true) + x2.bind(x1.name, "#") + s.vhosts["/"].exchanges["e1"].in_use?.should be_true + x2.unbind(x1.name, "#") + s.vhosts["/"].exchanges["e1"].in_use?.should be_false + end + end + end + end end diff --git a/spec/message_routing_spec.cr b/spec/message_routing_spec.cr index 3643f7192..f5ec53d3d 100644 --- a/spec/message_routing_spec.cr +++ b/spec/message_routing_spec.cr @@ -5,8 +5,10 @@ module LavinMQ # Monkey patch for backward compability and easier testing def matches(routing_key, headers = nil) : Set(Queue | Exchange) s = Set(Queue | Exchange).new - queue_matches(routing_key, headers) { |q| s << q } - exchange_matches(routing_key, headers) { |x| s << x } + qs = Set(Queue).new + es = Set(Exchange).new + find_queues(routing_key, headers, qs, es) + qs.each { |q| s << q } s end end @@ -19,7 +21,9 @@ describe LavinMQ::DirectExchange do q1 = LavinMQ::Queue.new(vhost, "q1") x = LavinMQ::DirectExchange.new(vhost, "") x.bind(q1, "q1", LavinMQ::AMQP::Table.new) - x.matches("q1").should eq(Set{q1}) + found_queues = Set(LavinMQ::Queue).new + x.find_queues("q1", nil, found_queues, Set(LavinMQ::Exchange).new) + found_queues.should eq(Set{q1}) end end @@ -27,7 +31,10 @@ describe LavinMQ::DirectExchange do with_amqp_server do |s| vhost = s.vhosts.create("x") x = LavinMQ::DirectExchange.new(vhost, "") - x.matches("q1").should be_empty + + found_queues = Set(LavinMQ::Queue).new + x.find_queues("q1", nil, found_queues, Set(LavinMQ::Exchange).new) + found_queues.should be_empty end end end @@ -39,7 +46,10 @@ describe LavinMQ::FanoutExchange do q1 = LavinMQ::Queue.new(vhost, "q1") x = LavinMQ::FanoutExchange.new(vhost, "") x.bind(q1, "") - x.matches("any").should eq(Set{q1}) + + found_queues = Set(LavinMQ::Queue).new + x.find_queues("q1", nil, found_queues, Set(LavinMQ::Exchange).new) + found_queues.should eq(Set{q1}) end end @@ -47,7 +57,9 @@ describe LavinMQ::FanoutExchange do with_amqp_server do |s| vhost = s.vhosts.create("x") x = LavinMQ::FanoutExchange.new(vhost, "") - x.matches("q1").should be_empty + found_queues = Set(LavinMQ::Queue).new + x.find_queues("q1", nil, found_queues, Set(LavinMQ::Exchange).new) + found_queues.should be_empty end end end @@ -307,3 +319,105 @@ describe LavinMQ::HeadersExchange do end end end + +describe LavinMQ::Exchange do + it "should handle CC in header" do + with_amqp_server do |s| + vhost = s.vhosts.create("x") + q1 = LavinMQ::Queue.new(vhost, "q1") + q2 = LavinMQ::Queue.new(vhost, "q2") + x = LavinMQ::DirectExchange.new(vhost, "") + x.bind(q1, "q1", LavinMQ::AMQP::Table.new) + x.bind(q2, "q2", LavinMQ::AMQP::Table.new) + found_queues = Set(LavinMQ::Queue).new + headers = LavinMQ::AMQP::Table.new + headers["CC"] = ["q2"] + x.find_queues("q1", headers, found_queues) + found_queues.should eq(Set{q1, q2}) + end + end + it "should raise if CC header isn't array" do + with_amqp_server do |s| + vhost = s.vhosts.create("x") + q1 = LavinMQ::Queue.new(vhost, "q1") + q2 = LavinMQ::Queue.new(vhost, "q2") + x = LavinMQ::DirectExchange.new(vhost, "") + x.bind(q1, "q1", LavinMQ::AMQP::Table.new) + x.bind(q2, "q2", LavinMQ::AMQP::Table.new) + found_queues = Set(LavinMQ::Queue).new + headers = LavinMQ::AMQP::Table.new + headers["CC"] = "q2" + expect_raises(LavinMQ::Error::PreconditionFailed) do + x.find_queues("q1", headers, found_queues) + end + end + end + + it "should handle BCC in header" do + with_amqp_server do |s| + vhost = s.vhosts.create("x") + q1 = LavinMQ::Queue.new(vhost, "q1") + q2 = LavinMQ::Queue.new(vhost, "q2") + x = LavinMQ::DirectExchange.new(vhost, "") + x.bind(q1, "q1", LavinMQ::AMQP::Table.new) + x.bind(q2, "q2", LavinMQ::AMQP::Table.new) + found_queues = Set(LavinMQ::Queue).new + headers = LavinMQ::AMQP::Table.new + headers["BCC"] = ["q2"] + x.find_queues("q1", headers, found_queues) + found_queues.should eq(Set{q1, q2}) + end + end + + it "should raise if BCC header isn't array" do + with_amqp_server do |s| + vhost = s.vhosts.create("x") + q1 = LavinMQ::Queue.new(vhost, "q1") + q2 = LavinMQ::Queue.new(vhost, "q2") + x = LavinMQ::DirectExchange.new(vhost, "") + x.bind(q1, "q1", LavinMQ::AMQP::Table.new) + x.bind(q2, "q2", LavinMQ::AMQP::Table.new) + found_queues = Set(LavinMQ::Queue).new + headers = LavinMQ::AMQP::Table.new + headers["BCC"] = "q2" + expect_raises(LavinMQ::Error::PreconditionFailed) do + x.find_queues("q1", headers, found_queues) + end + end + end + + it "should drop BCC from header" do + with_amqp_server do |s| + vhost = s.vhosts.create("x") + q1 = LavinMQ::Queue.new(vhost, "q1") + q2 = LavinMQ::Queue.new(vhost, "q2") + x = LavinMQ::DirectExchange.new(vhost, "") + x.bind(q1, "q1", LavinMQ::AMQP::Table.new) + x.bind(q2, "q2", LavinMQ::AMQP::Table.new) + found_queues = Set(LavinMQ::Queue).new + headers = LavinMQ::AMQP::Table.new + headers["BCC"] = ["q2"] + x.find_queues("q1", headers, found_queues) + headers["BCC"]?.should be_nil + end + end + + it "should read both CC and BCC" do + with_amqp_server do |s| + vhost = s.vhosts.create("x") + q1 = LavinMQ::Queue.new(vhost, "q1") + q2 = LavinMQ::Queue.new(vhost, "q2") + q3 = LavinMQ::Queue.new(vhost, "q3") + x = LavinMQ::DirectExchange.new(vhost, "") + x.bind(q1, "q1", LavinMQ::AMQP::Table.new) + x.bind(q2, "q2", LavinMQ::AMQP::Table.new) + x.bind(q3, "q3", LavinMQ::AMQP::Table.new) + found_queues = Set(LavinMQ::Queue).new + headers = LavinMQ::AMQP::Table.new + headers["CC"] = ["q2"] + headers["BCC"] = ["q3"] + x.find_queues("q1", headers, found_queues) + found_queues.should eq(Set{q1, q2, q3}) + end + end +end diff --git a/spec/queue_spec.cr b/spec/queue_spec.cr index cf8f9399d..e6492d02f 100644 --- a/spec/queue_spec.cr +++ b/spec/queue_spec.cr @@ -59,7 +59,7 @@ describe LavinMQ::Queue do x.publish_confirm "test message", q.name q.get(no_ack: true).try(&.body_io.to_s).should eq("test message") - iq = s.vhosts["/"].exchanges[x_name].queue_bindings[{q.name, nil}].first + iq = s.vhosts["/"].queues[q.name] iq.pause! x.publish_confirm "test message 2", q.name @@ -93,7 +93,7 @@ describe LavinMQ::Queue do x.publish_confirm "test message", q.name q.get(no_ack: true).try(&.body_io.to_s).should eq("test message") - iq = s.vhosts["/"].exchanges[x_name].queue_bindings[{q.name, nil}].first + iq = s.vhosts["/"].queues[q.name] iq.pause! x.publish_confirm "test message 2", q.name @@ -126,7 +126,7 @@ describe LavinMQ::Queue do x.publish_confirm "test message", q.name q.get(no_ack: true).try(&.body_io.to_s).should eq("test message") - iq = s.vhosts["/"].exchanges[x_name].queue_bindings[{q.name, nil}].first + iq = s.vhosts["/"].queues[q.name] iq.pause! x.publish_confirm "test message 2", q.name @@ -187,7 +187,7 @@ describe LavinMQ::Queue do x.publish_confirm "test message 3", q.name x.publish_confirm "test message 4", q.name - internal_queue = s.vhosts["/"].exchanges[x_name].queue_bindings[{q.name, nil}].first + internal_queue = s.vhosts["/"].queues[q.name] internal_queue.message_count.should eq 4 response = http.delete("/api/queues/%2f/#{q_name}/contents") @@ -209,7 +209,7 @@ describe LavinMQ::Queue do end vhost = s.vhosts["/"] - internal_queue = vhost.exchanges[x_name].queue_bindings[{q.name, nil}].first + internal_queue = vhost.queues[q.name] internal_queue.message_count.should eq 10 response = http.delete("/api/queues/%2f/#{q_name}/contents?count=5") @@ -235,8 +235,7 @@ describe LavinMQ::Queue do x.publish_confirm "test message #{i}", q.name end - internal_queue = s.vhosts["/"].exchanges[x_name].queue_bindings[{q.name, nil}].first - + internal_queue = s.vhosts["/"].queues[q.name] internal_queue.message_count.should eq 10 channel = Channel(String).new(1) diff --git a/spec/upstream_spec.cr b/spec/upstream_spec.cr index 876c3d336..358e990d0 100644 --- a/spec/upstream_spec.cr +++ b/spec/upstream_spec.cr @@ -432,7 +432,7 @@ describe LavinMQ::Federation::Upstream do wait_for { upstream.links.first?.try &.state.running? } upstream_q = upstream_vhost.queues.values.first - upstream_q.bindings.size.should eq queues.size + upstream_q.bindings.size.should eq queues.size + 1 # +1 for the default exchange # Assert setup is correct 10.times do |i| downstream_q = downstream_ch.queue("") @@ -440,10 +440,10 @@ describe LavinMQ::Federation::Upstream do queues << downstream_q end sleep 0.1.seconds - upstream_q.bindings.size.should eq queues.size + upstream_q.bindings.size.should eq queues.size + 1 queues.each &.delete sleep 10.milliseconds - upstream_q.bindings.size.should eq 0 + upstream_q.bindings.size.should eq 1 end end end diff --git a/spec/vhost_spec.cr b/spec/vhost_spec.cr index 4f8653736..d8bf9f9eb 100644 --- a/spec/vhost_spec.cr +++ b/spec/vhost_spec.cr @@ -71,7 +71,7 @@ describe LavinMQ::VHost do v.declare_queue("q", true, false) s.vhosts["test"].bind_queue("q", "e", "q") s.restart - s.vhosts["test"].exchanges["e"].queue_bindings[{"q", nil}].size.should eq 1 + s.vhosts["test"].exchanges["e"].bindings_details.first.destination.name.should eq "q" end end diff --git a/src/lavinmq/amqp/client.cr b/src/lavinmq/amqp/client.cr index e87eb57c5..093361a13 100644 --- a/src/lavinmq/amqp/client.cr +++ b/src/lavinmq/amqp/client.cr @@ -679,7 +679,7 @@ module LavinMQ end return unless valid_q_bind_unbind?(frame) - q = @vhost.queues.fetch(frame.queue_name, nil) + q = @vhost.queues[frame.queue_name]? if q.nil? send_not_found frame, "Queue '#{frame.queue_name}' not found" elsif !@vhost.exchanges.has_key? frame.exchange_name @@ -691,8 +691,12 @@ module LavinMQ elsif queue_exclusive_to_other_client?(q) send_resource_locked(frame, "Exclusive queue") else - @vhost.apply(frame) - send AMQP::Frame::Queue::BindOk.new(frame.channel) unless frame.no_wait + begin + @vhost.apply(frame) + send AMQP::Frame::Queue::BindOk.new(frame.channel) unless frame.no_wait + rescue ex : LavinMQ::Exchange::AccessRefused + send_access_refused(frame, ex.message) + end end end @@ -702,7 +706,7 @@ module LavinMQ end return unless valid_q_bind_unbind?(frame) - q = @vhost.queues.fetch(frame.queue_name, nil) + q = @vhost.queues[frame.queue_name]? if q.nil? # should return not_found according to spec but we make it idempotent send AMQP::Frame::Queue::UnbindOk.new(frame.channel) @@ -716,8 +720,12 @@ module LavinMQ elsif queue_exclusive_to_other_client?(q) send_resource_locked(frame, "Exclusive queue") else - @vhost.apply(frame) - send AMQP::Frame::Queue::UnbindOk.new(frame.channel) + begin + @vhost.apply(frame) + send AMQP::Frame::Queue::UnbindOk.new(frame.channel) + rescue ex : LavinMQ::Exchange::AccessRefused + send_access_refused(frame, ex.message) + end end end @@ -728,10 +736,6 @@ module LavinMQ elsif !valid_entity_name(frame.exchange_name) send_precondition_failed(frame, "Exchange name isn't valid") return false - elsif frame.exchange_name.empty? || frame.exchange_name == DEFAULT_EX - target = frame.is_a?(AMQP::Frame::Queue::Bind) ? "bind to" : "unbind from" - send_access_refused(frame, "Not allowed to #{target} the default exchange") - return false end true end diff --git a/src/lavinmq/exchange/consistent_hash.cr b/src/lavinmq/exchange/consistent_hash.cr index a42796093..81748ff4c 100644 --- a/src/lavinmq/exchange/consistent_hash.cr +++ b/src/lavinmq/exchange/consistent_hash.cr @@ -4,64 +4,64 @@ require "../consistent_hasher.cr" module LavinMQ class ConsistentHashExchange < Exchange @hasher = ConsistentHasher(Destination).new + @bindings = Hash(Destination, String).new def type : String "x-consistent-hash" end - private def weight(routing_key : String) : UInt32 - routing_key.to_u32? || raise Error::PreconditionFailed.new("Routing key must to be a number") - end - - private def hash_key(routing_key : String, headers : AMQP::Table?) - hash_on = @arguments["x-hash-on"]? - return routing_key unless hash_on.is_a?(String) - return "" if headers.nil? - case value = headers[hash_on.as(String)]? - when String then value.as(String) - when Nil then "" - else raise Error::PreconditionFailed.new("Routing header must be string") + def bindings_details : Iterator(BindingDetails) + @bindings.each.map do |d, w| + binding_key = BindingKey.new(w, @arguments) + BindingDetails.new(name, vhost.name, binding_key, d) end end def bind(destination : Destination, routing_key : String, headers : AMQP::Table?) + return false if @bindings.has_key? destination + @bindings[destination] = routing_key w = weight(routing_key) @hasher.add(destination.name, w, destination) - ret = case destination - when Queue - @queue_bindings[{routing_key, headers}].add? destination - when Exchange - @exchange_bindings[{routing_key, headers}].add? destination - end - after_bind(destination, routing_key, headers) - ret + binding_key = BindingKey.new(routing_key, @arguments) + data = BindingDetails.new(name, vhost.name, binding_key, destination) + notify_observers(ExchangeEvent::Bind, data) + true end def unbind(destination : Destination, routing_key : String, headers : AMQP::Table?) + return false unless @bindings.delete destination w = weight(routing_key) - ret = case destination - when Queue - @queue_bindings[{routing_key, headers}].delete destination - when Exchange - @exchange_bindings[{routing_key, headers}].delete destination - end @hasher.remove(destination.name, w) - ret + + binding_key = BindingKey.new(routing_key, @arguments) + data = BindingDetails.new(name, vhost.name, binding_key, destination) + notify_observers(ExchangeEvent::Unbind, data) + + delete if @auto_delete && @bindings.empty? + true end - def do_queue_matches(routing_key : String, headers : AMQP::Table?, & : Queue -> _) + protected def bindings(routing_key, headers) : Iterator(Destination) key = hash_key(routing_key, headers) - case dest = @hasher.get(key) - when Queue - yield dest.as(Queue) + if d = @hasher.get(key) + {d}.each + else + Iterator(Destination).empty end end - def do_exchange_matches(routing_key : String, headers : AMQP::Table?, & : Exchange -> _) - key = hash_key(routing_key, headers) - case dest = @hasher.get(key) - when Exchange - yield dest.as(Exchange) + private def weight(routing_key : String) : UInt32 + routing_key.to_u32? || raise Error::PreconditionFailed.new("Routing key must to be a number") + end + + private def hash_key(routing_key : String, headers : AMQP::Table?) + hash_on = @arguments["x-hash-on"]? + return routing_key unless hash_on.is_a?(String) + return "" if headers.nil? + case value = headers[hash_on.as(String)]? + when String then value.as(String) + when Nil then "" + else raise Error::PreconditionFailed.new("Routing header must be string") end end end diff --git a/src/lavinmq/exchange/default.cr b/src/lavinmq/exchange/default.cr index b3f0de4db..a12cc41b5 100644 --- a/src/lavinmq/exchange/default.cr +++ b/src/lavinmq/exchange/default.cr @@ -6,22 +6,24 @@ module LavinMQ "direct" end - def bind(destination, routing_key, headers = nil) - raise "Access refused" - end - - def unbind(destination, routing_key, headers = nil) - raise "Access refused" + def bindings_details : Iterator(BindingDetails) + Iterator(BindingDetails).empty end - def do_queue_matches(routing_key, headers = nil, & : Queue -> _) + protected def bindings(routing_key, headers) : Iterator(Destination) if q = @vhost.queues[routing_key]? - yield q + Tuple(Destination).new(q).each + else + Iterator(Destination).empty end end - def do_exchange_matches(routing_key, headers = nil, & : Exchange -> _) - # noop + def bind(destination, routing_key, headers = nil) + raise LavinMQ::Exchange::AccessRefused.new(self) + end + + def unbind(destination, routing_key, headers = nil) + raise LavinMQ::Exchange::AccessRefused.new(self) end end end diff --git a/src/lavinmq/exchange/direct.cr b/src/lavinmq/exchange/direct.cr index 553e06886..af559859f 100644 --- a/src/lavinmq/exchange/direct.cr +++ b/src/lavinmq/exchange/direct.cr @@ -2,40 +2,46 @@ require "./exchange" module LavinMQ class DirectExchange < Exchange + @bindings = Hash(String, Set(Destination)).new do |h, k| + h[k] = Set(Destination).new + end + def type : String "direct" end - def bind(destination : Queue, routing_key, headers = nil) - ret = @queue_bindings[{routing_key, nil}].add? destination - after_bind(destination, routing_key, headers) - ret + def bindings_details : Iterator(BindingDetails) + @bindings.each.flat_map do |key, ds| + ds.each.map do |d| + binding_key = BindingKey.new(key) + BindingDetails.new(name, vhost.name, binding_key, d) + end + end end - def bind(destination : Exchange, routing_key, headers = nil) - ret = @exchange_bindings[{routing_key, nil}].add? destination - after_bind(destination, routing_key, headers) - ret + def bind(destination : Destination, routing_key, headers = nil) : Bool + return false unless @bindings[routing_key].add? destination + binding_key = BindingKey.new(routing_key) + data = BindingDetails.new(name, vhost.name, binding_key, destination) + notify_observers(ExchangeEvent::Bind, data) + true end - def unbind(destination : Queue, routing_key, headers = nil) - ret = @queue_bindings[{routing_key, nil}].delete destination - after_unbind(destination, routing_key, headers) - ret - end + def unbind(destination : Destination, routing_key, headers = nil) : Bool + rk_bindings = @bindings[routing_key] + return false unless rk_bindings.delete destination + @bindings.delete routing_key if rk_bindings.empty? - def unbind(destination : Exchange, routing_key, headers = nil) - ret = @exchange_bindings[{routing_key, nil}].delete destination - after_unbind(destination, routing_key, headers) - ret - end + binding_key = BindingKey.new(routing_key) + data = BindingDetails.new(name, vhost.name, binding_key, destination) + notify_observers(ExchangeEvent::Unbind, data) - def do_queue_matches(routing_key, headers = nil, & : Queue -> _) - @queue_bindings[{routing_key, nil}].each { |q| yield q } + delete if @auto_delete && @bindings.each_value.all?(&.empty?) + true end - def do_exchange_matches(routing_key, headers = nil, & : Exchange -> _) - @exchange_bindings[{routing_key, nil}].each { |x| yield x } + protected def bindings(routing_key, headers) : Iterator(Destination) + @bindings[routing_key].each end end end diff --git a/src/lavinmq/exchange/exchange.cr b/src/lavinmq/exchange/exchange.cr index 3a47b6668..e9b2d0120 100644 --- a/src/lavinmq/exchange/exchange.cr +++ b/src/lavinmq/exchange/exchange.cr @@ -7,8 +7,17 @@ require "./event" require "../queue" module LavinMQ - alias BindingKey = Tuple(String, AMQP::Table?) alias Destination = Queue | Exchange + record BindingKey, routing_key : String, arguments : AMQP::Table? = nil do + def properties_key + if arguments.nil? || arguments.try(&.empty?) + routing_key.empty? ? "~" : routing_key + else + hsh = Base64.urlsafe_encode(arguments.to_s) + "#{routing_key}~#{hsh}" + end + end + end abstract class Exchange include PolicyTarget @@ -16,7 +25,7 @@ module LavinMQ include SortableJSON include Observable(ExchangeEvent) - getter name, arguments, queue_bindings, exchange_bindings, vhost, type, alternate_exchange + getter name, arguments, vhost, type, alternate_exchange getter? durable, internal, auto_delete getter policy : Policy? getter operator_policy : OperatorPolicy? @@ -32,12 +41,6 @@ module LavinMQ def initialize(@vhost : VHost, @name : String, @durable = false, @auto_delete = false, @internal = false, @arguments = AMQP::Table.new) - @queue_bindings = Hash(BindingKey, Set(Queue)).new do |h, k| - h[k] = Set(Queue).new - end - @exchange_bindings = Hash(BindingKey, Set(Exchange)).new do |h, k| - h[k] = Set(Exchange).new - end handle_arguments end @@ -105,22 +108,12 @@ module LavinMQ end def in_use? - return true if @queue_bindings.size > 0 - return true if @exchange_bindings.size > 0 - return true if @vhost.exchanges.any? { |_, x| x.exchange_bindings.any? { |_, exs| exs.includes? self } } - false - end - - def bindings_details - Iterator.chain({@queue_bindings.each, @exchange_bindings.each}).flat_map do |(key, destinations)| - destinations.map { |destination| binding_details(key, destination) } + return true unless bindings_details.empty? + @vhost.exchanges.any? do |_, x| + x.bindings_details.each.any? { |bd| bd.destination == self } end end - def binding_details(key, destination) - BindingDetails.new(name, vhost.name, key, destination) - end - MAX_NAME_LENGTH = 256 private def init_delayed_queue @@ -142,22 +135,6 @@ module LavinMQ REPUBLISH_HEADERS = {"x-head", "x-tail", "x-from"} - protected def after_bind(destination : Destination, routing_key : String, headers : AMQP::Table?) - notify_observers(ExchangeEvent::Bind, binding_details({routing_key, headers}, destination)) - true - end - - protected def after_unbind(destination, routing_key, headers) - @queue_bindings.reject! { |_k, v| v.empty? } - @exchange_bindings.reject! { |_k, v| v.empty? } - if @auto_delete && - @queue_bindings.each_value.all? &.empty? && - @exchange_bindings.each_value.all? &.empty? - delete - end - notify_observers(ExchangeEvent::Unbind, binding_details({routing_key, headers}, destination)) - end - protected def delete return if @deleted @deleted = true @@ -167,18 +144,82 @@ module LavinMQ end abstract def type : String - abstract def bind(destination : Queue, routing_key : String, headers : AMQP::Table?) - abstract def unbind(destination : Queue, routing_key : String, headers : AMQP::Table?) - abstract def bind(destination : Exchange, routing_key : String, headers : AMQP::Table?) - abstract def unbind(destination : Exchange, routing_key : String, headers : AMQP::Table?) - abstract def do_queue_matches(routing_key : String, headers : AMQP::Table?, & : Queue -> _) - abstract def do_exchange_matches(routing_key : String, headers : AMQP::Table?, & : Exchange -> _) - - def queue_matches(routing_key : String, headers = nil, &blk : Queue -> _) + abstract def bind(destination : Destination, routing_key : String, headers : AMQP::Table?) + abstract def unbind(destination : Destination, routing_key : String, headers : AMQP::Table?) + abstract def bindings_details : Iterator(BindingDetails) + + def publish(msg : Message, immediate : Bool, + queues : Set(Queue) = Set(Queue).new, + exchanges : Set(Exchange) = Set(Exchange).new) : Int32 + @publish_in_count += 1 + headers = msg.properties.headers if should_delay_message?(headers) - @delayed_queue.try { |q| yield q } - else - do_queue_matches(routing_key, headers, &blk) + if q = @delayed_queue + q.publish(msg) + return 1 + else + return 0 + end + end + find_queues(msg.routing_key, headers, queues, exchanges) + if queues.empty? + @unroutable_count += 1 + return 0 + end + return 0 if immediate && !queues.any? &.immediate_delivery? + + count = 0 + queues.each do |queue| + if queue.publish(msg) + count += 1 + msg.body_io.seek(-msg.bodysize.to_i64, IO::Seek::Current) # rewind + end + end + @publish_out_count += count + count + end + + def find_queues(routing_key : String, headers : AMQP::Table?, + queues : Set(Queue), + exchanges : Set(Exchange) = Set(Exchange).new) : Nil + return unless exchanges.add? self + bindings(routing_key, headers).each do |d| + case d + when Queue + queues.add(d) unless d.closed? + when Exchange + d.find_queues(routing_key, headers, queues, exchanges) + end + end + + if hdrs = headers + find_cc_queues(hdrs, "CC", queues) + find_cc_queues(hdrs, "BCC", queues) + hdrs.delete "BCC" + end + + if queues.empty? && alternate_exchange + @vhost.exchanges[alternate_exchange]?.try do |ae| + ae.find_queues(routing_key, headers, queues, exchanges) + end + end + end + + private def find_cc_queues(headers, key, queues) + return unless cc = headers[key]? + cc = cc.as?(Array(AMQP::Field)) + + raise Error::PreconditionFailed.new("#{key} header not a string array") unless cc + + hdrs = headers.clone + hdrs.delete "CC" + hdrs.delete key + cc.each do |rk| + if rk = rk.as?(String) + find_queues(rk, hdrs, queues) + else + raise Error::PreconditionFailed.new("#{key} header not a string array") + end end end @@ -204,22 +245,28 @@ module LavinMQ end end end + + class AccessRefused < Error + def initialize(@exchange : Exchange) + super("Access refused to #{@exchange.name}") + end + end end struct BindingDetails include SortableJSON - getter source, vhost, key, destination + getter source, vhost, binding_key, destination def initialize(@source : String, @vhost : String, - @key : BindingKey, @destination : Queue | Exchange) + @binding_key : BindingKey, @destination : Destination) end - def routing_key - @key[0] + def arguments + @binding_key.arguments end - def arguments - @key[1] + def routing_key + @binding_key.routing_key end def details_tuple @@ -228,19 +275,10 @@ module LavinMQ vhost: @vhost, destination: @destination.name, destination_type: @destination.is_a?(Queue) ? "queue" : "exchange", - routing_key: routing_key, - arguments: arguments || NamedTuple.new, - properties_key: BindingDetails.hash_key(@key), + routing_key: @binding_key.routing_key, + arguments: @binding_key.arguments, + properties_key: @binding_key.properties_key, } end - - def self.hash_key(key : BindingKey) - if key[1].nil? || key[1].try &.empty? - key[0].empty? ? "~" : key[0] - else - hsh = Base64.urlsafe_encode(key[1].to_s) - "#{key[0]}~#{hsh}" - end - end end end diff --git a/src/lavinmq/exchange/fanout.cr b/src/lavinmq/exchange/fanout.cr index 9e393bc1f..623b59a2a 100644 --- a/src/lavinmq/exchange/fanout.cr +++ b/src/lavinmq/exchange/fanout.cr @@ -2,40 +2,38 @@ require "./exchange" module LavinMQ class FanoutExchange < Exchange + @bindings = Set(Destination).new + def type : String "fanout" end - def bind(destination : Queue, routing_key, headers = nil) - ret = @queue_bindings[{routing_key, nil}].add? destination - after_bind(destination, routing_key, headers) - ret - end - - def bind(destination : Exchange, routing_key, headers = nil) - ret = @exchange_bindings[{routing_key, nil}].add? destination - after_bind(destination, routing_key, headers) - ret - end - - def unbind(destination : Queue, routing_key, headers = nil) - ret = @queue_bindings[{routing_key, nil}].delete destination - after_unbind(destination, routing_key, headers) - ret + def bindings_details : Iterator(BindingDetails) + @bindings.each.map do |d| + binding_key = BindingKey.new("") + BindingDetails.new(name, vhost.name, binding_key, d) + end end - def unbind(destination : Exchange, routing_key, headers = nil) - ret = @exchange_bindings[{routing_key, nil}].delete destination - after_unbind(destination, routing_key, headers) - ret + def bind(destination : Destination, routing_key, headers = nil) + return false unless @bindings.add? destination + binding_key = BindingKey.new("") + data = BindingDetails.new(name, vhost.name, binding_key, destination) + notify_observers(ExchangeEvent::Bind, data) + true end - def do_queue_matches(routing_key, headers = nil, & : Queue -> _) - @queue_bindings.each_value { |s| s.each { |q| yield q } } + def unbind(destination : Destination, routing_key, headers = nil) + return false unless @bindings.delete destination + binding_key = BindingKey.new("") + data = BindingDetails.new(name, vhost.name, binding_key, destination) + notify_observers(ExchangeEvent::Unbind, data) + delete if @auto_delete && @bindings.empty? + true end - def do_exchange_matches(routing_key, headers = nil, & : Exchange -> _) - @exchange_bindings.each_value { |s| s.each { |q| yield q } } + protected def bindings(routing_key, headers) : Iterator(Destination) + @bindings.each end end end diff --git a/src/lavinmq/exchange/headers.cr b/src/lavinmq/exchange/headers.cr index 11578406b..98165aa93 100644 --- a/src/lavinmq/exchange/headers.cr +++ b/src/lavinmq/exchange/headers.cr @@ -2,8 +2,8 @@ require "./exchange" module LavinMQ class HeadersExchange < Exchange - def type : String - "headers" + @bindings = Hash(AMQP::Table, Set(Destination)).new do |h, k| + h[k] = Set(Destination).new end def initialize(@vhost : VHost, @name : String, @durable = false, @@ -13,44 +13,45 @@ module LavinMQ super end - def bind(destination : Queue, routing_key, headers) - validate!(headers) - args = headers ? @arguments.clone.merge!(headers) : @arguments - ret = @queue_bindings[{routing_key, args}].add? destination - after_bind(destination, routing_key, headers) - ret + def type : String + "headers" end - def bind(destination : Exchange, routing_key, headers) - validate!(headers) - args = headers ? @arguments.clone.merge!(headers) : @arguments - ret = @exchange_bindings[{routing_key, args}].add? destination - after_bind(destination, routing_key, headers) - ret + def bindings_details : Iterator(BindingDetails) + @bindings.each.flat_map do |args, ds| + ds.map do |d| + binding_key = BindingKey.new("", args) + BindingDetails.new(name, vhost.name, binding_key, d) + end + end end - def unbind(destination : Queue, routing_key, headers) + def bind(destination : Destination, routing_key, headers) + validate!(headers) args = headers ? @arguments.clone.merge!(headers) : @arguments - ret = @queue_bindings[{routing_key, args}].delete destination - after_unbind(destination, routing_key, headers) - ret + return false unless @bindings[args].add? destination + binding_key = BindingKey.new(routing_key, args) + data = BindingDetails.new(name, vhost.name, binding_key, destination) + notify_observers(ExchangeEvent::Bind, data) + true end - def unbind(destination : Exchange, routing_key, headers) + def unbind(destination : Destination, routing_key, headers) args = headers ? @arguments.clone.merge!(headers) : @arguments - ret = @exchange_bindings[{routing_key, args}].delete destination - after_unbind(destination, routing_key, headers) - ret - end + bds = @bindings[args] + return false unless bds.delete(destination) + @bindings.delete(routing_key) if bds.empty? - def do_queue_matches(routing_key, headers = nil, & : Queue ->) - matches(@queue_bindings, routing_key, headers) do |destination| - yield destination.as(Queue) - end + binding_key = BindingKey.new(routing_key, args) + data = BindingDetails.new(name, vhost.name, binding_key, destination) + notify_observers(ExchangeEvent::Unbind, data) + + delete if @auto_delete && @bindings.each_value.all?(&.empty?) + true end - def do_exchange_matches(routing_key, headers = nil, & : Exchange ->) - matches(@exchange_bindings, routing_key, headers) { |e| yield e.as(Exchange) } + protected def bindings(routing_key, headers) : Iterator(Destination) + matches(headers).each end private def validate!(headers) : Nil @@ -63,27 +64,19 @@ module LavinMQ end end - # ameba:disable Metrics/CyclomaticComplexity - private def matches(bindings, routing_key, headers, & : Queue | Exchange ->) - bindings.each do |bt, dst| - args = bt[1] || next + private def matches(headers) : Iterator(Destination) + @bindings.each.select do |args, _| if headers.nil? || headers.empty? - if args.empty? - dst.each { |d| yield d } - end + args.empty? else case args["x-match"]? when "any" - if args.any? { |k, v| !k.starts_with?("x-") && (headers.has_key?(k) && headers[k] == v) } - dst.each { |d| yield d } - end + args.any? { |k, v| !k.starts_with?("x-") && (headers.has_key?(k) && headers[k] == v) } else - if args.all? { |k, v| k.starts_with?("x-") || (headers.has_key?(k) && headers[k] == v) } - dst.each { |d| yield d } - end + args.all? { |k, v| k.starts_with?("x-") || (headers.has_key?(k) && headers[k] == v) } end end - end + end.flat_map { |_, v| v.each } end end end diff --git a/src/lavinmq/exchange/topic.cr b/src/lavinmq/exchange/topic.cr index b342406ea..117a78214 100644 --- a/src/lavinmq/exchange/topic.cr +++ b/src/lavinmq/exchange/topic.cr @@ -2,75 +2,67 @@ require "./exchange" module LavinMQ class TopicExchange < Exchange - def initialize(*args) - super(*args) - @queue_binding_keys = Hash(Array(String), Set(Queue)).new do |h, k| - h[k] = Set(Queue).new - end - @exchange_binding_keys = Hash(Array(String), Set(Exchange)).new do |h, k| - h[k] = Set(Exchange).new - end + @bindings = Hash(Array(String), Set(Destination)).new do |h, k| + h[k] = Set(Destination).new end def type : String "topic" end - def bind(destination : Queue, routing_key, headers = nil) - ret = @queue_bindings[{routing_key, nil}].add? destination - @queue_binding_keys[routing_key.split(".")] << destination - after_bind(destination, routing_key, headers) - ret + def bindings_details : Iterator(BindingDetails) + @bindings.each.flat_map do |rk, ds| + ds.each.map do |d| + binding_key = BindingKey.new(rk.join(".")) + BindingDetails.new(name, vhost.name, binding_key, d) + end + end end - def bind(destination : Exchange, routing_key, headers = nil) - ret = @exchange_bindings[{routing_key, nil}].add? destination - @exchange_binding_keys[routing_key.split(".")] << destination - after_bind(destination, routing_key, headers) - ret + def bind(destination : Destination, routing_key, headers = nil) + return false unless @bindings[routing_key.split(".")].add? destination + binding_key = BindingKey.new(routing_key) + data = BindingDetails.new(name, vhost.name, binding_key, destination) + notify_observers(ExchangeEvent::Bind, data) + true end - def unbind(destination : Queue, routing_key, headers = nil) - ret = @queue_bindings[{routing_key, nil}].delete destination - @queue_binding_keys[routing_key.split(".")].delete destination - after_unbind(destination, routing_key, headers) - ret - end + def unbind(destination : Destination, routing_key, headers = nil) + rks = routing_key.split(".") + bds = @bindings[routing_key.split(".")] + return false unless bds.delete destination + @bindings.delete(rks) if bds.empty? - def unbind(destination : Exchange, routing_key, headers = nil) - ret = @exchange_bindings[{routing_key, nil}].delete destination - @exchange_binding_keys[routing_key.split(".")].delete destination - after_unbind(destination, routing_key, headers) - ret - end + binding_key = BindingKey.new(routing_key) + data = BindingDetails.new(name, vhost.name, binding_key, destination) + notify_observers(ExchangeEvent::Unbind, data) - def do_queue_matches(routing_key, headers = nil, & : Queue -> _) - matches(@queue_binding_keys, routing_key, headers) do |destination| - yield destination.as(Queue) - end + delete if @auto_delete && @bindings.each_value.all?(&.empty?) + true end - def do_exchange_matches(routing_key, headers = nil, & : Exchange -> _) - matches(@exchange_binding_keys, routing_key, headers) { |e| yield e.as(Exchange) } + protected def bindings(routing_key, headers) : Iterator(Destination) + select_matches(routing_key).each end # ameba:disable Metrics/CyclomaticComplexity - private def matches(binding_keys, routing_key, headers = nil, & : Queue | Exchange -> _) - return if binding_keys.empty? + private def select_matches(routing_key) : Iterator(Destination) + binding_keys = @bindings + + return Iterator(Destination).empty if binding_keys.empty? # optimize the case where the only binding key is '#' if binding_keys.size == 1 bk, qs = binding_keys.first if bk.size == 1 if bk.first == "#" - qs.each { |d| yield d } - return + return qs.each end end end rk_parts = routing_key.split(".") - binding_keys.each do |bks, dst| + binding_keys.each.select do |bks, _| ok = false prev_hash = false size = bks.size # binding keys can max be 256 chars long anyway @@ -127,8 +119,8 @@ module LavinMQ break unless ok i += 1 end - dst.each { |d| yield d } if ok - end + ok + end.flat_map { |_, v| v.each } end end end diff --git a/src/lavinmq/http/binding_helpers.cr b/src/lavinmq/http/binding_helpers.cr index c9d161e0b..7230e4b09 100644 --- a/src/lavinmq/http/binding_helpers.cr +++ b/src/lavinmq/http/binding_helpers.cr @@ -10,19 +10,9 @@ module LavinMQ end end - private def binding_for_props(context, source, destination : Queue, props) - binding = source.queue_bindings.find do |k, v| - v.includes?(destination) && BindingDetails.hash_key(k) == props - end - unless binding - not_found(context, "Binding '#{props}' on exchange '#{source.name}' -> queue '#{destination.name}' does not exist") - end - binding - end - - private def binding_for_props(context, source, destination : Exchange, props) - binding = source.exchange_bindings.find do |k, v| - v.includes?(destination) && BindingDetails.hash_key(k) == props + private def binding_for_props(context, source, destination : Destination, props) + binding = source.bindings_details.find do |bd| + bd.destination == destination && bd.binding_key.properties_key == props end unless binding not_found(context, "Binding '#{props}' on exchange '#{source.name}' -> exchange '#{destination.name}' does not exist") diff --git a/src/lavinmq/http/controller/bindings.cr b/src/lavinmq/http/controller/bindings.cr index c19bc5d10..b516e519d 100644 --- a/src/lavinmq/http/controller/bindings.cr +++ b/src/lavinmq/http/controller/bindings.cr @@ -32,10 +32,10 @@ module LavinMQ refuse_unless_management(context, user(context), vhost) e = exchange(context, params, vhost) q = queue(context, params, vhost, "queue") - itr = e.queue_bindings.each.select { |(_, v)| v.includes?(q) } - .map { |(k, _)| e.binding_details(k, q) } + itr = e.bindings_details.select { |db| db.destination == q } if e.name.empty? - default_binding = BindingDetails.new("", q.vhost.name, {q.name, nil}, q) + binding_key = BindingKey.new(q.name) + default_binding = BindingDetails.new("", q.vhost.name, binding_key, q) itr = {default_binding}.each.chain(itr) end page(context, itr) @@ -63,7 +63,7 @@ module LavinMQ bad_request(context, "Field 'routing_key' is required") end ok = e.vhost.bind_queue(q.name, e.name, routing_key, arguments) - props = BindingDetails.hash_key({routing_key, arguments}) + props = BindingKey.new(routing_key, arguments).properties_key context.response.headers["Location"] = q.name + "/" + props context.response.status_code = 201 Log.debug do @@ -79,8 +79,7 @@ module LavinMQ e = exchange(context, params, vhost) q = queue(context, params, vhost, "queue") props = URI.decode_www_form(params["props"]) - binding = binding_for_props(context, e, q, props) - e.binding_details(binding[0], q).to_json(context.response) + binding_for_props(context, e, q, props).to_json(context.response) end end @@ -99,12 +98,14 @@ module LavinMQ end props = URI.decode_www_form(params["props"]) found = false - e.queue_bindings.each do |k, destinations| - next unless destinations.includes?(q) && BindingDetails.hash_key(k) == props - arguments = k[1] || AMQP::Table.new - @amqp_server.vhosts[vhost].unbind_queue(q.name, e.name, k[0], arguments) + e.bindings_details.each do |binding| + next unless binding.destination == q && binding.binding_key.properties_key == props + arguments = binding.arguments || AMQP::Table.new + @amqp_server.vhosts[vhost].unbind_queue(q.name, e.name, + binding.routing_key, arguments) found = true - Log.debug { "exchange '#{e.name}' unbound from queue '#{q.name}' with key '#{k}'" } + Log.debug { "exchange '#{e.name}' unbound from queue '#{q.name}' with " \ + " key '#{binding.routing_key}'" } break end context.response.status_code = found ? 204 : 404 @@ -116,8 +117,8 @@ module LavinMQ refuse_unless_management(context, user(context), vhost) source = exchange(context, params, vhost) destination = exchange(context, params, vhost, "destination") - page(context, source.exchange_bindings.each.select { |(_, v)| v.includes?(destination) } - .map { |(k, _)| source.binding_details(k, destination) }) + bindings = source.bindings_details.select { |bd| bd.destination == destination } + page(context, bindings) end end @@ -144,7 +145,7 @@ module LavinMQ bad_request(context, "Field 'routing_key' is required") end source.vhost.bind_exchange(destination.name, source.name, routing_key, arguments) - props = BindingDetails.hash_key({routing_key, arguments}) + props = BindingKey.new(routing_key, arguments).properties_key context.response.headers["Location"] = context.request.path + "/" + props context.response.status_code = 201 end @@ -157,7 +158,7 @@ module LavinMQ destination = exchange(context, params, vhost, "destination") props = URI.decode_www_form(params["props"]) binding = binding_for_props(context, source, destination, props) - source.binding_details(binding[0], destination).to_json(context.response) + binding.to_json(context.response) end end @@ -178,10 +179,12 @@ module LavinMQ end props = URI.decode_www_form(params["props"]) found = false - source.exchange_bindings.each do |k, destinations| - next unless destinations.includes?(destination) && BindingDetails.hash_key(k) == props - arguments = k[1] || AMQP::Table.new - @amqp_server.vhosts[vhost].unbind_exchange(destination.name, source.name, k[0], arguments) + source.bindings_details.each do |binding| + next unless binding.destination == destination && + binding.binding_key.properties_key == props + arguments = binding.arguments || AMQP::Table.new + @amqp_server.vhosts[vhost].unbind_exchange(destination.name, + source.name, binding.routing_key, arguments) found = true break end diff --git a/src/lavinmq/http/controller/queues.cr b/src/lavinmq/http/controller/queues.cr index 05f7f3fac..078305636 100644 --- a/src/lavinmq/http/controller/queues.cr +++ b/src/lavinmq/http/controller/queues.cr @@ -134,9 +134,7 @@ module LavinMQ with_vhost(context, params) do |vhost| refuse_unless_management(context, user(context), vhost) queue = queue(context, params, vhost) - itr = queue.bindings.map { |exchange, args| BindingDetails.new(exchange.name, vhost, args, queue) } - default_binding = BindingDetails.new("", queue.vhost.name, {queue.name, nil}, queue) - page(context, {default_binding}.each.chain(itr)) + page(context, queue.bindings) end end diff --git a/src/lavinmq/vhost.cr b/src/lavinmq/vhost.cr index ddd1bee17..08e82ecfc 100644 --- a/src/lavinmq/vhost.cr +++ b/src/lavinmq/vhost.cr @@ -121,92 +121,13 @@ module LavinMQ def publish(msg : Message, immediate = false, visited = Set(Exchange).new, found_queues = Set(Queue).new) : Bool ex = @exchanges[msg.exchange_name]? || return false - ex.publish_in_count += 1 - headers = msg.properties.headers - find_all_queues(ex, msg.routing_key, headers, visited, found_queues) - headers.delete("BCC") if headers - if found_queues.empty? - ex.unroutable_count += 1 - return false - end - return false if immediate && !found_queues.any? &.immediate_delivery? - ok = false - found_queues.each do |q| - if q.publish(msg) - ex.publish_out_count += 1 - ok = true - msg.body_io.seek(-msg.bodysize.to_i64, IO::Seek::Current) # rewind - end - end - ok + published_queue_count = ex.publish(msg, immediate, found_queues, visited) + !published_queue_count.zero? ensure visited.clear found_queues.clear end - private def find_all_queues(ex : Exchange, routing_key : String, - headers : AMQP::Table?, - visited : Set(Exchange), - queues : Set(Queue)) : Nil - ex.queue_matches(routing_key, headers) do |q| - next if q.closed? - queues.add? q - end - - ex.exchange_matches(routing_key, headers) do |e2e| - visited.add(ex) - if visited.add?(e2e) - find_all_queues(e2e, routing_key, headers, visited, queues) - end - end - - find_cc_queues(ex, headers, visited, queues) - - if queues.empty? && ex.alternate_exchange - if ae = @exchanges[ex.alternate_exchange]? - visited.add(ex) - if visited.add?(ae) - find_all_queues(ae, routing_key, headers, visited, queues) - end - end - end - end - - private def find_cc_queues(ex, headers, visited, queues) - if cc = headers.try(&.fetch("CC", nil)) - if cc = cc.as?(Array(AMQP::Field)) - hdrs = headers.not_nil!.clone - hdrs.delete "CC" - cc.each do |cc_rk| - if cc_rk = cc_rk.as?(String) - find_all_queues(ex, cc_rk, hdrs, visited, queues) - else - raise Error::PreconditionFailed.new("CC header not a string array") - end - end - else - raise Error::PreconditionFailed.new("CC header not a string array") - end - end - - if bcc = headers.try(&.fetch("BCC", nil)) - if bcc = bcc.as?(Array(AMQP::Field)) - hdrs = headers.not_nil!.clone - hdrs.delete "CC" - hdrs.delete "BCC" - bcc.each do |bcc_rk| - if bcc_rk = bcc_rk.as?(String) - find_all_queues(ex, bcc_rk, hdrs, visited, queues) - else - raise Error::PreconditionFailed.new("BCC header not a string array") - end - end - else - raise Error::PreconditionFailed.new("BCC header not a string array") - end - end - end - def details_tuple { name: @name, @@ -308,8 +229,9 @@ module LavinMQ when AMQP::Frame::Exchange::Delete if x = @exchanges.delete f.exchange_name @exchanges.each_value do |ex| - ex.exchange_bindings.each do |binding_args, destinations| - ex.unbind(x, *binding_args) if destinations.includes?(x) + ex.bindings_details.each do |binding| + next unless binding.destination == x + ex.unbind(x, binding.routing_key, binding.arguments) end end x.delete @@ -336,8 +258,9 @@ module LavinMQ when AMQP::Frame::Queue::Delete if q = @queues.delete(f.queue_name) @exchanges.each_value do |ex| - ex.queue_bindings.each do |binding_args, destinations| - ex.unbind(q, *binding_args) if destinations.includes?(q) + ex.bindings_details.each do |binding| + next unless binding.destination == q + ex.unbind(q, binding.routing_key, binding.arguments) end end store_definition(f, dirty: true) if !loading && q.durable? && !q.exclusive? @@ -362,15 +285,12 @@ module LavinMQ end end - alias QueueBinding = Tuple(BindingKey, Exchange) - - def queue_bindings(queue : Queue) - iterators = @exchanges.each_value.map do |ex| - ex.queue_bindings.each.select do |(_binding_args, destinations)| - destinations.includes?(queue) - end.map { |(binding_args, _destinations)| {ex, binding_args} } + def queue_bindings(queue : Queue) : Iterator(BindingDetails) + default_binding = BindingDetails.new("", name, BindingKey.new(queue.name), queue) + bindings = @exchanges.each_value.flat_map do |ex| + ex.bindings_details.select { |binding| binding.destination == queue } end - Iterator(QueueBinding).chain(iterators) + {default_binding}.each.chain(bindings) end def add_operator_policy(name : String, pattern : String, apply_to : String, @@ -626,17 +546,17 @@ module LavinMQ io.write_bytes f end @exchanges.each_value.select(&.durable?).each do |e| - e.queue_bindings.each do |(routing_key, arguments), queues| - args = arguments || AMQP::Table.new - queues.each do |q| - f = AMQP::Frame::Queue::Bind.new(0_u16, 0_u16, q.name, e.name, routing_key, false, args) - io.write_bytes f - end - end - e.exchange_bindings.each do |(routing_key, arguments), exchanges| - args = arguments || AMQP::Table.new - exchanges.each do |ex| - f = AMQP::Frame::Exchange::Bind.new(0_u16, 0_u16, ex.name, e.name, routing_key, false, args) + e.bindings_details.each do |binding| + args = binding.arguments || AMQP::Table.new + frame = case binding.destination + when Queue + AMQP::Frame::Queue::Bind.new(0_u16, 0_u16, binding.destination.name, e.name, + binding.routing_key, false, args) + when Exchange + AMQP::Frame::Exchange::Bind.new(0_u16, 0_u16, binding.destination.name, e.name, + binding.routing_key, false, args) + end + if f = frame io.write_bytes f end end diff --git a/src/stdlib/iterator.cr b/src/stdlib/iterator.cr new file mode 100644 index 000000000..624cd46d9 --- /dev/null +++ b/src/stdlib/iterator.cr @@ -0,0 +1,13 @@ +module Iterator(T) + def self.empty + EmptyIterator(T).new + end + + private struct EmptyIterator(T) + include Iterator(T) + + def next + stop + end + end +end