1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
module H2 = Dream_h2.H2
module H2_lwt_unix = Dream_h2_lwt_unix.H2_lwt_unix
module Message = Dream_pure.Message
module Method = Dream_pure.Method
module Stream = Dream_pure.Stream
let https (connection : H2_lwt_unix.Client.SSL.t) (request : Message.request) =
let h2_request : H2.Request.t =
Message.drop_content_length_headers request;
Message.lowercase_headers request;
let =
Message.all_headers request
|> H2.Headers.of_list
and scheme = "https"
and method_ =
H2.Method.of_string (Method.method_to_string (Message.method_ request))
and target = Uri.path_and_query (Uri.of_string (Message.target request)) in
H2.Request.create ~headers ~scheme method_ target
in
let response_promise, receive_response = Lwt.wait () in
let received_response = ref false in
let reported_exn = ref None in
let exn_handler = ref ignore in
let report_error exn =
if not !received_response then
Lwt.wakeup_later_exn receive_response exn
else begin
reported_exn := Some exn;
let handler = !exn_handler in
exn_handler := ignore;
handler exn
end
in
let error_handler = function
| `Malformed_response explanation ->
report_error (Failure ("malformed response: " ^ explanation))
| `Invalid_response_body_length _response ->
report_error (Failure "invalid response body length")
| `Protocol_error (code, _explanation) ->
report_error
(Failure ("protocol error: " ^ (H2.Error_code.to_string code)))
| `Exn exn ->
report_error exn
in
let response_handler (h2_response : H2.Response.t) h2_response_body =
received_response := true;
let read ~data ~flush:_ ~ping:_ ~pong:_ ~close ~exn =
match !reported_exn with
| Some the_exn ->
exn the_exn
| None ->
exn_handler := exn;
H2.Body.Reader.schedule_read
h2_response_body
~on_eof:(fun () ->
exn_handler := ignore;
close 1000)
~on_read:(fun buffer ~off ~len ->
exn_handler := ignore;
data buffer off len true false)
and close _code =
H2.Body.Reader.close h2_response_body
and abort exn =
H2.Client_connection.report_exn connection.connection exn;
report_error exn in
let client_stream =
Stream.stream (Stream.reader ~read ~close ~abort) Stream.no_writer in
Message.response
~code:(H2.Status.to_code h2_response.status)
~headers:(H2.Headers.to_list h2_response.headers)
client_stream
Stream.null
|> Lwt.wakeup_later receive_response
in
let h2_request_body_writer =
H2_lwt_unix.Client.SSL.request
connection
h2_request
~error_handler
~response_handler
in
let bytes_since_flush = ref 0 in
let rec send () =
Stream.read
(Message.server_stream request) ~data ~flush ~ping ~pong ~close ~exn
and data buffer offset length _binary _fin =
H2.Body.Writer.write_bigstring
h2_request_body_writer
~off:offset
~len:length
buffer;
bytes_since_flush := !bytes_since_flush + length;
if !bytes_since_flush >= 4096 then begin
bytes_since_flush := 0;
H2.Body.Writer.flush h2_request_body_writer send
end
else
send ()
and flush () =
bytes_since_flush := 0;
H2.Body.Writer.flush h2_request_body_writer send
and ping _buffer _offset _length =
send ()
and pong _buffer _offset _length =
send ()
and close _code =
H2.Body.Writer.close h2_request_body_writer
and exn exn =
H2.Client_connection.report_exn connection.connection exn;
report_error exn in
send ();
response_promise