(* Source file: http_miou_client.ml *)

(* Log source under which this module reports its messages. *)
let src = Logs.Src.create "http-miou-client"

(* Logging functions bound to [src]. *)
module Log = (val Logs.src_log src : Logs.LOG)
open Http_miou_unix

(* [H1.Client_connection] with the result of its [next_*_operation] functions
   widened to a larger set of polymorphic variants, plus a [yield_reader]
   entry.
   NOTE(review): presumably this is the shape expected by [Runtime.Make] --
   confirm against its signature. *)
module H1_Client_connection = struct
  include H1.Client_connection

  (* Must never be called: it fails with [assert false] whatever it is given. *)
  let yield_reader _conn = assert false

  (* Same operation as the included function, seen through the wider type. *)
  let next_read_operation conn =
    let operation = next_read_operation conn in
    (operation :> [ `Close | `Read | `Yield | `Upgrade ])

  (* Same operation as the included function, seen through the wider type. *)
  let next_write_operation conn =
    let operation = next_write_operation conn in
    (operation
      :> [ `Close of int
         | `Write of Bigstringaf.t Faraday.iovec list
         | `Yield
         | `Upgrade ])
end

(* [H2.Client_connection] with the result of its [next_*_operation] functions
   widened to the same set of polymorphic variants as [H1_Client_connection].
   NOTE(review): presumably this is the shape expected by [Runtime.Make] --
   confirm against its signature. *)
module H2_Client_connection = struct
  include H2.Client_connection

  (* Same operation as the included function, seen through the wider type. *)
  let next_read_operation conn =
    let operation = next_read_operation conn in
    (operation :> [ `Close | `Read | `Yield | `Upgrade ])

  (* Same operation as the included function, seen through the wider type. *)
  let next_write_operation conn =
    let operation = next_write_operation conn in
    (operation
      :> [ `Close of int
         | `Write of Bigstringaf.t Faraday.iovec list
         | `Yield
         | `Upgrade ])
end

(* One runtime per (transport, protocol) pair:
   - [A]: [TLS] with HTTP/1.1
   - [B]: [TCP] with HTTP/1.1
   - [C]: [TLS] with h2
   - [D]: [TCP] with h2
   NOTE(review): [Runtime], [TLS] and [TCP] are not defined in this file;
   presumably they come from the opened [Http_miou_unix] -- confirm. *)
module A = Runtime.Make (TLS) (H1_Client_connection)
module B = Runtime.Make (TCP) (H1_Client_connection)
module C = Runtime.Make (TLS) (H2_Client_connection)
module D = Runtime.Make (TCP) (H2_Client_connection)

(* Protocol configuration: [`V1] carries an HTTP/1.1 one, [`V2] an h2 one. *)
type config = [ `V1 of H1.Config.t | `V2 of H2.Config.t ]

(* Transport used to reach the peer: a TLS session or a plain file
   descriptor. *)
type flow = [ `Tls of Tls_miou_unix.t | `Tcp of Miou_unix.file_descr ]

(* Request to send, tagged with its protocol version. *)
type request = [ `V1 of H1.Request.t | `V2 of H2.Request.t ]

(* Response received, tagged with its protocol version. *)
type response = [ `V1 of H1.Response.t | `V2 of H2.Response.t ]

(* Errors reported by this module: an error raised by one of the two protocol
   implementations ([`V1] / [`V2]), a protocol error described by a message,
   or an exception. *)
type error =
  [ `V1 of H1.Client_connection.error
  | `V2 of H2.Client_connection.error
  | `Protocol of string
  | `Exn of exn ]

(* [pp_error ppf err] prints a human-readable description of [err] on [ppf].
   Exceptions are rendered with [Printexc.to_string] and printed as quoted
   strings. *)
let pp_error ppf err =
  match err with
  | `Protocol msg -> Fmt.string ppf msg
  | `Exn exn -> Fmt.pf ppf "%S" (Printexc.to_string exn)
  | `V1 (`Exn exn) | `V2 (`Exn exn) ->
      Fmt.pf ppf "Got an unexpected exception: %S" (Printexc.to_string exn)
  | `V1 (`Invalid_response_body_length _)
  | `V2 (`Invalid_response_body_length _) ->
      Fmt.pf ppf "Invalid response body length"
  | `V1 (`Malformed_response msg) ->
      Fmt.pf ppf "Malformed HTTP/1.1 response: %s" msg
  | `V2 (`Malformed_response msg) -> Fmt.pf ppf "Malformed H2 response: %s" msg
  | `V2 (`Protocol_error (code, msg)) ->
      Fmt.pf ppf "Protocol error %a: %s" H2.Error_code.pp_hum code msg

