(* The MIT License
Copyright (c) 2019 Craig Ferguson <craig@tarides.com>
Thomas Gazagnaire <thomas@tarides.com>
Ioana Cristescu <ioana@tarides.com>
Clément Pascutto <clement@tarides.com>
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software. *)

module Stats = Stats

(** Keys stored in the index. [encode] must return exactly [encoded_size]
    bytes ([replace] raises [Invalid_key_size] otherwise) and [decode s off]
    reads a key back from [s] starting at offset [off]. [equal] and [hash]
    are used by the in-memory log tables; [hash] also orders the entries of
    the on-disk index, and [hash_size] is forwarded to [Fan]. *)
module type Key = sig
  type t

  val equal : t -> t -> bool

  val hash : t -> int

  val hash_size : int

  val encode : t -> string

  val encoded_size : int

  val decode : string -> int -> t

  val pp : t Fmt.t
end

(** Values stored in the index, with the same fixed-size encoding contract
    as {!Key} ([replace] raises [Invalid_value_size] on a size mismatch). *)
module type Value = sig
  type t

  val encode : t -> string

  val encoded_size : int

  val decode : string -> int -> t

  val pp : t Fmt.t
end

(** Storage backend used for the log, log_async, merge and data files. *)
module type IO = Io.S

(** Mutual exclusion used to serialise merges ([merge_lock]) and to protect
    the swap of index/log files ([rename_lock]). *)
