Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Publish to exchange #786

Merged
merged 5 commits into from
Oct 14, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions spec/api/bindings_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ describe LavinMQ::HTTP::BindingsController do
response = http.post("/api/bindings/%2f/e/be1/q/bindings_q1", body: body)
response.status_code.should eq 201
response.headers["Location"].should eq "bindings_q1/rk"
s.vhosts["/"].exchanges["be1"].queue_bindings.last_key.first.should eq "rk"
s.vhosts["/"].exchanges["be1"].bindings_details.first.routing_key.should eq "rk"
end
end

Expand Down Expand Up @@ -127,7 +127,7 @@ describe LavinMQ::HTTP::BindingsController do
props = binding[0]["properties_key"].as_s
response = http.delete("/api/bindings/%2f/e/be1/q/bindings_q1/#{props}")
response.status_code.should eq 204
s.vhosts["/"].exchanges["be1"].queue_bindings.empty?.should be_true
s.vhosts["/"].exchanges["be1"].bindings_details.empty?.should be_true
end
end
end
Expand Down
38 changes: 26 additions & 12 deletions spec/api/definitions_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,20 @@ describe LavinMQ::HTTP::Server do
]})
response = http.post("/api/definitions", body: body)
response.status_code.should eq 200
matches = [] of String
ex = s.vhosts["/"].exchanges["import_x1"]
ex.do_exchange_matches("r.k2", nil) { |e| matches << e.name }
matches.includes?("import_x2").should be_true
matches.clear
ex.do_queue_matches("rk", nil) { |e| matches << e.name }
matches.includes?("import_q1").should be_true
qs = Set(LavinMQ::Queue).new
es = Set(LavinMQ::Exchange).new
ex.find_queues("r.k2", nil, qs, es)
res = Set(LavinMQ::Exchange).new
res << s.vhosts["/"].exchanges["import_x1"]
res << s.vhosts["/"].exchanges["import_x2"]
es.should eq res
qs = Set(LavinMQ::Queue).new
es = Set(LavinMQ::Exchange).new
ex.find_queues("rk", nil, qs, es)
res = Set(LavinMQ::Queue).new
res << s.vhosts["/"].queues["import_q1"]
qs.should eq res
end
end

Expand Down Expand Up @@ -470,13 +477,20 @@ describe LavinMQ::HTTP::Server do
]})
response = http.post("/api/definitions/%2f", body: body)
response.status_code.should eq 200
matches = [] of String
ex = s.vhosts["/"].exchanges["import_x1"]
ex.do_exchange_matches("r.k2", nil) { |e| matches << e.name }
matches.includes?("import_x2").should be_true
matches.clear
ex.do_queue_matches("rk", nil) { |e| matches << e.name }
matches.includes?("import_q1").should be_true
qs = Set(LavinMQ::Queue).new
es = Set(LavinMQ::Exchange).new
ex.find_queues("r.k2", nil, qs, es)
res = Set(LavinMQ::Exchange).new
res << s.vhosts["/"].exchanges["import_x1"]
res << s.vhosts["/"].exchanges["import_x2"]
es.should eq res
qs = Set(LavinMQ::Queue).new
es = Set(LavinMQ::Exchange).new
ex.find_queues("rk", nil, qs, es)
res = Set(LavinMQ::Queue).new
res << s.vhosts["/"].queues["import_q1"]
qs.should eq res
end
end

Expand Down
46 changes: 46 additions & 0 deletions spec/exchange_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -124,4 +124,50 @@ describe LavinMQ::Exchange do
end
end
end

