Source file irmin_http_server.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
(*
 * Copyright (c) 2013-2017 Thomas Gazagnaire <thomas@gazagnaire.org>
 *
 * Permission to use, copy, modify, and distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 *)

open! Import
open Irmin_http_common
module T = Irmin.Type

(** JSON encoder used by the endpoints below; an alias of
    [Irmin.Type.to_json_string]. *)
let to_json = T.to_json_string

(** JSON decoder used by the endpoints below; an alias of
    [Irmin.Type.of_json_string]. *)
let of_json = T.of_json_string

(** Log source under which this server reports its messages. *)
let src =
  let doc = "Irmin REST API server" in
  Logs.Src.create ~doc "irmin.http-srv"

(** Logging functions bound to {!src}. *)
module Log = (val Logs.src_log src : Logs.LOG)

(** Interface of an HTTP server for a repository: a repository type, a server
    type, and a constructor from one to the other. *)
module type S = sig
  type repo
  (** The repository the server is built from. *)

  type t
  (** The server value returned by {!v}. *)

  val v : ?strict:bool -> repo -> t
  (** [v ?strict repo] builds a server for [repo]. NOTE(review): the [Make]
      implementation in this file ignores [strict]. *)
end

module Make (HTTP : Cohttp_lwt.S.Server) (S : Irmin.S) = struct
  (** Webmachine instantiated over the HTTP backend's IO, with [Rd] re-exported
      next to it. *)
  module Wm = struct
    module Rd = Webmachine.Rd

    (** Clock handed to Webmachine: [Unix.gettimeofday] truncated to an
        integer number of seconds. *)
    module Clock = struct
      let now () = Unix.gettimeofday () |> int_of_float
    end

    include Webmachine.Make (HTTP.IO) (Clock)
  end

  module P = S.Private

  (** Base class of every endpoint in this file: a Webmachine resource over
      Cohttp bodies that adds a response header named [irmin_version] and
      carrying [Irmin.version] when the request finishes. *)
  class virtual resource =
    object
      inherit [Cohttp_lwt.Body.t] Wm.resource

      method! finish_request rd =
        let add_version headers =
          Cohttp.Header.add headers irmin_version Irmin.version
        in
        let rd = Wm.Rd.with_resp_headers add_version rd in
        Wm.continue () rd
    end

  (** A [POST]-only endpoint that runs [op] on [repo] and then replies with an
      empty body. *)
  class post_unit_endpoint ~op repo =
    object
      inherit resource
      method! allowed_methods rd = Wm.continue [ `POST ] rd
      method content_types_accepted rd = Wm.continue [] rd

      (* Only [POST] is allowed, and it is answered by [process_post]; the
         provider registered here is not expected to run (it would fail an
         assertion if it did). *)
      method content_types_provided rd =
        let unused_provider _ = assert false in
        Wm.continue [ ("application/json", unused_provider) ] rd

      method! process_post rd =
        (* The request body is read in full, but its contents are unused. *)
        let* (_ : string) = Cohttp_lwt.Body.to_string rd.Wm.Rd.req_body in
        let* () = op repo in
        Wm.continue true { rd with Wm.Rd.resp_body = `String "" }
    end

  (** [parse_error rd str (`Msg e)] replies with status 400 and a body saying
      that [str] could not be parsed, followed by the reason [e]. *)
  let parse_error rd str (`Msg e) =
    let body = `String (Fmt.str "Parse error %S: %s" str e) in
    Wm.respond ~body 400 rd

  module Content_addressable (S : sig
    include Irmin.CONTENT_ADDRESSABLE_STORE

    val batch : P.Repo.t -> (read_write t -> 'a Lwt.t) -> 'a Lwt.t
  end)
  (K : Irmin.Type.S with type t = S.key)
  (V : Irmin.Type.S with type t = S.value) =
  struct
    (** [with_key rd f] decodes the ["id"] path parameter of [rd] as a [K.t]
        and passes it to [f]. An id that does not decode yields a 404
        response. *)
    let with_key rd f =
      let id = Wm.Rd.lookup_path_info_exn "id" rd in
      match Irmin.Type.of_string K.t id with
      | Error _ -> Wm.respond 404 rd
      | Ok key -> f key

    (** [add rd repo] decodes the request body as a [V.t], adds it to the store
        inside a batch on [repo], and replies with the resulting key rendered
        as a string. A body that does not decode yields a 400 response (see
        [parse_error]). *)
    let add rd repo =
      Cohttp_lwt.Body.to_string rd.Wm.Rd.req_body >>= fun raw ->
      match Irmin.Type.of_string V.t raw with
      | Ok value ->
          S.batch repo (fun db ->
              S.add db value >>= fun key ->
              let resp_body = `String (Irmin.Type.to_string K.t key) in
              Wm.continue true { rd with Wm.Rd.resp_body })
      | Error e -> parse_error rd raw e

    (** [unsafe_add rd repo key] decodes the request body as a [V.t] and stores
        it under the caller-supplied [key] with [S.unsafe_add], inside a batch
        on [repo]. Replies with an empty body; a body that does not decode
        yields a 400 response (see [parse_error]). *)
    let unsafe_add rd repo key =
      Cohttp_lwt.Body.to_string rd.Wm.Rd.req_body >>= fun raw ->
      match Irmin.Type.of_string V.t raw with
      | Ok value ->
          S.batch repo (fun db ->
              let* () = S.unsafe_add db key value in
              Wm.continue true { rd with Wm.Rd.resp_body = `String "" })
      | Error e -> parse_error rd raw e

    (** [POST]-only endpoint that stores the request body as a new value (see
        [add]). *)
    class items repo =
      object
        inherit resource
        method! allowed_methods rd = Wm.continue [ `POST ] rd
        method content_types_accepted rd = Wm.continue [] rd

        (* The provider is not expected to run: only [POST] is allowed and it
           is answered by [process_post]. *)
        method content_types_provided rd =
          let unused_provider _ = assert false in
          Wm.continue [ ("application/json", unused_provider) ] rd

        method! process_post rd = add rd repo
      end

    (** [POST]-only endpoint that stores the request body under the key given
        in the path (see [with_key] and [unsafe_add]). *)
    class unsafe_items repo =
      object
        inherit resource
        method! allowed_methods rd = Wm.continue [ `POST ] rd
        method content_types_accepted rd = Wm.continue [] rd

        (* The provider is not expected to run: only [POST] is allowed and it
           is answered by [process_post]. *)
        method content_types_provided rd =
          let unused_provider _ = assert false in
          Wm.continue [ ("application/json", unused_provider) ] rd

        method! process_post rd =
          with_key rd (fun key -> unsafe_add rd repo key)
      end

    (** [POST]-only endpoint performing a three-way merge of optional keys. The
        request body is decoded with [merge_t] into [old], [left] and [right];
        the merge function [merge db] is run inside a batch on [repo] and its
        result is returned rendered as a string. *)
    class merge merge repo =
      object
        inherit resource
        method! allowed_methods rd = Wm.continue [ `POST ] rd
        method content_types_accepted rd = Wm.continue [] rd

        (* The provider is not expected to run: only [POST] is allowed and it
           is answered by [process_post]. *)
        method content_types_provided rd =
          let unused_provider _ = assert false in
          Wm.continue [ ("application/json", unused_provider) ] rd

        method! process_post rd =
          Cohttp_lwt.Body.to_string rd.Wm.Rd.req_body >>= fun raw ->
          let key_opt_t = Irmin.Type.option K.t in
          match Irmin.Type.of_string (merge_t key_opt_t) raw with
          | Error e -> parse_error rd raw e
          | Ok { old; left; right } ->
              S.batch repo (fun db ->
                  let old = Irmin.Merge.promise old in
                  Irmin.Merge.f (merge db) ~old left right >>= fun outcome ->
                  let outcome_t = Irmin.Merge.result_t key_opt_t in
                  let resp_body =
                    `String (Irmin.Type.to_string outcome_t outcome)
                  in
                  Wm.continue true { rd with Wm.Rd.resp_body })
      end

    (** Read-only ([GET]/[HEAD]) endpoint for the value stored under the key
        given in the path. *)
    class item db =
      object (self)
        inherit resource
        method! allowed_methods rd = Wm.continue [ `GET; `HEAD ] rd
        method content_types_accepted rd = Wm.continue [] rd

        method content_types_provided rd =
          Wm.continue [ ("application/json", self#to_json) ] rd

        (* The resource exists exactly when the store has the key. *)
        method! resource_exists rd =
          with_key rd (fun key ->
              S.mem db key >>= fun found -> Wm.continue found rd)

        (* Renders the stored value as a string, or replies 404 when the key
           has no value. *)
        method private to_json rd =
          with_key rd (fun key ->
              let* found = S.find db key in
              match found with
              | None -> Wm.respond 404 rd
              | Some value ->
                  Wm.continue (`String (Irmin.Type.to_string V.t value)) rd)
      end

    (** [POST] endpoint that runs [S.clear] on the store it is given (see
        [post_unit_endpoint]). *)
    class clear = post_unit_endpoint ~op:S.clear
  end

  module Atomic_write
      (S : Irmin.ATOMIC_WRITE_STORE)
      (K : Irmin.Type.S with type t = S.key)
      (V : Irmin.Type.S with type t = S.value) =
  struct
    (** Read-only ([GET]/[HEAD]) endpoint returning the keys listed by
        [S.list] as a JSON list. *)
    class items db =
      object (self)
        inherit resource
        method! allowed_methods rd = Wm.continue [ `GET; `HEAD ] rd
        method content_types_accepted rd = Wm.continue [] rd

        method content_types_provided rd =
          Wm.continue [ ("application/json", self#to_json) ] rd

        method private to_json rd =
          S.list db >>= fun keys ->
          let body = `String (to_json (T.list K.t) keys) in
          Wm.continue body rd
      end

    (** [with_key rd f] decodes the dispatch path of [rd] as a [K.t] and passes
        it to [f]. A path that does not decode yields a 404 response. *)
    let with_key rd f =
      let path = rd.Wm.Rd.dispatch_path in
      match Irmin.Type.of_string K.t path with
      | Error _ -> Wm.respond 404 rd
      | Ok key -> f key

    (** Endpoint for a single key of the store: [GET]/[HEAD] read its value,
        [PUT] sets it (unconditionally or by test-and-set), [DELETE] removes
        it. *)
    class item db =
      object (self)
        inherit resource

        method! allowed_methods rd =
          Wm.continue [ `GET; `HEAD; `PUT; `DELETE ] rd

        method content_types_provided rd =
          Wm.continue [ ("application/json", self#to_json) ] rd

        method content_types_accepted rd =
          Wm.continue [ ("application/json", self#of_json) ] rd

        (* The resource exists exactly when the store has the key. *)
        method! resource_exists rd =
          with_key rd (fun key ->
              S.mem db key >>= fun found -> Wm.continue found rd)

        method! delete_resource rd =
          with_key rd (fun key ->
              let* () = S.remove db key in
              let resp_body = `String (to_json status_t "ok") in
              Wm.continue true { rd with Wm.Rd.resp_body })

        (* Renders the stored value as a string, or replies 404 when the key
           has no value. *)
        method private to_json rd =
          with_key rd (fun key ->
              let* found = S.find db key in
              match found with
              | None -> Wm.respond 404 rd
              | Some value ->
                  Wm.continue (`String (Irmin.Type.to_string V.t value)) rd)

        (* Handles the request body, decoded with [set_t]. When its [v] field
           is present the key is set unconditionally; otherwise the [test] and
           [set] fields drive a test-and-set, whose boolean outcome is both
           reported in the body and passed to [Wm.continue]. *)
        method private of_json rd =
          Cohttp_lwt.Body.to_string rd.Wm.Rd.req_body >>= fun raw ->
          match of_json (set_t V.t) raw with
          | Error e -> parse_error rd raw e
          | Ok req ->
              let reply status ok =
                let resp_body = `String (to_json status_t status) in
                Wm.continue ok { rd with Wm.Rd.resp_body }
              in
              with_key rd (fun key ->
                  match req.v with
                  | Some value ->
                      let* () = S.set db key value in
                      reply "ok" true
                  | None ->
                      let* swapped =
                        S.test_and_set db key ~test:req.test ~set:req.set
                      in
                      reply (string_of_bool swapped) swapped)
      end

    (** Endpoint streaming change events for every key of the store. [GET]
        starts a watch without initial state; [POST] decodes the request body
        as a JSON list of [init_t] entries and passes it to [S.watch] as
        [init]. The response body is a stream made of ["["] followed by one
        JSON event and one [","] per notification; this class never pushes a
        closing bracket. *)
    class watches db =
      object (self)
        inherit resource
        method! allowed_methods rd = Wm.continue [ `GET; `HEAD; `POST ] rd
        method content_types_accepted rd = Wm.continue [] rd

        (* Registers a watch on [db] and returns the stream its events are
           pushed to. *)
        method private stream ?init () =
          let stream, push = Lwt_stream.create () in
          (* Open the JSON array before the watch is registered: otherwise a
             notification delivered while [S.watch] is still pending could be
             queued ahead of the opening bracket. *)
          push (Some "[");
          let+ w =
            S.watch ?init db (fun key diff ->
                let v = to_json (event_t K.t V.t) (key, diff) in
                push (Some v);
                push (Some ",");
                Lwt.return_unit)
          in
          (* Remove the watch once the stream is reported closed. *)
          Lwt.async (fun () ->
              Lwt_stream.closed stream >>= fun () -> S.unwatch db w);
          `Stream stream

        method! process_post rd =
          let* body = Cohttp_lwt.Body.to_string rd.Wm.Rd.req_body in
          match of_json T.(list (init_t K.t V.t)) body with
          | Error e -> parse_error rd body e
          | Ok init ->
              let* resp_body = self#stream ~init () in
              Wm.continue true { rd with Wm.Rd.resp_body }

        (* Content provider for [GET]: a watch without initial state. *)
        method private of_json rd =
          let* body = self#stream () in
          Wm.continue body rd

        method content_types_provided rd =
          Wm.continue [ ("application/json", self#of_json) ] rd
      end

    (** Endpoint streaming change events for the single key given by the
        dispatch path (see [with_key]). [GET] starts a watch without initial
        state; [POST] decodes the request body as a JSON [V.t] and passes it
        to [S.watch_key] as [init]. The response body is a stream made of
        ["["] followed by one JSON event and one [","] per notification; this
        class never pushes a closing bracket. *)
    class watch db =
      object (self)
        inherit resource
        method! allowed_methods rd = Wm.continue [ `GET; `HEAD; `POST ] rd
        method content_types_accepted rd = Wm.continue [] rd

        (* Registers a watch on [key] and returns the stream its events are
           pushed to. *)
        method private stream ?init key =
          let stream, push = Lwt_stream.create () in
          (* Open the JSON array before the watch is registered: otherwise a
             notification delivered while [S.watch_key] is still pending could
             be queued ahead of the opening bracket. *)
          push (Some "[");
          let+ w =
            S.watch_key ?init db key (fun diff ->
                let v = to_json (event_t K.t V.t) (key, diff) in
                push (Some v);
                push (Some ",");
                Lwt.return_unit)
          in
          (* Remove the watch once the stream is reported closed. *)
          Lwt.async (fun () ->
              Lwt_stream.closed stream >>= fun () -> S.unwatch db w);
          `Stream stream

        method! process_post rd =
          let* body = Cohttp_lwt.Body.to_string rd.Wm.Rd.req_body in
          match of_json V.t body with
          | Error e -> parse_error rd body e
          | Ok init ->
              with_key rd (fun key ->
                  let* resp_body = self#stream ~init key in
                  Wm.continue true { rd with Wm.Rd.resp_body })

        (* Content provider for [GET]: a watch without initial state. *)
        method private of_json rd =
          with_key rd (fun key ->
              let* body = self#stream key in
              Wm.continue body rd)

        method content_types_provided rd =
          Wm.continue [ ("application/json", self#of_json) ] rd
      end

    (** [POST] endpoint that runs [S.clear] on the store it is given (see
        [post_unit_endpoint]). *)
    class clear = post_unit_endpoint ~op:S.clear
  end

  (** Content-addressable endpoints over [P.Contents]. Its [batch] runs
      [P.Repo.batch] and keeps the first of the three handles it provides. *)
  module Blob =
    Content_addressable
      (struct
        include P.Contents

        let batch repo f = P.Repo.batch repo (fun contents _ _ -> f contents)
      end)
      (P.Contents.Key)
      (P.Contents.Val)

  (** Content-addressable endpoints over [P.Node]. Its [batch] runs
      [P.Repo.batch] and keeps the second of the three handles it provides. *)
  module Tree =
    Content_addressable
      (struct
        include P.Node

        let batch repo f = P.Repo.batch repo (fun _ nodes _ -> f nodes)
      end)
      (P.Node.Key)
      (P.Node.Val)

  (** Content-addressable endpoints over [P.Commit]. Its [batch] runs
      [P.Repo.batch] and keeps the third of the three handles it provides. *)
  module Commit =
    Content_addressable
      (struct
        include P.Commit

        let batch repo f = P.Repo.batch repo (fun _ _ commits -> f commits)
      end)
      (P.Commit.Key)
      (P.Commit.Val)

  (** Atomic-write endpoints over [P.Branch]. *)
  module Branch = Atomic_write (P.Branch) (P.Branch.Key) (P.Branch.Val)

  type repo = S.Repo.t
  type t = HTTP.t

  (** [v ?strict db] builds the HTTP server for the repository [db]. The
      [strict] argument is accepted but ignored. Each request is dispatched
      through the route table below; a request matching no route is answered
      with [`Not_found] and the body ["Not found"]. *)
  let v ?strict:_ db =
    let pp_con = Fmt.of_to_string Cohttp.Connection.to_string in
    let blob = P.Repo.contents_t db in
    let tree = P.Repo.node_t db in
    let commit = P.Repo.commit_t db in
    let branch = P.Repo.branch_t db in
    (* Route table, kept in its original order. Writing endpoints take the
       repository itself; reading and clearing endpoints take the
       corresponding store handle. *)
    let routes =
      [
        ("/blobs", fun () -> new Blob.items db);
        ("/blob/:id", fun () -> new Blob.item blob);
        ("/trees", fun () -> new Tree.items db);
        ("/trees/merge", fun () -> new Tree.merge S.Private.Node.merge db);
        ("/tree/:id", fun () -> new Tree.item tree);
        ("/commits", fun () -> new Commit.items db);
        ("/commit/:id", fun () -> new Commit.item commit);
        ("/unsafe/blobs/:id", fun () -> new Blob.unsafe_items db);
        ("/unsafe/trees/:id", fun () -> new Tree.unsafe_items db);
        ("/unsafe/commits/:id", fun () -> new Commit.unsafe_items db);
        ("/branches", fun () -> new Branch.items branch);
        ("/branch/*", fun () -> new Branch.item branch);
        ("/watches", fun () -> new Branch.watches branch);
        ("/watch/*", fun () -> new Branch.watch branch);
        ("/clear/blobs", fun () -> new Blob.clear blob);
        ("/clear/trees", fun () -> new Tree.clear tree);
        ("/clear/commits", fun () -> new Commit.clear commit);
        ("/clear/branches", fun () -> new Branch.clear branch);
      ]
    in
    (* Per-request handler: dispatch, log the outcome, send the response. *)
    let callback (_ch, conn) request body =
      let open Cohttp in
      Log.debug (fun l -> l "new connection %a" pp_con conn);
      Wm.dispatch' routes ~body ~request >>= fun dispatched ->
      let status, headers, body, _path =
        match dispatched with
        | Some result -> result
        | None -> (`Not_found, Header.init (), `String "Not found", [])
      in
      Log.info (fun l ->
          let code = Code.code_of_status status in
          let meth = Code.string_of_method (Request.meth request) in
          let path = Uri.path (Request.uri request) in
          l "[%a] %d - %s %s" pp_con conn code meth path);
      HTTP.respond ~headers ~body ~status ()
    in
    let conn_closed (_ch, conn) =
      Log.debug (fun l -> l "connection %a closed" pp_con conn)
    in
    HTTP.make ~callback ~conn_closed ()
end