1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
module Httpaf = Dream_httpaf_.Httpaf
module Httpaf_lwt_unix = Dream_httpaf__lwt_unix.Httpaf_lwt_unix
module Message = Dream_pure.Message
module Method = Dream_pure.Method
module Stream = Dream_pure.Stream
(** Runs a request for both cleartext and SSL HTTP/1.1 connections, since the
    code is the same once the request is created.

    [general send_request client connection request] builds an
    [Httpaf.Request.t] from [request], passes it to [send_request client]
    together with the response and error handlers defined below, and then
    copies the request body from [Message.server_stream request] into the
    httpaf request body writer.

    The result is a promise. It is resolved with the [Message.response] built
    by the response handler. It is rejected with [Failure] on a malformed
    response or an invalid response body length, or with the reported
    exception when that exception arrives before any response was received.
    Exceptions reported after the response was received are delivered to the
    pending read on the response body stream instead. *)
let general send_request client connection (request : Message.request) =
  let httpaf_request : Httpaf.Request.t =
    Message.set_content_length_headers request;
    (* The binding must be named [headers]: it is passed by label punning to
       [Httpaf.Request.create] below. *)
    let headers =
      Message.all_headers request
      |> Httpaf.Headers.of_list
    and method_ =
      Httpaf.Method.of_string
        (Method.method_to_string (Message.method_ request))
    and target = Uri.path_and_query (Uri.of_string (Message.target request)) in
    Httpaf.Request.create ~headers method_ target
  in

  let response_promise, receive_response = Lwt.wait () in
  (* Set once the response handler has run; the error handler uses it to
     decide whether an exception rejects the promise or goes to the body
     reader. *)
  let received_response = ref false in
  (* Remembers an exception so that later reads fail immediately with it. *)
  let reported_exn = ref None in
  (* The [exn] callback of the read currently in flight, or [ignore] when no
     read is pending. *)
  let exn_handler = ref ignore in

  let response_handler
      (httpaf_response : Httpaf.Response.t) httpaf_response_body =

    received_response := true;

    let got_eof = ref false in

    let read ~data ~flush:_ ~ping:_ ~pong:_ ~close ~exn =
      match !reported_exn with
      | Some the_exn ->
        exn the_exn
      | None ->
        if !got_eof then
          close 1000
        else begin
          (* Register the callback before scheduling, so an exception reported
             while this read is pending reaches this reader. *)
          exn_handler := exn;
          Httpaf.Body.Reader.schedule_read
            httpaf_response_body
            ~on_eof:(fun () ->
              got_eof := true;
              exn_handler := ignore;
              close 1000)
            ~on_read:(fun buffer ~off ~len ->
              exn_handler := ignore;
              data buffer off len true false)
        end

    and close _code =
      Httpaf.Body.Reader.close httpaf_response_body

    and abort exn =
      reported_exn := Some exn;
      Httpaf.Client_connection.report_exn connection exn
    in

    let client_stream =
      Stream.stream (Stream.reader ~read ~close ~abort) Stream.no_writer in

    Message.response
      ~code:(Httpaf.Status.to_code httpaf_response.status)
      ~headers:(Httpaf.Headers.to_list httpaf_response.headers)
      client_stream
      Stream.null
    |> Lwt.wakeup_later receive_response
  in

  let error_handler = function
    | `Malformed_response explanation ->
      Lwt.wakeup_later_exn receive_response
        (Failure ("malformed response: " ^ explanation))
    | `Invalid_response_body_length _response ->
      Lwt.wakeup_later_exn receive_response
        (Failure "invalid response body length")
    | `Exn exn ->
      if not !received_response then
        Lwt.wakeup_later_exn receive_response exn
      else begin
        (* The promise is already resolved, so hand the exception to the
           pending read, if any, and keep it for subsequent reads. The handler
           is cleared before it is called so it runs at most once. *)
        reported_exn := Some exn;
        let handler = !exn_handler in
        exn_handler := ignore;
        handler exn
      end
  in

  let httpaf_request_body_writer =
    send_request
      client
      ?flush_headers_immediately:(Some true)
      httpaf_request
      ~error_handler
      ~response_handler
  in

  (* Number of body bytes written since the last flush of the body writer. *)
  let bytes_since_flush = ref 0 in

  (* Request body pump: each callback handles one stream event and then
     either continues with [send] or terminates the loop. *)
  let rec send () =
    Stream.read
      (Message.server_stream request) ~data ~flush ~ping ~pong ~close ~exn

  and data buffer offset length _binary _fin =
    Httpaf.Body.Writer.write_bigstring
      httpaf_request_body_writer
      ~off:offset
      ~len:length
      buffer;
    bytes_since_flush := !bytes_since_flush + length;
    (* Flush once at least 4096 bytes have accumulated, and resume reading
       from the flush callback; otherwise keep reading right away. *)
    if !bytes_since_flush >= 4096 then begin
      bytes_since_flush := 0;
      Httpaf.Body.Writer.flush httpaf_request_body_writer send
    end
    else
      send ()

  and flush () =
    bytes_since_flush := 0;
    Httpaf.Body.Writer.flush httpaf_request_body_writer send

  (* Ping and pong events are skipped; nothing is written for them. *)
  and ping _buffer _offset _length =
    send ()

  and pong _buffer _offset _length =
    send ()

  and close _code =
    Httpaf.Body.Writer.close httpaf_request_body_writer

  and exn exn =
    Httpaf.Client_connection.report_exn connection exn in

  send ();

  response_promise
(** [http client] is [general] specialized to [Httpaf_lwt_unix.Client.request],
    using [client] and its [connection] field. The request argument of
    [general] is left unapplied, so [http client request] yields the response
    promise. Judging by the sibling [https], this is the non-SSL variant. *)
let http client =
general Httpaf_lwt_unix.Client.request client client.connection
(** [https client] is [general] specialized to
    [Httpaf_lwt_unix.Client.SSL.request], using [client] and its [connection]
    field. The request argument of [general] is left unapplied, so
    [https client request] yields the response promise. *)
let https client =
general Httpaf_lwt_unix.Client.SSL.request client client.connection