(* Source file rsync_store.ml *)
(* The rsync backend is intended for stability, portability and testing. It
   is not supposed to be fast nor is it supposed to be particularly memory
   efficient. *)
open Lwt.Infix

(* The caching approach (and much of the code) is copied from the btrfs
   implementation *)
(* Bookkeeping kept in memory for one named cache. *)
type cache = {
  lock : Lwt_mutex.t;   (* Held while the cache is cloned, released or deleted. *)
  mutable gen : int;    (* Bumped on every update or delete of the snapshot; a
                           writer whose recorded value no longer matches does
                           not replace the snapshot. *)
}

(* How a finished build result is moved into its final location
   (see [Rsync.rename_with_sharing]). *)
type mode =
  | Copy             (* Plain [mv] of the temporary directory. *)
  | Hardlink         (* rsync with [--link-dest] against the base result,
                        additionally passing [--checksum]. *)
  | Hardlink_unsafe  (* rsync with [--link-dest] against the base result,
                        without [--checksum]. *)

(* A store rooted at a directory on the local filesystem. *)
type t = {
  path : string;                        (* Root directory of the store. *)
  mode : mode;                          (* Strategy used when committing a build result. *)
  caches : (string, cache) Hashtbl.t;   (* Cache name -> its in-memory bookkeeping. *)
  mutable next : int;                   (* Counter used to give each writable cache
                                           clone a distinct directory name. *)
}

(* [dir / name] joins two path components using the platform separator. *)
let ( / ) dir name = Filename.concat dir name

(* Thin wrappers around the external commands used by this backend
   ([rm], [mv] and [rsync]), all run through [Os.sudo]. *)
