(*****************************************************************************)
(*                                                                           *)
(* Open Source License                                                       *)
(* Copyright (c) 2023 Nomadic Labs, <contact@nomadic-labs.com>               *)
(*                                                                           *)
(* Permission is hereby granted, free of charge, to any person obtaining a   *)
(* copy of this software and associated documentation files (the "Software"),*)
(* to deal in the Software without restriction, including without limitation *)
(* the rights to use, copy, modify, merge, publish, distribute, sublicense,  *)
(* and/or sell copies of the Software, and to permit persons to whom the     *)
(* Software is furnished to do so, subject to the following conditions:      *)
(*                                                                           *)
(* The above copyright notice and this permission notice shall be included   *)
(* in all copies or substantial portions of the Software.                    *)
(*                                                                           *)
(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*)
(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,  *)
(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL   *)
(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*)
(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING   *)
(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER       *)
(* DEALINGS IN THE SOFTWARE.                                                 *)
(*                                                                           *)
(*****************************************************************************)

open Error_monad

type error += Missing_stored_kvs_data of string * int

let () =
  register_error_kind
    `Permanent
    ~id:"stdlib_unix.missing_kvs_data"
    ~title:"Missing stored data from KVS"
    ~description:"Failed to load stored data from KVS"
    ~pp:(fun ppf (path, index) ->
      Format.fprintf
        ppf
        "Failed to load on-disk data: no corresponding data found in file %s \
         at index %d."
        path
        index)
    Data_encoding.(obj2 (req "path" string) (req "index" int31))
    (function
      | Missing_stored_kvs_data (path, index) -> Some (path, index) | _ -> None)
    (fun (path, index) -> Missing_stored_kvs_data (path, index))

type ('file, 'value) directory_spec = {
  encoding : 'value Data_encoding.t;
  eq : 'value -> 'value -> bool;
  index_of : 'file -> int;
  path : string;
  value_size : int;
}

(** [Directories] handles writing and reading virtual files in virtual directories.
A virtual directory is backed by a physical file and a virtual file is an offset
in a virtual directory.
Besides implementing a key-value store, the module [Directories] must properly
handle resource utilization, especially file descriptors.
The structure [Directories.t] guarantees that no more than the specified
[lru_size] file descriptors can be open at the same time.
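
On disk, a virtual directory consists of a small presence table (currently one
byte per possible file, see the TODO on [bitset_size] below) followed by
fixed-size value slots, so each virtual file lives at a computable offset.

A minimal usage sketch (illustrative only; [spec] stands for some
[directory_spec] and [file] for one of its keys):
{[
  let open Lwt_result_syntax in
  let t = Directories.init ~lru_size:10 in
  let* () = Directories.write t spec file value in
  let* value' = Directories.read t spec file in
  let*! () = Directories.close t in
  ...
]}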
*)
module Directories : sig
  type 'value t

  val init : lru_size:int -> 'value t

  val close : 'value t -> unit Lwt.t

  val write :
    ?override:bool ->
    'value t ->
    ('b, 'value) directory_spec ->
    'b ->
    'value ->
    unit tzresult Lwt.t

  val read :
    'value t -> ('b, 'value) directory_spec -> 'b -> 'value tzresult Lwt.t
end = struct
  module LRU = Ringo.LRU_Collection

  module Table = Hashtbl.Make (struct
    include String

    let hash = Hashtbl.hash
  end)

  module File_table = Hashtbl.Make (struct
    type t = int

    let equal = Int.equal

    let hash = Hashtbl.hash
  end)

  let max_number_of_files = 4096

  (* TODO: https://gitlab.com/tezos/tezos/-/issues/6033
     For now the bitset is a byte set...
     With a true bitset, we'd have [max_number_of_files/8] *)
  let bitset_size = max_number_of_files

  type handle = {fd : Lwt_unix.file_descr; bitset : Lwt_bytes.t}

  let file_exists handle index = handle.bitset.{index} <> '\000'

  let set_file_exists handle index = handle.bitset.{index} <- '\001'
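
  (* Illustrative sketch only (not part of the store): with a true bitset of
     [max_number_of_files / 8] bytes, as envisioned by the TODO above, the two
     accessors would look roughly like:
       let file_exists handle index =
         Char.code handle.bitset.{index / 8} land (1 lsl (index mod 8)) <> 0
       let set_file_exists handle index =
         handle.bitset.{index / 8} <-
           Char.chr
             (Char.code handle.bitset.{index / 8} lor (1 lsl (index mod 8))) *)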
  let initialize_virtual_directory path value_size =
    (* We perform the initialization synchronously to avoid spurious Lwt
       preemption slowing down writing shards. The execution time of the
       code below should be on the order of a few tenths of a millisecond
       on a Linux system. *)
    let fd = Unix.openfile path [O_RDWR; O_CREAT; O_EXCL; O_CLOEXEC] 0o660 in
    let total_size = bitset_size + (max_number_of_files * value_size) in
    try
      Unix.ftruncate fd total_size ;
      let bitset = Lwt_bytes.map_file ~fd ~shared:true ~size:bitset_size () in
      let fd = Lwt_unix.of_unix_file_descr ~blocking:true fd in
      Lwt.return {fd; bitset}
    with Unix.Unix_error _ as e ->
      Unix.unlink path ;
      raise e

  let load_virtual_directory path =
    let open Lwt_syntax in
    let* fd = Lwt_unix.openfile path [O_RDWR; O_CLOEXEC] 0o660 in
    (* TODO: https://gitlab.com/tezos/tezos/-/issues/6033
       Should we check that the file is at least as big as the bitset? *)
    let bitset =
      Lwt_bytes.map_file
        ~fd:(Lwt_unix.unix_file_descr fd)
        ~shared:true
        ~size:bitset_size
        ()
    in
    return {fd; bitset}

  let close_virtual_directory handle = Lwt_unix.close handle.fd

  type 'value handle_and_pending_callbacks =
    | Entry of {
        handle : handle Lwt.t;
        accessed : Lwt_mutex.t File_table.t;
        cached : 'value File_table.t;
        (* TODO: https://gitlab.com/tezos/tezos/-/issues/6033
Should we use a weak table to automatically collect dangling promises?
           Note that we do clear resolved promises each time we grow this list. *)
        mutable pending_callbacks : unit Lwt.t list;
      }
    | Being_evicted of unit Lwt.t

  let keep_pending l =
    List.filter
      (fun p ->
        match Lwt.state p with Return () | Fail _ -> false | Sleep -> true)
      l

  (* The type of directories.
The domains of [handles] and [lru] should be the same, before and after
calling the functions [write] and [read] in this module.
  *)
  type 'value t = {
    handles : 'value handle_and_pending_callbacks Table.t;
    lru : string LRU.t;
  }

  let init ~lru_size =
    let handles = Table.create 101 in
    let lru = LRU.create lru_size in
    {handles; lru}

  let close {handles; _} =
    let open Lwt_syntax in
    Table.iter_p
      (fun _ entry ->
        match entry with
        | Being_evicted p -> p
        | Entry {handle; pending_callbacks = _; accessed = _; cached = _} ->
            (* TODO https://gitlab.com/tezos/tezos/-/issues/6033
Should we lock access to [accessed]; then lock on
all mutexes in [accessed], then close? This would ensure that we wait until
               all pending callbacks terminate. *)
            let* handle in
            let* () = Lwt_unix.fsync handle.fd in
            Lwt_unix.close handle.fd)
      handles

  let resolve_pending_and_close dirs removed =
    let open Lwt_syntax in
    let await_close, resolve_close = Lwt.task () in
    match Table.find dirs.handles removed with
    | None -> assert false
    | Some (Being_evicted _) -> assert false
    | Some (Entry {handle; accessed = _; cached = _; pending_callbacks}) ->
        Table.replace dirs.handles removed (Being_evicted await_close) ;
        let* handle and* () = Lwt.join pending_callbacks in
        let+ () = close_virtual_directory handle in
        Table.remove dirs.handles removed ;
        Lwt.wakeup resolve_close () ;
        ()

  let with_mutex accessed file f =
    match File_table.find accessed file with
    | None ->
        let mutex = Lwt_mutex.create () in
        File_table.add accessed file mutex ;
        Lwt_mutex.with_lock mutex f
    | Some mutex -> Lwt_mutex.with_lock mutex f

  let rec bind_dir_and_lock_file dirs spec index f =
    (* Precondition: the LRU and the table are in sync *)
    let open Lwt_syntax in
    let load_or_initialize () =
      let* b = Lwt_unix.file_exists spec.path in
      if b then load_virtual_directory spec.path
      else initialize_virtual_directory spec.path spec.value_size
    in
    let put_then_bind () =
      (* Precondition: [spec.path] not in [dirs.handle] *)
      let _node, erased_opt = LRU.add_and_return_erased dirs.lru spec.path in
      (* Here, [spec.path] is in the LRU but not in the table yet.
But:
- all executions from this point are cooperation-point-free
until the insertion of [spec.path] in the table
It follows that this temporary discrepancy is not observable.
Same observation holds in the other direction if [erased_opt = Some erased].
      *)
      let handle =
        match erased_opt with
        | None -> load_or_initialize ()
        | Some removed ->
            let* () = resolve_pending_and_close dirs removed in
            load_or_initialize ()
      in
      let accessed = File_table.create 3 in
      let cached = File_table.create 3 in
      let callback =
        with_mutex accessed index (fun () -> Lwt.bind handle (f cached))
      in
      Table.replace
        dirs.handles
        spec.path
        (Entry
           {
             handle;
             accessed;
             cached;
             pending_callbacks = [Lwt.map ignore callback];
           }) ;
      callback
    in
    match Table.find dirs.handles spec.path with
    | Some (Entry p) ->
        let promise =
          with_mutex p.accessed index (fun () -> Lwt.bind p.handle (f p.cached))
        in
        p.pending_callbacks <-
          keep_pending (Lwt.map ignore promise :: p.pending_callbacks) ;
        promise
    | Some (Being_evicted await_eviction) ->
        let* () = await_eviction in
        (* We can't directly [put_and_bind] because several threads may be
           waiting here. *)
        bind_dir_and_lock_file dirs spec index f
    | None -> put_then_bind ()

  let write ?(override = false) dirs spec file data =
    let open Lwt_result_syntax in
    let index = spec.index_of file in
    bind_dir_and_lock_file dirs spec index @@ fun cached handle ->
    let perform_write () =
      let pos = Int64.of_int (bitset_size + (index * spec.value_size)) in
      let mmap =
        Lwt_bytes.map_file
          ~fd:(Lwt_unix.unix_file_descr handle.fd)
          ~pos
          ~size:spec.value_size
          ~shared:true
          ()
      in
      let bytes = Data_encoding.Binary.to_bytes_exn spec.encoding data in
      if Bytes.length bytes <> spec.value_size then
        failwith
          "Key_value_store.write: encoded value does not respect specified size"
      else (
        Lwt_bytes.blit_from_bytes bytes 0 mmap 0 (Bytes.length bytes) ;
        set_file_exists handle index ;
        return_unit)
    in
    if not (file_exists handle index) then (
      assert (not (File_table.mem cached index)) ;
      perform_write ())
    else if override then (
      match File_table.find cached index with
      | None ->
          File_table.add cached index data ;
          perform_write ()
      | Some cached ->
          if spec.eq cached data then return_unit else perform_write ())
    else return_unit

  let read dirs spec file =
    let open Lwt_result_syntax in
    let index = spec.index_of file in
    bind_dir_and_lock_file dirs spec index @@ fun cached handle ->
    if file_exists handle index then
      match File_table.find cached index with
      | None ->
          (* Note that the following code executes atomically Lwt-wise. *)
          let pos = Int64.of_int (bitset_size + (index * spec.value_size)) in
          let mmap =
            Lwt_bytes.map_file
              ~fd:(Lwt_unix.unix_file_descr handle.fd)
              ~pos
              ~size:spec.value_size
              ~shared:true
              ()
          in
          let bytes = Bytes.make spec.value_size '\000' in
          Lwt_bytes.blit_to_bytes mmap 0 bytes 0 spec.value_size ;
          let data = Data_encoding.Binary.of_bytes_exn spec.encoding bytes in
          File_table.add cached index data ;
          return data
      | Some v -> return v
    else tzfail (Missing_stored_kvs_data (spec.path, index))
end

type ('dir, 'file, 'value) t =
  | E : {
      directory_of : 'dir -> ('file, 'value) directory_spec;
      directories : 'value Directories.t;
    }
      -> ('dir, 'file, 'value) t

let directory ?encoded_value_size encoding path eq index_of =
  match encoded_value_size with
  | Some value_size -> {path; eq; encoding; index_of; value_size}
  | None -> (
      match Data_encoding.classify encoding with
      | `Fixed value_size -> {path; eq; encoding; index_of; value_size}
      | `Dynamic | `Variable ->
          invalid_arg
            "Key_value_store.directory: encoding does not have fixed size")
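
(* Illustrative sketch only: one way to build a [directory_spec] with
   [directory]. [Data_encoding.Fixed.bytes 32] is a fixed-size encoding, so
   [?encoded_value_size] can be omitted; the path and the identity [index_of]
   are hypothetical.

     let example_spec : (int, Bytes.t) directory_spec =
       directory (Data_encoding.Fixed.bytes 32) "/path/to/store" Bytes.equal Fun.id *)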
(* FIXME https://gitlab.com/tezos/tezos/-/issues/4643
The reason why there are two LRUs and not one is that, in the case
of concurrent reads and writes, the LRU cannot guarantee the absence
of races. To prevent that, we use two LRUs to be able to discriminate
between the various concurrent accesses. In particular, while
reading a value, we want to wait if there is a write in
progress. Vice versa, if a read fails, we don't want to make the
next write fail.
In practice, there should not be a duplication in memory of the
values read since values are shared. *)
let init ~lru_size directory_of =
  let directories = Directories.init ~lru_size in
  E {directory_of; directories}

let close (E {directories; _}) = Directories.close directories

let write_value :
    type dir file value.
    ?override:bool ->
    (dir, file, value) t ->
    dir ->
    file ->
    value ->
    unit tzresult Lwt.t =
 fun ?override (E {directories; directory_of}) dir file value ->
  let dir = directory_of dir in
  Directories.write ?override directories dir file value

let read_value :
    type dir file value.
    (dir, file, value) t -> dir -> file -> value tzresult Lwt.t =
 fun (E {directories; directory_of}) dir file ->
  let dir = directory_of dir in
  Directories.read directories dir file

let write_values ?override t seq =
  Seq.ES.iter
    (fun (dir, file, value) -> write_value ?override t dir file value)
    seq

let read_values t seq =
  let open Lwt_syntax in
  Seq_s.of_seq seq
  |> Seq_s.S.map (fun (dir, file) ->
         let* maybe_value = read_value t dir file in
         return (dir, file, maybe_value))
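
(* Illustrative usage sketch (not part of the module): assuming the
   hypothetical [example_spec] from the sketch above and a store keyed by a
   single [unit] directory, values can be written and read back through the
   top-level wrappers.

     let open Lwt_result_syntax in
     let store = init ~lru_size:10 (fun (_ : unit) -> example_spec) in
     let* () = write_value store () 0 (Bytes.make 32 'a') in
     let* bytes = read_value store () 0 in
     ... *)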