(*****************************************************************************)
(*                                                                           *)
(* Open Source License                                                       *)
(* Copyright (c) 2023 Nomadic Labs, <contact@nomadic-labs.com>               *)
(*                                                                           *)
(* Permission is hereby granted, free of charge, to any person obtaining a   *)
(* copy of this software and associated documentation files (the "Software"),*)
(* to deal in the Software without restriction, including without limitation *)
(* the rights to use, copy, modify, merge, publish, distribute, sublicense,  *)
(* and/or sell copies of the Software, and to permit persons to whom the     *)
(* Software is furnished to do so, subject to the following conditions:      *)
(*                                                                           *)
(* The above copyright notice and this permission notice shall be included   *)
(* in all copies or substantial portions of the Software.                    *)
(*                                                                           *)
(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*)
(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,  *)
(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL   *)
(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*)
(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING   *)
(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER       *)
(* DEALINGS IN THE SOFTWARE.                                                 *)
(*                                                                           *)
(*****************************************************************************)

open Store_sigs
open Store_errors

(* Helper functions to copy byte sequences or integers in [src] to another byte
   sequence [dst] at offset [offset], with named arguments to avoid
   confusion. These functions return the offset in the destination at which to
   copy more data. *)

let blit ~src ~dst offset =
  let len = Bytes.length src in
  Bytes.blit src 0 dst offset len ;
  offset + len

let bytes_set_int64 ~src ~dst offset =
  Bytes.set_int64_be dst offset src ;
  offset + 8

let bytes_set_int8 ~src ~dst offset =
  Bytes.set_int8 dst offset src ;
  offset + 1

(* Helper functions to read data (strings with a decoding function, or integers)
   from a binary string. These functions return, as the second component, the
   offset in the string at which to read more data. *)

let read_int64 str offset =
  let i = TzEndian.get_int64_string str offset in
  (i, offset + 8)

let read_int8 str offset =
  let i = TzEndian.get_int8_string str offset in
  (i, offset + 1)

(* Functors to build stores on indexes *)

module type NAME = sig
  val name : string
end

module type SINGLETON_STORE = sig
  type +'a t

  type value

  val load : path:string -> 'a mode -> 'a t tzresult Lwt.t

  val read : [> `Read] t -> value option tzresult Lwt.t

  val write : [> `Write] t -> value -> unit tzresult Lwt.t

  val delete : [> `Write] t -> unit tzresult Lwt.t

  val readonly : [> `Read] t -> [`Read] t
end

module type INDEXABLE_STORE = sig
  type +'a t

  type key

  type value

  val load : path:string -> 'a mode -> 'a t tzresult Lwt.t

  val mem : [> `Read] t -> key -> bool tzresult Lwt.t

  val find : [> `Read] t -> key -> value option tzresult Lwt.t

  val add : ?flush:bool -> [> `Write] t -> key -> value -> unit tzresult Lwt.t

  val close : _ t -> unit tzresult Lwt.t

  val readonly : [> `Read] t -> [`Read] t
end

module type INDEXABLE_REMOVABLE_STORE = sig
  include INDEXABLE_STORE

  val remove : ?flush:bool -> [> `Write] t -> key -> unit tzresult Lwt.t
end

module type INDEXED_FILE = sig
  type +'a t

  type key

  type header

  type value

  val mem : [> `Read] t -> key -> bool tzresult Lwt.t

  val header : [> `Read] t -> key -> header option tzresult Lwt.t

  val read : [> `Read] t -> key -> (value * header) option tzresult Lwt.t

  val append :
    ?flush:bool ->
    [> `Write] t ->
    key:key ->
    header:header ->
    value:value ->
    unit tzresult Lwt.t

  val load : path:string -> cache_size:int -> 'a mode -> 'a t tzresult Lwt.t

  val close : _ t -> unit tzresult Lwt.t

  val readonly : [> `Read] t -> [`Read] t
end

module type SIMPLE_INDEXED_FILE = sig
  include INDEXED_FILE

  val append :
    ?flush:bool -> [> `Write] t -> key:key -> value:value -> unit tzresult Lwt.t
