(** Signature of an HTTP connection state machine that can be driven by the
    read/write loops defined below ({!Server} and {!Client}). *)
module type RUNTIME = sig
  type t

  val next_read_operation : t -> [ `Read | `Yield | `Close ]
  (** [next_read_operation t] returns a value describing the next operation
      that the caller should conduct on behalf of the connection. *)

  val read : t -> Bigstringaf.t -> off:int -> len:int -> int
  (** [read t bigstring ~off ~len] reads bytes of input from the provided range
      of [bigstring] and returns the number of bytes consumed by the
      connection. {!read} should be called after {!next_read_operation}
      returns a [`Read] value and additional input is available for the
      connection to consume. *)

  val read_eof : t -> Bigstringaf.t -> off:int -> len:int -> int
  (** [read_eof t bigstring ~off ~len] reads bytes of input from the provided
      range of [bigstring] and returns the number of bytes consumed by the
      connection. {!read_eof} should be called after {!next_read_operation}
      returns a [`Read] and an EOF has been received from the communication
      channel. The connection will attempt to consume any buffered input and
      then shutdown the HTTP parser for the connection. *)

  val yield_reader : t -> (unit -> unit) -> unit
  (** [yield_reader t continue] registers with the connection to call
      [continue] when reading should resume. {!yield_reader} should be called
      after {!next_read_operation} returns a [`Yield] value. *)

  val next_write_operation :
    t -> [ `Write of Bigstringaf.t Faraday.iovec list | `Yield | `Close of int ]
  (** [next_write_operation t] returns a value describing the next operation
      that the caller should conduct on behalf of the connection. *)

  val report_write_result : t -> [ `Ok of int | `Closed ] -> unit
  (** [report_write_result t result] reports the result of the latest write
      attempt to the connection. {!report_write_result} should be called after
      a call to {!next_write_operation} that returns a [`Write buffer] value.

      - [`Ok n] indicates that the caller successfully wrote [n] bytes of
        output from the buffer that the caller was provided by
        {!next_write_operation}.
      - [`Closed] indicates that the output destination will no longer accept
        bytes from the write processor. *)

  val yield_writer : t -> (unit -> unit) -> unit
  (** [yield_writer t continue] registers with the connection to call
      [continue] when writing should resume. {!yield_writer} should be called
      after {!next_write_operation} returns a [`Yield] value. *)

  val report_exn : t -> exn -> unit
  (** [report_exn t exn] reports that an error [exn] has been caught and that
      it has been attributed to [t]. Calling this function will switch [t]
      into an error state. Depending on the state [t] is transitioning from,
      it may call its error handler before terminating the connection. *)

  val is_closed : t -> bool
  (** [is_closed t] is [true] if both the read and write processors have been
      shutdown. When this is the case {!next_read_operation} will return
      [`Close] and {!next_write_operation} will return a [`Write _] until all
      buffered output has been flushed, at which point it will return
      [`Close _]. *)

  val shutdown : t -> unit
  (** [shutdown t] asks to shutdown the connection. *)
end

