(** Asynchronous IO helpers *)

open Prelude

module Ev = Libevent

(* Logger shared by the helpers below, obtained from [Log.from "async"].
   The module is [open]ed right away so that [log] can be used unqualified. *)
module Internal = struct
  let log = Log.from "async"
end

open Internal

(** Create a new event or use the provided [ev] and make it persistent
  with the infinite timeout (or use the provided [timeout]).
  Schedule this event with provided callback [f].
Don't forget [del] to unschedule. *)
let simple_event events ?(ev=Ev.create ()) ?timeout fd flags f =
  (* Whatever [f] raises is logged here and does not escape the callback. *)
  let guarded fd flags =
    try f ev fd flags
    with exn -> log #warn ~exn "simple_event"
  in
  Ev.set events ev fd flags ~persist:true guarded;
  Ev.add ev timeout;
  ev

(** Same as [simple_event], but the resulting event handle is dropped. *)
let setup_simple_event events ?ev ?timeout fd flags f =
  ignore (simple_event events ?ev ?timeout fd flags f : Ev.event)

(** Outcome of one read attempt, see [read_some]:
  [End] - zero bytes were read,
  [Data n] - [n] bytes were read,
  [Block] - the descriptor reported EAGAIN/EWOULDBLOCK,
  [Exn e] - any other exception raised by the read. *)
type result = End | Data of int | Block | Exn of exn

(** Read at most [len] bytes from [fd] into [buf] starting at [ofs].
  Never raises: failures other than EAGAIN/EWOULDBLOCK are logged and
  returned as [Exn]. *)
let read_some fd buf ofs len =
  match Unix.read fd buf ofs len with
  | 0 -> End
  | n -> Data n
  | exception Unix.Unix_error ((Unix.EAGAIN|Unix.EWOULDBLOCK),_,_) -> Block
  | exception exn -> log #warn ~exn "read_some"; Exn exn

(** Write at most [len] bytes of the string [buf] starting at [ofs] to [fd].
  @return the number of bytes written, [0] on EAGAIN/EWOULDBLOCK.
  Other exceptions are propagated to the caller. *)
let write_some fd buf ofs len =
  match Unix.write_substring fd buf ofs len with
  | written -> written
  | exception Unix.Unix_error ((Unix.EAGAIN|Unix.EWOULDBLOCK),_,_) -> 0

(** Read out all immediately available input (no blocking)
@return [`Limit data] as soon as more than [limit] bytes have been accumulated,
  [`Chunk (data,final)] otherwise, where [final] is [true] when end of input
  was seen and [false] when the next read would block.
  An exception reported by [read_some] is re-raised.
*)
let read_available ~limit fd =
  let buf = Buffer.create 1024 in
  let s = Bytes.create 1024 in
  let rec loop () =
    match read_some fd s 0 (Bytes.length s) with
    | End -> `Chunk (Buffer.contents buf, true)
    | Block -> `Chunk (Buffer.contents buf, false)
    | Exn exn -> raise exn
    | Data len ->
      (* Append the [len] fresh bytes straight from the scratch area;
         converting all of [s] to a string first would copy 1024 bytes per read. *)
      Buffer.add_subbytes buf s 0 len;
      if Buffer.length buf > limit then `Limit (Buffer.contents buf) else loop ()
  in
  loop ()

(** Human-readable description of the error reasons passed to the [err]
  callback of [read_buf]. "immediate" marks failures of the first, synchronous
  read attempt, "async" marks failures detected later from the event callback. *)
let show_error = function
  | `Timeout -> "timeout"
  | `EofImm -> "eof (immediate)"
  | `Eof -> "eof (async)"
  | `Exn exn -> Printf.sprintf "exn %s (async)" (Exn.str exn)
  | `ExnImm exn -> Printf.sprintf "exn %s (immediate)" (Exn.str exn)

(** [read_buf buf fd err k] - asynchronously fill [buf] with data from [fd] and call [k buf] when done (buffer is full).
[fd] should be nonblocking. Call [err] on error (EOF).
  [err] is given the reason (see [show_error]) and the number of bytes
  stored in [buf] so far. A first read is attempted right away; the event
  is only scheduled when that read did not fill the buffer. *)
let read_buf base ?ev ?timeout buf fd err k =
  let total = Bytes.length buf in
  (* Continue from the event loop; [filled] bytes are already in [buf]. *)
  let wait_for_more filled =
    let pos = ref filled in
    let on_event ev fd = function
      | Ev.TIMEOUT -> Ev.del ev; err `Timeout !pos
      | Ev.WRITE | Ev.SIGNAL -> assert false
      | Ev.READ ->
        begin match read_some fd buf !pos (total - !pos) with
        | End -> Ev.del ev; err `Eof !pos
        | Exn exn -> Ev.del ev; err (`Exn exn) !pos
        | Data n ->
          pos := !pos + n;
          (* the event is persistent, so it must be removed once the buffer is full *)
          if !pos = total then (Ev.del ev; k buf)
        | Block -> log #warn "Async.read_buf: useless wakeup on fd %d, ignoring" (U.int_of_file_descr fd)
        end
    in
    setup_simple_event base ?ev ?timeout fd [Ev.READ] on_event
  in
  match read_some fd buf 0 total with
  | End -> err `EofImm 0
  | Exn exn -> err (`ExnImm exn) 0
  | Block -> wait_for_more 0
  | Data n when n = total -> k buf
  | Data n -> wait_for_more n

(** [read_n base n fd err k] - same as [read_buf] with a freshly allocated
  buffer of [n] bytes; [k] receives the data as a string. *)
let read_n base ?ev ?timeout n fd err k =
  (* the buffer is not touched after the conversion, it is local to this call *)
  let deliver filled = k (Bytes.unsafe_to_string filled) in
  read_buf base ?ev ?timeout (Bytes.create n) fd err deliver

(** Call [f] with [delay]-second pauses between invocations.
Set [stop] to [true] to stop the timer.
  The first invocation happens after [first_delay].
  Exceptions raised by [f] are logged with [name] and do not stop the timer.
  NB do not [Ev.del] the event inside the [f] callback. *)
let periodic_timer_0 events stop first_delay delay ?(name="") f =
  let timer = Ev.create () in
  (* (re)arm the one-shot timer unless a stop was requested *)
  let schedule pause = if not !stop then Ev.add timer (Some pause) in
  let tick () =
    if not !stop then begin
      try f () with exn -> log #warn ~exn "periodic_timer %s" name
    end;
    (* [stop] is consulted again: it may have been set while [f] was running *)
    schedule delay
  in
  Ev.set_timer events timer ~persist:false tick;
  schedule first_delay;
  timer

(** Periodic timer with the first invocation scheduled with zero delay. *)
let periodic_timer_now events ?(stop=ref false) delay ?name f =
  periodic_timer_0 events stop 0. delay ?name f

(** Periodic timer with the first invocation scheduled after [delay]. *)
let periodic_timer_wait events ?(stop=ref false) delay ?name f =
  periodic_timer_0 events stop delay delay ?name f

(** Same as [periodic_timer_now], the timer event is not returned. *)
let setup_periodic_timer_now events ?stop delay ?name f =
  ignore (periodic_timer_now events ?stop delay ?name f : Ev.event)

(** Same as [periodic_timer_wait], the timer event is not returned. *)
let setup_periodic_timer_wait events ?stop delay ?name f =
  ignore (periodic_timer_wait events ?stop delay ?name f : Ev.event)

(*
(** Call [f] with [delay]-second pauses between invocations.
Set [stop] to [true] to stop the timer.
NB do not [Ev.del] the event inside the [f] callback. *)
let periodic_timer_0 stop first_delay delay ?(name="") f =
let rec loop () =
if not !stop then begin try f () with exn -> log #warn ~exn "periodic_timer %s" name end;
if not !stop then Ev.add timer (Some delay);
end;
if not !stop then Ev.add timer (Some first_delay);
timer
let periodic_timer_now events ?(stop=ref false) delay ?name f = periodic_timer_0 events stop 0. delay ?name f
let periodic_timer_wait events ?(stop=ref false) delay ?name f = periodic_timer_0 events stop delay delay ?name f
*)

module Peer = struct

  (** State of one peer connection.
    [events] - event base the events are registered with,
    [read] - event armed by [receive],
    [write] - event armed by [send],
    [timeout] - default timeout, used by [add_event] when a call gives none,
    [fd], [addr] - socket and the address it is associated with,
    [err] - user callback invoked by [error]. *)
  type t = {
    events : Ev.event_base;
    read : Ev.event;
    write : Ev.event;
    timeout : float option;
    fd : Unix.file_descr;
    addr : Unix.sockaddr;
    err : (unit -> unit);
  }

  (** Wrap [(fd,addr)] into a peer; [fd] is switched to non-blocking mode. *)
  let create events ?(err=id) ?timeout (fd,addr) =
    Unix.set_nonblock fd;
    { events; fd; addr; timeout; err; read = Ev.create (); write = Ev.create (); }

  (** Schedule [ev] with [timeout], falling back to the peer's default one. *)
  let add_event p ev timeout =
    let effective =
      match timeout with
      | None -> p.timeout
      | Some _ -> timeout
    in
    Ev.add ev effective

  (** Unschedule both events, shut the socket down and close it. *)
  let finish p =
    Ev.del p.read;
    Ev.del p.write;
    (* a failing shutdown is deliberately ignored, the descriptor is closed anyway *)
    (try Unix.shutdown p.fd Unix.SHUTDOWN_ALL with _ -> ());
    Unix.close p.fd

  (** Log [msg] (and [exn] if any) for this peer, then call the user [err]
    callback and [finish] the peer, combined through [Std.finally]. *)
  let error ?exn p msg =
    log #warn ?exn "Peer %s %s" (Nix.show_addr p.addr) msg;
    Std.finally (fun () -> finish p) p.err ()

  (** Asynchronously fill [buf] from the peer and call [k buf] when it is full.
    Timeout, end of input, read errors and exceptions raised inside the
    callback (including by [k]) are all routed to [error]. *)
  let receive p ?timeout buf k =
    let total = Bytes.length buf in
    let pos = ref 0 in
    let on_ready fd = function
      | Ev.TIMEOUT -> error p "receive timeout"
      | Ev.WRITE | Ev.SIGNAL -> assert false
      | Ev.READ ->
        begin match read_some fd buf !pos (total - !pos) with
        | End -> error p "receive eof"
        | Exn exn -> error ~exn p "receive"
        | Data n ->
          pos := !pos + n;
          (* persistent event: remove it before handing the full buffer over *)
          if !pos = total then (Ev.del p.read; k buf)
        | Block -> log #warn "Async.receive: useless wakeup on fd %d, ignoring" (U.int_of_file_descr fd)
        end
    in
    Ev.set p.events p.read p.fd [Ev.READ] ~persist:true (fun fd flags ->
      try on_ready fd flags with exn -> error ~exn p "receive");
    add_event p p.read timeout

(*
match read_some p.fd buf 0 len with
| End -> error p "receive eof"
| Exn exn -> error ~exn p "receive"
| Data n when n = len -> k buf
| Block -> later 0
| Data n -> later n
*)

  (** Asynchronously write the whole string [buf] to the peer and call [k ()]
    once every byte has been written. Timeout and any exception raised inside
    the callback (including by [k]) are routed to [error]. *)
  let send p ?timeout buf k =
    let total = String.length buf in
    let sent = ref 0 in
    let on_ready fd = function
      | Ev.TIMEOUT -> error p "send timeout"
      | Ev.READ | Ev.SIGNAL -> assert false
      | Ev.WRITE ->
        let n = write_some fd buf !sent (total - !sent) in
        sent := !sent + n;
        (* persistent event: remove it before running the continuation *)
        if !sent = total then (Ev.del p.write; k ())
    in
    Ev.set p.events p.write p.fd [Ev.WRITE] ~persist:true (fun fd flags ->
      try on_ready fd flags with exn -> error ~exn p "send");
    add_event p p.write timeout

(*
match try Some (write_some p.fd buf 0 len) with exn -> error ~exn p "send"; None with
| Some n when n = len -> k ()
| Some n -> later n
| None -> ()
*)

  (** [send] every string of [bufs] in order, then call [k ()].
    [timeout] applies to each individual [send]. *)
  let send_all p ?timeout bufs k =
    let rec loop = function
      | [] -> k ()
      | x::xs -> send p ?timeout x (fun () -> loop xs)
    in
    loop bufs

  (** Start connecting the peer's socket to its address and call [k ()] once
    the connection is established. Failures (including timeout and
    exceptions raised by [k]) are routed to [error]. *)
  let connect p ?timeout k =
    match Unix.connect p.fd p.addr with
    | () ->
      (* The connection was established synchronously: no event will ever
         fire for it, so the continuation has to be run right here. *)
      begin try k () with exn -> error ~exn p "connect" end
    | exception Unix.Unix_error (Unix.EINPROGRESS,_,_) ->
      (* Completion of a non-blocking connect is signalled by the socket
         becoming writable (not readable); the outcome is then fetched
         with [getsockopt_error]. *)
      Ev.set p.events p.write p.fd [Ev.WRITE] ~persist:false (fun fd flags ->
        try
          match flags with
          | Ev.TIMEOUT -> error p "connect timeout"
          | Ev.READ -> assert false
          | Ev.SIGNAL -> assert false
          | Ev.WRITE ->
            match Unix.getsockopt_error fd with
            | Some err -> error p ~exn:(Unix.Unix_error (err,"connect","")) "connect"
            | None -> k ()
        with
          exn -> error ~exn p "connect");
      add_event p p.write timeout
    | exception exn -> error ~exn p "connect"

end (* Peer *)

(** Call [f x] once, when a one-shot timer armed with [timeout] fires.
  Exceptions raised by [f] are logged and not propagated. *)
let delay events timeout f x =
  let timer = Ev.create () in
  Ev.set_timer events timer ~persist:false begin fun () ->
    begin try f x with exn -> log #warn ~exn "pause" end;
    Ev.del timer;
  end;
  Ev.add timer (Some timeout)

(** Run the event loop on [events] in [Ev.NONBLOCK] mode. *)
let poll events = Ev.loop events Ev.NONBLOCK