Source file irmin_client_jsoo.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
open Irmin_server
module Error = Error
(** The JavaScript [Date] constructor, looked up once on the global object. *)
let date = Jv.get Jv.global "Date"

(** [get_time ()] instantiates a JavaScript [Date], calls its [getTime]
    method and returns the result divided by 1000 (i.e. milliseconds
    converted to seconds) as a float. *)
let get_time () =
  let now = Jv.new' date [||] in
  let millis = Jv.to_float (Jv.call now "getTime" [||]) in
  millis /. 1000.
(** [Info (I)] is [I] with its constructor [v] replaced by a format-string
    based one that stamps the commit info with the current time. *)
module Info (I : Irmin.Info.S) = struct
  include I

  (* Keep a handle on the original constructor before [v] is shadowed. *)
  let init = v

  (** [v ?author fmt ...] formats the message according to [fmt] and returns
      a thunk; when the thunk is applied to [()], the date is read from
      {!get_time} (truncated to an [int64]) and the info value is built with
      the original constructor. *)
  let v ?author fmt =
    let build message () =
      let date = Int64.of_float (get_time ()) in
      init ?author ~message date
    in
    Fmt.kstr build fmt
end
(** In-memory byte queue: writers append at the end, readers consume from the
    front and park on [waiters] while not enough data is buffered. *)
