open! Import
module Payload = Control_file.Payload.Upper.Latest
exception Pack_error = Errors.Pack_error
module Make (Args : Gc_args.S) = struct
open Args
module Io = Fm.Io
module Lower = Fm.Lower
module Sparse = Dispatcher.Fm.Sparse
module Ao = Append_only_file.Make (Fm.Io) (Errs)
let string_of_key = Irmin.Type.to_string key_t
module Priority_queue = struct
module Offset_rev = struct
type t = int63
let equal = Int63.equal
let hash (t : t) = Hashtbl.hash t
let compare a b = Int63.compare b a
end
module Table = Hashtbl.Make (Offset_rev)
module Pq = Binary_heap.Make (Offset_rev)
type 'a t = { pq : Pq.t; marks : 'a Table.t }
let create size =
{ pq = Pq.create ~dummy:Int63.zero size; marks = Table.create size }
let is_empty t = Pq.is_empty t.pq
let pop { pq; marks } =
let elt = Pq.pop_minimum pq in
let payload = Table.find marks elt in
Table.remove marks elt;
(elt, payload)
let add { pq; marks } elt payload =
if not (Table.mem marks elt) then (
Table.add marks elt payload;
Pq.add pq elt)
end
type kind = Contents | Node | Commit
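(* Illustrative sketch (hypothetical offsets) of the worklist above:
   [Offset_rev.compare] flips its arguments, so [pop] always returns the
   largest scheduled offset first, and [marks] de-duplicates offsets that
   are scheduled more than once.
   {[
     let q = Priority_queue.create 16 in
     Priority_queue.add q (Int63.of_int 10) Node;
     Priority_queue.add q (Int63.of_int 42) Commit;
     Priority_queue.add q (Int63.of_int 42) Commit (* no-op: already marked *);
     Priority_queue.pop q (* = (Int63.of_int 42, Commit) *)
   ]} *)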
(** [iter_reachable ~parents ~min_offset commit_key commit_store node_store]
returns the compacted list of [(offset, length)] ranges of the tree objects
and parent commits reachable from [commit_key] in [commit_store] and
[node_store].
- If [parents] is true, parent commits and their tree objects are traversed
recursively. Otherwise, only the immediate parents are included (but not
their tree objects).
- [min_offset] restricts the traversal to objects/commits whose offset is
greater than or equal to [min_offset]. *)
let iter_reachable ~parents ~min_offset commit_key commit_store node_store =
let live = Ranges.make () in
let todos = Priority_queue.create 1024 in
let rec loop () =
if not (Priority_queue.is_empty todos) then (
let offset, kind = Priority_queue.pop todos in
iter_node offset kind;
loop ())
and iter_node off = function
| (Contents | Node) as kind -> (
let node_key = Node_store.key_of_offset node_store off in
let len = Node_store.get_length node_store node_key in
Ranges.add live ~off ~len;
if kind = Node then
match Node_store.unsafe_find_no_prefetch node_store node_key with
| None ->
raise (Pack_error (`Dangling_key (string_of_key node_key)))
| Some node ->
List.iter
(fun (_step, kinded_key) -> schedule_kinded kinded_key)
(Node_value.pred node))
| Commit -> (
let commit_key = Commit_store.key_of_offset commit_store off in
let len = Commit_store.get_length commit_store commit_key in
Ranges.add live ~off ~len;
match
Commit_store.unsafe_find ~check_integrity:false commit_store
commit_key
with
| None ->
raise (Pack_error (`Dangling_key (string_of_key commit_key)))
| Some commit ->
List.iter schedule_commit (Commit_value.parents commit);
schedule_kinded (`Node (Commit_value.node commit)))
and schedule_kinded kinded_key =
let key, kind =
match kinded_key with
| `Contents key -> (key, Contents)
| `Inode key | `Node key -> (key, Node)
in
match Node_store.get_offset node_store key with
| offset -> schedule ~offset kind
| exception Pack_store.Dangling_hash -> ()
and schedule_commit key =
match Commit_store.get_offset commit_store key with
| offset ->
let kind =
if parents then Commit
else
(* The parent's range is kept so the commit itself stays readable,
   but it is treated as an opaque object: its tree and its own
   parents are not traversed. *)
Contents
in
schedule ~offset kind
| exception Pack_store.Dangling_hash -> ()
and schedule ~offset kind =
if offset >= min_offset then Priority_queue.add todos offset kind
in
let offset = Commit_store.get_offset commit_store commit_key in
schedule ~offset Commit;
loop ();
live
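(* Worked example (hypothetical offsets and lengths): a commit at offset 100
   whose root node at offset 60 references contents at offsets 10 and 40 is
   visited in decreasing-offset order (100, 60, 40, 10) and produces the
   live ranges [(10, l1); (40, l2); (60, l3); (100, l4)], where contiguous
   entries may be coalesced by [Ranges]. *)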
(** [snapshot_commit commit_key commit_store node_store] returns the
compacted list of [(offset, length)] ranges of the tree objects reachable
from [commit_key] and of its direct parent commits. *)
let snapshot_commit commit_key commit_store node_store =
iter_reachable ~parents:false ~min_offset:Int63.zero commit_key commit_store
node_store
(** [traverse_range ~min_offset commit_key commit_store node_store] returns
the compacted list of [(offset, length)] ranges of the tree objects and
parent commits recursively reachable from [commit_key], restricted to
entries with [offset >= min_offset]. *)
let traverse_range ~min_offset commit_key commit_store node_store =
iter_reachable ~parents:true ~min_offset commit_key commit_store node_store
let magic_parent =
Pack_value.Kind.to_magic Pack_value.Kind.Dangling_parent_commit
|> String.make 1
let transfer_parent_commit_exn ~write_exn key =
match Pack_key.inspect key with
| Indexed _ ->
(* The parent is referenced by hash only; there is no offset to
   patch in the new prefix. *)
()
| Direct { offset = off; _ } ->
let off = Int63.(Syntax.(off + of_int Hash.hash_size)) in
write_exn ~off ~len:1 magic_parent
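(* Sketch of the entry being patched: a pack entry is laid out as the hash,
   then a one-byte kind magic, then the encoded value, so
   [off + Hash.hash_size] addresses the magic byte:

     off                 off + Hash.hash_size
      |------ hash ------|  magic  |---- payload ... ----|

   Only that byte is overwritten, re-tagging the copied parent as
   [Dangling_parent_commit] in the new prefix so that later reads do not
   expect its predecessors to be present. *)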
let prefix_file_sizes ~root ~generation =
let open Result_syntax in
let* prefix_size =
if generation = 0 then Ok Int63.zero
else Irmin_pack.Layout.V3.prefix ~root ~generation |> Io.size_of_path
in
let+ mapping_size =
if generation = 0 then Ok Int63.zero
else Irmin_pack.Layout.V3.mapping ~root ~generation |> Io.size_of_path
in
(mapping_size, prefix_size)
let report_old_file_sizes ~root ~generation stats =
let open Result_syntax in
let+ mapping_size, prefix_size = prefix_file_sizes ~root ~generation in
stats := Gc_stats.Worker.add_file_size !stats "old_prefix" prefix_size;
stats := Gc_stats.Worker.add_file_size !stats "old_mapping" mapping_size
let report_new_file_sizes ~root ~generation stats =
let open Result_syntax in
let+ mapping_size, prefix_size = prefix_file_sizes ~root ~generation in
stats := Gc_stats.Worker.add_file_size !stats "prefix" prefix_size;
stats := Gc_stats.Worker.add_file_size !stats "mapping" mapping_size
type suffix_params = {
start_offset : int63;
chunk_start_idx : int;
dead_bytes : int63;
}
[@@deriving irmin]
type gc_results = {
suffix_params : suffix_params;
mapping_size : int63;
removable_chunk_idxs : int list;
modified_volume : Lower.volume_identifier option;
stats : Stats.Latest_gc.worker;
}
[@@deriving irmin]
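(* [gc_results] carries everything the parent process needs to finalise the
   GC: [suffix_params] tells it how to reconfigure the suffix (new start
   offset, first live chunk index, dead bytes remaining below the start
   offset), [removable_chunk_idxs] lists chunk files that may be deleted,
   [modified_volume] names the lower volume written during archiving (if
   any), and [stats] feeds the GC report. *)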
type gc_output = (gc_results, Args.Errs.t) result [@@deriving irmin]
let run ~lower_root ~generation ~new_files_path root commit_key
new_suffix_start_offset =
let open Result_syntax in
let config =
Irmin_pack.Conf.init ~fresh:false ~readonly:true ~lru_size:0 ~lower_root
root
in
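(* The worker re-opens the store read-only with the LRU disabled
   ([lru_size:0]), presumably because the traversal reads each object at
   most once; [lower_root] is forwarded so that the archiving step below
   can write to the lower layer. *)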
[%log.debug "GC: opening files in RO mode"];
let stats = ref (Gc_stats.Worker.create "open files") in
let () =
report_old_file_sizes ~root ~generation:(generation - 1) stats |> ignore
in
let fm = Fm.open_ro config |> Errs.raise_if_error in
Errors.finalise_exn (fun _outcome ->
Fm.close fm |> Errs.log_if_error "GC: Close File_manager")
@@ fun () ->
let dict = Dict.v fm |> Errs.raise_if_error in
let dispatcher = Dispatcher.v fm |> Errs.raise_if_error in
let lru = Lru.create config in
let node_store = Node_store.v ~config ~fm ~dict ~dispatcher ~lru in
let commit_store = Commit_store.v ~config ~fm ~dict ~dispatcher ~lru in
stats := Gc_stats.Worker.finish_current_step !stats "load commit";
let commit =
match
Commit_store.unsafe_find ~check_integrity:false commit_store commit_key
with
| None ->
Errs.raise_error (`Commit_key_is_dangling (string_of_key commit_key))
| Some commit -> commit
in
let live_entries =
stats := Gc_stats.Worker.finish_current_step !stats "mapping: start";
let live_entries = snapshot_commit commit_key commit_store node_store in
stats :=
Gc_stats.Worker.finish_current_step !stats "mapping: of reachable";
stats :=
Gc_stats.Worker.set_objects_traversed !stats (Ranges.count live_entries);
live_entries
in
let mapping_size =
stats := Gc_stats.Worker.finish_current_step !stats "prefix: start";
let mapping =
Irmin_pack.Layout.V4.mapping ~root:new_files_path ~generation
in
let data = Irmin_pack.Layout.V4.prefix ~root:new_files_path ~generation in
let mapping_size =
let prefix = Sparse.Ao.create ~mapping ~data |> Errs.raise_if_error in
[%log.debug "GC: transferring to the new prefix"];
stats := Gc_stats.Worker.finish_current_step !stats "prefix: transfer";
Errors.finalise_exn (fun _ ->
Sparse.Ao.flush prefix
>>= (fun _ -> Sparse.Ao.close prefix)
|> Errs.log_if_error "GC: Close prefix after data copy")
@@ fun () ->
Ranges.iter
(fun ~off ~len ->
let str = Dispatcher.read_seq_exn dispatcher ~off ~len in
Sparse.Ao.append_seq_exn prefix ~off str)
live_entries;
Int63.to_int (Sparse.Ao.mapping_size prefix)
in
let () =
stats :=
Gc_stats.Worker.finish_current_step !stats
"prefix: rewrite commit parents";
let prefix =
Sparse.Wo.open_wo ~mapping_size ~mapping ~data |> Errs.raise_if_error
in
Errors.finalise_exn (fun _outcome ->
Sparse.Wo.fsync prefix
>>= (fun _ -> Sparse.Wo.close prefix)
|> Errs.log_if_error "GC: Close prefix after parent rewrite")
@@ fun () ->
let write_exn = Sparse.Wo.write_exn prefix in
List.iter
(fun key -> transfer_parent_commit_exn ~write_exn key)
(Commit_value.parents commit)
in
Int63.of_int mapping_size
in
let () = report_new_file_sizes ~root ~generation stats |> ignore in
let suffix_params, mapping_size, removable_chunk_idxs =
stats :=
Gc_stats.Worker.finish_current_step !stats
"suffix: calculate new values";
let suffix = Fm.suffix fm in
let soff = Dispatcher.soff_of_offset dispatcher new_suffix_start_offset in
assert (Int63.Syntax.(soff >= Int63.zero));
let open struct
type chunk = { idx : int; end_suffix_off : int63 }
end in
let removable_chunks =
match Fm.Suffix.chunk_num suffix with
| 1 -> []
| _ ->
Fm.Suffix.fold_chunks
(fun ~acc ~idx ~start_suffix_off ~end_suffix_off ~is_appendable ->
let is_empty = start_suffix_off = end_suffix_off in
let ends_with_or_is_before_soff = end_suffix_off <= soff in
let is_removable =
(not is_appendable)
&& (not is_empty)
&& ends_with_or_is_before_soff
in
if is_removable then { idx; end_suffix_off } :: acc else acc)
[] suffix
in
let chunk_start_idx =
match removable_chunks with
| [] -> Fm.Suffix.start_idx suffix
| last_removed_chunk :: _ -> succ last_removed_chunk.idx
in
let suffix_dead_bytes =
match removable_chunks with
| [] -> Dispatcher.soff_of_offset dispatcher new_suffix_start_offset
| last_removed_chunk :: _ ->
let removed_end_offset =
last_removed_chunk.end_suffix_off
|> Dispatcher.offset_of_soff dispatcher
in
Int63.Syntax.(new_suffix_start_offset - removed_end_offset)
in
assert (Int63.Syntax.(suffix_dead_bytes >= Int63.zero));
let removable_chunk_idxs =
removable_chunks |> List.map (fun c -> c.idx)
in
( {
start_offset = new_suffix_start_offset;
dead_bytes = suffix_dead_bytes;
chunk_start_idx;
},
mapping_size,
removable_chunk_idxs )
in
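(* Worked example (hypothetical values): with chunks 0: soff 0..100 (full),
   1: soff 100..250 (full) and 2: soff 250..400 (appendable), and a
   [new_suffix_start_offset] that maps to [soff = 250], chunks 0 and 1 end
   at or before [soff] and are not appendable, so both are removable and
   [chunk_start_idx] becomes 2. If chunk 1's end maps back to offset 900
   while [new_suffix_start_offset] is 1000, then
   [suffix_dead_bytes = 1000 - 900 = 100]: bytes kept in the remaining
   chunks that are no longer live. *)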
let modified_volume =
match Fm.gc_destination fm with
| `Delete -> None
| `Archive lower ->
[%log.debug "GC: archiving into lower"];
stats :=
Gc_stats.Worker.finish_current_step !stats "archive: iter reachable";
let min_offset = Dispatcher.suffix_start_offset dispatcher in
let to_archive = ref [] in
Ranges.iter
(fun ~off ~len ->
to_archive :=
(off, Dispatcher.read_seq_exn dispatcher ~off ~len)
:: !to_archive)
(traverse_range ~min_offset commit_key commit_store node_store);
let to_archive = List.rev !to_archive in
stats :=
Gc_stats.Worker.finish_current_step !stats "archive: copy to lower";
Lower.set_readonly lower false;
let vol =
Lower.archive_seq_exn ~upper_root:root ~generation ~to_archive lower
in
Lower.set_readonly lower true;
Some vol
in
let stats = Gc_stats.Worker.finalise !stats in
{
suffix_params;
mapping_size;
removable_chunk_idxs;
modified_volume;
stats;
}
let write_gc_output ~root ~generation output =
let open Result_syntax in
let path = Irmin_pack.Layout.V4.gc_result ~root ~generation in
let* io = Io.create ~path ~overwrite:true in
let out = Irmin.Type.to_json_string gc_output_t output in
let* () = Io.write_string io ~off:Int63.zero out in
let* () = Io.fsync io in
Io.close io
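(* The JSON file written above is the worker's only channel back to the
   parent process: both a successful [gc_results] and a worker-side error
   travel through it (see [run_and_output_result] below) and are read back
   by the parent once the worker terminates. *)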
let run_and_output_result ~lower_root ~generation ~new_files_path root
commit_key new_suffix_start_offset =
let result =
Errs.catch (fun () ->
run ~lower_root ~generation ~new_files_path root commit_key
new_suffix_start_offset)
in
Errs.log_if_error "gc run" result;
let write_result = write_gc_output ~root ~generation result in
write_result |> Errs.log_if_error "writing gc output"
end