open Carton

(* [light_load uid] returns the kind and the length of [uid]; both are later
   passed to [Carton.Enc.make_entry] by [canonicalize]. *)
type ('uid, 's) light_load = 'uid -> (kind * int, 's) io

(* [heavy_load uid] loads the whole object [uid] as a [Dec.v]. *)
type ('uid, 's) heavy_load = 'uid -> (Dec.v, 's) io

type optint = Optint.t

(* Positional-argument adapter over [Bigstringaf.blit_from_string]: copies
   [len] bytes of the string [src] (from [src_off]) into the bigstring [dst]
   (at [dst_off]). Used as the [~blit] of [Ke.Rke.N.push]. *)
let blit_from_string src src_off dst dst_off len =
  Bigstringaf.blit_from_string src ~src_off dst ~dst_off ~len
  [@@inline]

let src = Logs.Src.create "thin"

module Log = (val Logs.src_log src : Logs.LOG)

(* Raised internally by [share] to stop its scan at the first match. *)
exception Exists

module Make
    (Scheduler : SCHEDULER)
    (IO : IO with type 'a t = 'a Scheduler.s)
    (Uid : UID) =
struct
  let ( >>= ) x f = IO.bind x f
  let return x = IO.return x

  (* Result-aware bind: continue with [f] on [Ok], short-circuit on [Error]. *)
  let ( >>? ) x f =
    x >>= function Ok x -> f x | Error _ as err -> return err

  (* Carton scheduler record implemented with [IO]; [inj]/[prj] convert
     between [IO.t] and Carton's abstract [io] type. *)
  let sched =
    let open Scheduler in
    {
      Carton.bind = (fun x f -> inj (prj x >>= fun x -> prj (f x)));
      Carton.return = (fun x -> inj (return x));
    }

  (* [read stream] returns a function [fill filled inputs] that copies bytes
     pulled from [stream] into [inputs] until [inputs] is full or [stream]
     returns [None]; the result is [filled] plus the number of bytes copied.
     Bytes pulled from [stream] that do not fit into [inputs] stay in an
     internal ring buffer and are served first by the next call. *)
  let read stream =
    let ke = Ke.Rke.create ~capacity:0x1000 Bigarray.char in
    let rec go filled inputs =
      match Ke.Rke.N.peek ke with
      | [] -> (
          stream () >>= function
          | Some (src, off, len) ->
              Ke.Rke.N.push ke ~blit:blit_from_string ~length:String.length
                ~off ~len src;
              go filled inputs
          | None -> return filled)
      | src :: _ ->
          let src = Cstruct.of_bigarray src in
          let len = min (Cstruct.len inputs) (Cstruct.len src) in
          Cstruct.blit src 0 inputs 0 len;
          Ke.Rke.N.shift_exn ke len;
          if len < Cstruct.len inputs then
            go (filled + len) (Cstruct.shift inputs len)
          else return (filled + len)
    in
    fun filled inputs -> go filled inputs

  module Verify = Carton.Dec.Verify (Uid) (Scheduler) (IO)
  module Fp = Carton.Dec.Fp (Uid)

  (* [first_pass ~zl_buffer ~digest stream] decodes the incoming PACK stream
     once (deltas are not resolved here) and records, per object offset:
     - [where]: offset -> index of the object ([Fp.count decoder - 1]);
     - [checks]: offset -> [crc] reported by [Fp] for that entry;
     - [weight]: offset -> the largest size recorded for it (its own size,
       or its size as the source of an OFS delta);
     - [children]: [`Ofs source_offset] / [`Ref uid] -> offsets of the
       deltas using that source;
     - [matrix]: one [Verify] status per object, bases being marked with
       [Verify.unresolved_base].
     Returns [Ok (oracle, matrix, where, checks, children, uid)] where
     [oracle] reads those tables and [uid] is the one carried by [`End]
     (presumably the PACK trailer hash), or [Error (`Msg _)] if [Fp]
     reports a malformed stream. *)
  let first_pass ~zl_buffer ~digest stream =
    let fl_buffer = Cstruct.create De.io_buffer_size in
    let zl_window = De.make_window ~bits:15 in
    let allocate _ = zl_window in
    let read_cstruct = read stream in
    (* Fill [fl_buffer] with at least [len] bytes (when available), copy the
       first [len] of them into [buf] at [off], and return how many bytes
       were read into [fl_buffer]. *)
    let read_bytes () buf ~off ~len =
      let rec go rest raw =
        if rest <= 0 then (
          Cstruct.blit_to_bytes fl_buffer 0 buf off len;
          return (abs rest + len))
        else
          read_cstruct 0 raw >>= function
          | 0 ->
              (* TODO(dinosaure): end of flow, add a test. *)
              return (len - rest)
          | filled -> go (rest - filled) (Cstruct.shift raw filled)
      in
      go len fl_buffer
    in
    let read_bytes () buf ~off ~len =
      Scheduler.inj (read_bytes () buf ~off ~len)
    in
    Fp.check_header sched read_bytes () |> Scheduler.prj
    >>= fun (n_objects, _, len) ->
    let decoder = Fp.decoder ~o:zl_buffer ~allocate `Manual in
    let decoder = Fp.src decoder (Cstruct.to_bigarray fl_buffer) 0 len in
    let children = Hashtbl.create 0x100 in
    let where = Hashtbl.create 0x100 in
    let weight = Hashtbl.create 0x100 in
    let checks = Hashtbl.create 0x100 in
    let matrix = Array.make n_objects Verify.unresolved_node in
    (* Bind [k] to [v] unless a value at least as large is already bound:
       the table keeps the maximum seen for each key. (The previous version
       re-bound [k] to its current value, so the first value always won.) *)
    let replace hashtbl k v =
      match Hashtbl.find_opt hashtbl k with
      | Some v' when v <= v' -> ()
      | Some _ | None -> Hashtbl.replace hashtbl k v
    in
    (* Record [offset] as a delta whose source is identified by [key]. *)
    let add_child key offset =
      match Hashtbl.find_opt children key with
      | Some vs -> Hashtbl.replace children key (offset :: vs)
      | None -> Hashtbl.add children key [ offset ]
    in
    let rec go decoder =
      match Fp.decode decoder with
      | `Await decoder ->
          read_cstruct 0 fl_buffer >>= fun len ->
          Log.debug (fun m ->
              m "Refill the first-pass state with %d byte(s)." len);
          go (Fp.src decoder (Cstruct.to_bigarray fl_buffer) 0 len)
      | `Peek decoder ->
          (* XXX(dinosaure): [Fp] does the compression. *)
          let keep = Fp.src_rem decoder in
          read_cstruct 0 (Cstruct.shift fl_buffer keep) >>= fun len ->
          go (Fp.src decoder (Cstruct.to_bigarray fl_buffer) 0 (keep + len))
      | `Entry ({ Fp.kind = Base _; offset; size; crc; _ }, decoder) ->
          let n = Fp.count decoder - 1 in
          Log.debug (fun m -> m "[+] base object (%d) (%Ld)." n offset);
          replace weight offset size;
          Hashtbl.add where offset n;
          Hashtbl.add checks offset crc;
          matrix.(n) <- Verify.unresolved_base ~cursor:offset;
          go decoder
      | `Entry
          ({ Fp.kind = Ofs { sub = s; source; target }; offset; crc; _ }, decoder)
        ->
          let n = Fp.count decoder - 1 in
          Log.debug (fun m -> m "[+] ofs object (%d) (%Ld)." n offset);
          (* An OFS delta points [s] bytes backward to its source. *)
          let source_offset = Int64.(sub offset (of_int s)) in
          replace weight source_offset source;
          replace weight offset target;
          Hashtbl.add where offset n;
          Hashtbl.add checks offset crc;
          add_child (`Ofs source_offset) offset;
          go decoder
      | `Entry
          ({ Fp.kind = Ref { ptr; target; source }; offset; crc; _ }, decoder)
        ->
          let n = Fp.count decoder - 1 in
          Log.debug (fun m ->
              m "[+] ref object (%d) (%Ld) (weight: %d)." n offset
                (Stdlib.max target source :> int));
          replace weight offset (Stdlib.max target source);
          Hashtbl.add where offset n;
          Hashtbl.add checks offset crc;
          add_child (`Ref ptr) offset;
          go decoder
      | `End uid -> return (Ok uid)
      | `Malformed err ->
          Log.err (fun m -> m "Got an error: %s." err);
          return (Error (`Msg err))
    in
    go decoder >>? fun uid ->
    Log.debug (fun m -> m "First pass on incoming PACK file is done.");
    let oracle =
      {
        Carton.Dec.where = (fun ~cursor -> Hashtbl.find where cursor);
        children =
          (fun ~cursor ~uid ->
            match
              ( Hashtbl.find_opt children (`Ofs cursor),
                Hashtbl.find_opt children (`Ref uid) )
            with
            | Some a, Some b -> List.sort_uniq compare (a @ b)
            | Some x, None | None, Some x -> x
            | None, None -> []);
        digest;
        weight = (fun ~cursor -> Hashtbl.find weight cursor);
      }
    in
    return (Ok (oracle, matrix, where, checks, children, uid))

  (* File-system operations needed by [verify] and [canonicalize]. *)
  type ('t, 'path, 'fd, 'error) fs = {
    create : 't -> 'path -> ('fd, 'error) result IO.t;
    append : 't -> 'fd -> string -> unit IO.t;
    map : 't -> 'fd -> pos:int64 -> int -> Bigstringaf.t;
    close : 't -> 'fd -> (unit, 'error) result IO.t;
  }

  module Set = Set.Make (Uid)

  (* Pair two arrays element-wise.
     @raise Invalid_argument if their lengths differ. *)
  let zip a b =
    if Array.length a <> Array.length b then invalid_arg "zip: lengths mismatch";
    Array.init (Array.length a) (fun i -> (a.(i), b.(i)))

  (* [share l0 l1] is [true] iff some offset of [l0] (first components)
     appears in [l1]. *)
  let share l0 l1 =
    try
      List.iter
        (fun (v, _) -> if List.exists (Int64.equal v) l1 then raise Exists)
        l0;
      false
    with Exists -> true

  (* [verify ?threads ~digest t path fs stream] copies every chunk of
     [stream] into a file created at [path] while running [first_pass] on
     it, then runs [Verify.verify] over the written file.
     Returns [Ok (count, requireds, unresolveds, resolveds, weight, uid)]:
     - [count]: number of objects seen by the first pass;
     - [requireds]: uids of [`Ref] sources with at least one unresolved
       child (presumably the objects missing from a thin PACK);
     - [unresolveds]: [(offset, crc)] of objects left unresolved;
     - [resolveds]: index entries of the resolved objects;
     - [weight]: number of bytes written to [path];
     - [uid]: the uid returned by [first_pass].
     The file descriptor is closed on success and when the first pass
     fails. *)
  let verify ?(threads = 4) ~digest t path { create; append; map; close } stream
      =
    let zl_buffer = De.bigstring_create De.io_buffer_size in
    let allocate bits = De.make_window ~bits in
    let weight = ref 0L in
    create t path >>? fun fd ->
    (* Tee: every chunk handed to the first pass is also appended to [fd]. *)
    let stream () =
      stream () >>= function
      | Some (buf, off, len) as res ->
          append t fd (String.sub buf off len) >>= fun () ->
          weight := Int64.add !weight (Int64.of_int len);
          return res
      | none -> return none
    in
    Log.debug (fun m -> m "Start to analyse the PACK file.");
    first_pass ~zl_buffer ~digest stream >>= function
    | Error err ->
        (* Do not leak [fd]: the first-pass error is the one reported, a
           failure of [close] is deliberately ignored here. *)
        close t fd >>= fun _ -> return (Error err)
    | Ok (oracle, matrix, where, checks, children, uid) ->
        let weight = !weight in
        let pack =
          Carton.Dec.make fd ~allocate ~z:zl_buffer ~uid_ln:Uid.length
            ~uid_rw:Uid.of_raw_string (fun _ -> assert false)
        in
        (* Never map past the bytes actually written to [fd]. *)
        let map fd ~pos len =
          let len = min len Int64.(to_int (sub weight pos)) in
          map t fd ~pos len
        in
        Log.debug (fun m ->
            m "Start to verify incoming PACK file (second pass).");
        Verify.verify ~threads pack ~map ~oracle ~matrix >>= fun () ->
        Log.debug (fun m -> m "Second pass on incoming PACK file is done.");
        (* Offsets in increasing order, aligned with [matrix]. *)
        let offsets =
          Hashtbl.fold (fun k _ a -> k :: a) where []
          |> List.sort Int64.compare
          |> Array.of_list
        in
        let unresolveds, resolveds =
          let fold (unresolveds, resolveds) (offset, status) =
            let crc = Hashtbl.find checks offset in
            if Verify.is_resolved status then
              let uid = Verify.uid_of_status status in
              (unresolveds, { Carton.Dec.Idx.crc; offset; uid } :: resolveds)
            else ((offset, crc) :: unresolveds, resolveds)
          in
          Array.fold_left fold ([], []) (zip offsets matrix)
        in
        let requireds =
          Hashtbl.fold
            (fun k vs a ->
              match k with
              | `Ofs _ -> a
              | `Ref uid -> if share unresolveds vs then Set.add uid a else a)
            children Set.empty
        in
        close t fd >>? fun () ->
        Log.debug (fun m ->
            m
              "PACK file verified (%d resolved object(s), %d unresolved \
               object(s))"
              (List.length resolveds) (List.length unresolveds));
        return
          (Ok
             ( Hashtbl.length where,
               Set.elements requireds,
               unresolveds,
               resolveds,
               weight,
               uid ))

  (* [canonicalize] only encodes targets built from plain entries, so
     [find] is presumably never called; it fails loudly if it is. *)
  let find _ = assert false

  let vuid =
    { Carton.Enc.uid_ln = Uid.length; Carton.Enc.uid_rw = Uid.to_raw_string }

  type nonrec light_load = (Uid.t, Scheduler.t) light_load
  type nonrec heavy_load = (Uid.t, Scheduler.t) heavy_load

  (* [canonicalize ~light_load ~heavy_load ~src ~dst t fs n uids weight]
     writes to [dst]:
     1. a 12-byte PACK header announcing [n + List.length uids] objects;
     2. each object of [uids], loaded with [light_load]/[heavy_load] and
        encoded with [Carton.Enc];
     3. the bytes of [src] in [12, weight - Uid.length), copied verbatim;
     4. the hash of everything written before it.
     [n] is presumably the number of objects in [src] and [weight] its size
     in bytes (e.g. as returned by [verify]) — confirm with callers.
     Returns [Ok (shift, weight', uid, entries)]: [shift] is the number of
     bytes added by step 2, [weight'] the size of [dst], [uid] the new hash
     and [entries] the index entries of the newly encoded objects. Both
     descriptors are closed before returning [Ok]. *)
  let canonicalize ~light_load ~heavy_load ~src ~dst t
      { create; append; close; map; _ } n uids weight =
    let b =
      {
        Carton.Enc.o = Bigstringaf.create De.io_buffer_size;
        Carton.Enc.i = Bigstringaf.create De.io_buffer_size;
        Carton.Enc.q = De.Queue.create 0x10000;
        Carton.Enc.w = De.make_window ~bits:15;
      }
    in
    let ctx = ref Uid.empty in
    let cursor = ref 0L in
    let light_load uid = Scheduler.prj (light_load uid) in
    create t dst >>? fun fd ->
    let header = Bigstringaf.create 12 in
    Carton.Enc.header_of_pack ~length:(n + List.length uids) header 0 12;
    let hdr = Bigstringaf.to_string header in
    append t fd hdr >>= fun () ->
    ctx := Uid.feed !ctx header;
    cursor := Int64.add !cursor 12L;
    (* Append the first [len] bytes of [b.o] to [fd] and account for them
       in the running hash [ctx], the running CRC [crc] and [cursor]. *)
    let flush crc len =
      append t fd (Bigstringaf.substring b.o ~off:0 ~len) >>= fun () ->
      ctx := Uid.feed !ctx ~off:0 ~len b.o;
      crc := Checkseum.Crc32.digest_bigstring b.o 0 len !crc;
      cursor := Int64.add !cursor (Int64.of_int len);
      return ()
    in
    let encode_base uid =
      light_load uid >>= fun (kind, length) ->
      let entry = Carton.Enc.make_entry ~kind ~length uid in
      let anchor = !cursor in
      let crc = ref Checkseum.Crc32.default in
      Carton.Enc.entry_to_target sched ~load:heavy_load entry |> Scheduler.prj
      >>= fun target ->
      Carton.Enc.encode_target sched ~b ~find ~load:heavy_load ~uid:vuid target
        ~cursor:(Int64.to_int anchor)
      |> Scheduler.prj
      >>= fun (len, encoder) ->
      let rec go encoder =
        match Carton.Enc.N.encode ~o:b.o encoder with
        | `Flush (encoder, len) ->
            flush crc len >>= fun () ->
            go (Carton.Enc.N.dst encoder b.o 0 (Bigstringaf.length b.o))
        | `End -> return { Carton.Dec.Idx.crc = !crc; offset = anchor; uid }
      in
      (* [encode_target] already wrote [len] bytes (the entry header) into
         [b.o]; flush them before draining the encoder. *)
      flush crc len >>= fun () ->
      go (Carton.Enc.N.dst encoder b.o 0 (Bigstringaf.length b.o))
    in
    let rec encode_all acc = function
      | [] -> return (List.rev acc)
      | uid :: uids ->
          encode_base uid >>= fun entry -> encode_all (entry :: acc) uids
    in
    encode_all [] uids >>= fun entries ->
    let shift = Int64.sub !cursor 12L in
    let top = Int64.sub weight (Int64.of_int Uid.length) in
    let chunk = Int64.mul 1024L 1024L in
    (* Copy [src] from [pos] up to [top] in chunks of at most 1 MiB, then
       append the final hash. Returns the size of [dst] and that hash. *)
    let rec copy src_fd pos =
      let len = Int64.to_int (min (Int64.sub top pos) chunk) in
      let raw = map t src_fd ~pos len in
      append t fd (Bigstringaf.to_string raw) >>= fun () ->
      ctx := Uid.feed !ctx raw;
      cursor := Int64.add !cursor (Int64.of_int len);
      let pos = Int64.add pos (Int64.of_int len) in
      if pos < top then copy src_fd pos
      else
        let uid = Uid.get !ctx in
        append t fd (Uid.to_raw_string uid) >>= fun () ->
        return (Int64.(add !cursor (of_int Uid.length)), uid)
    in
    create t src >>= function
    | Error err ->
        (* Do not leak [fd] when [src] cannot be opened; report the
           [create] error and ignore a failure of [close]. *)
        close t fd >>= fun _ -> return (Error err)
    | Ok src_fd -> (
        copy src_fd 12L >>= fun (weight, uid) ->
        (* Close [src_fd] (previously leaked), then [fd]; the first error
           encountered is reported. *)
        close t src_fd >>= fun src_closed ->
        close t fd >>? fun () ->
        match src_closed with
        | Ok () -> return (Ok (shift, weight, uid, entries))
        | Error err -> return (Error err))
end