Skip to content

Commit

Permalink
improved chaos
Browse files Browse the repository at this point in the history
  • Loading branch information
carlhoerberg committed Dec 21, 2022
1 parent 34b567a commit d3c2e7a
Showing 1 changed file with 35 additions and 12 deletions.
47 changes: 35 additions & 12 deletions src/lavinmqperf.cr
Original file line number Diff line number Diff line change
Expand Up @@ -528,17 +528,21 @@ class Chaos < Perf
# random_ack_rejects
end

@i = Atomic(Int64).new(0u64)

private def churn_connections_and_pubs
done = Channel(Nil).new
loop do
100.times do
spawn do
body = Bytes.new(8)
AMQP::Client.start(@uri) do |conn|
ch = conn.channel
ch.confirm_select if rand(3).zero?
rand(10_000).times do
IO::ByteFormat::SystemEndian.encode(Time.monotonic.total_nanoseconds.to_i64, body)
ch.basic_publish(body, exchange, routing_key)
sleep rand(0.1)
body = uninitialized UInt8[8]
IO::ByteFormat::SystemEndian.encode(@i.add(1), body.to_slice)
ch.basic_publish(body.to_slice, rand_exchange, rand_routing_key)
end
done.send nil
end
Expand All @@ -550,15 +554,15 @@ class Chaos < Perf
end
end

private def routing_key : String
private def rand_routing_key : String
Random::DEFAULT.hex(1)
end

private def queue : String
private def rand_queue : String
Random::DEFAULT.hex(1)
end

private def exchange : String
private def rand_exchange : String
case rand(5)
when 1 then "amq.direct"
when 2 then "amq.topic"
Expand All @@ -575,19 +579,38 @@ class Chaos < Perf
spawn do
AMQP::Client.start(@uri) do |conn|
ch = conn.channel
prefetch = rand(100)
q = begin
ch.queue(rand_queue, args: AMQP::Client::Arguments.new({"x-expires": rand(1000),
"x-message-ttl": rand(1000),
"x-max-length": rand(1000),
"x-dead-letter-exchange": rand_exchange,
"x-max-priority": rand(255)}))
rescue
ch = conn.channel
ch.queue(rand_queue, passive: true)
end
x = rand_exchange
binding_key = rand_routing_key
q.bind(x, binding_key) unless x.empty?
prefetch = rand(100) + 1
ch.prefetch(prefetch)
q = ch.queue(queue)
q.bind(exchange, routing_key)
q.subscribe(no_ack: false, work_pool: prefetch) do |m|
sleep rand(1.0)
last = 0i64
q.subscribe(no_ack: false, work_pool: rand(prefetch) + 1) do |m|
ns = IO::ByteFormat::SystemEndian.decode(Int64, m.body_io)
# puts "messages out of order #{ns} < #{last}" if ns < last
last = ns
sleep rand(0.1)
case rand(3)
when 0 then m.ack
when 1 then m.reject(requeue: true)
when 2 then m.reject(requeue: false)
else m.ack
end
end
sleep rand(30.0)
case rand(3)
when 0 then q.delete
when 1 then q.unbind(x, binding_key) unless x.empty?
end
end
ensure
done.send nil
Expand Down

0 comments on commit d3c2e7a

Please sign in to comment.