1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
open! Import
module Payload = Control_file.Latest_payload
exception Pack_error = Errors.Pack_error
module Make (Args : Gc_args.S) = struct
open Args
module Io = Fm.Io
module Mapping_file = Dispatcher.Mapping_file
module Ao = struct
  include Append_only_file.Make (Fm.Io) (Errs)

  (** [create_rw_exn ~path] creates a read-write append-only file at [path],
      overwriting any existing file, with an internal auto-flush procedure
      triggered at a threshold of [1_000_000].

      Any error returned by [create_rw] is raised through
      [Errs.raise_if_error]. *)
  let create_rw_exn ~path =
    Errs.raise_if_error
      (create_rw ~path ~overwrite:true ~auto_flush_threshold:1_000_000
         ~auto_flush_procedure:`Internal)
end
(** [string_of_key k] renders the key [k] as a string using the [key_t] type
    representation. It is used in this module to build the payload of
    key-related errors. *)
let string_of_key = Irmin.Type.to_string key_t
(** A queue of offsets, each carrying a payload, in which a given offset can be
    pending at most once. *)
module Priority_queue = struct
  (** Offsets with a reversed comparison: [compare x y] is
      [Int63.compare y x], so the minimum of the heap below is the largest
      offset. *)
  module Offset_rev = struct
    type t = int63

    let equal = Int63.equal
    let hash (t : t) = Hashtbl.hash t
    let compare x y = Int63.compare y x
  end

  module Table = Hashtbl.Make (Offset_rev)
  module Pq = Binary_heap.Make (Offset_rev)

  (** [pq] orders the pending offsets; [marks] maps every pending offset to
      its payload and doubles as the membership test. *)
  type 'a t = { pq : Pq.t; marks : 'a Table.t }

  (** [create size] is an empty queue; [size] is handed to both the heap and
      the table as their initial size. *)
  let create size =
    let pq = Pq.create ~dummy:Int63.zero size in
    let marks = Table.create size in
    { pq; marks }

  (** [is_empty t] is [true] iff the heap holds no offset. *)
  let is_empty t = Pq.is_empty t.pq

  (** [pop t] removes the minimum of the heap (with respect to
      [Offset_rev.compare]) and returns it together with its payload. The
      offset is forgotten from [marks], so it may be added again later. *)
  let pop t =
    let elt = Pq.pop_minimum t.pq in
    let payload = Table.find t.marks elt in
    Table.remove t.marks elt;
    (elt, payload)

  (** [add t elt payload] enqueues [elt] with [payload]. It does nothing when
      [elt] is already pending: the first payload is kept. *)
  let add t elt payload =
    if Table.mem t.marks elt then ()
    else (
      Table.add t.marks elt payload;
      Pq.add t.pq elt)
end
(** [iter_reachable commit node_store ~f] applies [f ~off ~len] to the offset
    and length of every tree object reachable from the root node of [commit]
    in [node_store], and of every immediate parent commit of [commit] whose
    key carries an offset. An offset that is already pending is not queued
    twice.

    @raise Pack_error
      with [`Dangling_key] when a scheduled node cannot be found in
      [node_store], or with [`Node_or_contents_key_is_indexed] when a node or
      contents key has no offset. *)
let iter_reachable commit node_store ~f =
  let pending = Priority_queue.create 1024 in
  (* Queue [offset]; the boolean records whether the object must be opened to
     discover its own children. *)
  let schedule offset has_children =
    Priority_queue.add pending offset has_children
  in
  (* Queue the object designated by a kinded key. Contents are leaves, nodes
     and inodes have children. *)
  let schedule_kinded kinded_key =
    let key, has_children =
      match kinded_key with
      | `Inode key | `Node key -> (key, true)
      | `Contents key -> (key, false)
    in
    match Pack_key.to_offset key with
    | None ->
        raise
          (Pack_error (`Node_or_contents_key_is_indexed (string_of_key key)))
    | Some offset -> schedule offset has_children
  in
  (* Load the node stored under [node_key] and queue all its predecessors. *)
  let iter_node node_key =
    match Node_store.unsafe_find_no_prefetch node_store node_key with
    | Some node ->
        Node_value.pred node
        |> List.iter (fun (_step, kinded_key) -> schedule_kinded kinded_key)
    | None -> raise (Pack_error (`Dangling_key (string_of_key node_key)))
  in
  (* Pop entries until the queue is empty, reporting each one to [f] before
     expanding it. *)
  let rec drain () =
    if Priority_queue.is_empty pending then ()
    else (
      let offset, has_children = Priority_queue.pop pending in
      let node_key = Node_store.key_of_offset node_store offset in
      let length =
        match Pack_key.inspect node_key with
        | Direct { length; _ } -> length
        | Indexed _ -> assert false
      in
      f ~off:offset ~len:length;
      if has_children then iter_node node_key;
      drain ())
  in
  (* Parent commits are reported but never expanded; parents whose key has no
     offset are skipped. *)
  let schedule_parent_exn key =
    match Pack_key.to_offset key with
    | Some offset -> schedule offset false
    | None -> ()
  in
  Commit_value.parents commit |> List.iter schedule_parent_exn;
  schedule_kinded (`Node (Commit_value.node commit));
  drain ()
(** Magic value of the [Dangling_parent_commit] pack-value kind. It is written
    at position [Hash.hash_size] of the parent-commit entries rewritten by
    [transfer_parent_commit_exn]. *)
let magic_parent =
Pack_value.Kind.to_magic Pack_value.Kind.Dangling_parent_commit
(** [transfer_parent_commit_exn ~read_exn ~write_exn ~mapping key] rewrites
    the parent commit designated by [key] with its magic replaced by
    [magic_parent].

    For a [Direct] key, the entry is read with [read_exn] at the offset and
    length carried by the key, the byte at position [Hash.hash_size] is
    replaced, and the result is written with [write_exn] at the [poff] of the
    accessor obtained from [mapping] for that offset and length. An [Indexed]
    key is ignored. *)
let transfer_parent_commit_exn ~read_exn ~write_exn ~mapping key =
  match Pack_key.inspect key with
  | Direct { offset; length; _ } ->
      let buffer = Bytes.create length in
      read_exn ~off:offset ~len:length buffer;
      let accessor =
        Dispatcher.create_accessor_to_prefix_exn mapping ~off:offset
          ~len:length
      in
      Bytes.set buffer Hash.hash_size magic_parent;
      (* [buffer] is not touched after this point, so handing it over as a
         string without a copy does not expose any later mutation. *)
      write_exn ~off:accessor.poff ~len:length (Bytes.unsafe_to_string buffer)
  | Indexed _ -> ()
(** [report_old_file_sizes ~root ~generation stats] records in [stats] the
    sizes of the prefix and mapping files of [generation] under [root], as the
    ["old_prefix"] and ["old_mapping"] entries. Both sizes are zero when
    [generation] is [0].

    Returns the first error of [Io.size_of_path], in which case [stats] is
    left unchanged. *)
let report_old_file_sizes ~root ~generation stats =
  let open Result_syntax in
  let* prefix_size =
    match generation with
    | 0 -> Ok Int63.zero
    | _ -> Io.size_of_path (Irmin_pack.Layout.V3.prefix ~root ~generation)
  in
  let+ mapping_size =
    match generation with
    | 0 -> Ok Int63.zero
    | _ -> Io.size_of_path (Irmin_pack.Layout.V3.mapping ~root ~generation)
  in
  let with_prefix =
    Gc_stats.Worker.add_file_size !stats "old_prefix" prefix_size
  in
  stats := with_prefix;
  let with_mapping =
    Gc_stats.Worker.add_file_size !stats "old_mapping" mapping_size
  in
  stats := with_mapping
(* Parameters of the suffix after a GC, as computed by [run]:
   - [start_offset] is the new suffix start offset given to [run];
   - [chunk_start_idx] is the successor of the index of the removable chunk at
     the head of the list built by [run], or the current start index of the
     suffix when no chunk is removable;
   - [dead_bytes] is the amount of dead bytes at the start of the suffix,
     asserted to be non-negative by [run]. *)
type suffix_params = {
start_offset : int63;
chunk_start_idx : int;
dead_bytes : int63;
}
[@@deriving irmin]
(* Value produced by a successful [run]: the new suffix parameters, the indexes
   of the suffix chunks found removable, and the finalised worker
   statistics. *)
type gc_results = {
suffix_params : suffix_params;
removable_chunk_idxs : int list;
stats : Stats.Latest_gc.worker;
}
[@@deriving irmin]
(* Outcome of a GC run, either its results or an error. This is the value
   serialised to JSON by [write_gc_output]. *)
type gc_output = (gc_results, Args.Errs.t) result [@@deriving irmin]
(** [run ~generation ~new_files_path root commit_key new_suffix_start_offset]
    performs the GC computation for the commit [commit_key] of the store at
    [root], writing the new files of [generation] under [new_files_path].

    In order, it:
    - opens the store at [root] in read-only mode and loads the commit;
    - creates the mapping file, registering the commit itself (when its key is
      [Direct]) and everything reported by [iter_reachable];
    - creates the prefix file by copying every entry of the mapping from the
      dispatcher, then rewrites the parent commits in it with
      [transfer_parent_commit_exn];
    - computes the new suffix parameters and the list of removable chunks.

    Returns a [gc_results] record. Errors are raised as exceptions through
    [Errs.raise_if_error] and [Errs.raise_error]; the commit not being found
    raises [`Commit_key_is_dangling]. *)
