let src = Logs.Src.create "runtime"

(* Upper bound, in bytes, used for one [Flow.read] in [recv] and for each chunk
   produced by [split]; it is also the default [read_buffer_size] of [run]. *)
let _minor = (Sys.word_size / 8 * 256) - 1

external reraise : exn -> 'a = "%reraise"

module Log = (val Logs.src_log src : Logs.LOG)

(** Interface of the connection state machine driven by {!Make}. The runtime
    only relies on the operations listed here. *)
module type S = sig
  type t

  val next_read_operation : t -> [ `Read | `Yield | `Close | `Upgrade ]
  val read : t -> Bigstringaf.t -> off:int -> len:int -> int
  val read_eof : t -> Bigstringaf.t -> off:int -> len:int -> int
  val yield_reader : t -> (unit -> unit) -> unit

  val next_write_operation :
       t
    -> [ `Write of Bigstringaf.t Faraday.iovec list
       | `Close of int
       | `Yield
       | `Upgrade ]

  val report_write_result : t -> [ `Ok of int | `Closed ] -> unit
  val yield_writer : t -> (unit -> unit) -> unit
  val report_exn : t -> exn -> unit
  val is_closed : t -> bool
end

(** Growable byte buffer: [put] appends after the pending bytes, [get] consumes
    from the front of the pending bytes. *)
module Buffer : sig
  type t

  val create : int -> t
  val get : t -> fn:(Bigstringaf.t -> off:int -> len:int -> int) -> int
  val put : t -> fn:(Bigstringaf.t -> off:int -> len:int -> int) -> int
end = struct
  type t = {
      mutable buffer : Bigstringaf.t
    ; mutable off : int
    ; mutable len : int
  }

  let create size =
    let buffer = Bigstringaf.create size in
    { buffer; off = 0; len = 0 }

  (* Move the pending bytes back to offset 0 so that all the free space is
     located after them. Afterwards [t.off = 0] always holds. *)
  let compress t =
    if t.len = 0 then t.off <- 0
    else if t.off > 0 then begin
      Bigstringaf.blit t.buffer ~src_off:t.off t.buffer ~dst_off:0 ~len:t.len;
      t.off <- 0
    end

  (* [get t ~fn] shows the pending bytes to [fn] and drops the [n] bytes that
     [fn] reports as consumed. Returns [n]. *)
  let get t ~fn =
    let n = fn t.buffer ~off:t.off ~len:t.len in
    t.off <- t.off + n;
    t.len <- t.len - n;
    if t.len = 0 then t.off <- 0;
    n

  (* [put t ~fn] gives the free space located after the pending bytes to [fn]
     and records the [n] bytes that [fn] reports as written. Returns [n]. *)
  let put t ~fn =
    compress t;
    let off = t.off + t.len in
    let buf = t.buffer in
    if Bigstringaf.length buf = t.len then begin
      (* The buffer is full: double it. [max 1] lets a zero-sized buffer grow
         too, otherwise [fn] would be offered 0 bytes forever. *)
      t.buffer <- Bigstringaf.create (max 1 (2 * Bigstringaf.length buf));
      Bigstringaf.blit buf ~src_off:t.off t.buffer ~dst_off:0 ~len:t.len
    end;
    let n = fn t.buffer ~off ~len:(Bigstringaf.length t.buffer - off) in
    t.len <- t.len + n;
    n
end

(* Backtrace given to [Miou.Computation.try_cancel] in [Make.run]. *)
let empty_bt = Printexc.get_callstack max_int

(* Await every task attached to [orphans], logging the ones that failed. The
   function returns (after a [Miou.yield]) once [Miou.care] returns [None]. *)
let rec terminate orphans =
  match Miou.care orphans with
  | None -> Miou.yield ()
  | Some None ->
      Miou.yield ();
      terminate orphans
  | Some (Some prm) -> (
      match Miou.await prm with
      | Ok () -> terminate orphans
      | Error exn ->
          Log.err (fun m ->
              m "unexpected exception from an asynchronous task: %S"
                (Printexc.to_string exn));
          terminate orphans)

(* Like [terminate], but stops (after a [Miou.yield]) as soon as [Miou.care]
   has no promise to hand over, i.e. on both [None] and [Some None]. *)
