Source file dispatcher.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
open Import
include Dispatcher_intf
module Payload = Control_file.Latest_payload
module Make (Fm : File_manager.S with module Io = Io.Unix) :
S with module Fm = Fm = struct
module Fm = Fm
module Io = Fm.Io
module Suffix = Fm.Suffix
module Mapping_file = Fm.Mapping_file
module Errs = Fm.Errs
module Control = Fm.Control
type t = { fm : Fm.t }
type location = Prefix | Suffix [@@deriving irmin ~pp]
type accessor = { poff : int63; len : int63; location : location }
[@@deriving irmin]
(** [poff] is a physical offset in a file. It is meant to be passed to [Io] or
[Append_only]
[len] is a number of bytes following [poff].
[location] is a file identifier. *)
let v fm =
let t = { fm } in
Ok t
let get_prefix t =
match Fm.prefix t.fm with
| Some prefix -> prefix
| None -> raise (Errors.Pack_error (`Invalid_prefix_read "no prefix found"))
let get_mapping t =
match Fm.mapping t.fm with
| Some mapping -> mapping
| None ->
raise (Errors.Pack_error (`Invalid_mapping_read "no mapping found"))
let suffix_start_offset t =
let pl = Control.payload (Fm.control t.fm) in
match pl.status with
| Payload.From_v1_v2_post_upgrade _ | Used_non_minimal_indexing_strategy
| No_gc_yet ->
Int63.zero
| T1 | T2 | T3 | T4 | T5 | T6 | T7 | T8 | T9 | T10 | T11 | T12 | T13 | T14
| T15 ->
assert false
| Gced { suffix_start_offset; _ } -> suffix_start_offset
let suffix_dead_bytes t =
let pl = Control.payload (Fm.control t.fm) in
match pl.status with
| Payload.From_v1_v2_post_upgrade _ | Used_non_minimal_indexing_strategy
| No_gc_yet ->
Int63.zero
| T1 | T2 | T3 | T4 | T5 | T6 | T7 | T8 | T9 | T10 | T11 | T12 | T13 | T14
| T15 ->
assert false
| Gced { suffix_dead_bytes; _ } -> suffix_dead_bytes
module Suffix_arithmetic = struct
let soff_of_off t off =
let open Int63.Syntax in
let suffix_start_offset = suffix_start_offset t in
let suffix_dead_bytes = suffix_dead_bytes t in
off - suffix_start_offset + suffix_dead_bytes
let off_of_soff t soff =
let open Int63.Syntax in
let suffix_start_offset = suffix_start_offset t in
let suffix_dead_bytes = suffix_dead_bytes t in
suffix_start_offset + soff - suffix_dead_bytes
end
let offset_of_soff = Suffix_arithmetic.off_of_soff
let soff_of_offset = Suffix_arithmetic.soff_of_off
let end_offset t =
let end_soff = Suffix.end_soff (Fm.suffix t.fm) in
offset_of_soff t end_soff
module Prefix_arithmetic = struct
let chunk_of_off_exn mapping off_start =
let open Int63 in
let open Int63.Syntax in
let res = Mapping_file.find_nearest_leq mapping off_start in
match res with
| None ->
let s =
Fmt.str
"offset %a is before the first chunk, or the prefix is empty"
Int63.pp off_start
in
raise (Errors.Pack_error (`Invalid_read_of_gced_object s))
| Some entry ->
let chunk_off_start = entry.off in
assert (chunk_off_start <= off_start);
let chunk_len = entry.len in
let chunk_off_end = chunk_off_start + of_int chunk_len in
(if chunk_off_end <= off_start then
let s =
Fmt.str
"offset %a is supposed to be contained in chunk \
(off=%a,poff=%a,len=%d) but starts after chunk"
Int63.pp off_start Int63.pp chunk_off_start Int63.pp entry.poff
entry.len
in
raise (Errors.Pack_error (`Invalid_read_of_gced_object s)));
let shift_in_chunk = off_start - chunk_off_start in
let max_entry_len = of_int chunk_len - shift_in_chunk in
assert (max_entry_len >= Int63.zero);
(entry, shift_in_chunk, max_entry_len)
let poff_of_entry_exn mapping ~off ~len =
let chunk, shift_in_chunk, max_entry_len = chunk_of_off_exn mapping off in
let open Int63.Syntax in
(if len > max_entry_len then
let s =
Fmt.str
"entry (off=%a, len=%a) is supposed to be contained in chunk \
(poff=%a,len=%d) and starting at %a but is larger than it can be\n\
\ contained in chunk" Int63.pp off Int63.pp len Int63.pp chunk.poff
chunk.len Int63.pp shift_in_chunk
in
raise (Errors.Pack_error (`Invalid_prefix_read s)));
chunk.poff + shift_in_chunk
end
module Accessor = struct
let v_in_suffix_exn t ~off ~len =
let open Int63.Syntax in
let entry_end_offset = off + len in
if entry_end_offset > end_offset t then
raise (Errors.Pack_error `Read_out_of_bounds)
else
let poff = Suffix_arithmetic.soff_of_off t off in
{ poff; len; location = Suffix }
let v_in_prefix_exn mapping ~off ~len =
let poff = Prefix_arithmetic.poff_of_entry_exn mapping ~off ~len in
{ poff; len; location = Prefix }
let v_exn t ~off ~len =
let open Int63.Syntax in
let suffix_start_offset = suffix_start_offset t in
if off >= suffix_start_offset then v_in_suffix_exn t ~off ~len
else v_in_prefix_exn (get_mapping t) ~off ~len
let v_range_in_suffix_exn t ~off ~min_len ~max_len =
let len =
let open Int63.Syntax in
let bytes_after_off = end_offset t - off in
if bytes_after_off < min_len then
raise (Errors.Pack_error `Read_out_of_bounds)
else if bytes_after_off > max_len then max_len
else bytes_after_off
in
let poff = Suffix_arithmetic.soff_of_off t off in
{ poff; len; location = Suffix }
let v_range_in_prefix_exn t ~off ~min_len ~max_len =
let mapping = get_mapping t in
let chunk, shift_in_chunk, max_entry_len =
Prefix_arithmetic.chunk_of_off_exn mapping off
in
let open Int63.Syntax in
let len =
if max_entry_len < min_len then
raise (Errors.Pack_error `Read_out_of_bounds)
else if max_entry_len > max_len then max_len
else max_entry_len
in
let poff = chunk.poff + shift_in_chunk in
{ poff; len; location = Prefix }
let v_range_exn t ~off ~min_len ~max_len =
let open Int63.Syntax in
let suffix_start_offset = suffix_start_offset t in
if off >= suffix_start_offset then
v_range_in_suffix_exn t ~off ~min_len ~max_len
else v_range_in_prefix_exn t ~off ~min_len ~max_len
end
let read_exn t { poff; len; location } buf =
[%log.debug
"read_exn in %a at %a for %a" (Irmin.Type.pp location_t) location Int63.pp
poff Int63.pp len];
assert (len <= Int63.of_int Stdlib.max_int);
let len = Int63.to_int len in
match location with
| Prefix -> Io.read_exn (get_prefix t) ~off:poff ~len buf
| Suffix -> Suffix.read_exn (Fm.suffix t.fm) ~off:poff ~len buf
let read_bytes_exn t ~f ~off ~len =
let open Int63.Syntax in
let bytes_in_prefix =
let prefix_bytes_after_off = suffix_start_offset t - off in
if prefix_bytes_after_off <= Int63.zero then Int63.zero
else min len prefix_bytes_after_off
in
let bytes_in_suffix =
if bytes_in_prefix < len then len - bytes_in_prefix else Int63.zero
in
assert (bytes_in_prefix + bytes_in_suffix = len);
let prefix_accessor_opt =
if bytes_in_prefix > Int63.zero then
Some (Accessor.v_exn t ~off ~len:bytes_in_prefix)
else None
in
let suffix_accessor_opt =
if bytes_in_suffix > Int63.zero then
let off = off + bytes_in_prefix in
Some (Accessor.v_exn t ~off ~len:bytes_in_suffix)
else None
in
let max_read_size = 8192 in
let buffer = Bytes.create max_read_size in
let max_read_size = Int63.of_int max_read_size in
let rec aux accessor =
if accessor.len = Int63.zero then ()
else if accessor.len < max_read_size then (
read_exn t accessor buffer;
f (Bytes.sub_string buffer 0 (Int63.to_int accessor.len)))
else
let left, right =
( { accessor with len = max_read_size },
{
accessor with
poff = accessor.poff + max_read_size;
len = accessor.len - max_read_size;
} )
in
read_exn t left buffer;
f (Bytes.to_string buffer);
aux right
in
Option.iter aux prefix_accessor_opt;
Option.iter aux suffix_accessor_opt
let create_accessor_exn t ~off ~len =
let len = Int63.of_int len in
Accessor.v_exn t ~off ~len
let create_accessor_from_range_exn t ~off ~min_len ~max_len =
let min_len = Int63.of_int min_len in
let max_len = Int63.of_int max_len in
Accessor.v_range_exn t ~off ~min_len ~max_len
let create_accessor_to_prefix_exn t ~off ~len =
let len = Int63.of_int len in
Accessor.v_in_prefix_exn t ~off ~len
let shrink_accessor_exn a ~new_len =
let open Int63.Syntax in
let new_len = Int63.of_int new_len in
if new_len > a.len then failwith "shrink_accessor_exn to larger accessor";
{ a with len = new_len }
let create_sequential_accessor_exn location rem_len ~poff ~len =
if len > rem_len then raise (Errors.Pack_error `Read_out_of_bounds)
else { poff; len = Int63.of_int len; location }
let create_sequential_accessor_from_range_exn location rem_len ~poff ~min_len
~max_len =
let len =
if rem_len < min_len then raise (Errors.Pack_error `Read_out_of_bounds)
else if rem_len > max_len then max_len
else rem_len
in
{ poff; len = Int63.of_int len; location }
let create_sequential_accessor_seq t ~ ~ ~read_len
=
let preffix_chunks =
match Fm.mapping t.fm with
| Some mapping ->
let preffix_chunks = ref [] in
Mapping_file.iter mapping (fun ~off ~len ->
preffix_chunks := (off, len) :: !preffix_chunks)
|> Errs.raise_if_error;
List.rev !preffix_chunks
| None -> []
in
let suffix_end_soff = Fm.Suffix.end_soff (Fm.suffix t.fm) in
let suffix_start_offset = suffix_start_offset t in
let get_entry_accessor rem_len location poff =
let accessor =
create_sequential_accessor_from_range_exn location rem_len ~poff
~min_len:min_header_len ~max_len:max_header_len
in
let buf = Bytes.create max_header_len in
read_exn t accessor buf;
let entry_len = read_len buf in
( entry_len,
create_sequential_accessor_exn location rem_len ~poff ~len:entry_len )
in
let rec suffix_accessors soff () =
let open Seq in
let open Int63.Syntax in
if soff >= suffix_end_soff then Nil
else
let rem_len = Int63.to_int (suffix_end_soff - soff) in
let entry_len, accessor = get_entry_accessor rem_len Suffix soff in
let r = (suffix_start_offset + soff, accessor) in
let soff = soff + Int63.of_int entry_len in
let f = suffix_accessors soff in
Cons (r, f)
in
let rec prefix_accessors poff acc () =
let open Seq in
match acc with
| [] -> suffix_accessors Int63.zero ()
| (off, rem_len) :: acc ->
if rem_len <= 0 then prefix_accessors poff acc ()
else
let entry_len, accessor = get_entry_accessor rem_len Prefix poff in
let r = (off, accessor) in
let rem_len = rem_len - entry_len in
let open Int63.Syntax in
let poff = poff + Int63.of_int entry_len in
let off = off + Int63.of_int entry_len in
let f = prefix_accessors poff ((off, rem_len) :: acc) in
Cons (r, f)
in
prefix_accessors Int63.zero preffix_chunks
end