diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index 49ffb756..a22c5b43 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -13,7 +13,7 @@ env: PGPASSWORD: root jobs: test: - name: Julia ${{ matrix.version }} - PostgreSQL ${{ matrix.postgresql-version }} - ${{ matrix.os }} - ${{ matrix.arch }} + name: Julia ${{ matrix.version }} - PostgreSQL ${{ matrix.postgresql-version }} - ${{ matrix.os }} - ${{ matrix.arch }} - nonblocking connection(${{ matrix.nonblocking }}) runs-on: ${{ matrix.os }} strategy: fail-fast: false @@ -29,6 +29,8 @@ jobs: arch: - x64 - x86 + nonblocking: + - "false" exclude: # Don't test 32-bit on macOS - os: macOS-latest @@ -39,23 +41,48 @@ jobs: version: "1.6" arch: x64 postgresql-version: latest + nonblocking: "false" # Add older supported PostgreSQL Versions - os: ubuntu-latest version: 1 arch: x64 postgresql-version: '13' + nonblocking: "false" - os: ubuntu-latest version: 1 arch: x64 postgresql-version: '12' + nonblocking: "false" - os: ubuntu-latest version: 1 arch: x64 postgresql-version: '11' + nonblocking: "false" - os: ubuntu-latest version: 1 arch: x64 postgresql-version: '10' + nonblocking: "false" + - os: ubuntu-latest + version: 1 + arch: x64 + postgresql-version: latest + nonblocking: "true" + - os: ubuntu-latest + version: "1.6" + arch: x64 + postgresql-version: latest + nonblocking: "true" + - os: windows-latest + version: 1 + arch: x64 + postgresql-version: latest + nonblocking: "true" + - os: macOS-latest + version: 1 + arch: x64 + postgresql-version: latest + nonblocking: "true" steps: - uses: actions/checkout@v2 - uses: julia-actions/setup-julia@v1 @@ -67,7 +94,7 @@ jobs: cache-name: cache-artifacts with: path: ~/.julia/artifacts - key: ${{ runner.os }}-${{ matrix.arch }}-test-${{ env.cache-name }}-${{ hashFiles('**/Project.toml') }} + key: ${{ runner.os }}-${{ matrix.arch }}-test-${{ env.cache-name }}-nonblocking-${{ matrix.nonblocking }}-${{ hashFiles('**/Project.toml') }} restore-keys: | ${{ runner.os }}-${{ matrix.arch }}-test-${{ env.cache-name }}- ${{ runner.os }}-${{ matrix.arch }}-test- @@ -116,6 +143,7 @@ jobs: pg_ctl -D $env:PGDATA status if: ${{ runner.os == 'Windows' }} # Run Tests + - run: echo "LIBPQJL_CONNECTION_NONBLOCKING=${{ matrix.nonblocking }}" >> $GITHUB_ENV - run: psql -c '\conninfo' - uses: julia-actions/julia-buildpkg@latest - uses: julia-actions/julia-runtest@latest diff --git a/src/asyncresults.jl b/src/asyncresults.jl index 47f5cf62..a69c433f 100644 --- a/src/asyncresults.jl +++ b/src/asyncresults.jl @@ -84,7 +84,8 @@ function _consume(jl_conn::Connection) # this is important? # https://github.com/postgres/postgres/blob/master/src/interfaces/libpq/fe-exec.c#L1266 # if we used non-blocking connections we would need to check for `1` as well - if libpq_c.PQflush(jl_conn.conn) < 0 + # See _flush(jl_conn::Connection) in connections.jl + if !_flush(jl_conn) error(LOGGER, Errors.PQConnectionError(jl_conn)) end @@ -231,7 +232,7 @@ end function _multi_async_execute(jl_conn::Connection, query::AbstractString; kwargs...) async_result = _async_execute(jl_conn; kwargs...) do jl_conn - _async_submit(jl_conn.conn, query) + _async_submit(jl_conn, query) end return async_result @@ -252,9 +253,10 @@ function async_execute( string_params = string_parameters(parameters) pointer_params = parameter_pointers(string_params) - async_result = _async_execute(jl_conn; binary_format=binary_format, kwargs...) do jl_conn + async_result = + _async_execute(jl_conn; binary_format=binary_format, kwargs...) do jl_conn GC.@preserve string_params _async_submit( - jl_conn.conn, query, pointer_params; binary_format=binary_format + jl_conn, query, pointer_params; binary_format=binary_format ) end @@ -289,16 +291,22 @@ function _async_execute( return async_result end -function _async_submit(conn_ptr::Ptr{libpq_c.PGconn}, query::AbstractString) - return libpq_c.PQsendQuery(conn_ptr, query) == 1 +function _async_submit(jl_conn::Connection, query::AbstractString) + send_status = libpq_c.PQsendQuery(jl_conn.conn::Ptr{libpq_c.PGconn}, query) + if isnonblocking(jl_conn) + return _flush(jl_conn) + else + return send_status == 1 + end end function _async_submit( - conn_ptr::Ptr{libpq_c.PGconn}, + jl_conn::Connection, query::AbstractString, parameters::Vector{Ptr{UInt8}}; binary_format::Bool=false, ) + conn_ptr::Ptr{libpq_c.PGconn} = jl_conn.conn num_params = length(parameters) send_status = libpq_c.PQsendQueryParams( @@ -311,6 +319,11 @@ function _async_submit( zeros(Cint, num_params), # all parameters in text format Cint(binary_format), # return result in text or binary format ) - - return send_status == 1 + # send_status must be 1 + # if nonblock, we also want to _flush + if isnonblocking(jl_conn) + return send_status == 1 && _flush(jl_conn) + else + return send_status == 1 + end end diff --git a/src/connections.jl b/src/connections.jl index 91372430..23ff808e 100644 --- a/src/connections.jl +++ b/src/connections.jl @@ -267,6 +267,7 @@ function Connection( throw_error::Bool=true, connect_timeout::Real=0, options::Dict{String, String}=CONNECTION_OPTION_DEFAULTS, + nonblocking::Bool=false, kwargs... ) if options === CONNECTION_OPTION_DEFAULTS @@ -300,7 +301,7 @@ function Connection( ) # If password needed and not entered, prompt the user - if libpq_c.PQconnectionNeedsPassword(jl_conn.conn) == 1 + connection = if libpq_c.PQconnectionNeedsPassword(jl_conn.conn) == 1 push!(keywords, "password") user = unsafe_string(libpq_c.PQuser(jl_conn.conn)) # close this connection; will open another one below with the user-provided password @@ -309,7 +310,7 @@ function Connection( pass = Base.getpass(prompt) push!(values, read(pass, String)) Base.shred!(pass) - return handle_new_connection( + handle_new_connection( Connection( _connect_nonblocking(keywords, values, false; timeout=connect_timeout); kwargs... @@ -317,11 +318,25 @@ function Connection( throw_error=throw_error, ) else - return handle_new_connection( + handle_new_connection( jl_conn; throw_error=throw_error, ) end + + if nonblocking + success = libpq_c.PQsetnonblocking(connection.conn, convert(Cint, nonblocking)) == 0 + if success + return connection + elseif throw_error + close(connection) + error(LOGGER, "Could not provide a non-blocking connection") + else + warn(LOGGER, "Could not provide a non-blocking connection") + return connection + end + end + return connection end # AbstractLock primitives: @@ -672,7 +687,7 @@ end """ ConnectionOption(pq_opt::libpq_c.PQconninfoOption) -> ConnectionOption -Construct a `ConnectionOption` from a `libpg_c.PQconninfoOption`. +Construct a `ConnectionOption` from a `libpq_c.PQconninfoOption`. """ function ConnectionOption(pq_opt::libpq_c.PQconninfoOption) return ConnectionOption( @@ -789,3 +804,73 @@ function socket(conn::Ptr{libpq_c.PGconn}) end socket(jl_conn::Connection) = socket(jl_conn.conn) + +""" + setnonblocking(jl_conn::Connection; nonblocking=true) + +Sets the nonblocking connection status of the PG connections. +While async_execute is non-blocking on the receiving side, +the sending side is still nonblocking without this +Returns true on success, false on failure + +https://www.postgresql.org/docs/current/libpq-async.html +""" +function setnonblocking(jl_conn::Connection; nonblocking=true) + return libpq_c.PQsetnonblocking(jl_conn.conn, convert(Cint, nonblocking)) == 0 +end + +""" + isnonblocking(jl_conn::Connection) +Checks whether the connection is non-blocking. +Returns true if the connection is set to non-blocking, false otherwise + +https://www.postgresql.org/docs/current/libpq-async.html +""" +function isnonblocking(jl_conn::Connection) + return libpq_c.PQisnonblocking(jl_conn.conn) == 1 +end + +""" + _flush(jl_conn::Connection) + +Do the _flush dance described in the libpq docs. Required when the +connections are set to nonblocking and we want do send queries/data +without blocking. + +https://www.postgresql.org/docs/current/libpq-async.html#LIBPQ-PQFLUSH +""" +function _flush(jl_conn::Connection) + local watcher = nothing + if isnonblocking(jl_conn) + watcher = FDWatcher(socket(jl_conn), true, true) # can wait for reads and writes + end + try + while true + flushstatus = libpq_c.PQflush(jl_conn.conn) + # 0 indicates success + if flushstatus == 0 + return true + # -1 indicates error + elseif flushstatus < 0 + return false + # 1 indicates that we could not send all data without blocking, + elseif flushstatus == 1 + # need to wait FD + # Only applicable when the connection is in nonblocking mode + wait(watcher) # Wait for the watcher + # If it becomes write-ready, call PQflush again. + if watcher.mask.writable + continue # Call PGflush again, to send more data + end + if watcher.mask.readable + # if the stream is readable, we have to consume data from the server first. + success = libpq_c.PQconsumeInput(jl_conn.conn) == 1 + !success && return false + end + end + end + finally + # Just close the watcher + !isnothing(watcher) && close(watcher) + end +end diff --git a/test/runtests.jl b/test/runtests.jl index 6a018f64..08b1ed03 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -40,7 +40,8 @@ function count_allocs(f, args...) return Base.gc_alloc_count(stats.gcstats) end -@testset "LibPQ" begin +usenonblocking = get(ENV, "LIBPQJL_CONNECTION_NONBLOCKING", "false") === "true" +@testset "LibPQ $(usenonblocking ? "(nonblocking connection)" : "")" begin @testset "ConninfoDisplay" begin @test parse(LibPQ.ConninfoDisplay, "") == LibPQ.Normal @@ -81,7 +82,8 @@ end DATABASE_USER = get(ENV, "LIBPQJL_DATABASE_USER", "postgres") @testset "Example SELECT" begin - conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=false) + conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=false, nonblocking=usenonblocking) + @test conn isa LibPQ.Connection @test isopen(conn) @test status(conn) == LibPQ.libpq_c.CONNECTION_OK @@ -189,7 +191,7 @@ end end @testset "Example INSERT and DELETE" begin - conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER") + conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; nonblocking=usenonblocking) result = execute(conn, """ CREATE TEMPORARY TABLE libpqjl_test ( @@ -332,7 +334,7 @@ end @testset "load!" begin @testset "issue #204" begin - conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER") + conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; nonblocking=usenonblocking) close(execute(conn, """ CREATE TEMPORARY TABLE libpqjl_test ( @@ -356,7 +358,7 @@ end @testset "COPY FROM" begin @testset "Example COPY FROM" begin - conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER") + conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; nonblocking=usenonblocking) result = execute(conn, """ CREATE TEMPORARY TABLE libpqjl_test ( @@ -400,7 +402,7 @@ end end @testset "Wrong column order" begin - conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER") + conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; nonblocking=usenonblocking) result = execute(conn, """ CREATE TEMPORARY TABLE libpqjl_test ( @@ -457,7 +459,7 @@ end @testset "do" begin local saved_conn - was_open = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) do jl_conn + was_open = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true, nonblocking=usenonblocking) do jl_conn saved_conn = jl_conn return isopen(jl_conn) end @@ -465,7 +467,7 @@ end @test was_open @test !isopen(saved_conn) - @test_throws LibPQ.Errors.PQConnectionError LibPQ.Connection("dbname=123fake user=$DATABASE_USER"; throw_error=true) do jl_conn + @test_throws LibPQ.Errors.PQConnectionError LibPQ.Connection("dbname=123fake user=$DATABASE_USER"; throw_error=true, nonblocking=usenonblocking) do jl_conn saved_conn = jl_conn @test false end @@ -474,14 +476,14 @@ end end @testset "Version Numbers" begin - conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) + conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true, nonblocking=usenonblocking) # update this test before PostgreSQL 20.0 ;) @test LibPQ.pqv"7" <= LibPQ.server_version(conn) <= LibPQ.pqv"20" end @testset "Encoding" begin - conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) + conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true, nonblocking=usenonblocking) @test LibPQ.encoding(conn) == "UTF8" @@ -512,6 +514,7 @@ end options=Dict("IntervalStyle" => "postgres_verbose"), throw_error=true, type_map=Dict(:interval => String), + nonblocking=usenonblocking ) conn_info = LibPQ.conninfo(conn) @@ -631,7 +634,8 @@ end @testset "Finalizer" begin closed_flags = map(1:50) do _ - conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER") + conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; nonblocking=usenonblocking) + closed = conn.closed finalize(conn) return closed @@ -645,7 +649,8 @@ end results = LibPQ.Result[] closed_flags = map(1:50) do _ - conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER") + conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; nonblocking=usenonblocking) + push!(results, execute(conn, "SELECT 1;")) return conn.closed end @@ -660,7 +665,8 @@ end # with AsyncResults, which hold a reference to Connection closed_flags = asyncmap(1:50) do _ - conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER") + conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; nonblocking=usenonblocking) + wait(async_execute(conn, "SELECT pg_sleep(1);")) return conn.closed end @@ -706,7 +712,8 @@ end end @testset "throw_error=false" begin - conn = LibPQ.Connection("dbname=123fake user=$DATABASE_USER"; throw_error=false) + conn = LibPQ.Connection("dbname=123fake user=$DATABASE_USER"; throw_error=false, nonblocking=usenonblocking) + @test conn isa LibPQ.Connection @test status(conn) == LibPQ.libpq_c.CONNECTION_BAD @test isopen(conn) @@ -724,7 +731,8 @@ end @testset "throw_error=true" begin @test_throws LibPQ.Errors.PQConnectionError LibPQ.Connection("dbname=123fake user=$DATABASE_USER"; throw_error=true) - conn = LibPQ.Connection("dbname=123fake user=$DATABASE_USER"; throw_error=false) + conn = LibPQ.Connection("dbname=123fake user=$DATABASE_USER"; throw_error=false, nonblocking=usenonblocking) + @test conn isa LibPQ.Connection @test status(conn) == LibPQ.libpq_c.CONNECTION_BAD @test isopen(conn) @@ -739,7 +747,7 @@ end @testset "Results" begin @testset "Nulls" begin - conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) + conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true, nonblocking=usenonblocking) result = execute(conn, "SELECT NULL"; throw_error=true) @test status(result) == LibPQ.libpq_c.PGRES_TUPLES_OK @@ -823,7 +831,7 @@ end end @testset "Not Nulls" begin - conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) + conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true, nonblocking=usenonblocking) result = execute(conn, "SELECT NULL"; not_null=[false], throw_error=true) @test status(result) == LibPQ.libpq_c.PGRES_TUPLES_OK @@ -948,7 +956,7 @@ end end @testset "Tables.jl" begin - conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) + conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true, nonblocking=usenonblocking) result = execute(conn, """ SELECT no_nulls, yes_nulls FROM ( @@ -990,7 +998,7 @@ end end @testset "Duplicate names" begin - conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) + conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true, nonblocking=usenonblocking) result = execute(conn, "SELECT 1 AS col, 2 AS col;", not_null=true, throw_error=true) columns = Tables.columns(result) @@ -1006,7 +1014,7 @@ end end @testset "Uppercase Columns" begin - conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) + conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true, nonblocking=usenonblocking) result = execute(conn, "SELECT 1 AS \"Column\";") @test num_columns(result) == 1 @@ -1024,7 +1032,7 @@ end end @testset "PQResultError" begin - conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) + conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true, nonblocking=usenonblocking) try execute(conn, "SELECT log(-1);") @@ -1071,7 +1079,7 @@ end @testset "Type Conversions" begin @testset "Deprecations" begin - conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) + conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true, nonblocking=usenonblocking) result = execute(conn, "SELECT 'infinity'::timestamp;") @@ -1088,7 +1096,7 @@ end end @testset "Automatic" begin - conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) + conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true, nonblocking=usenonblocking) result = execute(conn, """ SELECT oid, typname, typlen, typbyval, typcategory @@ -1119,7 +1127,7 @@ end end @testset "Overrides" begin - conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) + conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true, nonblocking=usenonblocking) result = execute(conn, "SELECT 4::bigint;") @test first(first(result)) === Int64(4) @@ -1168,7 +1176,8 @@ end @testset for binary_format in (LibPQ.TEXT, LibPQ.BINARY) @testset "Default Types" begin - conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) + conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true, nonblocking=usenonblocking) + test_data = [ ("3", Cint(3)), @@ -1334,7 +1343,7 @@ end end @testset "Specified Types" begin - conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) + conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true, nonblocking=usenonblocking) test_data = [ ("3", UInt, UInt(3)), @@ -1427,7 +1436,7 @@ end end @testset "Parameters" begin - conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) + conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true, nonblocking=usenonblocking) @testset "Arrays" begin tests = ( @@ -1541,7 +1550,7 @@ end end @testset "SQLString" begin - conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER") + conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; nonblocking=usenonblocking) execute(conn, sql``` CREATE TEMPORARY TABLE libpq_test_users ( @@ -1576,7 +1585,7 @@ end @testset "Query Errors" begin @testset "Syntax Errors" begin - conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) + conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true, nonblocking=usenonblocking) result = execute(conn, "SELORCT NUUL;"; throw_error=false) @test status(result) == LibPQ.libpq_c.PGRES_FATAL_ERROR @@ -1594,7 +1603,7 @@ end end @testset "Wrong No. Parameters" begin - conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) + conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true, nonblocking=usenonblocking) result = execute(conn, "SELORCT \$1;", String[]; throw_error=false) @test status(result) == LibPQ.libpq_c.PGRES_FATAL_ERROR @@ -1618,7 +1627,7 @@ end end @testset "Interface Errors" begin - conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) + conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true, nonblocking=usenonblocking) result = execute( conn, @@ -1651,7 +1660,7 @@ end ] # Establish connection and construct temporary table. - conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER") + conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; nonblocking=usenonblocking) # Get the column. result = execute( @@ -1677,7 +1686,7 @@ end @testset "Statements" begin @testset "No Params, Output" begin - conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) + conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true, nonblocking=usenonblocking) stmt = prepare(conn, "SELECT oid, typname FROM pg_type") @@ -1698,7 +1707,7 @@ end end @testset "Params, Output" begin - conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) + conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true, nonblocking=usenonblocking) stmt = prepare(conn, "SELECT oid, typname FROM pg_type WHERE oid = \$1") @@ -1723,11 +1732,11 @@ end end end - @testset "AsyncResults" begin + @testset "AsyncResults (usenonblocking connection=$usenonblocking)" begin trywait(ar::LibPQ.AsyncResult) = (try wait(ar) catch end; nothing) @testset "Basic" begin - conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) + conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true, nonblocking=usenonblocking) ar = async_execute(conn, "SELECT pg_sleep(2);"; throw_error=false) yield() @@ -1749,7 +1758,7 @@ end end @testset "Parameters" begin - conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) + conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true, nonblocking=usenonblocking) ar = async_execute( conn, @@ -1781,7 +1790,7 @@ end # Ensures queries wait for previous query completion before starting @testset "Wait in line to complete" begin - conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) + conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true, nonblocking=usenonblocking) first_ar = async_execute(conn, "SELECT pg_sleep(4);") yield() @@ -1810,7 +1819,7 @@ end end @testset "Cancel" begin - conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) + conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true, nonblocking=usenonblocking) # final query needs to be one that actually does something # on Windows, first query also needs to do something @@ -1842,7 +1851,7 @@ end end @testset "Canceled by closing connection" begin - conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) + conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true, nonblocking=usenonblocking) # final query needs to be one that actually does something # on Windows, first query also needs to do something @@ -1874,7 +1883,7 @@ end end @testset "FDWatcher: bad file descriptor (EBADF)" begin - conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true) + conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER"; throw_error=true, nonblocking=usenonblocking) ar = async_execute(conn, "SELECT pg_sleep(3); SELECT * FROM pg_type;") yield()