module Buff = struct
  open Lwt.Infix

  (** Invariant: [0 <= off <= len <= Bigstringaf.length buf].

      [off] is the read position and [len] is the write position (the end of
      the valid data), so the unread bytes are [buf.{off} .. buf.{len - 1}].
      [waiters] holds the resolvers of parked readers, oldest first. *)
  type t = {
    mutable buf : Bigstringaf.t;
    mutable off : int;
    mutable len : int;
    waiters : unit Lwt.u Lwt_dllist.t;
  }

  (** [of_bigstring ~off ~len buf] wraps [buf], whose unread data spans
      [off .. len - 1]. The assertions enforce the invariant of {!t}. *)
  let of_bigstring ~off ~len buf =
    assert (off >= 0);
    assert (off <= len);
    assert (len <= Bigstringaf.length buf);
    { buf; off; len; waiters = Lwt_dllist.create () }

  (** [create len] is an empty buffer with an initial capacity of [len]. *)
  let create len = of_bigstring ~off:0 ~len:0 (Bigstringaf.create len)

  (** Position at which the next byte will be written. *)
  let write_pos t = t.len

  (** Number of bytes written but not yet consumed. *)
  let unread_data t = t.len - t.off

  (** Free space that is available once the unread data has been moved to the
      start of the buffer (i.e. capacity minus unread bytes). *)
  let writable_space t = Bigstringaf.length t.buf - unread_data t

  (** Free space located after the write position, usable without moving
      anything. *)
  let trailing_space t = Bigstringaf.length t.buf - t.len

  (** [compress t] moves the unread bytes to the start of the buffer,
      reclaiming the already-consumed prefix. *)
  let compress t =
    let unread = unread_data t in
    Bigstringaf.unsafe_blit t.buf ~src_off:t.off t.buf ~dst_off:0 ~len:unread;
    t.len <- unread;
    t.off <- 0

  (** [grow t to_copy] replaces the underlying buffer with a larger one that
      can hold the unread bytes plus [to_copy] more, and compacts the data
      into it. *)
  let grow t to_copy =
    let unread = unread_data t in
    let new_len = ref (Bigstringaf.length t.buf) in
    while !new_len - unread < to_copy do
      (* Grow by 1.5x, but always by at least one byte: [3 * n / 2] equals
         [n] for capacities 0 and 1, which would otherwise loop forever. *)
      new_len := max (!new_len + 1) (3 * !new_len / 2)
    done;
    let new_buf = Bigstringaf.create !new_len in
    Bigstringaf.unsafe_blit t.buf ~src_off:t.off new_buf ~dst_off:0
      ~len:unread;
    t.buf <- new_buf;
    t.len <- unread;
    t.off <- 0

  (** [ensure t to_copy] makes sure [to_copy] bytes can be written at
      {!write_pos}, compacting when that is enough and growing otherwise. *)
  let ensure t to_copy =
    if trailing_space t < to_copy then
      if writable_space t >= to_copy then compress t else grow t to_copy

  (** [wakeup t] wakes the oldest parked reader, if any. *)
  let wakeup t =
    if Lwt_dllist.is_empty t.waiters then ()
    else Lwt.wakeup_later (Lwt_dllist.take_l t.waiters) ()

  (** [add_string t str] appends [str] and wakes one parked reader. *)
  let add_string t str =
    let len = String.length str in
    ensure t len;
    Bigstringaf.unsafe_blit_from_string str ~src_off:0 t.buf
      ~dst_off:(write_pos t) ~len;
    t.len <- t.len + len;
    wakeup t

  (** [add_int64_be t i] appends [i] as 8 big-endian bytes and wakes one
      parked reader. *)
  let add_int64_be t i =
    ensure t 8;
    Bigstringaf.set_int64_be t.buf (write_pos t) i;
    t.len <- t.len + 8;
    wakeup t

  (** [add_char t c] appends the byte [c] and wakes one parked reader. *)
  let add_char t i =
    ensure t 1;
    Bigstringaf.set t.buf (write_pos t) i;
    t.len <- t.len + 1;
    wakeup t

  (** [when_available t n read] runs [read t] once at least [n] unread bytes
      are buffered; [read] is expected to advance [t.off] itself.

      A reader that arrives while others are parked queues behind them. A
      reader that is woken re-checks the amount of data, because {!wakeup} is
      triggered by every append regardless of its size; if the data is still
      insufficient it re-parks at the {e front} of the queue so that the
      order of readers is preserved. After a successful read, the next parked
      reader is woken when data remains, since appends only wake one reader
      each. *)
  let when_available t n read =
    let finish () =
      let v = read t in
      if unread_data t > 0 then wakeup t;
      Lwt.return v
    in
    let rec park ~front =
      let p, r = Lwt.wait () in
      let _node =
        if front then Lwt_dllist.add_l r t.waiters
        else Lwt_dllist.add_r r t.waiters
      in
      p >>= fun () ->
      if unread_data t >= n then finish () else park ~front:true
    in
    if unread_data t < n || not (Lwt_dllist.is_empty t.waiters) then
      park ~front:false
    else finish ()

  (** [read_int64_be t] consumes 8 bytes and returns them as a big-endian
      [int64], waiting until they are available. *)
  let read_int64_be t =
    when_available t 8 (fun t ->
        let i = Bigstringaf.get_int64_be t.buf t.off in
        t.off <- t.off + 8;
        i)

  (** [read_char t] consumes and returns one byte, waiting until it is
      available. *)
  let read_char t =
    when_available t 1 (fun t ->
        let c = Bigstringaf.get t.buf t.off in
        t.off <- t.off + 1;
        c)

  (** [read_into_exactly ~off ~len ~buf bs] consumes [len] bytes from [buf]
      and stores them in [bs] starting at [off], waiting until [len] bytes
      are available. *)
  let read_into_exactly ~off ~len ~buf:t bs =
    when_available t len (fun t ->
        Bigstringaf.blit_to_bytes t.buf ~src_off:t.off bs ~dst_off:off ~len;
        t.off <- t.off + len)
end
(** IO implementation for the browser: channels are in-memory buffers that
    are bridged to a WebSocket. *)
