1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
open Base
open Async_kernel
open Async_unix
(** Address resolution and connection set-up used by the client functions
    below. *)
module Net = struct
  (** [lookup uri] resolves the host of [uri] (falling back to ["localhost"]
      when the URI carries no host) and determines its TCP port through
      [Uri_services.tcp_port_of_uri] with ["http"] as the default scheme.

      Only IPv4 stream addresses are requested from the resolver, and the
      first entry returned is the one used.

      @return
        [Ok (host, ip, port)] on success; an [Error] when no port can be
        derived from the URI or when the first resolver entry is not an
        [ADDR_INET] address (including an empty result). *)
  let lookup uri =
    let host = Uri.host_with_default ~default:"localhost" uri in
    match Uri_services.tcp_port_of_uri ~default:"http" uri with
    | None ->
        Deferred.Or_error.error_string
          "Net.lookup: failed to get TCP port from Uri"
    | Some port -> (
        let open Unix in
        Addr_info.get ~host
          [ Addr_info.AI_FAMILY PF_INET; Addr_info.AI_SOCKTYPE SOCK_STREAM ]
        >>| function
        | { Addr_info.ai_addr = ADDR_INET (addr, _); _ } :: _ ->
            Or_error.return (host, Ipaddr_unix.of_inet_addr addr, port)
        | _ -> Or_error.error "Failed to resolve Uri" uri Uri_sexp.sexp_of_t)

  (** [connect_uri ?interrupt ?ssl_config uri] opens a connection suitable
      for [uri] and returns the pair [(input_channel, writer)].

      The connection mode is chosen from the URI scheme:
      - ["httpunix"]: a Unix domain socket whose path is the URI host;
      - ["https"]: an OpenSSL connection, using [ssl_config] when given and
        otherwise a configuration created with the resolved host name as
        [~hostname];
      - anything else: a plain TCP connection.

      For the non-[httpunix] schemes the address comes from {!lookup}; its
      result is unwrapped with [Deferred.Or_error.ok_exn], so a failed lookup
      surfaces as an exception rather than as an [Error] value. *)
  let connect_uri ?interrupt ?ssl_config uri =
    (match Uri.scheme uri with
    | Some "httpunix" ->
        let host = Uri.host_with_default ~default:"localhost" uri in
        return @@ `Unix_domain_socket host
    | _ -> (
        lookup uri |> Deferred.Or_error.ok_exn >>= fun (host, addr, port) ->
        return
        @@
        match (Uri.scheme uri, ssl_config) with
        | Some "https", Some config -> `OpenSSL (addr, port, config)
        | Some "https", None ->
            let config = Conduit_async.V2.Ssl.Config.create ~hostname:host () in
            `OpenSSL (addr, port, config)
        | _ -> `TCP (addr, port)))
    >>= fun mode ->
    Conduit_async.V2.connect ?interrupt mode >>| fun (r, w) ->
    (Input_channel.create r, w)
end
(** [read_response ic] reads one HTTP response head from [ic] and pairs it
    with a pipe that yields the response body.

    When the response may carry a body ([`Yes] or [`Unknown]) the pipe is
    built from a body reader over [ic]; otherwise an empty pipe is returned.

    @raise Failure
      when the channel reaches end-of-file before a response is read, or when
      the response is reported as invalid (the failure message is then the
      reported reason). *)
let read_response ic =
  (* Builds the body pipe matching what the response head announces. *)
  let body_pipe_of response =
    match Cohttp.Response.has_body response with
    | `No -> Pipe.of_list []
    | `Yes | `Unknown ->
        let reader = Io.Response.make_body_reader response ic in
        Body.Private.pipe_of_body Io.Response.read_body_chunk reader
  in
  Io.Response.read ic >>| fun outcome ->
  match outcome with
  | `Ok response -> (response, body_pipe_of response)
  | `Invalid reason -> failwith reason
  | `Eof -> failwith "Connection closed by remote host"