end

module type ENCODABLE_VALUE = sig
  type t

  val name : string

  val encoding : t Data_encoding.t
end

module type FIXED_ENCODABLE_VALUE = sig
  include ENCODABLE_VALUE

  val fixed_size : int
end

module type ENCODABLE_VALUE_HEADER = sig
  include ENCODABLE_VALUE

  module Header : FIXED_ENCODABLE_VALUE
end

(** Computes [fixed_size] from the binary encoding of [V]. Raises
    [Invalid_argument] at functor application if the encoding does not have a
    fixed length. *)
module Make_fixed_encodable (V : ENCODABLE_VALUE) :
  FIXED_ENCODABLE_VALUE with type t = V.t = struct
  include V

  let fixed_size =
    match Data_encoding.Binary.fixed_length encoding with
    | None -> Stdlib.invalid_arg (name ^ " encoding is not fixed size")
    | Some size -> size
end

(** Adapts a fixed-size encodable value to the [Index.Value.S] interface,
    using its binary encoding of [E.fixed_size] bytes. *)
module Make_index_value (E : FIXED_ENCODABLE_VALUE) :
  Index.Value.S with type t = E.t = struct
  type t = E.t

  let encoded_size = E.fixed_size

  let encode v =
    Data_encoding.Binary.to_string_exn ~buffer_size:encoded_size E.encoding v

  let decode buf offset =
    let _read_bytes, v =
      Data_encoding.Binary.read_exn E.encoding buf offset encoded_size
    in
    v

  (* The {!Repr.t} value is only used for pretty printing in {!Index} so this is
     fine. *)
  let t = Repr.map Repr.string (fun s -> decode s 0) encode
end

(** Adapts a fixed-size encodable value with an equality to the [Index.Key.S]
    interface. Keys are hashed from their binary encoding. *)
module Make_index_key (E : sig
  include FIXED_ENCODABLE_VALUE

  val equal : t -> t -> bool
end) : Index.Key.S with type t = E.t = struct
  include Make_index_value (E)

  let equal = E.equal

  let hash v = Stdlib.Hashtbl.hash (encode v)

  (* {!Stdlib.Hashtbl.hash} is 30 bits *)
  let hash_size = 30 (* in bits *)
end

(** [Make_indexable (N) (K) (V)] builds a persistent key-value store on top of
    an [Index_unix] index stored at the [path] given to [load]. Read operations
    run through [Lwt_idle_waiter.task] and write operations through
    [Lwt_idle_waiter.force_idle] on the store's scheduler. Errors are traced
    with the store name [N.name]. *)
