1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
module Httpaf = Dream_httpaf_.Httpaf
module Websocketaf_lwt_unix = Dream_websocketaf_lwt_unix.Websocketaf_lwt_unix
module Message = Dream_pure.Message
module Stream = Dream_pure.Stream
(* Module-load side effect: initialize the Mirage_crypto random number
   generator with the Fortuna generator. [ws] below calls
   [Mirage_crypto_rng.generate] to produce the handshake nonce.
   NOTE(review): presumably [generate] requires this initialization to have
   run first; confirm against the mirage-crypto-rng documentation. *)
let () =
Mirage_crypto_rng_lwt.initialize (module Mirage_crypto_rng.Fortuna)
(* [ws socket request] starts a WebSocket client handshake over [socket] for
   the target of [request] and returns a promise for the response.

   Host, port and resource are derived from the request target. The port is
   the explicit port when the target has one; otherwise 443 for the "wss"
   scheme and 80 for anything else. The host defaults to "".

   The returned promise is:
   - resolved with a [`Switching_Protocols] response (carrying the WebSocket
     streams) when the WebSocket handler is invoked;
   - resolved with a response mirroring the server's status and headers, and
     streaming its body, when the handshake is refused;
   - rejected with [Failure _] for a malformed response or an invalid body
     length, or with the reported exception when an exception arrives before
     any response was delivered. *)
let ws socket request =
  let target = Uri.of_string (Message.target request) in
  let host = Option.value ~default:"" (Uri.host target) in
  (* Used only when the target carries no explicit port. *)
  let fallback_port =
    match Uri.scheme target with
    | Some "wss" -> 443
    | Some _ | None -> 80
  in
  let port = Option.value ~default:fallback_port (Uri.port target) in

  let response_promise, response_resolver = Lwt.wait () in
  (* Set once a handshake-failure response has been handed to the caller;
     from then on exceptions must be routed through the body stream. *)
  let response_delivered = ref false in
  (* Exception reported after the response was delivered, replayed to every
     subsequent read of the body stream. *)
  let stored_exn = ref None in
  (* Exception callback of the read that is currently waiting on the body
     reader, or [ignore] when no read is outstanding. *)
  let pending_exn_callback = ref ignore in

  (* Turns a refused handshake into an ordinary response whose client stream
     pulls from the httpaf body reader, and resolves the promise with it. *)
  let reject_handshake httpaf_response body =
    response_delivered := true;
    let body_finished = ref false in
    let read ~data ~flush:_ ~ping:_ ~pong:_ ~close ~exn =
      match !stored_exn, !body_finished with
      | Some failure, _ ->
        exn failure
      | None, true ->
        close 1000
      | None, false ->
        (* Remember how to report an exception while this read is pending;
           either callback below clears it again. *)
        pending_exn_callback := exn;
        Httpaf.Body.Reader.schedule_read
          body
          ~on_eof:(fun () ->
            body_finished := true;
            pending_exn_callback := ignore;
            close 1000)
          ~on_read:(fun buffer ~off ~len ->
            pending_exn_callback := ignore;
            data buffer off len true false)
    in
    let close _code = Httpaf.Body.Reader.close body in
    let abort _exn = Httpaf.Body.Reader.close body in
    let { Httpaf.Response.status; headers; _ } = httpaf_response in
    let client_stream =
      Stream.stream (Stream.reader ~read ~close ~abort) Stream.no_writer in
    let response =
      Message.response
        ~code:(Httpaf.Status.to_code status)
        ~headers:(Httpaf.Headers.to_list headers)
        client_stream
        Stream.null
    in
    Lwt.wakeup_later response_resolver response
  in

  let error_handler = function
    | `Handshake_failure (httpaf_response, body) ->
      reject_handshake httpaf_response body
    | `Malformed_response explanation ->
      Failure ("malformed response: " ^ explanation)
      |> Lwt.wakeup_later_exn response_resolver
    | `Invalid_response_body_length _ ->
      Failure "invalid response body length"
      |> Lwt.wakeup_later_exn response_resolver
    | `Exn exn ->
      if !response_delivered then begin
        (* The promise is already resolved: record the exception for later
           reads and notify the read that is currently pending, if any. *)
        stored_exn := Some exn;
        let callback = !pending_exn_callback in
        pending_exn_callback := ignore;
        callback exn
      end
      else
        Lwt.wakeup_later_exn response_resolver exn
  in

  (* Invoked with the WebSocket connection; resolves the promise with a
     [`Switching_Protocols] response and hands the connection and server-side
     stream to the Dream WebSocket handler. *)
  let on_upgrade websocket =
    let response =
      Message.response ~status:`Switching_Protocols Stream.empty Stream.null in
    let _client_stream, server_stream = Message.create_websocket response in
    Lwt.wakeup_later response_resolver response;
    Dream_httpaf.Websocket.client_websocket_handler server_stream websocket
  in

  let nonce = Cstruct.to_string (Mirage_crypto_rng.generate 16) in
  let resource = Uri.path_and_query target in
  let%lwt _client =
    Websocketaf_lwt_unix.Client.connect
      ~nonce
      ~host
      ~port
      ~resource
      ~error_handler
      ~websocket_handler:on_upgrade
      socket
  in
  response_promise