(* Witness of the protocol in use. Each constructor fixes the types of the
   connection, of the response and of the request body writer for that
   protocol. *)
type ('conn, 'resp, 'body) version =
  | V1 : (H1.Client_connection.t, H1.Response.t, H1.Body.Writer.t) version
  | V2 : (H2.Client_connection.t, H2.Response.t, H2.Body.Writer.t) version

(* Exception carrying an [error]; the error handlers below use it to cancel
   the computation holding the response. *)
exception Error of error

(* Callstack captured with a depth of 0; it is the backtrace paired with
   [Error] when a computation is cancelled. *)
let empty = Printexc.get_callstack 0

(* A request in flight, as returned by [run]. The protocol-specific types are
   hidden; matching on [version] recovers them.
   - [version]: which protocol the other fields belong to;
   - [acc]: accumulator updated by the user function on each body chunk;
   - [response]: computation filled with the response, or cancelled with
     [Error] by the error handlers;
   - [body]: writer for the request body;
   - [conn]: the client connection;
   - [process]: value returned by the runtime's [run] function for this
     connection. *)
type 'acc process =
  | Process : {
        version: ('conn, 'resp, 'body) version
      ; acc: 'acc ref
      ; response: 'resp Miou.Computation.t
      ; body: 'body
      ; conn: 'conn
      ; process: unit Miou.t
    }
      -> 'acc process

(* [http_1_1_response_handler ~f acc] prepares what is needed to receive an
   HTTP/1.1 response. It returns [(response_handler, response, state)] where:
   - [response_handler] is the callback to give to [H1]: it schedules the
     reading of the body, then tries to fill [response] with the response;
   - [response] is the computation receiving the response;
   - [state] is a reference, initially [acc], replaced by
     [f (`V1 resp) !state chunk] for every chunk of the body.
   The body reader is closed once the end of the body is reached. *)
let http_1_1_response_handler ~f acc =
  let response = Miou.Computation.create () in
  let acc = ref acc in
  let response_handler resp body =
    let on_eof () = H1.Body.Reader.close body in
    let rec on_read bstr ~off ~len =
      let chunk = Bigstringaf.substring bstr ~off ~len in
      acc := f (`V1 resp) !acc chunk;
      (* Ask for the next chunk: reads are one-shot. *)
      H1.Body.Reader.schedule_read body ~on_read ~on_eof
    in
    H1.Body.Reader.schedule_read body ~on_read ~on_eof;
    (* Whether the computation was actually filled is deliberately ignored. *)
    let _set = Miou.Computation.try_return response resp in
    ()
  in
  (response_handler, response, acc)

(* [http_1_1_error_handler response err] tries to cancel [response] with
   [Error (`V1 err)] (paired with the [empty] backtrace), then logs the error.
   Whether the cancellation took effect is ignored. *)
let http_1_1_error_handler response err =
  let error = `V1 err in
  ignore (Miou.Computation.try_cancel response (Error error, empty));
  Log.err (fun m -> m "%a" pp_error error)

(* [h2_response_handler conn ~f response acc] prepares what is needed to
   receive an h2 response on [conn]. It returns [(response_handler, state)]
   where:
   - [response_handler] is the callback to give to [H2]: it schedules the
     reading of the body, then tries to fill [response] with the response;
   - [state] is a reference, initially [acc], replaced by
     [f (`V2 resp) !state chunk] for every chunk of the body.
   Once the end of the body is reached, the body reader is closed and [conn]
   is shut down. *)
let h2_response_handler conn ~f response acc =
  let acc = ref acc in
  let response_handler resp body =
    let on_eof () =
      H2.Body.Reader.close body;
      H2.Client_connection.shutdown conn
    in
    let rec on_read bstr ~off ~len =
      let chunk = Bigstringaf.substring bstr ~off ~len in
      acc := f (`V2 resp) !acc chunk;
      (* Ask for the next chunk: reads are one-shot. *)
      H2.Body.Reader.schedule_read body ~on_read ~on_eof
    in
    H2.Body.Reader.schedule_read body ~on_read ~on_eof;
    (* Whether the computation was actually filled is deliberately ignored. *)
    let _set = Miou.Computation.try_return response resp in
    ()
  in
  (response_handler, acc)

(* [h2_error_handler conn response err] tries to cancel [response] with
   [Error (`V2 err)] (paired with the [empty] backtrace), shuts down the
   connection obtained by forcing the lazy value [conn], then logs the error.
   Whether the cancellation took effect is ignored. *)
let h2_error_handler conn response err =
  let error = `V2 err in
  ignore (Miou.Computation.try_cancel response (Error error, empty));
  Lazy.force conn |> H2.Client_connection.shutdown;
  Log.err (fun m -> m "%a" pp_error error)

(* [pp_request ppf (flow, request)] prints a short label naming the protocol
   of [request] and, when [flow] is a TLS one, the fact that TLS is used. The
   payloads of [flow] and [request] are not inspected. *)
let pp_request ppf (flow, request) =
  let label =
    match (request, flow) with
    | `V1 _, `Tcp _ -> "http/1.1"
    | `V1 _, `Tls _ -> "http/1.1 + tls"
    | `V2 _, `Tcp _ -> "h2"
    | `V2 _, `Tls _ -> "h2 + tls"
  in
  Fmt.string ppf label

(* [run ~f acc config flow request] starts [request] on [flow] and returns the
   [Process] describing it.

   - [f] is applied to every chunk of the response body; its result replaces
     the accumulator held by the [acc] reference of the returned [Process],
     which initially contains [acc].
   - [config] and [request] must be tagged with the same protocol version.
   - [flow] selects the runtime driving the connection: [A] / [B] for
     HTTP/1.1 over TLS / TCP, [C] / [D] for h2 over TLS / TCP.

   Raises [Invalid_argument] when [config] and [request] are not tagged with
   the same protocol version. *)
let run ~f acc config flow request =
  Log.debug (fun m -> m "start a new %a request" pp_request (flow, request));
  (* For a given protocol, the TLS and TCP cases only differ by the runtime
     used to drive the connection. The connection and its handlers are thus
     built by one helper per protocol. *)
  let prepare_h1 cfg req =
    let response_handler, response, acc = http_1_1_response_handler ~f acc in
    let error_handler = http_1_1_error_handler response in
    let body, conn =
      H1.Client_connection.request ~config:cfg req ~error_handler
        ~response_handler
    in
    (cfg.H1.Config.read_buffer_size, response, acc, body, conn)
  in
  let prepare_h2 cfg req =
    let response = Miou.Computation.create () in
    (* NOTE(dinosaure): With regard to [h2], there are two levels of error:
       one at the protocol level and one at the request level. [httpcats] is
       currently designed to make only one request, even with [h2]. Thus, if
       an error occurs at the request level, it means that the connection must
       be shut down, see [h2_error_handler].

       The lazy/rec trick below makes the [conn] instance available to the
       [error_handler] installed at both levels. *)
    let rec error_handler = fun err -> h2_error_handler conn response err
    and conn =
      lazy (H2.Client_connection.create ~config:cfg ~error_handler ())
    in
    let conn = Lazy.force conn in
    let response_handler, acc = h2_response_handler conn ~f response acc in
    let body =
      H2.Client_connection.request conn ~error_handler ~response_handler req
    in
    (cfg.H2.Config.read_buffer_size, response, acc, body, conn)
  in
  match (flow, config, request) with
  | `Tls flow, `V1 config, `V1 request ->
      let read_buffer_size, response, acc, body, conn =
        prepare_h1 config request
      in
      let process = A.run conn ~read_buffer_size flow in
      Process { version= V1; acc; response; body; conn; process }
  | `Tcp flow, `V1 config, `V1 request ->
      let read_buffer_size, response, acc, body, conn =
        prepare_h1 config request
      in
      let process = B.run conn ~read_buffer_size flow in
      Process { version= V1; acc; response; body; conn; process }
  | `Tls flow, `V2 config, `V2 request ->
      let read_buffer_size, response, acc, body, conn =
        prepare_h2 config request
      in
      let process = C.run conn ~read_buffer_size flow in
      Process { version= V2; acc; response; body; conn; process }
  | `Tcp flow, `V2 config, `V2 request ->
      let read_buffer_size, response, acc, body, conn =
        prepare_h2 config request
      in
      let process = D.run conn ~read_buffer_size flow in
      Process { version= V2; acc; response; body; conn; process }
  (* [config] and [request] disagree on the protocol version. *)
  | _ -> invalid_arg "Http_miou_client.run"