diff --git a/src/Connections.jl b/src/Connections.jl index 414e74393..ea912e026 100644 --- a/src/Connections.jl +++ b/src/Connections.jl @@ -74,7 +74,7 @@ Fields: mutable struct Connection{IO_t <: IO} <: IO host::String port::String - proxy::Union{Nothing,String} + proxy::Union{Nothing, String} idle_timeout::Int require_ssl_verification::Bool keepalive::Bool @@ -102,9 +102,9 @@ same parameters, we can re-use it (as long as it's not already being used, obvio """ connectionkey(x::Connection) = (x.host, x.port, x.proxy, x.require_ssl_verification, x.keepalive, x.clientconnection) -const ConnectionKeyType = Tuple{AbstractString, AbstractString, Union{Nothing,AbstractString}, Bool, Bool, Bool} +const ConnectionKeyType = Tuple{AbstractString, AbstractString, Union{Nothing, String}, Bool, Bool, Bool} -Connection(host::AbstractString, port::AbstractString, proxy::Union{Nothing,AbstractString}, +Connection(host::AbstractString, port::AbstractString, proxy::Union{Nothing, String}, idle_timeout::Int, require_ssl_verification::Bool, keepalive::Bool, io::T, client=true) where {T}= Connection{T}(host, port, proxy, idle_timeout, @@ -437,7 +437,7 @@ or create a new `Connection` if required. """ function newconnection(wrapconnection::Function, url::URI; - proxy::Union{Nothing, AbstractString}=nothing, + proxy::Union{Nothing, String}=nothing, socket_type::Type, socket_type_tls::Type, pool::Union{Nothing, Pool}=nothing, diff --git a/test/client.jl b/test/client.jl index 5c7a2490d..398b14c04 100644 --- a/test/client.jl +++ b/test/client.jl @@ -562,24 +562,24 @@ end @testset "HTTP CONNECT Proxy pool" begin function forwardclosetask(src, dst) - errormonitor(@async begin + @async begin forwardstream(src, dst) close(src) close(dst) - end) + end end # Stores the http message passed by the client - messages = [] - upstreams = Set() + downstreamcount = 0 + upstreamsockets = Base.IdSet{TCPSocket}() # Simple implementation of a https proxy server proxy = HTTP.listen!(IPv4(0), 8082; stream = true) do http::HTTP.Stream - push!(messages, http.message) + downstreamcount += 1 hostport = split(http.message.target, ":") targetstream = connect(hostport[1], parse(Int, get(hostport, 2, "443"))) - push!(upstreams, targetstream) + push!(upstreamsockets, targetstream) try HTTP.setstatus(http, 200) HTTP.startwrite(http) @@ -589,25 +589,23 @@ end wait(up) wait(down) finally - delete!(upstreams, targetstream) + delete!(upstreamsockets, targetstream) end end try # Make the HTTP request r1 = HTTP.get("https://$httpbin/ip"; proxy="http://localhost:8082", retry=false, status_exception=true) - @test length(messages) == 1 - @test first(messages).method == "CONNECT" - @test length(upstreams) == 1 && isopen(first(upstreams)) # still alive + @test downstreamcount == 1 + @test length(upstreamsockets) == 1 && isopen(first(upstreamsockets)) # still alive # Make another request # This should reuse the connection pool and not make another request to the proxy - empty!(messages) r2 = HTTP.get("https://$httpbin/ip"; proxy="http://localhost:8082", retry=false, status_exception=true) - @test isempty(messages) # no new message to the proxy - @test length(upstreams) == 1 && isopen(first(upstreams)) # still only one stream alive + @test downstreamcount == 1 # no new message to the proxy + @test length(upstreamsockets) == 1 && isopen(first(upstreamsockets)) # still only one stream alive finally - close.(upstreams) + close.(upstreamsockets) close(proxy) HTTP.Connections.closeall() wait(proxy) @@ -616,20 +614,23 @@ end @testset "HTTP Proxy pool" begin # Stores the http request passed by the client - downstreamconnections = Set{HTTP.Connections.Connection}() - upstreamconnections = Set{HTTP.Connections.Connection}() - finished_request = Base.Event(true) + downstreamconnections = Base.IdSet{HTTP.Connections.Connection}() + upstreamconnections = Base.IdSet{HTTP.Connections.Connection}() + downstreamcount = 0 + upstreamcount = 0 # Simple implementation of a http proxy server proxy = HTTP.listen!(IPv4(0), 8082; stream = true) do http::HTTP.Stream push!(downstreamconnections, http.stream) + downstreamcount += 1 HTTP.open(http.message.method, http.message.target, http.message.headers; decompress = false, version = http.message.version, retry=false, redirect = false) do targetstream push!(upstreamconnections, targetstream.stream) + upstreamcount += 1 - up = errormonitor(@async forwardstream(http, targetstream)) + up = @async forwardstream(http, targetstream) targetresponse = startread(targetstream) HTTP.setstatus(http, targetresponse.status) @@ -638,30 +639,30 @@ end end HTTP.startwrite(http) - down = errormonitor(@async forwardstream(targetstream, http)) + down = @async forwardstream(targetstream, http) wait(up) wait(down) - - notify(finished_request) end end try # Make the HTTP request r1 = HTTP.get("http://$httpbin/ip"; proxy="http://localhost:8082", retry=false, redirect = false, status_exception=true) - wait(finished_request) @test length(downstreamconnections) == 1 @test length(upstreamconnections) == 1 + @test downstreamcount == 1 + @test upstreamcount == 1 # Make another request # This should reuse a connection pool in both the client and proxy r2 = HTTP.get("http://$httpbin/ip"; proxy="http://localhost:8082", retry=false, redirect = false, status_exception=true) - # Check that notify was actually called, but that the set of connections remains of size 1 - wait(finished_request) + # Check that the set of connections remains of size 1 when handling additional requests downstream and upstream @test length(downstreamconnections) == 1 @test length(upstreamconnections) == 1 + @test downstreamcount == 2 + @test upstreamcount == 2 finally close(proxy) HTTP.Connections.closeall()