123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175(*
* Copyright (c) 2022-2022 Tarides <contact@tarides.com>
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*)openImportincludeAppend_only_file_intfmoduleMake(Io:Io.S)(Errs:Io_errors.SwithmoduleIo=Io)=structmoduleIo=IomoduleErrs=Errstypet={io:Io.t;mutablepersisted_end_poff:int63;dead_header_size:int63;rw_perm:rw_permoption;}andauto_flush_procedure=[`Internal|`Externaloft->unit]andrw_perm={buf:Buffer.t;auto_flush_threshold:int;auto_flush_procedure:auto_flush_procedure;}(** [rw_perm] contains the data necessary to operate in readwrite mode. *)letcreate_rw~path~overwrite~auto_flush_threshold~auto_flush_procedure=letopenResult_syntaxinlet+io=Io.create~path~overwriteinletpersisted_end_poff=Int63.zeroinletbuf=Buffer.create0in{io;persisted_end_poff;dead_header_size=Int63.zero;rw_perm=Some{buf;auto_flush_threshold;auto_flush_procedure};}(** A store is consistent if the real offset of the suffix/dict files is the
one recorded in the control file. When opening the store, the offset from
the control file is passed as the [end_poff] argument to the [open_ro],
[open_rw] functions. The [end_poff] from the control file is then used as
the real offset.
In case of a crash, we can only recover if the [end_poff] is smaller than
the real offset. We cannot recover otherwise, because we have no
guarantees that the last object fsynced to disk is written entirely to
disk. *)letcheck_consistent_store~end_poff~dead_header_sizeio=letopenResult_syntaxinlet*real_offset=Io.read_sizeioinletdead_header_size=Int63.of_intdead_header_sizeinletreal_offset_without_header=Int63.Syntax.(real_offset-dead_header_size)inifreal_offset_without_header<end_poffthenError`Inconsistent_storeelse(ifreal_offset_without_header>end_poffthen[%log.warn"The end offset in the control file %a is smaller than the offset on \
disk %a for %s; the store was closed in a inconsistent state."Int63.ppend_poffInt63.ppreal_offset_without_header(Io.pathio)];Ok())letopen_rw~path~end_poff~dead_header_size~auto_flush_threshold~auto_flush_procedure=letopenResult_syntaxinlet*io=Io.open_~path~readonly:falseinlet+()=check_consistent_store~end_poff~dead_header_sizeioinletpersisted_end_poff=end_poffinletdead_header_size=Int63.of_intdead_header_sizeinletbuf=Buffer.create0in{io;persisted_end_poff;dead_header_size;rw_perm=Some{buf;auto_flush_threshold;auto_flush_procedure};}letopen_ro~path~end_poff~dead_header_size=letopenResult_syntaxinlet*io=Io.open_~path~readonly:trueinlet+()=check_consistent_store~end_poff~dead_header_sizeioinletpersisted_end_poff=end_poffinletdead_header_size=Int63.of_intdead_header_sizein{io;persisted_end_poff;dead_header_size;rw_perm=None}letempty_buffer=function|{rw_perm=Some{buf;_};_}whenBuffer.lengthbuf>0->false|_->trueletcloset=ifnot@@empty_buffertthenError`Pending_flushelseIo.closet.ioletreadonlyt=Io.readonlyt.ioletpatht=Io.patht.ioletauto_flush_threshold=function|{rw_perm=None;_}->None|{rw_perm=Somerw_perm;_}->Somerw_perm.auto_flush_thresholdletend_pofft=matcht.rw_permwith|None->t.persisted_end_poff|Somerw_perm->letopenInt63.Syntaxint.persisted_end_poff+(Buffer.lengthrw_perm.buf|>Int63.of_int)letrefresh_end_pofftnew_end_poff=matcht.rw_permwith|Some_->Error`Rw_not_allowed|None->t.persisted_end_poff<-new_end_poff;Ok()letflusht=matcht.rw_permwith|None->Error`Ro_not_allowed|Somerw_perm->letopenResult_syntaxinletopenInt63.Syntaxinlets=Buffer.contentsrw_perm.bufinletoff=t.persisted_end_poff+t.dead_header_sizeinlet+()=Io.write_stringt.io~offsint.persisted_end_poff<-t.persisted_end_poff+(String.lengths|>Int63.of_int);(* [truncate] is semantically identical to [clear], except that
[truncate] doesn't deallocate the internal buffer. We use
[clear] in legacy_io. *)Buffer.truncaterw_perm.buf0letfsynct=Io.fsynct.ioletread_exnt~off~lenb=letopenInt63.Syntaxinletoff'=off+Int63.of_intleninifoff'>t.persisted_end_poffthenraise(Errors.Pack_error`Read_out_of_bounds);letoff=off+t.dead_header_sizeinIo.read_exnt.io~off~lenbletread_to_stringt~off~len=letopenInt63.Syntaxinletoff'=off+Int63.of_intleninifoff'>t.persisted_end_poffthenError`Read_out_of_boundselseletoff=off+t.dead_header_sizeinIo.read_to_stringt.io~off~lenletappend_exnts=matcht.rw_permwith|None->raiseErrors.RO_not_allowed|Somerw_perm->(assert(Buffer.lengthrw_perm.buf<rw_perm.auto_flush_threshold);Buffer.add_stringrw_perm.bufs;ifBuffer.lengthrw_perm.buf>=rw_perm.auto_flush_thresholdthenmatchrw_perm.auto_flush_procedurewith|`Internal->flusht|>Errs.raise_if_error|`Externalcb->cbt;assert(empty_buffert))end