module IO = struct
  type flow = unit

  (** A channel is a byte buffer paired with the switch that is turned off
      when the channel is closed. *)
  type channel = { closed : Lwt_switch.t; buff : Buff.t }

  type ctx = unit

  let default_ctx = Lazy.from_val ()

  type ic = channel
  type oc = channel

  exception Timeout

  (** [is_closed c] is [true] once [c]'s switch has been turned off. *)
  let is_closed { closed; _ } = not (Lwt_switch.is_on closed)

  (* The read/write functions below delegate to the channel's buffer; writes
     complete immediately, reads are whatever promise the buffer returns. *)
  let write_int64_be { buff; _ } i = Lwt.return @@ Buff.add_int64_be buff i
  let read_int64_be { buff; _ } = Buff.read_int64_be buff
  let write { buff; _ } i = Lwt.return @@ Buff.add_string buff i

  let read_into_exactly { buff; _ } bs off len =
    Buff.read_into_exactly ~off ~len ~buf:buff bs

  let write_char { buff; _ } c = Lwt.return @@ Buff.add_char buff c
  let read_char { buff; _ } = Buff.read_char buff

  (** [with_timeout d f] races [f ()] against a sleep of [d] that fails with
      {!Timeout}; whichever settles first determines the result. *)
  let with_timeout d f =
    let open Lwt.Infix in
    Lwt.pick
      [ (Js_of_ocaml_lwt.Lwt_js.sleep d >>= fun () -> Lwt.fail Timeout); f () ]

  let time = get_time

  (** Nothing to flush: writes go straight into the in-memory buffer. *)
  let flush _ = Lwt.return ()

  (** Re-assembles complete protocol messages from a channel so that each one
      can be sent as a single WebSocket frame. *)
  module Websocket_protocol = struct
    open Lwt.Infix

    (** [read_exactly ~length ic] reads [length] bytes from [ic] as a
        string. *)
    let read_exactly ~length ic =
      let buff = Bytes.create length in
      read_into_exactly ic buff 0 length >|= fun () -> Bytes.to_string buff

    (** [read_handshake ic] reads an 8-byte big-endian length followed by
        that many bytes, and returns both re-encoded as one string. *)
    let read_handshake ic =
      read_int64_be ic >>= fun b_length ->
      let length = Int64.to_int b_length in
      read_exactly ~length ic >|= fun data ->
      let buf = Buffer.create (8 + length) in
      Buffer.add_int64_be buf b_length;
      Buffer.add_string buf data;
      Buffer.contents buf

    (** [read_request ic] reads a one-byte command length, the command, an
        8-byte big-endian payload length and the payload, and returns all
        four re-encoded as one string. *)
    let read_request ic =
      read_char ic >>= fun cmd_length ->
      let cl = int_of_char cmd_length in
      read_exactly ~length:cl ic >>= fun cmd ->
      read_int64_be ic >>= fun b_length ->
      let length = Int64.to_int b_length in
      read_exactly ~length ic >|= fun data ->
      let buf = Buffer.create (1 + cl + 8 + length) in
      Buffer.add_char buf cmd_length;
      Buffer.add_string buf cmd;
      Buffer.add_int64_be buf b_length;
      Buffer.add_string buf data;
      Buffer.contents buf
  end

  (** [websocket_to_flow ws] returns a pair of channels [(c1, c2)] bridged to
      [ws]: every message received on [ws] is appended to [c1], and every
      message written to [c2] (first a handshake, then requests) is sent on
      [ws] as one binary frame. Turning off either channel's switch closes
      [ws]. *)
  let websocket_to_flow ws =
    let open Lwt.Infix in
    let open Brr in
    let open Brr_io in
    (* Message handler: copy the received array buffer into [channel]. *)
    let fill_ic channel msg =
      let msg =
        Ev.as_type msg
        |> Message.Ev.data
        |> Tarray.Buffer.of_jv
        |> Tarray.of_buffer Tarray.Uint8
      in
      let msg = Tarray.to_string msg in
      Logs.debug (fun f -> f "<<< Client received frame");
      Lwt.async (fun () -> write channel msg)
    in
    (* Sender loop: the first message is read as a handshake, every later
       one as a request. *)
    let rec send_oc handshake channel ws =
      (if handshake then Websocket_protocol.read_handshake channel
       else Websocket_protocol.read_request channel)
      >>= fun content ->
      Logs.debug (fun f -> f ">>> Client sent frame");
      let len = String.length content in
      let content = Bigstringaf.of_string ~off:0 ~len content in
      let content =
        Js_of_ocaml.Typed_array.Bigstring.to_arrayBuffer content
        |> Jv.repr
        |> Tarray.Buffer.of_jv
      in
      Websocket.send_array_buffer ws content;
      send_oc false channel ws
    in
    let c1_switch = Lwt_switch.create () in
    Lwt_switch.add_hook (Some c1_switch) (fun () ->
        Lwt.return @@ Websocket.close ws);
    let c2_switch = Lwt_switch.create () in
    Lwt_switch.add_hook (Some c2_switch) (fun () ->
        Lwt.return @@ Websocket.close ws);
    let c1 = { closed = c1_switch; buff = Buff.create 4096 } in
    (* Each channel owns its own switch; [c2] previously reused [c1_switch],
       leaving [c2_switch] and its hook unused. *)
    let c2 = { closed = c2_switch; buff = Buff.create 4096 } in
    let _ev =
      Brr.Ev.listen Message.Ev.message (fill_ic c1) (Websocket.as_target ws)
    in
    Lwt.async (fun () -> send_oc true c2 ws);
    (c1, c2)

  (** [connect ~ctx addr] opens a WebSocket to [addr] and resolves to the
      channel pair of {!websocket_to_flow} once the socket is open. The
      promise fails with [Invalid_argument "Websocket Failure"] if an error
      event is received before the socket opens.

      @raise Failure if [addr] is not of the form [`Ws (None, _)]. *)
  let connect ~ctx:_ (client : Irmin_client.addr) =
    let open Lwt.Infix in
    match client with
    | `Ws (None, s) ->
        let open Brr_io in
        let ws = Websocket.create @@ Jstr.v s in
        let () =
          Websocket.set_binary_type ws Websocket.Binary_type.arraybuffer
        in
        let p, r = Lwt.wait () in
        (* Resolve [p] at most once. Waking an already-resolved promise
           raises [Invalid_argument], which would escape from the event
           handler when e.g. an error event follows a successful open. *)
        let settle wake =
          match Lwt.state p with
          | Lwt.Sleep -> wake ()
          | Lwt.Return _ | Lwt.Fail _ -> ()
        in
        let _ev =
          Brr.Ev.listen Brr.Ev.open'
            (fun _ -> settle (fun () -> Lwt.wakeup_later r ()))
            (Websocket.as_target ws)
        in
        let _ev =
          Brr.Ev.listen Brr.Ev.error
            (fun _err ->
              settle (fun () ->
                  Lwt.wakeup_later_exn r
                    (Invalid_argument "Websocket Failure")))
            (Websocket.as_target ws)
        in
        if Websocket.ready_state ws = Websocket.Ready_state.open' then
          settle (fun () -> Lwt.wakeup_later r ());
        p >|= fun () -> websocket_to_flow ws
    | `Ws _ | `TLS _ | `TCP _ | `Unix_domain_socket _ ->
        failwith "Unsupported Websocket Protocol"

  (** [close (ic, oc)] turns off the switch of [ic], then the switch of
      [oc]. *)
  let close (ic, oc) =
    let open Lwt.Infix in
    Lwt_switch.turn_off ic.closed >>= fun () -> Lwt_switch.turn_off oc.closed
end
(** [normalize_uri ?hostname uri] is [(uri, host)] where [host] is [hostname]
    when given, and otherwise the host of [uri], defaulting to
    ["127.0.0.1"]. *)
let normalize_uri ?hostname uri =
  let host =
    match hostname with
    | Some name -> name
    | None -> Uri.host_with_default ~default:"127.0.0.1" uri
  in
  (uri, host)
(** [config ?tls ?hostname uri] builds the [Irmin_client] configuration for
    [uri]. The hostname passed on is [hostname] when given, otherwise the one
    derived from [uri] by {!normalize_uri}. *)
let config ?tls ?hostname uri =
  (* [normalize_uri] already prefers an explicit [hostname], so its second
     component can be forwarded as is. *)
  let target, host = normalize_uri ?hostname uri in
  Irmin_client.config ?tls ~hostname:host target
(** [Make_codec (Codec) (Store)] is [Irmin_client.Make_codec] instantiated
    with this file's {!IO}, with [connect] wrapped so that the hostname is
    always supplied. *)
module Make_codec (Codec : Conn.Codec.S) (Store : Irmin.Generic_key.S) = struct
  include Irmin_client.Make_codec (IO) (Codec) (Store)

  (** [connect ?tls ?hostname uri] calls the underlying [connect] with the
      hostname chosen by {!normalize_uri}. *)
  let connect ?tls ?hostname uri =
    let target, host = normalize_uri ?hostname uri in
    connect ?tls ~hostname:host target
end
(** [Make (S)] is {!Make_codec} applied to [Conn.Codec.Bin] and the store
    [S]. *)
module Make (S : Irmin.Generic_key.S) = struct
  include Make_codec (Conn.Codec.Bin) (S)
end
(** [Make_json (S)] is {!Make_codec} applied to [Conn.Codec.Json] and the
    store [S]. *)
module Make_json (S : Irmin.Generic_key.S) = struct
  include Make_codec (Conn.Codec.Json) (S)
end