module Make_indexable (N : NAME) (K : Index.Key.S) (V : Index.Value.S) = struct
  module I = Index_unix.Make (K) (V) (Index.Cache.Unbounded)

  type _ t = {index : I.t; scheduler : Lwt_idle_waiter.t}

  (* TODO: https://gitlab.com/tezos/tezos/-/issues/4654
     Make log size constant configurable. *)
  let log_size = 10_000

  let mem store k =
    let open Lwt_result_syntax in
    trace (Cannot_read_from_store N.name)
    @@ protect
    @@ fun () ->
    Lwt_idle_waiter.task store.scheduler @@ fun () ->
    return (I.mem store.index k)

  let find store k =
    let open Lwt_result_syntax in
    trace (Cannot_read_from_store N.name)
    @@ protect
    @@ fun () ->
    Lwt_idle_waiter.task store.scheduler @@ fun () ->
    let v = try Some (I.find store.index k) with Not_found -> None in
    return v

  (** Binds [k] to [v], replacing any previous binding. The index is flushed
      when [flush] is [true] (the default). *)
  let add ?(flush = true) store k v =
    let open Lwt_result_syntax in
    trace (Cannot_write_to_store N.name)
    @@ protect
    @@ fun () ->
    Lwt_idle_waiter.force_idle store.scheduler @@ fun () ->
    I.replace store.index k v ;
    if flush then I.flush store.index ;
    return_unit

  (** Opens (or creates) the index at [path], creating its parent directory
      if needed. *)
  let load (type a) ~path (mode : a mode) : a t tzresult Lwt.t =
    let open Lwt_result_syntax in
    trace (Cannot_load_store {name = N.name; path})
    @@ protect
    @@ fun () ->
    let*! () = Lwt_utils_unix.create_dir (Filename.dirname path) in
    let readonly = match mode with Read_only -> true | Read_write -> false in
    let index = I.v ~log_size ~readonly path in
    let scheduler = Lwt_idle_waiter.create () in
    return {index; scheduler}

  (** Closes the index. Closing an already closed index is not an error. *)
  let close store =
    let open Lwt_result_syntax in
    protect @@ fun () ->
    Lwt_idle_waiter.force_idle store.scheduler @@ fun () ->
    (try I.close store.index with Index.Closed -> ()) ;
    return_unit

  let readonly x = (x :> [`Read] t)
end

(** Same as {!Make_indexable} but with a [remove] operation. Removal does not
    delete the entry from the index; it binds the key to [None] (see [V_opt]
    below). *)
module Make_indexable_removable
    (N : NAME)
    (K : Index.Key.S)
    (V : Index.Value.S) =
struct
  module V_opt = struct
    (* The values stored in the index are optional values. When we "remove" a
       key from the store, we're not really removing it from the index, but
       simply setting its association to [None] (encoded with zero bytes here).
       The encoding is a 1-byte tag (0 for [None], 1 for [Some]) followed by
       [V.encoded_size] bytes. *)

    type t = V.t option

    let t = Repr.option V.t

    let encoded_size = 1 + V.encoded_size

    let encode v =
      let dst = Bytes.create encoded_size in
      let tag, value_bytes =
        match v with
        | None -> (0, Bytes.make V.encoded_size '\000')
        | Some v -> (1, V.encode v |> Bytes.unsafe_of_string)
      in
      let offset = bytes_set_int8 ~dst ~src:tag 0 in
      let _ = blit ~src:value_bytes ~dst offset in
      Bytes.unsafe_to_string dst

    let decode str offset =
      let tag, offset = read_int8 str offset in
      match tag with
      | 0 -> None
      | 1 ->
          let value = V.decode str offset in
          Some value
      | _ -> assert false
  end

  include Make_indexable (N) (K) (V_opt)

  (* A key bound to [None] (i.e. removed) is reported as absent. *)
  let find store k =
    let open Lwt_result_syntax in
    let+ v = find store k in
    match v with None | Some None -> None | Some (Some v) -> Some v

  let mem store hash =
    let open Lwt_result_syntax in
    let+ b = find store hash in
    Option.is_some b

  let add ?flush store k v = add ?flush store k (Some v)

  (** Marks [k] as removed by binding it to [None]. Does nothing if [k] is not
      in the index. The index is flushed when [flush] is [true] (the
      default). *)
  let remove ?(flush = true) store k =
    let open Lwt_result_syntax in
    trace (Cannot_write_to_store N.name)
    @@ protect
    @@ fun () ->
    Lwt_idle_waiter.force_idle store.scheduler @@ fun () ->
    let exists = I.mem store.index k in
    if not exists then return_unit
    else (
      I.replace store.index k None ;
      if flush then I.flush store.index ;
      return_unit)
end

(** [Make_singleton (S)] stores a single value of type [S.t] in one file,
    written with [Lwt_utils_unix.with_atomic_open_out].

    The [cache] field has three states: [None] when nothing is known yet,
    [Some None] after a [delete], and [Some (Some v)] after writing [v].
    [read] answers from the cache when it is set and otherwise reads the file;
    it does not fill the cache itself. *)
module Make_singleton (S : sig
  type t

  val name : string

  val encoding : t Data_encoding.t
end) : SINGLETON_STORE with type value := S.t = struct
  type +'a t = {file : string; mutable cache : S.t option option}

  (* Returns [None] when the file does not exist. *)
  let read_disk store =
    let open Lwt_result_syntax in
    trace (Cannot_read_from_store S.name)
    @@ protect
    @@ fun () ->
    let*! exists = Lwt_unix.file_exists store.file in
    match exists with
    | false -> return_none
    | true -> (
        Lwt_io.with_file
          ~flags:[Unix.O_RDONLY; O_CLOEXEC]
          ~mode:Input
          store.file
        @@ fun channel ->
        let*! raw_data = Lwt_io.read channel in
        let data = Data_encoding.Binary.of_string S.encoding raw_data in
        match data with
        | Ok data -> return_some data
        | Error err -> tzfail (Decoding_error err))

  let read store =
    let open Lwt_result_syntax in
    match store.cache with Some v -> return v | None -> read_disk store

  let write_disk store x =
    let open Lwt_result_syntax in
    trace (Cannot_write_to_store S.name)
    (* [protect] turns exceptions (e.g. from [to_bytes_exn]) into errors, like
       the other store operations. *)
    @@ protect
    @@ fun () ->
    let*! res =
      Lwt_utils_unix.with_atomic_open_out ~overwrite:true store.file
      @@ fun fd ->
      let block_bytes = Data_encoding.Binary.to_bytes_exn S.encoding x in
      Lwt_utils_unix.write_bytes fd block_bytes
    in
    Result.bind_error res Lwt_utils_unix.tzfail_of_io_error |> Lwt.return

  (* The cache is only updated once the file has been written. *)
  let write store x =
    let open Lwt_result_syntax in
    let+ () = write_disk store x in
    store.cache <- Some (Some x)

  let delete_disk store =
    let open Lwt_result_syntax in
    trace (Cannot_write_to_store S.name)
    @@ protect
    @@ fun () ->
    let*! exists = Lwt_unix.file_exists store.file in
    match exists with
    | false -> return_unit
    | true ->
        let*! () = Lwt_unix.unlink store.file in
        return_unit

  let delete store =
    let open Lwt_result_syntax in
    let+ () = delete_disk store in
    store.cache <- Some None

  let load ~path _mode =
    let open Lwt_result_syntax in
    trace (Cannot_load_store {name = S.name; path})
    @@ protect
    @@ fun () ->
    let*! () = Lwt_utils_unix.create_dir (Filename.dirname path) in
    return {file = path; cache = None}

  let readonly x = (x :> [`Read] t)
