(* The MIT License
Copyright (c) 2019 Craig Ferguson <craig@tarides.com>
Thomas Gazagnaire <thomas@tarides.com>
Ioana Cristescu <ioana@tarides.com>
Clément Pascutto <clement@tarides.com>
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software. *)

include Index_intf
open! Import

module Key = struct
  module type S = Key

  module String_fixed = Data.String_fixed
end

module Value = struct
  module type S = Value

  module String_fixed = Data.String_fixed
end

exception RO_not_allowed
(** Raised whenever a read-only instance performs a write action. *)

exception RW_not_allowed
(** Raised whenever a read-write instance performs a reserved read-only
    action. *)

exception Closed
(** Raised whenever a closed instance is used. *)

module Make_private (K : Key) (V : Value) (Platform : Platform.S)
    (Cache : Cache.S) =
struct
  open Platform

  type 'a async = 'a Thread.t

  let await = Thread.await

  type key = K.t
  type value = V.t

  let pp_key = Repr.pp K.t
  let pp_value = Repr.pp V.t

  module Log_file = Log_file.Make (IO) (K) (V)

  module Lru : sig
    type t

    val create : int -> t
    val add : t -> key -> value -> unit
    val find : t -> key -> value option
    val clear : t -> unit
  end = struct
    module Lru =
      Lru.M.Make
        (K)
        (struct
          type t = V.t

          let weight _ = 1
        end)

    type t = Lru.t ref

    let create n = ref (Lru.create n)

    let find t k =
      let result = Lru.find k !t in
      let () =
        match result with
        | None -> Stats.incr_nb_lru_misses ()
        | Some _ -> Stats.incr_nb_lru_hits ()
      in
      result

    (* NOTE: the provided [add] implementation never discards elements from
       the LRU, and the user must manually discard any excess elements using
       [trim]. For safety, we shadow [add] with a re-implementation that
       always trims after adding. *)
    let add t k v =
      let t = !t in
      Lru.add k v t;
      Lru.trim t

    let clear t = t := Lru.create (Lru.capacity !t)
  end
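
  (* For instance (illustrative values, not taken from a test in this file):
     with [let t = Lru.create 2], after
     [Lru.add t k1 v1; Lru.add t k2 v2; Lru.add t k3 v3] the binding for [k1]
     has been trimmed away, so [Lru.find t k1] returns [None] while
     [Lru.find t k3] returns [Some v3]. *)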

  module Entry = struct
    include Log_file.Entry
    module Key = K
    module Value = V

    let to_key { key; _ } = key
    let to_value { value; _ } = value
  end

  module Stats = struct
    include Stats.Make (Clock)
    include Stats
  end

  module IO = struct
    include Io.Extend (IO)

    let iter ?min ?max f =
      let page_size = Int63.(mul Entry.encoded_sizeL (of_int 1_000)) in
      iter ~page_size ?min ?max (fun ~off ~buf ~buf_off ->
          let entry = Entry.decode buf buf_off in
          f off entry;
          Entry.encoded_size)
  end

  type throttle = [ `Overcommit_memory | `Block_writes ]

  type config = {
    log_size : int;  (** The maximal log size before a [merge] is triggered. *)
    lru_size : int;
    readonly : bool;
    fresh : bool;
        (** Whether the index was created from scratch, erasing existing data,
            if any. *)
    throttle : throttle;  (** The throttle strategy used when merges stack. *)
    flush_callback : unit -> unit;
        (** A callback called whenever the index is flushed. Useful to sync the
            index's flushes with the flushes of larger systems. *)
  }

  type index = {
    io : IO.t;  (** The disk file handler. *)
    fan_out : [ `Read ] Fan.t;
        (** The fan-out, used to map keys to small intervals in the file, in
            constant time. This is an in-memory object, also encoded in the
            header of [io]. *)
  }

  type instance = {
    config : config;
    root : string;  (** The root location of the index. *)
    mutable generation : int63;
        (** The generation is a counter of rewriting operations (e.g. [clear]
            and [merge]). It is used to sync RO instances. *)
    mutable index : index option;
        (** The main index file contains old sorted bindings. It is [None] when
            no [merge] has occurred yet. On RO instances, this is also [None]
            when the file is empty, e.g. after a [clear]. *)
    mutable log : Log_file.t option;
        (** The log file contains the most recent bindings. It can be [None]
            when a read-only instance is created before its RW counterpart; in
            that case the [log] creation is pending. *)
    mutable log_async : Log_file.t option;
        (** The log_async file contains bindings added concurrently with a
            [merge] operation. It is only present while a merge is ongoing. *)
    mutable open_instances : int;
        (** The number of open instances that are shared through the
            [Cache.t]. *)
    mutable lru : Lru.t;
    writer_lock : IO.Lock.t option;
        (** A lock that prevents multiple RW instances from being open at the
            same time. *)
    sync_lock : Semaphore.t;
        (** A lock that prevents multiple [sync]s from happening at the same
            time. *)
    merge_lock : Semaphore.t;
        (** A lock that prevents multiple [merge]s from happening at the same
            time. *)
    rename_lock : Semaphore.t;
        (** A lock used to protect a critical bit when finalizing a [merge]
            operation. All operations should be guarded by this lock. *)
    mutable pending_cancel : bool;
        (** A signal for the merging thread to terminate prematurely. *)
  }

  include Private_types

  let check_pending_cancel instance =
    match instance.pending_cancel with true -> `Abort | false -> `Continue

  (* [t] is an [option ref] to handle [close] operations. A closed index is
     [None]. *)
  type t = instance option ref

  let check_open t =
    match !t with Some instance -> instance | None -> raise Closed

  (** {1 Clear} *)

  let clear' ~hook t =
    let t = check_open t in
    Log.debug (fun l -> l "clear %S" t.root);
    if t.config.readonly then raise RO_not_allowed;
    t.pending_cancel <- true;
    hook `Abort_signalled;
    Semaphore.with_acquire "clear" t.merge_lock (fun () ->
        t.pending_cancel <- false;
        t.generation <- Int63.succ t.generation;
        let log = Option.get t.log in
        let hook () = hook `IO_clear in
        t.lru <- Lru.create t.config.lru_size;
        Log_file.clear ~generation:t.generation ~reopen:true ~hook log;
        Option.iter
          (fun l -> Log_file.clear ~generation:t.generation ~reopen:false l)
          t.log_async;
        Option.iter
          (fun (i : index) ->
            IO.clear ~generation:t.generation ~reopen:false i.io)
          t.index;
        t.index <- None;
        t.log_async <- None)

  let clear = clear' ~hook:(fun _ -> ())

  (** {1 Flush} *)

  let flush_instance ?no_async ?no_callback ?(with_fsync = false) instance =
    Log.debug (fun l ->
        l "[%s] flushing instance" (Filename.basename instance.root));
    if instance.config.readonly then raise RO_not_allowed;
    instance.log
    |> Option.iter (fun log ->
           Log.debug (fun l ->
               l "[%s] flushing log" (Filename.basename instance.root));
           Log_file.flush ?no_callback ~with_fsync log);
    match (no_async, instance.log_async) with
    | Some (), _ | None, None -> ()
    | None, Some log ->
        Log.debug (fun l ->
            l "[%s] flushing log_async" (Filename.basename instance.root));
        Log_file.flush ?no_callback ~with_fsync log

  let flush ?no_callback ?(with_fsync = false) t =
    let t = check_open t in
    Log.debug (fun l -> l "[%s] flush" (Filename.basename t.root));
    Semaphore.with_acquire "flush" t.rename_lock (fun () ->
        flush_instance ?no_callback ~with_fsync t)

  (** Extract [log] and [log_async] for (private) tests. *)
  let log_file_to_list msg log =
    let x =
      Log_file.to_sorted_seq log
      |> Seq.map (fun (e : Entry.t) -> (e.key, e.value))
      |> List.of_seq
    in
    let y = ref [] in
    IO.iter
      (fun _ (e : Entry.t) -> y := (e.key, e.value) :: !y)
      (Log_file.io log);
    if List.length x <> List.length !y then (
      let pp_entry ppf (k, _) = pp_key ppf k in
      let pp = Fmt.Dump.list pp_entry in
      Fmt.epr "consistency error in %s:\nmem : %a\ndisk: %a\n%!" msg pp x pp !y;
      assert false);
    x

  let log t =
    flush t;
    let t = check_open t in
    Option.map (log_file_to_list "log") t.log

  let log_async t =
    flush t;
    let t = check_open t in
    Option.map (log_file_to_list "log_async") t.log_async

  (** {1 RO instances syncing} *)

  (** Loads the log file at [path], if it exists. Used by RO instances to load
      the temporary [log_async], or to fill the [log] field when they have
      been created before their RW counterpart. *)
  let try_load_log t path =
    match IO.v_readonly path with
    | Error `No_file_on_disk -> None
    | Ok io ->
        let log = Log_file.create io in
        Log.debug (fun l ->
            l "[%s] loaded %d entries from %s"
              (Filename.basename t.root)
              (Log_file.cardinal log)
              (Filename.basename path));
        Some log

  (** Syncs the [log_async] of the instance by checking on-disk changes. *)
  let sync_log_async ~hook t =
    match t.log_async with
    | None ->
        hook `Reload_log_async;
        t.log_async <- try_load_log t (Layout.log_async ~root:t.root)
    | Some log ->
        let old_generation = t.generation in
        let old_offset = IO.offset (Log_file.io log) in
        let h = IO.Header.get (Log_file.io log) in
        if
          (* the generation has changed *)
          h.generation > Int63.succ old_generation
          || (* or the last sync was done between clear(log) and
                clear(log_async) *)
             (h.generation = Int63.succ old_generation
             && h.offset = Int63.zero)
        then (
          (* close the file, *)
          Log_file.close log;
          (* then check that the file is on disk, reopen it and reload
             everything. *)
          hook `Reload_log_async;
          t.log_async <- try_load_log t (Layout.log_async ~root:t.root))
        else if
          (* else if the disk offset is greater, reload the newest data *)
          old_offset < h.offset
        then Log_file.sync_entries ~min:old_offset log
        else if
          (* else if the offset is lesser, that means the [log_async] was
             cleared, and the generation should have changed *)
          old_offset > h.offset
        then (
          (* This should never occur, but we can recover by reloading the log
             from scratch rather than just hard failing. *)
          Log.err (fun l ->
              l
                "[%s] log_async IO header monotonicity violated during sync:@,\
                 \ offset: %a -> %a@,\
                 \ generation: %a -> %a@,\
                 Reloading the log to compensate."
                (Filename.basename t.root)
                Int63.pp old_offset Int63.pp h.offset Int63.pp old_generation
                Int63.pp h.generation);
          Log_file.reload log)

  (** Syncs the [index] of the instance by checking on-disk changes. *)
  let sync_index t =
    (* Close the file handle so that it can be reloaded, as the file may have
       changed after a merge. *)
    Option.iter (fun (i : index) -> IO.close i.io) t.index;
    let index_path = Layout.data ~root:t.root in
    match IO.v_readonly index_path with
    | Error `No_file_on_disk -> t.index <- None
    | Ok io ->
        let fan_out = Fan.import ~hash_size:K.hash_size (IO.get_fanout io) in
        (* We maintain the invariant that [index] is [None] if the file is
           empty. *)
        if IO.offset io = Int63.zero then t.index <- None
        else t.index <- Some { fan_out; io }

  (** Syncs an instance entirely, by checking on-disk changes for [log],
      [index] and [log_async]. *)
  let sync_instance ?(hook = fun _ -> ()) t =
    Semaphore.with_acquire "sync" t.sync_lock @@ fun () ->
    Log.debug (fun l ->
        l "[%s] checking for changes on disk (generation=%a)"
          (Filename.basename t.root)
          Int63.pp t.generation);
    (* The first sync needs to load the log file. *)
    let () =
      match t.log with
      | None -> t.log <- try_load_log t (Layout.log ~root:t.root)
      | Some _ -> ()
    in
    (* There is a race between sync and merge:

       - At the end of a merge, the entries in [log_async] are copied into
         [log]: [merge] starts by calling [IO.Header.set] on [log] with a new
         generation number, copies all the entries, and then clears
         [log_async].

       - So here we need to make sure we do the same thing in reverse: start
         by syncing [log_async], then read [log]'s header. At worst,
         [log_async] and [log] might contain duplicated entries, but we won't
         miss any. These entries are added to [log.mem] using [Tbl.replace],
         where they are deduplicated. *)
    sync_log_async ~hook t;
    match t.log with
    | None -> ()
    | Some log -> (
        (* This is the cached offset, from the previous sync. *)
        let log_offset = IO.offset (Log_file.io log) in
        hook `Before_offset_read;
        let h = IO.Header.get (Log_file.io log) in
        hook `After_offset_read;
        if t.generation <> h.generation then (
          (* If the generation has changed, then we need to reload both the
             [index] and the [log]. The new generation is the one on disk. *)
          Log.debug (fun l ->
              l "[%s] generation has changed: %a -> %a"
                (Filename.basename t.root)
                Int63.pp t.generation Int63.pp h.generation);
          hook `Reload_log;
          t.generation <- h.generation;
          Log_file.close log;
          Lru.clear t.lru;
          t.log <- try_load_log t (Layout.log ~root:t.root);
          (* The log file is never removed (even by clear). *)
          assert (t.log <> None);
          sync_index t)
        else if log_offset < h.offset then (
          (* else if the disk offset is greater, we read the newest
             bindings *)
          Log.debug (fun l ->
              l "[%s] new entries detected, reading log from disk"
                (Filename.basename t.root));
          Log_file.sync_entries ~min:log_offset log)
        else
          (* Here the disk offset should be equal to the known one. A smaller
             offset should not be possible, because that would mean a [clear]
             or [merge] occurred, which would have changed the generation. *)
          (* TODO: Handle the "impossible" case differently? *)
          Log.debug (fun l ->
              l "[%s] no changes detected" (Filename.basename t.root)))

  (** {1 Find and Mem} *)

  module IOArray = Io_array.Make (IO) (Entry)

  module Search =
    Search.Make (Entry) (IOArray)
      (struct
        type t = int

        module Entry = Entry

        let compare : int -> int -> int = compare
        let of_entry e = e.Entry.key_hash
        let of_key = K.hash

        let linear_interpolate ~low:(low_index, low_metric)
            ~high:(high_index, high_metric) key_metric =
          let low_in = float_of_int low_metric in
          let high_in = float_of_int high_metric in
          let target_in = float_of_int key_metric in
          let low_out = Int63.to_float low_index in
          let high_out = Int63.to_float high_index in
          (* Fractional position of [target_in] along the line from [low_in]
             to [high_in] *)
          let proportion = (target_in -. low_in) /. (high_in -. low_in) in
          (* Convert the fractional position to a position in output space *)
          let position = low_out +. (proportion *. (high_out -. low_out)) in
          let rounded = ceil (position -. 0.5) +. 0.5 in
          Int63.of_float rounded
      end)

  let interpolation_search index key =
    let hashed_key = K.hash key in
    let low_bytes, high_bytes = Fan.search index.fan_out hashed_key in
    let low, high =
      Int63.
        (div low_bytes Entry.encoded_sizeL, div high_bytes Entry.encoded_sizeL)
    in
    Search.interpolation_search (IOArray.v index.io) key ~low ~high
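
  (* A worked example of [linear_interpolate] (illustrative numbers, not from
     this code base): with [low = (Int63.of_int 0, 0)] and
     [high = (Int63.of_int 100, 1000)], a key with metric (hash) [500] gives
     [proportion = (500. -. 0.) /. (1000. -. 0.) = 0.5], hence
     [position = 0. +. (0.5 *. 100.) = 50.] and
     [rounded = ceil (50. -. 0.5) +. 0.5 = 50.5], truncated to [50] by
     [Int63.of_float]: the search probes the middle of the interval, as
     expected for uniformly distributed hashes. *)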

  (** Finds the value associated to [key] in [t]. In order, checks in
      [log_async] (in memory), then [log] (in memory), then [index] (on
      disk). *)
  let find_instance t key =
    let find_if_exists ~name ~find db =
      match db with
      | None -> raise Not_found
      | Some e ->
          let ans = find e key in
          Log.debug (fun l ->
              l "[%s] found in %s" (Filename.basename t.root) name);
          ans
    in
    let find_log_async () =
      find_if_exists ~name:"log_async" ~find:Log_file.find t.log_async
    in
    let find_log () = find_if_exists ~name:"log" ~find:Log_file.find t.log in
    let find_index () =
      find_if_exists ~name:"index" ~find:interpolation_search t.index
    in
    Semaphore.with_acquire "find_instance" t.rename_lock @@ fun () ->
    match Lru.find t.lru key with
    | Some e -> e
    | None ->
        let e =
          match find_log_async () with
          | e -> e
          | exception Not_found -> (
              match find_log () with
              | e -> e
              | exception Not_found -> find_index ())
        in
        Lru.add t.lru key e;
        e

  let find t key =
    let t = check_open t in
    Log.debug (fun l -> l "[%s] find %a" (Filename.basename t.root) pp_key key);
    find_instance t key

  let mem t key =
    let t = check_open t in
    Log.debug (fun l -> l "[%s] mem %a" (Filename.basename t.root) pp_key key);
    match find_instance t key with
    | _ -> true
    | exception Not_found -> false

  let sync' ?hook t =
    let f t =
      Stats.incr_nb_sync ();
      let t = check_open t in
      Log.info (fun l -> l "[%s] sync" (Filename.basename t.root));
      if t.config.readonly then sync_instance ?hook t else raise RW_not_allowed
    in
    Stats.sync_with_timer (fun () -> f t)

  let sync = sync' ?hook:None

  (** {1 Index creation} *)

  let transfer_log_async_to_log ~root ~generation ~log ~log_async =
    let entries = Int63.div (IO.offset log_async) Entry.encoded_sizeL in
    Log.debug (fun l ->
        l "[%s] log_async file detected. Loading %a entries"
          (Filename.basename root) Int63.pp entries);
    (* This can only happen in RW mode, where [t.log] is always [Some _]. *)
    match log with
    | None -> assert false
    | Some log ->
        IO.iter (fun _ e -> Log_file.replace log e.key e.value) log_async;
        (* Force an fsync here so that entries persisted in [log_async]
           remain persisted in [log]. *)
        Log_file.flush ~with_fsync:true log;
        IO.clear ~generation ~reopen:false log_async

  let v_no_cache ?(flush_callback = fun () -> ()) ~throttle ~fresh ~readonly
      ~lru_size ~log_size root =
    Log.debug (fun l ->
        l "[%s] not found in cache, creating a new instance"
          (Filename.basename root));
    let writer_lock =
      if not readonly then Some (IO.Lock.lock (Layout.lock ~root)) else None
    in
    let config =
      {
        log_size = log_size * Entry.encoded_size;
        lru_size;
        readonly;
        fresh;
        throttle;
        flush_callback;
      }
    in
    (* load the [log] file *)
    let log =
      let log_path = Layout.log ~root in
      if readonly then if fresh then raise RO_not_allowed else None
      else
        let io =
          IO.v ~flush_callback ~fresh ~generation:Int63.zero
            ~fan_size:Int63.zero log_path
        in
        let entries = Int63.(to_int_exn (IO.offset io / Entry.encoded_sizeL)) in
        Log.debug (fun l ->
            l "[%s] log file detected. Loading %d entries"
              (Filename.basename root) entries);
        Some (Log_file.create io)
    in
    let generation =
      match log with
      | None -> Int63.zero
      | Some log -> IO.get_generation (Log_file.io log)
    in
    (* load the [log_async] file *)
    let () =
      let log_async_path = Layout.log_async ~root in
      (* If we are in readonly mode, [log_async] will be read during
         [sync_log_async], so there is no need to do it here. *)
      if (not readonly) && IO.exists log_async_path then
        let io =
          IO.v ~flush_callback ~fresh ~generation ~fan_size:Int63.zero
            log_async_path
        in
        (* in fresh mode, we need to wipe the existing [log_async] file *)
        if fresh then IO.clear ~generation ~reopen:false io
        else
          (* If we are not in fresh mode, we move the contents of [log_async]
             to [log]. *)
          transfer_log_async_to_log ~root ~generation ~log ~log_async:io
    in
    (* load the [data] file *)
    let index =
      if readonly then None
      else
        let index_path = Layout.data ~root in
        if IO.exists index_path then
          let io =
            (* NOTE: No [flush_callback] on the index IO, as we maintain the
               invariant that any bindings it contains were previously
               persisted in either [log] or [log_async]. *)
            IO.v ?flush_callback:None ~fresh ~generation ~fan_size:Int63.zero
              index_path
          in
          let entries = Int63.div (IO.offset io) Entry.encoded_sizeL in
          if entries = Int63.zero then None
          else (
            Log.debug (fun l ->
                l "[%s] index file detected. Loading %a entries"
                  (Filename.basename root) Int63.pp entries);
            let fan_out =
              Fan.import ~hash_size:K.hash_size (IO.get_fanout io)
            in
            Some { fan_out; io })
        else (
          Log.debug (fun l ->
              l "[%s] no index file detected." (Filename.basename root));
          None)
    in
    {
      config;
      generation;
      log;
      log_async = None;
      root;
      index;
      lru = Lru.create lru_size;
      open_instances = 1;
      merge_lock = Semaphore.make true;
      rename_lock = Semaphore.make true;
      sync_lock = Semaphore.make true;
      writer_lock;
      pending_cancel = false;
    }

  type cache = (string * bool, instance) Cache.t

  let empty_cache = Cache.create

  let v ?(flush_callback = fun () -> ()) ?(cache = empty_cache ())
      ?(fresh = false) ?(readonly = false) ?(throttle = `Block_writes)
      ?(lru_size = 30_000) ~log_size root =
    let new_instance () =
      let instance =
        v_no_cache ~flush_callback ~fresh ~readonly ~log_size ~lru_size
          ~throttle root
      in
      if readonly then sync_instance instance;
      Cache.add cache (root, readonly) instance;
      ref (Some instance)
    in
    Log.info (fun l ->
        l "[%s] v fresh=%b readonly=%b log_size=%d" (Filename.basename root)
          fresh readonly log_size);
    match (Cache.find cache (root, readonly), IO.exists (Layout.log ~root)) with
    | None, _ -> new_instance ()
    | Some _, false ->
        Log.debug (fun l ->
            l "[%s] does not exist anymore, cleaning up the fd cache"
              (Filename.basename root));
        Cache.remove cache (root, true);
        Cache.remove cache (root, false);
        new_instance ()
    | Some t, true -> (
        match t.open_instances with
        | 0 ->
            Cache.remove cache (root, readonly);
            new_instance ()
        | _ ->
            Log.debug (fun l ->
                l "[%s] found in cache" (Filename.basename root));
            t.open_instances <- t.open_instances + 1;
            if readonly then sync_instance t;
            let t = ref (Some t) in
            if fresh then clear t;
            t)
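
  (* Illustrative usage of the API produced by this functor, given some
     [key : K.t] and [value : V.t] (the path is an assumption, not taken from
     this file):
     {[
       let t = v ~log_size:1_000 "/tmp/my-index" in
       replace t key value;
       assert (find t key = value);
       close t
     ]}
     [find] is defined above; [replace] and [close] are defined later in this
     module. Note that the [log_size] parameter counts entries: it is
     converted to bytes internally by multiplying by [Entry.encoded_size]. *)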

  (** {1 Merges} *)

  (** Appends the entry encoded in [buf] into [dst_io] and registers it in
      [fan_out] with hash [hash]. *)
  let append_substring_fanout fan_out hash dst_io buf ~off ~len =
    Fan.update fan_out hash (IO.offset dst_io);
    IO.append_substring dst_io buf ~off ~len

  (** Appends [entry] into [dst_io] and registers it in [fan_out]. *)
  let append_entry_fanout fan_out entry dst_io =
    Fan.update fan_out entry.Entry.key_hash (IO.offset dst_io);
    Entry.encode entry (IO.append dst_io)

  (** Appends values from [log] into [dst_io], up to (but excluding) the first
      value whose hash is greater than or equal to [hash_e] (the hash of the
      current entry in [data]), and returns the remaining sequence. Also
      registers the appended values in [fan_out]. *)
  let rec merge_from_log fan_out log hash_e dst_io =
    match log with
    | Seq.Nil -> Seq.Nil
    | Seq.Cons (v, _) when v.Entry.key_hash >= hash_e -> log
    | Seq.Cons (v, log) ->
        append_entry_fanout fan_out v dst_io;
        (merge_from_log [@tailcall]) fan_out (log ()) hash_e dst_io

  (** Appends the values from [log] into [dst_io], up to the end, and
      registers them in [fan_out]. *)
  let append_remaining_log fan_out log dst_io =
    Seq.iter (fun entry -> append_entry_fanout fan_out entry dst_io) log

  (** Merges [log] with [index] into [dst_io], ignoring bindings that do not
      satisfy [filter (k, v)]. [log] must be sorted by key hashes. *)
  let merge_with ~hook ~yield ~filter log index_io fan_out dst_io =
    (* We read the [index] by page; the [refill] function is in charge of
       refilling the page buffer when it has been consumed. *)
    let len = 10_000 * Entry.encoded_size in
    let buf = Bytes.create len in
    let refill off = ignore (IO.read index_io ~off ~len buf) in
    let index_end = IO.offset index_io in
    refill Int63.zero;
    let filter =
      Option.fold
        ~none:(fun _ _ -> true)
        ~some:(fun f key entry_off ->
          (* The value is only decoded here, when a filter is provided; when
             it is not, we avoid the decoding entirely. *)
          let value =
            Entry.decode_value (Bytes.unsafe_to_string buf) entry_off
          in
          f (key, value))
        filter
    in
    (* This performs the merge. [index_offset] is the offset of the next entry
       to process in [index], [buf_offset] is its counterpart in the page
       buffer, and [log] is the sequence of log entries still to process. *)
    let rec go first_entry index_offset buf_offset log =
      (* If the index is fully read, we append the rest of the [log]. *)
      if index_offset >= index_end then (
        append_remaining_log fan_out (fun () -> log) dst_io;
        `Completed)
      else
        let index_offset = Int63.add index_offset Entry.encoded_sizeL in
        let index_key, index_key_hash =
          Entry.decode_key (Bytes.unsafe_to_string buf) buf_offset
        in
        let log = merge_from_log fan_out log index_key_hash dst_io in
        match yield () with
        | `Abort -> `Aborted
        | `Continue ->
            (* This yield is used to balance the resources between the merging
               thread (here) and the main thread. *)
            Thread.yield ();
            (* If the log entry has the same key as the index entry, we do not
               add the index one, respecting the [replace] semantics. *)
            let log_overwrites_index_entry =
              match log with
              | Seq.Nil -> false
              | Seq.Cons (entry, _) -> K.(equal entry.key index_key)
            in
            if (not log_overwrites_index_entry) && filter index_key buf_offset
            then
              append_substring_fanout fan_out index_key_hash dst_io
                (Bytes.unsafe_to_string buf)
                ~off:buf_offset ~len:Entry.encoded_size;
            if first_entry then hook `After_first_entry;
            let buf_offset =
              let n = buf_offset + Entry.encoded_size in
              (* If the buffer is entirely consumed, refill it. *)
              if n >= Bytes.length buf then (
                refill index_offset;
                0)
              else n
            in
            (go [@tailcall]) false index_offset buf_offset log
    in
    (go [@tailcall]) true Int63.zero 0 (log ())
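
  (* [merge_from_log] and [merge_with] together implement a classic merge of
     two hash-sorted sources. A minimal self-contained sketch of the same idea
     on lists ([merge_sorted] is a hypothetical helper, not part of this
     module):
     {[
       (* Merge two lists sorted by [hash]; on equal hashes, the log entry
          wins. *)
       let rec merge_sorted ~hash log index =
         match (log, index) with
         | [], rest | rest, [] -> rest
         | l :: log', i :: index' ->
             if hash l < hash i then l :: merge_sorted ~hash log' index
             else if hash l = hash i then l :: merge_sorted ~hash log' index'
             else i :: merge_sorted ~hash log index'
     ]}
     This sketch deduplicates on hash equality, whereas [merge_with] above
     compares the actual keys (distinct keys may share a hash). It also reads
     the index page by page from disk, takes the log as a [Seq.t], and aborts
     early when [yield] returns [`Abort]. *)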

  (** Increases and returns the merge counter. *)
  let merge_counter =
    let n = ref 0 in
    fun () ->
      incr n;
      !n

  (** Merges the entries in [t.log] into the data file, ensuring that
      concurrent writes are not lost.

      The caller must ensure the following:

      - [t.log] has been loaded;
      - [t.log_async] has been created;
      - [t.merge_lock] is acquired before entry, and released immediately
        after this function returns or raises an exception. *)
  let unsafe_perform_merge ~filter ~hook t =
    hook `Before;
    let log = Option.get t.log in
    let generation = Int63.succ t.generation in
    let sorted_log_bindings =
      Log_file.to_sorted_seq log
      |> Option.fold filter ~none:Fun.id ~some:(fun f ->
             Seq.filter (fun (e : Entry.t) -> f (e.key, e.value)))
    in
    let fan_size, old_fan_nb =
      match t.index with
      | None -> (Log_file.cardinal log, None)
      | Some index ->
          ( Int63.(to_int_exn (IO.offset index.io / Entry.encoded_sizeL))
            + Log_file.cardinal log,
            Some (Fan.nb_fans index.fan_out) )
    in
    let fan_out =
      Fan.v ~hash_size:K.hash_size ~entry_size:Entry.encoded_size fan_size
    in
    let () =
      match old_fan_nb with
      | Some on ->
          let new_fan_nb = Fan.nb_fans fan_out in
          if new_fan_nb <> on then
            Log.info (fun m ->
                m "the number of fan-out entries has changed: %d to %d" on
                  new_fan_nb)
      | _ -> ()
    in
    let merge =
      let merge_path = Layout.merge ~root:t.root in
      IO.v ~fresh:true ~generation
        ~fan_size:(Int63.of_int (Fan.exported_size fan_out))
        merge_path
    in
    let merge_result : [ `Index_io of IO.t | `Aborted ] =
      match t.index with
      | None -> (
          match check_pending_cancel t with
          | `Abort -> `Aborted
          | `Continue ->
              let io =
                IO.v ~fresh:true ~generation ~fan_size:Int63.zero
                  (Layout.data ~root:t.root)
              in
              append_remaining_log fan_out sorted_log_bindings merge;
              `Index_io io)
      | Some index -> (
          match
            merge_with ~hook
              ~yield:(fun () -> check_pending_cancel t)
              ~filter sorted_log_bindings index.io fan_out merge
          with
          | `Completed -> `Index_io index.io
          | `Aborted -> `Aborted)
    in
    match merge_result with
    | `Aborted ->
        IO.clear ~generation ~reopen:false merge;
        (`Aborted, Mtime.Span.zero)
    | `Index_io io ->
        let fan_out = Fan.finalize fan_out in
        let index = { io; fan_out } in
        IO.set_fanout merge (Fan.export index.fan_out);
        let before_rename_lock = Clock.counter () in
        let rename_lock_duration =
          Semaphore.with_acquire "merge-rename" t.rename_lock (fun () ->
              let rename_lock_duration = Clock.count before_rename_lock in
              IO.rename ~src:merge ~dst:index.io;
              t.index <- Some index;
              t.generation <- generation;
              (* The filter may have removed some of the bindings that exist
                 in the LRU. We over-approximate by clearing the entire
                 thing. *)
              if Option.is_some filter then Lru.clear t.lru;
              Log_file.clear ~generation ~reopen:true log;
              hook `After_clear;
              let log_async = Option.get t.log_async in
              Log_file.iter log_async ~f:(fun entry ->
                  Log_file.replace log entry.key entry.value);
              (* NOTE: It {i may} not be necessary to trigger the
                 [flush_callback] here. If the instance has been recently
                 flushed (or [log_async] just reached the
                 [auto_flush_limit]), we're just moving already-persisted
                 values around. However, we trigger the callback anyway for
                 simplicity. *)
              (* The [fsync] is necessary, since bindings in [log_async] may
                 have been explicitly [fsync]ed during the merge, and we need
                 to maintain their durability. *)
              Log_file.flush ~with_fsync:true log;
              Log_file.clear
                ~generation:(Int63.succ generation)
                ~reopen:false log_async;
              (* [log_async.mem] does not need to be cleared, as we are
                 discarding it. *)
              t.log_async <- None;
              rename_lock_duration)
        in
        hook `After;
        (`Completed, rename_lock_duration)

  let reset_log_async t =
    let io =
      let log_async_path = Layout.log_async ~root:t.root in
      IO.v ~flush_callback:t.config.flush_callback ~fresh:true
        ~generation:(Int63.succ t.generation) ~fan_size:Int63.zero
        log_async_path
    in
    t.log_async <- Some (Log_file.create io)

  let merge' ?(blocking = false) ?filter ?(hook = fun _ -> ()) ?(force = false)
      t =
    let merge_started = Clock.counter () in
    let merge_id = merge_counter () in
    let msg = Fmt.str "merge { id=%d }" merge_id in
    Semaphore.acquire msg t.merge_lock;
    let merge_lock_wait = Clock.count merge_started in
    Log.info (fun l ->
        let pp_forced ppf () = if force then Fmt.string ppf "; force=true" in
        l "[%s] merge started { id=%d%a }" (Filename.basename t.root) merge_id
          pp_forced ());
    Stats.incr_nb_merge ();
    (* Clean up after any previous crash of the merge thread. *)
    Option.iter
      (fun l ->
        transfer_log_async_to_log ~root:t.root ~generation:t.generation
          ~log:t.log ~log_async:(Log_file.io l))
      t.log_async;
    reset_log_async t;
    (* NOTE: We flush [log] {i after} enabling [log_async] to ensure that no
       unflushed bindings make it into [log] before being merged into the
       index. This satisfies the invariant that all bindings are {i first}
       persisted in a log, so that the [index] IO doesn't need to trigger the
       [flush_callback]. *)
    flush_instance ~no_async:() ~with_fsync:true t;
    let go () =
      let merge_result, rename_lock_wait =
        Fun.protect
          (fun () -> unsafe_perform_merge ~filter ~hook t)
          ~finally:(fun () -> Semaphore.release t.merge_lock)
      in
      let total_duration = Clock.count merge_started in
      let merge_duration = Mtime.Span.abs_diff total_duration merge_lock_wait in
      Stats.add_merge_duration merge_duration;
      Log.info (fun l ->
          let action =
            match merge_result with
            | `Aborted -> "aborted"
            | `Completed -> "completed"
          in
          l
            "[%s] merge %s { id=%d; total-duration=%a; merge-duration=%a; \
             merge-lock=%a; rename-lock=%a }"
            (Filename.basename t.root)
            action merge_id Mtime.Span.pp total_duration Mtime.Span.pp
            merge_duration Mtime.Span.pp merge_lock_wait Mtime.Span.pp
            rename_lock_wait);
      merge_result
    in
    if blocking then go () |> Thread.return else Thread.async go

  let is_empty t =
    (* A read-only instance may not yet have loaded the [log] if no explicit
       [sync] has occurred, leaving some ambiguity as to whether the index is
       strictly "empty". For now, we only need this internal function for
       read-write instances, so we dodge the question. *)
    assert (not t.config.readonly);
    match t.log with
    | None -> true
    | Some log -> Option.is_none t.index && Log_file.cardinal log = 0

  (** Triggers a merge if the [log] exceeds [log_size], or if the [log]
      contains entries and [force] is true. *)
  let try_merge_aux ?hook ?(force = false) t =
    let t = check_open t in
    match is_empty t with
    | true ->
        Log.debug (fun l -> l "[%s] index is empty" (Filename.basename t.root));
        Thread.return `Completed
    | false -> (
        match t.log with
        | None ->
            Log.debug (fun l ->
                l "[%s] log is empty" (Filename.basename t.root));
            Thread.return `Completed
        | Some log ->
            if
              force
              || Int63.compare
                   (IO.offset (Log_file.io log))
                   (Int63.of_int t.config.log_size)
                 > 0
            then merge' ~force ?hook t
            else Thread.return `Completed)

  let merge t = ignore (try_merge_aux ?hook:None ~force:true t : _ async)
  let try_merge t = ignore (try_merge_aux ?hook:None ~force:false t : _ async)

  let instance_is_merging t =
    (* [merge_lock] is used to detect an ongoing merge. Other operations can
       take this lock, but as they are not async, we consider this to be a
       good enough approximation. *)
    Semaphore.is_held t.merge_lock

  let is_merging t =
    let t = check_open t in
    if t.config.readonly then raise RO_not_allowed;
    instance_is_merging t

  (** {1 Replace} *)

  let replace' ?hook ?(overcommit = false) t key value =
    let t = check_open t in
    Stats.incr_nb_replace ();
    Log.debug (fun l ->
        l "[%s] replace %a %a" (Filename.basename t.root) pp_key key pp_value
          value);
    if t.config.readonly then raise RO_not_allowed;
    let log_limit_reached =
      Semaphore.with_acquire "replace" t.rename_lock (fun () ->
          let log =
            match t.log_async with
            | Some log -> log
            | None -> Option.get t.log
          in
          Log_file.replace log key value;
          Lru.add t.lru key value;
          let offset = IO.offset (Log_file.io log) in
          Int63.compare offset (Int63.of_int t.config.log_size) > 0)
    in
    if log_limit_reached && not overcommit then
      let is_merging = instance_is_merging t in
      match (t.config.throttle, is_merging) with
      | `Overcommit_memory, true ->
          (* Do not start a merge; overcommit the memory instead. *)
          None
      | `Overcommit_memory, false | `Block_writes, _ ->
          (* Start a merge, blocking if one is already running. *)
          let hook = hook |> Option.map (fun f stage -> f (`Merge stage)) in
          Some (merge' ?hook t)
    else None

  let replace ?overcommit t key value =
    ignore (replace' ?hook:None ?overcommit t key value : _ async option)

  let replace_with_timer ?sampling_interval t key value =
    match sampling_interval with
    | None -> replace t key value
    | Some sampling_interval ->
        Stats.start_replace ();
        replace t key value;
        Stats.end_replace ~sampling_interval
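
  (* The [throttle] strategy chosen at creation time determines what
     [replace'] does when the log limit is reached while a merge is already
     running. Illustrative usage (the paths are assumptions, not taken from
     this file):
     {[
       (* Writers keep going and memory grows while a merge is in flight: *)
       let t = v ~throttle:`Overcommit_memory ~log_size:1_000 "/tmp/idx" in
       (* Default: writers block until the ongoing merge completes: *)
       let t' = v ~throttle:`Block_writes ~log_size:1_000 "/tmp/idx2" in
       ignore (t, t')
     ]}
  *)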

  (** {1 Filter} *)

  (** [filter] is implemented with a [merge], during which bindings that do
      not satisfy the predicate are not merged. *)
  let filter t f =
    let t = check_open t in
    Log.debug (fun l -> l "[%s] filter" (Filename.basename t.root));
    if t.config.readonly then raise RO_not_allowed;
    match is_empty t with
    | true ->
        Log.debug (fun l -> l "[%s] index is empty" (Filename.basename t.root))
    | false -> (
        match Thread.await (merge' ~blocking:true ~filter:f t) with
        | Ok (`Aborted | `Completed) -> ()
        | Error (`Async_exn exn) ->
            Fmt.failwith "filter: asynchronous exception during merge (%s)"
              (Printexc.to_string exn))

  (** {1 Iter} *)

  let iter f t =
    let t = check_open t in
    Log.debug (fun l -> l "[%s] iter" (Filename.basename t.root));
    match t.log with
    | None -> ()
    | Some log ->
        Log_file.iter log ~f:(fun entry -> f entry.key entry.value);
        Option.iter
          (fun (i : index) -> IO.iter (fun _ e -> f e.key e.value) i.io)
          t.index;
        Semaphore.with_acquire "iter" t.rename_lock (fun () ->
            match t.log_async with
            | None -> ()
            | Some log ->
                Log_file.iter log ~f:(fun entry -> f entry.key entry.value))

  (** {1 Close} *)

  let close' ~hook ?immediately it =
    let abort_merge = Option.is_some immediately in
    match !it with
    | None -> Log.debug (fun l -> l "close: instance already closed")
    | Some t ->
        Log.debug (fun l -> l "[%s] close" (Filename.basename t.root));
        if abort_merge then (
          t.pending_cancel <- true;
          hook `Abort_signalled);
        Semaphore.with_acquire "close" t.merge_lock (fun () ->
            (* The instance is set to [None] prior to closing the resources,
               to ensure other clients see the close as atomic. *)
            it := None;
            t.open_instances <- t.open_instances - 1;
            (* Resources are closed only if this is the last open instance. *)
            if t.open_instances = 0 then (
              Log.debug (fun l ->
                  l "[%s] last open instance: closing the file descriptor"
                    (Filename.basename t.root));
              if not t.config.readonly then flush_instance ~with_fsync:true t;
              Option.iter Log_file.close t.log;
              Option.iter (fun (i : index) -> IO.close i.io) t.index;
              Option.iter (fun lock -> IO.Lock.unlock lock) t.writer_lock))

  let close = close' ~hook:(fun _ -> ())

  module Checks = Checks.Make (K) (V) (Platform)
end

module Cache = Cache
module Checks = Checks
module Make = Make_private
module Platform = Platform
module Stats = Stats

module Private = struct
  module Fan = Fan
  module Io = Io
  module Io_array = Io_array
  module Search = Search
  module Data = Data
  module Layout = Layout
  module Logs = Log

  module Hook = struct
    type 'a t = 'a -> unit

    let v f = f
  end

  module type S = Private with type 'a hook := 'a Hook.t

  module Make = Make_private
end