12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167(*****************************************************************************)(* *)(* Open Source License *)(* Copyright (c) 2023 Nomadic Labs, <contact@nomadic-labs.com> *)(* *)(* Permission is hereby granted, free of charge, to any person obtaining a *)(* copy of this software and associated documentation files (the "Software"),*)(* to deal in the Software without restriction, including without limitation *)(* the rights to use, 
copy, modify, merge, publish, distribute, sublicense, *)(* and/or sell copies of the Software, and to permit persons to whom the *)(* Software is furnished to do so, subject to the following conditions: *)(* *)(* The above copyright notice and this permission notice shall be included *)(* in all copies or substantial portions of the Software. *)(* *)(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*)(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *)(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL *)(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*)(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING *)(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER *)(* DEALINGS IN THE SOFTWARE. *)(* *)(*****************************************************************************)openStore_sigsopenStore_errors(* Helper functions to copy byte sequences or integers in [src] to another byte
sequence [dst] at offset [offset], with named arguments to avoid
   confusion. These functions return the offset in the destination at which to
   copy more data. *)

(* [blit ~src ~dst offset] copies the whole of [src] into [dst] starting at
   [offset] and returns the offset just past the copied bytes. *)
let blit ~src ~dst offset =
  let src_len = Bytes.length src in
  let () = Bytes.blit src 0 dst offset src_len in
  offset + src_len

(* [bytes_set_int64 ~src ~dst offset] writes [src] as a big-endian 64 bit
   integer at [offset] in [dst] and returns [offset + 8]. *)
let bytes_set_int64 ~src ~dst offset =
  let () = Bytes.set_int64_be dst offset src in
  offset + 8

(* [bytes_set_int8 ~src ~dst offset] writes [src] as an 8 bit integer at
   [offset] in [dst] and returns [offset + 1]. *)
let bytes_set_int8 ~src ~dst offset =
  let () = Bytes.set_int8 dst offset src in
  offset + 1

(* Helper functions to read data (strings with a decoding function, or integers)
from a binary string. These functions return, as the second component, the
   offset in the string at which to read more data. *)

(* Reads a 64 bit integer at [offset] in [str]; the returned offset is
   [offset + 8]. *)
let read_int64 str offset =
  let value = TzEndian.get_int64_string str offset in
  (value, offset + 8)

(* Reads an 8 bit integer at [offset] in [str]; the returned offset is
   [offset + 1]. *)
let read_int8 str offset =
  let value = TzEndian.get_int8_string str offset in
  (value, offset + 1)

(* Functors to build stores on indexes *)

(** Name of a store, used in the errors and events it emits. *)
module type NAME = sig
  val name : string
end

(** Store holding a single value kept in one file. *)
module type SINGLETON_STORE = sig
  type +'a t

  type value

  val load : path:string -> 'a mode -> 'a t tzresult Lwt.t

  val read : [> `Read] t -> value option tzresult Lwt.t

  val write : [> `Write] t -> value -> unit tzresult Lwt.t

  val delete : [> `Write] t -> unit tzresult Lwt.t

  val readonly : [> `Read] t -> [`Read] t
end

(** Key-value store backed by indexes only. *)
module type INDEXABLE_STORE = sig
  type +'a t

  type key

  type value

  val load :
    path:string -> index_buffer_size:int -> 'a mode -> 'a t tzresult Lwt.t

  val mem : [> `Read] t -> key -> bool tzresult Lwt.t

  val find : [> `Read] t -> key -> value option tzresult Lwt.t

  val add : ?flush:bool -> [> `Write] t -> key -> value -> unit tzresult Lwt.t

  val close : _ t -> unit tzresult Lwt.t

  val readonly : [> `Read] t -> [`Read] t

  val gc :
    ?async:bool ->
    rw t ->
    (key -> value -> bool tzresult Lwt.t) ->
    unit tzresult Lwt.t

  val wait_gc_completion : 'a t -> unit Lwt.t

  val is_gc_finished : 'a t -> bool
end

(** An {!INDEXABLE_STORE} whose bindings can also be removed. *)
module type INDEXABLE_REMOVABLE_STORE = sig
  include INDEXABLE_STORE

  val remove : ?flush:bool -> [> `Write] t -> key -> unit tzresult Lwt.t
end

(** Store of values kept in a data file, together with a fixed size header kept
    in an index. *)
module type INDEXED_FILE = sig
  type +'a t

  type key

  type header

  type value

  val mem : [> `Read] t -> key -> bool tzresult Lwt.t

  val header : [> `Read] t -> key -> header option tzresult Lwt.t

  val read : [> `Read] t -> key -> (value * header) option tzresult Lwt.t

  val append :
    ?flush:bool ->
    [> `Write] t ->
    key:key ->
    header:header ->
    value:value ->
    unit tzresult Lwt.t

  val load :
    path:string ->
    index_buffer_size:int ->
    cache_size:int ->
    'a mode ->
    'a t tzresult Lwt.t

  val close : _ t -> unit tzresult Lwt.t

  val readonly : [> `Read] t -> [`Read] t

  val gc :
    ?async:bool ->
    rw t ->
    (key -> header -> value -> bool tzresult Lwt.t) ->
    unit tzresult Lwt.t

  val wait_gc_completion : 'a t -> unit Lwt.t

  val is_gc_finished : 'a t -> bool
end

(** An {!INDEXED_FILE} whose [append] does not take a [header] argument. *)
module type SIMPLE_INDEXED_FILE = sig
  include INDEXED_FILE

  val append :
    ?flush:bool -> [> `Write] t -> key:key -> value:value -> unit tzresult Lwt.t
