1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
open Lwt.Infix
module Make
(IO:S.IO)
(Net:S.Net with module IO = IO) = struct
module IO = IO
module Response = Make.Response(IO)
module Request = Make.Request(IO)
type ctx = Net.ctx [@@deriving sexp_of]
let default_ctx = Net.default_ctx
let read_response ~closefn ic _oc meth =
Response.read ic >>= begin function
| `Invalid reason ->
Lwt.fail (Failure ("Failed to read response: " ^ reason))
| `Eof -> Lwt.fail (Failure "Client connection was closed")
| `Ok res -> begin
let has_body = match meth with
| `HEAD -> `No
| _ -> Response.has_body res
in
match has_body with
| `Yes | `Unknown ->
let reader = Response.make_body_reader res ic in
let stream = Body.create_stream Response.read_body_chunk reader in
let closefn = closefn in
Lwt.on_success (Lwt_stream.closed stream) closefn;
let gcfn _st = closefn () in
Gc.finalise gcfn stream;
let body = Body.of_stream stream in
Lwt.return (res, body)
| `No -> closefn (); Lwt.return (res, `Empty)
end
end
|> fun t ->
Lwt.on_cancel t closefn;
Lwt.on_failure t (fun _exn -> closefn ());
t
let is_meth_chunked = function
| `HEAD -> false
| `GET -> false
| `DELETE -> false
| _ -> true
let call ?(ctx=default_ctx) ? ?(body=`Empty) ?chunked meth uri =
let = match headers with None -> Header.init () | Some h -> h in
Net.connect_uri ~ctx uri >>= fun (_conn, ic, oc) ->
let closefn () = Net.close ic oc in
let chunked =
match chunked with
| None -> is_meth_chunked meth
| Some v -> v in
let sent = match chunked with
| true ->
let req = Request.make_for_client ~headers ~chunked meth uri in
Request.write (fun writer ->
Body.write_body (Request.write_body writer) body) req oc
| false ->
Body.length body >>= fun (body_length, buf) ->
let req =
Request.make_for_client ~headers ~chunked ~body_length meth uri
in
Request.write (fun writer ->
Body.write_body (Request.write_body writer) buf) req oc
in
sent >>= fun () ->
read_response ~closefn ic oc meth
let head ?ctx:_ ? uri =
call ?headers `HEAD uri
>|= fst
let get ?ctx ? uri = call ?ctx ?headers `GET uri
let delete ?ctx ?body ?chunked ? uri =
call ?ctx ?headers ?body ?chunked `DELETE uri
let post ?ctx ?body ?chunked ? uri =
call ?ctx ?headers ?body ?chunked `POST uri
let put ?ctx ?body ?chunked ? uri =
call ?ctx ?headers ?body ?chunked `PUT uri
let patch ?ctx ?body ?chunked ? uri =
call ?ctx ?headers ?body ?chunked `PATCH uri
let post_form ?ctx ? ~params uri =
let = Header.add_opt_unless_exists headers
"content-type" "application/x-www-form-urlencoded" in
let body = Body.of_string (Uri.encoded_of_query params) in
post ?ctx ~chunked:false ~headers ~body uri
let callv ?(ctx=default_ctx) uri reqs =
Net.connect_uri ~ctx uri >>= fun (_conn, ic, oc) ->
let meth_stream = Lwt_stream.map_s (fun (req,body) ->
Request.write (fun writer ->
Body.write_body (Request.write_body writer) body
) req oc >>= fun () ->
Lwt.return (Request.meth req)
) reqs in
let read_m = Lwt_mutex.create () in
let last_body = ref None in
let closefn () = Lwt_mutex.unlock read_m in
let resps = Lwt_stream.map_s (fun meth ->
begin match !last_body with
| None -> Lwt.return_unit
| Some body -> Body.drain_body body
end >>= fun () ->
Lwt_mutex.with_lock read_m (fun () -> read_response ~closefn ic oc meth)
>|= (fun ((_,body) as x) ->
last_body := Some body;
x
)
) meth_stream in
Lwt.on_success (Lwt_stream.closed resps) (fun () -> Net.close ic oc);
Lwt.return resps
end