describe "in_use?" do
it "should not be in use when just created" do
with_amqp_server do |s|
with_channel(s) do |ch|
ch.exchange("e1", "topic", auto_delete: true)
ch.exchange("e2", "topic", auto_delete: true)
s.vhosts["/"].exchanges["e1"].in_use?.should be_false
s.vhosts["/"].exchanges["e2"].in_use?.should be_false
end
end
end
it "should be in use when it has bindings" do
with_amqp_server do |s|
with_channel(s) do |ch|
x1 = ch.exchange("e1", "topic", auto_delete: true)
x2 = ch.exchange("e2", "topic", auto_delete: true)
x2.bind(x1.name, "#")
s.vhosts["/"].exchanges["e2"].in_use?.should be_true
end
end
end
it "should be in use when other exchange has binding to it" do
with_amqp_server do |s|
with_channel(s) do |ch|
x1 = ch.exchange("e1", "topic", auto_delete: true)
x2 = ch.exchange("e2", "topic", auto_delete: true)
x2.bind(x1.name, "#")
s.vhosts["/"].exchanges["e1"].in_use?.should be_true
end
end
end

it "should be in use when it has bindings" do
with_amqp_server do |s|
with_channel(s) do |ch|
x1 = ch.exchange("e1", "topic")
x2 = ch.exchange("e2", "topic", auto_delete: true)
x2.bind(x1.name, "#")
s.vhosts["/"].exchanges["e1"].in_use?.should be_true
x2.unbind(x1.name, "#")
s.vhosts["/"].exchanges["e1"].in_use?.should be_false
end
end
end
end
end
126 changes: 120 additions & 6 deletions spec/message_routing_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ module LavinMQ
# Monkey patch for backward compability and easier testing
def matches(routing_key, headers = nil) : Set(Queue | Exchange)
s = Set(Queue | Exchange).new
queue_matches(routing_key, headers) { |q| s << q }
exchange_matches(routing_key, headers) { |x| s << x }
qs = Set(Queue).new
es = Set(Exchange).new
find_queues(routing_key, headers, qs, es)
qs.each { |q| s << q }
s
end
end
Expand All @@ -19,15 +21,20 @@ describe LavinMQ::DirectExchange do
q1 = LavinMQ::Queue.new(vhost, "q1")
x = LavinMQ::DirectExchange.new(vhost, "")
x.bind(q1, "q1", LavinMQ::AMQP::Table.new)
x.matches("q1").should eq(Set{q1})
found_queues = Set(LavinMQ::Queue).new
x.find_queues("q1", nil, found_queues, Set(LavinMQ::Exchange).new)
found_queues.should eq(Set{q1})
end
end

it "matches no rk" do
with_amqp_server do |s|
vhost = s.vhosts.create("x")
x = LavinMQ::DirectExchange.new(vhost, "")
x.matches("q1").should be_empty

found_queues = Set(LavinMQ::Queue).new
x.find_queues("q1", nil, found_queues, Set(LavinMQ::Exchange).new)
found_queues.should be_empty
end
end
end
Expand All @@ -39,15 +46,20 @@ describe LavinMQ::FanoutExchange do
q1 = LavinMQ::Queue.new(vhost, "q1")
x = LavinMQ::FanoutExchange.new(vhost, "")
x.bind(q1, "")
x.matches("any").should eq(Set{q1})

found_queues = Set(LavinMQ::Queue).new
x.find_queues("q1", nil, found_queues, Set(LavinMQ::Exchange).new)
found_queues.should eq(Set{q1})
end
end

it "matches no rk" do
with_amqp_server do |s|
vhost = s.vhosts.create("x")
x = LavinMQ::FanoutExchange.new(vhost, "")
x.matches("q1").should be_empty
found_queues = Set(LavinMQ::Queue).new
x.find_queues("q1", nil, found_queues, Set(LavinMQ::Exchange).new)
found_queues.should be_empty
end
end
end
Expand Down Expand Up @@ -307,3 +319,105 @@ describe LavinMQ::HeadersExchange do
end
end
end