(** A first-class {!RUNTIME} whose connection type is ['conn]. *)
type 'conn runtime = (module RUNTIME with type t = 'conn)

(** Read-side flow error, carrying the pretty-printed underlying error. *)
exception Flow of string

(** Write-side flow error, carrying the pretty-printed underlying error. *)
exception Flow_write of string

let src = Logs.Src.create "paf-flow"

module Log_flow = (val Logs.src_log src : Logs.LOG)

(** [Make (Flow)] wraps a [Mirage_flow.S] flow with an input queue and two
    flags recording whether each direction has been closed. *)
module Make (Flow : Mirage_flow.S) = struct
  type flow = {
    flow : Flow.flow;
    queue : (char, Bigarray.int8_unsigned_elt) Ke.Rke.t;
    mutable rd_closed : bool;
    mutable wr_closed : bool;
  }

  (** [create flow] wraps [flow] with a fresh queue (capacity [0x1000]) and
      both directions marked as open. *)
  let create flow =
    let queue = Ke.Rke.create ~capacity:0x1000 Bigarray.char in
    Lwt.return { flow; queue; rd_closed = false; wr_closed = false }

  (** [safely_close flow] closes the underlying flow only once both the read
      and the write sides have been flagged as closed; otherwise it does
      nothing. *)
  let safely_close flow =
    if flow.rd_closed && flow.wr_closed
    then (
      Log_flow.debug (fun m -> m "Close the connection.") ;
      Flow.close flow.flow)
    else Lwt.return ()

  (** [blit src src_off dst dst_off len] copies [len] bytes from the
      [Cstruct.t] [src] (starting at [src_off]) into the bigarray [dst]
      (starting at [dst_off]). Used as the [~blit] argument of
      [Ke.Rke.N.push]. *)
  let blit src src_off dst dst_off len =
    let dst = Cstruct.of_bigarray ~off:dst_off ~len dst in
    Cstruct.blit src src_off dst 0 len

  open Lwt.Infix

  type eof = [ `Eof ]

  (** [recv flow ~report_error ~report_closed ~read ~read_eof] performs one
      read on the underlying flow.

      - On data, the payload is pushed into the queue, the first slice of the
        queue is handed to [read], and the number of bytes [read] reports is
        shifted out of the queue. Returns [`Continue].
      - On end-of-file or error, the read side is flagged closed, whatever is
        still buffered is handed to [read_eof], then [report_closed] (EOF) or
        [report_error] (error) is called. Returns [`Closed]. *)
  let recv flow ~report_error ~report_closed ~read ~read_eof =
    Ke.Rke.compress flow.queue ;
    Flow.read flow.flow >>= function
    | (Error _ | Ok #eof) as v ->
        flow.rd_closed <- true ;
        safely_close flow >>= fun () ->
        Ke.Rke.compress flow.queue ;
        let _shift =
          match Ke.Rke.N.peek flow.queue with
          | [] -> read_eof Bigstringaf.empty ~off:0 ~len:0
          | [ slice ] -> read_eof slice ~off:0 ~len:(Bigstringaf.length slice)
          | _ -> assert false
          (* XXX(dinosaure): impossible due to [compress]. *) in
        (match v with
        | Ok `Eof -> report_closed ()
        | Error err -> report_error err) ;
        Lwt.return `Closed
    | Ok (`Data v) ->
        let len = Cstruct.length v in
        Ke.Rke.N.push flow.queue ~blit ~length:Cstruct.length ~off:0 ~len v ;
        (* An empty payload on an empty queue leaves nothing to peek at: in
           that case there is nothing to feed to [read], so just carry on
           instead of failing on a partial pattern. *)
        (match Ke.Rke.N.peek flow.queue with
        | [] -> ()
        | slice :: _ ->
            let shift = read slice ~off:0 ~len:(Bigstringaf.length slice) in
            Ke.Rke.N.shift_exn flow.queue shift) ;
        Lwt.return `Continue

  (** [writev ~report_error flow iovecs] copies every iovec and writes the
      copies to the underlying flow. Returns [`Ok n] where [n] is the total
      number of bytes handed to the flow, or, on error, calls [report_error],
      flags the write side closed and returns [`Closed]. *)
  let writev ~report_error flow iovecs =
    let iovecs =
      List.map
        (fun { Faraday.buffer; off; len } ->
          Cstruct.of_string
            (Cstruct.to_string
               (Cstruct.of_bigarray buffer ~off ~len)
               ~off:0 ~len))
        iovecs in
    (* XXX(dinosaure): the copy is needed:
       1) [Mirage_flow.S] explicitly says that [write] takes the ownership on
          the given [Cstruct.t]
       2) [ocaml-h2] wants to keep the ownership on given [Faraday.iovec]s
       To protect one from the other, copying is necessary. *)
    let total =
      List.fold_left (fun acc cs -> acc + Cstruct.length cs) 0 iovecs in
    Log_flow.debug (fun m -> m "Start to write %d byte(s)." total) ;
    Flow.writev flow.flow iovecs >>= function
    | Ok () -> Lwt.return (`Ok total)
    | Error err ->
        Log_flow.err (fun m ->
            m "Got an error when we wrote something: %a." Flow.pp_write_error
              err) ;
        report_error err ;
        flow.wr_closed <- true ;
        safely_close flow >>= fun () -> Lwt.return `Closed

  (** [send ~report_error flow iovecs] is {!writev} unless the write side is
      already flagged closed, in which case nothing is written and [`Closed]
      is returned. *)
  let send ~report_error flow iovecs =
    if flow.wr_closed
    then safely_close flow >>= fun () -> Lwt.return `Closed
    else writev ~report_error flow iovecs

  (** [close flow] flags both sides closed and closes the underlying flow,
      unless both sides were already flagged closed. *)
  let close flow =
    match (flow.rd_closed, flow.wr_closed) with
    | true, true -> Lwt.return_unit
    | _ ->
        flow.rd_closed <- true ;
        flow.wr_closed <- true ;
        Flow.close flow.flow
end

let src = Logs.Src.create "paf-server"

module Log_server = (val Logs.src_log src : Logs.LOG)

(** [Server (Flow) (Runtime)] drives a [Runtime.t] connection over a
    [Flow.flow] with one read fiber and one write fiber. *)
module Server (Flow : Mirage_flow.S) (Runtime : RUNTIME) : sig
  val server : Runtime.t -> Flow.flow -> unit Lwt.t
end = struct
  module Easy_flow = Make (Flow)
  open Lwt.Infix

  let to_flow_exception err : exn = Flow (Fmt.str "%a" Flow.pp_error err)

  let to_flow_write_exception err : exn =
    Flow_write (Fmt.str "%a" Flow.pp_write_error err)

  (** [server connection flow] starts the read and write fibers and resolves
      once both have reached [`Close], after closing the flow. *)
  let server connection flow =
    Easy_flow.create flow >>= fun flow ->
    let rd_exit, notify_rd_exit = Lwt.wait () in
    let wr_exit, notify_wr_exit = Lwt.wait () in
    let rec rd_fiber () =
      let report_error err =
        Runtime.report_exn connection (to_flow_exception err) in
      let rec go () =
        Log_server.debug (fun m -> m "Compute next read operation.") ;
        match Runtime.next_read_operation connection with
        | `Read ->
            Log_server.debug (fun m -> m "next read operation: `read") ;
            Easy_flow.recv flow ~report_error ~report_closed:ignore
              ~read:(Runtime.read connection)
              ~read_eof:(Runtime.read_eof connection)
            >>= fun _ -> Lwt.pause () >>= go
        | `Yield ->
            Log_server.debug (fun m -> m "next read operation: `yield") ;
            (* The runtime restarts the whole fiber when reading resumes. *)
            Runtime.yield_reader connection rd_fiber ;
            Lwt.pause ()
        | `Close ->
            Log_server.debug (fun m -> m "next read operation: `close") ;
            Lwt.wakeup_later notify_rd_exit () ;
            Flow.shutdown flow.Easy_flow.flow `read in
      (* NOTE(review): when [go] fails, [rd_exit] is not resolved on this
         path - confirm that termination is reached another way. *)
      Lwt.async @@ fun () ->
      Lwt.catch go (fun exn ->
          Runtime.report_exn connection exn ;
          Lwt.return_unit) in
    let rec wr_fiber () =
      let report_error err =
        Runtime.report_exn connection (to_flow_write_exception err) in
      let rec go () =
        Log_server.debug (fun m -> m "Compute next write operation.") ;
        match Runtime.next_write_operation connection with
        | `Write iovecs ->
            Log_server.debug (fun m -> m "next write operation: `write") ;
            Easy_flow.send ~report_error flow iovecs >>= fun res ->
            Runtime.report_write_result connection res ;
            Lwt.pause () >>= go
        | `Yield ->
            Log_server.debug (fun m -> m "next write operation: `yield") ;
            (* The runtime restarts the whole fiber when writing resumes. *)
            Runtime.yield_writer connection wr_fiber ;
            Lwt.pause ()
        | `Close _ ->
            Log_server.debug (fun m -> m "next write operation: `close") ;
            Lwt.wakeup_later notify_wr_exit () ;
            Flow.shutdown flow.Easy_flow.flow `write in
      Lwt.async @@ fun () ->
      Lwt.catch go (fun exn ->
          (* Runtime.report_write_result connection `Closed ; *)
          Runtime.report_exn connection exn ;
          Lwt.return_unit) in
    rd_fiber () ;
    wr_fiber () ;
    Lwt.join [ rd_exit; wr_exit ] >>= fun () ->
    Log_server.debug (fun m -> m "End of transmission.") ;
    Easy_flow.close flow
