Source file reconstruct_index.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
open! Import
module IO = IO.Unix
module Stats : sig
type t
val empty : unit -> t
val add : t -> Pack_value.Kind.t -> unit
val pp : t Fmt.t
end = struct
open Pack_value.Kind
type t = int array
let empty () = Array.make 4 0
let incr t n = t.(n) <- t.(n) + 1
let add t = function
| Contents -> incr t 0
| Commit -> incr t 1
| Node -> incr t 2
| Inode -> incr t 3
let pp =
let open Fmt.Dump in
record
[
field "Contents" (fun t -> t.(0)) Fmt.int;
field "Commit" (fun t -> t.(1)) Fmt.int;
field "Node" (fun t -> t.(2)) Fmt.int;
field "Inode" (fun t -> t.(3)) Fmt.int;
]
end
module type Args = sig
module Version : Version.S
module Hash : Irmin.Hash.S
module Index : Pack_index.S with type key := Hash.t
module Inode : Inode.S with type key := Hash.t
module Dict : Pack_dict.S
module Contents : Pack_value.S
module Commit : Pack_value.S
end
module Make (Args : Args) : sig
val run : ?output:string -> Irmin.config -> unit
end = struct
open Args
let pp_key = Irmin.Type.pp Hash.t
let decode_key = Irmin.Type.(unstage (decode_bin Hash.t))
let decode_kind = Irmin.Type.(unstage (decode_bin Pack_value.Kind.t))
exception Not_enough_buffer
type index_binding = { key : Hash.t; data : Index.value }
let decode_entry_length = function
| Pack_value.Kind.Contents -> Contents.decode_bin_length
| Commit -> Commit.decode_bin_length
| Node | Inode -> Inode.decode_bin_length
let decode_entry_exn ~off ~buffer ~buffer_off =
try
let off_after_key, key = decode_key buffer buffer_off in
assert (off_after_key = buffer_off + Hash.hash_size);
let off_after_kind, kind = decode_kind buffer off_after_key in
assert (off_after_kind = buffer_off + Hash.hash_size + 1);
let entry_len = decode_entry_length kind buffer buffer_off in
{ key; data = (off, entry_len, kind) }
with
| Invalid_argument msg when msg = "index out of bounds" ->
raise Not_enough_buffer
| Invalid_argument msg when msg = "String.blit / Bytes.blit_string" ->
raise Not_enough_buffer
let ingest_data_file ~progress ~total pack index =
let buffer = ref (Bytes.create 1024) in
let refill_buffer ~from =
let read = IO.read pack ~off:from !buffer in
let filled = read = Bytes.length !buffer in
let eof = Int63.equal total (Int63.add from (Int63.of_int read)) in
if (not filled) && not eof then
Fmt.failwith
"When refilling from offset %#Ld (total %#Ld), read %#d but expected \
%#d"
(Int63.to_int64 from) (Int63.to_int64 total) read
(Bytes.length !buffer)
in
let expand_and_refill_buffer ~from =
let length = Bytes.length !buffer in
if length > 1_000_000_000 then
Fmt.failwith
"Couldn't decode the value at offset %a in %d of buffer space. \
Corrupted data file?"
Int63.pp from length
else (
buffer := Bytes.create (2 * length);
refill_buffer ~from)
in
let stats = Stats.empty () in
let rec loop_entries ~buffer_off off =
if off >= total then stats
else
let buffer_off, off =
match
decode_entry_exn ~off
~buffer:(Bytes.unsafe_to_string !buffer)
~buffer_off
with
| { key; data } ->
let off', entry_len, kind = data in
let entry_lenL = Int63.of_int entry_len in
assert (off = off');
Log.debug (fun l ->
l "k = %a (off, len, kind) = (%a, %d, %a)" pp_key key Int63.pp
off entry_len Pack_value.Kind.pp kind);
Stats.add stats kind;
Index.add index key data;
progress entry_lenL;
(buffer_off + entry_len, off ++ entry_lenL)
| exception Not_enough_buffer ->
let () =
if buffer_off > 0 then
refill_buffer ~from:off
else
expand_and_refill_buffer ~from:off
in
(0, off)
in
loop_entries ~buffer_off off
in
refill_buffer ~from:Int63.zero;
loop_entries ~buffer_off:0 Int63.zero
let run ?output config =
if Conf.readonly config then raise S.RO_not_allowed;
let run_duration = Mtime_clock.counter () in
let root = Conf.root config in
let dest = match output with Some path -> path | None -> root in
let log_size = Conf.index_log_size config in
Log.app (fun f ->
f "Beginning index reconstruction with parameters: { log_size = %d }"
log_size);
let index = Index.v ~fresh:true ~readonly:false ~log_size dest in
let pack_file = Filename.concat root "store.pack" in
let pack =
IO.v ~fresh:false ~readonly:true ~version:(Some Version.version) pack_file
in
let total = IO.offset pack in
let bar, progress =
Utils.Progress.counter ~total ~sampling_interval:100
~message:"Reconstructing index" ~pp_count:Utils.pp_bytes ()
in
let stats = ingest_data_file ~progress ~total pack index in
Utils.Progress.finalise bar;
Log.app (fun f ->
f "Completed indexing of pack entries. Running a final merge ...");
Index.try_merge index;
Index.close index;
IO.close pack;
let run_duration = Mtime_clock.count run_duration in
Log.app (fun f ->
f "%a in %a. Store statistics:@, @[<v 0>%a@]"
Fmt.(styled `Green string)
"Success" Mtime.Span.pp run_duration Stats.pp stats)
end