(** [request ?interrupt ?ssl_config ?uri ?body req] opens a fresh connection,
    writes [req] followed by [body] (default [`Empty]), and returns the
    response together with its body as [`Pipe _].

    The connection target is [uri] when supplied, otherwise the URI stored in
    [req].

    Both channels are closed once the response body pipe is closed. If
    writing the request or reading the response raises, both channels are
    closed and the exception is re-raised. *)
let request ?interrupt ?ssl_config ?uri ?(body = `Empty) req =
  let target =
    match uri with None -> Cohttp.Request.uri req | Some explicit -> explicit
  in
  Net.connect_uri ?interrupt ?ssl_config target >>= fun (ic, oc) ->
  let send_body writer =
    Body.Private.write_body Io.Request.write_body body writer
  in
  let exchange () =
    Io.Request.write ~flush:false send_body req oc >>= fun () ->
    read_response ic >>| fun (response, chunks) ->
    (* Tear the connection down only after the consumer is done with the
       body pipe. *)
    don't_wait_for
      ( Pipe.closed chunks >>= fun () ->
        Deferred.all_unit [ Input_channel.close ic; Writer.close oc ] );
    (response, `Pipe chunks)
  in
  try_with exchange >>= fun outcome ->
  match outcome with
  | Ok result -> return result
  | Error exn ->
      don't_wait_for (Input_channel.close ic);
      don't_wait_for (Writer.close oc);
      raise exn
(** A reusable client connection. Requests issued on the same connection are
    run one at a time through a sequencer wrapping the two channels. *)
module Connection = struct
  type t' = { ic : Input_channel.t; oc : Writer.t }
  type t = t' Sequencer.t

  (** [connect ?interrupt ?ssl_config uri] opens a connection for [uri] and
      wraps its channels in a sequencer created with
      [~continue_on_error:false].

      Killing the sequencer closes both channels. The sequencer is killed
      automatically as soon as either [Writer.consumer_left] on the writer or
      [Input_channel.close_finished] on the input channel becomes
      determined. *)
  let connect ?interrupt ?ssl_config uri =
    Net.connect_uri ?interrupt ?ssl_config uri >>| fun (ic, oc) ->
    let sequencer = Sequencer.create ~continue_on_error:false { ic; oc } in
    let shutdown conn =
      Deferred.both (Writer.close conn.oc) (Input_channel.close conn.ic)
      >>| fun ((), ()) -> ()
    in
    Throttle.at_kill sequencer shutdown;
    don't_wait_for
      ( Deferred.any
          [ Writer.consumer_left oc; Input_channel.close_finished ic ]
      >>| fun () -> Throttle.kill sequencer );
    sequencer

  (** [close t] kills the sequencer and returns a deferred that becomes
      determined once its clean-up has finished. *)
  let close connection =
    Throttle.kill connection;
    Throttle.cleaned connection

  (** [close_finished t] becomes determined once the clean-up of [t] has
      finished. *)
  let close_finished connection = Throttle.cleaned connection

  (** [is_closed t] is [true] when the underlying sequencer is dead. *)
  let is_closed connection = Throttle.is_dead connection

  (** [request ?body t req] enqueues the exchange for [req] on [t] and
      returns the response with its body as [`Pipe _].

      The enqueued job only finishes when the response body pipe is closed,
      so later requests on [t] wait until then. *)
  let request ?(body = Body.empty) t req =
    let result = Ivar.create () in
    let exchange { ic; oc } =
      let send_body writer =
        Body.Private.write_body Io.Request.write_body body writer
      in
      Io.Request.write ~flush:false send_body req oc >>= fun () ->
      read_response ic >>= fun (response, chunks) ->
      Ivar.fill_exn result (response, `Pipe chunks);
      Pipe.closed chunks
    in
    don't_wait_for (Throttle.enqueue t exchange);
    Ivar.read result
end
(** [callv ?interrupt ?ssl_config uri reqs] opens a single connection for
    [uri] and sends every [(request, body)] pair read from the pipe [reqs]
    over it, sequentially, returning a pipe of the corresponding responses.

    The connection is closed once the returned response pipe is closed. *)
let callv ?interrupt ?ssl_config uri reqs =
  Connection.connect ?interrupt ?ssl_config uri >>| fun connection ->
  let send_one (req, body) = Connection.request ~body connection req in
  let send_batch batch =
    Deferred.Queue.map ~how:`Sequential batch ~f:send_one
  in
  let responses = Pipe.map' ~max_queue_length:1 reqs ~f:send_batch in
  don't_wait_for
    (Pipe.closed responses >>= fun () -> Connection.close connection);
  responses
(** [call ?interrupt ?ssl_config ?headers ?chunked ?body meth uri] builds a
    client request for [meth] and [uri] and sends it on a fresh connection.

    - With [chunked = false] (the default), chunked encoding is disabled on
      [body] and the resulting body length is passed to the request.
    - With [chunked = true], an empty body produces a non-chunked request
      with a body length of zero; any other body produces a chunked request.

    [body] defaults to [`Empty]. The result is the response paired with its
    body, as returned by [request]. *)
