(* Buffered reader over an [Fd.t].  The implementation lives in [Internal]; the
   definitions after [open Internal] re-export it, wrapping deferred operations so that
   at most one read is active on a reader at any time. *)
open Core
open Import
module Scheduler = Raw_scheduler
module Unix = Unix_syscalls
module Id = Unique_id.Int63 ()

(** A read outcome that is either a value or end of file.  The monad instance
    short-circuits on [`Eof]. *)
module Read_result = struct
  module Z = struct
    type 'a t =
      [ `Ok of 'a
      | `Eof
      ]
    [@@deriving bin_io, sexp]

    let bind a ~f =
      match a with
      | `Ok a -> f a
      | `Eof -> `Eof
    ;;

    let map a ~f =
      match a with
      | `Ok a -> `Ok (f a)
      | `Eof -> `Eof
    ;;

    let map = `Custom map
    let return a = `Ok a
  end

  include Z
  include Monad.Make (Z)
end

(* We put everything in module [Internal] and then expose just the functions we want
   later. This reminds us to wrap functions with [do_read], which we do to prevent
   multiple simultaneous active uses of a reader. *)
module Internal = struct
  module State = struct
    type t =
      [ `Not_in_use
      | `In_use
      | `Closed
      ]
    [@@deriving sexp]
  end

  module Open_flags = Unix.Open_flags

  type open_flags =
    [ `Already_closed
    | `Ok of Open_flags.t
    | `Error of exn
    ]
  [@@deriving sexp_of]

  type t =
    { fd : Fd.t
    ; id : Id.t
    ; (* [buf] holds data read by the reader from the OS, but not yet read by user code.
         When [t] is closed, [buf] is set to the empty buffer. So, we must make sure in
         any code that accesses [buf] that [t] has not been closed. In particular, after
         any deferred operation, we must check whether [t] has been closed while we were
         waiting. *)
      mutable buf : Bigstring.t
    ; (* [close_may_destroy_buf] indicates whether a call to [close] can immediately
         destroy [buf]. [close_may_destroy_buf] is usually [`Yes], except when we're in
         the middle of a system call in another thread that refers to [buf], in which case
         it is [`Not_now] and [close] can't destroy [buf], and we must wait until that
         system call finishes before doing so.
         [`Not_ever] is used for [read_one_chunk_at_a_time], which exposes [buf]
         to client code, which may in turn hold on to it (e.g. via
         [Bigstring.sub_shared]), and thus it is not safe to ever destroy it. *)
      mutable close_may_destroy_buf : [ `Yes | `Not_now | `Not_ever ]
    ; (* [pos] is the first byte of data in [buf] to be read by user code. *)
      mutable pos : int
    ; (* [available] is how many bytes in [buf] are available to be read by user code. *)
      mutable available : int
    ; (* [`Closed] means that [close t] has been called. [`In_use] means there is some
         user call extant that is waiting for data from the reader. *)
      mutable state : State.t
    ; close_finished : unit Ivar.t
    ; mutable last_read_time : Time.t
    ; (* [open_flags] is the open-file-descriptor bits of [fd]. It is created when [t] is
         created, and starts a deferred computation that calls [Unix.fcntl_getfl].
         [open_flags] is used to report an error when [fd] is not readable. [Fd] treats
         the call to [fcntl_getfl] as an active system call, which prevents [Unix.close
         fd] from completing until [fcntl_getfl] finishes. This prevents a
         file-descriptor or thread leak even though client code doesn't explicitly wait on
         [open_flags]. *)
      open_flags : open_flags Deferred.t
    }
  [@@deriving fields]

  (* The default sexp of a reader shows only its file descriptor. *)
  let sexp_of_t t = [%sexp (t.fd : Fd.t_hum)]

  type t_internals = t

  (* Detailed sexp of a reader.  [buf] is never shown; [id], [open_flags],
     [last_read_time] and [fd] are omitted when running inline tests. *)
  let sexp_of_t_internals
        { available
        ; buf = _
        ; close_finished
        ; close_may_destroy_buf
        ; id
        ; fd
        ; last_read_time
        ; open_flags
        ; pos
        ; state
        }
    =
    let unless_testing x = Option.some_if (not am_running_inline_test) x in
    [%sexp
      { id = (id |> unless_testing : (Id.t option[@sexp.option]))
      ; state : State.t
      ; available : int
      ; pos : int
      ; open_flags =
          (open_flags |> unless_testing : (open_flags Deferred.t option[@sexp.option]))
      ; last_read_time =
          (last_read_time |> unless_testing : (Time.t option[@sexp.option]))
      ; close_may_destroy_buf : [ `Yes | `Not_now | `Not_ever ]
      ; close_finished : unit Ivar.t
      ; fd = (fd |> unless_testing : (Fd.t option[@sexp.option]))
      }]
  ;;

  let io_stats = Io_stats.create ()

  (* The readable window [pos, pos + available) must lie inside [buf]. *)
  let invariant t : unit =
    assert (0 <= t.pos);
    assert (0 <= t.available);
    assert (t.pos + t.available <= Bigstring.length t.buf)
  ;;

  (* [create ?buf_len fd] makes a reader on [fd].  Without [buf_len], the buffer is 32KiB
     for [Char] and [File] fds and 128KiB for [Fifo] and [Socket] fds.  Raises if
     [buf_len] is supplied and is not positive. *)
  let create ?buf_len fd =
    let buf_len =
      match buf_len with
      | None ->
        (match Fd.kind fd with
         | Char | File -> 32 * 1024
         | Fifo | Socket _ -> 128 * 1024)
      | Some buf_len ->
        if buf_len > 0
        then buf_len
        else
          raise_s
            [%message "Reader.create got non positive buf_len" (buf_len : int) (fd : Fd.t)]
    in
    let open_flags =
      Fd.syscall_in_thread fd ~name:"fcntl_getfl" (fun file_descr ->
        Core.Unix.fcntl_getfl file_descr)
    in
    { fd
    ; id = Id.create ()
    ; buf = Bigstring.create buf_len
    ; close_may_destroy_buf = `Yes
    ; pos = 0
    ; available = 0
    ; state = `Not_in_use
    ; close_finished = Ivar.create ()
    ; last_read_time = Scheduler.cycle_start ()
    ; open_flags
    }
  ;;

  let of_in_channel ic kind = create (Fd.of_in_channel ic kind)

  let open_file ?buf_len file =
    let%map fd = Unix.openfile file ~mode:[ `Rdonly ] ~perm:0o000 in
    create fd ?buf_len
  ;;

  let stdin = lazy (create (Fd.stdin ()))
  let close_finished t = Ivar.read t.close_finished

  let is_closed t =
    match t.state with
    | `Closed -> true
    | `Not_in_use | `In_use -> false
  ;;

  let empty_buf = Bigstring.create 0

  let destroy t =
    (* Calling [unsafe_destroy] on [t]'s bigstrings rather than waiting for finalizers to
       free them makes their space immediately available for reuse by C's malloc. *)
    Bigstring.unsafe_destroy t.buf;
    t.buf <- empty_buf
  ;;

  (* [close t] marks [t] closed, starts closing its fd, drops buffered data, and destroys
     [buf] when [close_may_destroy_buf] is [`Yes].  Calling it on an already-closed reader
     only returns [close_finished t]. *)
  let close t =
    (match t.state with
     | `Closed -> ()
     | `Not_in_use | `In_use ->
       t.state <- `Closed;
       upon (Unix.close t.fd) (fun () -> Ivar.fill t.close_finished ());
       t.pos <- 0;
       t.available <- 0;
       (match t.close_may_destroy_buf with
        | `Yes -> destroy t
        | `Not_now | `Not_ever -> ()));
    close_finished t
  ;;

  let with_close t ~f = Monitor.protect f ~finally:(fun () -> close t)

  (* Runs [f] while holding a [`Read] [lockf] lock on [t.fd]; the lock is released
     afterwards unless the fd has already been closed. *)
  let with_reader_exclusive t f =
    let%bind () = Unix.lockf t.fd `Read in
    Monitor.protect f ~finally:(fun () ->
      if not (Fd.is_closed t.fd) then Unix.unlockf t.fd;
      return ())
  ;;

  let with_file ?buf_len ?(exclusive = false) file ~f =
    let%bind t = open_file ?buf_len file in
    with_close t ~f:(fun () ->
      if exclusive then with_reader_exclusive t (fun () -> f t) else f t)
  ;;

  (* [get_data t] attempts to read data into [t.buf]. If the read gets data, [get_data]
     returns [`Ok], otherwise it returns [`Eof]. *)
  let get_data t : [ `Ok | `Eof ] Deferred.t =
    Deferred.create (fun result ->
      t.open_flags
      >>> fun open_flags ->
      let eof () = Ivar.fill result `Eof in
      match t.state, open_flags with
      | `Not_in_use, _ -> assert false
      | `Closed, _ | _, `Already_closed -> eof ()
      | `In_use, ((`Error _ | `Ok _) as open_flags) ->
        let can_read_fd =
          match open_flags with
          | `Error _ -> false
          | `Ok open_flags -> Unix.Open_flags.can_read open_flags
        in
        if not can_read_fd
        then
          raise_s
            [%message
              "not allowed to read due to file-descriptor flags"
                (open_flags : open_flags)
                ~reader:(t : t)];
        let ebadf () =
          (* If the file descriptor has been closed, we will get EBADF from a syscall.
             If someone closed the [Fd.t] using [Fd.close], then that is fine. But if the
             underlying file descriptor got closed in some other way, then something is
             likely wrong, so we raise. *)
          raise_s
            [%message "reader file descriptor was unexpectedly closed" ~reader:(t : t)]
        in
        let finish res handle =
          match res with
          | `Already_closed -> eof ()
          | `Error exn ->
            (match exn with
             | Bigstring.IOError (0, End_of_file)
             | Unix.Unix_error
                 ( ( ECONNRESET
                   | EHOSTUNREACH
                   | ENETDOWN
                   | ENETRESET
                   | ENETUNREACH
                   (* When using OpenOnload, read() can return EPIPE if a TCP connection
                      is established and then immediately closed. *)
                   | EPIPE
                   | ETIMEDOUT )
                 , _
                 , _ ) -> eof ()
             | Unix.Unix_error (EBADF, _, _) -> ebadf ()
             | _ -> handle exn)
          | `Ok (bytes_read, read_time) ->
            Io_stats.update io_stats ~kind:(Fd.kind t.fd) ~bytes:(Int63.of_int bytes_read);
            if bytes_read = 0
            then eof ()
            else (
              t.pos <- 0;
              t.available <- t.available + bytes_read;
              t.last_read_time <- read_time;
              Ivar.fill result `Ok)
        in
        let buf = t.buf in
        (* Move any unread data to the front so the read can fill the rest of [buf]. *)
        if t.available > 0 && t.pos > 0
        then (
          Bigstring.blit ~src:buf ~src_pos:t.pos ~dst:buf ~dst_pos:0 ~len:t.available;
          t.pos <- 0);
        let pos = t.available in
        let len = Bigstring.length buf - pos in
        if not (Fd.supports_nonblock t.fd)
        then (
          (match t.close_may_destroy_buf with
           | `Yes -> t.close_may_destroy_buf <- `Not_now
           | `Not_now | `Not_ever -> ());
          Fd.syscall_in_thread t.fd ~name:"read" (fun file_descr ->
            let res = Bigstring.read file_descr buf ~pos ~len in
            res, Time.now ())
          >>> fun res ->
          (match t.close_may_destroy_buf with
           | `Not_now -> t.close_may_destroy_buf <- `Yes
           | `Yes | `Not_ever -> ());
          match t.state with
          | `Not_in_use -> assert false
          | `In_use -> finish res raise
          | `Closed ->
            (* If we're here, somebody [close]d the reader while we were making the system
               call. [close] couldn't [destroy], so we need to. *)
            destroy t;
            eof ())
        else (
          let rec loop () =
            (* Force the async cycle to end between reads, allowing others to run. *)
            Fd.ready_to t.fd `Read
            >>> function
            | `Bad_fd -> ebadf ()
            | `Closed -> eof ()
            | `Ready ->
              (* There is a race between the [ready_to] becoming determined and someone
                 [close]ing [t]. It is possible to get [`Ready] and then by the time we
                 get here, [t] is closed. *)
              (match t.state with
               | `Not_in_use -> assert false
               | `Closed -> eof ()
               | `In_use ->
                 finish
                   (Fd.syscall t.fd ~nonblocking:true (fun file_descr ->
                      let res =
                        Unix.Syscall_result.Int.ok_or_unix_error_exn
                          (Bigstring.read_assume_fd_is_nonblocking
                             file_descr
                             buf
                             ~pos
                             ~len)
                          ~syscall_name:"read"
                      in
                      res, Scheduler.cycle_start ()))
                   (function
                     (* Since [t.fd] is ready, we should never see EWOULDBLOCK or EAGAIN.
                        But we don't trust the OS. So, in case it does, we just try
                        again. *)
                     | Unix.Unix_error ((EWOULDBLOCK | EAGAIN), _, _) -> loop ()
                     | exn -> raise exn))
          in
          loop ()))
  ;;

  (* Grows [t.buf] to at least [at_least] bytes (at least doubling it), moving unread data
     to the front of the new buffer. *)
  let ensure_buf_len t ~at_least =
    let buf_len = Bigstring.length t.buf in
    if buf_len < at_least
    then (
      let new_buf = Bigstring.create (Int.max at_least (2 * Bigstring.length t.buf)) in
      if t.available > 0
      then
        Bigstring.blit ~src:t.buf ~src_pos:t.pos ~len:t.available ~dst:new_buf ~dst_pos:0;
      t.buf <- new_buf;
      t.pos <- 0);
    assert (Bigstring.length t.buf >= at_least)
  ;;

  (* [get_data_until] calls [get_data] to read into [t.buf] until [t.available >=
     available_at_least], or until it reaches EOF. It returns [`Ok] if [t.available >=
     available_at_least], and [`Eof] if not. *)
  let get_data_until t ~available_at_least =
    if t.available >= available_at_least
    then return `Ok
    else (
      ensure_buf_len t ~at_least:available_at_least;
      if t.pos > 0
      then (
        Bigstring.blit ~src:t.buf ~src_pos:t.pos ~dst:t.buf ~dst_pos:0 ~len:t.available;
        t.pos <- 0);
      let rec loop () =
        let%bind result = get_data t in
        if t.available >= available_at_least
        then return `Ok
        else (
          match result with
          | `Eof -> return (`Eof t.available)
          | `Ok -> loop ())
      in
      loop ())
  ;;

  (* [with_nonempty_buffer t f] waits for [t.buf] to have data, and then returns [f `Ok].
     If no data can be read, then [with_nonempty_buffer] returns [f `Eof].
     [with_nonempty_buffer] must be called with [t.state] as [`Closed] or [`In_use]. It
     guarantees that if [f `Ok] is called, that [t.state = `In_use]. *)
  let with_nonempty_buffer (type a) t (f : [ `Ok | `Eof ] -> a) : a Deferred.t =
    match t.state with
    | `Not_in_use -> assert false
    | `Closed -> return (f `Eof)
    | `In_use ->
      if t.available > 0
      then return (f `Ok)
      else (
        let%map ok_or_eof = get_data t in
        match t.state with
        | `Not_in_use -> assert false
        | `Closed -> f `Eof
        | `In_use -> f ok_or_eof)
  ;;

  (* [with_nonempty_buffer' t f] is an optimized version of
     [don't_wait_for (with_nonempty_buffer t f)].
     With [force_refill = true], [with_nonempty_buffer'] will do a read, whether or not
     there is already data available in [t.buf]. *)
  let with_nonempty_buffer' ?(force_refill = false) t (f : [ `Ok | `Eof ] -> unit) : unit =
    match t.state with
    | `Not_in_use -> assert false
    | `Closed -> f `Eof
    | `In_use ->
      if (not force_refill) && t.available > 0
      then f `Ok
      else
        get_data t
        >>> fun ok_or_eof ->
        (match t.state with
         | `Not_in_use -> assert false
         | `Closed -> f `Eof
         | `In_use -> f ok_or_eof)
  ;;

  (* Marks the first [amount] buffered bytes as read. *)
  let consume t amount =
    assert (0 <= amount && amount <= t.available);
    t.pos <- t.pos + amount;
    t.available <- t.available - amount
  ;;

  type 'a handle_chunk_result =
    [ `Stop of 'a
    | `Stop_consumed of 'a * int
    | `Continue
    | `Consumed of int * [ `Need of int | `Need_unknown ]
    ]
  [@@deriving sexp_of]

  type 'a read_one_chunk_at_a_time_result =
    [ `Eof
    | `Stopped of 'a
    | `Eof_with_unconsumed_data of string
    ]
  [@@deriving sexp_of]

  type consumed = [ `Consumed of int * [ `Need of int | `Need_unknown ] ]
  [@@deriving sexp_of]

  (* Repeatedly hands the buffered data to [handle_chunk] until it stops or EOF is
     reached.  Because [handle_chunk] sees [t.buf] directly, [close_may_destroy_buf] is
     set to [`Not_ever]. *)
  let read_one_chunk_at_a_time t ~handle_chunk =
    t.close_may_destroy_buf <- `Not_ever;
    Deferred.create (fun final_result ->
      let rec loop ~force_refill =
        with_nonempty_buffer' t ~force_refill (function
          | `Eof ->
            let result =
              if t.available > 0
              then
                `Eof_with_unconsumed_data
                  (Bigstring.to_string t.buf ~pos:t.pos ~len:t.available)
              else `Eof
            in
            Ivar.fill final_result result
          | `Ok ->
            let len = t.available in
            let continue z =
              match t.state with
              | `Not_in_use -> assert false
              | `Closed -> Ivar.fill final_result `Eof
              | `In_use ->
                (match z with
                 | `Stop a ->
                   consume t len;
                   Ivar.fill final_result (`Stopped a)
                 | `Stop_consumed (a, consumed) ->
                   consume t consumed;
                   Ivar.fill final_result (`Stopped a)
                 | `Continue ->
                   consume t len;
                   loop ~force_refill:true
                 | `Consumed (consumed, need) as c ->
                   if consumed < 0
                   || consumed > len
                   || (match need with
                       | `Need_unknown -> false
                       | `Need need -> need < 0 || consumed + need <= len)
                   then
                     raise_s
                       [%message
                         "handle_chunk returned invalid `Consumed"
                           ~_:(c : consumed)
                           (len : int)
                           ~reader:(t : t)];
                   consume t consumed;
                   let buf_len = Bigstring.length t.buf in
                   let new_len =
                     match need with
                     | `Need_unknown ->
                       if t.available = buf_len
                       (* The buffer is full and the client doesn't know how much to
                          expect: double the buffer size. *)
                       then buf_len * 2
                       else buf_len
                     | `Need need ->
                       if need > buf_len then Int.max need (buf_len * 2) else buf_len
                   in
                   if new_len < 0
                   then
                     raise_s
                       [%message
                         "read_one_chunk_at_a_time got overflow in buffer len"
                           ~reader:(t : t_internals)];
                   (* Grow the internal buffer if needed. *)
                   if new_len > buf_len
                   then (
                     let new_buf = Bigstring.create new_len in
                     if t.available > 0
                     then
                       Bigstring.blit
                         ~src:t.buf
                         ~src_pos:t.pos
                         ~len:t.available
                         ~dst:new_buf
                         ~dst_pos:0;
                     t.buf <- new_buf;
                     t.pos <- 0);
                   loop ~force_refill:true)
            in
            let deferred = handle_chunk t.buf ~pos:t.pos ~len in
            (* Skip the scheduler round trip when the handler's result is already
               determined. *)
            (match Deferred.peek deferred with
             | None -> deferred >>> continue
             | Some result -> continue result))
      in
      loop ~force_refill:false)
  ;;

  type 'a handle_iobuf_result =
    [ `Stop of 'a
    | `Continue
    ]
  [@@deriving sexp_of]

  (* Like [read_one_chunk_at_a_time], but presents each chunk as an [Iobuf]; the amount
     consumed is the chunk length minus what remains in the iobuf after [handle_chunk]. *)
  let read_one_iobuf_at_a_time t ~handle_chunk =
    let iobuf = Iobuf.of_bigstring t.buf in
    read_one_chunk_at_a_time t ~handle_chunk:(fun bstr ~pos ~len ->
      Iobuf.Expert.reinitialize_of_bigstring iobuf bstr ~pos ~len;
      let%map handle_result = handle_chunk iobuf in
      if Iobuf.is_empty iobuf (* [is_empty] implies all data was consumed *)
      then (handle_result :> _ handle_chunk_result)
      else (
        let consumed = len - Iobuf.length iobuf in
        match handle_result with
        | `Continue -> `Consumed (consumed, `Need_unknown)
        | `Stop a -> `Stop_consumed (a, consumed)))
  ;;

  (* Read operations generic over the destination substring type [S]; [Name.name] is used
     only in error messages. *)
  module Read
      (S : Substring_intf.S)
      (Name : sig
         val name : string
       end) =
  struct
    let read_available t s =
      let len = Int.min t.available (S.length s) in
      S.blit_from_bigstring s ~src:t.buf ~src_pos:t.pos ~len;
      consume t len;
      len
    ;;

    let read t s =
      if S.length s = 0 then invalid_argf "Reader.read_%s with empty string" Name.name ();
      with_nonempty_buffer t (function
        | `Ok -> `Ok (read_available t s)
        | `Eof -> `Eof)
    ;;

    let really_read t s =
      Deferred.create (fun result ->
        let rec loop s amount_read =
          if S.length s = 0
          then Ivar.fill result `Ok
          else
            read t s
            >>> function
            | `Eof -> Ivar.fill result (`Eof amount_read)
            | `Ok len -> loop (S.drop_prefix s len) (amount_read + len)
        in
        loop s 0)
    ;;
  end

  module Read_substring =
    Read
      (Substring)
      (struct
        let name = "substring"
      end)

  let read_substring_available = Read_substring.read_available
  let read_substring = Read_substring.read
  let really_read_substring = Read_substring.really_read

  module Read_bigsubstring =
    Read
      (Bigsubstring)
      (struct
        let name = "bigsubstring"
      end)

  let read_bigsubstring = Read_bigsubstring.read
  let really_read_bigsubstring = Read_bigsubstring.really_read

  let really_read_bigstring t bigstring =
    really_read_bigsubstring t (Bigsubstring.create bigstring)
  ;;

  let peek_available t ~len =
    Bigstring.to_string t.buf ~pos:t.pos ~len:(Int.min len t.available)
  ;;

  let peek t ~len =
    match%map get_data_until t ~available_at_least:len with
    | `Eof (_ : int) ->
      assert (t.available < len);
      `Eof
    | `Ok ->
      assert (t.available >= len);
      `Ok (Bigstring.to_string t.buf ~pos:t.pos ~len)
  ;;

  let read_available t ?pos ?len s =
    read_substring_available t (Substring.create s ?pos ?len)
  ;;

  let read t ?pos ?len s = read_substring t (Substring.create s ?pos ?len)
  let really_read t ?pos ?len s = really_read_substring t (Substring.create s ?pos ?len)

  let read_char t =
    with_nonempty_buffer t (function
      | `Eof -> `Eof
      | `Ok ->
        let c = t.buf.{t.pos} in
        consume t 1;
        `Ok c)
  ;;

  (* Returns the position in [t.buf] of the first buffered character matching [p], if
     any.  An exception raised by a [`Pred] predicate is returned as [Error]. *)
  let first_char t p =
    let limit = t.pos + t.available in
    let buf = t.buf in
    match p with
    | `Pred p ->
      let rec loop pos =
        if pos = limit then None else if p buf.{pos} then Some pos else loop (pos + 1)
      in
      (* [p] is supplied by the user and may raise, so we wrap [loop] in a [try_with]. We
         put the [try_with] here rather than around the call to [p] to avoid per-character
         try-with overhead. *)
      Or_error.try_with (fun () -> loop t.pos)
    | `Char ch ->
      let rec loop pos =
        if pos = limit
        then None
        else if Char.O.(ch = buf.{pos})
        then Some pos
        else loop (pos + 1)
      in
      Ok (loop t.pos)
  ;;

  (* Accumulates buffered data until a character matching [p] is found, EOF is hit, or
     (when [max] is given) more than [max] bytes have been accumulated, then passes the
     outcome to [k]. *)
  let read_until_gen t p ~keep_delim ~max k =
    let rec loop ac total =
      with_nonempty_buffer' t (function
        | `Eof ->
          k
            (Ok
               (if List.is_empty ac
                then `Eof
                else `Eof_without_delim (Bigsubstring.concat_string (List.rev ac))))
        | `Ok ->
          let concat_helper ss lst =
            Bigsubstring.concat_string (List.rev_append lst [ ss ])
          in
          (match first_char t p with
           | Error _ as e -> k e
           | Ok None ->
             let len = t.available in
             let total = total + len in
             let ss = Bigsubstring.create t.buf ~pos:t.pos ~len in
             (* [ss] keeps a reference to the old buffer, so continue reading into a
                fresh one. *)
             t.buf <- Bigstring.create (Bigstring.length t.buf);
             t.pos <- 0;
             t.available <- 0;
             (match max with
              | Some max when total > max ->
                let s = concat_helper ss ac in
                k (Ok (`Max_exceeded s))
              | Some _ | None -> loop (ss :: ac) total)
           | Ok (Some pos) ->
             let amount_consumed = pos + 1 - t.pos in
             let len = if keep_delim then amount_consumed else amount_consumed - 1 in
             let ss = Bigsubstring.create t.buf ~pos:t.pos ~len in
             consume t amount_consumed;
             let res = concat_helper ss ac in
             k (Ok (`Ok res))))
    in
    loop [] 0
  ;;

  let read_until t pred ~keep_delim k =
    read_until_gen t pred ~keep_delim ~max:None (function
      | Error _ as x -> k x
      | Ok (`Max_exceeded _) -> assert false (* impossible - no maximum set *)
      | Ok (`Eof | `Eof_without_delim _ | `Ok _) as x -> k x)
  ;;

  let line_delimiter_pred = `Char '\n'

  (* Reads up to the next ['\n'], dropping it and a trailing ['\r'] if present. *)
  let read_line_gen t k =
    read_until t line_delimiter_pred ~keep_delim:false (function
      | Error _ ->
        (* Impossible, since we supplied a [`Char] predicate. *)
        assert false
      | Ok ((`Eof | `Eof_without_delim _) as x) -> k x
      | Ok (`Ok line) ->
        k
          (`Ok
             (let len = String.length line in
              if len >= 1 && Char.O.(line.[len - 1] = '\r')
              then String.sub line ~pos:0 ~len:(len - 1)
              else line)))
  ;;

  let read_line t =
    Deferred.create (fun result ->
      read_line_gen t (fun z ->
        Ivar.fill
          result
          (match z with
           | `Eof_without_delim str -> `Ok str
           | (`Ok _ | `Eof) as x -> x)))
  ;;

  (* Keeps retrying after EOF, waiting [wait_time] between attempts, until a full line has
     been read or the reader is closed. *)
  let really_read_line ~wait_time t =
    Deferred.create (fun result ->
      let fill_result = function
        | [] -> Ivar.fill result None
        | ac -> Ivar.fill result (Some (String.concat (List.rev ac)))
      in
      let rec continue ac =
        match t.state with
        | `Not_in_use -> assert false
        | `Closed -> fill_result ac
        | `In_use -> Clock.after wait_time >>> fun () -> loop ac
      and loop ac =
        read_line_gen t (function
          | `Eof -> continue ac
          | `Eof_without_delim str -> continue (str :: ac)
          | `Ok line -> fill_result (line :: ac))
      in
      loop [])
  ;;

  let space = Bigstring.of_string " "

  type 'sexp sexp_kind =
    | Plain : Sexp.t sexp_kind
    | Annotated : Sexp.Annotated.t sexp_kind

  (* Parses one sexp of the requested kind from [t], passing the result to [k]. *)
  let gen_read_sexp (type sexp) ?parse_pos t ~(sexp_kind : sexp sexp_kind) k =
    let rec loop parse_fun =
      with_nonempty_buffer' t (function
        | `Eof ->
          (* The sexp parser doesn't know that a token ends at EOF, so we add a space to
             be sure. *)
          (match Or_error.try_with (fun () -> parse_fun ~pos:0 ~len:1 space) with
           | Error _ as e -> k e
           | Ok (Sexp.Done (sexp, parse_pos)) -> k (Ok (`Ok (sexp, parse_pos)))
           | Ok (Cont (Parsing_toplevel_whitespace, _)) -> k (Ok `Eof)
           | Ok
               (Cont
                  ( ( Parsing_atom
                    | Parsing_list
                    | Parsing_nested_whitespace
                    | Parsing_sexp_comment
                    | Parsing_block_comment )
                  , _ )) ->
             raise_s [%message "Reader.read_sexp got unexpected eof" ~reader:(t : t)])
        | `Ok ->
          (match
             Or_error.try_with (fun () ->
               parse_fun ~pos:t.pos ~len:t.available t.buf)
           with
           | Error _ as e -> k e
           | Ok (Done (sexp, parse_pos)) ->
             consume t (parse_pos.buf_pos - t.pos);
             k (Ok (`Ok (sexp, parse_pos)))
           | Ok (Cont (_, parse_fun)) ->
             t.available <- 0;
             loop parse_fun))
    in
    let parse ~pos ~len buf : (_, sexp) Sexp.parse_result =
      (* [parse_pos] will be threaded through the entire reading process by the sexplib
         code. Every occurrence of [parse_pos] above will be identical to the [parse_pos]
         defined here. *)
      let parse_pos =
        match parse_pos with
        | None -> Sexp.Parse_pos.create ~buf_pos:pos ()
        | Some parse_pos -> Sexp.Parse_pos.with_buf_pos parse_pos pos
      in
      match sexp_kind with
      | Plain -> Sexp.parse_bigstring ?parse_pos:(Some parse_pos) ?len:(Some len) buf
      | Annotated ->
        Sexp.Annotated.parse_bigstring ?parse_pos:(Some parse_pos) ?len:(Some len) buf
    in
    loop parse
  ;;

  type 'a read = ?parse_pos:Sexp.Parse_pos.t -> 'a

  (* Returns a pipe of the sexps read from [t].  When reading stops (EOF, or the pipe was
     closed by the consumer), [t] is closed and then the pipe is closed. *)
  let gen_read_sexps ?parse_pos t ~sexp_kind =
    let pipe_r, pipe_w = Pipe.create () in
    let finished =
      Deferred.create (fun result ->
        let rec loop parse_pos =
          gen_read_sexp t ~sexp_kind ?parse_pos (function
            | Error error -> Error.raise error
            | Ok `Eof -> Ivar.fill result ()
            | Ok (`Ok (sexp, parse_pos)) ->
              if Pipe.is_closed pipe_w
              then Ivar.fill result ()
              else Pipe.write pipe_w sexp >>> fun () -> loop (Some parse_pos))
        in
        loop parse_pos)
    in
    upon finished (fun () -> close t >>> fun () -> Pipe.close pipe_w);
    pipe_r
  ;;

  let read_sexps ?parse_pos t = gen_read_sexps t ~sexp_kind:Plain ?parse_pos

  let read_annotated_sexps ?parse_pos t =
    gen_read_sexps t ~sexp_kind:Annotated ?parse_pos
  ;;

  module Peek_or_read = struct
    type t =
      | Peek
      | Read
    [@@deriving sexp_of]

    let to_string = Sexplib.Conv.string_of__of__sexp_of [%sexp_of: t]
  end

  (* Reads a size-prefixed bin_prot value from [t] and passes the outcome to [k].  With
     [Peek] the bytes are left in the buffer; with [Read] they are consumed.

     Each validation failure reports through [k] and then stops, so [k] is invoked exactly
     once even if it returns normally on [Error]. *)
  let peek_or_read_bin_prot
        ?(max_len = 100 * 1024 * 1024)
        t
        ~(peek_or_read : Peek_or_read.t)
        (bin_prot_reader : _ Bin_prot.Type_class.reader)
        k
    =
    let error f =
      ksprintf
        (fun msg () ->
           k (Or_error.error "Reader.read_bin_prot" (msg, t) [%sexp_of: string * t]))
        f
    in
    let handle_eof n =
      if n = 0 then k (Ok `Eof) else error "got Eof with %d bytes left over" n ()
    in
    get_data_until t ~available_at_least:Bin_prot.Utils.size_header_length
    >>> function
    | `Eof n -> handle_eof n
    | `Ok ->
      (match t.state with
       | `Not_in_use -> assert false
       | `Closed -> error "Reader.read_bin_prot got closed reader" ()
       | `In_use ->
         let pos = t.pos in
         let pos_ref = ref pos in
         (match
            Or_error.try_with (fun () ->
              Bin_prot.Utils.bin_read_size_header t.buf ~pos_ref)
          with
          | Error _ as e -> k e
          | Ok len ->
            if !pos_ref - pos <> Bin_prot.Utils.size_header_length
            then
              error
                "pos_ref <> len, (%d <> %d)"
                (!pos_ref - pos)
                Bin_prot.Utils.size_header_length
                ()
            else if len > max_len
            then error "max read length exceeded: %d > %d" len max_len ()
            else if len < 0
            then error "negative length %d" len ()
            else (
              let need = Bin_prot.Utils.size_header_length + len in
              get_data_until t ~available_at_least:need
              >>> function
              | `Eof n -> handle_eof n
              | `Ok ->
                (match t.state with
                 | `Not_in_use -> assert false
                 | `Closed -> error "Reader.read_bin_prot got closed reader" ()
                 | `In_use ->
                   let pos = t.pos + Bin_prot.Utils.size_header_length in
                   pos_ref := pos;
                   (match
                      Or_error.try_with (fun () -> bin_prot_reader.read t.buf ~pos_ref)
                    with
                    | Error _ as e -> k e
                    | Ok v ->
                      if !pos_ref - pos <> len
                      then error "pos_ref <> len, (%d <> %d)" (!pos_ref - pos) len ()
                      else (
                        (match peek_or_read with
                         | Peek -> ()
                         | Read -> consume t need);
                        k (Ok (`Ok v))))))))
  ;;

  (* Reads one marshaled value's raw bytes (header plus data) without unmarshaling. *)
  let read_marshal_raw t =
    let eofn n =
      if n = 0
      then `Eof
      else
        raise_s
          [%message "Reader.read_marshal got EOF with bytes remaining" ~_:(n : int)]
    in
    let header = Bytes.create Marshal.header_size in
    match%bind really_read t header with
    | `Eof n -> return (eofn n)
    | `Ok ->
      let len = Marshal.data_size header 0 in
      let buf = Bytes.create (len + Marshal.header_size) in
      Bytes.blit ~src:header ~dst:buf ~src_pos:0 ~dst_pos:0 ~len:Marshal.header_size;
      let sub = Substring.create buf ~pos:Marshal.header_size ~len in
      (match%map really_read_substring t sub with
       | `Eof n -> eofn n
       | `Ok -> `Ok buf)
  ;;

  let read_marshal t =
    match%map read_marshal_raw t with
    | `Eof -> `Eof
    | `Ok buf -> `Ok (Marshal.from_bytes buf 0)
  ;;

  (* Returns a pipe fed by repeatedly calling [read_one t] until it returns [`Eof] or the
     pipe is closed; then closes [t] followed by the pipe. *)
  let read_all t read_one =
    let pipe_r, pipe_w = Pipe.create () in
    let finished =
      Deferred.repeat_until_finished () (fun () ->
        match%bind read_one t with
        | `Eof -> return (`Finished ())
        | `Ok one ->
          if Pipe.is_closed pipe_w
          then return (`Finished ())
          else (
            let%map () = Pipe.write pipe_w one in
            `Repeat ()))
    in
    upon finished (fun () -> close t >>> fun () -> Pipe.close pipe_w);
    pipe_r
  ;;

  let lines t = read_all t read_line

  (* Reads everything up to EOF, closes [t], and returns the data as a string. *)
  let contents t =
    let buf = Buffer.create 1024 in
    let sbuf = Bytes.create 1024 in
    let%bind () =
      Deferred.repeat_until_finished () (fun () ->
        match%map read t sbuf with
        | `Eof -> `Finished ()
        | `Ok l ->
          Buffer.add_subbytes buf sbuf ~pos:0 ~len:l;
          `Repeat ())
    in
    let%map () = close t in
    Buffer.contents buf
  ;;

  (* Reads a line holding a decimal length, then exactly that many bytes. *)
  let recv t =
    Deferred.create (fun i ->
      read_line t
      >>> function
      | `Eof -> Ivar.fill i `Eof
      | `Ok length_str ->
        (match
           (try Ok (int_of_string length_str) with
            | _ -> Error ())
         with
         | Error () ->
           raise_s
             [%message
               "Reader.recv got strange length" (length_str : string) ~reader:(t : t)]
         | Ok length ->
           let buf = Bytes.create length in
           really_read t buf
           >>> (function
             | `Eof _ -> raise_s [%message "Reader.recv got unexpected EOF"]
             | `Ok -> Ivar.fill i (`Ok buf))))
  ;;

  (* Copies buffered chunks from [t] into [pipe_w] as strings until EOF or until [pipe_w]
     is closed. *)
  let transfer t pipe_w =
    Deferred.create (fun finished ->
      don't_wait_for
        (let%map () = Pipe.closed pipe_w in
         Ivar.fill_if_empty finished ());
      let rec loop () =
        with_nonempty_buffer' t (function
          | `Eof -> Ivar.fill_if_empty finished ()
          | `Ok ->
            if not (Pipe.is_closed pipe_w)
            then (
              let pos = t.pos in
              let len = t.available in
              consume t len;
              Pipe.write pipe_w (Bigstring.to_string t.buf ~pos ~len) >>> loop))
      in
      loop ())
  ;;
end

open Internal

(* We now expose all the functions in the mli. For functions that access a reader in a
   deferred manner, we include code to dynamically ensure that there aren't simultaneous
   reads. *)

type nonrec t = t [@@deriving sexp_of]
type nonrec 'a handle_chunk_result = 'a handle_chunk_result [@@deriving sexp_of]
type nonrec 'a handle_iobuf_result = 'a handle_iobuf_result [@@deriving sexp_of]

type nonrec 'a read_one_chunk_at_a_time_result = 'a read_one_chunk_at_a_time_result
[@@deriving sexp_of]

type nonrec 'a read = 'a read

let close = close
let close_finished = close_finished
let create = create
let fd = fd
let id = id
let invariant = invariant
let io_stats = io_stats
let is_closed = is_closed
let last_read_time = last_read_time
let of_in_channel = of_in_channel
let open_file = open_file
let stdin = stdin
let with_close = with_close
let with_file = with_file

(* Claims [t] for a read; raises if [t] is closed or already in use. *)
let use t =
  let error reason =
    raise_s [%message "can not read from reader" (reason : string) ~reader:(t : t)]
  in
  match t.state with
  | `Closed -> error "closed"
  | `In_use -> error "in use"
  | `Not_in_use -> t.state <- `In_use
;;

(* Releases the claim taken by [use], unless the reader was closed in the meantime. *)
let finished_read t =
  match t.state with
  | `Closed -> () (* [f ()] closed it. Leave it closed. *)
  | `Not_in_use -> assert false (* we're using it *)
  | `In_use -> t.state <- `Not_in_use
;;

(* Runs the synchronous [f] with [t] claimed. *)
let do_read_now t f =
  use t;
  let x = f () in
  finished_read t;
  x
;;

let bytes_available t = do_read_now t (fun () -> t.available)
let peek_available t ~len = do_read_now t (fun () -> peek_available t ~len)
let read_available t ?pos ?len s = do_read_now t (fun () -> read_available t ?pos ?len s)

(* Runs the deferred [f] with [t] claimed until its result is determined. *)
let do_read t f =
  use t;
  let%map x = f () in
  finished_read t;
  x
;;

let peek t ~len =
  if len < 0 then raise_s [%message "[Reader.peek] got negative len" (len : int)];
  do_read t (fun () -> peek t ~len)
;;

let read t ?pos ?len s = do_read t (fun () -> read t ?pos ?len s)
let read_char t = do_read t (fun () -> read_char t)
let read_substring t s = do_read t (fun () -> read_substring t s)
let read_bigsubstring t s = do_read t (fun () -> read_bigsubstring t s)

let read_one_chunk_at_a_time t ~handle_chunk =
  do_read t (fun () -> read_one_chunk_at_a_time t ~handle_chunk)
;;

let read_one_iobuf_at_a_time t ~handle_chunk =
  do_read t (fun () -> read_one_iobuf_at_a_time t ~handle_chunk)
;;

let really_read t ?pos ?len s = do_read t (fun () -> really_read t ?pos ?len s)
let really_read_substring t s = do_read t (fun () -> really_read_substring t s)
let really_read_bigsubstring t s = do_read t (fun () -> really_read_bigsubstring t s)
let read_line t = do_read t (fun () -> read_line t)
let really_read_line ~wait_time t = do_read t (fun () -> really_read_line ~wait_time t)

(* [do_read_k] takes a [read_k] function that takes a continuation expecting an
   [Or_error.t]. It uses this to do a read returning a deferred. This allows it to call
   [finished_read] before continuing, in the event that the result is an error. *)
let do_read_k
      (type r r')
      t
      (read_k : (r Or_error.t -> unit) -> unit)
      (make_result : r -> r')
  : r' Deferred.t
  =
  use t;
  Deferred.create (fun result ->
    read_k (fun r ->
      finished_read t;
      Ivar.fill result (make_result (ok_exn r))))
;;

let read_until t p ~keep_delim = do_read_k t (read_until t p ~keep_delim) Fn.id

let read_until_max t p ~keep_delim ~max =
  do_read_k t (read_until_gen t p ~keep_delim ~max:(Some max)) Fn.id
;;

let read_sexp ?parse_pos t =
  do_read_k t (gen_read_sexp t ~sexp_kind:Plain ?parse_pos) (function
    | `Eof -> `Eof
    | `Ok (sexp, _) -> `Ok sexp)
;;

let read_sexps ?parse_pos t =
  use t;
  read_sexps ?parse_pos t
;;

let read_annotated_sexps ?parse_pos t =
  use t;
  read_annotated_sexps ?parse_pos t
;;

let peek_or_read_bin_prot ?max_len t reader ~peek_or_read =
  do_read_k t (peek_or_read_bin_prot ?max_len t reader ~peek_or_read) Fn.id
;;

let peek_bin_prot ?max_len t reader =
  peek_or_read_bin_prot ?max_len t reader ~peek_or_read:Peek
;;

let read_bin_prot ?max_len t reader =
  peek_or_read_bin_prot ?max_len t reader ~peek_or_read:Read
;;

let read_marshal_raw t = do_read t (fun () -> read_marshal_raw t)
let read_marshal t = do_read t (fun () -> read_marshal t)
let recv t = do_read t (fun () -> recv t)

(* [read_all] does not call [use t], because [read_one] will do so each time it is using
   [t]. *)
let read_all t read_one = read_all t read_one

let lines t =
  use t;
  lines t
;;

let contents t = do_read t (fun () -> contents t)
let file_contents file = with_file file ~f:contents

let file_lines file =
  let%bind t = open_file file in
  Pipe.to_list (lines t)
;;

let transfer t =
  use t;
  transfer t
;;

(* Discards buffered data before seeking, since it no longer corresponds to the new file
   offset. *)
let lseek t offset ~mode =
  do_read t (fun () ->
    t.pos <- 0;
    t.available <- 0;
    Unix_syscalls.lseek t.fd offset ~mode)
;;

(* The fd's current offset minus the number of bytes buffered but not yet consumed. *)
let ltell t =
  do_read t (fun () ->
    let%map fd_offset = Unix_syscalls.lseek t.fd Int64.zero ~mode:`Cur in
    Int64.( - ) fd_offset (Int64.of_int t.available))
;;

(* Re-runs [a_of_sexp] on an annotated sexp.  Returns [Ok ()] if the conversion succeeds,
   and otherwise an error that carries the position when it can be determined. *)
let get_error
      (type a sexp)
      ~file
      ~(sexp_kind : sexp sexp_kind)
      ~(a_of_sexp : sexp -> a)
      (annotated_sexp : Sexp.Annotated.t)
  =
  try
    ignore
      (a_of_sexp
         (match sexp_kind with
          | Plain -> (Sexp.Annotated.get_sexp annotated_sexp : sexp)
          | Annotated -> (annotated_sexp : sexp))
       : a);
    Ok ()
  with
  | exn ->
    let unexpected_error () =
      error "Reader.load_sexp error" (file, exn) [%sexp_of: string * exn]
    in
    (match exn with
     | Of_sexp_error (exc, bad_sexp) ->
       (match Sexp.Annotated.find_sexp annotated_sexp bad_sexp with
        | None -> unexpected_error ()
        | Some bad_annotated_sexp ->
          (match Sexp.Annotated.get_conv_exn ~file ~exc bad_annotated_sexp with
           | Of_sexp_error (Sexp.Annotated.Conv_exn (pos, exn), sexp) ->
             (* The error produced by [get_conv_exn] already has the file position, so
                we don't wrap with a redundant error message. *)
             Or_error.error
               "invalid sexp"
               (pos, exn, "in", sexp)
               [%sexp_of: string * exn * string * Sexp.t]
           | _ -> unexpected_error ()))
     | _ -> unexpected_error ())
;;

(* Loads all sexps from [file] and applies [convert].  If conversion fails with
   [Of_sexp_error] and the file is a regular file, it is loaded again as annotated sexps
   so that [get_error] can produce an error with location information. *)
let gen_load_exn
      (type sexp a)
      ?exclusive
      ~(sexp_kind : sexp sexp_kind)
      ~file
      (convert : sexp list -> a)
      (get_error : Sexp.Annotated.t list -> Error.t)
  : a Deferred.t
  =
  let may_load_file_multiple_times = ref false in
  let load ~sexp_kind =
    match%map
      Monitor.try_with ~extract_exn:true (fun () ->
        with_file ?exclusive file ~f:(fun t ->
          (may_load_file_multiple_times
           := (* Although [file] typically is of kind [Fd.Kind.File], it may also have other
                 kinds. We can only load it multiple times if it has kind [File]. *)
             match Fd.kind (fd t) with
             | File -> true
             | Char | Fifo | Socket _ -> false);
          use t;
          Pipe.to_list (gen_read_sexps t ~sexp_kind)))
    with
    | Ok sexps -> sexps
    | Error exn ->
      (match exn with
       | Sexp.Parse_error { err_msg; parse_state; _ } ->
         (* This code reformats the [Parse_error] produced by sexplib to be more
            readable. *)
         let parse_pos =
           match parse_state with
           | `Sexp { parse_pos; _ } | `Annot { parse_pos; _ } -> parse_pos
         in
         Error.raise
           (Error.create
              "syntax error when parsing sexp"
              (sprintf "%s:%d:%d" file parse_pos.text_line parse_pos.text_char, err_msg)
              [%sexp_of: string * string])
       | _ -> raise exn)
  in
  let%bind sexps = load ~sexp_kind in
  try return (convert sexps) with
  | Of_sexp_error (exn, _bad_subsexp) ->
    if !may_load_file_multiple_times
    then (
      let%bind sexps = load ~sexp_kind:Annotated in
      Error.raise (get_error sexps))
    else
      raise_s
        [%message
          "invalid sexp (failed to determine location information)"
            (file : string)
            (exn : exn)]
  | exn -> raise_s [%message "Reader.load_sexp(s) error" (file : string) (exn : exn)]
;;

type ('sexp, 'a, 'b) load = ?exclusive:bool -> string -> ('sexp -> 'a) -> 'b Deferred.t

let get_load_result_exn = function
  | `Result x -> x
  | `Error (exn, _sexp) -> raise exn
;;

(* Loads [file], which must contain exactly one sexp, and converts it with [a_of_sexp]. *)
let gen_load_sexp_exn
      (type a sexp)
      ?exclusive
      ~(sexp_kind : sexp sexp_kind)
      ~file
      ~(a_of_sexp : sexp -> a)
  =
  let multiple sexps =
    Error.create
      "Reader.load_sexp requires one sexp but got"
      (List.length sexps, file)
      [%sexp_of: int * string]
  in
  gen_load_exn
    ?exclusive
    ~file
    ~sexp_kind
    (fun sexps ->
       match sexps with
       | [ sexp ] -> a_of_sexp sexp
       | _ -> Error.raise (multiple sexps))
    (fun annot_sexps ->
       match annot_sexps with
       | [ annot_sexp ] ->
         (match get_error ~file ~sexp_kind ~a_of_sexp annot_sexp with
          | Error e -> e
          | Ok () ->
            Error.create
              "conversion of annotated sexp unexpectedly succeeded"
              (Sexp.Annotated.get_sexp annot_sexp)
              [%sexp_of: Sexp.t])
       | _ -> multiple annot_sexps)
;;

let load_sexp_exn ?exclusive file a_of_sexp =
  gen_load_sexp_exn ?exclusive ~sexp_kind:Plain ~file ~a_of_sexp
;;

let load_annotated_sexp_exn ?exclusive file a_of_sexp =
  gen_load_sexp_exn ?exclusive ~sexp_kind:Annotated ~file ~a_of_sexp
;;

let gen_load_sexp ?exclusive ~sexp_kind ~file ~a_of_sexp =
  Deferred.Or_error.try_with ~extract_exn:true (fun () ->
    gen_load_sexp_exn ?exclusive ~sexp_kind ~file ~a_of_sexp)
;;

let load_sexp ?exclusive file a_of_sexp =
  gen_load_sexp ?exclusive ~sexp_kind:Plain ~file ~a_of_sexp
;;

let load_annotated_sexp ?exclusive file a_of_sexp =
  gen_load_sexp ?exclusive ~sexp_kind:Annotated ~file ~a_of_sexp
;;

(* Loads every sexp in [file] and converts each with [a_of_sexp]; on failure, the error
   combines the conversion errors of all sexps that fail. *)
let gen_load_sexps_exn
      (type a sexp)
      ?exclusive
      ~(sexp_kind : sexp sexp_kind)
      ~file
      ~(a_of_sexp : sexp -> a)
  =
  gen_load_exn
    ?exclusive
    ~file
    ~sexp_kind
    (fun sexps -> List.map sexps ~f:a_of_sexp)
    (fun annot_sexps ->
       Error.of_list
         (List.filter_map annot_sexps ~f:(fun annot_sexp ->
            match get_error ~file ~sexp_kind ~a_of_sexp annot_sexp with
            | Ok _ -> None
            | Error error -> Some error)))
;;

let load_sexps_exn ?exclusive file a_of_sexp =
  gen_load_sexps_exn ?exclusive ~sexp_kind:Plain ~file ~a_of_sexp
;;

let load_annotated_sexps_exn ?exclusive file a_of_sexp =
  gen_load_sexps_exn ?exclusive ~sexp_kind:Annotated ~file ~a_of_sexp
;;

let gen_load_sexps ?exclusive ~sexp_kind ~file ~a_of_sexp =
  Deferred.Or_error.try_with ~extract_exn:true (fun () ->
    gen_load_sexps_exn ?exclusive ~sexp_kind ~file ~a_of_sexp)
;;

let load_sexps ?exclusive file a_of_sexp =
  gen_load_sexps ?exclusive ~sexp_kind:Plain ~file ~a_of_sexp
;;

let load_annotated_sexps ?exclusive file a_of_sexp =
  gen_load_sexps ?exclusive ~sexp_kind:Annotated ~file ~a_of_sexp
;;

(* Returns a pipe of the reader's data; when the transfer finishes, [t] is closed and then
   the pipe is closed. *)
let pipe t =
  let pipe_r, pipe_w = Pipe.create () in
  upon (transfer t pipe_w) (fun () -> close t >>> fun () -> Pipe.close pipe_w);
  pipe_r
;;

(* Reads and discards everything up to EOF, then closes [t]. *)
let drain t =
  match%bind
    read_one_chunk_at_a_time t ~handle_chunk:(fun _bigstring ~pos:_ ~len:_ ->
      return `Continue)
  with
  | `Stopped _ | `Eof_with_unconsumed_data _ -> assert false
  | `Eof -> close t
;;

type ('a, 'b) load_bin_prot =
  ?exclusive:bool
  -> ?max_len:int
  -> string
  -> 'a Bin_prot.Type_class.reader
  -> 'b Deferred.t

(* Reads a single bin_prot value from [file]; an empty file is reported as an error. *)
let load_bin_prot ?exclusive ?max_len file bin_reader =
  match%map
    Monitor.try_with_or_error ~here:[%here] ~name:"Reader.load_bin_prot" (fun () ->
      with_file ?exclusive file ~f:(fun t -> read_bin_prot ?max_len t bin_reader))
  with
  | Ok (`Ok v) -> Ok v
  | Ok `Eof -> Or_error.error_string "Reader.load_bin_prot got unexpected eof"
  | Error _ as result -> result
;;

let load_bin_prot_exn ?exclusive ?max_len file bin_reader =
  load_bin_prot ?exclusive ?max_len file bin_reader >>| ok_exn
;;