From 0a0acc843b60d20386b7039010aa6cf96de07d7c Mon Sep 17 00:00:00 2001 From: Lasse Peters Date: Fri, 10 May 2024 18:07:10 +0200 Subject: [PATCH] Tune websocket performance (#1177) --- src/Connections.jl | 15 +++++++++------ src/WebSockets.jl | 15 ++++++++++++++- 2 files changed, 23 insertions(+), 7 deletions(-) diff --git a/src/Connections.jl b/src/Connections.jl index 9537059b..332ac04e 100644 --- a/src/Connections.jl +++ b/src/Connections.jl @@ -96,6 +96,9 @@ mutable struct Connection{IO_t <: IO} <: IO state::Any # populated & used by Servers code end +has_tcpsocket(c::Connection) = applicable(tcpsocket, c.io) +IOExtras.tcpsocket(c::Connection) = tcpsocket(c.io) + """ connectionkey @@ -142,7 +145,7 @@ function Base.flush(c::Connection) # Flushing the TCP buffer requires support for `Sockets.nagle()` # which was only added in Julia v1.3 @static if VERSION >= v"1.3" - sock = tcpsocket(c.io) + sock = tcpsocket(c) # I don't understand why uninitializd sockets can get here, but they can if sock.status ∉ (Base.StatusInit, Base.StatusUninit) && isopen(sock) Sockets.nagle(sock, false) @@ -312,7 +315,7 @@ function IOExtras.closeread(c::Connection) return end -Base.wait_close(c::Connection) = Base.wait_close(tcpsocket(c.io)) +Base.wait_close(c::Connection) = Base.wait_close(tcpsocket(c)) function Base.close(c::Connection) if iswritable(c) @@ -638,7 +641,7 @@ function sslupgrade(::Type{IOType}, c::Connection{T}, end function Base.show(io::IO, c::Connection) - nwaiting = applicable(tcpsocket, c.io) ? bytesavailable(tcpsocket(c.io)) : 0 + nwaiting = has_tcpsocket(c) ? bytesavailable(tcpsocket(c)) : 0 print( io, tcpstatus(c), " ", @@ -648,14 +651,14 @@ function Base.show(io::IO, c::Connection) bytesavailable(c.buffer) > 0 ? " $(bytesavailable(c.buffer))-byte excess" : "", nwaiting > 0 ? " $nwaiting bytes waiting" : "", - applicable(tcpsocket, c.io) ? " $(Base._fd(tcpsocket(c.io)))" : "") + has_tcpsocket(c) ? " $(Base._fd(tcpsocket(c)))" : "") end function tcpstatus(c::Connection) - if !applicable(tcpsocket, c.io) + if !has_tcpsocket(c) return "" end - s = Base.uv_status_string(tcpsocket(c.io)) + s = Base.uv_status_string(tcpsocket(c)) if s == "connecting" return "🔜🔗" elseif s == "open" return "🔗 " elseif s == "active" return "🔁 " diff --git a/src/WebSockets.jl b/src/WebSockets.jl index 83a4bcc8..fd39bd72 100644 --- a/src/WebSockets.jl +++ b/src/WebSockets.jl @@ -298,6 +298,8 @@ end const DEFAULT_MAX_FRAG = 1024 +IOExtras.tcpsocket(ws::WebSocket) = tcpsocket(ws.io) + WebSocket(io::Connection, req=Request(), resp=Response(); client::Bool=true, maxframesize::Integer=typemax(Int), maxfragmentation::Integer=DEFAULT_MAX_FRAG) = WebSocket(uuid4(), io, req, resp, maxframesize, maxfragmentation, client, UInt8[], UInt8[], false, false) @@ -420,7 +422,7 @@ function listen end listen(f, args...; kw...) = Servers.listen(http -> upgrade(f, http; kw...), args...; kw...) listen!(f, args...; kw...) = Servers.listen!(http -> upgrade(f, http; kw...), args...; kw...) -function upgrade(f::Function, http::Streams.Stream; suppress_close_error::Bool=false, maxframesize::Integer=typemax(Int), maxfragmentation::Integer=DEFAULT_MAX_FRAG, kw...) +function upgrade(f::Function, http::Streams.Stream; suppress_close_error::Bool=false, maxframesize::Integer=typemax(Int), maxfragmentation::Integer=DEFAULT_MAX_FRAG, nagle=false, quickack=true, kw...) @debugv 2 "Server websocket upgrade requested" isupgrade(http.message) || handshakeerror() if !hasheader(http, "Sec-WebSocket-Version", "13") @@ -437,6 +439,17 @@ function upgrade(f::Function, http::Streams.Stream; suppress_close_error::Bool=f startwrite(http) io = http.stream req = http.message + + # tune websocket tcp connection for performance : https://github.com/JuliaWeb/HTTP.jl/issues/1140 + @static if VERSION >= v"1.3" + sock = tcpsocket(io) + # I don't understand why uninitializd sockets can get here, but they can + if sock.status ∉ (Base.StatusInit, Base.StatusUninit) && isopen(sock) + Sockets.nagle(sock, nagle) + Sockets.quickack(sock, quickack) + end + end + ws = WebSocket(io, req, req.response; client=false, maxframesize, maxfragmentation) @debugv 2 "$(ws.id): WebSocket upgraded; connection established" try