(* The MIT License
Copyright (c) 2019 Craig Ferguson <craig@tarides.com>
Thomas Gazagnaire <thomas@tarides.com>
Ioana Cristescu <ioana@tarides.com>
Clément Pascutto <clement@tarides.com>
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software. *)

let src = Logs.Src.create "index_unix" ~doc:"Index_unix"

module Log = (val Logs.src_log src : Logs.LOG)

(* Raised by [IO.sync] and [IO.append] when called on a read-only instance. *)
exception RO_not_allowed

(* Version tag stored in the file header. It must be exactly 8 bytes long;
   this is asserted inside [IO] before [IO.v] is defined. *)
let current_version = "00000001"

module Stats = Index.Stats

(* Unix implementation of [Index.IO]: an append-only data file with a small
   fixed header, a shared in-memory write buffer per file name, lock files,
   and thread-based async helpers. *)
module IO : Index.IO = struct
  let ( ++ ) = Int64.add

  let ( -- ) = Int64.sub

  external set_64 : Bytes.t -> int -> int64 -> unit = "%caml_string_set64u"

  external get_64 : string -> int -> int64 = "%caml_string_get64"

  external swap64 : int64 -> int64 = "%bswap_int64"

  (* Encode [i] as an 8-byte string. The bytes are swapped on little-endian
     hosts, so the stored representation is the same on every host. *)
  let encode_int64 i =
    let set_uint64 s off v =
      if not Sys.big_endian then set_64 s off (swap64 v) else set_64 s off v
    in
    let b = Bytes.create 8 in
    set_uint64 b 0 i;
    Bytes.unsafe_to_string b

  (* Inverse of [encode_int64]: decode the first 8 bytes of [buf]. *)
  let decode_int64 buf =
    let get_uint64 s off =
      if not Sys.big_endian then swap64 (get_64 s off) else get_64 s off
    in
    get_uint64 buf 0

  (* Positioned reads and writes on a file descriptor, plus accessors for the
     header fields. Header layout, as used by the sub-modules below:
     - bytes 0..7   : offset (encoded int64)
     - bytes 8..15  : version string
     - bytes 16..23 : generation (encoded int64)
     - bytes 24..31 : size of the fan-out data (encoded int64)
     - bytes 32..   : fan-out data *)
  module Raw = struct
    (* [cursor] records the position just after the last read or write. *)
    type t = { fd : Unix.file_descr; mutable cursor : int64 }

    let v fd = { fd; cursor = 0L }

    (* [pread] and [pwrite] are C stubs that must be linked with this
       library; they are not part of the OCaml standard library. *)
    external pread : Unix.file_descr -> int64 -> bytes -> int -> int -> int
      = "caml_pread"

    external pwrite : Unix.file_descr -> int64 -> bytes -> int -> int -> int
      = "caml_pwrite"

    (* Write the whole of [buf] at file position [off], retrying after short
       writes. The loop also stops, silently, if [pwrite] reports 0 bytes. *)
    let really_write fd off buf =
      let rec aux fd_off buf_off len =
        let w = pwrite fd fd_off buf buf_off len in
        if w = 0 || w = len then ()
        else
          (aux [@tailcall]) (fd_off ++ Int64.of_int w) (buf_off + w) (len - w)
      in
      (aux [@tailcall]) off 0 (Bytes.length buf)

    (* Read up to [len] bytes at file position [off] into [buf], retrying
       after short reads. Returns the number of bytes actually read, which is
       smaller than [len] when the end of the file is reached. *)
    let really_read fd off len buf =
      let rec aux fd_off buf_off len =
        let r = pread fd fd_off buf buf_off len in
        if r = 0 then buf_off (* end of file *)
        else if r = len then buf_off + r
        else
          (aux [@tailcall]) (fd_off ++ Int64.of_int r) (buf_off + r) (len - r)
      in
      (aux [@tailcall]) off 0 len

    (* Write the string [buf] at [off], move the cursor past it and record
       the number of bytes in [Stats]. *)
    let unsafe_write t ~off buf =
      let buf = Bytes.unsafe_of_string buf in
      really_write t.fd off buf;
      t.cursor <- off ++ Int64.of_int (Bytes.length buf);
      Stats.add_write (Bytes.length buf)

    (* Read up to [len] bytes at [off] into [buf], move the cursor past the
       bytes read, record them in [Stats] and return their count. *)
    let unsafe_read t ~off ~len buf =
      let n = really_read t.fd off len buf in
      t.cursor <- off ++ Int64.of_int n;
      Stats.add_read n;
      n

    module Offset = struct
      let set t n =
        let buf = encode_int64 n in
        unsafe_write t ~off:0L buf

      let get t =
        let buf = Bytes.create 8 in
        let n = unsafe_read t ~off:0L ~len:8 buf in
        assert (n = 8);
        decode_int64 (Bytes.unsafe_to_string buf)
    end

    module Version = struct
      let get t =
        let buf = Bytes.create 8 in
        let n = unsafe_read t ~off:8L ~len:8 buf in
        assert (n = 8);
        Bytes.unsafe_to_string buf

      (* Always writes [current_version]. *)
      let set t = unsafe_write t ~off:8L current_version
    end

    module Generation = struct
      let get t =
        let buf = Bytes.create 8 in
        let n = unsafe_read t ~off:16L ~len:8 buf in
        assert (n = 8);
        decode_int64 (Bytes.unsafe_to_string buf)

      let set t gen =
        let buf = encode_int64 gen in
        unsafe_write t ~off:16L buf
    end

    module Fan = struct
      (* Store the length of [buf] in the size field, then the data itself
         right after it (nothing more is written when [buf] is empty). *)
      let set t buf =
        let size = encode_int64 (Int64.of_int (String.length buf)) in
        unsafe_write t ~off:24L size;
        if buf <> "" then unsafe_write t ~off:(24L ++ 8L) buf

      let get_size t =
        let size_buf = Bytes.create 8 in
        let n = unsafe_read t ~off:24L ~len:8 size_buf in
        assert (n = 8);
        decode_int64 (Bytes.unsafe_to_string size_buf)

      (* Only updates the size field; the data area is left untouched. *)
      let set_size t size =
        let buf = encode_int64 size in
        unsafe_write t ~off:24L buf

      let get t =
        let size = Int64.to_int (get_size t) in
        let buf = Bytes.create size in
        let n = unsafe_read t ~off:(24L ++ 8L) ~len:size buf in
        assert (n = size);
        Bytes.unsafe_to_string buf
    end
  end

  (* - [header]  : size in bytes of the header (32 + [fan_size], see [v]);
     - [offset]  : logical end of the data, header excluded, buffered bytes
                   included;
     - [flushed] : absolute file position up to which data has been written
                   ([header] + flushed logical offset);
     - [buf]     : pending appended bytes, shared by all instances opened on
                   the same file name (see [buffer]). *)
  type t = {
    file : string;
    mutable header : int64;
    mutable raw : Raw.t;
    mutable offset : int64;
    mutable flushed : int64;
    mutable fan_size : int64;
    readonly : bool;
    version : string;
    buf : Buffer.t;
  }

  (* Write the pending buffer at position [flushed], then persist the new
     offset in the header. Raises [RO_not_allowed] on read-only instances.
     The buffer is emptied before the write is attempted. *)
  let sync t =
    if t.readonly then raise RO_not_allowed;
    let buf = Buffer.contents t.buf in
    let offset = t.offset in
    Buffer.clear t.buf;
    if buf = "" then ()
    else (
      Raw.unsafe_write t.raw ~off:t.flushed buf;
      Raw.Offset.set t.raw offset;
      assert (t.flushed ++ Int64.of_int (String.length buf) = t.header ++ offset);
      t.flushed <- offset ++ t.header)

  let name t = t.file

  (* Flush [src], close the descriptor of [dst], rename the file of [src]
     over the file of [dst], and make [dst] take over the descriptor and the
     positions of [src]. *)
  let rename ~src ~dst =
    sync src;
    Unix.close dst.raw.fd;
    Unix.rename src.file dst.file;
    Buffer.clear dst.buf;
    dst.header <- src.header;
    dst.fan_size <- src.fan_size;
    dst.offset <- src.offset;
    dst.flushed <- src.flushed;
    dst.raw <- src.raw

  (* Close the descriptor. Pending bytes are dropped, not flushed. *)
  let close t =
    if not t.readonly then Buffer.clear t.buf;
    Unix.close t.raw.fd

  let auto_flush_limit = 1_000_000L

  (* Buffer [buf] at the end of the data and flush once the threshold is
     exceeded. Raises [RO_not_allowed] on read-only instances.
     NOTE(review): [flushed] includes the header size while [offset] does
     not, so the effective threshold is [auto_flush_limit] plus the header
     size. Confirm whether this is intended. *)
  let append t buf =
    if t.readonly then raise RO_not_allowed;
    Buffer.add_string t.buf buf;
    let len = Int64.of_int (String.length buf) in
    t.offset <- t.offset ++ len;
    if t.offset -- t.flushed > auto_flush_limit then sync t

  (* Read up to [len] bytes at logical offset [off] (header excluded) and
     return the number of bytes read. In read-write mode, the start of the
     region must already be flushed. *)
  let read t ~off ~len buf =
    if not t.readonly then assert (t.header ++ off <= t.flushed);
    Raw.unsafe_read t.raw ~off:(t.header ++ off) ~len buf

  let offset t = t.offset

  (* Reload [offset] from the file header and return it. *)
  let force_offset t =
    t.offset <- Raw.Offset.get t.raw;
    t.offset

  let version t = t.version

  let get_generation t =
    let i = Raw.Generation.get t.raw in
    Log.debug (fun m -> m "get_generation: %Ld" i);
    i

  let set_generation t i =
    Log.debug (fun m -> m "set_generation: %Ld" i);
    Raw.Generation.set t.raw i

  let get_fanout t = Raw.Fan.get t.raw

  (* The length of [buf] must match the fan-out size the file was opened
     with. *)
  let set_fanout t buf =
    assert (Int64.equal (Int64.of_int (String.length buf)) t.fan_size);
    Raw.Fan.set t.raw buf

  let readonly t = t.readonly

  (* Turn any [Unix.Unix_error] into a [Failure]; re-raise the rest. *)
  let protect_unix_exn = function
    | Unix.Unix_error _ as e -> failwith (Printexc.to_string e)
    | e -> raise e

  (* Swallow [ENOENT] errors; re-raise the rest. *)
  let ignore_enoent = function
    | Unix.Unix_error (Unix.ENOENT, _, _) -> ()
    | e -> raise e

  let protect f x = try f x with e -> protect_unix_exn e

  let safe f x = try f x with e -> ignore_enoent e

  (* Create [dirname] and its missing ancestors. A non-directory entry found
     on the way is unlinked first. The continuation [k] creates the
     directories from the outermost one inwards. *)
  let mkdir dirname =
    let rec aux dir k =
      if Sys.file_exists dir && Sys.is_directory dir then k ()
      else (
        if Sys.file_exists dir then safe Unix.unlink dir;
        (aux [@tailcall]) (Filename.dirname dir) @@ fun () ->
        protect (Unix.mkdir dir) 0o755;
        k ())
    in
    (aux [@tailcall]) dirname (fun () -> ())

  (* Reset the data to empty: offset 0, empty fan-out, empty buffer. The
     generation is reset to 0 as well unless [keep_generation] is set. *)
  let clear ?(keep_generation = false) t =
    t.offset <- 0L;
    t.flushed <- t.header;
    if not keep_generation then Raw.Generation.set t.raw 0L;
    Raw.Offset.set t.raw t.offset;
    Raw.Fan.set t.raw "";
    Buffer.clear t.buf

  (* Write buffers, keyed by file name. *)
  let buffers = Hashtbl.create 256

  (* Return the buffer registered for [file], creating it on first use. *)
  let buffer file =
    try Hashtbl.find buffers file
    with Not_found ->
      let buf = Buffer.create (4 * 1024) in
      Hashtbl.add buffers file buf;
      buf

  let () = assert (String.length current_version = 8)

  (* Open [file], creating its parent directories and the file itself when
     missing.
     - New file, or [fresh] on an existing one: the header is initialised
       with offset 0, [fan_size], [current_version] and [generation].
     - Existing file without [fresh]: offset, version and fan-out size are
       read back from the header; the [fan_size] argument is ignored.
     Fails with [Failure] when both [readonly] and [fresh] are set on an
     existing file. *)
  let v ~readonly ~fresh ~generation ~fan_size file =
    let v ~fan_size ~offset ~version raw =
      let header = 8L ++ 8L ++ 8L ++ 8L ++ fan_size in
      {
        version;
        header;
        file;
        offset;
        raw;
        readonly;
        fan_size;
        buf = buffer file;
        flushed = header ++ offset;
      }
    in
    let mode = Unix.(if readonly then O_RDONLY else O_RDWR) in
    mkdir (Filename.dirname file);
    match Sys.file_exists file with
    | false ->
        let x = Unix.openfile file Unix.[ O_CREAT; O_CLOEXEC; mode ] 0o644 in
        let raw = Raw.v x in
        Raw.Offset.set raw 0L;
        Raw.Fan.set_size raw fan_size;
        Raw.Version.set raw;
        Raw.Generation.set raw generation;
        v ~fan_size ~offset:0L ~version:current_version raw
    | true ->
        let x = Unix.openfile file Unix.[ O_EXCL; O_CLOEXEC; mode ] 0o644 in
        let raw = Raw.v x in
        if readonly && fresh then (
          (* Do not leak the descriptor that was just opened. *)
          Unix.close x;
          Fmt.failwith "IO.v: cannot reset a readonly file")
        else if fresh then (
          Raw.Offset.set raw 0L;
          Raw.Fan.set_size raw fan_size;
          Raw.Version.set raw;
          Raw.Generation.set raw generation;
          v ~fan_size ~offset:0L ~version:current_version raw)
        else
          let offset = Raw.Offset.get raw in
          let version = Raw.Version.get raw in
          let fan_size = Raw.Fan.get_size raw in
          v ~fan_size ~offset ~version raw

  type lock = { path : string; fd : Unix.file_descr }

  exception Locked of string

  (* Open (creating if needed) the lock file [f], apply the [lockf] command
     [op] to it and write the current PID into it. Returns [Some fd] on
     success and [None] when [lockf] reports [EAGAIN]. On every failure path
     the descriptor is closed exactly once. *)
  let unsafe_lock op f =
    mkdir (Filename.dirname f);
    let fd = Unix.openfile f [ Unix.O_CREAT; Unix.O_RDWR ] 0o600
    and pid = string_of_int (Unix.getpid ()) in
    let pid_len = String.length pid in
    try
      Unix.lockf fd op 0;
      if Unix.single_write_substring fd pid 0 pid_len <> pid_len then
        (* The catch-all handler below closes [fd]. Closing it here as well
           would make that second close fail and hide this error. *)
        failwith "Unable to write PID to lock file"
      else Some fd
    with
    | Unix.Unix_error (Unix.EAGAIN, _, _) ->
        Unix.close fd;
        None
    | e ->
        Unix.close fd;
        raise e

  (* Report that [path] is locked by another writer, using the PID stored in
     the lock file, then raise [Locked]. *)
  let err_rw_lock path =
    let ic = open_in path in
    let line =
      (* Close the channel even when the lock file has no complete line. *)
      try input_line ic
      with e ->
        close_in_noerr ic;
        raise e
    in
    close_in ic;
    let pid = int_of_string line in
    Log.err (fun l ->
        l
          "Cannot lock %s: index is already opened in write mode by PID %d. \
           Current PID is %d."
          path pid (Unix.getpid ()));
    raise (Locked path)

  (* Take the lock without blocking; raises [Locked] when it is held. *)
  let lock path =
    Log.debug (fun l -> l "Locking %s" path);
    match unsafe_lock Unix.F_TLOCK path with
    | Some fd -> { path; fd }
    | None -> err_rw_lock path

  (* Closing the descriptor is what gives the lock up here. *)
  let unlock { path; fd } =
    Log.debug (fun l -> l "Unlocking %s" path);
    Unix.close fd

  module Mutex = struct
    include Mutex

    (* Run [f] while holding [t]. The mutex is released exactly once,
       whether [f] returns or raises. *)
    let with_lock t f =
      Mutex.lock t;
      match f () with
      | ans ->
          Mutex.unlock t;
          ans
      | exception e ->
          Mutex.unlock t;
          raise e
  end

  (* [None] stands for a computation that is already finished. *)
  type async = Thread.t option

  let async f = Some (Thread.create f ())

  let yield = Thread.yield

  let return () = None

  let await t = match t with None -> () | Some t -> Thread.join t
end

module Make (K : Index.Key) (V : Index.Value) = Index.Make (K) (V) (IO)

module Private = struct
  module IO = IO

  module Make (K : Index.Key) (V : Index.Value) = Index.Private.Make (K) (V) (IO)
end