(*
* Copyright (c) 2018-2022 Tarides <contact@tarides.com>
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*)

(** An implementation of "external sorting" (sorting on-disk data) and various
    other related routines.

    Most of these routines work with mmap-ed data, as a one dimensional array
    of integers, where each pair of integers represents a [(key,value)] pair.

    These routines exist to support the implementation of the sparse file. The
    documentation in the sparse file should also be read.

    Usage: We start with a file containing [(off,len)] pairs. These describe
    which regions of a file contain data that we need when creating a sparse
    file. We first sort these by offset, using {!sort}. We then combine
    adjacent extents using {!calculate_extents_oc}. For example, a region
    [(10,10)] and a region [(20,10)] will be combined into the single extent
    [(10,20)]. When combining extents, we also want to allow some flexibility
    if two regions are "almost adjacent". For example, a region [(10,10)] and
    a region [(21,10)] will be combined into the single extent [(10,21)], even
    though there is a single byte at offset 20 that we do not actually need.
    The parameter [gap_tolerance] defines how large this gap between regions
    can be for them to be combined in this way. The reason for doing this is
    that we want the sparse file to have a small map if possible, and we are
    happy to include some unneeded data in the sparse data file if this will
    make the map smaller. *)

open! Import
include Mapping_file_intf
module BigArr1 = Bigarray.Array1

type int_bigarray = (int, Bigarray.int_elt, Bigarray.c_layout) Bigarray.Array1.t

type int64_bigarray =
  (int64, Bigarray.int64_elt, Bigarray.c_layout) Bigarray.Array1.t

(* Each entry consists of [step] ints; there is the possibility to generalize
   to arbitrary step sizes, but the following code always works with
   (key,value) pairs, ie step size is 2. *)
let step_2 = 2

(* Number of ints kept in memory per chunk; should be a multiple of 2. *)
let chunk_sz = 1_000_000 / 8

(* Set to 0 until we decide what to do about sequential traversal of pack
   files. *)
let gap_tolerance = 0

(* An [int] bigarray memory-mapped from a file, together with the file name
   and descriptor it was mapped from. *)
module Int_mmap : sig
  type t = private {
    fn : string;
    fd : Unix.file_descr;
    mutable arr : int_bigarray;
  }

  val create : fn:string -> sz:int -> t

  val open_ro : fn:string -> sz:int -> t
  (** NOTE [open_ro ~fn ~sz] can use [sz=-1] to open with size based on the
      size of the underlying file *)

  val close : t -> unit
end = struct
  type t = { fn : string; fd : Unix.file_descr; mutable arr : int_bigarray }

  (* NOTE both following are shared *)
  let create ~fn ~sz =
    let shared = true in
    assert (
      (not (Sys.file_exists fn))
      ||
      (Printf.printf "File exists: %s\n%!" fn;
       false));
    let fd =
      Unix.(openfile fn [ O_CREAT; O_RDWR; O_TRUNC; O_EXCL; O_CLOEXEC ] 0o660)
    in
    let arr =
      let open Bigarray in
      Unix.map_file fd Int c_layout shared [| sz |] |> array1_of_genarray
    in
    { fn; fd; arr }

  (* NOTE sz=-1 is recognized by [map_file] as "derive from size of file"; if
     we want a different size (eg because we want the file to grow) we can
     provide it explicitly *)
  let open_ro ~fn ~sz =
    let shared = false in
    assert (Sys.file_exists fn);
    let fd = Unix.(openfile fn [ O_RDONLY ] 0o660) in
    let arr =
      let open Bigarray in
      Unix.map_file fd Int c_layout shared [| sz |] |> array1_of_genarray
    in
    { fn; fd; arr }

  let close t =
    Unix.close t.fd;
    (* following tries to make the array unreachable, so GC'able; however, no
       guarantee that arr actually is unreachable *)
    t.arr <- Bigarray.(Array1.create Int c_layout 0);
    ()
end

(* As [Int_mmap], but for [int64] bigarrays; read-only. *)
module Int64_mmap : sig
  type t = private {
    fn : string;
    fd : Unix.file_descr;
    mutable arr : int64_bigarray;
  }

  val open_ro : fn:string -> sz:int -> t
  (** NOTE [open_ro ~fn ~sz] can use [sz=-1] to open with size based on the
      size of the underlying file *)

  val close : t -> unit