end

let src = Logs.Src.create "paf-client"

module Log_client = (val Logs.src_log src : Logs.LOG)

(** [Client (Flow) (Runtime)] drives a [Runtime.t] connection over a
    [Flow.flow] with one read fiber and one write fiber. Unlike {!Server}, a
    [`Close] operation flags the corresponding side closed and lets
    [Easy_flow.safely_close] decide whether the flow can be closed. *)
module Client (Flow : Mirage_flow.S) (Runtime : RUNTIME) : sig
  val run : Runtime.t -> Flow.flow -> unit Lwt.t
end = struct
  open Lwt.Infix
  module Easy_flow = Make (Flow)

  let to_flow_exception err : exn = Flow (Fmt.str "%a" Flow.pp_error err)

  let to_flow_write_exception err : exn =
    Flow_write (Fmt.str "%a" Flow.pp_write_error err)

  (** [run connection flow] starts the write and read fibers and resolves
      once both have reached [`Close], after closing the flow. *)
  let run connection flow =
    Easy_flow.create flow >>= fun flow ->
    let rd_exit, notify_rd_exit = Lwt.wait () in
    let wr_exit, notify_wr_exit = Lwt.wait () in
    let rec rd_fiber () =
      let report_error err =
        Runtime.report_exn connection (to_flow_exception err) in
      let rec go () =
        match Runtime.next_read_operation connection with
        | `Read ->
            Log_client.debug (fun m -> m "next read operation: `read") ;
            Easy_flow.recv flow ~report_error ~report_closed:ignore
              ~read:(Runtime.read connection)
              ~read_eof:(Runtime.read_eof connection)
            >>= fun _ -> Lwt.pause () >>= go
        | `Yield ->
            Log_client.debug (fun m -> m "next read operation: `yield") ;
            Runtime.yield_reader connection rd_fiber ;
            Lwt.pause ()
        | `Close ->
            Log_client.debug (fun m -> m "next read operation: `close.") ;
            Lwt.wakeup_later notify_rd_exit () ;
            flow.Easy_flow.rd_closed <- true ;
            Easy_flow.safely_close flow in
      Lwt.async @@ fun () ->
      Lwt.catch go (fun exn ->
          Runtime.report_exn connection exn ;
          Lwt.return_unit) in
    let rec wr_fiber () =
      let report_error err =
        Runtime.report_exn connection (to_flow_write_exception err) in
      let rec go () =
        match Runtime.next_write_operation connection with
        | `Write iovecs ->
            Log_client.debug (fun m -> m "next write operation: `write.") ;
            Easy_flow.send ~report_error flow iovecs >>= fun res ->
            Runtime.report_write_result connection res ;
            Lwt.pause () >>= go
        | `Yield ->
            Log_client.debug (fun m -> m "next write operation: `yield.") ;
            Runtime.yield_writer connection wr_fiber ;
            Lwt.pause ()
        | `Close _ ->
            Log_client.debug (fun m -> m "next write operation: `close.") ;
            Lwt.wakeup_later notify_wr_exit () ;
            flow.Easy_flow.wr_closed <- true ;
            Easy_flow.safely_close flow in
      Lwt.async @@ fun () ->
      Lwt.catch go (fun exn ->
          Runtime.report_exn connection exn ;
          Lwt.return_unit) in
    wr_fiber () ;
    rd_fiber () ;
    Lwt.join [ rd_exit; wr_exit ] >>= fun () ->
    Log_client.debug (fun m -> m "End of transmission.") ;
    Easy_flow.close flow
