Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(server): spawn task sooner in listenloop + fast accept #1103

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions docs/examples/cors_server.jl
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ const CORS_RES_HEADERS = ["Access-Control-Allow-Origin" => "*"]
#=
JSONMiddleware minimizes code by automatically converting the request body
to JSON to pass to the other service functions automatically. JSONMiddleware
recieves the body of the response from the other service funtions and sends
receives the body of the response from the other service funtions and sends
back a success response code
=#
function JSONMiddleware(handler)
Expand All @@ -65,9 +65,9 @@ function JSONMiddleware(handler)
end

#= CorsMiddleware: handles preflight request with the OPTIONS flag
If a request was recieved with the correct headers, then a response will be
If a request was received with the correct headers, then a response will be
sent back with a 200 code, if the correct headers were not specified in the request,
then a CORS error will be recieved on the client side
then a CORS error will be received on the client side

Since each request passes throught the CORS Handler, then if the request is
not a preflight request, it will simply go to the JSONMiddleware to be passed to the
Expand Down
1 change: 1 addition & 0 deletions src/HTTP.jl
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ include("clientlayers/ConnectionRequest.jl"); using .ConnectionRequest
include("clientlayers/StreamRequest.jl"); using .StreamRequest

include("download.jl")
include("accept.jl")
include("Servers.jl") ;using .Servers; using .Servers: listen
include("Handlers.jl") ;using .Handlers; using .Handlers: serve
include("parsemultipart.jl") ;using .MultiPartParsing: parse_multipart_form
Expand Down
73 changes: 51 additions & 22 deletions src/Servers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ export listen, listen!, Server, forceclose, port

using Sockets, Logging, LoggingExtras, MbedTLS, Dates
using MbedTLS: SSLContext, SSLConfig
using ConcurrentUtilities: ConcurrentUtilities, Lockable, lock
using ..IOExtras, ..Streams, ..Messages, ..Parsers, ..Connections, ..Exceptions
import ..access_threaded, ..SOCKET_TYPE_TLS, ..@logfmt_str
using ..Accept: acceptmany

TRUE(x) = true
getinet(host::String, port::Integer) = Sockets.InetAddr(parse(IPAddr, host), port)
Expand Down Expand Up @@ -83,10 +85,19 @@ accept(s::Listener{SSLConfig}) = getsslcontext(Sockets.accept(s.server), s.ssl)

function getsslcontext(tcp, sslconfig)
try
handshake_done = Ref{Bool}(false)
ssl = MbedTLS.SSLContext()
MbedTLS.setup!(ssl, sslconfig)
MbedTLS.associate!(ssl, tcp)
MbedTLS.handshake!(ssl)
handshake_task = @async begin
MbedTLS.handshake!(ssl)
handshake_done[] = true
end
timedwait(5.0) do
handshake_done[] || istaskdone(handshake_task)
end
!istaskdone(handshake_task) && wait(handshake_task)
handshake_done[] || throw(Base.IOError("SSL handshake timed out", Base.ETIMEDOUT))
return ssl
catch e
@try Base.IOError close(tcp)
Expand Down Expand Up @@ -363,30 +374,48 @@ Accepts new tcp connections and spawns async tasks to handle them."
function listenloop(f, listener, conns, tcpisvalid,
max_connections, readtimeout, access_log, ready_to_accept, verbose)
sem = Base.Semaphore(max_connections)
ssl = Lockable(listener.ssl)
connections = Lockable(conns)
verbose >= 0 && @infov 1 "Listening on: $(listener.hostname):$(listener.hostport), thread id: $(Threads.threadid())"
notify(ready_to_accept)
while isopen(listener)
try
Base.acquire(sem)
io = accept(listener)
if io === nothing
@warnv 1 "unable to accept new connection"
continue
elseif !tcpisvalid(io)
@warnv 1 "!tcpisvalid: $io"
close(io)
continue
end
conn = Connection(io)
conn.state = IDLE
push!(conns, conn)
conn.host, conn.port = listener.hostname, listener.hostport
@async try
handle_connection(f, conn, listener, readtimeout, access_log)
finally
# handle_connection is in charge of closing the underlying io
delete!(conns, conn)
Base.release(sem)
for io in acceptmany(listener.server)
# I would prefer this inside the async, so we can loop and accept again,
# but https://github.com/JuliaWeb/HTTP.jl/pull/647/files says it's bad for performance
max_connections < typemax(Int) && Base.acquire(sem)
@async begin
local conn = nothing
isssl = !isnothing(listener.ssl)
try
if io === nothing
@warnv 1 "unable to accept new connection"
return
end
if isssl
io = lock(ssl) do ssl
return getsslcontext(io, ssl)
end
end
if !tcpisvalid(io)
close(io)
return
end
conn = Connection(io)
conn.state = IDLE
lock(connections) do conns
push!(conns, conn)
end
conn.host, conn.port = listener.hostname, listener.hostport
handle_connection(f, conn, listener, readtimeout, access_log)
finally
# handle_connection is in charge of closing the underlying io, but it may not get there
!isnothing(conn) && lock(connections) do conns
delete!(conns, conn)
end
max_connections < typemax(Int) && Base.release(sem)
end
end # @async
end
catch e
if e isa Base.IOError && e.code == Base.UV_ECONNABORTED
Expand Down Expand Up @@ -442,7 +471,7 @@ function handle_connection(f, c::Connection, listener, readtimeout, access_log)
request.response.status = 200

try
# invokelatest becuase the perf is negligible, but this makes live-editing handlers more Revise friendly
# invokelatest because the perf is negligible, but this makes live-editing handlers more Revise friendly
@debugv 1 "invoking handler"
Base.invokelatest(f, http)
# If `startwrite()` was never called, throw an error so we send a 500 and log this
Expand Down
2 changes: 1 addition & 1 deletion src/WebSockets.jl
Original file line number Diff line number Diff line change
Expand Up @@ -587,7 +587,7 @@ function Base.close(ws::WebSocket, body::CloseFrameBody=CloseFrameBody(1000, "")
ws.readclosed = true
end
end
# we either recieved the responding CLOSE frame and readclosed was set
# we either received the responding CLOSE frame and readclosed was set
# or there was an error/timeout reading it; in any case, readclosed should be closed now
@assert ws.readclosed
# if we're the server, it's our job to close the underlying socket
Expand Down
47 changes: 47 additions & 0 deletions src/accept.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@

module Accept

export acceptmany

using Base: iolock_begin, iolock_end, uv_error, preserve_handle, unpreserve_handle,
StatusClosing, StatusClosed, StatusActive, UV_EAGAIN, UV_ECONNABORTED
using Sockets

function acceptmany(server; MAXSIZE=Sockets.BACKLOG_DEFAULT)
result = Vector{TCPSocket}()
sizehint!(result, MAXSIZE)
iolock_begin()
if server.status != StatusActive && server.status != StatusClosing && server.status != StatusClosed
throw(ArgumentError("server not connected, make sure \"listen\" has been called"))
end
while isopen(server)
client = TCPSocket()
err = Sockets.accept_nonblock(server, client)
while err == 0 && length(result) < MAXSIZE # Don't try to fill more than half the buffer
push!(result, client)
client = TCPSocket()
err = Sockets.accept_nonblock(server, client)
end
if length(result) > 0
iolock_end()
return result
end
if err != UV_EAGAIN
uv_error("accept", err)
end
preserve_handle(server)
lock(server.cond)
iolock_end()
try
wait(server.cond)
finally
unlock(server.cond)
unpreserve_handle(server)
end
iolock_begin()
end
uv_error("accept", UV_ECONNABORTED)
nothing
end

end
Loading