Skip to content

Commit

Permalink
refactor: tighten bindings
Browse files Browse the repository at this point in the history
  • Loading branch information
anmonteiro committed Oct 16, 2023
1 parent 32d8d1b commit 2085316
Show file tree
Hide file tree
Showing 14 changed files with 358 additions and 292 deletions.
26 changes: 14 additions & 12 deletions lib/body.ml
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,12 @@ let sendfile ?length path =

(* TODO: accept buffer for I/O, so that caller can pool buffers? *)
let stream_of_fd ?on_close fd =
let { Unix.st_size = length; _ } =
Eio_unix.run_in_systhread (fun () -> Unix.fstat fd)
let remaining =
let { Unix.st_size = length; _ } =
Eio_unix.run_in_systhread (fun () -> Unix.fstat fd)
in
Atomic.make length
in
let remaining = Atomic.make length in
Stream.from ~f:(fun () ->
let current = Atomic.get remaining in
if current = 0
Expand Down Expand Up @@ -343,15 +345,6 @@ module Raw = struct
let rec read_fn () =
let t = Lazy.force t in
let p, u = Promise.create () in
let on_read_direct buffer ~off ~len =
total_len := Int64.add !total_len (Int64.of_int len);
Promise.resolve u (Some (IOVec.make buffer ~off ~len))
and on_read_with_yield buffer ~off ~len =
total_len := Int64.add !total_len (Int64.of_int len);
Fiber.yield ();
Promise.resolve u (Some (IOVec.make buffer ~off ~len))
in
t.read_counter <- t.read_counter + 1;
let on_eof () =
Option.iter (fun f -> f t) on_eof;
Reader.close body;
Expand All @@ -367,12 +360,21 @@ module Raw = struct
Promise.resolve u None
in
let on_read =
let on_read_direct buffer ~off ~len =
total_len := Int64.add !total_len (Int64.of_int len);
Promise.resolve u (Some (IOVec.make buffer ~off ~len))
and on_read_with_yield buffer ~off ~len =
total_len := Int64.add !total_len (Int64.of_int len);
Fiber.yield ();
Promise.resolve u (Some (IOVec.make buffer ~off ~len))
in
if t.read_counter > 128
then (
t.read_counter <- 0;
on_read_with_yield)
else on_read_direct
in
t.read_counter <- t.read_counter + 1;
Reader.schedule_read body ~on_eof ~on_read;
Fiber.first
(fun () -> Promise.await p)
Expand Down
128 changes: 67 additions & 61 deletions lib/client.ml
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ let create_https_connection ~sw ~config ~conn_info ~uri fd =
let*! { ssl = ssl_client; ssl_ctx } =
Openssl.connect ~config ~hostname:host fd
in
let ssl_socket = Eio_ssl.Context.ssl_socket ssl_ctx in
let (module Https), version =
let ssl_socket = Eio_ssl.Context.ssl_socket ssl_ctx in
match Ssl.get_negotiated_alpn_protocol ssl_socket with
| None ->
Logs.warn (fun m ->
Expand Down Expand Up @@ -127,9 +127,11 @@ let create_https_connection ~sw ~config ~conn_info ~uri fd =
ssl_client

let open_connection ~sw ~config ~uri env conn_info =
let clock = Eio.Stdenv.clock env in
let network = Eio.Stdenv.net env in
let*! socket = Connection.connect ~sw ~clock ~network ~config conn_info in
let*! socket =
let clock = Eio.Stdenv.clock env in
let network = Eio.Stdenv.net env in
Connection.connect ~sw ~clock ~network ~config conn_info
in
(if config.Config.tcp_nodelay
then
let fd = Eio_unix.Resource.fd_opt socket |> Option.get in
Expand Down Expand Up @@ -320,28 +322,27 @@ let make_request_info
=
let { Connection_info.host; scheme; _ } = info in
let is_h2c_upgrade = is_h2c_upgrade ~config ~version ~scheme in
let h2_settings = H2.Settings.to_base64 (Config.to_http2_settings config) in
let canonical_headers =
(* Important that this doesn't shadow the labeled `headers` argument
* above. We need the original headers as issued by the caller in order to
* reproduce them e.g. when following redirects. *)
let headers =
let open Headers in
if is_h2c_upgrade
then
(Well_known.connection, "Upgrade, HTTP2-Settings")
:: (Well_known.upgrade, "h2c")
:: ("HTTP2-Settings", Result.get_ok h2_settings)
:: headers
else headers
in
Headers.canonicalize_headers
~version
~host
~body_length:body.Body.length
headers
in
let request =
let canonical_headers =
let h2_settings =
H2.Settings.to_base64 (Config.to_http2_settings config)
in
let headers =
let open Headers in
if is_h2c_upgrade
then
(Well_known.connection, "Upgrade, HTTP2-Settings")
:: (Well_known.upgrade, "h2c")
:: ("HTTP2-Settings", Result.get_ok h2_settings)
:: headers
else headers
in
Headers.canonicalize_headers
~version
~host
~body_length:body.Body.length
headers
in
Request.create
~meth
~version
Expand Down Expand Up @@ -398,28 +399,29 @@ let rec send_request_and_handle_response
());
if Status.is_permanent_redirection response.status
then conn.uri <- new_uri;
let target = Uri.path_and_query new_uri in
(* From RFC7231§6.4:
* Note: In HTTP/1.0, the status codes 301 (Moved Permanently) and 302
* (Found) were defined for the first type of redirect ([RFC1945],
* Section 9.3). Early user agents split on whether the method applied
* to the redirect target would be the same as the original request or
* would be rewritten as GET. Although HTTP originally defined the former
* semantics for 301 and 302 (to match its original implementation at
* CERN), and defined 303 (See Other) to match the latter semantics,
* prevailing practice gradually converged on the latter semantics for
* 301 and 302 as well. The first revision of HTTP/1.1 added 307
* (Temporary Redirect) to indicate the former semantics without being
* impacted by divergent practice. Over 10 years later, most user agents
* still do method rewriting for 301 and 302; therefore, this
* specification makes that behavior conformant when the original request
* is POST. *)
let meth' =
match meth, response.status with
| `POST, (`Found | `Moved_permanently) -> `GET
| _ -> meth
in
let request_info' =
let target = Uri.path_and_query new_uri in
(* From RFC7231§6.4:
* Note: In HTTP/1.0, the status codes 301 (Moved Permanently) and
* 302 (Found) were defined for the first type of redirect
* ([RFC1945], Section 9.3). Early user agents split on whether
* the method applied to the redirect target would be the same as
* the original request or would be rewritten as GET. Although
* HTTP originally defined the former semantics for 301 and 302 (to
* match its original implementation at CERN), and defined 303 (See
* Other) to match the latter semantics, prevailing practice
* gradually converged on the latter semantics for 301 and 302 as
* well. The first revision of HTTP/1.1 added 307 (Temporary
* Redirect) to indicate the former semantics without being
* impacted by divergent practice. Over 10 years later, most user
* agents still do method rewriting for 301 and 302; therefore,
* this specification makes that behavior conformant when the
* original request is POST. *)
let meth' =
match meth, response.status with
| `POST, (`Found | `Moved_permanently) -> `GET
| _ -> meth
in
make_request_info
t
~remaining_redirects:(remaining_redirects - 1)
Expand Down Expand Up @@ -452,8 +454,10 @@ let call t ~meth ?(headers = []) ?(body = Body.empty) target =
match reused with
| Error #Error.client as err -> err
| Ok _ ->
let headers = t.config.default_headers @ headers in
let request_info = make_request_info t ~meth ~headers ~body target in
let request_info =
let headers = t.config.default_headers @ headers in
make_request_info t ~meth ~headers ~body target
in
let (Connection.Conn conn) = t.conn in
conn.persistent <- Request.persistent_connection request_info.request;
send_request_and_handle_response t ~body request_info
Expand All @@ -480,20 +484,22 @@ let ws_upgrade :
-> (Ws.Descriptor.t, [> Error.client ]) result
=
fun t ?(headers = []) target ->
let (Conn { info; _ }) = t.conn in
(* From RFC6455§4.1:
* The value of this header field MUST be a nonce consisting of a randomly
* selected 16-byte value that has been base64-encoded (see Section 4 of
* [RFC4648]). The nonce MUST be selected randomly for each connection. *)
let nonce = Openssl.random_string 16 in
let request =
Ws.upgrade_request
~headers:(Httpaf.Headers.of_list headers)
~scheme:info.scheme
~nonce
target
let*! response =
let request =
let (Conn { info; _ }) = t.conn in
(* From RFC6455§4.1:
* The value of this header field MUST be a nonce consisting of a randomly
* selected 16-byte value that has been base64-encoded (see Section 4 of
* [RFC4648]). The nonce MUST be selected randomly for each connection. *)
let nonce = Openssl.random_string 16 in
Ws.upgrade_request
~headers:(Httpaf.Headers.of_list headers)
~scheme:info.scheme
~nonce
target
in
send t request
in
let*! response = send t request in
match Body.drain response.body with
| Error #Error.t as err -> err
| Ok () -> Http_impl.upgrade_connection ~sw:t.sw t.conn
Expand Down
100 changes: 55 additions & 45 deletions lib/connection.ml
Original file line number Diff line number Diff line change
Expand Up @@ -35,51 +35,61 @@ module Version = Httpaf.Version
module Logs =
(val Logging.setup ~src:"piaf.connection" ~doc:"Piaf Connection module")

let resolve_host env ~config ~port hostname : (_, [> Error.client ]) result =
let clock = Eio.Stdenv.clock env in
let network = Eio.Stdenv.net env in
match
Eio.Time.with_timeout_exn clock config.Config.connect_timeout (fun () ->
Eio.Net.getaddrinfo_stream ~service:(string_of_int port) network hostname)
with
| [] ->
Error
(`Connect_error (Format.asprintf "Can't resolve hostname: %s" hostname))
| xs ->
(match config.Config.prefer_ip_version with
| `Both ->
let order_v4v6 = Eio.Net.Ipaddr.fold ~v4:(fun _ -> -1) ~v6:(fun _ -> 1) in
Ok
(* Sort IPv4 ahead of IPv6 for compatibility. *)
(List.sort
(fun a1 a2 ->
match a1, a2 with
| `Unix s1, `Unix s2 -> String.compare s1 s2
| `Tcp (ip1, _), `Tcp (ip2, _) ->
compare (order_v4v6 ip1) (order_v4v6 ip2)
| `Unix _, `Tcp _ -> 1
| `Tcp _, `Unix _ -> -1)
xs)
| `V4 ->
Ok
(List.filter
(function
| `Tcp (ip, _) ->
Eio.Net.Ipaddr.fold ~v4:(fun _ -> true) ~v6:(fun _ -> false) ip
| `Unix _ -> true)
xs)
| `V6 ->
Ok
(List.filter
(function
| `Tcp (ip, _) ->
Eio.Net.Ipaddr.fold ~v4:(fun _ -> false) ~v6:(fun _ -> true) ip
| `Unix _ -> true)
xs))
| exception Eio.Time.Timeout ->
Error
(`Connect_error
(Format.asprintf "Timed out resolving hostname: %s" hostname))
let resolve_host =
let order_v4v6 = Eio.Net.Ipaddr.fold ~v4:(fun _ -> -1) ~v6:(fun _ -> 1) in
fun (env : Eio_unix.Stdenv.base) ~config ~port hostname ->
let clock = Eio.Stdenv.clock env in
let network = Eio.Stdenv.net env in
match
Eio.Time.with_timeout_exn clock config.Config.connect_timeout (fun () ->
Eio.Net.getaddrinfo_stream
~service:(string_of_int port)
network
hostname)
with
| [] ->
Error
(`Connect_error (Format.asprintf "Can't resolve hostname: %s" hostname))
| xs ->
(match config.Config.prefer_ip_version with
| `Both ->
Ok
(* Sort IPv4 ahead of IPv6 for compatibility. *)
(List.sort
(fun a1 a2 ->
match a1, a2 with
| `Unix s1, `Unix s2 -> String.compare s1 s2
| `Tcp (ip1, _), `Tcp (ip2, _) ->
compare (order_v4v6 ip1) (order_v4v6 ip2)
| `Unix _, `Tcp _ -> 1
| `Tcp _, `Unix _ -> -1)
xs)
| `V4 ->
Ok
(List.filter
(function
| `Tcp (ip, _) ->
Eio.Net.Ipaddr.fold
~v4:(fun _ -> true)
~v6:(fun _ -> false)
ip
| `Unix _ -> true)
xs)
| `V6 ->
Ok
(List.filter
(function
| `Tcp (ip, _) ->
Eio.Net.Ipaddr.fold
~v4:(fun _ -> false)
~v6:(fun _ -> true)
ip
| `Unix _ -> true)
xs))
| exception Eio.Time.Timeout ->
Error
(`Connect_error
(Format.asprintf "Timed out resolving hostname: %s" hostname))

module Info = struct
(* This represents information that changes from connection to connection,
Expand Down
18 changes: 10 additions & 8 deletions lib/form.ml
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,17 @@ module Multipart = struct
let stream (* , _or_error *) = Body.to_stream request.body in
let kvs, push_to_kvs = Stream.create 128 in
let emit name stream = push_to_kvs (Some (name, stream)) in
let+! multipart =
Multipart.parse_multipart_form
~content_type
~max_chunk_size
~emit
~finish:(fun () -> push_to_kvs None)
stream
let+! multipart_fields =
let+! multipart =
Multipart.parse_multipart_form
~content_type
~max_chunk_size
~emit
~finish:(fun () -> push_to_kvs None)
stream
in
Multipart.result_fields multipart
in
let multipart_fields = Multipart.result_fields multipart in
Stream.map
~f:(fun (name, stream) ->
let name = Option.get name in
Expand Down
2 changes: 1 addition & 1 deletion lib/http2.ml
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ module HTTP : Http_intf.HTTP2 with type scheme = Scheme.http = struct
~error_handler:(make_client_error_handler error_handler `Connection)
(response_handler, response_error_handler)
in
Stdlib.Result.map
Result.map
(fun connection ->
(* Perform the runtime upgrade -- stop speaking HTTP/1.1, start
* speaking HTTP/2 by feeding Gluten the `H2.Client_connection`
Expand Down
Loading

0 comments on commit 2085316

Please sign in to comment.