Source file carton_miou_flux.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
let src = Logs.Src.create "carton-miou.flux"
module Log = (val Logs.src_log src : Logs.LOG)
type entry =
[ `Number of int
| `Inflate of (Carton.Kind.t * int) option * string
| `Entry of Carton.First_pass.entry
| `Hash of string ]
type 'acc _first_pass = {
decoder: Carton.First_pass.decoder
; input: Bstr.t
; first: bool
; str: string
; src_off: int
; src_len: int
; acc: 'acc
}
let first_pass ~digest ~ref_length =
let flow (Flux.Sink k) =
let rec until_await_or_peek t =
assert (k.full t.acc = false);
match Carton.First_pass.decode t.decoder with
| `Inflate (payload, decoder) ->
let kind = Carton.First_pass.kind decoder in
let acc = k.push t.acc (`Inflate (kind, payload)) in
let t = { t with decoder; acc } in
if k.full acc then t else until_await_or_peek t
| `Entry (entry, decoder) when t.first ->
let n = Carton.First_pass.number_of_objects decoder in
let acc = t.acc in
let acc = k.push acc (`Number n) in
let t = { t with first= false; decoder; acc } in
if k.full acc then t
else
let acc = k.push acc (`Entry entry) in
let t = { t with acc } in
if k.full acc then t else until_await_or_peek t
| `Entry (entry, decoder) ->
let acc = k.push t.acc (`Entry entry) in
let t = { t with decoder; acc } in
if k.full acc then t else until_await_or_peek t
| `Malformed err -> failwith err
| `End hash ->
let acc = k.push t.acc (`Hash hash) in
{ t with acc }
| `Await decoder ->
if t.src_len = 0 then { t with decoder }
else begin
let len = Int.min (Bstr.length t.input) t.src_len in
let src_off = t.src_off in
Bstr.blit_from_string t.str ~src_off t.input ~dst_off:0 ~len;
let decoder = Carton.First_pass.src decoder t.input 0 len in
let src_off = src_off + len and src_len = t.src_len - len in
let t = { t with decoder; src_off; src_len } in
until_await_or_peek t
end
| `Peek decoder ->
let dst_off = Carton.First_pass.src_rem decoder in
if t.src_len = 0 then { t with decoder }
else begin
let len = Int.min (Bstr.length t.input - dst_off) t.src_len in
let src_off = t.src_off in
Bstr.blit_from_string t.str ~src_off t.input ~dst_off ~len;
let decoder =
Carton.First_pass.src decoder t.input 0 (dst_off + len)
in
let src_off = src_off + len and src_len = t.src_len - len in
let t = { t with decoder; src_off; src_len } in
until_await_or_peek t
end
in
let init () =
let window = De.make_window ~bits:15 in
let allocate _bits = window in
let output = Bstr.create De.io_buffer_size in
let input = Bstr.create De.io_buffer_size in
let decoder =
Carton.First_pass.decoder ~output ~allocate ~ref_length ~digest `Manual
in
let acc = k.init ()
and str = String.empty
and src_off = 0
and src_len = 0
and first = true in
{ decoder; input; acc; first; str; src_off; src_len }
in
let push t str =
if not (k.full t.acc) then
let src_off = 0 and src_len = String.length str in
let t = { t with str; src_off; src_len } in
until_await_or_peek t
else t
in
let full t = k.full t.acc in
let stop t = k.stop t.acc in
Flux.Sink { init; push; full; stop }
in
{ Flux.flow }
type offset = int
type uid = Carton.Uid.t
type size = Carton.Size.t
type crc = Optint.t
type _oracle =
| Oracle : {
children_by_offset: (offset, offset list) Hashtbl.t
; children_by_uid: (uid, offset list) Hashtbl.t
; sizes: (offset, size ref) Hashtbl.t
; where: (offset, int) Hashtbl.t
; cursors: (int, offset) Hashtbl.t
; crcs: (offset, crc) Hashtbl.t
; bases: (int, offset) Hashtbl.t
; index: (uid, offset) Hashtbl.t
; ref_index: (offset, uid) Hashtbl.t
; mutable number_of_objects: int
; mutable hash: string
; mutable ctx: 'ctx option
; identify: 'ctx Carton.First_pass.identify
}
-> _oracle
let oracle ~identify =
let update_size (Oracle t) ~parent offset (size : Carton.Size.t) =
let cell : Carton.Size.t ref = Hashtbl.find t.sizes parent in
(cell := Carton.Size.(max !cell size));
Hashtbl.replace t.sizes offset cell
in
let new_child (Oracle t) ~parent child =
match parent with
| `Ofs parent -> begin
match Hashtbl.find_opt t.children_by_offset parent with
| None -> Hashtbl.add t.children_by_offset parent [ child ]
| Some offsets ->
Hashtbl.replace t.children_by_offset parent (child :: offsets)
end
| `Ref parent -> begin
match Hashtbl.find_opt t.children_by_uid parent with
| None -> Hashtbl.add t.children_by_uid parent [ child ]
| Some offsets ->
Hashtbl.replace t.children_by_uid parent (child :: offsets)
end
in
let init () =
let children_by_offset = Hashtbl.create 0x7ff in
let children_by_uid = Hashtbl.create 0x7ff in
let sizes = Hashtbl.create 0x7ff in
let where = Hashtbl.create 0x7ff in
let cursors = Hashtbl.create 0x7ff in
let crcs = Hashtbl.create 0x7ff in
let bases = Hashtbl.create 0x7ff in
let index = Hashtbl.create 0x7ff in
let ref_index = Hashtbl.create 0x7ff in
let number_of_objects = 0 in
let hash = String.empty in
let ctx = None in
Oracle
{
children_by_offset
; children_by_uid
; sizes
; where
; cursors
; crcs
; bases
; index
; ref_index
; number_of_objects
; hash
; ctx
; identify
}
in
let push (Oracle t as oracle) entry =
let () =
match entry with
| `Number n -> t.number_of_objects <- n
| `Hash value -> t.hash <- value
| `Inflate (None, _) -> ()
| `Inflate (Some (k, size), str) -> begin
let open Carton in
let open First_pass in
match t.ctx with
| None ->
let ctx0 = t.identify.init k (Carton.Size.of_int_exn size) in
let ctx0 = t.identify.feed (Bstr.of_string str) ctx0 in
t.ctx <- Some ctx0
| Some ctx0 ->
let ctx0 = t.identify.feed (Bstr.of_string str) ctx0 in
t.ctx <- Some ctx0
end
| `Entry entry -> begin
let offset = entry.Carton.First_pass.offset in
let size = entry.Carton.First_pass.size in
let crc = entry.Carton.First_pass.crc in
Hashtbl.add t.where offset entry.number;
Hashtbl.add t.cursors entry.number offset;
Hashtbl.add t.crcs offset crc;
match entry.Carton.First_pass.kind with
| Carton.First_pass.Base kind ->
Hashtbl.add t.sizes offset (ref size);
Hashtbl.add t.bases entry.number offset;
let uid =
match Option.map t.identify.serialize t.ctx with
| Some uid -> uid
| None ->
let size = entry.Carton.First_pass.size in
let ctx = t.identify.init kind size in
t.identify.serialize ctx
in
t.ctx <- None;
Hashtbl.add t.index uid offset
| Ofs { sub; source; target; _ } ->
let abs_parent = offset - sub in
update_size oracle ~parent:abs_parent offset
(Carton.Size.max target source);
new_child oracle ~parent:(`Ofs abs_parent) offset
| Ref { ptr; source; target; _ } ->
let () =
match Hashtbl.find_opt t.index ptr with
| Some parent ->
update_size oracle ~parent offset
(Carton.Size.max source target)
| None ->
Hashtbl.add t.sizes offset
(ref (Carton.Size.max source target))
in
Hashtbl.add t.ref_index offset ptr;
new_child oracle ~parent:(`Ref ptr) offset
end
in
oracle
in
let full = Fun.const false in
let stop (Oracle t as oracle) =
Hashtbl.iter
(fun offset ptr ->
match Hashtbl.find_opt t.index ptr with
| Some parent ->
update_size oracle ~parent offset !(Hashtbl.find t.sizes offset)
| None -> ())
t.ref_index;
let children ~cursor ~uid =
match
( Hashtbl.find_opt t.children_by_offset cursor
, Hashtbl.find_opt t.children_by_uid uid )
with
| Some (_ :: _ as children), (Some [] | None) -> children
| (Some [] | None), Some (_ :: _ as children) -> children
| (None | Some []), (None | Some []) -> []
| Some lst0, Some lst1 ->
List.(sort_uniq Int.compare (rev_append lst0 lst1))
in
let where ~cursor = Hashtbl.find t.where cursor in
let size ~cursor = !(Hashtbl.find t.sizes cursor) in
let checksum ~cursor = Hashtbl.find t.crcs cursor in
let is_base ~pos = Hashtbl.find_opt t.bases pos in
let cursor ~pos = Hashtbl.find t.cursors pos in
{
Carton.identify= Carton.Identify identify
; children
; where
; cursor
; size
; checksum
; is_base
; number_of_objects= t.number_of_objects
; hash= t.hash
}
in
Flux.Sink { init; push; full; stop }
type base = { value: Carton.Value.t; uid: Carton.Uid.t; depth: int }
let identify (Carton.Identify gen) ~kind ~len bstr =
let ctx = gen.Carton.First_pass.init kind (Carton.Size.of_int_exn len) in
let ctx = gen.Carton.First_pass.feed (Bigarray.Array1.sub bstr 0 len) ctx in
gen.Carton.First_pass.serialize ctx
let rec resolve_tree q t oracle matrix ~base = function
| [||] -> ()
| [| cursor |] ->
let value = Carton.of_offset_with_source t base.value ~cursor in
let len = Carton.Value.length value
and bstr = Carton.Value.bigstring value
and kind = Carton.Value.kind value in
let uid = identify oracle.Carton.identify ~kind ~len bstr
and pos = oracle.where ~cursor
and crc = oracle.checksum ~cursor
and depth = succ base.depth in
let copy = Bstr.(copy (sub bstr ~off:0 ~len)) in
Flux.Bqueue.put q (Carton.Value.make ~kind ~depth copy, cursor, uid);
matrix.(pos) <-
Carton.Resolved_node { cursor; uid; crc; kind; depth; parent= base.uid };
let children = oracle.children ~cursor ~uid in
let children = Array.of_list children in
let value = Carton.Value.flip value in
let base = { value; uid; depth } in
resolve_tree q t oracle matrix ~base children
| cursors ->
let source = Carton.Value.source base.value in
let source = Bstr.copy source in
let rec go idx =
if idx < Array.length cursors then begin
let cursor = cursors.(idx) in
let dirty = Carton.Value.source base.value in
let src = Carton.Value.with_source ~source base.value in
let value = Carton.of_offset_with_source t src ~cursor in
let len = Carton.Value.length value
and bstr = Carton.Value.bigstring value
and kind = Carton.Value.kind value in
let uid = identify oracle.Carton.identify ~kind ~len bstr
and pos = oracle.where ~cursor
and crc = oracle.checksum ~cursor
and depth = succ base.depth in
let copy = Bstr.(copy (sub bstr ~off:0 ~len)) in
Flux.Bqueue.put q (Carton.Value.make ~kind ~depth copy, cursor, uid);
matrix.(pos) <-
Resolved_node { cursor; uid; crc; kind; depth; parent= base.uid };
let children = oracle.children ~cursor ~uid in
let children = Array.of_list children in
let value = Carton.Value.with_source ~source:dirty value in
let value = Carton.Value.flip value in
let base = { value; uid; depth } in
resolve_tree q t oracle matrix ~base children;
go (succ idx)
end
in
go 0
let is_unresolved_base = function
| Carton.Unresolved_base _ -> true
| _ -> false
let verify ?(threads = 4) q t oracle matrix =
let mutex = Miou.Mutex.create () in
let idx = Atomic.make 0 in
let rec fn t =
let pos =
Miou.Mutex.protect mutex @@ fun () ->
while
Atomic.get idx < Array.length matrix
&& is_unresolved_base matrix.(Atomic.get idx) = false
do
Atomic.incr idx
done;
Atomic.fetch_and_add idx 1
in
Log.debug (fun m -> m "Resolve object %d/%d" pos (Array.length matrix));
if pos < Array.length matrix then begin
let[@warning "-8"] (Carton.Unresolved_base { cursor }) = matrix.(pos) in
let size = oracle.Carton.size ~cursor in
let blob = Carton.Blob.make ~size in
let value = Carton.of_offset t blob ~cursor in
let len = Carton.Value.length value
and bstr = Carton.Value.bigstring value
and kind = Carton.Value.kind value in
let uid = identify oracle.Carton.identify ~kind ~len bstr
and crc = oracle.checksum ~cursor in
let copy = Bstr.(copy (sub bstr ~off:0 ~len)) in
Flux.Bqueue.put q (Carton.Value.make ~kind copy, cursor, uid);
matrix.(pos) <- Resolved_base { cursor; uid; crc; kind };
let children = oracle.children ~cursor ~uid in
let children = Array.of_list children in
let base = Carton.{ value= Value.flip value; uid; depth= 1 } in
resolve_tree q t oracle matrix ~base children;
fn t
end
in
let init _thread = Carton.copy t in
let results =
if threads > 0 then Miou.parallel fn (List.init threads init)
else try fn t; [ Ok () ] with exn -> [ Error exn ]
in
List.iter (function Ok () -> () | Error exn -> raise exn) results
let entries ?threads pack oracle =
Flux.Source.with_task ~size:0x7ff @@ fun q ->
let matrix =
Array.init oracle.Carton.number_of_objects @@ fun pos ->
match oracle.is_base ~pos with
| Some cursor -> Carton.Unresolved_base { cursor }
| None -> Unresolved_node { cursor= oracle.cursor ~pos }
in
verify ?threads q pack oracle matrix;
Flux.Bqueue.close q