module Rsync = struct
  (* [create dir] makes sure [dir] exists. *)
  let create dir =
    Os.ensure_dir dir;
    Lwt.return_unit

  (* [delete dir] recursively removes [dir]. *)
  let delete dir = Os.sudo [ "rm"; "-r"; dir ]

  (* Base rsync invocation shared by every transfer below. *)
  let rsync = [ "rsync"; "-aHq" ]

  (* [rename ~src ~dst] moves [src] to [dst] with [mv]. *)
  let rename ~src ~dst = Os.sudo [ "mv"; src; dst ]

  (* [rename_with_sharing ~mode ~base ~src ~dst] moves the tree at [src] to
     [dst]. In [Copy] mode, or when there is no [base], this is a plain
     [rename]. Otherwise the contents of [src] are transferred with rsync
     using [--link-dest=base] and [src] is deleted afterwards; [Hardlink]
     additionally passes [--checksum]. *)
  let rename_with_sharing ~mode ~base ~src ~dst =
    let link_against link_dest ~verify =
      let checksum = if verify then [ "--checksum" ] else [] in
      let args = rsync @ checksum @ [ "--link-dest=" ^ link_dest; src ^ "/"; dst ] in
      Os.ensure_dir dst;
      Os.sudo args >>= fun () ->
      delete src
    in
    match mode, base with
    | Copy, _ | (Hardlink | Hardlink_unsafe), None -> rename ~src ~dst
    | Hardlink, Some link_dest -> link_against link_dest ~verify:true
    | Hardlink_unsafe, Some link_dest -> link_against link_dest ~verify:false

  (* [copy_children ?chown ~src ~dst ()] ensures [dst] exists and copies the
     contents of [src] into it with rsync. When [chown] is given it is passed
     through as the argument of rsync's [--chown] option. *)
  let copy_children ?chown ~src ~dst () =
    let ownership =
      Option.fold ~none:[] ~some:(fun uid_gid -> [ "--chown"; uid_gid ]) chown
    in
    let args = rsync @ ownership @ [ src ^ "/"; dst ] in
    Os.ensure_dir dst;
    Os.sudo args
end

(* Layout of the store on disk: names of the top-level sub-directories and
   helpers computing paths inside them. *)
module Path = struct
  let state_dirname = "state"
  let cache_dirname = "cache"
  let cache_tmp_dirname = "cache-tmp"

  let result_dirname = "result"
  let result_tmp_dirname = "result-tmp"

  (* Every sub-directory that must exist under [root]. *)
  let dirs root =
    [ state_dirname; cache_dirname; cache_tmp_dirname; result_dirname; result_tmp_dirname ]
    |> List.map (fun name -> root / name)

  (* Final location of the build result [id]. *)
  let result { path; _ } id = path / result_dirname / id

  (* Location of the snapshot of cache [id]. *)
  let cache { path; _ } id = path / cache_dirname / id

  (* Location of the [n]-th writable clone, taken from cache [id]. *)
  let cache_tmp { path; _ } n id =
    let leaf = Printf.sprintf "%i-%s" n id in
    path / cache_tmp_dirname / leaf

  (* Working directory used while result [id] is being built. *)
  let result_tmp { path; _ } id = path / result_tmp_dirname / id
end

(* [root t] is the directory under which the store keeps all its data. *)
let root { path; _ } = path

(* [create ~path ?mode ()] ensures that [path] and each of the store's
   sub-directories exist, then returns a fresh store value with no known
   caches. [mode] defaults to [Copy]. *)
let create ~path ?(mode = Copy) () =
  let subdirs = Path.dirs path in
  Rsync.create path >>= fun () ->
  Lwt_list.iter_s Rsync.create subdirs >>= fun () ->
  let caches = Hashtbl.create 10 in
  Lwt.return { path; mode; caches; next = 0 }

(* [build t ?base ~id fn] runs [fn] on a temporary directory for result [id].
   The directory starts empty, or as a copy of the result named by [base]
   when one is given. If [fn] returns [Ok ()] the directory is committed as
   the result [id]; if it returns an error or raises, the directory is
   deleted. The outcome of [fn] (or its exception) is passed on to the
   caller. *)
let build t ?base ~id fn =
  Log.debug (fun f -> f "rsync: build %S" id);
  let dst = Path.result t id in
  let tmp = Path.result_tmp t id in
  let base = Option.map (Path.result t) base in
  (* Populate the working directory. *)
  let prepare () =
    match base with
    | Some src -> Rsync.copy_children ~src ~dst:tmp ()
    | None -> Rsync.create tmp
  in
  (* [fn] completed: commit on success, discard on error. *)
  let on_result outcome =
    let finish =
      match outcome with
      | Error _ -> Rsync.delete tmp
      | Ok () -> Rsync.rename_with_sharing ~mode:t.mode ~base ~src:tmp ~dst
    in
    finish >|= fun () -> outcome
  in
  (* [fn] raised: clean up and re-raise. *)
  let on_exception ex =
    Log.warn (fun f -> f "Uncaught exception from %S build function: %a" id Fmt.exn ex);
    Rsync.delete tmp >>= fun () ->
    Lwt.fail ex
  in
  prepare () >>= fun () ->
  Lwt.try_bind (fun () -> fn tmp) on_result on_exception

(* [delete t id] removes the stored result [id]; it does nothing when the
   result directory is not present. *)
let delete t id =
  let dir = Path.result t id in
  match Os.check_dir dir with
  | `Missing -> Lwt.return_unit
  | `Present -> Rsync.delete dir

(* [result t id] is the directory holding result [id], or [None] when that
   directory is not present. *)
let result t id =
  let dir = Path.result t id in
  let found =
    match Os.check_dir dir with
    | `Present -> Some dir
    | `Missing -> None
  in
  Lwt.return found

(* [log_file t id] is the path of the log for [id]: inside the result
   directory when it exists, otherwise inside the temporary build
   directory. *)
let log_file t id =
  result t id >|= fun found ->
  let dir =
    match found with
    | Some dir -> dir
    | None -> Path.result_tmp t id
  in
  dir / "log"

(* [state_dir t] is the store's state sub-directory. *)
let state_dir { path; _ } = Filename.concat path Path.state_dirname

(* [get_cache t name] returns the bookkeeping record for cache [name],
   registering a new one (fresh lock, generation 0) on first use. *)
let get_cache { caches; _ } name =
  match Hashtbl.find_opt caches name with
  | Some existing -> existing
  | None ->
    let fresh = { lock = Lwt_mutex.create (); gen = 0 } in
    Hashtbl.add caches name fresh;
    fresh

(* [cache ~user t name] creates a writable clone of cache [name] and returns
   [(dir, release)], where [dir] is the clone and [release] must be called
   once the caller has finished with it.

   The snapshot directory is created if it does not exist yet, its contents
   are copied to a fresh directory under cache-tmp, and that directory is
   chowned to [user].

   [release ()] promotes the clone to be the new snapshot if the cache's
   generation is unchanged since the clone was taken. Otherwise (another
   writer released first, or [delete_cache] was called) the clone is
   discarded.

   Only [`Unix] users are supported; [`Windows] triggers an assertion
   failure. *)
let cache ~user t name =
  let cache = get_cache t name in
  Lwt_mutex.with_lock cache.lock @@ fun () ->
  let tmp = Path.cache_tmp t t.next name in
  t.next <- t.next + 1;
  let snapshot = Path.cache t name in
  (* Create cache if it doesn't already exist. *)
  begin match Os.check_dir snapshot with
    | `Missing -> Rsync.create snapshot
    | `Present -> Lwt.return_unit
  end >>= fun () ->
  (* Create writeable clone. *)
  let gen = cache.gen in
  let { Obuilder_spec.uid; gid } = match user with
    | `Unix user -> user
    | `Windows _ -> assert false (* rsync not supported on Windows *)
  in
  (* rsync --chown not supported by the rsync that macOS ships with *)
  Rsync.copy_children ~src:snapshot ~dst:tmp () >>= fun () ->
  Os.sudo [ "chown"; Printf.sprintf "%d:%d" uid gid; tmp ] >>= fun () ->
  let release () =
    Lwt_mutex.with_lock cache.lock @@ fun () ->
    if cache.gen = gen then (
      (* The cache hasn't changed since we cloned it. Update it. *)
      (* todo: check if it has actually changed. *)
      cache.gen <- cache.gen + 1;
      Rsync.delete snapshot >>= fun () ->
      Rsync.rename ~src:tmp ~dst:snapshot
    ) else
      (* The snapshot was replaced or deleted in the meantime: our clone is
         stale. Remove it so it does not pile up under cache-tmp. *)
      Rsync.delete tmp
  in
  Lwt.return (tmp, release)


(* [delete_cache t name] removes the snapshot of cache [name] if it exists
   on disk. The generation is bumped first so that clones still in use are
   not promoted when they are released. Always resolves to [Ok ()]. *)
let delete_cache t name =
  let entry = get_cache t name in
  Lwt_mutex.with_lock entry.lock (fun () ->
    (* Ensures in-progress writes will be discarded *)
    entry.gen <- entry.gen + 1;
    let snapshot = Path.cache t name in
    let removal =
      if Sys.file_exists snapshot then Rsync.delete snapshot
      else Lwt.return_unit
    in
    removal >>= fun () ->
    Lwt_result.return ())

(* This backend keeps no record of pending deletions, so there is nothing to
   wait for here (this probably does not apply to rsync at all). *)
let complete_deletes _store = Lwt.return ()