end

(** [Make_indexed_file (N) (K) (V)] stores values of type [V.t] in a [data]
    file under the store directory, only ever appended to. An [Index_unix]
    index (in the [index] file of the same directory) maps each key to the
    offset of its value in [data] and to its header. Entries are also kept in
    an LRU cache created with [cache_size]. *)
module Make_indexed_file
    (N : NAME)
    (K : Index.Key.S)
    (V : ENCODABLE_VALUE_HEADER) =
struct
  module Cache =
    Aches_lwt.Lache.Make_result (Aches.Rache.Transfer (Aches.Rache.LRU) (K))
  module Raw_header = Make_index_value (V.Header)

  (* Index entries: the offset of the value in the data file, written as an
     8-byte big-endian integer, followed by the fixed-size encoding of the
     header. *)
  module IHeader = struct
    let name = N.name ^ ".header"

    type t = {offset : int; header : V.Header.t}

    let encoded_size = 8 (* offset *) + Raw_header.encoded_size

    let t =
      let open Repr in
      map
        (pair int Raw_header.t)
        (fun (offset, header) -> {offset; header})
        (fun {offset; header} -> (offset, header))

    let encode v =
      let dst = Bytes.create encoded_size in
      let offset = bytes_set_int64 ~src:(Int64.of_int v.offset) ~dst 0 in
      let _offset =
        blit ~src:(Raw_header.encode v.header |> String.to_bytes) ~dst offset
      in
      Bytes.unsafe_to_string dst

    let decode str offset =
      let file_offset, offset = read_int64 str offset in
      let header = Raw_header.decode str offset in
      {offset = Int64.to_int file_offset; header}
  end

  module Header_index = Index_unix.Make (K) (IHeader) (Index.Cache.Unbounded)

  (* Values are stored one after the other in the data file, each one prefixed
     with its length on 4 bytes (the [`Uint30] dynamic size encoding). *)
  module Values_file = struct
    let encoding = Data_encoding.dynamic_size ~kind:`Uint30 V.encoding

    (* Reads the value whose record starts at [file_offset]. Returns it along
       with the size of the whole record, length prefix included. *)
    let pread_value fd ~file_offset =
      let open Lwt_result_syntax in
      trace (Cannot_read_from_store N.name)
      @@ protect
      @@ fun () ->
      (* Read length *)
      let length_bytes = Bytes.create 4 in
      let*! () =
        Lwt_utils_unix.read_bytes ~file_offset ~pos:0 ~len:4 fd length_bytes
      in
      let value_length_int32 = Bytes.get_int32_be length_bytes 0 in
      let value_length = Int32.to_int value_length_int32 in
      (* Keep the 4 length bytes in front so that the buffer can be decoded
         with the dynamic size [encoding] directly. *)
      let value_bytes = Bytes.extend length_bytes 0 value_length in
      let*! () =
        Lwt_utils_unix.read_bytes
          ~file_offset:(file_offset + 4)
          ~pos:4
          ~len:value_length
          fd
          value_bytes
      in
      match Data_encoding.Binary.of_bytes encoding value_bytes with
      | Ok value -> return (value, 4 + value_length)
      | Error err -> tzfail (Decoding_error err)
  end

  type +'a t = {
    index : Header_index.t;
    fd : Lwt_unix.file_descr;
    scheduler : Lwt_idle_waiter.t;
    cache : (V.t * V.Header.t, tztrace) Cache.t;
  }

  (* The log_size corresponds to the maximum size of the memory zone
     allocated in memory before flushing it onto the disk. It is
     basically a cache which is used for the index. The cache size is
     `log_size * log_entry` where a `log_entry` is roughly 56 bytes. *)
  (* TODO: https://gitlab.com/tezos/tezos/-/issues/4654
     Make log size constant configurable. *)
  let blocks_log_size = 10_000

  (* A key is present if the cache holds a successful entry for it, or if it
     is in the index. *)
  let mem store key =
    let open Lwt_result_syntax in
    trace (Cannot_read_from_store IHeader.name)
    @@ protect
    @@ fun () ->
    Lwt_idle_waiter.task store.scheduler @@ fun () ->
    let cached =
      Cache.bind store.cache key (fun v -> Lwt.return (Result.is_ok v))
    in
    let*! cached = Option.value cached ~default:Lwt.return_false in
    return (cached || Header_index.mem store.index key)

  (* Returns only the header, from the cache if possible, otherwise from the
     index without reading the data file. *)
  let header store key =
    let open Lwt_result_syntax in
    trace (Cannot_read_from_store IHeader.name)
    @@ protect
    @@ fun () ->
    Lwt_idle_waiter.task store.scheduler @@ fun () ->
    let cached =
      Cache.bind store.cache key @@ function
      | Ok (_value, header) -> return header
      | Error _ as e -> Lwt.return e
    in
    match cached with
    | Some header -> Lwt_result.map Option.some header
    | None -> (
        match Header_index.find store.index key with
        | exception Not_found -> return_none
        | {header; _} -> return_some header)

  (* Returns the value and header for [key], reading the data file on a cache
     miss.
     NOTE(review): any [Error] reaching the continuation below (missing key,
     but also I/O or decoding errors from [pread_value]) is turned into
     [None], so callers cannot tell a corrupt entry from an absent one. *)
  let read store key =
    Lwt_idle_waiter.task store.scheduler @@ fun () ->
    let read_from_disk key =
      let open Lwt_result_syntax in
      match Header_index.find store.index key with
      | exception Not_found -> tzfail (Exn Not_found)
      | {IHeader.offset; header} ->
          let+ value, _ofs =
            Values_file.pread_value store.fd ~file_offset:offset
          in
          (value, header)
    in
    let open Lwt_result_syntax in
    Cache.bind_or_put store.cache key read_from_disk @@ function
    | Ok value -> return_some value
    | Error _ -> return_none

  (* Writes the encoded [value] through [store.fd] and records
     [{offset; header}] in the index under [key]. The caller is expected to
     have moved [store.fd] to [offset] (see [append]). Returns the number of
     bytes written. *)
  let locked_write_value store ~offset ~value ~key ~header =
    trace_eval (fun () -> Cannot_write_to_store N.name)
    @@ protect
    @@ fun () ->
    let open Lwt_result_syntax in
    let value_bytes =
      Data_encoding.Binary.to_bytes_exn Values_file.encoding value
    in
    let value_length = Bytes.length value_bytes in
    let*! () =
      Lwt_utils_unix.write_bytes ~pos:0 ~len:value_length store.fd value_bytes
    in
    Header_index.replace store.index key {offset; header} ;
    return value_length

  (** [append ?flush store ~key ~header ~value] writes [value] at the end of
      the data file and records its offset and [header] in the index under
      [key]. Write errors are propagated. The cache is updated only once the
      write has succeeded, so that a failed write never leaves a value visible
      through [read] or [mem]. The index is flushed when [flush] is [true]
      (the default). *)
  let append ?(flush = true) store ~key ~header ~(value : V.t) =
    trace (Cannot_write_to_store N.name)
    @@ protect
    @@ fun () ->
    let open Lwt_result_syntax in
    Lwt_idle_waiter.force_idle store.scheduler @@ fun () ->
    let*! offset = Lwt_unix.lseek store.fd 0 Unix.SEEK_END in
    (* [let*] (not [let*!]): a failed write must fail the append. *)
    let* _written_len = locked_write_value store ~offset ~value ~key ~header in
    Cache.put store.cache key (return (value, header)) ;
    if flush then Header_index.flush store.index ;
    return_unit

  (** Opens (or creates) the store in directory [path]: values go to
      [path/data] and the index to [path/index]. *)
  let load (type a) ~path ~cache_size (mode : a mode) : a t tzresult Lwt.t =
    let open Lwt_result_syntax in
    trace (Cannot_load_store {name = N.name; path})
    @@ protect
    @@ fun () ->
    let*! () = Lwt_utils_unix.create_dir path in
    let readonly = match mode with Read_only -> true | Read_write -> false in
    let flag = if readonly then Unix.O_RDONLY else Unix.O_RDWR in
    let*! fd =
      Lwt_unix.openfile
        (Filename.concat path "data")
        [Unix.O_CREAT; O_CLOEXEC; flag]
        0o644
    in
    let index =
      Header_index.v
        ~log_size:blocks_log_size
        ~readonly
        (Filename.concat path "index")
    in
    let scheduler = Lwt_idle_waiter.create () in
    let cache = Cache.create cache_size in
    return {index; fd; scheduler; cache}

  (** Closes the index (ignoring an already closed index) and the data file
      descriptor. *)
  let close store =
    protect @@ fun () ->
    Lwt_idle_waiter.force_idle store.scheduler @@ fun () ->
    (try Header_index.close store.index with Index.Closed -> ()) ;
    Lwt_utils_unix.safe_close store.fd

  let readonly x = (x :> [`Read] t)
end

(** Same as {!Make_indexed_file} for values whose header can be computed from
    the value itself with [V.header]. *)
module Make_simple_indexed_file
    (N : NAME)
    (K : Index.Key.S) (V : sig
      include ENCODABLE_VALUE_HEADER

      val header : t -> Header.t
    end) =
struct
  include Make_indexed_file (N) (K) (V)

  let append ?flush store ~key ~value =
    append ?flush store ~key ~value ~header:(V.header value)
end