end

(** A runtime packed together with one of its connections. *)
type impl = Runtime : 'conn runtime * 'conn -> impl

(** A service: how to accept a socket, upgrade it to a flow, turn that flow
    into a [Mimic.flow] plus the runtime that should drive it, and how to
    close the listening endpoint. *)
type 't service =
  | Service : {
      accept : 't -> ('socket, ([> `Closed ] as 'error)) result Lwt.t;
      handshake : 'socket -> ('flow, ([> `Closed ] as 'error)) result Lwt.t;
      connection : 'flow -> (Mimic.flow * impl, 'error) result Lwt.t;
      close : 't -> unit Lwt.t;
    }
      -> 't service

and ('t, 'socket, 'flow, 'error) posix = {
  accept : 't -> ('socket, 'error) result Lwt.t;
  handshake : 'socket -> ('flow, 'error) result Lwt.t;
  close : 't -> unit Lwt.t;
}
  constraint 'error = [> `Closed ]

(** [service connection handshake accept close] packs the four callbacks into
    a {!service}. *)
let service connection handshake accept close =
  Service { accept; connection; handshake; close }

open Lwt.Infix

(** [serve_when_ready service ?stop ~handler t] returns [`Initialized th]
    where [th] is the accept loop: every accepted socket is upgraded with
    [handshake] and passed to [handler] in its own asynchronous thread. The
    loop ends when [accept] returns [Error `Closed] or when [stop] is turned
    off; in both cases [close t] is called before [th] resolves. Any other
    [accept] error is ignored and the loop retries after [Lwt.pause]. *)