let run ~generation ~new_files_path root commit_key new_suffix_start_offset =
let open Result_syntax in
let config =
Irmin_pack.Conf.init ~fresh:false ~readonly:true ~lru_size:0 root
in
[%log.debug "GC: opening files in RO mode"];
(* [stats] is threaded through the whole function as a reference; each
   [finish_current_step] call below closes the current step and names the
   next one. *)
let stats = ref (Gc_stats.Worker.create "open files") in
(* Record the sizes of the files of the previous generation. The result is
   ignored, so a failure to measure them does not abort the GC. *)
let () =
report_old_file_sizes ~root ~generation:(generation - 1) stats |> ignore
in
let fm = Fm.open_ro config |> Errs.raise_if_error in
(* Everything after [@@ fun () ->] runs under this finaliser, which closes
   [fm] and only logs a closing error.
   NOTE(review): presumably [finalise_exn] runs the finaliser on both normal
   and exceptional exit - confirm in [Errors]. *)
Errors.finalise_exn (fun _outcome ->
Fm.close fm |> Errs.log_if_error "GC: Close File_manager")
@@ fun () ->
let dict = Dict.v fm |> Errs.raise_if_error in
let dispatcher = Dispatcher.v fm |> Errs.raise_if_error in
let node_store = Node_store.v ~config ~fm ~dict ~dispatcher in
let commit_store = Commit_store.v ~config ~fm ~dict ~dispatcher in
stats := Gc_stats.Worker.finish_current_step !stats "load commit";
let commit =
match
Commit_store.unsafe_find ~check_integrity:false commit_store commit_key
with
| None ->
Errs.raise_error (`Commit_key_is_dangling (string_of_key commit_key))
| Some commit -> commit
in
(* Step 1: build the mapping file. [Mapping_file.create] is given the function
   below as [register_entries] and provides it with [register_entry], through
   which every reachable (offset, length) pair is declared. *)
let mapping =
stats := Gc_stats.Worker.finish_current_step !stats "mapping: start";
(fun f ->
let report_mapping_size size =
stats := Gc_stats.Worker.add_file_size !stats "mapping" size
in
Mapping_file.create ~report_mapping_size ~root:new_files_path
~generation ~register_entries:f ()
|> Errs.raise_if_error)
@@ fun ~register_entry ->
stats :=
Gc_stats.Worker.finish_current_step !stats
"mapping: commits to reachable";
(* Shadow [register_entry] so that every registration is also counted in the
   statistics. *)
let register_entry ~off ~len =
stats := Gc_stats.Worker.incr_objects_traversed !stats;
register_entry ~off ~len
in
(* Only keys carrying an offset and a length can be registered; indexed keys
   are skipped. *)
let register_object_exn key =
match Pack_key.inspect key with
| Direct { offset; length; _ } -> register_entry ~off:offset ~len:length
| Indexed _ -> ()
in
(* The commit itself is registered first, then everything that
   [iter_reachable] reports for it. *)
register_object_exn commit_key;
stats :=
Gc_stats.Worker.finish_current_step !stats
"mapping: objects to reachable";
iter_reachable commit node_store ~f:register_entry;
stats :=
Gc_stats.Worker.finish_current_step !stats "mapping: of reachable";
()
in
(* Step 2: build the prefix file of the new generation. *)
let () =
stats := Gc_stats.Worker.finish_current_step !stats "prefix: start";
let prefix =
let path =
Irmin_pack.Layout.V4.prefix ~root:new_files_path ~generation
in
Ao.create_rw_exn ~path
in
let () =
(* The finaliser records the final size of the prefix and closes it; a closing
   error is only logged. *)
Errors.finalise_exn (fun _outcome ->
stats :=
Gc_stats.Worker.add_file_size !stats "prefix" (Ao.end_poff prefix);
Ao.close prefix |> Errs.log_if_error "GC: Close prefix")
@@ fun () ->
[%log.debug "GC: transfering to the new prefix"];
stats := Gc_stats.Worker.finish_current_step !stats "prefix: transfer";
let append_exn = Ao.append_exn prefix in
(* For each entry of the mapping, read [len] bytes at [off] through the
   dispatcher and append them to the prefix. *)
let f ~off ~len =
let len = Int63.of_int len in
Dispatcher.read_bytes_exn dispatcher ~f:append_exn ~off ~len
in
let () = Mapping_file.iter_exn mapping f in
Ao.flush prefix |> Errs.raise_if_error
in
stats :=
Gc_stats.Worker.finish_current_step !stats
"prefix: rewrite commit parents";
(* Parent commits are read from the store through the dispatcher. *)
let read_exn ~off ~len buf =
let accessor = Dispatcher.create_accessor_exn dispatcher ~off ~len in
Dispatcher.read_exn dispatcher accessor buf
in
(* The prefix was closed by the finaliser above; it is reopened here with [Io]
   in read-write mode so that it can be written at arbitrary offsets. *)
let prefix =
let path =
Irmin_pack.Layout.V4.prefix ~root:new_files_path ~generation
in
Io.open_ ~path ~readonly:false |> Errs.raise_if_error
in
(* The finaliser syncs then closes the reopened prefix; errors are only
   logged. *)
Errors.finalise_exn (fun _outcome ->
Io.fsync prefix
>>= (fun _ -> Io.close prefix)
|> Errs.log_if_error "GC: Close prefix after parent rewrite")
@@ fun () ->
let write_exn = Io.write_exn prefix in
List.iter
(fun key ->
transfer_parent_commit_exn ~read_exn ~write_exn ~mapping key)
(Commit_value.parents commit)
in
(* Step 3: compute the new suffix parameters and the removable chunks. *)
let suffix_params, removable_chunk_idxs =
stats :=
Gc_stats.Worker.finish_current_step !stats
"suffix: calculate new values";
let suffix = Fm.suffix fm in
let soff = Dispatcher.soff_of_offset dispatcher new_suffix_start_offset in
let open struct
type chunk = { idx : int; end_suffix_off : int63 }
end in
let removable_chunks =
match Fm.Suffix.chunk_num suffix with
(* When the suffix has exactly one chunk, nothing is removable. *)
| 1 -> []
| _ ->
Fm.Suffix.fold_chunks
(fun ~acc ~idx ~start_suffix_off ~end_suffix_off ~is_appendable ->
(* A chunk is removable when it is not appendable, is not empty, and ends at
   or before [soff]. Removable chunks are consed onto [acc], so the head of
   the result is the removable chunk visited last. *)
let is_empty = start_suffix_off = end_suffix_off in
let ends_with_or_is_before_soff = end_suffix_off <= soff in
let is_removable =
(not is_appendable)
&& (not is_empty)
&& ends_with_or_is_before_soff
in
if is_removable then { idx; end_suffix_off } :: acc else acc)
[] suffix
in
(* NOTE(review): the two matches below treat the head of [removable_chunks] as
   the last removed chunk, which assumes [fold_chunks] visits chunks in
   increasing index order - confirm in [Fm.Suffix]. *)
let chunk_start_idx =
match removable_chunks with
| [] -> Fm.Suffix.start_idx suffix
| last_removed_chunk :: _ -> succ last_removed_chunk.idx
in
let suffix_dead_bytes =
match removable_chunks with
(* No chunk is removed: the dead bytes are the suffix offset of the new start
   offset. *)
| [] -> Dispatcher.soff_of_offset dispatcher new_suffix_start_offset
(* Otherwise they are the distance between the end of the chunk at the head of
   the list and the new start offset. *)
| last_removed_chunk :: _ ->
let removed_end_offset =
last_removed_chunk.end_suffix_off
|> Dispatcher.offset_of_soff dispatcher
in
Int63.Syntax.(new_suffix_start_offset - removed_end_offset)
in
(* Sanity check: the amount of dead bytes cannot be negative. *)
assert (Int63.Syntax.(suffix_dead_bytes >= Int63.zero));
let removable_chunk_idxs =
removable_chunks |> List.map (fun c -> c.idx)
in
( {
start_offset = new_suffix_start_offset;
dead_bytes = suffix_dead_bytes;
chunk_start_idx;
},
removable_chunk_idxs )
in
let stats = Gc_stats.Worker.finalise !stats in
{ suffix_params; removable_chunk_idxs; stats }
(** [write_gc_output ~root ~generation output] serialises [output] to JSON and
    writes it at offset zero of the GC result file of [generation] under
    [root], overwriting any existing file, then syncs and closes the file.

    The file is closed even when writing or syncing fails, so that no file
    descriptor is leaked on the error path. The first error encountered
    (creation, then write or sync, then close) is the one returned. *)
let write_gc_output ~root ~generation output =
  let open Result_syntax in
  let path = Irmin_pack.Layout.V4.gc_result ~root ~generation in
  let* io = Io.create ~path ~overwrite:true in
  let out = Irmin.Type.to_json_string gc_output_t output in
  let write_result =
    let* () = Io.write_string io ~off:Int63.zero out in
    Io.fsync io
  in
  (* Close unconditionally: short-circuiting on [write_result] before this
     point would leave [io] open whenever the write or the sync fails. *)
  let close_result = Io.close io in
  let* () = write_result in
  close_result
(** [run_and_output_result ~generation ~new_files_path root commit_key
    new_suffix_start_offset] runs the GC with [run] under [Errs.catch] and
    writes the outcome, success or error, with [write_gc_output].

    An error of the run and an error while writing the outcome are both
    logged, not raised nor returned. *)
let run_and_output_result ~generation ~new_files_path root commit_key
    new_suffix_start_offset =
  let gc_outcome =
    let worker () =
      run ~generation ~new_files_path root commit_key new_suffix_start_offset
    in
    Errs.catch worker
  in
  let () = Errs.log_if_error "gc run" gc_outcome in
  Errs.log_if_error "writing gc output"
    (write_gc_output ~root ~generation gc_outcome)
end