(*
* Copyright (c) 2018-2021 Tarides <contact@tarides.com>
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*)

open! Import
include Atomic_write_intf
module Cache = IO.Cache

(* Hash tables keyed by [K.t], hashed and compared with the generic
   operations that Irmin derives from the type representation [K.t]. *)
module Table (K : Irmin.Type.S) = Hashtbl.Make (struct
  type t = K.t

  let hash = Irmin.Type.(unstage (short_hash K.t)) ?seed:None
  let equal = Irmin.Type.(unstage (equal K.t))
end)

(* Branch store backed by a single IO file, mirrored in memory by two tables:
   one from keys to live values and one from keys to the file offset of the
   entry holding them. *)
module Make_persistent
    (Current : Version.S)
    (K : Irmin.Type.S)
    (V : Irmin.Hash.S) =
struct
  module Tbl = Table (K)
  module W = Irmin.Private.Watch.Make (K) (V)
  module IO = IO.Unix

  type key = K.t
  type value = V.t
  type watch = W.watch

  type t = {
    index : int63 Tbl.t;  (* key -> offset of its entry in [block] *)
    cache : V.t Tbl.t;  (* key -> current value; removed keys are absent *)
    mutable block : IO.t;  (* backing file; replaced by [sync_offset] *)
    w : W.t;  (* watchers notified on updates *)
    mutable open_instances : int;  (* number of handles still open *)
  }

  let decode_bin = Irmin.Type.(unstage (decode_bin int32))

  (* Read the 4 bytes found at [off] in [block] and decode them as an int32,
     returned as an [int]. Asserts that exactly 4 bytes were read and
     consumed. *)
  let read_length32 ~off block =
    let buf = Bytes.create 4 in
    let n = IO.read block ~off buf in
    assert (n = 4);
    let n, v = decode_bin (Bytes.unsafe_to_string buf) 0 in
    assert (n = 4);
    Int32.to_int v

  (* On-disk entry: the serialised key as an Int32-prefixed string, followed
     by the value. *)
  let entry = Irmin.Type.(pair (string_of `Int32) V.t)
  let key_to_bin_string = Irmin.Type.(unstage (to_bin_string K.t))
  let key_of_bin_string = Irmin.Type.(unstage (of_bin_string K.t))
  let entry_to_bin_string = Irmin.Type.(unstage (to_bin_string entry))
  let value_of_bin_string = Irmin.Type.(unstage (of_bin_string V.t))
  let value_decode_bin = Irmin.Type.(unstage (decode_bin V.t))

  (* Serialise the binding [(k, v)] and write it to the block: appended when
     [off] is absent, written at offset [off] otherwise. *)
  let set_entry t ?off k v =
    let k = key_to_bin_string k in
    let buf = entry_to_bin_string (k, v) in
    match off with
    | None -> IO.append t.block buf
    | Some off -> IO.set t.block buf ~off

  let pp_branch = Irmin.Type.pp K.t

  (* The value decoded from [V.hash_size] NUL bytes. It is written over the
     entry of a removed key by [unsafe_remove], and [refill] keeps such
     entries out of the cache. *)
  let zero =
    match value_of_bin_string (String.make V.hash_size '\000') with
    | Ok x -> x
    | Error _ -> assert false

  let equal_val = Irmin.Type.(unstage (equal V.t))

  (* Load into the tables every entry stored between offsets [from] (included)
     and [to_] (excluded). Each entry is read as a 4-byte length [len], then
     [len] bytes of key, then [V.hash_size] bytes of value. Every key goes
     into [t.index]; it also goes into [t.cache] unless its value is [zero].
     Fails with [Failure] when a key cannot be decoded. *)
  let refill t ~to_ ~from =
    let rec aux offset =
      if offset >= to_ then ()
      else
        let len = read_length32 ~off:offset t.block in
        let buf = Bytes.create (len + V.hash_size) in
        (* Skip the 4-byte length header. *)
        let off = offset ++ Int63.of_int 4 in
        let n = IO.read t.block ~off buf in
        assert (n = Bytes.length buf);
        let buf = Bytes.unsafe_to_string buf in
        let h =
          let h = String.sub buf 0 len in
          match key_of_bin_string h with
          | Ok k -> k
          | Error (`Msg e) -> failwith e
        in
        let n, v = value_decode_bin buf len in
        assert (n = String.length buf);
        if not (equal_val v zero) then Tbl.add t.cache h v;
        Tbl.add t.index h offset;
        (aux [@tailcall]) (off ++ Int63.(of_int @@ (len + V.hash_size)))
    in
    aux from

  (* Re-read the headers of the block and bring the tables up to date. When
     the generation differs from the one held by the current handle, the
     block is closed and reopened read-only and both tables are rebuilt from
     offset zero. Otherwise, when the offset has grown, only the new part is
     loaded. *)
  let sync_offset t =
    let former_offset = IO.offset t.block in
    let former_generation = IO.generation t.block in
    let h = IO.force_headers t.block in
    if former_generation <> h.generation then (
      Log.debug (fun l -> l "[branches] generation changed, refill buffers");
      IO.close t.block;
      let io =
        IO.v ~fresh:false ~readonly:true ~version:(Some Current.version)
          (IO.name t.block)
      in
      t.block <- io;
      Tbl.clear t.cache;
      Tbl.clear t.index;
      refill t ~to_:h.offset ~from:Int63.zero)
    else if h.offset > former_offset then
      refill t ~to_:h.offset ~from:former_offset

  (* Look [k] up in the cache. Read-only instances call [sync_offset]
     first. *)
  let unsafe_find t k =
    Log.debug (fun l -> l "[branches] find %a" pp_branch k);
    if IO.readonly t.block then sync_offset t;
    try Some (Tbl.find t.cache k) with Not_found -> None

  let find t k = Lwt.return (unsafe_find t k)

  (* Whether [k] is bound in the cache. *)
  let unsafe_mem t k =
    Log.debug (fun l -> l "[branches] mem %a" pp_branch k);
    try Tbl.mem t.cache k with Not_found -> false

  let mem t v = Lwt.return (unsafe_mem t v)

  (* Drop [k] from the cache and, when it has an entry in the block, overwrite
     the value of that entry with [zero]. The index binding is kept. *)
  let unsafe_remove t k =
    Tbl.remove t.cache k;
    try
      let off = Tbl.find t.index k in
      set_entry t ~off k zero
    with Not_found -> ()

  let remove t k =
    Log.debug (fun l -> l "[branches] remove %a" pp_branch k);
    unsafe_remove t k;
    W.notify t.w k None

  (* Clear the watches, empty the block and forget every in-memory binding.
     The match is parenthesised so that both tables are cleared whatever the
     version is. Without the parentheses the two [Tbl.clear] calls belong to
     the last arm only, and a V1 store keeps serving bindings, and reusing
     offsets, of a file that has just been truncated. *)
  let unsafe_clear ?keep_generation t =
    Lwt.async (fun () -> W.clear t.w);
    (match Current.version with
    | `V1 -> IO.truncate t.block
    | `V2 -> IO.clear ?keep_generation t.block);
    Tbl.clear t.cache;
    Tbl.clear t.index

  let clear t =
    Log.debug (fun l -> l "[branches] clear");
    unsafe_clear t;
    Lwt.return_unit

  (* Same as [clear], but passes [~keep_generation:()] to [unsafe_clear]. *)
  let clear_keep_generation t =
    Log.debug (fun l -> l "[branches] clear");
    unsafe_clear ~keep_generation:() t;
    Lwt.return_unit

  (* A single watch state, used by every instance built by [unsafe_v]. *)
  let watches = W.v ()

  (* Validity check handed to [Cache.memoize]: an instance with open handles
     is valid, and checking it counts one more open handle. *)
  let valid t =
    if t.open_instances <> 0 then (
      t.open_instances <- t.open_instances + 1;
      true)
    else false

  (* Open [file] and load all of its entries into fresh tables. *)
  let unsafe_v ~fresh ~readonly file =
    let block = IO.v ~fresh ~version:(Some Current.version) ~readonly file in
    let cache = Tbl.create 997 in
    let index = Tbl.create 997 in
    let t = { cache; index; block; w = watches; open_instances = 1 } in
    let h = IO.force_headers block in
    refill t ~to_:h.offset ~from:Int63.zero;
    t

  (* Shadow [unsafe_v] with the constructor returned by [Cache.memoize]. *)
  let Cache.{ v = unsafe_v } =
    Cache.memoize ~clear:unsafe_clear ~valid
      ~v:(fun () -> unsafe_v)
      Layout.branch

  let v ?fresh ?readonly file = Lwt.return (unsafe_v () ?fresh ?readonly file)

  (* Bind [k] to [v]. A key that already has an entry is overwritten in place
     at its recorded offset; a new key is appended and its offset recorded. *)
  let unsafe_set t k v =
    try
      let off = Tbl.find t.index k in
      Tbl.replace t.cache k v;
      set_entry t ~off k v
    with Not_found ->
      let offset = IO.offset t.block in
      set_entry t k v;
      Tbl.add t.cache k v;
      Tbl.add t.index k offset

  let set t k v =
    Log.debug (fun l -> l "[branches %s] set %a" (IO.name t.block) pp_branch k);
    unsafe_set t k v;
    W.notify t.w k (Some v)

  let equal_v_opt = Irmin.Type.(unstage (equal (option V.t)))

  (* When the cached binding of [k] equals [test], apply [set] ([None] removes
     the key, [Some v] binds it) and return true. Otherwise change nothing and
     return false. *)
  let unsafe_test_and_set t k ~test ~set =
    let v = try Some (Tbl.find t.cache k) with Not_found -> None in
    if not (equal_v_opt v test) then Lwt.return_false
    else
      let return () = Lwt.return_true in
      match set with
      | None -> unsafe_remove t k |> return
      | Some v -> unsafe_set t k v |> return

  (* Watchers are notified only when the update has been applied. *)
  let test_and_set t k ~test ~set =
    Log.debug (fun l -> l "[branches] test-and-set %a" pp_branch k);
    unsafe_test_and_set t k ~test ~set >>= function
    | true -> W.notify t.w k set >|= fun () -> true
    | false -> Lwt.return_false

  (* All the keys bound in the cache. *)
  let list t =
    Log.debug (fun l -> l "[branches] list");
    let keys = Tbl.fold (fun k _ acc -> k :: acc) t.cache [] in
    Lwt.return keys

  let watch_key t = W.watch_key t.w
  let watch t = W.watch t.w
  let unwatch t = W.unwatch t.w

  (* Count one handle less. When none is left, reset both tables, flush the
     block unless it is read-only, close it and clear the watches. *)
  let unsafe_close t =
    t.open_instances <- t.open_instances - 1;
    if t.open_instances = 0 then (
      Tbl.reset t.index;
      Tbl.reset t.cache;
      if not (IO.readonly t.block) then IO.flush t.block;
      IO.close t.block;
      W.clear t.w)
    else Lwt.return_unit

  let close t = unsafe_close t
  let flush t = IO.flush t.block
end

(* FIXME: remove code duplication with irmin/atomic_write *)

(* Wrapper around a store [AW] that records whether the handle was closed.
   Every operation except [close] raises [Irmin.Closed] on a closed handle;
   closing a closed handle does nothing. *)
module Closeable (AW : S) = struct
  type t = { closed : bool ref; t : AW.t }
  type key = AW.key
  type value = AW.value

  (* Raise [Irmin.Closed] when the handle has been closed. *)
  let check_not_closed t = if !(t.closed) then raise Irmin.Closed

  let mem t k =
    check_not_closed t;
    AW.mem t.t k

  let find t k =
    check_not_closed t;
    AW.find t.t k

  let set t k v =
    check_not_closed t;
    AW.set t.t k v

  let test_and_set t k ~test ~set =
    check_not_closed t;
    AW.test_and_set t.t k ~test ~set

  let remove t k =
    check_not_closed t;
    AW.remove t.t k

  let list t =
    check_not_closed t;
    AW.list t.t

  type watch = AW.watch

  let watch t ?init f =
    check_not_closed t;
    AW.watch t.t ?init f

  let watch_key t k ?init f =
    check_not_closed t;
    AW.watch_key t.t k ?init f

  let unwatch t w =
    check_not_closed t;
    AW.unwatch t.t w

  (* Wrap an open store into a handle that is not closed yet. *)
  let make_closeable t = { closed = ref false; t }

  (* Mark the handle as closed and close the underlying store; only the first
     call reaches [AW.close]. *)
  let close t =
    if !(t.closed) then Lwt.return_unit
    else (
      t.closed := true;
      AW.close t.t)

  let clear t =
    check_not_closed t;
    AW.clear t.t

  let flush t =
    check_not_closed t;
    AW.flush t.t

  let clear_keep_generation t =
    check_not_closed t;
    AW.clear_keep_generation t.t
end