123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473(**
* Copyright (c) 2015, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the "hack" directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*
*)exceptionTimeoutmoduleAlarm_timeout=struct(** Timeout *)typet=unitletwith_timeout~timeout?on_timeout~do_=letold_handler=refSys.Signal_defaultinletold_timeout=ref0inleton_timeout_sigalrm=matchon_timeoutwith|None->raiseTimeout|Somef->f();raiseTimeoutin(* TODO - something smarter than pausing alarms when running other alarms.
* We should instead have a priority queue, run the shortest alarm, and
* decrement the other alarms after each alarm fires or is turned off *)Utils.with_context~enter:(fun()->old_handler:=Sys.signalSys.sigalrm(Sys.Signal_handleon_timeout);old_timeout:=Unix.alarmtimeout)~exit:(fun()->ignore(Unix.alarm!old_timeout);Sys.set_signalSys.sigalrm!old_handler)~do_letcheck_timeout()=()(** Channel *)typein_channel=Pervasives.in_channel*intoptionletignore_timeoutf?timeout(ic,_pid)=ignore(timeout);ficletinput=ignore_timeoutPervasives.inputletreally_input=ignore_timeoutPervasives.really_inputletinput_char=ignore_timeoutPervasives.input_charletinput_line=ignore_timeoutPervasives.input_lineletinput_value_with_workaroundic=(* OCaml 4.03.0 changed the behavior of input_value to no longer
* throw End_of_file when the pipe has closed. We can simulate that
* behavior, however, by trying to read a byte afterwards, which WILL
* raise End_of_file if the pipe has closed
* http://caml.inria.fr/mantis/view.php?id=7142 *)tryPervasives.input_valueicwithFailuremsgase->ifmsg="input_value: truncated object"thenPervasives.input_charic|>ignore;raiseeletinput_value=ignore_timeoutinput_value_with_workaroundletopen_inname=Pervasives.open_inname,Noneletclose_in(ic,_)=Pervasives.close_inicletclose_in_noerr(ic,_)=Pervasives.close_in_noerricletin_channel_of_descrfd=Unix.in_channel_of_descrfd,Noneletdescr_of_in_channel(ic,_)=Unix.descr_of_in_channelicletopen_processcmdargs=letchild_in_fd,out_fd=Unix.pipe()inletin_fd,child_out_fd=Unix.pipe()inUnix.set_close_on_execin_fd;Unix.set_close_on_execout_fd;letpid=Unix.(create_processcmdargschild_in_fdchild_out_fdstderr)inUnix.closechild_out_fd;Unix.closechild_in_fd;letic=(Unix.in_channel_of_descrin_fd,Somepid)inletoc=Unix.out_channel_of_descrout_fdin(ic,oc)letopen_process_incmdargs=letchild_in_fd,out_fd=Unix.pipe()inletin_fd,child_out_fd=Unix.pipe()inUnix.set_close_on_execin_fd;Unix.set_close_on_execout_fd;Unix.closeout_fd;letpid=Unix.(create_processcmdargschild_in_fdchild_out_fdstderr)inUnix.closechild_out_fd;Unix.closechild_in_fd;letic=(Unix.in_channel_of_descrin_fd,Somepid)inicletclose_process_in(ic,pid)=matchpidwith|None->invalid_arg"Timeout.close_process_in"|Somepid->Pervasives.close_inic;snd(Unix.waitpid[]pid)letread_process~timeout?on_timeout~readercmdargs=let(ic,oc)=open_processcmdargsinwith_timeout~timeout?on_timeout~do_:(funtimeout->tryreadertimeouticocwithexn->close_inic;close_outoc;raiseexn)letopen_connection?timeoutsockaddr=let(ic,oc)=Unix.open_connectionsockaddrinignore(timeout);((ic,None),oc)letshutdown_connection(ic,_)=Unix.shutdown_connectionicendmoduleSelect_timeout=struct(** Timeout *)typet={timeout:float;}letcreatetimeout={timeout=Unix.gettimeofday()+.timeout}letwith_timeout~timeout?on_timeout~do_=lett=create(floattimeout)intrydo_twithTimeoutasexn->matchon_timeoutwith|None->raiseexn|Someft->ft();raiseTimeoutletcheck_timeoutt=ifUnix.gettimeofday()>t.timeoutthenraiseTimeoutletget_current_timeout=function|None->-.1.|Some{timeout}->lettimeout=timeout-.Unix.gettimeofday()iniftimeout<0.thenraiseTimeout;timeout(** Channel *)typechannel={fd:Unix.file_descr;buf:Bytes.t;mutablecurr:int;mutablemax:int;mutablepid:intoption;}typein_channel=channelletbuffer_size=65536-9(* From ocaml/byterun/io.h *)letin_channel_of_descrfd=begintryUnix.set_nonblockfdwith_->(* On windows, only 'socket' need to be tagged non-blocking. *)()end;letbuf=Bytes.createbuffer_sizein{fd;buf;curr=0;max=0;pid=None}letdescr_of_in_channel{fd;_}=fdletopen_inname=letfd=Unix.openfilename[Unix.O_RDONLY;Unix.O_NONBLOCK]0o640inin_channel_of_descrfdletclose_intic=Unix.closetic.fdletclose_in_noerrtic=tryUnix.closetic.fdwith_->()letrecwaitpid_non_intrpid=tryUnix.waitpid[]pidwithUnix.Unix_error(Unix.EINTR,_,_)->waitpid_non_intrpidletclose_process_intic=matchtic.pidwith|None->invalid_arg"Timeout.close_process_in"|Somepid->close_intic;snd(waitpid_non_intrpid)letdo_read?timeouttic=lettimeout=get_current_timeouttimeoutinmatchUnix.select[tic.fd][][]timeoutwith|[],_,_->raiseTimeout|[_],_,_->letread=tryUnix.readtic.fdtic.buftic.max(buffer_size-tic.max)withUnix.Unix_error(Unix.EPIPE,_,_)->raiseEnd_of_fileintic.max<-tic.max+read;read|_::_,_,_->assertfalse(* Should never happen *)letrefill?timeouttic=tic.curr<-0;tic.max<-0;letnread=do_read?timeoutticinifnread=0thenraiseEnd_of_file;nreadletunsafe_input?timeoutticsofslen=letn=iflen>max_intthenmax_intelseleninletavail=tic.max-tic.currinifn<=availthenbegin(* There is enough to read in the buffer. *)Bytes.blittic.buftic.currsofsn;tic.curr<-tic.curr+n;nendelseifavail>0thenbegin(* Read the rest of the buffer. *)Bytes.blittic.buftic.currsofsavail;tic.curr<-tic.curr+avail;availendelsebegin(* No input to read, refill buffer. *)letnread=refill?timeoutticinletn=minnreadninBytes.blittic.buftic.currsofsn;tic.curr<-tic.curr+n;nendletinput?timeoutticsofslen=ifofs<0||len<0||ofs>Bytes.lengths-lentheninvalid_arg"input"elseunsafe_input?timeoutticsofslenletinput_char?timeouttic=iftic.curr=tic.maxthenignore(refill?timeouttic);tic.curr<-tic.curr+1;Bytes.gettic.buf(tic.curr-1)(* Read in channel until we discover a '\n' *)letinput_scan_line?timeouttic=letrecscan_lineticpos=ifpos<tic.maxthenifBytes.gettic.bufpos='\n'thenpos-tic.curr+1elsescan_linetic(pos+1)elsebeginletpos=iftic.curr<>0thenbegintic.max<-tic.max-tic.curr;Bytes.blittic.buftic.currtic.buf0tic.max;tic.curr<-0;tic.maxendelseposiniftic.max=buffer_sizethen-(tic.max-tic.curr)elseletnread=do_read?timeoutticinifnread=0then-(tic.max-tic.curr)elsebeginscan_lineticposendendinscan_linetictic.currletinput_line?timeouttic=letrecbuild_resultbufpos=function|[]->buf|hd::tl->letlen=Bytes.lengthhdinBytes.blithd0buf(pos-len)len;build_resultbuf(pos-len)tlinletrecscanacculen=letn=input_scan_line?timeoutticin(* End of file, if accu is not empty, return the last line. *)ifn=0thenbeginmatchaccuwith|[]->raiseEnd_of_file|_->build_result(Bytes.createlen)lenaccu(* New line found in the buffer. *)endelseifn>0thenbeginletresult=Bytes.create(n-1)in(* No need to keep '\n' *)ignore(unsafe_inputticresult0(n-1));ignore(input_chartic);(* Skip newline *)matchaccuwith|[]->result|_->letlen=len+n-1inbuild_result(Bytes.createlen)len(result::accu)(* New line not found in the buffer *)endelsebeginletofs=Bytes.create(-n)inignore(unsafe_inputticofs0(-n));scan(ofs::accu)(len-n)endinBytes.to_string(scan[]0)letrecunsafe_really_input?timeoutticbufofslen=iflen=0then()elseletr=unsafe_input?timeoutticbufofsleninifr=0thenraiseEnd_of_fileelseunsafe_really_input?timeoutticbuf(ofs+r)(len-r)letreally_input?timeoutticbufofslen=ifofs<0||len<0||ofs>Bytes.lengthbuf-lentheninvalid_arg"really_input"elseunsafe_really_input?timeoutticbufofslen(** Marshal *)letmarshal_magic=Bytes.of_string"\x84\x95\xA6\xBE"letinput_value?timeouttic=letmagic=Bytes.create4inBytes.setmagic0(input_char?timeouttic);Bytes.setmagic1(input_char?timeouttic);Bytes.setmagic2(input_char?timeouttic);Bytes.setmagic3(input_char?timeouttic);ifmagic<>marshal_magicthenfailwith"Select.input_value: bad object.";letb1=int_of_char(input_char?timeouttic)inletb2=int_of_char(input_char?timeouttic)inletb3=int_of_char(input_char?timeouttic)inletb4=int_of_char(input_char?timeouttic)inletlen=((b1lsl24)lor(b2lsl16)lor(b3lsl8)lorb4)+12inletdata=Bytes.create(len+8)inBytes.blitmagic0data04;Bytes.setdata4(char_of_intb1);Bytes.setdata5(char_of_intb2);Bytes.setdata6(char_of_intb3);Bytes.setdata7(char_of_intb4);begintryunsafe_really_input?timeoutticdata8lenwithEnd_of_file->failwith"Select.input_value: truncated object."end;Marshal.from_bytesdata0(** Process *)letopen_processcmdargs=letchild_in_fd,out_fd=Unix.pipe()inletin_fd,child_out_fd=Unix.pipe()inUnix.set_close_on_execin_fd;Unix.set_close_on_execout_fd;letpid=Unix.(create_processcmdargschild_in_fdchild_out_fdstderr)inUnix.closechild_out_fd;Unix.closechild_in_fd;lettic=in_channel_of_descrin_fdintic.pid<-Somepid;letoc=Unix.out_channel_of_descrout_fdin(tic,oc)letopen_process_incmdargs=letchild_in_fd,out_fd=Unix.pipe()inletin_fd,child_out_fd=Unix.pipe()inUnix.set_close_on_execin_fd;Unix.set_close_on_execout_fd;Unix.closeout_fd;letpid=Unix.(create_processcmdargschild_in_fdchild_out_fdstderr)inUnix.closechild_out_fd;Unix.closechild_in_fd;lettic=in_channel_of_descrin_fdintic.pid<-Somepid;ticletread_process~timeout?on_timeout~readercmdargs=let(tic,oc)=open_processcmdargsinleton_timeout()=Hack_option.iter~f:Sys_utils.terminate_processtic.pid;tic.pid<-None;matchon_timeoutwith|None->raiseTimeout|Somef->f()inwith_timeout~timeout~on_timeout~do_:(funtimeout->tryreadertimeoutticocwithexn->Hack_option.iter~f:Sys_utils.terminate_processtic.pid;tic.pid<-None;close_intic;close_outoc;raiseexn)(** Socket *)letopen_connection?timeoutsockaddr=letconnectsocksockaddr=tryUnix.connectsocksockaddr;with|Unix.Unix_error((Unix.EINPROGRESS|Unix.EWOULDBLOCK),_,_)->beginlettimeout=get_current_timeouttimeoutinmatchUnix.select[][sock][]timeoutwith|_,[],_->raiseTimeout|_,[_sock],_->()|_,_,_->assertfalseend|exn->Unix.closesock;raiseexninletsock=Unix.socket(Unix.domain_of_sockaddrsockaddr)Unix.SOCK_STREAM0inUnix.set_nonblocksock;connectsocksockaddr;Unix.clear_nonblocksock;Unix.set_close_on_execsock;lettic=in_channel_of_descrsockinletoc=Unix.out_channel_of_descrsockin(tic,oc)letshutdown_connection{fd;_}=Unix.(shutdownfdSHUTDOWN_SEND)endmoduletypeS=sigtypetvalwith_timeout:timeout:int->?on_timeout:(unit->unit)->do_:(t->'a)->'avalcheck_timeout:t->unittypein_channelvalin_channel_of_descr:Unix.file_descr->in_channelvaldescr_of_in_channel:in_channel->Unix.file_descrvalopen_in:string->in_channelvalclose_in:in_channel->unitvalclose_in_noerr:in_channel->unitvalinput:?timeout:t->in_channel->bytes->int->int->intvalreally_input:?timeout:t->in_channel->bytes->int->int->unitvalinput_char:?timeout:t->in_channel->charvalinput_line:?timeout:t->in_channel->stringvalinput_value:?timeout:t->in_channel->'avalopen_process:string->stringarray->in_channel*out_channelvalopen_process_in:string->stringarray->in_channelvalclose_process_in:in_channel->Unix.process_statusvalread_process:timeout:int->?on_timeout:(unit->unit)->reader:(t->in_channel->out_channel->'a)->string->stringarray->'avalopen_connection:?timeout:t->Unix.sockaddr->in_channel*out_channelvalshutdown_connection:in_channel->unitendletselect=(moduleSelect_timeout:S)letalarm=(moduleAlarm_timeout:S)include(val(ifSys.win32thenselectelsealarm))letread_connection~timeout?on_timeout~readersockaddr=with_timeout~timeout?on_timeout~do_:(funtimeout->let(tic,oc)=open_connection~timeoutsockaddrintryreadertimeoutticocwithexn->close_outoc;raiseexn)