describe LavinMQ::Exchange do
it "should handle CC in header" do
with_amqp_server do |s|
vhost = s.vhosts.create("x")
q1 = LavinMQ::Queue.new(vhost, "q1")
q2 = LavinMQ::Queue.new(vhost, "q2")
x = LavinMQ::DirectExchange.new(vhost, "")
x.bind(q1, "q1", LavinMQ::AMQP::Table.new)
x.bind(q2, "q2", LavinMQ::AMQP::Table.new)
found_queues = Set(LavinMQ::Queue).new
headers = LavinMQ::AMQP::Table.new
headers["CC"] = ["q2"]
x.find_queues("q1", headers, found_queues)
found_queues.should eq(Set{q1, q2})
end
end
it "should raise if CC header isn't array" do
with_amqp_server do |s|
vhost = s.vhosts.create("x")
q1 = LavinMQ::Queue.new(vhost, "q1")
q2 = LavinMQ::Queue.new(vhost, "q2")
x = LavinMQ::DirectExchange.new(vhost, "")
x.bind(q1, "q1", LavinMQ::AMQP::Table.new)
x.bind(q2, "q2", LavinMQ::AMQP::Table.new)
found_queues = Set(LavinMQ::Queue).new
headers = LavinMQ::AMQP::Table.new
headers["CC"] = "q2"
expect_raises(LavinMQ::Error::PreconditionFailed) do
x.find_queues("q1", headers, found_queues)
end
end
end

it "should handle BCC in header" do
with_amqp_server do |s|
vhost = s.vhosts.create("x")
q1 = LavinMQ::Queue.new(vhost, "q1")
q2 = LavinMQ::Queue.new(vhost, "q2")
x = LavinMQ::DirectExchange.new(vhost, "")
x.bind(q1, "q1", LavinMQ::AMQP::Table.new)
x.bind(q2, "q2", LavinMQ::AMQP::Table.new)
found_queues = Set(LavinMQ::Queue).new
headers = LavinMQ::AMQP::Table.new
headers["BCC"] = ["q2"]
x.find_queues("q1", headers, found_queues)
found_queues.should eq(Set{q1, q2})
end
end

it "should raise if BCC header isn't array" do
with_amqp_server do |s|
vhost = s.vhosts.create("x")
q1 = LavinMQ::Queue.new(vhost, "q1")
q2 = LavinMQ::Queue.new(vhost, "q2")
x = LavinMQ::DirectExchange.new(vhost, "")
x.bind(q1, "q1", LavinMQ::AMQP::Table.new)
x.bind(q2, "q2", LavinMQ::AMQP::Table.new)
found_queues = Set(LavinMQ::Queue).new
headers = LavinMQ::AMQP::Table.new
headers["BCC"] = "q2"
expect_raises(LavinMQ::Error::PreconditionFailed) do
x.find_queues("q1", headers, found_queues)
end
end
end

it "should drop BCC from header" do
with_amqp_server do |s|
vhost = s.vhosts.create("x")
q1 = LavinMQ::Queue.new(vhost, "q1")
q2 = LavinMQ::Queue.new(vhost, "q2")
x = LavinMQ::DirectExchange.new(vhost, "")
x.bind(q1, "q1", LavinMQ::AMQP::Table.new)
x.bind(q2, "q2", LavinMQ::AMQP::Table.new)
found_queues = Set(LavinMQ::Queue).new
headers = LavinMQ::AMQP::Table.new
headers["BCC"] = ["q2"]
x.find_queues("q1", headers, found_queues)
headers["BCC"]?.should be_nil
end
end

it "should read both CC and BCC" do
with_amqp_server do |s|
vhost = s.vhosts.create("x")
q1 = LavinMQ::Queue.new(vhost, "q1")
q2 = LavinMQ::Queue.new(vhost, "q2")
q3 = LavinMQ::Queue.new(vhost, "q3")
x = LavinMQ::DirectExchange.new(vhost, "")
x.bind(q1, "q1", LavinMQ::AMQP::Table.new)
x.bind(q2, "q2", LavinMQ::AMQP::Table.new)
x.bind(q3, "q3", LavinMQ::AMQP::Table.new)
found_queues = Set(LavinMQ::Queue).new
headers = LavinMQ::AMQP::Table.new
headers["CC"] = ["q2"]
headers["BCC"] = ["q3"]
x.find_queues("q1", headers, found_queues)
found_queues.should eq(Set{q1, q2, q3})
end
end
end
13 changes: 6 additions & 7 deletions spec/queue_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ describe LavinMQ::Queue do
x.publish_confirm "test message", q.name
q.get(no_ack: true).try(&.body_io.to_s).should eq("test message")