let call ?interrupt ?ssl_config ?headers ?(chunked = false) ?(body = `Empty)
    meth uri =
  (match chunked with
  | false ->
      Body.Private.disable_chunked_encoding body >>| fun (body, body_length) ->
      ( Cohttp.Request.make_for_client ?headers ~chunked ~body_length meth uri,
        body )
  | true ->
      Deferred.return
        (match Body.is_empty body with
        | `True ->
            (* Nothing to stream: send an explicit zero length instead of an
               empty chunked body. *)
            ( Cohttp.Request.make_for_client ?headers ~chunked:false
                ~body_length:0L meth uri,
              body )
        | `Unknown | `False ->
            ( Cohttp.Request.make_for_client ?headers ~chunked:true meth uri,
              body )))
  >>= fun (req, body) -> request ?interrupt ?ssl_config ~body ~uri req
(** [get ?interrupt ?ssl_config ?headers uri] sends a non-chunked [`GET]
    request to [uri] through [call]. *)
let get ?interrupt ?ssl_config ?headers uri =
  call ?interrupt ?ssl_config ?headers ~chunked:false `GET uri
(** [head ?interrupt ?ssl_config ?headers uri] sends a non-chunked [`HEAD]
    request to [uri] through [call] and returns only the response.

    When the body comes back as a pipe, its read end is closed before
    returning; any other body form is left alone. *)
let head ?interrupt ?ssl_config ?headers uri =
  call ?interrupt ?ssl_config ?headers ~chunked:false `HEAD uri
  >>| fun (res, body) ->
  (match body with `Pipe p -> Pipe.close_read p | _ -> ());
  res
(** [post ?interrupt ?ssl_config ?headers ?chunked ?body uri] sends a
    [`POST] request to [uri] through [call]. [chunked] defaults to [false]. *)
let post ?interrupt ?ssl_config ?headers ?(chunked = false) ?body uri =
  call ?interrupt ?ssl_config ?headers ~chunked ?body `POST uri
(** [post_form ?interrupt ?ssl_config ?headers ~params uri] sends [params]
    to [uri] as a non-chunked [`POST] whose body is the query-string encoding
    of [params].

    A ["content-type"] header of ["application/x-www-form-urlencoded"] is
    added unless [headers] already defines one. *)
let post_form ?interrupt ?ssl_config ?headers ~params uri =
  let headers =
    Cohttp.Header.add_opt_unless_exists headers "content-type"
      "application/x-www-form-urlencoded"
  in
  let body = Body.of_string (Uri.encoded_of_query params) in
  post ?interrupt ?ssl_config ~headers ~chunked:false ~body uri
(** [put ?interrupt ?ssl_config ?headers ?chunked ?body uri] sends a [`PUT]
    request to [uri] through [call]. [chunked] defaults to [false]. *)
let put ?interrupt ?ssl_config ?headers ?(chunked = false) ?body uri =
  call ?interrupt ?ssl_config ?headers ~chunked ?body `PUT uri
(** [patch ?interrupt ?ssl_config ?headers ?chunked ?body uri] sends a
    [`PATCH] request to [uri] through [call]. [chunked] defaults to
    [false]. *)
let patch ?interrupt ?ssl_config ?headers ?(chunked = false) ?body uri =
  call ?interrupt ?ssl_config ?headers ~chunked ?body `PATCH uri
(** [delete ?interrupt ?ssl_config ?headers ?chunked ?body uri] sends a
    [`DELETE] request to [uri] through [call]. [chunked] defaults to
    [false]. *)
let delete ?interrupt ?ssl_config ?headers ?(chunked = false) ?body uri =
  call ?interrupt ?ssl_config ?headers ~chunked ?body `DELETE uri