1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
open! Import
(** Maker for a module that can manage GC processes: it starts a GC worker
    for a given commit, then finalises (swaps in and cleans up) or cancels
    it. *)
module Make (Args : Gc_args.S) = struct
  module Args = Args
  open Args
  module Io = Fm.Io
  module Ao = Append_only_file.Make (Io) (Errs)
  module Worker = Gc_worker.Make (Args)

  (** State of one GC run launched by {!v}. *)
  type t = {
    root : string;
    generation : int;
    task : Async.t;
    (* Whether [finalise] deletes obsolete files after a successful swap. *)
    unlink : bool;
    new_suffix_start_offset : int63;
    resolver : (Stats.Latest_gc.stats, Errs.t) result Lwt.u;
    promise : (Stats.Latest_gc.stats, Errs.t) result Lwt.t;
    dispatcher : Dispatcher.t;
    fm : Fm.t;
    contents : read Contents_store.t;
    node : read Node_store.t;
    commit : read Commit_store.t;
    mutable partial_stats : Gc_stats.Main.t;
    (* Set to [Some _] once [finalise] has completed successfully; later
       [finalise] calls return it directly. *)
    mutable resulting_stats : Stats.Latest_gc.stats option;
    latest_gc_target_offset : int63;
  }

  (** [v ~root ~new_files_path ~generation ~unlink ~dispatcher ~fm ~contents
      ~node ~commit commit_key] starts a GC run for [generation].

      It first removes any leftover GC result file for [generation]. It then
      launches [Worker.run_and_output_result] asynchronously for
      [commit_key].

      [commit_key] must be a direct pack key (an indexed key hits
      [assert false]). The new suffix starts right after that commit's
      entry. *)
  let v ~root ~new_files_path ~generation ~unlink ~dispatcher ~fm ~contents
      ~node ~commit commit_key =
    let new_suffix_start_offset, latest_gc_target_offset =
      let state : _ Pack_key.state = Pack_key.inspect commit_key in
      match state with
      | Direct x ->
          let len = x.length |> Int63.of_int in
          (Int63.Syntax.(x.offset + len), x.offset)
      | Indexed _ -> assert false
    in
    let partial_stats =
      let commit_offset = latest_gc_target_offset in
      let before_suffix_start_offset =
        Dispatcher.suffix_start_offset dispatcher
      in
      let before_suffix_end_offset = Dispatcher.end_offset dispatcher in
      Gc_stats.Main.create "worker startup" ~commit_offset ~generation
        ~before_suffix_start_offset ~before_suffix_end_offset
        ~after_suffix_start_offset:new_suffix_start_offset
    in
    let unlink_result_file () =
      let result_file = Irmin_pack.Layout.V4.gc_result ~root ~generation in
      match Io.unlink result_file with
      | Ok () -> ()
      | Error (`Sys_error msg as err) ->
          (* A missing file is the normal case and stays silent; any other
             failure is only logged, not propagated. *)
          if msg <> Fmt.str "%s: No such file or directory" result_file then
            [%log.warn
              "Unlinking temporary files from previous failed gc. Failed with \
               error %a"
              (Irmin.Type.pp Errs.t) err]
    in
    unlink_result_file ();
    let promise, resolver = Lwt.wait () in
    let task =
      Async.async (fun () ->
          Worker.run_and_output_result root commit_key new_suffix_start_offset
            ~generation ~new_files_path)
    in
    let partial_stats =
      Gc_stats.Main.finish_current_step partial_stats "before finalise"
    in
    {
      root;
      generation;
      unlink;
      new_suffix_start_offset;
      task;
      promise;
      resolver;
      dispatcher;
      fm;
      contents;
      node;
      commit;
      partial_stats;
      resulting_stats = None;
      latest_gc_target_offset;
    }

  (** [swap t removable_chunk_num suffix_params] calls [Fm.swap] with the
      worker's suffix parameters. The number of suffix chunks passed on is
      the current chunk count minus [removable_chunk_num], and at least one
      chunk must remain (asserted). *)
  let swap t removable_chunk_num suffix_params =
    let { generation; latest_gc_target_offset; _ } = t in
    let Worker.
          {
            start_offset = suffix_start_offset;
            chunk_start_idx;
            dead_bytes = suffix_dead_bytes;
          } =
      suffix_params
    in
    let suffix = Fm.suffix t.fm in
    let chunk_num = Fm.Suffix.chunk_num suffix - removable_chunk_num in
    assert (chunk_num >= 1);
    Fm.swap t.fm ~generation ~suffix_start_offset ~chunk_start_idx ~chunk_num
      ~suffix_dead_bytes ~latest_gc_target_offset

  (** [unlink_all t removable_chunk_idxs] deletes, in order:
      - the suffix chunks listed in [removable_chunk_idxs];
      - when [generation >= 2], the prefix and mapping files of
        [generation - 1];
      - the GC result file of [generation].

      It stops at the first failure. That failure is logged as a warning
      rather than returned, so this is best-effort. *)
  let unlink_all { root; generation; _ } removable_chunk_idxs =
    let result =
      let open Result_syntax in
      let* () =
        removable_chunk_idxs
        |> List.iter_result @@ fun chunk_idx ->
           let path = Irmin_pack.Layout.V4.suffix_chunk ~root ~chunk_idx in
           Io.unlink path
      in
      let* () =
        if generation >= 2 then
          let prefix =
            Irmin_pack.Layout.V4.prefix ~root ~generation:(generation - 1)
          in
          let* () = Io.unlink prefix in
          let mapping =
            Irmin_pack.Layout.V4.mapping ~root ~generation:(generation - 1)
          in
          let* () = Io.unlink mapping in
          Ok ()
        else Ok ()
      in
      let result = Irmin_pack.Layout.V4.gc_result ~root ~generation in
      Io.unlink result
    in
    match result with
    | Error e ->
        [%log.warn
          "Unlinking temporary files after gc, failed with error %a"
            (Irmin.Type.pp Errs.t) e]
    | Ok () -> ()

  (** [gc_errors status gc_output] builds the error for a GC run whose worker
      did not both succeed and produce a readable result.

      The worker status ("cancelled", "success", or the failure message) is
      prefixed to any error read from the output. An unreadable result file
      is reported as [`Gc_process_died_without_result_file].

      Must not be called with [`Success] and an [Ok] output
      ([assert false]). *)
  let gc_errors status gc_output =
    let extend_error s = function
      | `Gc_process_error str -> `Gc_process_error (Fmt.str "%s %s" s str)
      | `Corrupted_gc_result_file str ->
          `Gc_process_died_without_result_file (Fmt.str "%s %s" s str)
    in
    match (status, gc_output) with
    | `Failure s, Error e -> Error (extend_error s e)
    | `Cancelled, Error e -> Error (extend_error "cancelled" e)
    | `Success, Error e -> Error (extend_error "success" e)
    | `Cancelled, Ok _ -> Error (`Gc_process_error "cancelled")
    | `Failure s, Ok _ -> Error (`Gc_process_error s)
    | `Success, Ok _ -> assert false

  (** [read_gc_output ~root ~generation] reads the GC result file of
      [generation] and decodes it as [Worker.gc_output_t] JSON.

      Errors:
      - a failure to read the file becomes [`Corrupted_gc_result_file];
      - a failure to decode the JSON becomes [`Corrupted_gc_result_file];
      - an error recorded in the decoded output becomes [`Gc_process_error]. *)
  let read_gc_output ~root ~generation =
    let open Result_syntax in
    let read_file () =
      let path = Irmin_pack.Layout.V4.gc_result ~root ~generation in
      let* io = Io.open_ ~path ~readonly:true in
      let read_result =
        let* len = Io.read_size io in
        let len = Int63.to_int len in
        Io.read_to_string io ~off:Int63.zero ~len
      in
      match read_result with
      | Ok s ->
          let* () = Io.close io in
          Ok s
      | Error _ as err ->
          (* Close the handle on the error path too, so the file descriptor
             is not leaked. The read error is the one worth reporting, so a
             secondary close failure is deliberately ignored. *)
          ignore (Io.close io : (unit, _) result);
          err
    in
    let read_error err =
      `Corrupted_gc_result_file (Irmin.Type.to_string Errs.t err)
    in
    let gc_error err = `Gc_process_error (Irmin.Type.to_string Errs.t err) in
    let* s = read_file () |> Result.map_error read_error in
    match Irmin.Type.of_json_string Worker.gc_output_t s with
    | Error (`Msg error) -> Error (`Corrupted_gc_result_file error)
    | Ok ok -> ok |> Result.map_error gc_error

  (** Delegates to [Fm.cleanup] after an aborted or failed GC run. *)
  let clean_after_abort t = Fm.cleanup t.fm

  (** [finalise ~wait t] completes the GC run [t].

      Once the run has finalised successfully, later calls return the cached
      stats. Otherwise:
      - when [wait] is [false] and the worker is still running, it returns
        [`Running];
      - otherwise (after awaiting the worker when [wait] is [true]) it reads
        the worker output.

      On success it swaps the new files in, optionally unlinks obsolete
      files, reports and caches the stats, and resolves [t.promise].
      On worker failure it cleans up, resolves [t.promise] with the error,
      and returns that error. *)
  let finalise ~wait t =
    match t.resulting_stats with
    | Some partial_stats -> Lwt.return_ok (`Finalised partial_stats)
    | None -> (
        let partial_stats = t.partial_stats in
        let partial_stats =
          Gc_stats.Main.finish_current_step partial_stats "worker wait"
        in
        let go status =
          let partial_stats =
            Gc_stats.Main.finish_current_step partial_stats "read output"
          in
          let gc_output =
            read_gc_output ~root:t.root ~generation:t.generation
          in
          let result =
            let open Result_syntax in
            match (status, gc_output) with
            | ( `Success,
                Ok { suffix_params; removable_chunk_idxs; stats = worker_stats }
              ) ->
                let partial_stats =
                  Gc_stats.Main.finish_current_step partial_stats
                    "swap and purge"
                in
                (* NOTE(review): if [swap] returns an error, this [let*]
                   returns it immediately. [t.resolver] is then never woken,
                   so [on_finalise] callbacks do not fire. Confirm whether
                   that is intended. *)
                let* () =
                  swap t (List.length removable_chunk_idxs) suffix_params
                in
                let partial_stats =
                  Gc_stats.Main.finish_current_step partial_stats "unlink"
                in
                if t.unlink then unlink_all t removable_chunk_idxs;
                let stats =
                  let after_suffix_end_offset =
                    Dispatcher.end_offset t.dispatcher
                  in
                  Gc_stats.Main.finalise partial_stats worker_stats
                    ~after_suffix_end_offset
                in
                Stats.report_latest_gc stats;
                t.resulting_stats <- Some stats;
                [%log.debug
                  "Gc ended successfully. %a"
                    (Irmin.Type.pp Stats.Latest_gc.stats_t)
                    stats];
                let () = Lwt.wakeup_later t.resolver (Ok stats) in
                Ok (`Finalised stats)
            | _ ->
                clean_after_abort t;
                let err = gc_errors status gc_output in
                let () = Lwt.wakeup_later t.resolver err in
                err
          in
          Lwt.return result
        in
        if wait then
          let* status = Async.await t.task in
          go status
        else
          match Async.status t.task with
          | `Running -> Lwt.return_ok `Running
          | #Async.outcome as status -> go status)

  (** [finalise_without_swap t] waits for the worker and reads its output
      without swapping any files.

      On success it returns [(latest_gc_target_offset,
      new_suffix_start_offset)]. Otherwise the error built by {!gc_errors}
      is raised through [Errs.raise_if_error]. *)
  let finalise_without_swap t =
    let* status = Async.await t.task in
    let gc_output = read_gc_output ~root:t.root ~generation:t.generation in
    match (status, gc_output) with
    | `Success, Ok _ ->
        Lwt.return (t.latest_gc_target_offset, t.new_suffix_start_offset)
    | _ ->
        let r = gc_errors status gc_output |> Errs.raise_if_error in
        Lwt.return r

  (** [on_finalise t f] attaches [f] to [t.promise], which {!finalise}
      resolves with the GC outcome. The promise returned by the bind is
      discarded. *)
  let on_finalise t f =
    let (_ : _ Lwt.t) = Lwt.bind t.promise f in
    ()

  (** [cancel t] asks [Async.cancel] to stop the worker. If that reports
      success, it also runs {!clean_after_abort}. Returns the result of
      [Async.cancel]. *)
  let cancel t =
    let cancelled = Async.cancel t.task in
    if cancelled then clean_after_abort t;
    cancelled
end