iq = s.vhosts["/"].exchanges[x_name].queue_bindings[{q.name, nil}].first
iq = s.vhosts["/"].queues[q.name]
iq.pause!

x.publish_confirm "test message 2", q.name
Expand Down Expand Up @@ -93,7 +93,7 @@ describe LavinMQ::Queue do
x.publish_confirm "test message", q.name
q.get(no_ack: true).try(&.body_io.to_s).should eq("test message")

iq = s.vhosts["/"].exchanges[x_name].queue_bindings[{q.name, nil}].first
iq = s.vhosts["/"].queues[q.name]
iq.pause!

x.publish_confirm "test message 2", q.name
Expand Down Expand Up @@ -126,7 +126,7 @@ describe LavinMQ::Queue do
x.publish_confirm "test message", q.name
q.get(no_ack: true).try(&.body_io.to_s).should eq("test message")

iq = s.vhosts["/"].exchanges[x_name].queue_bindings[{q.name, nil}].first
iq = s.vhosts["/"].queues[q.name]
iq.pause!

x.publish_confirm "test message 2", q.name
Expand Down Expand Up @@ -187,7 +187,7 @@ describe LavinMQ::Queue do
x.publish_confirm "test message 3", q.name
x.publish_confirm "test message 4", q.name

internal_queue = s.vhosts["/"].exchanges[x_name].queue_bindings[{q.name, nil}].first
internal_queue = s.vhosts["/"].queues[q.name]
internal_queue.message_count.should eq 4

response = http.delete("/api/queues/%2f/#{q_name}/contents")
Expand All @@ -209,7 +209,7 @@ describe LavinMQ::Queue do
end

vhost = s.vhosts["/"]
internal_queue = vhost.exchanges[x_name].queue_bindings[{q.name, nil}].first
internal_queue = vhost.queues[q.name]
internal_queue.message_count.should eq 10

response = http.delete("/api/queues/%2f/#{q_name}/contents?count=5")
Expand All @@ -235,8 +235,7 @@ describe LavinMQ::Queue do
x.publish_confirm "test message #{i}", q.name
end

internal_queue = s.vhosts["/"].exchanges[x_name].queue_bindings[{q.name, nil}].first

internal_queue = s.vhosts["/"].queues[q.name]
internal_queue.message_count.should eq 10

channel = Channel(String).new(1)
Expand Down
6 changes: 3 additions & 3 deletions spec/upstream_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -432,18 +432,18 @@ describe LavinMQ::Federation::Upstream do
wait_for { upstream.links.first?.try &.state.running? }

upstream_q = upstream_vhost.queues.values.first
upstream_q.bindings.size.should eq queues.size
upstream_q.bindings.size.should eq queues.size + 1 # +1 for the default exchange
# Assert setup is correct
10.times do |i|
downstream_q = downstream_ch.queue("")
downstream_q.bind("downstream_ex", "after.link.#{i}")
queues << downstream_q
end
sleep 0.1.seconds
upstream_q.bindings.size.should eq queues.size
upstream_q.bindings.size.should eq queues.size + 1
queues.each &.delete
sleep 10.milliseconds
upstream_q.bindings.size.should eq 0
upstream_q.bindings.size.should eq 1
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion spec/vhost_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ describe LavinMQ::VHost do
v.declare_queue("q", true, false)
s.vhosts["test"].bind_queue("q", "e", "q")
s.restart
s.vhosts["test"].exchanges["e"].queue_bindings[{"q", nil}].size.should eq 1
s.vhosts["test"].exchanges["e"].bindings_details.first.destination.name.should eq "q"
end
end

Expand Down
Loading
Loading