let serve_when_ready :
    type t socket flow.
    (t, socket, flow, _) posix ->
    ?stop:Lwt_switch.t ->
    handler:(flow -> unit Lwt.t) ->
    t ->
    [ `Initialized of unit Lwt.t ] =
 fun service ?stop ~handler t ->
  let { accept; handshake; close } = service in
  `Initialized
    (let switched_off =
       let waiter, wakener = Lwt.wait () in
       Lwt_switch.add_hook stop (fun () ->
           Lwt.wakeup_later wakener (Ok `Stopped) ;
           Lwt.return_unit) ;
       waiter in
     let rec loop () =
       accept t >>= function
       | Ok socket ->
           Lwt.async (fun () ->
               handshake socket >>= function
               | Ok flow -> handler flow
               | Error `Closed ->
                   Logs.info (fun m -> m "Connection closed by peer") ;
                   Lwt.return ()
               | Error _err ->
                   Logs.err (fun m ->
                       m "Got an error from a TCP/IP connection.") ;
                   Lwt.return ()) ;
           loop ()
       | Error `Closed -> Lwt.return_error `Closed
       | Error _ -> Lwt.pause () >>= loop in
     let stop_result =
       Lwt.pick [ switched_off; loop () ] >>= function
       | Ok `Stopped -> close t >>= fun () -> Lwt.return_ok ()
       | Error _ as err -> close t >>= fun () -> Lwt.return err in
     stop_result >>= function Ok () | Error `Closed -> Lwt.return_unit)

(** [server runtime conn flow] drives [conn] as a server over the
    [Mimic.flow] [flow], using {!Server}. *)
let server : type t. t runtime -> t -> Mimic.flow -> unit Lwt.t =
 fun (module Runtime) conn flow ->
  let module Server = Server (Mimic) (Runtime) in
  Server.server conn flow

(** [serve ?stop service t] runs {!serve_when_ready} with a handler that asks
    [service] for the runtime attached to each new flow and serves it with
    {!server}. A flow for which [connection] fails is dropped silently. *)
let serve ?stop service t =
  let (Service { accept; handshake; connection; close }) = service in
  let handler flow =
    connection flow >>= function
    | Ok (flow, Runtime (runtime, conn)) -> server runtime conn flow
    | Error _ -> Lwt.return_unit in
  serve_when_ready ?stop ~handler { accept; handshake; close } t

(** [run runtime conn flow] drives [conn] as a client over the [Mimic.flow]
    [flow], using {!Client}. *)
let run : type t. t runtime -> t -> Mimic.flow -> unit Lwt.t =
 fun (module Runtime) conn flow ->
  let module Client = Client (Mimic) (Runtime) in
  Client.run conn flow