open Lwt.Infix

(* Slightly rude to set signal handlers in a library, but SIGPIPE makes no sense
   in a modern application. *)
let () =
  if not Sys.win32 then Sys.(set_signal sigpipe Signal_ignore)

(** State of a flow over an [Lwt_unix] file descriptor.

    [current_write] and [current_read] hold the promise of the operation that
    is currently in flight (if any), so that {!close} can cancel it.
    [closed] is set by {!close} and checked by {!read} and {!write}. *)
type flow = {
  fd : Lwt_unix.file_descr;
  mutable current_write : int Lwt.t option;
  mutable current_read : int Lwt.t option;
  mutable closed : bool;
}

(** Errors reported by {!read}. *)
type error = [`Exception of exn]

(** Errors reported by {!write} and {!writev}. *)
type write_error = [`Closed | `Exception of exn]

(** [opt_cancel p] cancels the promise held in [p], if there is one. *)
let opt_cancel = function
  | None -> ()
  | Some x -> Lwt.cancel x

(** [close t] marks [t] as closed, cancels any in-flight read and write, and
    closes the underlying file descriptor.  Calling it again on an already
    closed flow does nothing. *)
let close t =
  if t.closed then Lwt.return_unit
  else (
    t.closed <- true;
    opt_cancel t.current_read;
    opt_cancel t.current_write;
    Lwt_unix.close t.fd
  )

(** [pp_error f e] prints [e] on formatter [f].  It accepts both the
    [`Exception] and the [`Closed] cases, so it also serves write errors. *)
let pp_error f = function
  | `Exception ex -> Fmt.exn f ex
  | `Closed -> Fmt.string f "Closed"

let pp_write_error = pp_error

(** [write t buf] writes the whole of [buf] to [t], looping on short writes.

    Returns [Ok ()] once every byte has been written, [Error `Closed] if the
    flow is closed or the peer has gone away (ECONNRESET, ENOTCONN, EPIPE), and
    [Error (`Exception ex)] for any other exception. *)
let write t buf =
  let rec aux buf =
    if t.closed then Lwt.return (Error `Closed)
    else (
      (* Only one write may be in flight at a time. *)
      assert (t.current_write = None);
      let write_thread = Lwt_cstruct.write t.fd buf in
      t.current_write <- Some write_thread;
      (* Clear the bookkeeping whether the write is fulfilled or rejected;
         otherwise a failed or cancelled write would leave [current_write] set
         and make every later call fail the assertion above. *)
      Lwt.finalize
        (fun () -> write_thread)
        (fun () -> t.current_write <- None; Lwt.return_unit)
      >>= fun wrote ->
      if wrote = Cstruct.length buf then Lwt.return (Ok ())
      else aux (Cstruct.shift buf wrote)
    )
  in
  Lwt.catch
    (fun () -> aux buf)
    (function
      | Unix.Unix_error (Unix.ECONNRESET, _, _)
      | Unix.Unix_error (Unix.ENOTCONN, _, _)    (* macos *)
      | Unix.Unix_error (Unix.EPIPE, _, _) -> Lwt.return @@ Error `Closed
      | ex -> Lwt.return @@ Error (`Exception ex))

(** [writev t bufs] writes each buffer of [bufs] in order with {!write},
    stopping at (and returning) the first error. *)
let rec writev t = function
  | [] -> Lwt.return (Ok ())
  | x :: xs ->
    write t x >>= function
    | Ok () -> writev t xs
    | Error _ as e -> Lwt.return e

(** [read t] reads up to 4096 bytes from [t].

    Returns [Ok (`Data buf)] with the bytes received, [Ok `Eof] on a
    zero-length read, on a closed or cancelled flow, or on EPIPE / ECONNRESET,
    and [Error (`Exception ex)] for any other exception. *)
let read t =
  let len = 4096 in
  let buf = Cstruct.create_unsafe len in
  Lwt.try_bind
    (fun () ->
       (* Only one read may be in flight at a time. *)
       assert (t.current_read = None);
       if t.closed then raise Lwt.Canceled;
       let read_thread = Lwt_cstruct.read t.fd buf in
       t.current_read <- Some read_thread;
       (* Clear the bookkeeping on every outcome (data, end-of-file, error or
          cancellation), so that a later [read] does not fail the assertion
          above. *)
       Lwt.finalize
         (fun () -> read_thread)
         (fun () -> t.current_read <- None; Lwt.return_unit)
    )
    (function
      | 0 -> Lwt.return @@ Ok `Eof
      | got -> Lwt.return @@ Ok (`Data (Cstruct.sub buf 0 got))
    )
    (function
      | Lwt.Canceled
      | Unix.Unix_error (Unix.EPIPE, _, _)
      | Unix.Unix_error (Unix.ECONNRESET, _, _) -> Lwt_result.return `Eof
      | ex -> Lwt.return @@ Error (`Exception ex)
    )

(** [connect ?switch fd] wraps [fd] in a fresh, open flow and registers
    {!close} as a hook on [switch]. *)
let connect ?switch fd =
  let t = { fd; closed = false; current_read = None; current_write = None } in
  Lwt_switch.add_hook switch (fun () -> close t);
  t

(** [socketpair ?switch ()] creates a [PF_UNIX] / [SOCK_STREAM] socket pair and
    returns both ends wrapped with {!connect}. *)
let socketpair ?switch () =
  let a, b = Lwt_unix.(socketpair PF_UNIX SOCK_STREAM 0) in
  connect ?switch a, connect ?switch b