open Lwt.Infix

(* Log source used by every message emitted from this module. *)
let src = Logs.Src.create "conduit_lwt_server" ~doc:"Conduit Lwt transport"

module Log = (val Logs.src_log src : Logs.LOG)

(** [safe_close t] closes the channel [t]; any exception raised while closing
    is swallowed so that cleanup paths never fail. *)
let safe_close t =
  Lwt.catch
    (fun () -> Lwt_io.close t)
    (fun _ -> Lwt.return_unit)

(** [close (ic, oc)] closes the output channel first, then the input channel.
    Errors from either close are ignored (see {!safe_close}). *)
let close (ic, oc) =
  safe_close oc >>= fun () ->
  safe_close ic

(** [with_socket sockaddr f] creates a stream socket in the address family of
    [sockaddr] and passes it to [f].

    If [f] fails, the socket is closed (ignoring close errors) and the
    original exception is propagated. On success the socket is left open and
    whatever [f] returned is the result. *)
let with_socket sockaddr f =
  let fd =
    Lwt_unix.socket (Unix.domain_of_sockaddr sockaddr) Unix.SOCK_STREAM 0
  in
  Lwt.catch
    (fun () -> f fd)
    (fun e ->
      Lwt.catch
        (fun () -> Lwt_unix.close fd)
        (fun _ -> Lwt.return_unit)
      >>= fun () ->
      Lwt.fail e)

(** [listen ?backlog sa] returns a socket bound to [sa] and listening with the
    given [backlog] (default [128]).

    [SO_REUSEADDR] is set before binding and the descriptor is marked
    close-on-exec. If any step fails the socket is closed by
    {!with_socket}. *)
let listen ?(backlog = 128) sa =
  with_socket sa (fun fd ->
      Lwt_unix.(setsockopt fd SO_REUSEADDR true);
      Lwt_unix.bind fd sa >|= fun () ->
      Lwt_unix.listen fd backlog;
      Lwt_unix.set_close_on_exec fd;
      fd)

(** [process_accept ?timeout callback (sa, ic, oc)] runs
    [callback sa ic oc].

    When [timeout] is [Some t], the callback is raced against a sleep of [t]
    seconds using [Lwt.pick], so whichever finishes first wins and the other
    is cancelled. Both channels are closed afterwards, whether the callback
    succeeded, failed or timed out. *)
let process_accept ?timeout callback (sa, ic, oc) =
  let c = callback sa ic oc in
  let events =
    match timeout with
    | None -> [ c ]
    | Some t -> [ c; Lwt_unix.sleep (float_of_int t) ]
  in
  Lwt.finalize
    (fun () -> Lwt.pick events)
    (fun () -> close (ic, oc))

(* File descriptors are a global resource so this has to be a global limit too *)

(* Optional upper bound on [active]; [None] means no limit is enforced. *)
let maxactive = ref None

(* Counter incremented by [connected] and decremented by [disconnected]. *)
let active = ref 0

(* Signalled whenever [active] drops or the limit changes, so that threads
   blocked in [throttle] re-evaluate the limit. *)
let cond = Lwt_condition.create ()

(** Record one more active connection slot. *)
let connected () = incr active

(** Release one connection slot and wake every thread waiting in
    {!throttle}. *)
let disconnected () =
  decr active;
  Lwt_condition.broadcast cond ()

(** [throttle ()] resolves immediately when no limit is set or when [active]
    does not exceed it; otherwise it waits on [cond] and checks again. *)
let rec throttle () =
  match !maxactive with
  | Some limit when !active > limit -> Lwt_condition.wait cond >>= throttle
  | _ -> Lwt.return_unit

(** [set_max_active n] sets the global limit to [n] and wakes any waiters so
    they re-check against the new value. *)
let set_max_active max_active =
  maxactive := Some max_active;
  Lwt_condition.broadcast cond ()

(** [run_handler handler v] runs [handler v] as a detached thread via
    [Lwt.async].

    The connection slot is released with {!disconnected} when the handler
    finishes, successfully or not. [Lwt.Canceled] is ignored silently; any
    other exception is logged as a warning and not propagated. *)
let run_handler handler v =
  Lwt.async (fun () ->
      Lwt.try_bind
        (fun () -> handler v)
        (fun () ->
          disconnected ();
          Lwt.return_unit)
        (fun x ->
          disconnected ();
          (match x with
          | Lwt.Canceled -> ()
          | ex ->
              (* Single-line message, in the same shape as the accept-loop
                 warning emitted by [init]. *)
              Log.warn (fun f ->
                  f "Uncaught exception in handler: %s"
                    (Printexc.to_string ex)));
          Lwt.return_unit))

(** [init ?stop handler fd] accepts connections on the listening socket [fd]
    and dispatches each accepted value to [handler] through {!run_handler}.

    - [stop]: when it resolves, the pending accept is cancelled and the loop
      ends. The default is a promise that is never resolved.
    - Before each accept a slot is taken with {!connected} and {!throttle} is
      awaited, so accepting pauses while the global limit is exceeded.
    - An exception other than [Lwt.Canceled] raised while accepting is logged;
      the loop then yields and retries.

    [fd] is closed when the loop terminates, for whatever reason. *)
let init ?(stop = fst (Lwt.wait ())) handler fd =
  let stop = Lwt.map (fun () -> `Stop) stop in
  let rec loop () =
    Lwt.try_bind
      (fun () ->
        (* Take the slot before throttling: the pending accept counts toward
           the limit checked by [throttle]. *)
        connected ();
        throttle () >>= fun () ->
        let accept = Lwt.map (fun v -> `Accept v) (Lwt_unix.accept fd) in
        Lwt.choose [ accept; stop ] >|= function
        | `Stop ->
            Lwt.cancel accept;
            `Stop
        | `Accept _ as x -> x)
      (function
        | `Stop ->
            (* No connection was handed to a handler: release the slot. *)
            disconnected ();
            Lwt.return_unit
        | `Accept v ->
            (* The handler thread now owns the slot and releases it itself. *)
            run_handler handler v;
            loop ())
      (fun exn ->
        disconnected ();
        match exn with
        | Lwt.Canceled -> Lwt.return_unit
        | ex ->
            Log.warn (fun f ->
                f "Uncaught exception accepting connection: %s"
                  (Printexc.to_string ex));
            Lwt_unix.yield () >>= loop)
  in
  Lwt.finalize loop (fun () -> Lwt_unix.close fd)