Skip to content

Commit

Permalink
Tune websocket performance (#1177)
Browse files Browse the repository at this point in the history
  • Loading branch information
lassepe authored May 10, 2024
1 parent 74913cb commit 0a0acc8
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 7 deletions.
15 changes: 9 additions & 6 deletions src/Connections.jl
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ mutable struct Connection{IO_t <: IO} <: IO
state::Any # populated & used by Servers code
end

has_tcpsocket(c::Connection) = applicable(tcpsocket, c.io)
IOExtras.tcpsocket(c::Connection) = tcpsocket(c.io)

"""
connectionkey
Expand Down Expand Up @@ -142,7 +145,7 @@ function Base.flush(c::Connection)
# Flushing the TCP buffer requires support for `Sockets.nagle()`
# which was only added in Julia v1.3
@static if VERSION >= v"1.3"
sock = tcpsocket(c.io)
sock = tcpsocket(c)
# I don't understand why uninitializd sockets can get here, but they can
if sock.status (Base.StatusInit, Base.StatusUninit) && isopen(sock)
Sockets.nagle(sock, false)
Expand Down Expand Up @@ -312,7 +315,7 @@ function IOExtras.closeread(c::Connection)
return
end

Base.wait_close(c::Connection) = Base.wait_close(tcpsocket(c.io))
Base.wait_close(c::Connection) = Base.wait_close(tcpsocket(c))

function Base.close(c::Connection)
if iswritable(c)
Expand Down Expand Up @@ -638,7 +641,7 @@ function sslupgrade(::Type{IOType}, c::Connection{T},
end

function Base.show(io::IO, c::Connection)
nwaiting = applicable(tcpsocket, c.io) ? bytesavailable(tcpsocket(c.io)) : 0
nwaiting = has_tcpsocket(c) ? bytesavailable(tcpsocket(c)) : 0
print(
io,
tcpstatus(c), " ",
Expand All @@ -648,14 +651,14 @@ function Base.show(io::IO, c::Connection)
bytesavailable(c.buffer) > 0 ?
" $(bytesavailable(c.buffer))-byte excess" : "",
nwaiting > 0 ? " $nwaiting bytes waiting" : "",
applicable(tcpsocket, c.io) ? " $(Base._fd(tcpsocket(c.io)))" : "")
has_tcpsocket(c) ? " $(Base._fd(tcpsocket(c)))" : "")
end

function tcpstatus(c::Connection)
if !applicable(tcpsocket, c.io)
if !has_tcpsocket(c)
return ""
end
s = Base.uv_status_string(tcpsocket(c.io))
s = Base.uv_status_string(tcpsocket(c))
if s == "connecting" return "🔜🔗"
elseif s == "open" return "🔗 "
elseif s == "active" return "🔁 "
Expand Down
15 changes: 14 additions & 1 deletion src/WebSockets.jl
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,8 @@ end

const DEFAULT_MAX_FRAG = 1024

IOExtras.tcpsocket(ws::WebSocket) = tcpsocket(ws.io)

WebSocket(io::Connection, req=Request(), resp=Response(); client::Bool=true, maxframesize::Integer=typemax(Int), maxfragmentation::Integer=DEFAULT_MAX_FRAG) =
WebSocket(uuid4(), io, req, resp, maxframesize, maxfragmentation, client, UInt8[], UInt8[], false, false)

Expand Down Expand Up @@ -420,7 +422,7 @@ function listen end
listen(f, args...; kw...) = Servers.listen(http -> upgrade(f, http; kw...), args...; kw...)
listen!(f, args...; kw...) = Servers.listen!(http -> upgrade(f, http; kw...), args...; kw...)

function upgrade(f::Function, http::Streams.Stream; suppress_close_error::Bool=false, maxframesize::Integer=typemax(Int), maxfragmentation::Integer=DEFAULT_MAX_FRAG, kw...)
function upgrade(f::Function, http::Streams.Stream; suppress_close_error::Bool=false, maxframesize::Integer=typemax(Int), maxfragmentation::Integer=DEFAULT_MAX_FRAG, nagle=false, quickack=true, kw...)
@debugv 2 "Server websocket upgrade requested"
isupgrade(http.message) || handshakeerror()
if !hasheader(http, "Sec-WebSocket-Version", "13")
Expand All @@ -437,6 +439,17 @@ function upgrade(f::Function, http::Streams.Stream; suppress_close_error::Bool=f
startwrite(http)
io = http.stream
req = http.message

# tune websocket tcp connection for performance : https://github.com/JuliaWeb/HTTP.jl/issues/1140
@static if VERSION >= v"1.3"
sock = tcpsocket(io)
# I don't understand why uninitializd sockets can get here, but they can
if sock.status (Base.StatusInit, Base.StatusUninit) && isopen(sock)
Sockets.nagle(sock, nagle)
Sockets.quickack(sock, quickack)
end
end

ws = WebSocket(io, req, req.response; client=false, maxframesize, maxfragmentation)
@debugv 2 "$(ws.id): WebSocket upgraded; connection established"
try
Expand Down

0 comments on commit 0a0acc8

Please sign in to comment.