1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
module Websocketaf = Dream_websocketaf.Websocketaf
module Stream = Dream_pure.Stream
(** [websocket_handler stream socket] bridges the websocket/af descriptor
    [socket] and the stream [stream].

    Incoming direction: every frame reported by websocket/af is pushed onto an
    internal queue, and a [Stream.reader] built over that queue is handed to
    [Stream.forward] together with [stream]. Outgoing direction: a loop reads
    [stream] and schedules whatever it yields (data, pings, pongs, flushes,
    close) on [socket].

    The ping and pong callbacks of the outgoing loop raise [Failure] when the
    payload they are given is longer than 125 bytes.

    @return the [Websocketaf.Websocket_connection] record [{frame; eof}]:
      [frame] enqueues one received frame, [eof] marks the end of input. *)
let websocket_handler stream socket =
  (* Queue of frames received from the peer. [frame] and [eof] below are the
     producers; [read] is the only consumer. *)
  let frames, push_frame = Lwt_stream.create () in

  (* Kind ([`Text] or [`Binary]) of the most recent non-continuation data
     frame, so that continuation frames can be tagged with the kind of the
     message they continue. *)
  let message_kind = ref `Binary in

  (* Classifies a received frame by opcode and enqueues it together with its
     payload. The reported frame length is ignored. *)
  let frame ~opcode ~is_fin ~len:_ payload =
    match opcode with
    | `Connection_close ->
      push_frame (Some (`Close, payload))
    | `Ping ->
      push_frame (Some (`Ping, payload))
    | `Pong ->
      push_frame (Some (`Pong, payload))
    | `Other _ ->
      push_frame (Some (`Other, payload))
    | `Text ->
      message_kind := `Text;
      push_frame (Some (`Data (`Text, is_fin), payload))
    | `Binary ->
      message_kind := `Binary;
      push_frame (Some (`Data (`Binary, is_fin), payload))
    | `Continuation ->
      push_frame (Some (`Data (!message_kind, is_fin), payload))
  in

  (* Ends the frame queue; [read] sees this as [None]. *)
  let eof () =
    push_frame None
  in

  (* Set once the frame queue has ended. From then on [read] reports
     [!close_code] immediately and the outgoing callbacks stop writing to the
     socket. *)
  let closed = ref false in
  (* Code reported to the reader on close. 1005 is the RFC 6455 status meaning
     that no status code was present. *)
  let close_code = ref 1005 in
  (* Data frame whose payload is still being read, with its (kind, fin). *)
  let current_payload = ref None in
  (* Most recent data chunk, held back by one step: a chunk is delivered only
     when the next chunk or the end of the payload arrives, so that the final
     chunk of a frame can carry that frame's fin flag. *)
  let last_chunk = ref None in

  (* Scratch state for [drain_payload]: the first chunk of a control-frame
     payload. Control payloads are at most 125 bytes under RFC 6455, so the
     first chunk is assumed to hold the whole payload; later chunks are read
     and discarded. *)
  let first_chunk_received = ref false in
  let first_chunk = ref Bigstringaf.empty in
  let first_chunk_offset = ref 0 in
  let first_chunk_length = ref 0 in

  (* Reads [payload] to its end, then resets the scratch state and calls
     [continuation buffer offset length] with the first chunk seen (or the
     empty buffer with offset 0 and length 0 if there was none). *)
  let rec drain_payload payload continuation =
    Websocketaf.Payload.schedule_read
      payload
      ~on_read:(fun buffer ~off ~len ->
        if not !first_chunk_received then begin
          first_chunk := buffer;
          first_chunk_offset := off;
          first_chunk_length := len;
          first_chunk_received := true
        end;
        drain_payload payload continuation)
      ~on_eof:(fun () ->
        let buffer = !first_chunk in
        let offset = !first_chunk_offset in
        let length = !first_chunk_length in
        first_chunk_received := false;
        first_chunk := Bigstringaf.empty;
        first_chunk_offset := 0;
        first_chunk_length := 0;
        continuation buffer offset length)
  in

  (* Reader callback: produces the next event from the peer by invoking
     exactly one of [data], [ping], [pong] or [close]. [flush] and [exn] are
     only threaded through the recursive calls. *)
  let rec read ~data ~flush ~ping ~pong ~close ~exn =
    if !closed then
      close !close_code
    else
      match !current_payload with
      | None ->
        (* No data frame in progress: wait for the next queued frame. *)
        Lwt.on_success (Lwt_stream.get frames) begin function
        | None ->
          (* End of input without a Close frame. *)
          if not !closed then begin
            closed := true;
            close_code := 1005
          end;
          Websocketaf.Wsd.close socket;
          close !close_code
        | Some (`Close, payload) ->
          drain_payload payload @@ fun buffer offset length ->
          (* The status code, when present, is the first two payload bytes in
             big-endian order. *)
          let code =
            if length < 2 then
              1005
            else
              let high_byte = Char.code buffer.{offset}
              and low_byte = Char.code buffer.{offset + 1} in
              (high_byte lsl 8) lor low_byte
          in
          if not !closed then
            close_code := code;
          close !close_code
        | Some (`Ping, payload) ->
          drain_payload payload @@
          ping
        | Some (`Pong, payload) ->
          drain_payload payload @@
          pong
        | Some (`Other, payload) ->
          (* Unknown opcode: consume the payload and move on. *)
          drain_payload payload @@ fun _buffer _offset _length ->
          read ~data ~flush ~ping ~pong ~close ~exn
        | Some (`Data properties, payload) ->
          current_payload := Some (properties, payload);
          read ~data ~flush ~ping ~pong ~close ~exn
        end
      | Some ((kind, fin), payload) ->
        let is_binary = (kind = `Binary) in
        Websocketaf.Payload.schedule_read
          payload
          ~on_read:(fun buffer ~off ~len ->
            match !last_chunk with
            | None ->
              (* First chunk of the frame: hold it back and read again. *)
              last_chunk := Some (buffer, off, len);
              read ~data ~flush ~ping ~pong ~close ~exn
            | Some (last_buffer, last_offset, last_length) ->
              (* Another chunk follows, so the held one is not final. *)
              last_chunk := Some (buffer, off, len);
              data last_buffer last_offset last_length is_binary false)
          ~on_eof:(fun () ->
            current_payload := None;
            match !last_chunk with
            | None ->
              (* Frame with an empty payload: nothing to deliver. *)
              read ~data ~flush ~ping ~pong ~close ~exn
            | Some (last_buffer, last_offset, last_length) ->
              (* The held chunk is the last of this frame, so it carries the
                 fin flag of the frame. *)
              last_chunk := None;
              data last_buffer last_offset last_length is_binary fin)
  in

  (* Number of bytes scheduled on the socket since the last flush. *)
  let bytes_since_flush = ref 0 in

  (* Resets the byte counter and asks the socket to call [ok] once flushed,
     or calls [close] with the close code if the connection has ended. *)
  let flush ~close ok =
    bytes_since_flush := 0;
    if !closed then
      close !close_code
    else
      Websocketaf.Wsd.flushed socket ok
  in

  (* Closes the socket with [code] unless the connection has already ended.
     RFC 6455 section 7.4.1 reserves 1005, 1006 and 1015 and forbids sending
     them in a Close frame, so for those the socket is closed without a code,
     as on the end-of-input path in [read]. *)
  let close code =
    if not !closed then begin
      match code with
      | 1005 | 1006 | 1015 ->
        Websocketaf.Wsd.close socket
      | _ ->
        Websocketaf.Wsd.close ~code:(`Other code) socket
    end
  in

  (* An exception on the stream is treated as a close without a status. *)
  let abort _exn = close 1005 in

  let reader = Stream.reader ~read ~close ~abort in
  Stream.forward reader stream;

  (* Repeatedly reads [stream] and writes what it yields to the socket. Each
     callback either continues the loop or, once the connection has ended,
     calls [close] and stops. *)
  let rec outgoing_loop () =
    Stream.read
      stream
      ~data:(fun buffer offset length binary fin ->
        let kind = if binary then `Binary else `Text in
        if !closed then
          close !close_code
        else begin
          Websocketaf.Wsd.schedule
            socket ~is_fin:fin ~kind buffer ~off:offset ~len:length;
          bytes_since_flush := !bytes_since_flush + length;
          (* Wait for a flush after every 4096 scheduled bytes before reading
             more from the stream. *)
          if !bytes_since_flush >= 4096 then
            flush ~close outgoing_loop
          else
            outgoing_loop ()
        end)
      ~flush:(fun () -> flush ~close outgoing_loop)
      ~ping:(fun buffer offset length ->
        if length > 125 then
          raise (Failure "Ping payload cannot exceed 125 bytes");
        if !closed then
          close !close_code
        else begin
          if length = 0 then
            Websocketaf.Wsd.send_ping socket
          else
            Websocketaf.Wsd.send_ping
              ~application_data:{Faraday.buffer; off = offset; len = length}
              socket;
          outgoing_loop ()
        end)
      ~pong:(fun buffer offset length ->
        if length > 125 then
          raise (Failure "Pong payload cannot exceed 125 bytes");
        if !closed then
          close !close_code
        else begin
          if length = 0 then
            Websocketaf.Wsd.send_pong socket
          else
            Websocketaf.Wsd.send_pong
              ~application_data:{Faraday.buffer; off = offset; len = length}
              socket;
          outgoing_loop ()
        end)
      ~close
      ~exn:abort
  in
  outgoing_loop ();

  Websocketaf.Websocket_connection.{frame; eof}