Skip to content

Commit

Permalink
Heartbeat support
Browse files Browse the repository at this point in the history
Server should send heartbeat frames to client depending on the
negotiated heartbeat interval.

Probably fixes #140
  • Loading branch information
carlhoerberg committed Mar 9, 2024
1 parent 86c86d1 commit 39772c8
Showing 1 changed file with 13 additions and 1 deletion.
14 changes: 13 additions & 1 deletion src/amqproxy/client.cr
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ module AMQProxy
@frame_max : UInt32
@channel_max : UInt16
@heartbeat : UInt16
@last_heartbeat = Time.monotonic

def initialize(@socket : TCPSocket)
set_socket_options(@socket)
Expand All @@ -27,9 +28,11 @@ module AMQProxy
def read_loop(channel_pool, socket = @socket) # ameba:disable Metrics/CyclomaticComplexity
Log.context.set(remote_address: socket.remote_address.to_s)
Log.debug { "Connected" }
socket.read_timeout = (@heartbeat / 2).ceil.seconds if @heartbeat > 0
loop do
case frame = AMQ::Protocol::Frame.from_io(socket, IO::ByteFormat::NetworkEndian)
when AMQ::Protocol::Frame::Heartbeat then write frame
when AMQ::Protocol::Frame::Heartbeat
@last_heartbeat = Time.monotonic
when AMQ::Protocol::Frame::Connection::CloseOk then return
when AMQ::Protocol::Frame::Connection::Close
close_all_upstream_channels
Expand Down Expand Up @@ -61,6 +64,15 @@ module AMQProxy
close_connection(504_u16, "CHANNEL_ERROR - Channel #{frame.channel} not open", frame)
end
end
rescue IO::TimeoutError
time_since_last_heartbeat = (Time.monotonic - @last_heartbeat).total_seconds.to_i # ignore subsecond latency
if time_since_last_heartbeat <= 1 + @heartbeat # add 1s grace because of rounding
Log.debug { "Sending heartbeat (last heartbeat #{time_since_last_heartbeat}s ago)" }
write AMQ::Protocol::Frame::Heartbeat.new
else
Log.warn { "No heartbeat response in #{time_since_last_heartbeat}s (max #{1 + @heartbeat}s), closing connection" }
return
end
end
rescue ex : IO::EOFError
Log.debug { "Disconnected" }
Expand Down

0 comments on commit 39772c8

Please sign in to comment.