let rec clean orphans =
  match Miou.care orphans with
  | None -> Miou.yield ()
  | Some None -> Miou.yield ()
  | Some (Some prm) -> begin
      match Miou.await prm with
      | Ok () -> clean orphans
      | Error exn ->
          Log.err (fun m ->
              m "unexpected exception from an asynchronous task: 
%S"
                (Printexc.to_string exn));
          clean orphans
    end

exception Closed_by_peer = Flow.Closed_by_peer

module Make (Flow : Flow.S) (Runtime : S) = struct
  type conn = Runtime.t
  type flow = Flow.t

  (* Best-effort shutdown: any exception raised by [Flow.shutdown] is logged
     and swallowed. *)
  let shutdown flow cmd =
    try Flow.shutdown flow cmd
    with exn ->
      Log.err (fun m ->
          m "error when we shutdown: %S" (Printexc.to_string exn))

  (* TODO(dinosaure): we may try to shut down a connection that is already
     closed (it all depends on the behavior of the peer). The way a socket is
     closed between two peers speaking HTTP does not seem to be that
     standardized. Thus, shutdown can raise an exception (saying that the
     connection has already been closed by the peer).

     We could check the state of the connection before attempting the shutdown
     instead of ignoring the exception that may have been raised. *)

  (* [recv flow buffer] reads at most [_minor] bytes from [flow] and appends
     them to [buffer]. Returns [`Eof] when 0 bytes were read and [`Ok n]
     otherwise. If reading raises, [flow] is closed and the exception is
     re-raised. *)
  let recv flow buffer =
    let bytes_read =
      Buffer.put buffer ~fn:(fun bstr ~off:dst_off ~len ->
          let len = min len _minor in
          let buf = Bytes.create len in
          try
            let len' = Flow.read flow buf ~off:0 ~len in
            Bigstringaf.blit_from_bytes buf ~src_off:0 bstr ~dst_off ~len:len';
            len'
          with exn ->
            Flow.close flow;
            reraise exn)
    in
    if bytes_read = 0 then `Eof else `Ok bytes_read

  (* [split acc bstr off len] copies [bstr.{off, len}] into strings of at most
     [_minor] bytes, in order, appended to the reversed accumulator [acc]. *)
  let rec split acc bstr off len =
    if len <= _minor then
      List.rev (Bigstringaf.substring bstr ~off ~len :: acc)
    else
      (* Here [len > _minor], so the chunk is exactly [_minor] bytes long. *)
      let str = Bigstringaf.substring bstr ~off ~len:_minor in
      split (str :: acc) bstr (off + _minor) (len - _minor)

  (* [writev flow bstrs] writes every iovec to [flow], chunk by chunk. Returns
     [`Ok n], [n] being the sum of the iovec lengths, or [`Closed] if a write
     raised. For any exception other than [Closed_by_peer], [flow] is closed
     first. *)
  let writev flow bstrs =
    let strss =
      List.map
        (fun { Faraday.buffer; off; len } -> split [] buffer off len)
        bstrs
    in
    let len = List.fold_left (fun a { Faraday.len; _ } -> a + len) 0 bstrs in
    try
      List.iter (List.iter (Flow.write flow)) strss;
      `Ok len
    with
    | Closed_by_peer -> `Closed
    | _exn ->
        Flow.close flow;
        `Closed

  (* State handed to one [reader] or [writer] task. [tasks], [lock] and [cond]
     are shared with the runner of [run]; [stop] and [upgrade] are specific to
     the side (read or write) of the task. *)
  type t = {
      src : Logs.src
    ; conn : Runtime.t
    ; flow : Flow.t
    ; tasks : (unit -> unit) Queue.t
    ; buffer : Buffer.t
    ; stop : bool ref
    ; upgrade : unit Miou.Computation.t
    ; lock : Miou.Mutex.t
    ; cond : Miou.Condition.t
  }

  (* [reader t] is the task body of the reading side: it follows
     [Runtime.next_read_operation] until it yields, closes or upgrades. The
     condition is signalled whenever the task body finishes, whatever the
     reason. *)
  let reader t =
    let rec protected () =
      match Runtime.next_read_operation t.conn with
      | `Read ->
          Logs.debug ~src:t.src (fun m -> m "+reader");
          let fn =
            match recv t.flow t.buffer with
            | `Eof ->
                Logs.debug ~src:t.src (fun m -> m "+reader eof");
                Runtime.read_eof t.conn
            | `Ok len ->
                Logs.debug ~src:t.src (fun m -> m "+reader %d byte(s)" len);
                Runtime.read t.conn
          in
          let _ = Buffer.get t.buffer ~fn in
          protected ()
      | `Yield ->
          (* Resuming is not done in place: the task is queued again and the
             runner is woken up so that it spawns it. *)
          let k () =
            Miou.Mutex.protect t.lock @@ fun () ->
            Queue.push go t.tasks;
            Miou.Condition.signal t.cond
          in
          Logs.debug ~src:t.src (fun m -> m "+reader yield");
          Runtime.yield_reader t.conn k
      | `Close ->
          shutdown t.flow `read;
          t.stop := true;
          Logs.debug ~src:t.src (fun m -> m "+reader closed")
      | `Upgrade ->
          Logs.debug ~src:t.src (fun m -> m "+reader upgrade");
          ignore (Miou.Computation.try_return t.upgrade ())
    and finally () =
      Logs.debug ~src:t.src (fun m -> m "+reader 
signals");
      Miou.Mutex.protect t.lock @@ fun () -> Miou.Condition.signal t.cond
    and go () = Fun.protect ~finally protected in
    go

  (* [writer t] is the task body of the writing side; same structure as
     [reader], following [Runtime.next_write_operation]. *)
  let writer t =
    let rec protected () =
      match Runtime.next_write_operation t.conn with
      | `Write iovecs ->
          let fn acc { Faraday.len; _ } = acc + len in
          let len = List.fold_left fn 0 iovecs in
          Logs.debug ~src:t.src (fun m -> m "+write %d byte(s)" len);
          writev t.flow iovecs |> Runtime.report_write_result t.conn;
          protected ()
      | `Yield ->
          let k () =
            Miou.Mutex.protect t.lock @@ fun () ->
            Queue.push go t.tasks;
            Miou.Condition.signal t.cond
          in
          Logs.debug ~src:t.src (fun m -> m "+writer yield");
          Runtime.yield_writer t.conn k
      | `Close _ ->
          shutdown t.flow `write;
          t.stop := true;
          Logs.debug ~src:t.src (fun m -> m "+writer closed")
      | `Upgrade ->
          Logs.debug ~src:t.src (fun m -> m "+writer upgrade");
          ignore (Miou.Computation.try_return t.upgrade ())
    and finally () =
      Logs.debug ~src:t.src (fun m -> m "+writer signals");
      Miou.Mutex.protect t.lock @@ fun () -> Miou.Condition.signal t.cond
    and go () = Fun.protect ~finally protected in
    go

  (* NOTE(dinosaure): report exception only once. Every exception is logged,
     but only the first one is given to [Runtime.report_exn]. *)
  let report_exn src error conn exn =
    Logs.err ~src (fun m ->
        m "user's exception: %s" (Printexc.to_string exn));
    if !error = false then begin
      Runtime.report_exn conn exn;
      error := true
    end

  (* Await every remaining task of [orphans], reporting their exceptions. *)
  let rec terminate src error conn orphans =
    match Miou.care orphans with
    | None -> ()
    | Some None ->
        Miou.yield ();
        terminate src error conn orphans
    | Some (Some prm) -> begin
        match Miou.await prm with
        | Ok () -> terminate src error conn orphans
        | Error exn ->
            report_exn src error conn exn;
            terminate src error conn orphans
      end

  (* Collect the tasks of [orphans] that [Miou.care] hands over, reporting
     their exceptions, and stop (after a [Miou.yield]) as soon as it has none
     to give. *)
  let rec clean src error conn orphans =
    match Miou.care orphans with
    | Some None | None -> Miou.yield ()
    | Some (Some prm) -> begin
        match Miou.await prm with
        | Ok () -> clean src error conn orphans
        | Error exn ->
            report_exn src error conn exn;
            clean src error conn orphans
      end

  (* NOTE(dinosaure): the [Runtime] design is a "runner" process that is
     awaiting tasks. At the very beginning, we launch 2 tasks (one for reading
     and one for writing). These can involve the creation of new tasks (via
     [`Yield]). To respect the rule of relationship between tasks, the
     creation of these is not done directly via [Miou.async] but transmitted
     to our "runner" process via a queue.

     It is then our runner which really creates these tasks (and probably
     cleans up the previous ones). To prevent the "runner" from being a
     hot-loop, a mutex and a condition are used so that the process waits for
     a change of state (the addition of a new task or a change of state of
     [conn] after one of the tasks has finished).

     We trust [Runtime.is_closed] to complete our process, but it seems that
     it cannot be fully trusted. This is why [s_rd] and [s_wr], which record
     whether the socket has been closed for reading and/or writing, are also
     taken into account by [is_shutdown]. *)

  (* [run conn ?src ?read_buffer_size ?upgrade flow] queues the reader, the
     writer and the upgrade tasks for [conn] over [flow], and returns the
     promise of the runner which spawns and collects them.

     - [src] is the log source used for the connection.
     - [read_buffer_size] is the initial size of the read buffer.
     - [upgrade], if given, is called with [flow] once both the reader and the
       writer have reached [`Upgrade]. *)
  let run conn ?(src = src) ?(read_buffer_size = _minor) ?upgrade flow =
    let buffer = Buffer.create read_buffer_size in
    let s_rd = ref false and s_wr = ref false and error = ref false in
    let u_rd = Miou.Computation.create () in
    let u_wr = Miou.Computation.create () in
    let tasks = Queue.create () in
    let lock = Miou.Mutex.create () in
    let cond = Miou.Condition.create () in
    let is_shutdown conn = Runtime.is_closed conn || (!s_rd && !s_wr) in
    let runner () =
      let rec go orphans =
        clean src error conn orphans;
        (* Sleep until a task is queued or until the connection is shut
           down. *)
        let () =
          Miou.Mutex.protect lock @@ fun () ->
          if Queue.is_empty tasks && not (is_shutdown conn) then
            Miou.Condition.wait cond lock
        in
        let seq = Queue.to_seq tasks in
        let lst = List.of_seq seq in
        Queue.clear tasks;
        List.iter (fun fn -> ignore (Miou.async ~orphans fn)) lst;
        if not (is_shutdown conn) then go orphans
        else begin
          Logs.debug ~src (fun m -> m "Connection closed");
          (* Unblock the upgrade task if it still awaits the reader and/or
             the writer. *)
          let _ = Miou.Computation.try_cancel u_rd (Miou.Cancelled, empty_bt) in
          let _ = Miou.Computation.try_cancel u_wr (Miou.Cancelled, empty_bt) in
          ()
        end
      in
      let orphans = Miou.orphans () in
      let finally () = terminate src error conn orphans in
      Fun.protect ~finally @@ fun () -> go orphans
    in
    let upgrade () =
      let rd = Miou.Computation.await u_rd in
      let wr = Miou.Computation.await u_wr in
      match (rd, wr, upgrade) with
      | Error _, _, _ | _, Error _, _ -> ()
      | _, _, None ->
          Logs.debug ~src (fun m -> m "No handler for websocket was given");
          Fmt.failwith "Upgrade unsupported"
      | Ok (), Ok (), Some fn ->
          (* TODO(upgrade)
             - multi-shutdown issue?
             - [Runtime.is_closed] is not true after shutdown `read and
               `write, [is_shutdown] is used instead. *)
          (* The flow must be shut down and the runner notified even if the
             handler raises, otherwise the runner would wait forever: this is
             why the cleanup is a finaliser. *)
          let finally () =
            Logs.debug ~src (fun m ->
                m "Upgrade handler finished, shutdown the underlying flow");
            s_rd := true;
            shutdown flow `read;
            s_wr := true;
            shutdown flow `write;
            (* assert (Runtime.is_closed conn); *)
            assert (is_shutdown conn);
            (* notify runner so it can stop waiting *)
            Miou.Condition.signal cond
          in
          let fn () = Fun.protect ~finally (fun () -> fn flow) in
          (* Like the [`Yield] paths of [reader] and [writer]: queue the task
             under the lock and wake the runner up, so that the handler is
             spawned even if the runner is already waiting. *)
          Miou.Mutex.protect lock @@ fun () ->
          Queue.push fn tasks;
          Miou.Condition.signal cond
    in
    let rd =
      let stop = s_rd and upgrade = u_rd in
      reader { src; conn; flow; tasks; buffer; stop; upgrade; lock; cond }
    in
    let wr =
      let stop = s_wr and upgrade = u_wr in
      writer { src; conn; flow; tasks; buffer; stop; upgrade; lock; cond }
    in
    Queue.push rd tasks;
    Queue.push wr tasks;
    Queue.push upgrade tasks;
    Miou.async runner
end