end

(** Index keys that can additionally be pretty printed. *)
module type INDEX_KEY = sig
  include Index.Key.S

  val pp : Format.formatter -> t -> unit
end

(** Named values with a binary encoding. *)
module type ENCODABLE_VALUE = sig
  type t

  val name : string

  val encoding : t Data_encoding.t
end

(** Encodable values together with the size of their encoding. *)
module type FIXED_ENCODABLE_VALUE = sig
  include ENCODABLE_VALUE

  val fixed_size : int
end

(** Encodable values that come with a fixed size header. *)
module type ENCODABLE_VALUE_HEADER = sig
  include ENCODABLE_VALUE

  module Header : FIXED_ENCODABLE_VALUE
end

(** Computes [fixed_size] from the encoding of [V].

    @raise Invalid_argument
      at functor application if the encoding has no fixed length. *)
module Make_fixed_encodable (V : ENCODABLE_VALUE) :
  FIXED_ENCODABLE_VALUE with type t = V.t = struct
  include V

  let fixed_size =
    match Data_encoding.Binary.fixed_length encoding with
    | Some size -> size
    | None -> Stdlib.invalid_arg (name ^ " encoding is not fixed size")
end

(** Builds an {!Index} value out of a fixed size encodable value. *)
module Make_index_value (E : FIXED_ENCODABLE_VALUE) :
  Index.Value.S with type t = E.t = struct
  type t = E.t

  let encoded_size = E.fixed_size

  let encode v =
    Data_encoding.Binary.to_string_exn ~buffer_size:encoded_size E.encoding v

  let decode buf offset =
    let _consumed, decoded =
      Data_encoding.Binary.read_exn E.encoding buf offset encoded_size
    in
    decoded

  (* The {!Repr.t} value is only used for pretty printing in {!Index} so this is
fine. *)
  let t = Repr.map Repr.string (fun encoded -> decode encoded 0) encode
end

(** Builds an index key out of a fixed size encodable value with an equality.
    Keys are hashed through their binary encoding and printed as JSON. *)
module Make_index_key (E : sig
  include FIXED_ENCODABLE_VALUE

  val equal : t -> t -> bool
end) : INDEX_KEY with type t = E.t = struct
  include Make_index_value (E)

  let equal = E.equal

  let hash v = encode v |> Stdlib.Hashtbl.hash

  (* {!Stdlib.Hashtbl.hash} is 30 bits *)
  let hash_size = 30 (* in bits *)

  let pp ppf k =
    Data_encoding.Json.pp ppf (Data_encoding.Json.construct E.encoding k)
end

