(* Cohttp Lwt client. *)
open Lwt.Infix
(** HTTP client built from an [IO] implementation and a [Net] implementation
    that shares the same [IO]. Every request function returns an Lwt promise. *)
module Make (IO : S.IO) (Net : S.Net with module IO = IO) = struct
  module IO = IO
  module Response = Make.Response (IO)
  module Request = Make.Request (IO)

  (** Log source used for this client's diagnostics. *)
  let src = Logs.Src.create "cohttp.lwt.client" ~doc:"Cohttp Lwt client"

  module Log = (val Logs.src_log src : Logs.LOG)

  (** Connection context, as defined by the [Net] implementation. *)
  type ctx = Net.ctx

  (** [read_body ~closefn ic res] builds the body of response [res], reading
      from [ic].

      When [Response.has_body res] is [`Yes] or [`Unknown], the body is a
      stream of chunks read from [ic]; [closefn] is called once that stream is
      closed. When it is [`No], [closefn] is called immediately and [`Empty] is
      returned. *)
  let read_body ~closefn ic res =
    match Response.has_body res with
    | `Yes | `Unknown ->
        let reader = Response.make_body_reader res ic in
        let stream = Body.create_stream Response.read_body_chunk reader in
        let body = Body.of_stream stream in
        let closed = ref false in
        Lwt.on_success (Lwt_stream.closed stream) (fun () ->
            closed := true;
            closefn ());
        (* Warn when the stream becomes unreachable without ever having been
           closed: in that case [closefn] was never run. *)
        Gc.finalise_last
          (fun () ->
            if not !closed then
              Log.warn (fun m ->
                  m
                    "Body not consumed, leaking stream! Refer to \
                     https://github.com/mirage/ocaml-cohttp/issues/730 for \
                     additional details"))
          stream;
        body
    | `No ->
        closefn ();
        `Empty

  (** Default for the [chunked] argument of {!call} when the caller does not
      supply one: [false] for [`HEAD], [`GET] and [`DELETE], [true] for every
      other method. *)
  let is_meth_chunked = function
    | `HEAD -> false
    | `GET -> false
    | `DELETE -> false
    | _ -> true

  (** [call ?ctx ?headers ?body ?chunked meth uri] connects to [uri], writes a
      [meth] request and reads the response.

      @param ctx connection context; defaults to [Net.default_ctx].
      @param headers request headers; defaults to [Header.init ()].
      @param body request body; defaults to [`Empty].
      @param chunked
        whether to send the body chunked; defaults to [is_meth_chunked meth].
        When [false], the body length is computed with [Body.length] and
        passed to the request as [~body_length].
      @return
        the response paired with its body. For [`HEAD] the body is always
        [`Empty] and the connection is closed as soon as the response has been
        read.

      The promise fails with [Failure] when the response cannot be parsed or
      the server closes the connection before a response is read. *)
  let call ?(ctx = Net.default_ctx) ?headers ?(body = `Empty) ?chunked meth uri
      =
    let headers = match headers with None -> Header.init () | Some h -> h in
    Net.connect_uri ~ctx uri >>= fun (_conn, ic, oc) ->
    let closefn () = Net.close ic oc in
    let chunked =
      match chunked with None -> is_meth_chunked meth | Some v -> v
    in
    let sent =
      match chunked with
      | true ->
          let req = Request.make_for_client ~headers ~chunked meth uri in
          Request.write
            (fun writer -> Body.write_body (Request.write_body writer) body)
            req oc
      | false ->
          (* Not chunked: obtain the body length first so it can be given to
             the request, then write the buffered body. *)
          Body.length body >>= fun (body_length, buf) ->
          let req =
            Request.make_for_client ~headers ~chunked ~body_length meth uri
          in
          Request.write
            (fun writer -> Body.write_body (Request.write_body writer) buf)
            req oc
    in
    sent >>= fun () ->
    (Response.read ic >>= function
     | `Invalid reason ->
         Lwt.fail (Failure ("Failed to read response: " ^ reason))
     | `Eof -> Lwt.fail (Failure "Server closed connection prematurely.")
     | `Ok res -> (
         match meth with
         | `HEAD ->
             closefn ();
             Lwt.return (res, `Empty)
         | _ ->
             let body = read_body ~closefn ic res in
             Lwt.return (res, body)))
    |> fun t ->
    (* Close the connection if reading the response is cancelled or fails. *)
    Lwt.on_cancel t closefn;
    Lwt.on_failure t (fun _exn -> closefn ());
    t

  (** [head ?ctx ?headers uri] issues a [`HEAD] request and returns only the
      response. *)
  let head ?ctx ?headers uri = call ?ctx ?headers `HEAD uri >|= fst

  (** [get ?ctx ?headers uri] issues a [`GET] request. *)
  let get ?ctx ?headers uri = call ?ctx ?headers `GET uri

  (** [delete ?ctx ?body ?chunked ?headers uri] issues a [`DELETE] request. *)
  let delete ?ctx ?body ?chunked ?headers uri =
    call ?ctx ?headers ?body ?chunked `DELETE uri

  (** [post ?ctx ?body ?chunked ?headers uri] issues a [`POST] request. *)
  let post ?ctx ?body ?chunked ?headers uri =
    call ?ctx ?headers ?body ?chunked `POST uri

  (** [put ?ctx ?body ?chunked ?headers uri] issues a [`PUT] request. *)
  let put ?ctx ?body ?chunked ?headers uri =
    call ?ctx ?headers ?body ?chunked `PUT uri

  (** [patch ?ctx ?body ?chunked ?headers uri] issues a [`PATCH] request. *)
  let patch ?ctx ?body ?chunked ?headers uri =
    call ?ctx ?headers ?body ?chunked `PATCH uri

  (** [post_form ?ctx ?headers ~params uri] issues a non-chunked [`POST] whose
      body is [params] encoded with [Uri.encoded_of_query]. The
      ["content-type"] header is set to ["application/x-www-form-urlencoded"]
      through [Header.add_opt_unless_exists]. *)
  let post_form ?ctx ?headers ~params uri =
    let headers =
      Header.add_opt_unless_exists headers "content-type"
        "application/x-www-form-urlencoded"
    in
    let body = Body.of_string (Uri.encoded_of_query params) in
    post ?ctx ~chunked:false ~headers ~body uri

  (** [callv ?ctx uri reqs] opens one connection to [uri], writes every
      [(request, body)] pair of the stream [reqs] to it, and returns a stream
      of [(response, body)] pairs.

      @param ctx connection context; defaults to [Net.default_ctx].

      The connection is closed once the returned stream is closed. A response
      that cannot be parsed, or a connection closed by the server before a
      response is read, fails the corresponding read with [Failure]. *)
  let callv ?(ctx = Net.default_ctx) uri reqs =
    Net.connect_uri ~ctx uri >>= fun (_conn, ic, oc) ->
    (* Write each request and keep only its method, which the reader needs in
       order to treat [`HEAD] responses as body-less. *)
    let meth_stream =
      Lwt_stream.map_s
        (fun (req, body) ->
          Request.write
            (fun writer -> Body.write_body (Request.write_body writer) body)
            req oc
          >>= fun () -> Lwt.return (Request.meth req))
        reqs
    in
    (* [read_m] is held while a response is being read from [ic]. *)
    let read_m = Lwt_mutex.create () in
    (* NOTE(review): [closefn] unlocks [read_m] from inside a
       [Lwt_mutex.with_lock] section, which also releases the mutex when its
       callback settles - confirm the double release is intended. *)
    let closefn () = Lwt_mutex.unlock read_m in
    let resps =
      Lwt_stream.map_s
        (fun meth ->
          Lwt_mutex.with_lock read_m (fun () ->
              (Response.read ic >>= function
               | `Invalid reason ->
                   Lwt.fail (Failure ("Failed to read response: " ^ reason))
               | `Eof ->
                   Lwt.fail (Failure "Server closed connection prematurely.")
               | `Ok res -> (
                   match meth with
                   | `HEAD ->
                       closefn ();
                       Lwt.return (res, `Empty)
                   | _ ->
                       let body = read_body ~closefn ic res in
                       Lwt.return (res, body)))
              |> fun t ->
              Lwt.on_cancel t closefn;
              Lwt.on_failure t (fun _exn -> closefn ());
              t))
        meth_stream
    in
    Lwt.on_success (Lwt_stream.closed resps) (fun () -> Net.close ic oc);
    Lwt.return resps
end