module type MUTEX = sig
  type t

  val create : unit -> t

  val lock : t -> unit

  val unlock : t -> unit

  val with_lock : t -> (unit -> 'a) -> 'a
end

(** Concurrency used to run merges: [async f] starts [f], [await] waits for
    a started computation, [return ()] is what a blocking merge returns once
    it has completed, and [yield ()] is called between index entries while
    merging. *)
module type THREAD = sig
  type t

  val async : (unit -> 'a) -> t

  val await : t -> unit

  val return : unit -> t

  val yield : unit -> unit
end

module type S = sig
  type t

  type key

  type value

  (** [v ?fresh ?readonly ~log_size root] opens the index stored under
      [root], reusing an already-open instance for the same [(root,
      readonly)] pair when one exists. [log_size] is the number of entries
      the log may hold before [replace] triggers a merge. Raises
      [RO_not_allowed] when both [fresh] and [readonly] are set. *)
  val v : ?fresh:bool -> ?readonly:bool -> log_size:int -> string -> t

  (** Empties the log, log_async and index. Raises [RO_not_allowed] on a
      read-only instance. *)
  val clear : t -> unit

  (** Raises [Not_found] when the key is absent. *)
  val find : t -> key -> value

  val mem : t -> key -> bool

  exception Invalid_key_size of key

  exception Invalid_value_size of value

  (** Adds a binding to the log; may start an asynchronous merge when the
      log grows past [log_size] entries. *)
  val replace : t -> key -> value -> unit

  (** Keeps only the bindings satisfying the predicate (runs a blocking
      merge). *)
  val filter : t -> (key * value -> bool) -> unit

  (** Order is unspecified and the same binding may be visited more than
      once. *)
  val iter : (key -> value -> unit) -> t -> unit

  val flush : ?with_fsync:bool -> t -> unit

  val close : t -> unit
end

(** [may f o] applies [f] to the content of [o], if any. *)
let may f = function None -> () | Some bf -> f bf

(** Unwraps an option that must be [Some]; fails an assertion otherwise. *)
let assert_and_get = function None -> assert false | Some e -> e

(** Raised by write operations on a read-only instance, and by opening a
    read-only instance with [~fresh:true]. *)
exception RO_not_allowed

(** Raised when using a handle that has already been passed to [close]. *)
exception Closed

module Make_private
    (K : Key)
    (V : Value)
    (IO : IO)
    (Mutex : MUTEX)
    (Thread : THREAD) =
struct
  type async = Thread.t

  let await = Thread.await

  type key = K.t

  type value = V.t

  type entry = { key : key; key_hash : int; value : value }

  (** Size in bytes of one encoded (key, value) pair. *)
  let entry_size = K.encoded_size + V.encoded_size

  let entry_sizeL = Int64.of_int entry_size

  exception Invalid_key_size of key

  exception Invalid_value_size of value

  (** Appends the encodings of [key] and [value] to [io]. Both are checked
      against their declared fixed size before anything is written. *)
  let append_key_value io key value =
    let encoded_key = K.encode key in
    let encoded_value = V.encode value in
    if String.length encoded_key <> K.encoded_size then
      raise (Invalid_key_size key);
    if String.length encoded_value <> V.encoded_size then
      raise (Invalid_value_size value);
    IO.append io encoded_key;
    IO.append io encoded_value

  (** Decodes the entry stored in [bytes] at offset [off]. *)
  let decode_entry bytes off =
    let string = Bytes.unsafe_to_string bytes in
    let key = K.decode string off in
    let value = V.decode string (off + K.encoded_size) in
    { key; key_hash = K.hash key; value }

  module Tbl = Hashtbl.Make (K)

  (* [log_size] is stored in bytes (entries * entry_size), see [v_no_cache]. *)
  type config = { log_size : int; readonly : bool; fresh : bool }

  (* The merged, hash-sorted data file and its fan-out table. *)
  type index = { io : IO.t; fan_out : Fan.t }

  (* An append-only file mirrored by an in-memory table. *)
  type log = { io : IO.t; mem : value Tbl.t }

  type instance = {
    config : config;
    root : string;
    (* bumped by each merge, reset to 0 by [clear] *)
    mutable generation : int64;
    mutable index : index option;
    (* [None] for a read-only instance until [sync_log] finds the file *)
    mutable log : log option;
    (* receives replacements while a merge is running *)
    mutable log_async : log option;
    (* number of handles returned by [v] that share this instance *)
    mutable open_instances : int;
    (* lock file held by writers only *)
    writer_lock : IO.lock option;
    mutable merge_lock : Mutex.t;
    mutable rename_lock : Mutex.t;
  }

  type t = instance option ref

  (** Returns the instance behind a handle, or raises [Closed]. *)
  let check_open t =
    match !t with Some instance -> instance | None -> raise Closed

  let clear t =
    let t = check_open t in
    Log.debug (fun l -> l "clear %S" t.root);
    if t.config.readonly then raise RO_not_allowed;
    t.generation <- 0L;
    Mutex.with_lock t.merge_lock (fun () ->
        let log = assert_and_get t.log in
        IO.clear log.io;
        Tbl.clear log.mem;
        may
          (fun l ->
            IO.clear l.io;
            IO.close l.io)
          t.log_async;
        may
          (fun (i : index) ->
            IO.clear i.io;
            IO.close i.io)
          t.index;
        t.index <- None;
        t.log_async <- None)

  (** Syncs [log] and [log_async] to disk ([with_fsync] is passed to
      [IO.sync]). Raises [RO_not_allowed] on a read-only instance. *)
  let flush_instance ?(with_fsync = false) instance =
    Log.debug (fun l ->
        l "[%s] flushing instance" (Filename.basename instance.root));
    if instance.config.readonly then raise RO_not_allowed;
    may (fun log -> IO.sync ~with_fsync log.io) instance.log;
    may (fun log -> IO.sync ~with_fsync log.io) instance.log_async

  let flush ?(with_fsync = false) t =
    let t = check_open t in
    Log.info (fun l -> l "[%s] flush" (Filename.basename t.root));
    Mutex.with_lock t.rename_lock (fun () -> flush_instance ~with_fsync t)

  let ( // ) = Filename.concat

  let index_dir root = root // "index"

  let log_path root = index_dir root // "log"

  let log_async_path root = index_dir root // "log_async"

  let index_path root = index_dir root // "data"

  let lock_path root = index_dir root // "lock"

  let merge_path root = index_dir root // "merge"

  (** Read granularity of [iter_io_off]: 1000 entries. *)
  let page_size = Int64.mul entry_sizeL 1_000L

  (** [iter_io_off ?min ?max f io] decodes the entries of [io] lying between
      byte offsets [min] (default [0L]) and [max] (default [IO.offset io]),
      one page at a time, and calls [f off entry] with the absolute offset of
      each entry. NOTE(review): assumes [IO.read] returns a whole number of
      entries for every page. *)
  let iter_io_off ?min:(min_off = 0L) ?max:max_off f io =
    let max_off = match max_off with None -> IO.offset io | Some m -> m in
    let rec aux offset =
      let remaining = Int64.sub max_off offset in
      if remaining <= 0L then ()
      else
        let len = Int64.to_int (min remaining page_size) in
        let raw = Bytes.create len in
        let n = IO.read io ~off:offset ~len raw in
        let rec read_page page off =
          if off = n then ()
          else
            let entry = decode_entry page off in
            f Int64.(add (of_int off) offset) entry;
            (read_page [@tailcall]) page (off + entry_size)
        in
        read_page raw 0;
        (aux [@tailcall]) Int64.(add offset page_size)
    in
    (aux [@tailcall]) min_off

  (** Like [iter_io_off], without the entry offset. *)
  let iter_io ?min ?max f io = iter_io_off ?min ?max (fun _ e -> f e) io

  module Entry = struct
    type t = entry

    module Key = K
    module Value = V

    let encoded_size = entry_size

    let decode = decode_entry

    let to_key e = e.key

    let to_value e = e.value
  end

  module IOArray = Io_array.Make (IO) (Entry)

  module Search =
    Search.Make (Entry) (IOArray)
      (struct
        type t = int

        module Entry = Entry

        let compare : int -> int -> int = compare

        let of_entry e = e.key_hash

        let of_key = K.hash

        let linear_interpolate ~low:(low_index, low_metric)
            ~high:(high_index, high_metric) key_metric =
          let low_in = float_of_int low_metric in
          let high_in = float_of_int high_metric in
          let target_in = float_of_int key_metric in
          let low_out = Int64.to_float low_index in
          let high_out = Int64.to_float high_index in
          (* Fractional position of [target_in] along the line from [low_in] to [high_in] *)
          let proportion = (target_in -. low_in) /. (high_in -. low_in) in
          (* Convert fractional position to position in output space *)
          let position = low_out +. (proportion *. (high_out -. low_out)) in
          let rounded = ceil (position -. 0.5) +. 0.5 in
          Int64.of_float rounded
      end)

  (** Wraps the constructor [v] with a per-process cache keyed by
      [(root, readonly)]. A cached instance is shared (and its
      [open_instances] counter bumped) while it still has open handles;
      otherwise a new instance is created. If the index directory has
      disappeared, both cache entries for [root] are dropped first. *)
  let with_cache ~v ~clear =
    let roots = Hashtbl.create 0 in
    let f ?(fresh = false) ?(readonly = false) ~log_size root =
      Log.info (fun l ->
          l "[%s] v fresh=%b readonly=%b log_size=%d" (Filename.basename root)
            fresh readonly log_size);
      try
        if not (Sys.file_exists (index_dir root)) then (
          Log.debug (fun l ->
              l "[%s] does not exist anymore, cleaning up the fd cache"
                (Filename.basename root));
          Hashtbl.remove roots (root, true);
          Hashtbl.remove roots (root, false);
          raise Not_found);
        let t = Hashtbl.find roots (root, readonly) in
        if t.open_instances <> 0 then (
          Log.debug (fun l -> l "[%s] found in cache" (Filename.basename root));
          t.open_instances <- t.open_instances + 1;
          let t = ref (Some t) in
          if fresh then clear t;
          t)
        else (
          Hashtbl.remove roots (root, readonly);
          raise Not_found)
      with Not_found ->
        let instance = v ~fresh ~readonly ~log_size root in
        Hashtbl.add roots (root, readonly) instance;
        ref (Some instance)
    in
    `Staged f

  (** Opens a new instance: takes the writer lock (writers only), loads the
      log into memory, folds a leftover log_async into the log (writers, when
      not [fresh]) and opens the index data file if it exists. *)
  let v_no_cache ~fresh ~readonly ~log_size root =
    Log.debug (fun l ->
        l "[%s] not found in cache, creating a new instance"
          (Filename.basename root));
    let writer_lock =
      if not readonly then Some (IO.lock (lock_path root)) else None
    in
    let config = { log_size = log_size * entry_size; readonly; fresh } in
    let log_path = log_path root in
    let log =
      if readonly then if fresh then raise RO_not_allowed else None
      else
        let io = IO.v ~fresh ~readonly ~generation:0L ~fan_size:0L log_path in
        let entries = Int64.div (IO.offset io) entry_sizeL in
        Log.debug (fun l ->
            l "[%s] log file detected. Loading %Ld entries"
              (Filename.basename root) entries);
        let mem = Tbl.create (Int64.to_int entries) in
        iter_io (fun e -> Tbl.replace mem e.key e.value) io;
        Some { io; mem }
    in
    let log_async_path = log_async_path root in
    (* If we are in readonly mode, the log_async will be read during sync_log so
       there is no need to do it here. *)
    if (not readonly) && Sys.file_exists log_async_path then (
      let io =
        IO.v ~fresh ~readonly:false ~generation:0L ~fan_size:0L log_async_path
      in
      let entries = Int64.div (IO.offset io) entry_sizeL in
      Log.debug (fun l ->
          l "[%s] log_async file detected. Loading %Ld entries"
            (Filename.basename root) entries);
      (* If we are not in fresh mode, we move the contents of log_async to
         log. *)
      if not fresh then
        may
          (fun log ->
            iter_io
              (fun e ->
                Tbl.replace log.mem e.key e.value;
                append_key_value log.io e.key e.value)
              io;
            IO.sync log.io;
            IO.clear io)
          log;
      IO.close io);
    let generation =
      match log with None -> 0L | Some log -> IO.get_generation log.io
    in
    let index =
      let index_path = index_path root in
      if Sys.file_exists index_path then (
        let io = IO.v ~fresh ~readonly ~generation ~fan_size:0L index_path in
        let entries = Int64.div (IO.offset io) entry_sizeL in
        Log.debug (fun l ->
            l "[%s] index file detected. Loading %Ld entries"
              (Filename.basename root) entries);
        let fan_out = Fan.import ~hash_size:K.hash_size (IO.get_fanout io) in
        Some { fan_out; io })
      else (
        Log.debug (fun l ->
            l "[%s] no index file detected." (Filename.basename root));
        None)
    in
    {
      config;
      generation;
      log;
      log_async = None;
      root;
      index;
      open_instances = 1;
      merge_lock = Mutex.create ();
      rename_lock = Mutex.create ();
      writer_lock;
    }

  let (`Staged v) = with_cache ~v:v_no_cache ~clear

  (** Looks [key] up in the index data file, restricting the interpolation
      search to the byte range the fan-out table gives for its hash. *)
  let interpolation_search index key =
    let hashed_key = K.hash key in
    let low_bytes, high_bytes = Fan.search index.fan_out hashed_key in
    let low, high =
      Int64.(div low_bytes entry_sizeL, div high_bytes entry_sizeL)
    in
    Search.interpolation_search (IOArray.v index.io) key ~low ~high

  (** Opens [path] read-only and loads it into memory, if the file exists. *)
  let try_load_log t path =
    Log.debug (fun l ->
        l "[%s] checking on-disk %s file" (Filename.basename t.root)
          (Filename.basename path));
    if Sys.file_exists path then (
      let io =
        IO.v ~fresh:false ~readonly:true ~generation:0L ~fan_size:0L path
      in
      let mem = Tbl.create 0 in
      iter_io (fun e -> Tbl.replace mem e.key e.value) io;
      Some { io; mem })
    else None

  (** Refreshes a read-only instance from disk. When the on-disk generation
      differs from [t.generation], the log is reloaded and the index reopened
      (or dropped at generation 0). Otherwise only entries appended to the
      log since the last sync are read. [IO.force_offset] is presumably the
      offset currently persisted on disk — TODO confirm against [Io.S]. *)
  let sync_log t =
    Log.debug (fun l ->
        l "[%s] checking for changes on disk" (Filename.basename t.root));
    let no_changes () =
      Log.debug (fun l ->
          l "[%s] no changes detected" (Filename.basename t.root))
    in
    let add_log_entry log e = Tbl.replace log.mem e.key e.value in
    let sync_log_async ?(generation_change = false) () =
      match t.log_async with
      | None -> t.log_async <- try_load_log t (log_async_path t.root)
      | Some log ->
          let offset = IO.offset log.io in
          let new_offset = IO.force_offset log.io in
          if generation_change || offset <> new_offset then (
            Tbl.clear log.mem;
            iter_io (add_log_entry log) log.io)
          else ()
    in
    (match t.log with
    | None -> t.log <- try_load_log t (log_path t.root)
    | Some _ -> ());
    match t.log with
    | None -> sync_log_async ()
    | Some log ->
        let generation = IO.get_generation log.io in
        let log_offset = IO.offset log.io in
        let new_log_offset = IO.force_offset log.io in
        let add_log_entry e = add_log_entry log e in
        sync_log_async ~generation_change:(t.generation <> generation) ();
        if t.generation <> generation then (
          Log.debug (fun l ->
              l "[%s] generation has changed, reading log and index from disk"
                (Filename.basename t.root));
          Tbl.clear log.mem;
          iter_io add_log_entry log.io;
          may (fun (i : index) -> IO.close i.io) t.index;
          (if Int64.equal generation 0L then t.index <- None
           else
             let index_path = index_path t.root in
             let io =
               IO.v ~fresh:false ~readonly:true ~generation ~fan_size:0L
                 index_path
             in
             let fan_out =
               Fan.import ~hash_size:K.hash_size (IO.get_fanout io)
             in
             t.index <- Some { fan_out; io });
          (* Record the on-disk generation in both branches: otherwise a
             reset to generation 0 (see [clear]) is re-detected on every
             call, and a later merge back to the previously recorded
             generation would go unnoticed, leaving [t.index = None]. *)
          t.generation <- generation)
        else if log_offset < new_log_offset then (
          Log.debug (fun l ->
              l "[%s] new entries detected, reading log from disk"
                (Filename.basename t.root));
          iter_io add_log_entry log.io ~min:log_offset)
        else if log_offset > new_log_offset then
          (* In that case the log has probably been emptied and is being
             refilled with async_log contents. *)
          no_changes ()
        else no_changes ()

  (** Looks [key] up in log_async, then the log, then the index, under
      [rename_lock]. A read-only instance syncs from disk first, and once
      more before giving up. Raises [Not_found]. *)
  let find_instance t key =
    let find_if_exists ~name ~find db () =
      match db with
      | None ->
          Log.debug (fun l ->
              l "[%s] %s is not present" (Filename.basename t.root) name);
          raise Not_found
      | Some e ->
          let ans = find e key in
          Log.debug (fun l ->
              l "[%s] found in %s" (Filename.basename t.root) name);
          ans
    in
    (* [a @~ b]: try [a ()], falling back to [b ()] on [Not_found]. *)
    let ( @~ ) a b = try a () with Not_found -> b () in
    let find_log_index () =
      find_if_exists ~name:"log" ~find:(fun log -> Tbl.find log.mem) t.log
      @~ find_if_exists ~name:"index" ~find:interpolation_search t.index
    in
    Mutex.with_lock t.rename_lock (fun () ->
        if t.config.readonly then sync_log t;
        find_if_exists ~name:"log_async"
          ~find:(fun log -> Tbl.find log.mem)
          t.log_async
        @~ fun () ->
        find_log_index @~ fun () ->
        if t.config.readonly then (
          sync_log t;
          find_log_index ())
        else raise Not_found)

  let find t key =
    let t = check_open t in
    Log.info (fun l -> l "[%s] find %a" (Filename.basename t.root) K.pp key);
    find_instance t key

  let mem t key =
    let t = check_open t in
    Log.info (fun l -> l "[%s] mem %a" (Filename.basename t.root) K.pp key);
    match find_instance t key with _ -> true | exception Not_found -> false

  (** Appends an already-encoded entry to [dst_io], recording its offset in
      the fan-out table first. *)
  let append_buf_fanout fan_out hash buf_str dst_io =
    Fan.update fan_out hash (IO.offset dst_io);
    IO.append dst_io buf_str

  (** Same as [append_buf_fanout] for a decoded entry. *)
  let append_entry_fanout fan_out entry dst_io =
    Fan.update fan_out entry.key_hash (IO.offset dst_io);
    append_key_value dst_io entry.key entry.value

  (** Copies the log entries from [log_i] whose hash is below [hash_e] into
      [dst_io]; returns the index of the first entry not copied. *)
  let rec merge_from_log fan_out log log_i hash_e dst_io =
    if log_i >= Array.length log then log_i
    else
      let v = log.(log_i) in
      if v.key_hash >= hash_e then log_i
      else (
        append_entry_fanout fan_out v dst_io;
        (merge_from_log [@tailcall]) fan_out log (log_i + 1) hash_e dst_io)

  (** Copies every log entry from [log_i] onwards into [dst_io]. *)
  let append_remaining_log fan_out log log_i dst_io =
    for log_i = log_i to Array.length log - 1 do
      append_entry_fanout fan_out log.(log_i) dst_io
    done

  (* Merge [log] with [index] into [dst_io], ignoring bindings that do not
     satisfy [filter (k, v)]. [log] must be sorted by key hashes. *)
  let merge_with ~filter log (index : index) dst_io =
    let entries = 10_000 in
    let len = entries * entry_size in
    let buf = Bytes.create len in
    let refill off = ignore (IO.read index.io ~off ~len buf) in
    let index_end = IO.offset index.io in
    let fan_out = index.fan_out in
    refill 0L;
    let rec go index_offset buf_offset log_i =
      if index_offset >= index_end then
        append_remaining_log fan_out log log_i dst_io
      else
        let buf_str = Bytes.sub buf buf_offset entry_size in
        let index_offset = Int64.add index_offset entry_sizeL in
        let e = Entry.decode buf_str 0 in
        let log_i = merge_from_log fan_out log log_i e.key_hash dst_io in
        Thread.yield ();
        (* An index entry is kept unless the next log entry overrides its
           key, and only if it passes [filter]. *)
        if
          (log_i >= Array.length log
          || not (K.equal log.(log_i).key e.key))
          && filter (e.key, e.value)
        then
          append_buf_fanout fan_out e.key_hash (Bytes.to_string buf_str) dst_io;
        let buf_offset =
          let n = buf_offset + entry_size in
          if n >= Bytes.length buf then (
            refill index_offset;
            0)
          else n
        in
        (go [@tailcall]) index_offset buf_offset log_i
    in
    (go [@tailcall]) 0L 0 0

  (** Merges the (filtered, hash-sorted) log into a new index file.

      [merge_lock] is taken here and released at the end of the merge. The
      log and log_async are flushed, and a fresh log_async is installed so
      that concurrent [replace]s go there. The merge itself ([go]) runs
      synchronously when [blocking], otherwise through [Thread.async]. Under
      [rename_lock] it swaps in the new index, clears the log, bumps the
      generation and moves the log_async entries back into the log. [hook]
      is called with [`Before] and [`After] around the merge. [witness] is
      only used to fill the array that holds the sorted log. *)
  let merge ?(blocking = false) ?(filter = fun _ -> true) ?hook ~witness t =
    Mutex.lock t.merge_lock;
    Log.info (fun l -> l "[%s] merge" (Filename.basename t.root));
    Stats.incr_nb_merge ();
    let log_async =
      (* No in-memory state of [t] has been modified yet. If this set-up
         fails (e.g. [flush_instance] raises [RO_not_allowed] because
         [force_merge] was called on a read-only instance), release
         [merge_lock] before re-raising. Otherwise later [merge], [clear]
         and [close] calls on this instance could block on it forever. *)
      match
        (flush_instance ~with_fsync:true t;
         let io =
           let log_async_path = log_async_path t.root in
           IO.v ~fresh:true ~readonly:false
             ~generation:(Int64.succ t.generation) ~fan_size:0L log_async_path
         in
         let mem = Tbl.create 0 in
         { io; mem })
      with
      | log_async -> log_async
      | exception e ->
          Mutex.unlock t.merge_lock;
          raise e
    in
    t.log_async <- Some log_async;
    let go () =
      may (fun f -> f `Before) hook;
      let log = assert_and_get t.log in
      let generation = Int64.succ t.generation in
      let log_array =
        let compare_entry e e' = compare e.key_hash e'.key_hash in
        Tbl.filter_map_inplace
          (fun key value -> if filter (key, value) then Some value else None)
          log.mem;
        let b = Array.make (Tbl.length log.mem) witness in
        Tbl.fold
          (fun key value i ->
            b.(i) <- { key; key_hash = K.hash key; value };
            i + 1)
          log.mem 0
        |> ignore;
        Array.fast_sort compare_entry b;
        b
      in
      let fan_size =
        match t.index with
        | None -> Tbl.length log.mem
        | Some index ->
            (Int64.to_int (IO.offset index.io) / entry_size)
            + Array.length log_array
      in
      let fan_out = Fan.v ~hash_size:K.hash_size ~entry_size fan_size in
      let merge =
        let merge_path = merge_path t.root in
        IO.v ~fresh:true ~readonly:false ~generation
          ~fan_size:(Int64.of_int (Fan.exported_size fan_out))
          merge_path
      in
      let index =
        match t.index with
        | None ->
            let io =
              IO.v ~fresh:true ~readonly:false ~generation ~fan_size:0L
                (index_path t.root)
            in
            append_remaining_log fan_out log_array 0 merge;
            { io; fan_out }
        | Some index ->
            let index = { index with fan_out } in
            merge_with ~filter log_array index merge;
            index
      in
      Fan.finalize index.fan_out;
      IO.set_fanout merge (Fan.export index.fan_out);
      Mutex.with_lock t.rename_lock (fun () ->
          IO.rename ~src:merge ~dst:index.io;
          t.index <- Some index;
          IO.clear ~keep_generation:true log.io;
          Tbl.clear log.mem;
          IO.set_generation log.io generation;
          t.generation <- generation;
          let log_async = assert_and_get t.log_async in
          Tbl.iter
            (fun key value ->
              Tbl.replace log.mem key value;
              append_key_value log.io key value)
            log_async.mem;
          IO.sync log.io;
          t.log_async <- None);
      may (fun f -> f `After) hook;
      IO.clear log_async.io;
      IO.close log_async.io;
      Mutex.unlock t.merge_lock
    in
    if blocking then (
      go ();
      Thread.return ())
    else Thread.async go

  (** Returns any entry of the instance (from the log first, else the first
      entry of the index), or [None] when both are empty. *)
  let get_witness t =
    match t.log with
    | None -> None
    | Some log -> (
        let exception Found of entry in
        match
          Tbl.iter
            (fun key value ->
              raise (Found { key; value; key_hash = K.hash key }))
            log.mem
        with
        | exception Found e -> Some e
        | () -> (
            match t.index with
            | None -> None
            | Some index ->
                let buf = Bytes.create entry_size in
                let n = IO.read index.io ~off:0L ~len:entry_size buf in
                assert (n = entry_size);
                Some (decode_entry buf 0)))

  let force_merge ?hook t =
    let t = check_open t in
    Log.info (fun l -> l "[%s] forced merge" (Filename.basename t.root));
    let witness = Mutex.with_lock t.rename_lock (fun () -> get_witness t) in
    match witness with
    | None ->
        Log.debug (fun l -> l "[%s] index is empty" (Filename.basename t.root));
        Thread.return ()
    | Some witness -> merge ?hook ~witness t

  let replace t key value =
    let t = check_open t in
    Log.info (fun l ->
        l "[%s] replace %a %a" (Filename.basename t.root) K.pp key V.pp value);
    if t.config.readonly then raise RO_not_allowed;
    let do_merge =
      Mutex.with_lock t.rename_lock (fun () ->
          (* While a merge is running, new bindings go to log_async. *)
          let log =
            match t.log_async with
            | Some async_log -> async_log
            | None -> assert_and_get t.log
          in
          append_key_value log.io key value;
          Tbl.replace log.mem key value;
          Int64.compare (IO.offset log.io) (Int64.of_int t.config.log_size) > 0)
    in
    if do_merge then
      ignore (merge ~witness:{ key; key_hash = K.hash key; value } t : async)

  let filter t f =
    let t = check_open t in
    Log.info (fun l -> l "[%s] filter" (Filename.basename t.root));
    if t.config.readonly then raise RO_not_allowed;
    let witness = Mutex.with_lock t.rename_lock (fun () -> get_witness t) in
    match witness with
    | None ->
        Log.debug (fun l -> l "[%s] index is empty" (Filename.basename t.root))
    | Some witness -> Thread.await (merge ~blocking:true ~filter:f ~witness t)

  (** Applies [f] to the log, then the index, then (under [rename_lock])
      log_async and the index again. The same binding can therefore be
      visited several times, and the order is unspecified. Nothing is
      visited when the instance has no log. *)
  let iter f t =
    let t = check_open t in
    Log.info (fun l -> l "[%s] iter" (Filename.basename t.root));
    if t.config.readonly then sync_log t;
    match t.log with
    | None -> ()
    | Some log ->
        Tbl.iter f log.mem;
        may (fun (i : index) -> iter_io (fun e -> f e.key e.value) i.io) t.index;
        Mutex.with_lock t.rename_lock (fun () ->
            (match t.log_async with
            | None -> ()
            | Some log -> Tbl.iter f log.mem);
            may
              (fun (i : index) -> iter_io (fun e -> f e.key e.value) i.io)
              t.index)

  (** Invalidates the handle [it]. When it was the last open handle on the
      shared instance, flushes (writers only), closes every open file and
      releases the writer lock. *)
  let close it =
    match !it with
    | None -> Log.info (fun l -> l "close: instance already closed")
    | Some t ->
        Log.info (fun l -> l "[%s] close" (Filename.basename t.root));
        Mutex.with_lock t.merge_lock (fun () ->
            it := None;
            t.open_instances <- t.open_instances - 1;
            if t.open_instances = 0 then (
              Log.debug (fun l ->
                  l "[%s] last open instance: closing the file descriptor"
                    (Filename.basename t.root));
              if not t.config.readonly then flush_instance ~with_fsync:true t;
              may
                (fun l ->
                  Tbl.clear l.mem;
                  IO.close l.io)
                t.log;
              (* A read-only instance may have opened log_async in
                 [sync_log]; release that descriptor too. *)
              may
                (fun l ->
                  Tbl.clear l.mem;
                  IO.close l.io)
                t.log_async;
              may (fun (i : index) -> IO.close i.io) t.index;
              may (fun lock -> IO.unlock lock) t.writer_lock))
end

module Make = Make_private

module Private = struct
  module Fan = Fan
  module Io_array = Io_array
  module Search = Search

  module Hook = struct
    type 'a t = 'a -> unit

    let v f = f
  end

  module type S = sig
    include S

    type async

    val force_merge : ?hook:[ `After | `Before ] Hook.t -> t -> async

    val await : async -> unit
  end

  module Make = Make_private
end