module Make_indexable (N : NAME) (K : INDEX_KEY) (V : Index.Value.S) = struct
  module I = Index_unix.Make (K) (V) (Index.Cache.Unbounded)

  (* An index together with the path it was opened from. *)
  type internal_index = {index : I.t; index_path : string}

  (* [Ongoing] carries the temporary index being filled by the gc and the
     promise tied to the end of that gc. *)
  type gc_status =
    | No_gc
    | Ongoing of {tmp_index : internal_index; promise : unit Lwt.t}

  (** In order to periodically clean up the store with the {!gc} function, each
pure index store is split in multiple indexes: one fresh index and
      multiple stale indexes (zero if no GC has ever occurred, one if the last
      GC was successful, two if the GC is ongoing, and more in case some GC
      failed).

      Adding new information to the store is always done on the fresh index
      whereas queries are done on both the fresh and stale indexes.

      A gc operation starts by moving the fresh index to the stale list and
      creates a new fresh index. The rest of the gc operation consists in,
      asynchronously, merging the stale indexes while removing data. See {!gc}
      for more details.
  *)
  type _ t = {
    mutable fresh : internal_index;
    mutable stales : internal_index list;
    scheduler : Lwt_idle_waiter.t;
    readonly : bool;
    index_buffer_size : int;
    path : string;
    mutable gc_status : gc_status;
  }

  (* The indexes to query: the fresh one first, then the stale ones, or only
     the stale ones when [only_stale] is set. *)
  let internal_indexes ?(only_stale = false) store =
    let stales = store.stales in
    if only_stale then stales else store.fresh :: stales

  (* Membership test without going through the scheduler. *)
  let unsafe_mem store k =
    internal_indexes store |> List.exists (fun idx -> I.mem idx.index k)

  let mem store k =
    let open Lwt_result_syntax in
    trace (Cannot_read_from_store N.name)
    @@ protect
    @@ fun () ->
    Lwt_idle_waiter.task store.scheduler @@ fun () ->
    return (unsafe_mem store k)

  (* Option returning variant of [I.find]. *)
  let find_index i k =
    match I.find i k with v -> Some v | exception Not_found -> None

  (* Returns the first binding found for [k]. Any exception raised while
     querying an index is turned into a [Failure] that names the index path. *)
  let unsafe_find ?only_stale store k =
    let lookup idx =
      match find_index idx.index k with
      | found -> found
      | exception e ->
          Format.kasprintf
            Stdlib.failwith
            "cannot access index %s : %s"
            idx.index_path
            (Printexc.to_string e)
    in
    List.find_map lookup (internal_indexes ?only_stale store)

  let find store k =
    let open Lwt_result_syntax in
    trace (Cannot_read_from_store N.name)
    @@ protect
    @@ fun () ->
    Lwt_idle_waiter.task store.scheduler @@ fun () ->
    return (unsafe_find store k)

  (* Bindings are always written to the fresh index. *)
  let add ?(flush = true) store k v =
    let open Lwt_result_syntax in
    trace (Cannot_write_to_store N.name)
    @@ protect
    @@ fun () ->
    Lwt_idle_waiter.force_idle store.scheduler @@ fun () ->
    I.replace store.fresh.index k v ;
    if flush then I.flush store.fresh.index ;
    return_unit

  (* Path of the [n]-th stale index of the store located at [path]. *)
  let stale_path path n = String.concat "." [path; string_of_int n]

  (* Path of the temporary index used while a gc is running. *)
  let tmp_path path = String.concat "." [path; "tmp"]

  let load_internal_index ~index_buffer_size ~readonly index_path =
    {index = I.v ~log_size:index_buffer_size ~readonly index_path; index_path}

  let load (type a) ~path ~index_buffer_size (mode : a mode) :
      a t tzresult Lwt.t =
    let open Lwt_result_syntax in
    trace (Cannot_load_store {name = N.name; path})
    @@ protect
    @@ fun () ->
    let*! () = Lwt_utils_unix.create_dir (Filename.dirname path) in
    let readonly = match mode with Read_write -> false | Read_only -> true in
    let fresh = load_internal_index ~index_buffer_size ~readonly path in
    (* Loading stale indexes if they exist on disk (stale indexes are created by
       GC operations). *)
    let rec load_stales acc n =
      let open Lwt_syntax in
      let stale_path = stale_path path n in
      let*! exists = Lwt_unix.file_exists stale_path in
      if exists then
        let stale =
          load_internal_index ~index_buffer_size ~readonly:true stale_path
        in
        load_stales (stale :: acc) (n + 1)
      else return acc
    in
    let*! stales = load_stales [] 1 in
    let scheduler = Lwt_idle_waiter.create () in
    return
      {
        fresh;
        stales;
        scheduler;
        readonly;
        index_buffer_size;
        path;
        gc_status = No_gc;
      }

  (* Closing an index that is already closed is not an error. *)
  let close_internal_index i =
    match I.close i.index with () -> () | exception Index.Closed -> ()

  (* Closes [index], renames it to [dest] and reopens it from there. *)
  let mv_internal_index store index dest =
    let open Lwt_syntax in
    close_internal_index index ;
    let+ () = Lwt_unix.rename index.index_path dest in
    load_internal_index
      ~index_buffer_size:store.index_buffer_size
      ~readonly:store.readonly
      dest

  (* Closes [index] and removes it from disk. *)
  let rm_internal_index index =
    let () = close_internal_index index in
    Lwt_utils_unix.remove_dir index.index_path

  (* Cancels the promise of a running gc if any, closes every index and
     removes the temporary index of that gc. *)
  let close store =
    protect @@ fun () ->
    Lwt_idle_waiter.force_idle store.scheduler @@ fun () ->
    let open Lwt_result_syntax in
    (match store.gc_status with
    | No_gc -> ()
    | Ongoing {promise; _} -> Lwt.cancel promise) ;
    close_internal_index store.fresh ;
    List.iter close_internal_index store.stales ;
    let*! () =
      match store.gc_status with
      | Ongoing {tmp_index; _} -> rm_internal_index tmp_index
      | No_gc -> Lwt.return_unit
    in
    return_unit

  let readonly x = (x :> [`Read] t)

  (** A gc is initiated by moving the fresh index to the stale list and creating
a new fresh index, as well as creating a temporary index for the result of
      the gc. *)
  let initiate_gc store =
    let open Lwt_syntax in
    Lwt_idle_waiter.force_idle store.scheduler @@ fun () ->
    let* () = Store_events.starting_gc N.name in
    let tmp_index_path = tmp_path store.path in
    let stale_number = List.length store.stales + 1 in
    let* stale =
      mv_internal_index store store.fresh (stale_path store.path stale_number)
    in
    let fresh =
      load_internal_index
        ~index_buffer_size:store.index_buffer_size
        ~readonly:store.readonly
        store.path
    in
    let tmp_index =
      load_internal_index
        ~index_buffer_size:store.index_buffer_size
        ~readonly:false
        tmp_index_path
    in
    (* Clear temporary index in case there are leftovers from a dirtily aborted
       previous gc. *)
    I.clear tmp_index.index ;
    store.fresh <- fresh ;
    store.stales <- stale :: store.stales ;
    let promise, resolve = Lwt.task () in
    store.gc_status <- Ongoing {tmp_index; promise} ;
    return (tmp_index, promise, resolve)

  (** If a gc operation fails, reverting simply consists in removing the
temporary index. We keep the two stale indexes as is, they will be merged
      by the next successful gc. *)
  let revert_failed_gc store =
    match store.gc_status with
    | Ongoing {tmp_index; promise} ->
        Lwt.cancel promise ;
        (* Back to [No_gc] so that a later gc can be started. *)
        store.gc_status <- No_gc ;
        rm_internal_index tmp_index
    | No_gc -> Lwt.return_unit

  (** When the gc operation finishes, i.e. we have copied all elements to retain
to the temporary index, we can replace all the stale indexes by the
      temporary one. *)
  let finalize_gc store tmp_index =
    let open Lwt_result_syntax in
    protect @@ fun () ->
    Lwt_idle_waiter.force_idle store.scheduler @@ fun () ->
    let*! () = List.iter_s rm_internal_index store.stales in
    (* The temporary index becomes the first, and only, stale index. *)
    let first_stale_path = stale_path store.path 1 in
    let*! merged = mv_internal_index store tmp_index first_stale_path in
    store.stales <- [merged] ;
    store.gc_status <- No_gc ;
    let*! () = Store_events.finished_gc N.name in
    return_unit

  (** Returns the elements of an index in an unspecified order. NOTE: the elements
are stored in memory. *)
  let index_bindings i =
    let bindings = ref [] in
    let collect k v = bindings := (k, v) :: !bindings in
    I.iter collect i ;
    !bindings

  (** The background task for a gc operation consists in copying all items that
satisfy [filter] from the stale indexes of [store] to the temporary one
      [tmp_index]. While this is happening, the stale indexes can still be
      queried and new bindings can still be added to the store because only the
      fresh index is modified. *)
  let gc_background_task store tmp_index filter resolve =
    let open Lwt_syntax in
    Lwt.dont_wait
      (fun () ->
        let* res =
          trace (Gc_failed N.name) @@ protect
          @@ fun () ->
          let open Lwt_result_syntax in
          let copy_binding (k, v) =
            let* keep = filter k v in
            if keep then I.replace tmp_index.index k v ;
            return_unit
          in
          let copy_index i = List.iter_es copy_binding (index_bindings i.index) in
          let stale_internal_indexes =
            List.rev (internal_indexes ~only_stale:true store)
          in
          let* () = List.iter_es copy_index stale_internal_indexes in
          finalize_gc store tmp_index
        in
        let* () =
          match res with
          | Error error ->
              let* () = Store_events.failed_gc N.name error in
              revert_failed_gc store
          | Ok () -> return_unit
        in
        Lwt.wakeup_later resolve () ;
        return_unit)
      (fun exn ->
        Format.eprintf
          "Reverting GC for store %s failed because %s@."
          N.name
          (Printexc.to_string exn))

  (** This function is called every time a gc operation starts. *)
  let gc_internal ~async store filter =
    let open Lwt_syntax in
    match store.gc_status with
    | Ongoing _ -> Store_events.ignore_gc N.name
    | No_gc ->
        let* tmp_index, promise, resolve = initiate_gc store in
        gc_background_task store tmp_index filter resolve ;
        if async then return_unit
        else
          (* A canceled promise is not reported as a failure. *)
          Lwt.catch
            (fun () -> promise)
            (function Lwt.Canceled -> return_unit | e -> raise e)

  let gc ?(async = true) store filter =
    let open Lwt_result_syntax in
    trace (Gc_failed N.name) @@ protect
    @@ fun () ->
    let*! () = gc_internal ~async store filter in
    return_unit

  (* Waits for the promise of the running gc, if any. *)
  let wait_gc_completion store =
    match store.gc_status with
    | Ongoing {promise; _} ->
        Lwt.catch
          (fun () -> promise)
          (function Lwt.Canceled -> Lwt.return_unit | e -> raise e)
    | No_gc -> Lwt.return_unit

  let is_gc_finished store =
    match store.gc_status with Ongoing _ -> false | No_gc -> true
end

module Make_indexable_removable (N : NAME) (K : INDEX_KEY) (V : Index.Value.S) =
struct
  module V_opt = struct
    (* The values stored in the index are optional values. When we "remove" a
       key from the store, we're not really removing it from the index, but
       simply setting its association to [None] (encoded with zero bytes here).
    *)
    type t = V.t option

    let t = Repr.option V.t

    (* One tag byte followed by the encoding of the value. *)
    let encoded_size = 1 + V.encoded_size

    let encode v =
      let tag, payload =
        match v with
        | Some v -> (1, Bytes.unsafe_of_string (V.encode v))
        | None -> (0, Bytes.make V.encoded_size '\000')
      in
      let dst = Bytes.create encoded_size in
      let payload_offset = bytes_set_int8 ~dst ~src:tag 0 in
      let (_ : int) = blit ~src:payload ~dst payload_offset in
      Bytes.unsafe_to_string dst

    let decode str offset =
      let tag, offset = read_int8 str offset in
      match tag with
      | 0 -> None
      | 1 -> Some (V.decode str offset)
      | _ -> assert false
  end

  include Make_indexable (N) (K) (V_opt)

  (* A key bound to [None] is reported as absent. *)
  let find store k =
    let open Lwt_result_syntax in
    let+ binding = find store k in
    match binding with Some (Some v) -> Some v | Some None | None -> None

  let mem store hash =
    let open Lwt_result_syntax in
    let+ found = find store hash in
    Option.is_some found

  let add ?flush store k v = add ?flush store k (Some v)

  (* Removal binds [k] to [None] in the fresh index, and only when [k] is
     present in one of the indexes. *)
  let remove ?(flush = true) store k =
    let open Lwt_result_syntax in
    trace (Cannot_write_to_store N.name)
    @@ protect
    @@ fun () ->
    Lwt_idle_waiter.force_idle store.scheduler @@ fun () ->
    if unsafe_mem store k then (
      I.replace store.fresh.index k None ;
      if flush then I.flush store.fresh.index ;
      return_unit)
    else return_unit

  let gc ?(async = true) store filter =
    let open Lwt_result_syntax in
    trace (Gc_failed N.name) @@ protect
    @@ fun () ->
    (* Also remove bindings that point to None, i.e. the ones that were
       artificially removed with {!remove}. *)
    let filter k = function Some v -> filter k v | None -> return_false in
    let*! () = gc_internal ~async store filter in
    return_unit
end

module Make_singleton (S : sig
  type t

  val name : string

  val encoding : t Data_encoding.t
end) : SINGLETON_STORE with type value := S.t = struct
  (* [cache] is [None] until a write or a delete has been performed, then
     [Some v] where [v] is the current content of the store. *)
  type +'a t = {file : string; mutable cache : S.t option option}

  let read_disk store =
    let open Lwt_result_syntax in
    trace (Cannot_read_from_store S.name)
    @@ protect
    @@ fun () ->
    let*! exists = Lwt_unix.file_exists store.file in
    if not exists then return_none
    else
      Lwt_io.with_file
        ~flags:[Unix.O_RDONLY; O_CLOEXEC]
        ~mode:Input
        store.file
      @@ fun channel ->
      let*! raw_data = Lwt_io.read channel in
      match Data_encoding.Binary.of_string S.encoding raw_data with
      | Ok data -> return_some data
      | Error err -> tzfail (Decoding_error err)

  let read store =
    let open Lwt_result_syntax in
    match store.cache with None -> read_disk store | Some v -> return v

  let write_disk store x =
    let open Lwt_result_syntax in
    trace (Cannot_write_to_store S.name)
    @@ let*! res =
         Lwt_utils_unix.with_atomic_open_out ~overwrite:true store.file
         @@ fun fd ->
         let block_bytes = Data_encoding.Binary.to_bytes_exn S.encoding x in
         Lwt_utils_unix.write_bytes fd block_bytes
       in
       Result.bind_error res Lwt_utils_unix.tzfail_of_io_error |> Lwt.return

  let write store x =
    let open Lwt_result_syntax in
    let+ () = write_disk store x in
    store.cache <- Some (Some x)

  let delete_disk store =
    let open Lwt_result_syntax in
    trace (Cannot_write_to_store S.name)
    @@ protect
    @@ fun () ->
    let*! exists = Lwt_unix.file_exists store.file in
    if exists then
      let*! () = Lwt_unix.unlink store.file in
      return_unit
    else return_unit

  let delete store =
    let open Lwt_result_syntax in
    let+ () = delete_disk store in
    store.cache <- Some None

  (* Only the parent directory of [path] is created here. *)
  let load ~path _mode =
    let open Lwt_result_syntax in
    trace (Cannot_load_store {name = S.name; path})
    @@ protect
    @@ fun () ->
    let*! () = Lwt_utils_unix.create_dir (Filename.dirname path) in
    return {file = path; cache = None}

  let readonly x = (x :> [`Read] t)