end = struct
  type t = { fn : string; fd : Unix.file_descr; mutable arr : int64_bigarray }

  (* NOTE sz=-1 is recognized by [map_file] as "derive from size of file"; if
     we want a different size (eg because we want the file to grow) we can
     provide it explicitly *)
  let open_ro ~fn ~sz =
    let shared = false in
    assert (Sys.file_exists fn);
    let fd = Unix.(openfile fn [ O_RDONLY ] 0o660) in
    let arr =
      let open Bigarray in
      Unix.map_file fd Int64 c_layout shared [| sz |] |> array1_of_genarray
    in
    { fn; fd; arr }

  let close t =
    Unix.close t.fd;
    (* following tries to make the array unreachable, so GC'able; however, no
       guarantee that arr actually is unreachable *)
    t.arr <- Bigarray.(Array1.create Int64 c_layout 0);
    ()
end

(** [sort_chunks ~arr] sorts each chunk in the bigarray [arr].
The [arr] should contain [(k,v)] integer pairs stored successively in the
array. The last chunk may have size less than [chunk_sz] - we don't require
the [arr] to be sized as a multiple of [chunk_sz].
The implementation reads chunk-sized amounts of ints into memory as a list
of tuples, sorts the list, and writes the list back out.
[chunk_sz] is the number of ints that are kept in memory, and so the overall
memory usage is something like [8 * chunk_sz] (with some overhead for the
list.. FIXME perhaps an array would be better) *)letsort_chunks~(arr:int_bigarray)=letarr_sz=Bigarray.Array1.dimarrin0|>iter_k(fun~k:kont1off->matchoff>arr_szwith|true->()|false->letsz=minchunk_sz(arr_sz-off)in(* read in as a list; we may prefer to sort an array instead *)assert(szmodstep_2=0);letxs=List.init(sz/step_2)(funi->(arr.{off+(2*i)},arr.{off+(2*i)+1}))in(* sort list *)letxs=List.sort(fun(k,_)(k',_)->Int.comparekk')xsin(* write back out *)let_write_out=(xs,off)|>iter_k(fun~k:kont2(xs,off)->matchxswith|[]->()|(k,v)::rest->arr.{off}<-k;arr.{off+1}<-v;kont2(rest,off+2))in(* do next chunk *)kont1(off+chunk_sz));()(* [merge_chunks ~src ~dst] takes previously sorted chunks of [(k,v)] data in
[src] and performs an n-way merge into [dst]. *)letmerge_chunks~(src:int_bigarray)~(dst:int_bigarray)=letsrc_sz,dst_sz=(BigArr1.dimsrc,BigArr1.dimdst)inlet_initial_checks=assert(step_2=2);assert(chunk_szmodstep_2=0);assert(dst_sz>=src_sz);()in(* form subarrays of size [chunk_sz] from [src] *)letxs=(0,[])|>iter_k(fun~k(off,xs)->matchoff<src_szwith|false->xs|true->letarr=BigArr1.subsrcoff(minchunk_sz(src_sz-off))ink(off+chunk_sz,arr::xs))in(* for each subarr, we start at position 0, and successively move through the array
until the end; we keep the tuple (arr.{off}, off, arr) in a priority queue *)letopenstructtypepos_in_arr={key:int;off:int;arr:int_bigarray}(* Q stands for "priority queue" *)moduleQ=Binary_heap.Make(structtypet=pos_in_arrletcomparexy=comparex.keyy.keyend)endinletxs=xs|>List.map(funarr->{key=arr.{0};off=0;arr})in(* form priority queue *)letq=letq=Q.create~dummy:{key=0;off=0;arr=BigArr1.subsrc00}(List.lengthxs)inlet_=xs|>List.iter(funx->Q.addqx)inqin(* now repeatedly pull the min elt from q, put corresponding entry in dst, advance elt
offset and put elt back in q *)letdst_off=0|>iter_k(fun~kdst_off->matchQ.is_emptyqwith|true->(* return so we can check it is what we think it should be *)dst_off|false->(let{key;off;arr}=Q.pop_minimumqinletv=arr.{off+1}indst.{dst_off}<-key;dst.{dst_off+1}<-v;matchoff+2<BigArr1.dimarrwith|true->letoff=off+2inQ.addq{key=arr.{off};off;arr};k(dst_off+2)|false->(* finished with this chunk *)k(dst_off+2)))inassert(dst_off=src_sz);()(** [sort ~src ~dst] sorts the [src] array of [(k,v)] pairs and places the
result in [dst]. [src] and [dst] must be disjoint. [dst] must be large
enough to hold the result. The data is sorted in chunks; [chunk_sz] is the
number of ints that are kept in memory when sorting each chunk. *)(** [sort ~src ~dst] sorts the (key,value) integer data in [src] and places it
in [dst] ([src] and [dst] must be disjoint); [chunk_sz] is the number of
integers that are held in memory when sorting each chunk. *)letsort~(src:int_bigarray)~(dst:int_bigarray)=sort_chunks~arr:src;merge_chunks~src~dst;()(** [calculate_extents_oc ~src_is_sorted ~src ~dst] uses the sorted reachability
data in [src] and outputs extent data on [dst]. [gap_tolerance] specifies
how much gap between two extents is allowed for them to be combined into a
single extent. *)(** [calculate_extents_oc ~src_is_sorted ~src ~dst] takes {b sorted} [(off,len)]
data from [src], combines adjacent extents, and outputs a minimal set of
(sorted) extents to [dst:out_channel]; the return value is the length of the
part of [dst] that was filled. [gap_tolerance] is used to provide some
looseness when combining extents: if the next extent starts within
[gap_tolerance] of the end of the previous extent, then it is combined with
the previous (the data in the gap, which is not originally part of an
extent, will be counted as part of the resulting extent). This can reduce
the number of extents significantly, at a cost of including gaps where the
data is not actually needed. *)letcalculate_extents_oc~(src_is_sorted:unit)~(src:int_bigarray)~(register_entry:off:int->len:int->unit):unit=ignoresrc_is_sorted;letsrc_sz=BigArr1.dimsrcinlet_=assert(src_sz>=2);assert(src_szmodstep_2=0);()inletoff,len=(src.{0},src.{1})inletregions_combined=ref0inletdst_off=(* iterate over entries in src, combining adjacent entries *)(2,off,len)|>iter_k(fun~k(src_off,off,len)->matchsrc_off>=src_szwith|true->(* write out "current" extent *)register_entry~off~len;()|false->((* check if we can combine the next region *)letoff',len'=(src.{src_off},src.{src_off+1})inassert(off<=off');matchoff'<=off+len+gap_tolerancewith|false->(* we can't, so write out current extent and move to next *)register_entry~off~len;k(src_off+2,off',len')|true->(* we can combine *)incrregions_combined;assert(off<=off');(* offs are sorted *)letlen=maxlen(off'+len'-off)ink(src_off+2,off,len)))indst_offmoduleMake(Io:Io.S)=structmoduleIo=IomoduleErrs=Io_errors.Make(Io)moduleAo=Append_only_file.Make(Io)(Errs)typet={arr:int64_bigarray;root:string;generation:int}letopen_map~root~generation=letpath=Irmin_pack.Layout.V4.mapping~generation~rootinmatchIo.classify_pathpathwith|`File->(letmmap=Int64_mmap.open_ro~fn:path~sz:(-1)inletarr=mmap.arrinletlen=BigArr1.dimarrinmatchlen>0&&lenmod3=0with|true->Int64_mmap.closemmap;Ok{root;generation;arr}|false->Error(`Corrupted_mapping_file(__FILE__^": mapping mmap size did not meet size requirements")))|_->Error`No_such_file_or_directoryletcreate?report_file_sizes~root~generation~register_entries()=assert(generation>0);letopenResult_syntaxinletpath0=Irmin_pack.Layout.V4.reachable~generation~rootinletpath1=Irmin_pack.Layout.V4.sorted~generation~rootinletpath2=Irmin_pack.Layout.V4.mapping~generation~rootinlet*()=ifSys.word_size<>64thenError`Gc_forbidden_on_32bit_platformselseOk()in(* Unlink the 3 files and ignore errors (typically no such file) *)Io.unlinkpath0|>ignore;Io.unlinkpath1|>ignore;Io.unlinkpath2|>ignore;(* Create [file0] 
*)let*file0=Ao.create_rw~path:path0~overwrite:true~auto_flush_threshold:1_000_000~auto_flush_procedure:`Internalin(* Fill and close [file0] *)letregister_entry~off~len=(* Write [off, len] in native-endian encoding because it will be read
with mmap. *)letbuffer=Bytes.create16inBytes.set_int64_nebuffer0(Int63.to_int64off);Bytes.set_int64_nebuffer8(Int64.of_intlen);(* Bytes.unsafe_to_string usage: buffer is uniquely owned; we assume
Bytes.set_int64_ne returns unique ownership; we give up ownership of buffer in
conversion to string. This is safe. *)Ao.append_exnfile0(Bytes.unsafe_to_stringbuffer)inlet*()=Errs.catch(fun()->register_entries~register_entry)inlet*()=Ao.flushfile0inlet*()=Ao.closefile0in(* Reopen [file0] but as an mmap, create [file1] and fill it. *)letfile0=Int_mmap.open_ro~fn:path0~sz:(-1)inletsz=BigArr1.dimfile0.Int_mmap.arrinletfile1=Int_mmap.create~fn:path1~szinlet*()=Errs.catch(fun()->sort~src:file0.arr~dst:file1.arr)in(* Close and unlink [file0] *)Int_mmap.closefile0;let*reachable_size=Io.size_of_pathpath0inIo.unlinkpath0|>ignore;(* Create [file2] *)let*file2=Ao.create_rw~path:path2~overwrite:true~auto_flush_threshold:1_000_000~auto_flush_procedure:`Internalin(* Fill and close [file2] *)letpoff=ref0inletencodei=letbuf=Bytes.create8inBytes.set_int64_lebuf0(Int64.of_inti);(* Bytes.unsafe_to_string is safe since [buf] will not be modified after
this function returns. We give up ownership. *)Bytes.unsafe_to_stringbufinletregister_entry~off~len=Ao.append_exnfile2(encodeoff);Ao.append_exnfile2(encode!poff);Ao.append_exnfile2(encodelen);poff:=!poff+leninlet*()=Errs.catch(fun()->calculate_extents_oc~src_is_sorted:()~src:file1.arr~register_entry)inlet*()=Ao.flushfile2inlet*()=Ao.fsyncfile2inletmapping_size=Ao.end_pofffile2inlet*()=Ao.closefile2in(* Close and unlink [file1] *)Int_mmap.closefile1;let*sorted_size=Io.size_of_pathpath1inOption.iter(funf->f(reachable_size,sorted_size,mapping_size))report_file_sizes;Io.unlinkpath1|>ignore;(* Open created map *)open_map~root~generationletentry_countarr=BigArr1.dimarr/3letentry_idxi=i*3letconv_int64:int64->int=funi->(ifSys.big_endianthen((* We are currently on a BE platform but the ints are encoded as LE in the
file. We've just read a LE int using a BE decoding scheme. Let's fix
this.
The first step is to set [buf] to contain exactly what is stored on
disk. Since the current platform is BE, we've interpreted what was
written on disk using a BE decoding scheme. To do the opposite operation
we must use a BE encoding scheme, hence [set_int64_be].
Now that [buf] mimics what was on disk, the second step consist of
decoding it using a LE function, hence [get_int64_le]. *)letbuf=Bytes.create8inBytes.set_int64_bebuf0i;Bytes.get_int64_lebuf0)elsei)|>Int64.to_intletentry_offarri=arr.{entry_idxi}|>conv_int64|>Int63.of_intletentry_poffarri=arr.{entry_idxi+1}|>conv_int64|>Int63.of_intletentry_lenarri=arr.{entry_idxi+2}|>conv_int64letiter_exn{arr;_}f=fori=0toentry_countarr-1dof~off:(entry_offarri)~len:(entry_lenarri)doneletitertf=Errs.catch(fun()->iter_exntf;())typeentry={off:int63;poff:int63;len:int}letfind_nearest_leq{arr;_}off=letgetarri=arr.{entry_idxi}|>conv_int64inmatchUtils.nearest_leq~arr~get~lo:0~hi:(entry_countarr-1)~key:(Int63.to_intoff)with|`All_gt_key->None|`Somei->letoff=entry_offarriinletpoff=entry_poffarriinletlen=entry_lenarriinSome{off;poff;len}end