1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
open Import
(* Flow, channel and context types, re-exported from Conduit_lwt_unix. *)
type flow = Conduit_lwt_unix.flow
type ic = Conduit_lwt_unix.ic
type oc = Conduit_lwt_unix.oc
type ctx = Conduit_lwt_unix.ctx
(* Rebinding of Lwt_unix.Timeout: both names denote the same exception. *)
exception Timeout = Lwt_unix.Timeout
(* Conduit_lwt_unix's default context, re-exported unchanged. *)
let default_ctx = Conduit_lwt_unix.default_ctx
(* [is_closed x] delegates to Lwt_io.is_closed; the annotation restricts the
   argument to values of type [ic]. *)
let is_closed (x : ic) = Lwt_io.is_closed x
(* The bindings below are direct aliases of the corresponding Lwt_io
   functions; the [_be] variants come from Lwt_io.BE. *)
let write_int64_be = Lwt_io.BE.write_int64
let read_int64_be = Lwt_io.BE.read_int64
let flush = Lwt_io.flush
let write = Lwt_io.write
let read_into_exactly = Lwt_io.read_into_exactly
let write_char = Lwt_io.write_char
let read_char = Lwt_io.read_char
(* Helpers that read one complete protocol message from an input channel and
   return it as a single string, re-encoded exactly as it was read (length
   prefixes included). *)
module Websocket_protocol = struct
  open Lwt.Syntax

  (* [read_exactly ~length ic] reads exactly [length] bytes from [ic] and
     returns them as a string. *)
  let read_exactly ~length ic =
    let chunk = Bytes.create length in
    let+ () = read_into_exactly ic chunk 0 length in
    Bytes.to_string chunk

  (* [read_handshake ic] reads a 64-bit big-endian length followed by that
     many bytes of payload, and returns the length prefix and the payload
     concatenated. *)
  let read_handshake ic =
    let* size = Lwt_io.BE.read_int64 ic in
    let length = Int64.to_int size in
    let+ payload = read_exactly ~length ic in
    let message = Buffer.create (8 + length) in
    Buffer.add_int64_be message size;
    Buffer.add_string message payload;
    Buffer.contents message

  (* [read_request ic] reads, in order: one byte giving the command length,
     the command itself, a 64-bit big-endian payload length, and the payload.
     The four parts are returned concatenated in that same order. *)
  let read_request ic =
    let* cmd_size = Lwt_io.read_char ic in
    let cmd_len = int_of_char cmd_size in
    let* command = read_exactly ~length:cmd_len ic in
    let* size = read_int64_be ic in
    let length = Int64.to_int size in
    let+ payload = read_exactly ~length ic in
    let message = Buffer.create (1 + cmd_len + 8 + length) in
    Buffer.add_char message cmd_size;
    Buffer.add_string message command;
    Buffer.add_int64_be message size;
    Buffer.add_string message payload;
    Buffer.contents message
end
(* [websocket_to_flow client] exposes the websocket connection [client] as a
   pair of Lwt_io channels [(ic, oc)].

   Two background loops are started with [Lwt.async]:
   - [fill_ic] copies the content of every frame read from [client] into a
     pipe whose read end is the returned [ic];
   - [send_oc] reads one complete message at a time from a pipe whose write
     end is the returned [oc] (a handshake first, requests afterwards) and
     sends each one to [client] as a binary frame.

   A loop stops, after closing its end of the pipe, as soon as [End_of_file]
   is raised by any part of an iteration. Any other exception is re-raised
   and therefore ends up in [Lwt.async]'s exception hook. *)
let websocket_to_flow client =
  let open Lwt.Infix in
  (* [step channel f] runs a single loop iteration [f]. It resolves to [true]
     when the loop should continue, and to [false] once [End_of_file] was
     raised, in which case [channel] has been closed. Keeping the recursion
     outside of [Lwt.catch] avoids nesting one handler per frame. *)
  let step channel f =
    Lwt.catch
      (fun () -> f () >|= fun () -> true)
      (function
        | End_of_file -> Lwt_io.close channel >|= fun () -> false
        | exn -> Lwt.fail exn)
  in
  let rec fill_ic channel client =
    step channel (fun () ->
        Websocket_lwt_unix.read client >>= fun frame ->
        [%log.debug "<<< Client received frame"];
        Lwt_io.write channel frame.content)
    >>= function
    | true -> fill_ic channel client
    | false -> Lwt.return_unit
  in
  let rec send_oc handshake channel client =
    step channel (fun () ->
        (* The read is part of the guarded step: if the caller closes its
           output channel before (or between) messages, the resulting
           [End_of_file] ends the loop instead of escaping to [Lwt.async]. *)
        (if handshake then Websocket_protocol.read_handshake channel
         else Websocket_protocol.read_request channel)
        >>= fun content ->
        [%log.debug ">>> Client sent frame"];
        Websocket_lwt_unix.write client
          (Websocket.Frame.create ~opcode:Binary ~content ()))
    >>= function
    | true -> send_oc false channel client
    | false -> Lwt.return_unit
  in
  let input_ic, input_oc = Lwt_io.pipe () in
  let output_ic, output_oc = Lwt_io.pipe () in
  Lwt.async (fun () -> fill_ic input_oc client);
  Lwt.async (fun () -> send_oc true output_ic client);
  (input_ic, output_oc)
(* [connect ~ctx client] opens a connection to the address [client] and
   resolves to an [(ic, oc)] channel pair.

   TLS, TCP and Unix-domain-socket addresses are connected through
   Conduit_lwt_unix. A websocket address carrying a host and port is connected
   through Websocket_lwt_unix and then adapted with [websocket_to_flow].

   Raises [Failure] (synchronously) for a websocket address that carries no
   host and port. *)
let connect ~ctx (client : Irmin_client.addr) =
  match client with
  | `Ws (None, _) ->
      failwith "The Unix client requires a IP address and port number"
  | `Ws (Some (host, port), uri) ->
      Websocket_lwt_unix.connect ~ctx (`TCP (host, port)) (Uri.of_string uri)
      |> Lwt.map websocket_to_flow
  | (`TLS _ | `TCP _ | `Unix_domain_socket _) as endpoint ->
      Conduit_lwt_unix.connect ~ctx (endpoint :> Conduit_lwt_unix.client)
      |> Lwt.map (fun (_flow, ic, oc) -> (ic, oc))
(* [close c] hands the channel pair [c] to Conduit_lwt_server.close. *)
let close (c : ic * oc) = Conduit_lwt_server.close c
(* Alias of Lwt_unix.with_timeout. *)
let with_timeout = Lwt_unix.with_timeout
(* Alias of Unix.time. *)
let time = Unix.time