end

module Make_indexed_file
    (N : NAME)
    (K : INDEX_KEY)
    (V : ENCODABLE_VALUE_HEADER) =
struct
  module Cache =
    Aches_lwt.Lache.Make_result (Aches.Rache.Transfer (Aches.Rache.LRU) (K))

  module Raw_header = Make_index_value (V.Header)

  (* Values of the header index: the offset of the value in the data file and
     the header of the value. *)
  module IHeader = struct
    let name = N.name ^ ".header"

    type t = {offset : int; header : V.Header.t}

    let encoded_size = 8 (* offset *) + Raw_header.encoded_size

    let t =
      let open Repr in
      map
        (pair int Raw_header.t)
        (fun (offset, header) -> {offset; header})
        (fun {offset; header} -> (offset, header))

    let encode v =
      let dst = Bytes.create encoded_size in
      let header_offset =
        bytes_set_int64 ~src:(Int64.of_int v.offset) ~dst 0
      in
      let header_bytes = String.to_bytes (Raw_header.encode v.header) in
      let _offset = blit ~src:header_bytes ~dst header_offset in
      Bytes.unsafe_to_string dst

    let decode str offset =
      let file_offset, offset = read_int64 str offset in
      let header = Raw_header.decode str offset in
      {offset = Int64.to_int file_offset; header}
  end

  module Header_index = Index_unix.Make (K) (IHeader) (Index.Cache.Unbounded)

  module Values_file = struct
    let encoding = Data_encoding.dynamic_size ~kind:`Uint30 V.encoding

    (* Reads the value stored at [file_offset] in [fd]. Returns the value and
       the number of bytes it occupies, 4 byte length prefix included. *)
    let pread_value fd ~file_offset =
      let open Lwt_result_syntax in
      trace (Cannot_read_from_store N.name)
      @@ protect
      @@ fun () ->
      (* Read length *)
      let length_bytes = Bytes.create 4 in
      let*! () =
        Lwt_utils_unix.read_bytes ~file_offset ~pos:0 ~len:4 fd length_bytes
      in
      let value_length = Int32.to_int (Bytes.get_int32_be length_bytes 0) in
      (* The length prefix is kept in front of the value bytes for decoding. *)
      let value_bytes = Bytes.extend length_bytes 0 value_length in
      let*! () =
        Lwt_utils_unix.read_bytes
          ~file_offset:(file_offset + 4)
          ~pos:4
          ~len:value_length
          fd
          value_bytes
      in
      match Data_encoding.Binary.of_bytes encoding value_bytes with
      | Ok value -> return (value, 4 + value_length)
      | Error err -> tzfail (Decoding_error err)
  end

  (* A header index and the data file it points into, with their paths. *)
  type internal_store = {
    index : Header_index.t;
    fd : Lwt_unix.file_descr;
    index_path : string;
    data_path : string;
  }

  type gc_status =
    | No_gc
    | Ongoing of {tmp_store : internal_store; promise : unit Lwt.t}

  (** In order to periodically clean up the store with the {!gc} function, each
store is split in multiple stores: one fresh store and multiple stale
      stores (zero if no GC has ever occurred, one if the last GC was
      successful, two if the GC is ongoing, and more in case some GC failed).

      Adding new information to the store is always done on the fresh store
      whereas queries are done on both the fresh and stale stores.

      A gc operation starts by moving the fresh store to the stale list and
      creates a new fresh store. The rest of the gc operation consists in,
      asynchronously, merging the stale stores while removing data. See {!gc}
      for more details.
  *)
  type _ t = {
    mutable fresh : internal_store;
    mutable stales : internal_store list;
    scheduler : Lwt_idle_waiter.t;
    readonly : bool;
    index_buffer_size : int;
    path : string;
    cache : (V.t * V.Header.t, tztrace) Cache.t;
    mutable gc_status : gc_status;
  }

  (* The stores to query: the fresh one first, then the stale ones, or only
     the stale ones when [only_stale] is set. *)
  let internal_stores ?(only_stale = false) store =
    if only_stale then store.stales else store.fresh :: store.stales

  (* Membership test on the header indexes, without going through the
     scheduler nor the cache. *)
  let unsafe_mem store k =
    List.exists (fun s -> Header_index.mem s.index k) (internal_stores store)

  (* The cache is consulted first, then the header indexes. *)
  let mem store key =
    let open Lwt_result_syntax in
    trace (Cannot_read_from_store IHeader.name)
    @@ protect
    @@ fun () ->
    Lwt_idle_waiter.task store.scheduler @@ fun () ->
    let cached =
      Cache.bind store.cache key (fun v -> Lwt.return (Result.is_ok v))
    in
    let*! cached = Option.value cached ~default:Lwt.return_false in
    return (cached || unsafe_mem store key)

  (* Option returning variant of [Header_index.find]. *)
  let find_index i k = try Some (Header_index.find i k) with Not_found -> None

  (* Returns the first header found for [k] together with the internal store
     that holds it. Any exception raised while querying an index is turned
     into a [Failure] that names the index path. *)
  let unsafe_find_header ?only_stale store k =
    List.find_map
      (fun s ->
        Option.map
          (fun h -> (h, s))
          (try find_index s.index k
           with e ->
             Format.kasprintf
               Stdlib.failwith
               "cannot access index %s : %s"
               s.index_path
               (Printexc.to_string e)))
      (internal_stores ?only_stale store)

  let header store key =
    let open Lwt_result_syntax in
    trace (Cannot_read_from_store IHeader.name)
    @@ protect
    @@ fun () ->
    Lwt_idle_waiter.task store.scheduler @@ fun () ->
    let cached =
      Cache.bind store.cache key @@ function
      | Ok (_value, header) -> return header
      | Error _ as e -> Lwt.return e
    in
    match cached with
    | Some header -> Lwt_result.map Option.some header
    | None -> (
        match unsafe_find_header store key with
        | None -> return_none
        | Some ({header; _}, _store) -> return_some header)

  let unsafe_read_from_disk_opt ?only_stale store key =
    let open Lwt_result_syntax in
    match unsafe_find_header ?only_stale store key with
    | None -> return_none
    | Some ({IHeader.offset; header}, internal_store) ->
        let+ value, _ofs =
          Values_file.pread_value internal_store.fd ~file_offset:offset
        in
        Some (value, header)

  (* Same as [unsafe_read_from_disk_opt] but fails with [Exn Not_found] when
     the key is absent. *)
  let unsafe_read_from_disk store key =
    let open Lwt_result_syntax in
    let* r = unsafe_read_from_disk_opt store key in
    match r with None -> tzfail (Exn Not_found) | Some r -> return r

  (* Any error met while reading, including an absent key, yields [None]. *)
  let read store key =
    trace (Cannot_read_from_store IHeader.name)
    @@ protect
    @@ fun () ->
    Lwt_idle_waiter.task store.scheduler @@ fun () ->
    let read_from_disk key = unsafe_read_from_disk store key in
    let open Lwt_result_syntax in
    Cache.bind_or_put store.cache key read_from_disk @@ function
    | Ok value -> return_some value
    | Error _ -> return_none

  (* Writes the encoded [value] at the current position of the data file of
     the internal store [store], then registers [offset] and [header] for
     [key] in its index. Returns the number of bytes written. *)
  let locked_write_value store ~offset ~value ~key ~header =
    trace_eval (fun () -> Cannot_write_to_store N.name)
    @@ protect
    @@ fun () ->
    let open Lwt_result_syntax in
    let value_bytes =
      Data_encoding.Binary.to_bytes_exn Values_file.encoding value
    in
    let value_length = Bytes.length value_bytes in
    let*! () =
      Lwt_utils_unix.write_bytes ~pos:0 ~len:value_length store.fd value_bytes
    in
    Header_index.replace store.index key {offset; header} ;
    return value_length

  (* Appends [value] at the end of the data file of the internal store
     [store]. The result of the write is bound with [let*] so that a failure
     is propagated to the caller, and the index is not flushed in that case. *)
  let unsafe_append_internal ?(flush = true) store ~key ~header ~value =
    let open Lwt_result_syntax in
    let*! offset = Lwt_unix.lseek store.fd 0 Unix.SEEK_END in
    let* _written_len = locked_write_value store ~offset ~value ~key ~header in
    if flush then Header_index.flush store.index ;
    return_unit

  (* Values are put in the cache and appended to the fresh store. *)
  let append ?(flush = true) store ~key ~header ~value =
    trace (Cannot_write_to_store N.name)
    @@ protect
    @@ fun () ->
    let open Lwt_result_syntax in
    Lwt_idle_waiter.force_idle store.scheduler @@ fun () ->
    Cache.put store.cache key (return (value, header)) ;
    unsafe_append_internal ~flush store.fresh ~key ~header ~value

  (* Directory of the [n]-th stale store of the store located at [path]. *)
  let stale_path path n = Filename.concat path (string_of_int n)

  (* Directory of the temporary store used while a gc is running. *)
  let tmp_path path = Filename.concat path "tmp"

  (* Data file of the internal store located in directory [path]. *)
  let data_path path = Filename.concat path "data"

  (* Header index of the internal store located in directory [path]. *)
  let index_path path = Filename.concat path "index"

  let load_internal_store ~index_buffer_size ~readonly path =
    let open Lwt_syntax in
    let flag = if readonly then Unix.O_RDONLY else Unix.O_RDWR in
    let* () = Lwt_utils_unix.create_dir path in
    let data_path = data_path path in
    let* fd =
      Lwt_unix.openfile data_path [Unix.O_CREAT; O_CLOEXEC; flag] 0o644
    in
    let index_path = index_path path in
    let index =
      Header_index.v ~log_size:index_buffer_size ~readonly index_path
    in
    return {index; fd; index_path; data_path}

  let load (type a) ~path ~index_buffer_size ~cache_size (mode : a mode) :
      a t tzresult Lwt.t =
    let open Lwt_result_syntax in
    trace (Cannot_load_store {name = N.name; path})
    @@ protect
    @@ fun () ->
    let readonly = match mode with Read_only -> true | Read_write -> false in
    let*! fresh = load_internal_store ~index_buffer_size ~readonly path in
    (* Loading stale stores if they exist on disk (stale stores are created by
       GC operations). *)
    let rec load_stales acc n =
      let open Lwt_syntax in
      let stale_path = stale_path path n in
      let*! exists = Lwt_unix.file_exists stale_path in
      if exists then
        let*! stale =
          load_internal_store ~index_buffer_size ~readonly:true stale_path
        in
        load_stales (stale :: acc) (n + 1)
      else return acc
    in
    let*! stales = load_stales [] 1 in
    let scheduler = Lwt_idle_waiter.create () in
    let cache = Cache.create cache_size in
    return
      {
        fresh;
        stales;
        scheduler;
        readonly;
        index_buffer_size;
        path;
        cache;
        gc_status = No_gc;
      }

  (* Closing an index that is already closed is not an error. *)
  let close_internal_store store =
    (try Header_index.close store.index with Index.Closed -> ()) ;
    Lwt_utils_unix.safe_close store.fd

  (* Closes [internal_store], moves its index and data file to directory
     [dest] and reopens it from there. *)
  let mv_internal_store store internal_store dest =
    let open Lwt_result_syntax in
    let* () = close_internal_store internal_store in
    let*! () = Lwt_utils_unix.create_dir dest in
    let*! () = Lwt_unix.rename internal_store.index_path (index_path dest) in
    let*! () = Lwt_unix.rename internal_store.data_path (data_path dest) in
    let*! new_store =
      load_internal_store
        ~index_buffer_size:store.index_buffer_size
        ~readonly:store.readonly
        dest
    in
    return new_store

  (* Closes the internal store and removes its directory. *)
  let rm_internal_store store =
    let path = Filename.dirname store.index_path in
    let open Lwt_result_syntax in
    let* () = close_internal_store store in
    assert (path = Filename.dirname store.data_path) ;
    let*! () = Lwt_utils_unix.remove_dir path in
    return_unit

  (* Cancels the promise of a running gc if any, closes every internal store
     and removes the temporary store of that gc. *)
  let close store =
    protect @@ fun () ->
    Lwt_idle_waiter.force_idle store.scheduler @@ fun () ->
    let open Lwt_result_syntax in
    (match store.gc_status with
    | Ongoing {promise; _} -> Lwt.cancel promise
    | No_gc -> ()) ;
    let* () = close_internal_store store.fresh
    and* () = List.iter_ep close_internal_store store.stales
    and* () =
      match store.gc_status with
      | No_gc -> return_unit
      | Ongoing {tmp_store; _} -> rm_internal_store tmp_store
    in
    return_unit

  let readonly x = (x :> [`Read] t)

  (** A gc is initiated by moving the fresh store to the stale list and creating
a new fresh store, as well as creating a temporary store for the result of
      the gc. *)
  let initiate_gc store =
    let open Lwt_result_syntax in
    Lwt_idle_waiter.force_idle store.scheduler @@ fun () ->
    let*! () = Store_events.starting_gc N.name in
    let tmp_store_path = tmp_path store.path in
    let stale_number = List.length store.stales + 1 in
    let* stale =
      mv_internal_store store store.fresh (stale_path store.path stale_number)
    in
    let*! fresh =
      load_internal_store
        ~index_buffer_size:store.index_buffer_size
        ~readonly:store.readonly
        store.path
    in
    let*! tmp_store =
      load_internal_store
        ~index_buffer_size:store.index_buffer_size
        ~readonly:false
        tmp_store_path
    in
    (* Clear temporary store in case there are leftovers from a dirtily aborted
       previous gc. *)
    Header_index.clear tmp_store.index ;
    let*! () = Lwt_unix.ftruncate tmp_store.fd 0 in
    store.fresh <- fresh ;
    store.stales <- stale :: store.stales ;
    let promise, resolve = Lwt.task () in
    store.gc_status <- Ongoing {tmp_store; promise} ;
    return (tmp_store, promise, resolve)

  (** If a gc operation fails, reverting simply consists in removing the
temporary store. We keep the two stale stores as is, they will be merged
      by the next successful gc. *)
  let revert_failed_gc store =
    let open Lwt_syntax in
    match store.gc_status with
    | No_gc -> return_unit
    | Ongoing {tmp_store; promise} -> (
        Lwt.cancel promise ;
        (* The status must go back to [No_gc]: while it is [Ongoing], every gc
           request is ignored and [is_gc_finished] keeps returning [false]. *)
        store.gc_status <- No_gc ;
        let+ res = rm_internal_store tmp_store in
        match res with
        | Ok () -> ()
        | Error _e -> (* ignore error when reverting *) ())

  (** When the gc operation finishes, i.e. we have copied all elements to retain
to the temporary store, we can replace all the stale stores by the
      temporary one. *)
  let finalize_gc store tmp_store =
    let open Lwt_result_syntax in
    protect @@ fun () ->
    Lwt_idle_waiter.force_idle store.scheduler @@ fun () ->
    let* () = List.iter_es rm_internal_store store.stales in
    (* The temporary store becomes the first, and only, stale store. *)
    let first_stale_path = stale_path store.path 1 in
    let* merged = mv_internal_store store tmp_store first_stale_path in
    store.stales <- [merged] ;
    store.gc_status <- No_gc ;
    Cache.clear store.cache ;
    let*! () = Store_events.finished_gc N.name in
    return_unit

  (** Returns the elements of an index in an unspecified order. NOTE: the elements
are stored in memory. *)
  let index_header_bindings i =
    let bindings = ref [] in
    let collect k v = bindings := (k, v) :: !bindings in
    Header_index.iter collect i ;
    !bindings

  (** The background task for a gc operation consists in copying all items from
the stale stores of [store] that satisfy [filter] to the temporary one
      [tmp_store]. While this is happening, the stale stores can still be
      queried and new bindings can still be added to the store because only the
      fresh store is modified. *)
  let gc_background_task store tmp_store filter resolve =
    let open Lwt_result_syntax in
    Lwt.dont_wait
      (fun () ->
        let*! res =
          trace (Gc_failed N.name) @@ protect
          @@ fun () ->
          let copy_binding internal_store (key, {IHeader.offset; header}) =
            let* value, _ofs =
              Values_file.pread_value internal_store.fd ~file_offset:offset
            in
            let* keep = filter key header value in
            if keep then unsafe_append_internal tmp_store ~key ~header ~value
            else return_unit
          in
          let copy_store internal_store =
            List.iter_es
              (copy_binding internal_store)
              (index_header_bindings internal_store.index)
          in
          let stale_internal_stores =
            List.rev (internal_stores ~only_stale:true store)
          in
          let* () = List.iter_es copy_store stale_internal_stores in
          finalize_gc store tmp_store
        in
        let*! () =
          let open Lwt_syntax in
          match res with
          | Error error ->
              let* () = Store_events.failed_gc N.name error in
              revert_failed_gc store
          | Ok () -> return_unit
        in
        Lwt.wakeup_later resolve () ;
        Lwt.return_unit)
      (fun exn ->
        Format.eprintf
          "Reverting GC for store %s failed because %s@."
          N.name
          (Printexc.to_string exn))

  (* Starts a gc unless one is already running, in which case the request is
     ignored. When [async] is false, also waits for the gc promise. *)
  let gc_internal ~async store filter =
    let open Lwt_result_syntax in
    match store.gc_status with
    | Ongoing _ ->
        let*! () = Store_events.ignore_gc N.name in
        return_unit
    | No_gc ->
        let* tmp_store, promise, resolve = initiate_gc store in
        gc_background_task store tmp_store filter resolve ;
        if async then return_unit
        else
          (* A canceled promise is not reported as a failure. *)
          Lwt.catch
            (fun () ->
              let*! () = promise in
              return_unit)
            (function Lwt.Canceled -> return_unit | e -> raise e)

  let gc ?(async = true) store filter =
    trace (Gc_failed N.name) @@ gc_internal ~async store filter

  (* Waits for the promise of the running gc, if any. *)
  let wait_gc_completion store =
    match store.gc_status with
    | Ongoing {promise; _} ->
        Lwt.catch
          (fun () -> promise)
          (function Lwt.Canceled -> Lwt.return_unit | e -> raise e)
    | No_gc -> Lwt.return_unit

  let is_gc_finished store =
    match store.gc_status with Ongoing _ -> false | No_gc -> true
end

(** Indexed file whose headers are computed from the values with [V.header]. *)
module Make_simple_indexed_file
    (N : NAME)
    (K : INDEX_KEY)
    (V : sig
      include ENCODABLE_VALUE_HEADER

      val header : t -> Header.t
    end) =
struct
  include Make_indexed_file (N) (K) (V)

  let append ?flush store ~key ~value =
    let header = V.header value in
    append ?flush store ~key ~value ~header
end