123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865(*
* Copyright (c) 2018-2021 Tarides <contact@tarides.com>
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*)open!Import(* TODO(craigfe): better namespacing of modules shared with [irmin-pack] *)moduleLayout_layered=LayoutmoduleV=Irmin_pack.Version.V2letcache_size=10_000exceptionCancelledmoduleIO=Irmin_pack.IO.UnixmoduleLock=IO_layers.LockmoduleIO_layers=IO_layers.IOletmayf=functionNone->Lwt.return_unit|Somebf->fbfletlock_pathroot=Filename.concatroot"lock"moduleMaker(Config:Conf.Pack.S)(Node:Irmin.Private.Node.Maker)(Commit:Irmin.Private.Commit.Maker)(M:Irmin.Metadata.S)(C:Irmin.Contents.S)(P:Irmin.Path.S)(B:Irmin.Branch.S)(H:Irmin.Hash.S)=structmoduleIndex=Irmin_pack.Index.Make(H)modulePack=Irmin_pack.Pack_store.Maker(V)(Index)(H)typestore_handle=|Commit_t:H.t->store_handle|Node_t:H.t->store_handle|Content_t:H.t->store_handlemoduleX=structmoduleHash=HmoduleContents=structmodulePack_value=Irmin_pack.Pack_value.Of_contents(H)(C)(* FIXME: remove duplication with irmin-pack/ext.ml *)moduleCA=structmoduleKey=HmoduleVal=CmoduleCA=Pack.Make(Pack_value)includeLayered_store.Content_addressable(H)(Index)(CA)(CA)endincludeIrmin.Contents.Store(CA)endmoduleNode=structmodulePa=Layered_store.Pack_maker(H)(Index)(Pack)moduleNode=Node(H)(P)(M)moduleCA=Inode_layers.Make(Config)(H)(Pa)(Node)includeIrmin.Private.Node.Store(Contents)(P)(M)(CA)endmoduleCommit=structmoduleCommit=Commit(H)modulePack_value=Irmin_pack.Pack_value.Of_commit(H)(Commit)moduleCA=structmoduleKey=HmoduleVal=CommitmoduleCA=Pack.Make(Pack_value)includeLayered_store.Content_addressable(H)(Index)(CA)(CA)endincludeIrmin.Private.Commit.Store(Node)(CA)endmoduleBranch=structmoduleKey=BmoduleVal=HmoduleAtomic_write=structmoduleAW=Irmin_pack.Atomic_write.Make_persistent(V)(Key)(Val)includeIrmin_pack.Atomic_write.Closeable(AW)letv?fresh?readonlypath=AW.v?fresh?readonlypath>|=make_closeableendincludeLayered_store.Atomic_write(Key)(Atomic_write)(Atomic_write)endmoduleSlice=Irmin.Private.Slice.Make(Contents)(Node)(Commit)moduleSync=Irmin.Private.Sync.None(H)(B)moduleRepo=structtypeupper_layer={contents:readContents.CA.U.t;node:readNode.CA.U.t;commit:readCommit.CA.U.t;branch:Branch.U.t;index:Index.t;}typelower_layer={lcontents:readContents.CA.L.t;lnode:readNode.CA.L.t;lcommit:readCommit.CA.L.t;lbranch:Branch.L.t;lindex:Index.t;}typefreeze_info={throttle:Conf.Pack.freeze_throttle;lock:Lwt_mutex.t;mutablestate:[`None|`Running|`Cancel];}typet={root:string;readonly:bool;blocking_copy_size:int;with_lower:bool;contents:readContents.CA.t;node:readNode.CA.t;branch:Branch.t;commit:readCommit.CA.t;lower_index:Index.toption;uppers_index:Index.t*Index.t;mutableflip:bool;mutableclosed:bool;flip_file:IO_layers.t;batch_lock:Lwt_mutex.t;freeze:freeze_info;}letcontents_tt=t.contentsletnode_tt=(contents_tt,t.node)letcommit_tt=(node_tt,t.commit)letbranch_tt=t.branchmoduleIterate=structmoduleContents=structincludeContents.CAtypet=readContents.CA.tendmoduleNodes=structincludeNode.CAtypet=readNode.CA.tendmoduleCommits=structincludeCommit.CAtypet=readCommit.CA.tendtype'astore_fn={f:'t.(moduleS.Layeredwithtypet='t)->'t->'a;}[@@ocaml.unboxed]letiter_lwt(f:unitLwt.tstore_fn)t:unitLwt.t=f.f(moduleContents)t.contents>>=fun()->f.f(moduleNodes)t.node>>=fun()->f.f(moduleCommits)t.commit>>=fun()->f.f(moduleBranch)t.branchletiter(f:unitstore_fn)t:unit=f.f(moduleContents)t.contents;f.f(moduleNodes)t.node;f.f(moduleCommits)t.commit;f.f(moduleBranch)t.branchendletbatchtf=Lwt_mutex.with_lockt.batch_lock@@fun()->Contents.CA.batcht.contents(funcontents->Node.CA.batcht.node(funnode->Commit.CA.batcht.commit(funcommit->letcontents:'aContents.t=contentsinletnode:'aNode.t=(contents,node)inletcommit:'aCommit.t=(node,commit)infcontentsnodecommit)))letunsafe_v_upperrootconfig=letfresh=Conf.Pack.freshconfiginletlru_size=Conf.Pack.lru_sizeconfiginletreadonly=Conf.Pack.readonlyconfiginletlog_size=Conf.Pack.index_log_sizeconfiginletthrottle=Conf.Pack.merge_throttleconfiginletf=ref(fun()->())inletindex=Index.v~flush_callback:(fun()->!f())(* backpatching to add pack flush before an index flush *)~fresh~readonly~throttle~log_sizerootinlet*contents=Contents.CA.U.v~fresh~readonly~lru_size~indexrootinlet*node=Node.CA.U.v~fresh~readonly~lru_size~indexrootinlet*commit=Commit.CA.U.v~fresh~readonly~lru_size~indexrootinlet+branch=Branch.U.v~fresh~readonlyrootin(f:=fun()->Contents.CA.U.flush~index:falsecontents);({index;contents;node;commit;branch}:upper_layer)letunsafe_v_lowerrootconfig=letfresh=Conf.Pack.freshconfiginletlru_size=Conf.Pack.lru_sizeconfiginletreadonly=Conf.Pack.readonlyconfiginletlog_size=Conf.Pack.index_log_sizeconfiginletthrottle=Conf.Pack.merge_throttleconfiginletf=ref(fun()->())inletindex=Index.v~flush_callback:(fun()->!f())~fresh~readonly~throttle~log_sizerootinlet*lcontents=Contents.CA.L.v~fresh~readonly~lru_size~indexrootinlet*lnode=Node.CA.L.v~fresh~readonly~lru_size~indexrootinlet*lcommit=Commit.CA.L.v~fresh~readonly~lru_size~indexrootinlet+lbranch=Branch.L.v~fresh~readonlyrootin(f:=fun()->Contents.CA.L.flush~index:falselcontents);({lindex=index;lcontents;lnode;lcommit;lbranch}:lower_layer)letv_layer~vrootconfig=Lwt.catch(fun()->vrootconfig)(function|Irmin_pack.Version.Invalid{expected;found}asewhenexpected=V.version->Log.err(funm->m"[%s] Attempted to open store of unsupported version %a"rootIrmin_pack.Version.ppfound);Lwt.faile|e->Lwt.faile)letfreeze_infothrottle={throttle;state=`None;lock=Lwt_mutex.create()}letvconfig=letroot=Conf.Pack.rootconfiginletupper1=Filename.concatroot(Conf.upper_root1config)inlet*upper1=v_layer~v:unsafe_v_upperupper1configinletupper0=Filename.concatroot(Conf.upper_root0config)inlet*upper0=v_layer~v:unsafe_v_upperupper0configinletwith_lower=Conf.with_lowerconfiginletlower_root=Filename.concatroot(Conf.lower_rootconfig)inlet*lower=ifwith_lowerthenv_layer~v:unsafe_v_lowerlower_rootconfig>|=Option.someelseLwt.return_noneinletfile=Layout_layered.flip~rootinlet*flip_file=IO_layers.vfileinlet*flip=IO_layers.read_flipflip_filein(* A fresh store has to unlink the lock file as well. *)letfresh=Conf.Pack.freshconfiginletfreeze=freeze_info(Conf.Pack.freeze_throttleconfig)inletlock_file=lock_pathrootinletfreeze_in_progress()=freeze.state=`Runninginletalways_false()=falseinletbatch_lock=Lwt_mutex.create()inlet+()=iffresh&&Lock.testlock_filethenLock.unlinklock_fileelseLwt.return_unitinletlower_contents=Option.map(funx->x.lcontents)lowerinletcontents=Contents.CA.vupper1.contentsupper0.contentslower_contents~flip~freeze_in_progress:always_falseinletlower_node=Option.map(funx->x.lnode)lowerinletnode=Node.CA.vupper1.nodeupper0.nodelower_node~flip~freeze_in_progress:always_falseinletlower_commit=Option.map(funx->x.lcommit)lowerinletcommit=Commit.CA.vupper1.commitupper0.commitlower_commit~flip~freeze_in_progressinletlower_branch=Option.map(funx->x.lbranch)lowerinletbranch=Branch.vupper1.branchupper0.branchlower_branch~flip~freeze_in_progressinletlower_index=Option.map(funx->x.lindex)lowerinletreadonly=Conf.Pack.readonlyconfiginletblocking_copy_size=Conf.blocking_copy_sizeconfigin{contents;node;commit;branch;root;readonly;with_lower;blocking_copy_size;lower_index;uppers_index=(upper1.index,upper0.index);flip;closed=false;flip_file;freeze;batch_lock;}letunsafe_closet=t.closed<-true;(matcht.lower_indexwithSomex->Index.closex|None->());Index.close(fstt.uppers_index);Index.close(sndt.uppers_index);IO_layers.closet.flip_file>>=fun()->letf:unitLwt.tIterate.store_fn={f=(fun(typea)(moduleC:S.Layeredwithtypet=a)(x:a)->C.closex);}inIterate.iter_lwtftletcloset=Lwt_mutex.with_lockt.freeze.lock(fun()->unsafe_closet)(** RO uses the generation to sync the stores, so to prevent races (async
reads of flip and generation) the generation is used to update the
flip. The first store reads the flip and syncs with the files on disk,
the other stores only need to update the flip. *)letsynct=leton_generation_change()=Node.CA.clear_cachest.node;Commit.CA.clear_cachest.commitinleton_generation_change_next_upper()=Node.CA.clear_caches_next_uppert.node;Commit.CA.clear_caches_next_uppert.commitinletflip=Contents.CA.sync~on_generation_change~on_generation_change_next_uppert.contentsint.flip<-flip;letf:unitIterate.store_fn={f=(fun(typea)(moduleC:S.Layeredwithtypet=a)(x:a)->C.update_flip~flipx);}inIterate.iterftletcleart=Contents.CA.cleart.contents(** migrate can be called on a layered store where only one layer exists
on disk. As migration fails on an empty store, we check which layer is
in the wrong version. *)letmigrateconfig=ifConf.Pack.readonlyconfigthenraiseIrmin_pack.RO_not_allowed;letroot=Conf.Pack.rootconfiginConf.[upper_root1;upper_root0;lower_root]|>List.map(funname->letroot=Filename.concatroot(nameconfig)inletconfig=Irmin.Private.Conf.addconfigConf.Pack.root_key(Someroot)intryletio=IO.v~version:(SomeV.version)~fresh:false~readonly:true(Layout.pack~root)in(config,Someio)with|Irmin_pack.Version.Invalid_->(config,None)|e->raisee)|>List.fold_left(funto_migrate(config,io)->matchiowith|None->config::to_migrate|Someio->IO.closeio;to_migrate)[]|>List.iter(funconfig->Irmin_pack.migrateconfig)letlayer_idtstore_handler=matchstore_handlerwith|Commit_tk->Commit.CA.layer_idt.commitk|Node_tk->Node.CA.layer_idt.nodek|Content_tk->Contents.CA.layer_idt.contentskletflusht=Contents.CA.flusht.contents;Branch.flusht.branchletflush_next_lowert=Contents.CA.flush_next_lowert.contents;Branch.flush_next_lowert.branch(** Store share instances of the underlying IO files, so it is enough to
call clear on one store. However, each store has its own caches, which
need to be cleared too. *)letclear_previous_upper?keep_generationt=Log.debug(funl->l"clear previous upper");Contents.CA.clear_previous_upper?keep_generationt.contents>>=fun()->Node.CA.clear_caches_next_uppert.node;Commit.CA.clear_caches_next_uppert.commit;Branch.clear_previous_uppert.branchletflip_uppert=t.flip<-nott.flip;letf:unitIterate.store_fn={f=(fun(typea)(moduleC:S.Layeredwithtypet=a)(x:a)->C.flip_upperx);}inIterate.iterftletwrite_flipt=IO_layers.write_flipt.flipt.flip_fileletupper_in_uset=ift.flipthen`Upper1else`Upper0letoffsett=Contents.CA.offsett.contentsendendletintegrity_check?ppf~auto_repairt=letmoduleChecks=Irmin_pack.Checks.Index(Index)inletcontents=X.Repo.contents_ttinletnodes=X.Repo.node_tt|>sndinletcommits=X.Repo.commit_tt|>sndinletintegrity_check_layer~layerindex=letcheck~kind~offset~lengthk=matchkindwith|`Contents->X.Contents.CA.integrity_check~offset~length~layerkcontents|`Node->X.Node.CA.integrity_check~offset~length~layerknodes|`Commit->X.Commit.CA.integrity_check~offset~length~layerkcommitsinChecks.integrity_check?ppf~auto_repair~checkindexin[(`Upper1,Some(fstt.X.Repo.uppers_index));(`Upper0,Some(sndt.X.Repo.uppers_index));(`Lower,t.lower_index);]|>List.map(fun(layer,index)->matchindexwith|Someindex->(integrity_check_layer~layerindex,layer)|None->(Ok`No_error,layer))includeIrmin.Of_private(X)letsync=X.Repo.syncletclear=X.Repo.clearletmigrate=X.Repo.migrateletflush=X.Repo.flushletpp_commits=Fmt.list~sep:Fmt.commaCommit.pp_hashmoduleCopy=structletmem_commit_lowert=X.Commit.CA.mem_lowert.X.Repo.commitletmem_commit_nextt=X.Commit.CA.mem_nextt.X.Repo.commitletmem_node_lowert=X.Node.CA.mem_lowert.X.Repo.nodeletmem_node_nextt=X.Node.CA.mem_nextt.X.Repo.nodeletmem_contents_lowert=X.Contents.CA.mem_lowert.X.Repo.contentsletmem_contents_nextt=X.Contents.CA.mem_nextt.X.Repo.contentsletcopy_branchest=X.Branch.copy~mem_commit_lower:(mem_commit_lowert)~mem_commit_upper:(mem_commit_nextt)t.X.Repo.branchletskip_with_stats~skiph=skiph>|=funshould_skip->Irmin_layers.Stats.skip_testshould_skip;should_skipletno_skip_=Lwt.returnfalseletpred_nodetk=letn=snd(X.Repo.node_tt)inX.Node.CA.findnk>|=function|None->[]|Somev->List.rev_map(function`Inodex->`Nodex|(`Node_|`Contents_)asx->x)(X.Node.CA.Val.predv)letalways_false_=falseletwith_cancelcancelf=ifcancel()thenLwt.failCancelledelsef()letiter_copy(contents,nodes,commits)?(skip_commits=no_skip)?(cancel=always_false)?(skip_nodes=no_skip)?(skip_contents=no_skip)t?(min=[])max=(* if node or contents are already in dst then they are skipped by
Graph.iter; there is no need to check this again when the object is
copied *)letcommitk=with_cancelcancel@@fun()->X.Commit.CA.copycommitst.X.Repo.commit"Commit"k;Irmin_layers.Stats.freeze_yield();let*()=Lwt.pause()inIrmin_layers.Stats.freeze_yield_end();Lwt.return_unitinletnodek=with_cancelcancel@@fun()->X.Node.CA.copynodest.X.Repo.nodek;Lwt.return_unitinletcontentsk=with_cancelcancel@@fun()->X.Contents.CA.copycontentst.X.Repo.contents"Contents"k;Lwt.return_unitinletskip_nodeh=skip_with_stats~skip:skip_nodeshinletskip_contentsh=skip_with_stats~skip:skip_contentshinletskip_commith=skip_with_stats~skip:skip_commitshinlet+()=Repo.iter~cache_size~min~max~commit~node~contents~skip_node~skip_contents~pred_node~skip_committinX.Repo.flushtmoduleCopyToLower=structleton_lowertf=letcontents=(X.Contents.CA.Lower,X.Contents.CA.lowert.X.Repo.contents)inletnodes=(X.Node.CA.Lower,X.Node.CA.lowert.X.Repo.node)inletcommits=(X.Commit.CA.Lower,X.Commit.CA.lowert.X.Repo.commit)inf(contents,nodes,commits)letcopy?cancel?(min=[])tcommits=Log.debug(funf->f"@[<2>copy to lower:@ min=%a,@ max=%a@]"pp_commitsminpp_commitscommits);letmax=List.map(funx->`Commit(Commit.hashx))commitsinletmin=List.map(funx->`Commit(Commit.hashx))mininon_lowert(funl->iter_copy?cancell~skip_commits:(mem_commit_lowert)~skip_nodes:(mem_node_lowert)~skip_contents:(mem_contents_lowert)t~minmax)endmoduleCopyToUpper=structleton_next_uppertf=letcontents=(X.Contents.CA.Upper,X.Contents.CA.next_uppert.X.Repo.contents)inletnodes=(X.Node.CA.Upper,X.Node.CA.next_uppert.X.Repo.node)inletcommits=(X.Commit.CA.Upper,X.Commit.CA.next_uppert.X.Repo.commit)inf(contents,nodes,commits)letcopy?cancel?(min=[])tcommits=Log.debug(funf->f"@[<2>copy to next upper:@ min=%a,@ max=%a@]"pp_commitsminpp_commitscommits);letmax=List.map(funx->`Commit(Commit.hashx))commitsinletmin=List.map(funx->`Commit(Commit.hashx))mininon_next_uppert(funu->iter_copy?cancelu~skip_commits:(mem_commit_nextt)~skip_nodes:(mem_node_nextt)~skip_contents:(mem_contents_nextt)~mintmax)(** Newies are the objects added in current upper during the freeze. They
are copied to the next upper before the freeze ends. When copying the
newies we have to traverse them as well, to ensure that all objects
used by a newies are also copied in the next upper. We only keep track
of commit newies and rely on `Repo.iter` to compute the transitive
closures of all the newies. *)letcopy_newies~cancelt=letnewies=X.Commit.CA.consume_newiest.X.Repo.commitinletnewies=List.rev_map(funx->`Commitx)newiesinLog.debug(funl->l"copy newies");(* we want to copy all the new commits; stop whenever one
commmit already in the other upper or in lower. *)letskip_commitsk=mem_commit_nexttk>>=function|true->Lwt.returntrue|false->mem_commit_lowertkinon_next_uppert(funu->iter_copyu?cancel~skip_commits~skip_nodes:(mem_node_nextt)~skip_contents:(mem_contents_nextt)tnewies)>>=fun()->X.Branch.copy_newies_to_next_uppert.branch(** Repeatedly call [copy_newies] as long as there are many newies (more
than newies_limit bytes added). *)letreccopy_newies_to_next_upper~canceltformer_offset=letnewies_limit=Int63.of_intt.X.Repo.blocking_copy_sizeinletoffset=X.Repo.offsettinifoffset--former_offset>=newies_limitthen(Irmin_layers.Stats.copy_newies_loop();copy_newies~cancelt>>=fun()->(copy_newies_to_next_upper~canceltoffset[@tail]))elseLwt.return_unitendmoduleCopyFromLower=struct(* FIXME(samoht): copy/paste from iter_copy with s/copy/copy_from_lower *)letiter_copy(contents,nodes,commits)?(skip_commits=no_skip)?(cancel=always_false)?(skip_nodes=no_skip)?(skip_contents=no_skip)t?(min=[])cs=(* if node or contents are already in dst then they are skipped by
Graph.iter; there is no need to check this again when the object is
copied *)letcommitk=with_cancelcancel@@fun()->X.Commit.CA.copy_from_lower~dst:commitst.X.Repo.commit"Commit"kinletnodek=with_cancelcancel@@fun()->X.Node.CA.copy_from_lower~dst:nodest.X.Repo.nodekinletcontentsk=with_cancelcancel@@fun()->X.Contents.CA.copy_from_lower~dst:contentst.X.Repo.contents"Contents"kinletskip_nodeh=skip_with_stats~skip:skip_nodeshinletskip_contentsh=skip_with_stats~skip:skip_contentshinletskip_commith=skip_with_stats~skip:skip_commitshinletmax=List.map(func->`Commitc)csinletmin=List.map(func->`Commitc)mininlet+()=Repo.iter~cache_size~min~max~commit~node~contents~skip_node~skip_contents~pred_node~skip_committinX.Repo.flushtleton_current_uppertf=letcontents=X.Contents.CA.current_uppert.X.Repo.contentsinletnodes=X.Node.CA.current_uppert.X.Repo.nodeinletcommits=X.Commit.CA.current_uppert.X.Repo.commitinf(contents,nodes,commits)(** An object can be in either lower or upper or both. We can't skip an
object already in upper as some predecessors could still be in lower. *)letself_contained?min~maxt=letmax=List.map(funx->Commit.hashx)maxinletmin=matchminwith|None->max(* if min is empty then copy only the max commits *)|Somemin->List.map(funx->Commit.hashx)minin(* FIXME(samoht): do this in 2 steps: 1/ find the shallow
hashes in upper 2/ iterates with max=shallow
(ngoguey): we could stop at the uppers directly following a lower.
*)Log.debug(funl->l"self_contained: copy commits min:%a; max:%a from lower into \
upper to make the upper self contained"(Fmt.list(Irmin.Type.ppH.t))min(Fmt.list(Irmin.Type.ppH.t))max);on_current_uppert(funu->iter_copyu~mintmax)endendletcopy~cancel~min_lower~max_lower~min_upper~max_uppert=(* Copy commits to lower.
In case cancellation of the freeze, copies to the lower layer will not
be reverted. Since the copying is performed in the [rev] order, the next
freeze will resume copying where the previous freeze stopped. *)Irmin_layers.Stats.freeze_section"copy to lower";(ift.X.Repo.with_lowerthenCopy.CopyToLower.copy~cancelt~min:min_lowermax_lowerelseLwt.return_unit)>>=fun()->(* Copy [min_upper, max_upper] to next_upper. In case of cancellation of the
freeze, the next upper will be cleared. *)Irmin_layers.Stats.freeze_section"copy to next upper";Copy.CopyToUpper.copyt~cancel~min:min_uppermax_upper>>=fun()->Irmin_layers.Stats.freeze_section"copy branches";(* Copy branches to both lower and next_upper *)Copy.copy_branchestmoduleField=structtypet=F:'aFmt.t*string*'a->t|Eletppppf=functionE->()|F(pp,k,v)->Fmt.pfppf"%s=%a"kppvletppsppft=Fmt.list~sep:(Fmt.unit"; ")ppppf(List.filter(funx->x<>E)t)letcommitsk=function[]->E|v->F(pp_commits,k,v)endletpp_repoppft=Fmt.pfppf"%a"Layered_store.pp_current_uppert.X.Repo.flipletunsafe_freeze~min_lower~max_lower~min_upper~max_upper?hookt=Log.info(funl->l"[%a] freeze starts { %a }"pp_repotField.pps[Field.commits"min_lower"min_lower;Field.commits"max_lower"max_lower;Field.commits"min_upper"min_upper;Field.commits"max_upper"max_upper;]);letoffset=X.Repo.offsettinletlock_file=lock_patht.rootin(* We take a file lock here to signal that a freeze was in progess in
case of crash, to trigger the recovery path. *)let*lock_file=Lock.vlock_fileinletcancel()=t.freeze.state=`Cancelinletcopy()=may(funf->f`Before_Copy)hook>>=fun()->copy~cancel~min_lower~max_lower~min_upper~max_uppert>>=fun()->Irmin_layers.Stats.freeze_section"flush lower";X.Repo.flush_next_lowert;may(funf->f`Before_Copy_Newies)hook>>=fun()->Irmin_layers.Stats.freeze_section"copy newies (loop)";Copy.CopyToUpper.copy_newies_to_next_upper~cancel:(Somecancel)toffset>>=fun()->may(funf->f`Before_Copy_Last_Newies)hook>>=fun()->(* Let's finish the freeze under the batch lock so that no concurrent
modifications occur until the uppers are flipped. No more cancellations
from this point on. There are only a few newies left (less than
[newies_limit] bytes) so this lock should be quickly released. *)Irmin_layers.Stats.freeze_section"wait for batch lock";Irmin_layers.Stats.freeze_yield();Lwt_mutex.with_lockt.batch_lock(fun()->Irmin_layers.Stats.freeze_yield_end();Irmin_layers.Stats.freeze_section"copy newies (last)";Copy.CopyToUpper.copy_newies~cancel:Nonet>>=fun()->Irmin_layers.Stats.freeze_section"misc";may(funf->f`Before_Flip)hook>>=fun()->X.Repo.flip_uppert;may(funf->f`Before_Clear)hook>>=fun()->X.Repo.clear_previous_uppert)>>=fun()->(* RO reads generation from pack file to detect a flip change, so it's
ok to write the flip file outside the lock *)X.Repo.write_fliptinletfinalizecancelled()=Irmin_layers.Stats.freeze_section"finalize";t.freeze.state<-`None;(ifcancelledthenX.Repo.clear_previous_upper~keep_generation:()telseLwt.return_unit)>>=fun()->Lock.closelock_file>>=fun()->Lwt_mutex.unlockt.freeze.lock;may(funf->f`After_Clear)hook>|=fun()->Irmin_layers.Stats.freeze_stop();(* Fmt.pr "\n%a%!" Irmin_layers.Stats.pp_latest (); *)()inletasync()=Lwt.try_bindcopy(finalizefalse)(function|Cancelled->finalizetrue()|e->Lwt.faile)inLwt.asyncasync;Lwt.return_unit(** Main thread takes the [t.freeze.lock] at the begining of freeze and async
thread releases it at the end. This is to ensure that no two freezes can
run simultaneously. *)letfreeze'?min_lower?max_lower?min_upper?max_upper?(recovery=false)?hookt=let*()=ifrecoverythenX.Repo.clear_previous_upper~keep_generation:()telseLwt.return_unitinletfreeze()=lett0=Mtime_clock.now()inLwt_mutex.lockt.freeze.lock>>=fun()->t.freeze.state<-`Running;Irmin_layers.Stats.freeze_startt0"wait for freeze lock";Irmin_layers.Stats.freeze_section"misc";letmin_lower=Option.valuemin_lower~default:[]inlet*max_lower=matchmax_lowerwithSomel->Lwt.returnl|None->Repo.headstinletmax_upper=Option.valuemax_upper~default:max_lowerinletmin_upper=Option.valuemin_upper~default:max_upperinunsafe_freeze~min_lower~max_lower~min_upper~max_upper?hooktinift.X.Repo.closedthenLwt.fail_with"store is closed"elseift.readonlythenraiseIrmin_pack.RO_not_allowedelsematch(t.freeze.state,t.freeze.throttle)with|`Running,`Overcommit_memory->Lwt.return()|`Running,`Cancel_existing->t.freeze.state<-`Cancel;freeze()|_->freeze()letlayer_id=X.Repo.layer_idletfreeze=freeze'?hook:Noneletasync_freeze(t:Repo.t)=Lock.test(lock_patht.X.Repo.root)letupper_in_use=X.Repo.upper_in_useletself_contained=Copy.CopyFromLower.self_containedletneeds_recoveryt=Lock.test(lock_patht.X.Repo.root)letcheck_self_contained?headst=Log.debug(funl->l"Check that the upper layer is self contained");leterrors=ref0inletnone()=increrrors;Lwt.return_unitinletnodek=X.Node.CA.checkt.X.Repo.node~nonekinletcontentsk=X.Contents.CA.checkt.X.Repo.contents~nonekinletcommitk=X.Commit.CA.checkt.X.Repo.commit~nonekinlet*heads=matchheadswithNone->Repo.headst|Somem->Lwt.returnminlethashes=List.map(funx->`Commit(Commit.hashx))headsinlet+()=Repo.iter~cache_size~min:[]~max:hashes~commit~node~contentstinletpp_commits=Fmt.list~sep:Fmt.commaCommit.pp_hashinif!errors=0thenFmt.kstrf(funx->Ok(`Msgx))"Upper layer is self contained for heads %a"pp_commitsheadselseFmt.kstrf(funx->Error(`Msgx))"Upper layer is not self contained for heads %a: %n phantom objects \
detected"pp_commitsheads!errorsmodulePrivate_layer=structmoduleHook=structtype'at='a->unitLwt.tletvf=fendletwait_for_freeze(t:Repo.t)=Lwt_mutex.with_lockt.freeze.lock(fun()->Lwt.return_unit)letfreeze'=freeze'letupper_in_use=upper_in_useendend