1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
open Import
open Lwt.Syntax
open Lwt.Infix
open Irmin_server
include Server_intf
(* Contents of the file named index.html, embedded as a string through the
   [%blob] extension. It is sent back as the body of GET requests by
   [dashboard]. *)
let html = [%blob "index.html"]
module Make (Codec : Conn.Codec.S) (Store : Irmin.Generic_key.S) = struct
(* Command set instantiated for [IO], the functor's [Codec] and [Store]. *)
module Command = Command.Make (IO) (Codec) (Store)
(* Re-export of the functor's [Store] argument under the same name. *)
module Store = Store
(* Connection module of the instantiated commands. From this point on it
   shadows the [Conn] that was used in the functor's parameter signature. *)
module Conn = Command.Conn
(** Server state: built by [v], consumed by [serve] and [callback]. *)
type t = {
(* Conduit context passed to the listener started by [serve]. *)
ctx : Conduit_lwt_unix.ctx;
(* URI the server was created with; [serve] inspects its scheme and path. *)
uri : Uri.t;
(* Optional Conduit server mode for the HTTP dashboard; [None] disables it. *)
dashboard : Conduit_lwt_unix.server option;
(* Conduit server mode of the main listener (Unix socket, TCP or TLS). *)
server : Conduit_lwt_unix.server;
(* Irmin configuration given to [v]; copied into every client record. *)
config : Irmin.config;
(* Repository opened from [config] by [v]. *)
repo : Store.Repo.t;
(* Server information (currently the start time) passed to each command. *)
info : Command.Server_info.t;
}
(** [readonly conf] is [conf] with the irmin-pack [readonly] key set to
    [true]. *)
let readonly conf =
  let key = Irmin_pack.Conf.Key.readonly in
  Irmin.Backend.Conf.add conf key true
(** [v ?tls_config ?dashboard ~uri config] prepares a server for [uri] and
    opens the repository described by [config].

    The URI scheme (case-insensitive, [tcp] when absent) selects the
    listener:
    - [unix]: a Unix domain socket at the URI path; a file already present at
      that path is removed first (best effort).
    - [tcp], [ws], [wss]: a TCP listener bound to the resolved host
      (default 127.0.0.1) on the URI port (default 9181). When [tls_config]
      is given the listener uses TLS with that certificate and key.

    [dashboard] is stored as-is and used by [serve].

    @raise Invalid_argument if the scheme is not one of the above. Host
    resolution problems are reported through the returned promise:
    [Not_found] when the host cannot be resolved and [Invalid_argument] when
    it resolves to no address. *)
let v ?tls_config ?dashboard ~uri config =
  let scheme = Uri.scheme uri |> Option.value ~default:"tcp" in
  let* ctx, server =
    match String.lowercase_ascii scheme with
    | "unix" ->
        let file = Uri.path uri in
        (* Deliberately best-effort: the file usually does not exist, and a
           real problem with the path will surface when the socket is bound. *)
        let+ () =
          Lwt.catch
            (fun () -> Lwt_unix.unlink file)
            (fun _ -> Lwt.return_unit)
        in
        ( Lazy.force Conduit_lwt_unix.default_ctx,
          `Unix_domain_socket (`File file) )
    | "tcp" | "ws" | "wss" -> (
        let host = Uri.host_with_default ~default:"127.0.0.1" uri in
        (* Resolve through Lwt so that a slow lookup does not block the
           scheduler. *)
        let* entry = Lwt_unix.gethostbyname host in
        let addresses = entry.Unix.h_addr_list in
        let src =
          if Array.length addresses = 0 then
            invalid_arg ("No address found for host: " ^ host)
          else Unix.string_of_inet_addr addresses.(0)
        in
        let+ ctx = Conduit_lwt_unix.init ~src () in
        let port = Uri.port uri |> Option.value ~default:9181 in
        match tls_config with
        | None -> (ctx, `TCP (`Port port))
        | Some (`Cert_file crt, `Key_file key) ->
            ( ctx,
              `TLS
                ( `Crt_file_path crt,
                  `Key_file_path key,
                  `No_password,
                  `Port port ) ))
    | x -> invalid_arg ("Unknown server scheme: " ^ x)
  in
  let+ repo = Store.Repo.v config in
  let start_time = Unix.time () in
  let info = Command.Server_info.{ start_time } in
  { ctx; uri; server; dashboard; config; repo; info }
(* Lookup table from command name to command implementation, filled once
   from [Command.commands] when the functor is applied. *)
let commands =
  let table = Hashtbl.create (List.length Command.commands) in
  Hashtbl.replace_seq table (List.to_seq Command.commands);
  table
(* Applies [Error.unwrap] with the message Invalid arguments to [a]; [loop]
   maps it over the result of reading a request.
   NOTE(review): presumably [Error.unwrap] raises [Error.Error] when [a] is an
   error -- confirm against its definition. *)
let invalid_arguments a = Error.unwrap "Invalid arguments" a [@@inline]
(* Single mutex held by [loop] around each command's [run]. It is created
   once per functor application, so command execution is serialised across
   every connection served by this module. *)
let command_lock = Lwt_mutex.create ()
(** [loop repo conn client info] serves requests read from [conn] until the
    connection is reported closed.

    Each round reads a request header, looks the command up in [commands],
    decodes its arguments, runs it while holding [command_lock] and finishes
    the response. An empty command name is ignored; an unknown one is
    answered with an error. Failures are reported to the peer (followed by a
    short pause) and the loop carries on, except for [End_of_file], which
    closes the input channel. Once the connection is closed, the watches
    registered on [client] are removed and the loop ends. *)
let[@tailrec] rec loop repo conn client info : unit Lwt.t =
  (* Removes the store watch, then the branch watch, when they are set. *)
  let release_watches () =
    (match client.Command.watch with
    | None -> Lwt.return_unit
    | Some w -> Store.unwatch w)
    >>= fun () ->
    match client.Command.branch_watch with
    | None -> Lwt.return_unit
    | Some w ->
        let branches = Store.Backend.Repo.branch_t client.Command.repo in
        Store.Backend.Branch.unwatch branches w
  in
  (* Reads and executes a single request. *)
  let serve_one () =
    [%log.debug "Receiving next command"];
    let* Conn.Request.{ command } = Conn.Request.read_header conn in
    match Hashtbl.find_opt commands command with
    | Some (module Cmd : Command.CMD) ->
        Conn.read conn Cmd.req_t >|= invalid_arguments >>= fun req ->
        [%log.debug "Command: %s" Cmd.name];
        Lwt_mutex.with_lock command_lock (fun () ->
            Cmd.run conn client info req)
        >>= fun res -> Conn.Return.finish res
    | None ->
        if String.length command <> 0 then
          Conn.err conn ("unknown command: " ^ command)
        else Lwt.return_unit
  in
  (* Turns a failure of [serve_one] into a reply to the peer. *)
  let recover = function
    | End_of_file -> Lwt_io.close conn.ic
    | Error.Error msg ->
        [%log.err "Error response: %s" msg];
        Conn.err conn msg >>= fun () -> Lwt_unix.sleep 0.01
    | exn ->
        if Conn.is_closed conn then Lwt.return_unit
        else
          let s = Printexc.to_string exn in
          [%log.err "Exception: %s\n%s" s (Printexc.get_backtrace ())];
          Conn.err conn s >>= fun () -> Lwt_unix.sleep 0.01
  in
  if Conn.is_closed conn then release_watches ()
  else Lwt.catch serve_one recover >>= fun () -> loop repo conn client info
(** [callback t ic oc] handles one client connected through the channels
    [ic] and [oc].

    The V1 handshake is checked first; any exception raised while checking
    counts as a failed handshake. On failure the input channel is closed and
    the function returns. On success a fresh client record (no watches) is
    created and requests are served by [loop]. *)
let callback { repo; info; config; _ } ic oc =
  let conn = Conn.v ic oc in
  let handshake () = Conn.Handshake.V1.check (module Store) conn in
  Lwt.catch handshake (fun _ -> Lwt.return_false) >>= function
  | false ->
      [%log.info "Client closed because of invalid handshake"];
      Lwt_io.close ic
  | true ->
      let client =
        Command.{ conn; repo; watch = None; branch_watch = None; config }
      in
      loop repo conn client info
(* Helpers that cut the byte stream written by the server into whole
   messages, so that each message can be sent as one websocket frame. Every
   function returns the raw bytes exactly as they were read. *)
module Websocket_protocol = struct
  (* Reads exactly [length] bytes from [ic] and returns them as a string. *)
  let read_exactly ~length ic =
    let buff = Bytes.create length in
    let+ () = Lwt_io.read_into_exactly ic buff 0 length in
    Bytes.to_string buff

  (* Reads a big-endian 64-bit length followed by that many payload bytes;
     returns the length as read together with the payload. *)
  let read_sized ic =
    let* b_length = Lwt_io.BE.read_int64 ic in
    let+ data = read_exactly ~length:(Int64.to_int b_length) ic in
    (b_length, data)

  (* A handshake message: 8-byte length, then the payload. *)
  let read_handshake ic =
    let+ b_length, data = read_sized ic in
    let buf = Buffer.create (8 + String.length data) in
    Buffer.add_int64_be buf b_length;
    Buffer.add_string buf data;
    Buffer.contents buf

  (* A response message: 1 status byte, 8-byte length, then the payload. *)
  let read_response ic =
    let* status = Lwt_io.read_char ic in
    let+ b_length, data = read_sized ic in
    let buf = Buffer.create (1 + 8 + String.length data) in
    Buffer.add_char buf status;
    Buffer.add_int64_be buf b_length;
    Buffer.add_string buf data;
    Buffer.contents buf
end
(** [websocket_handler server client] serves one websocket client by bridging
    it to [callback] through two in-process pipes.

    - [fill_ic] copies the content of every binary frame received from
      [client] into the input pipe read by [callback]; other frames are
      skipped.
    - [send_oc] reads whole messages written by [callback] to the output pipe
      (the handshake first, responses afterwards) and sends each one to
      [client] as a binary frame.

    Both copiers run in the background with [Lwt.async]. When [callback]
    terminates, the write end of the output pipe is closed so that [send_oc]
    reaches end of file and stops instead of waiting forever. *)
let websocket_handler server client =
  let rec fill_ic channel other_channel client =
    if Lwt_io.is_closed other_channel then Lwt_io.close channel
    else
      Lwt.catch
        (fun () ->
          let* frame = Websocket_lwt_unix.Connected_client.recv client in
          if frame.opcode <> Binary then fill_ic channel other_channel client
          else (
            [%log.debug "<<< Server received frame"];
            Lwt_io.write channel frame.content >>= fun () ->
            fill_ic channel other_channel client))
        (function
          | End_of_file ->
              Lwt_io.close channel >>= fun () -> Lwt_io.close other_channel
          | exn -> Lwt.fail exn)
  in
  let rec send_oc handshake channel other_channel client =
    if Lwt_io.is_closed other_channel then Lwt_io.close channel
    else
      Lwt.catch
        (fun () ->
          (* The read is inside the handler on purpose: end of file on the
             pipe must close the channels, not escape into the exception
             hook of [Lwt.async]. *)
          let* content =
            if handshake then Websocket_protocol.read_handshake channel
            else Websocket_protocol.read_response channel
          in
          [%log.debug ">>> Server sent frame"];
          Websocket_lwt_unix.Connected_client.send client
            (Websocket.Frame.create ~opcode:Binary ~content ())
          >>= fun () -> send_oc false channel other_channel client)
        (function
          | End_of_file ->
              Lwt_io.close channel >>= fun () -> Lwt_io.close other_channel
          | exn -> Lwt.fail exn)
  in
  let input_ic, input_oc = Lwt_io.pipe () in
  let output_ic, output_oc = Lwt_io.pipe () in
  Lwt.async (fun () -> fill_ic input_oc input_ic client);
  Lwt.async (fun () -> send_oc true output_ic output_oc client);
  (* Only the write end of the output pipe is closed here: nothing writes to
     it once [callback] is done, and [send_oc] closes the read end itself
     after draining what is left. The input pipe stays under the control of
     [fill_ic], which may still be writing to it. *)
  Lwt.finalize
    (fun () -> callback server input_ic output_oc)
    (fun () -> Lwt_io.close output_oc)
(* Logs the exception [x] at error level. [serve] passes this function as the
   [~on_exn] handler of the listeners it starts. *)
let on_exn x = [%log.err "EXCEPTION: %s" (Printexc.to_string x)]
(** [dashboard t mode] runs the HTTP dashboard on the Conduit server [mode].

    The request path is parsed as a store path, and the [branch] query
    parameter (default [main]) as a commit hash or, failing that, a branch
    name.
    - [GET] returns the embedded HTML page.
    - [POST] returns JSON: for a contents path, an object with the value and
      its hash; otherwise an array describing the entries below the path
      (path, kind and hash of each).
    - Any other method is answered with status 404.

    A path or branch that cannot be parsed is answered with status 400, and a
    hash that names no commit with status 404. *)
let dashboard t mode =
  (* Builds a response with the given status (200 by default) and body. *)
  let respond ?(status = `OK) text =
    let res = Cohttp_lwt_unix.Response.make ~status () in
    Lwt.return (res, Cohttp_lwt.Body.of_string text)
  in
  (* Renders [s] as a JSON value with Irmin's JSON encoder, so that quotes,
     backslashes and control characters cannot break the document. *)
  let json_string s = Irmin.Type.to_json_string Irmin.Type.string s in
  (* Entries directly below [prefix] as (full path, kind, hash) triples;
     entries whose kind cannot be determined are dropped. *)
  let list store prefix =
    let* entries = Store.list store prefix in
    Lwt_list.filter_map_s
      (fun (step, tree) ->
        let path = Store.Path.rcons prefix step in
        let+ kind = Store.Tree.kind tree Store.Path.empty in
        match kind with
        | Some `Contents -> Some (path, "contents", Store.Tree.hash tree)
        | Some `Node -> Some (path, "node", Store.Tree.hash tree)
        | None -> None)
      entries
  in
  let data_callback prefix branch =
    let* store =
      match branch with
      | `Hash hash -> (
          let* commit = Store.Commit.of_hash t.repo hash in
          match commit with
          | Some commit ->
              let+ store = Store.of_commit commit in
              Some store
          | None -> Lwt.return_none)
      | `Branch branch ->
          let+ store = Store.of_branch t.repo branch in
          Some store
    in
    match store with
    | None -> respond ~status:`Not_found "Invalid commit"
    | Some store -> (
        let* kind = Store.kind store prefix in
        match kind with
        | Some `Contents ->
            let* contents = Store.get store prefix in
            let contents' =
              Irmin.Type.to_json_string Store.contents_t contents
            in
            let hash' =
              Irmin.Type.to_json_string Store.hash_t
                (Store.Contents.hash contents)
            in
            respond
              (Printf.sprintf {|{"contents": %s, "hash": %s }|} contents'
                 hash')
        | Some `Node | None ->
            let* entries = list store prefix in
            let items =
              List.map
                (fun (path, kind, hash) ->
                  Printf.sprintf {|{"path": %s, "kind": %s, "hash": %s}|}
                    (json_string (Irmin.Type.to_string Store.path_t path))
                    (json_string kind)
                    (json_string (Irmin.Type.to_string Store.hash_t hash)))
                entries
            in
            respond (Printf.sprintf "[%s]" (String.concat "," items)))
  in
  let callback _conn req body =
    let* () = Cohttp_lwt.Body.drain_body body in
    let uri = Cohttp_lwt_unix.Request.uri req in
    let branch_name =
      Uri.get_query_param uri "branch" |> Option.value ~default:"main"
    in
    (* A value that parses as a hash designates a commit; anything else is
       tried as a branch name. *)
    let branch =
      match Irmin.Type.of_string Store.hash_t branch_name with
      | Ok hash -> Ok (`Hash hash)
      | Error _ -> (
          match Irmin.Type.of_string Store.branch_t branch_name with
          | Ok branch -> Ok (`Branch branch)
          | Error e -> Error e)
    in
    let prefix = Irmin.Type.of_string Store.path_t (Uri.path uri) in
    match (branch, prefix) with
    | Error _, _ -> respond ~status:`Bad_request "Invalid branch"
    | Ok _, Error _ -> respond ~status:`Bad_request "Invalid path"
    | Ok branch, Ok prefix -> (
        match Cohttp_lwt_unix.Request.meth req with
        | `POST -> data_callback prefix branch
        | `GET -> respond html
        | _ -> respond ~status:`Not_found "Not found")
  in
  let server = Cohttp_lwt_unix.Server.make ~callback () in
  Cohttp_lwt_unix.Server.create ~mode server
(** [serve ?stop t] runs the server described by [t] and, when configured,
    the dashboard, and waits for both to terminate.

    For the [ws] and [wss] schemes a websocket server is started and each
    client is handled by [websocket_handler]; any other scheme is served
    directly over Conduit with [callback]. The scheme is compared
    case-insensitively, as in [v]. [stop] is forwarded to the Conduit server
    only.

    For the [unix] scheme the socket file is removed when the servers
    terminate, and also on SIGINT and SIGTERM, after which the process exits
    with code 0. A socket file that is already gone is not an error. *)
let serve ?stop t =
  let scheme = Option.map String.lowercase_ascii (Uri.scheme t.uri) in
  let unlink () =
    match scheme with
    | Some "unix" -> (
        (* Cleanup only: a missing file means there is nothing left to do. *)
        try Unix.unlink (Uri.path t.uri)
        with Unix.Unix_error (Unix.ENOENT, _, _) -> ())
    | _ -> ()
  in
  let on_termination _ =
    unlink ();
    exit 0
  in
  let (_ : Lwt_unix.signal_handler_id) =
    Lwt_unix.on_signal Sys.sigint on_termination
  in
  let (_ : Lwt_unix.signal_handler_id) =
    Lwt_unix.on_signal Sys.sigterm on_termination
  in
  let dashboard =
    match t.dashboard with
    | Some mode -> dashboard t mode
    | None -> Lwt.return_unit
  in
  let server =
    match scheme with
    | Some ("ws" | "wss") ->
        Websocket_lwt_unix.establish_standard_server ~ctx:t.ctx ~mode:t.server
          ~on_exn
          ~check_request:(fun _ -> true)
          (websocket_handler t)
    | _ ->
        Conduit_lwt_unix.serve ?stop ~ctx:t.ctx ~on_exn ~mode:t.server
          (fun _ ic oc -> callback t ic oc)
  in
  let* () = Lwt.join [ server; dashboard ] in
  Lwt.wrap unlink
end