(* TCP (and unix-domain stream socket) client and server helpers built on Async.

   - [Where_to_connect] / [connect_sock] / [connect] / [with_connection] are the client
     side.
   - [Where_to_listen] / [Server] are the server side. *)

open! Core
open! Import
module Unix = Unix_syscalls
module Socket = Unix.Socket

module Where_to_connect = struct
  (* Describes how to reach a remote endpoint: the socket type to create, a thunk that
     resolves the remote address, an optional local address to bind to before
     connecting, and an [info] sexp used only for display. *)
  type 'addr t =
    { socket_type : 'addr Socket.Type.t
    ; remote_address : unit -> 'addr Deferred.t
    ; local_address : 'addr option
    ; info : Sexp.t
    }

  (* Only [info] is rendered; the address-specific converter is ignored. *)
  let sexp_of_t _ { info; _ } = info

  type inet = Socket.Address.Inet.t t [@@deriving sexp_of]
  type unix = Socket.Address.Unix.t t [@@deriving sexp_of]

  let remote_address t = t.remote_address ()

  (* Builds the local inet address to bind to.  A missing port defaults to [0]; a missing
     address means "bind any". *)
  let create_local_address ~bind_to_address ~bind_to_port =
    let port = Option.value bind_to_port ~default:0 in
    match bind_to_address with
    | None -> Socket.Address.Inet.create_bind_any ~port
    | Some inet_addr -> Socket.Address.Inet.create ~port inet_addr
  ;;

  (* The host is resolved (via [of_string_or_getbyname]) each time [remote_address] is
     called, not when the [t] is built. *)
  let of_host_and_port
        ?bind_to_address
        ?bind_to_port
        ({ Host_and_port.host; port } as hp)
    =
    { socket_type = Socket.Type.tcp
    ; remote_address =
        (fun () ->
           Unix.Inet_addr.of_string_or_getbyname host
           >>| fun inet_addr -> Socket.Address.Inet.create inet_addr ~port)
    ; local_address = Some (create_local_address ~bind_to_address ~bind_to_port)
    ; info = [%sexp (hp : Host_and_port.t)]
    }
  ;;

  let of_file file =
    { socket_type = Socket.Type.unix
    ; remote_address = (fun () -> return (Socket.Address.Unix.create file))
    ; local_address = None
    ; info = [%sexp_of: string] file
    }
  ;;

  let of_inet_address ?bind_to_address ?bind_to_port address =
    { socket_type = Socket.Type.tcp
    ; remote_address = (fun () -> return address)
    ; local_address = Some (create_local_address ~bind_to_address ~bind_to_port)
    ; info = [%sexp_of: Socket.Address.Inet.t] address
    }
  ;;

  let of_unix_address address =
    { socket_type = Socket.Type.unix
    ; remote_address = (fun () -> return address)
    ; local_address = None
    ; info = [%sexp_of: Socket.Address.Unix.t] address
    }
  ;;
end

(* Runs [f] under [Monitor.try_with]; if it fails, starts closing [s]'s file descriptor
   and re-raises the original exception. *)
let close_sock_on_error s f =
  Monitor.try_with ~run:`Schedule ~rest:`Log ~name:"Tcp.close_sock_on_error" f
  >>| function
  | Ok v -> v
  | Error e ->
    (* [close] may fail, but we don't really care, since it will fail
       asynchronously.  The error we really care about is [e], and the
       [raise e] will cause the current monitor to see that. *)
    don't_wait_for (Unix.close (Socket.fd s));
    raise e
;;

(* Creates a [Reader.t] and a [Writer.t] that share the socket's file descriptor. *)
let reader_writer_of_sock ?buffer_age_limit ?reader_buffer_size ?writer_buffer_size s =
  let fd = Socket.fd s in
  ( Reader.create ?buf_len:reader_buffer_size fd
  , Writer.create ?buffer_age_limit ?buf_len:writer_buffer_size fd )
;;

(* Resolves the remote address, optionally binds to the local address, and connects.

   [timeout] defaults to ten seconds and is measured on [time_source] (wall clock by
   default).  The connect attempt is interrupted when either [interrupt] or the timeout
   becomes determined; in that case the socket's fd is closed and an exception is raised
   ("connection attempt timeout" if the timeout had fired, "connection attempt aborted"
   otherwise), so the returned deferred never becomes determined. *)
let connect_sock
      ?socket
      ?interrupt
      ?(timeout = sec 10.)
      ?time_source
      (where_to_connect : _ Where_to_connect.t)
  =
  let time_source =
    match time_source with
    | Some x -> Time_source.read_only x
    | None -> Time_source.wall_clock ()
  in
  where_to_connect.remote_address ()
  >>= fun address ->
  let timeout =
    Time_source.Event.after
      time_source
      (Time_ns.Span.of_span_float_round_nearest timeout)
  in
  let interrupt =
    (* An aborted timeout event must never interrupt the connect. *)
    let timeout =
      Time_source.Event.fired timeout
      >>= function
      | Aborted () -> Deferred.never ()
      | Happened () -> Deferred.unit
    in
    match interrupt with
    | None -> timeout
    | Some interrupt -> Deferred.any [ interrupt; timeout ]
  in
  let connect_interruptible s = Socket.connect_interruptible s address ~interrupt in
  Deferred.create (fun result ->
    let s =
      match socket with
      | Some s -> s
      | None -> Socket.create where_to_connect.socket_type
    in
    close_sock_on_error s (fun () ->
      match where_to_connect.local_address with
      | None -> connect_interruptible s
      | Some local_interface ->
        Socket.bind s local_interface >>= fun s -> connect_interruptible s)
    >>> function
    | `Ok s ->
      Time_source.Event.abort_if_possible timeout ();
      Ivar.fill result s
    | `Interrupted ->
      don't_wait_for (Unix.close (Socket.fd s));
      let address = Socket.Address.to_string address in
      (* Aborting the event tells us whether the timeout or the caller's [interrupt] was
         the cause. *)
      (match Time_source.Event.abort timeout () with
       | Previously_happened () ->
         raise_s [%sexp "connection attempt timeout", (address : string)]
       | Ok | Previously_aborted () ->
         raise_s [%sexp "connection attempt aborted", (address : string)]))
;;

type 'a with_connect_options =
  ?buffer_age_limit:[ `At_most of Time.Span.t | `Unlimited ]
  -> ?interrupt:unit Deferred.t
  -> ?reader_buffer_size:int
  -> ?writer_buffer_size:int
  -> ?timeout:Time.Span.t
  -> ?time_source:Time_source.t
  -> 'a

(* [connect_sock] followed by [reader_writer_of_sock]; returns the socket together with
   its reader and writer. *)
let connect
      ?socket
      ?buffer_age_limit
      ?interrupt
      ?reader_buffer_size
      ?writer_buffer_size
      ?timeout
      ?time_source
      where_to_connect
  =
  connect_sock ?socket ?interrupt ?timeout ?time_source where_to_connect
  >>| fun s ->
  let r, w =
    reader_writer_of_sock ?buffer_age_limit ?reader_buffer_size ?writer_buffer_size s
  in
  s, r, w
;;

(* Runs [f] and returns [Error] for whichever comes first: an error raised to the
   writer's monitor, or an exception raised by [f]; otherwise [Ok] of [f]'s result.  The
   writer's monitor is detached so that its errors are only reported here. *)
let collect_errors writer f =
  let monitor = Writer.monitor writer in
  ignore (Monitor.detach_and_get_error_stream monitor : _ Stream.t);
  (* don't propagate errors up, we handle them here *)
  choose
    [ choice (Monitor.get_next_error monitor) (fun e -> Error e)
    ; choice
        (Monitor.try_with ~run:`Schedule ~rest:`Log ~name:"Tcp.collect_errors" f)
        Fn.id
    ]
;;

(* Closes the writer, then the reader.  The writer close is forced once a 30-second
   clock event fires; the event is aborted if the close finishes first. *)
let close_connection_via_reader_and_writer r w =
  let force_close_event = Clock.Event.after (sec 30.) in
  let force_close =
    Clock.Event.fired force_close_event
    >>= function
    | Aborted () -> Deferred.never ()
    | Happened () -> Deferred.unit
  in
  Writer.close w ~force_close
  >>= fun () ->
  Clock.Event.abort_if_possible force_close_event ();
  Reader.close r
;;

(* Connects, runs [f socket reader writer], and closes the connection as soon as [f]
   finishes (successfully or not) or either the reader or writer is closed.  The result
   is [f]'s value; an error collected from [f] or the writer is re-raised. *)
let with_connection
      ?buffer_age_limit
      ?interrupt
      ?reader_buffer_size
      ?writer_buffer_size
      ?timeout
      ?time_source
      where_to_connect
      f
  =
  connect_sock ?interrupt ?timeout ?time_source where_to_connect
  >>= fun socket ->
  let r, w =
    reader_writer_of_sock
      ?buffer_age_limit
      ?reader_buffer_size
      ?writer_buffer_size
      socket
  in
  let res = collect_errors w (fun () -> f socket r w) in
  Deferred.any
    [ (res >>| fun (_ : ('a, exn) Result.t) -> ())
    ; Reader.close_finished r
    ; Writer.close_finished w
    ]
  >>= fun () ->
  close_connection_via_reader_and_writer r w
  >>= fun () ->
  res
  >>| function
  | Ok v -> v
  | Error e -> raise e
;;

module Bind_to_address = struct
  type t =
    | Address of Unix.Inet_addr.t
    | All_addresses
    | Localhost
  [@@deriving sexp_of]
end

module Bind_to_port = struct
  type t =
    | On_port of int
    | On_port_chosen_by_os
  [@@deriving sexp_of]
end

module Where_to_listen = struct
  (* [listening_on] maps the address actually bound to the value reported to the user
     (the port for inet sockets, the path for unix sockets). *)
  type ('address, 'listening_on) t =
    { socket_type : 'address Socket.Type.t
    ; address : 'address
    ; listening_on : ('address -> 'listening_on[@sexp.opaque])
    }
  [@@deriving sexp_of, fields]

  type inet = (Socket.Address.Inet.t, int) t [@@deriving sexp_of]
  type unix = (Socket.Address.Unix.t, string) t [@@deriving sexp_of]

  let is_inet_witness t = Socket.Family.is_inet_witness (Socket.Type.family t.socket_type)
  let create ~socket_type ~address ~listening_on = { socket_type; address; listening_on }

  (* [On_port_chosen_by_os] is encoded as port [0]. *)
  let bind_to (bind_to_address : Bind_to_address.t) (bind_to_port : Bind_to_port.t) =
    let port =
      match bind_to_port with
      | On_port port -> port
      | On_port_chosen_by_os -> 0
    in
    let address =
      match bind_to_address with
      | All_addresses -> Socket.Address.Inet.create_bind_any ~port
      | Address addr -> Socket.Address.Inet.create addr ~port
      | Localhost -> Socket.Address.Inet.create Unix.Inet_addr.localhost ~port
    in
    { socket_type = Socket.Type.tcp
    ; address
    ; listening_on =
        (function
          | `Inet (_, port) -> port)
    }
  ;;

  let of_port port = bind_to All_addresses (On_port port)
  let of_port_chosen_by_os = bind_to All_addresses On_port_chosen_by_os

  let of_file path =
    { socket_type = Socket.Type.unix
    ; address = Socket.Address.Unix.create path
    ; listening_on = (fun _ -> path)
    }
  ;;

  (* True only for inet addresses whose port is [0]. *)
  let binding_on_port_chosen_by_os t =
    match t.address with
    | `Inet _ as inet -> Socket.Address.Inet.port inet = 0
    | `Unix _ -> false
  ;;

  (* Retrying after EADDRINUSE is only attempted when the OS picks the port. *)
  let max_retries_upon_addr_in_use t =
    match binding_on_port_chosen_by_os t with
    | true -> 10
    | false -> 0
  ;;
end

module Server = struct
  module Connection = struct
    type 'address t =
      { client_socket : ([ `Active ], 'address) Socket.t
      ; client_address : 'address
      }
    [@@deriving fields, sexp_of]

    let invariant invariant_address t =
      Invariant.invariant [%here] t [%sexp_of: _ t] (fun () ->
        let check f = Invariant.check_field t f in
        Fields.iter ~client_socket:ignore ~client_address:(check invariant_address))
    ;;

    let create ~client_socket ~client_address = { client_socket; client_address }
    let close t = Fd.close (Socket.fd t.client_socket)
  end

  module Max_connections = struct
    (* The connection limit plus the state needed to rate-limit the "at limit" log
       message. *)
    type t =
      { limit : int
      ; time_source : Time_source.t
      ; listening_on : Info.t
      ; mutable last_logged : Time_ns.t option
      }

    let sexp_of_t t = [%sexp_of: int] t.limit

    let create ~limit ~time_source ~listening_on =
      { limit; time_source; listening_on; last_logged = None }
    ;;

    (* We make sure not to be too spammy with logs. This number was chosen pretty
       arbitrarily. *)
    let log_threshold = Time_ns.Span.of_min 1.

    let log_at_limit t ~now =
      Log.Global.error_s
        [%message
          "At limit of Tcp server [max_connections]. New connections will not be \
           accepted until an existing connection is closed."
            ~limit:(t.limit : int)
            ~listening_on:(t.listening_on : Info.t)];
      t.last_logged <- Some now
    ;;

    (* Logs if nothing has been logged yet, or if more than [log_threshold] has elapsed
       since the last log. *)
    let maybe_log_at_limit t =
      let now = Time_source.now t.time_source in
      match t.last_logged with
      | None -> log_at_limit t ~now
      | Some last_logged ->
        if Time_ns.Span.( > ) (Time_ns.diff now last_logged) log_threshold
        then log_at_limit t ~now
    ;;
  end

  type ('address, 'listening_on) t =
    { socket : ([ `Passive ], 'address) Socket.t
    ; listening_on : 'listening_on
    ; on_handler_error : [ `Raise | `Ignore | `Call of 'address -> exn -> unit ]
    ; handle_client :
        'address -> ([ `Active ], 'address) Socket.t -> (unit, exn) Result.t Deferred.t
    ; max_connections : Max_connections.t
    ; max_accepts_per_batch : int
    ; connections : 'address Connection.t Bag.t
    ; mutable accept_is_pending : bool
    ; mutable drop_incoming_connections : bool
    ; close_finished_and_handlers_determined : unit Ivar.t
    }
  [@@deriving fields, sexp_of]

  let num_connections t = Bag.length t.connections
  let listening_socket = socket

  type inet = (Socket.Address.Inet.t, int) t [@@deriving sexp_of]
  type unix = (Socket.Address.Unix.t, string) t [@@deriving sexp_of]

  let listening_on_address (t : (_, _) t) = Socket.getsockname t.socket

  let invariant t : unit =
    try
      let check f field = f (Field.get field t) in
      Fields.iter
        ~socket:ignore
        ~listening_on:ignore
        ~on_handler_error:ignore
        ~handle_client:ignore
        ~max_connections:
          (check (fun (max_connections : Max_connections.t) ->
             assert (max_connections.limit >= 1)))
        ~max_accepts_per_batch:
          (check (fun max_accepts_per_batch -> assert (max_accepts_per_batch >= 1)))
        ~connections:
          (check (fun connections ->
             Bag.invariant (Connection.invariant ignore) connections;
             let num_connections = num_connections t in
             assert (num_connections >= 0);
             assert (num_connections <= t.max_connections.limit)))
        ~accept_is_pending:ignore
        ~drop_incoming_connections:ignore
        ~close_finished_and_handlers_determined:ignore
    with
    | exn ->
      failwiths ~here:[%here] "invariant failed" (exn, t) [%sexp_of: exn * (_, _) t]
  ;;

  let fd t = Socket.fd t.socket
  let is_closed t = Fd.is_closed (fd t)
  let close_finished t = Fd.close_finished (fd t)

  let close_finished_and_handlers_determined t =
    Ivar.read t.close_finished_and_handlers_determined
  ;;

  (* Closes the listening fd.  With [~close_existing_connections:true] it also closes
     every connection currently in the bag and waits for all of the closes. *)
  let close ?(close_existing_connections = false) t =
    let fd_closed = Fd.close (fd t) in
    if not close_existing_connections
    then fd_closed
    else
      (* Connections are removed from the bag by [handle_client] below, as the fds are
         closed. *)
      Deferred.all_unit
        (fd_closed :: List.map (Bag.to_list t.connections) ~f:Connection.close)
  ;;

  (* [maybe_accept] is a bit tricky, but the idea is to avoid calling [accept] until we
     have an available slot (determined by [num_connections < max_connections]). *)
  let rec maybe_accept t =
    let available_slots = t.max_connections.limit - num_connections t in
    if (not (is_closed t)) && available_slots > 0 && not t.accept_is_pending
    then (
      t.accept_is_pending <- true;
      Socket.accept_at_most ~limit:(min t.max_accepts_per_batch available_slots) t.socket
      >>> fun accept_result ->
      t.accept_is_pending <- false;
      match accept_result with
      | `Socket_closed -> ()
      | `Ok conns ->
        (* It is possible that someone called [close t] after the [accept] returned but
           before we got here.  In that case, we just close the clients. *)
        if is_closed t || t.drop_incoming_connections
        then
          List.iter conns ~f:(fun (sock, _) -> don't_wait_for (Fd.close (Socket.fd sock)))
        else
          (* We first [handle_client] on all the connections, which increases
             [num_connections], and then call [maybe_accept] to try to accept more
             clients, which respects the just-increased [num_connections]. *)
          List.iter conns ~f:(fun (sock, addr) -> handle_client t sock addr);
        maybe_accept t)
    else if (not (is_closed t)) && available_slots = 0
    then Max_connections.maybe_log_at_limit t.max_connections

  (* Registers the connection, runs the user's handler, and when it finishes closes the
     client socket, unregisters the connection, applies [on_handler_error] to any error
     and then tries to accept more clients.  If [on_handler_error] itself raises, the
     server starts closing and the exception is re-raised. *)
  and handle_client t client_socket client_address =
    let connection = Connection.create ~client_socket ~client_address in
    let connections_elt = Bag.add t.connections connection in
    t.handle_client client_address client_socket
    >>> fun res ->
    Connection.close connection
    >>> fun () ->
    Bag.remove t.connections connections_elt;
    if Deferred.is_determined (close_finished t) && num_connections t = 0
    then Ivar.fill_if_empty t.close_finished_and_handlers_determined ();
    (match res with
     | Ok () -> ()
     | Error e ->
       (try
          match t.on_handler_error with
          | `Ignore -> ()
          | `Raise -> raise e
          | `Call f -> f client_address e
        with
        | e ->
          don't_wait_for (close t);
          raise e));
    maybe_accept t
  ;;

  (* Wraps an already-listening socket in a server and starts accepting.
     [max_accepts_per_batch] defaults to [1] and [drop_incoming_connections] to
     [false]. *)
  let create_from_socket
        ~max_connections
        ?(max_accepts_per_batch = 1)
        ?(drop_incoming_connections = false)
        ~on_handler_error
        (where_to_listen : _ Where_to_listen.t)
        handle_client
        socket
    =
    let t =
      { socket
      ; listening_on = where_to_listen.listening_on (Socket.getsockname socket)
      ; on_handler_error
      ; handle_client
      ; max_connections
      ; max_accepts_per_batch
      ; connections = Bag.create ()
      ; accept_is_pending = false
      ; drop_incoming_connections
      ; close_finished_and_handlers_determined = Ivar.create ()
      }
    in
    (close_finished t
     >>> fun () ->
     (* With connections still open, [handle_client] fills the ivar once the last one
        is removed. *)
     if num_connections t = 0
     then Ivar.fill_if_empty t.close_finished_and_handlers_determined ());
    maybe_accept t;
    t
  ;;

  (* Defaults to 10_000 connections; raises on any limit that is zero or negative. *)
  let get_max_connections_limit max_connections =
    match max_connections with
    | None -> 10_000
    | Some max_connections ->
      if max_connections <= 0
      then
        failwiths
          ~here:[%here]
          "Tcp.Server.create got non-positive [max_connections]"
          max_connections
          sexp_of_int;
      max_connections
  ;;

  (* Creates (or reuses) the socket to listen on and retries bind+listen when it fails
     with EADDRINUSE, up to [retries_upon_addr_in_use] times.  A caller-supplied socket
     is never retried and does not get [reuseaddr] set. *)
  module Socket_creator : sig
    type 'a t constraint 'a = [< Socket.Address.t ]

    val create
      :  ([ `Unconnected ], 'addr) Socket.t option
      -> ('addr, _) Where_to_listen.t
      -> 'addr t

    val bind_and_listen_maybe_retry
      :  'addr t
      -> f:
           (([ `Unconnected ], 'addr) Socket.t
            -> reuseaddr:bool
            -> ([ `Passive ], 'addr) Socket.t Deferred.t)
      -> ([ `Passive ], 'addr) Socket.t Deferred.t

    val bind_and_listen_maybe_retry'
      :  'addr t
      -> f:
           (([ `Unconnected ], 'addr) Socket.t
            -> reuseaddr:bool
            -> ([ `Passive ], 'addr) Socket.t)
      -> ([ `Passive ], 'addr) Socket.t
  end = struct
    type 'addr t =
      { create_socket : unit -> ([ `Unconnected ], 'addr) Socket.t
      ; should_set_reuseaddr : bool
      ; retries_upon_addr_in_use : int
      }

    let create maybe_socket where_to_listen =
      match maybe_socket with
      | Some socket ->
        { create_socket = Fn.const socket
        ; should_set_reuseaddr = false
        ; retries_upon_addr_in_use = 0
        }
      | None ->
        { create_socket =
            (fun () -> Socket.create where_to_listen.Where_to_listen.socket_type)
        ; should_set_reuseaddr = true
        ; retries_upon_addr_in_use =
            Where_to_listen.max_retries_upon_addr_in_use where_to_listen
        }
    ;;

    (* Starts closing the failed socket, then either asks for a retry (EADDRINUSE with
       retries remaining) or raises.  Once at least one retry has been attempted the
       exception is wrapped with the retry count. *)
    let handle_exn t socket exn ~retries_attempted_upon_addr_in_use =
      don't_wait_for (Unix.close (Socket.fd socket));
      match t.retries_upon_addr_in_use > retries_attempted_upon_addr_in_use, exn with
      | true, Unix.Unix_error (EADDRINUSE, _, _) -> `Please_retry
      | _, _ ->
        if retries_attempted_upon_addr_in_use > 0
        then
          raise_s
            [%message
              "Failed to bind and listen to socket."
                (exn : Exn.t)
                (retries_attempted_upon_addr_in_use : int)]
        else raise exn
    ;;

    (* Deferred version: [f] binds and listens asynchronously. *)
    let rec aux_bind_and_listen_maybe_retry t ~retries_attempted_upon_addr_in_use ~f =
      let socket = t.create_socket () in
      match%bind
        Monitor.try_with ~extract_exn:true (fun () ->
          f socket ~reuseaddr:t.should_set_reuseaddr)
      with
      | Ok v -> return v
      | Error exn ->
        let `Please_retry =
          handle_exn t socket exn ~retries_attempted_upon_addr_in_use
        in
        aux_bind_and_listen_maybe_retry
          t
          ~retries_attempted_upon_addr_in_use:(retries_attempted_upon_addr_in_use + 1)
          ~f
    ;;

    (* Synchronous version: [f] binds and listens without returning a deferred. *)
    let rec aux_bind_and_listen_maybe_retry' t ~retries_attempted_upon_addr_in_use ~f =
      let socket = t.create_socket () in
      try f socket ~reuseaddr:t.should_set_reuseaddr with
      | exn ->
        let `Please_retry =
          handle_exn t socket exn ~retries_attempted_upon_addr_in_use
        in
        aux_bind_and_listen_maybe_retry'
          t
          ~retries_attempted_upon_addr_in_use:(retries_attempted_upon_addr_in_use + 1)
          ~f
    ;;

    let bind_and_listen_maybe_retry =
      aux_bind_and_listen_maybe_retry ~retries_attempted_upon_addr_in_use:0
    ;;

    let bind_and_listen_maybe_retry' =
      aux_bind_and_listen_maybe_retry' ~retries_attempted_upon_addr_in_use:0
    ;;
  end

  (* Binds with the deferred-returning [Socket.bind], so the server is returned in a
     deferred. *)
  let create_sock_non_inet_internal
        ?max_connections
        ?max_accepts_per_batch
        ?backlog
        ?drop_incoming_connections
        ?socket
        ?time_source
        ~on_handler_error
        (where_to_listen : _ Where_to_listen.t)
        handle_client
    =
    let time_source =
      match time_source with
      | Some x -> Time_source.read_only x
      | None -> Time_source.wall_clock ()
    in
    let%map socket =
      let socket_creator = Socket_creator.create socket where_to_listen in
      Socket_creator.bind_and_listen_maybe_retry
        socket_creator
        ~f:(fun socket ~reuseaddr ->
          Socket.bind ~reuseaddr socket where_to_listen.address
          >>| Socket.listen ?backlog)
    in
    let max_connections =
      Max_connections.create
        ~limit:(get_max_connections_limit max_connections)
        ~time_source
        (* We must call [Fd.info] on the socket's fd after [Socket.bind] is called,
           otherwise the [Info.t] won't have been set yet. *)
        ~listening_on:(Fd.info (Socket.fd socket))
    in
    create_from_socket
      ~max_connections
      ?max_accepts_per_batch
      ?drop_incoming_connections
      ~on_handler_error
      where_to_listen
      handle_client
      socket
  ;;

  (* Inet-only variant: uses [Socket.bind_inet], so the server is returned directly
     rather than in a deferred. *)
  let create_sock_inet_internal
        ?max_connections
        ?max_accepts_per_batch
        ?backlog
        ?drop_incoming_connections
        ?(socket : ([ `Unconnected ], Socket.Address.Inet.t) Socket.t option)
        ?time_source
        ~on_handler_error
        (where_to_listen : (Socket.Address.Inet.t, 'listening_on) Where_to_listen.t)
        handle_client
    =
    let time_source =
      match time_source with
      | Some x -> Time_source.read_only x
      | None -> Time_source.wall_clock ()
    in
    let socket =
      let socket_creator = Socket_creator.create socket where_to_listen in
      Socket_creator.bind_and_listen_maybe_retry'
        socket_creator
        ~f:(fun socket ~reuseaddr ->
          Socket.bind_inet ~reuseaddr socket where_to_listen.address
          |> Socket.listen ?backlog)
    in
    let max_connections =
      Max_connections.create
        ~limit:(get_max_connections_limit max_connections)
        ~time_source
        (* We must call [Fd.info] on the socket's fd after [Socket.bind_inet] is called,
           otherwise the [Info.t] won't have been set yet. *)
        ~listening_on:(Fd.info (Socket.fd socket))
    in
    create_from_socket
      ~max_connections
      ?max_accepts_per_batch
      ?drop_incoming_connections
      ~on_handler_error
      where_to_listen
      handle_client
      socket
  ;;

  type ('address, 'listening_on, 'time_source_access) create_sock_async =
    ?max_connections:int
    -> ?max_accepts_per_batch:int
    -> ?backlog:int
    -> ?drop_incoming_connections:bool
    -> ?socket:([ `Unconnected ], 'address) Socket.t
    -> ?time_source:([> read ] as 'time_source_access) Time_source.T1.t
    -> on_handler_error:[ `Call of 'address -> exn -> unit | `Ignore | `Raise ]
    -> ('address, 'listening_on) Where_to_listen.t
    -> ('address -> ([ `Active ], 'address) Socket.t -> (unit, exn) Result.t Deferred.t)
    -> ('address, 'listening_on) t Deferred.t

  (* [create_sock_inet_internal] lifted into a deferred so that it has the same type as
     the non-inet version. *)
  let create_sock_inet_internal_async : ('address, _, _) create_sock_async =
    fun ?max_connections
      ?max_accepts_per_batch
      ?backlog
      ?drop_incoming_connections
      ?socket
      ?time_source
      ~on_handler_error
      where_to_listen
      handle_client ->
    return
      (create_sock_inet_internal
         ?max_connections
         ?max_accepts_per_batch
         ?backlog
         ?drop_incoming_connections
         ?socket
         ?time_source
         ~on_handler_error
         where_to_listen
         handle_client)
  ;;

  type ('address, 'listening_on, 't) create_sock_async_no_constraint =
    | T :
        ('address, 'listening_on, 't) create_sock_async
        -> ('address, 'listening_on, 't) create_sock_async_no_constraint

  type 'address is_address_type = T : [< Socket.Address.t ] is_address_type

  (* Chooses the inet implementation when there is a witness that the address type is
     inet, and the generic implementation otherwise. *)
  let create_sock_internal_type_hackery
    : type address listening_on.
      is_address:address is_address_type
      -> is_inet:(address, [ `Inet of Unix.Inet_addr.t * int ]) Type_equal.t option
      -> (address, listening_on, _) create_sock_async_no_constraint
    =
    fun ~is_address ~is_inet ->
    match is_inet with
    | Some T -> T create_sock_inet_internal_async
    | None ->
      let T = is_address in
      T create_sock_non_inet_internal
  ;;

  let create_sock_internal : (_, _, [> read ]) create_sock_async =
    fun ?max_connections
      ?max_accepts_per_batch
      ?backlog
      ?drop_incoming_connections
      ?socket
      ?time_source
      ~on_handler_error
      where_to_listen
      handle_client ->
    let (T f) =
      create_sock_internal_type_hackery
        ~is_inet:(Where_to_listen.is_inet_witness where_to_listen)
        ~is_address:T
    in
    f
      ?max_connections
      ?max_accepts_per_batch
      ?backlog
      ?drop_incoming_connections
      ?socket
      ?time_source
      ~on_handler_error
      where_to_listen
      handle_client
  ;;

  (* Socket-level server: [handle_client] receives the raw client socket.  Exceptions it
     raises are captured with [Monitor.try_with] and handed to [on_handler_error]. *)
  let create_sock
        ?max_connections
        ?max_accepts_per_batch
        ?backlog
        ?drop_incoming_connections
        ?socket
        ?time_source
        ~on_handler_error
        (where_to_listen : ('address, 'listening_on) Where_to_listen.t)
        (handle_client :
           ([< Socket.Address.t ] as 'b) -> ([ `Active ], 'b) Socket.t -> unit Deferred.t)
    =
    create_sock_internal
      ?max_connections
      ?max_accepts_per_batch
      ?backlog
      ?drop_incoming_connections
      ~on_handler_error
      ?socket
      ?time_source
      where_to_listen
      (fun client_address client_socket ->
         Monitor.try_with
           ~run:`Schedule
           ~rest:`Log
           ~name:"Tcp.Server.create_sock"
           (fun () -> handle_client client_address client_socket))
  ;;

  (* Same as [create_sock], for inet addresses only; returns the server directly. *)
  let create_sock_inet
        ?max_connections
        ?max_accepts_per_batch
        ?backlog
        ?drop_incoming_connections
        ?socket
        ?time_source
        ~on_handler_error
        where_to_listen
        handle_client
    =
    create_sock_inet_internal
      ?max_connections
      ?max_accepts_per_batch
      ?backlog
      ?drop_incoming_connections
      ?socket
      ?time_source
      ~on_handler_error
      where_to_listen
      (fun client_address client_socket ->
         Monitor.try_with
           ~run:`Schedule
           ~rest:`Log
           ~name:"Tcp.Server.create_sock_inet"
           (fun () -> handle_client client_address client_socket))
  ;;

  (* Reader/writer-level server built on the given [create_sock].  Each client handler
     finishes when the user's [handle_client] finishes or fails, or when the writer's
     consumer leaves; the connection is then closed via its reader and writer. *)
  let create_internal
        ~create_sock
        ?buffer_age_limit
        ?max_connections
        ?max_accepts_per_batch
        ?backlog
        ?drop_incoming_connections
        ?socket
        ?time_source
        ~on_handler_error
        where_to_listen
        handle_client
    =
    create_sock
      ?max_connections
      ?max_accepts_per_batch
      ?backlog
      ?drop_incoming_connections
      ?socket
      ?time_source
      ~on_handler_error
      where_to_listen
      (fun client_address client_socket ->
         let r, w = reader_writer_of_sock ?buffer_age_limit client_socket in
         Writer.set_raise_when_consumer_leaves w false;
         Deferred.any
           [ collect_errors w (fun () -> handle_client client_address r w)
           ; Writer.consumer_left w |> Deferred.ok
           ]
         >>= fun res -> close_connection_via_reader_and_writer r w >>| fun () -> res)
  ;;

  let create_inet
        ?buffer_age_limit
        ?max_connections
        ?max_accepts_per_batch
        ?backlog
        ?drop_incoming_connections
        ?socket
        ?time_source
        ~on_handler_error
        where_to_listen
        handle_client
    =
    create_internal
      ~create_sock:create_sock_inet_internal
      ?buffer_age_limit
      ?max_connections
      ?max_accepts_per_batch
      ?backlog
      ?drop_incoming_connections
      ?socket
      ?time_source
      ~on_handler_error
      where_to_listen
      handle_client
  ;;

  let create
        ?buffer_age_limit
        ?max_connections
        ?max_accepts_per_batch
        ?backlog
        ?drop_incoming_connections
        ?socket
        ?time_source
        ~on_handler_error
        where_to_listen
        handle_client
    =
    create_internal
      ~create_sock:create_sock_internal
      ?buffer_age_limit
      ?max_connections
      ?max_accepts_per_batch
      ?backlog
      ?drop_incoming_connections
      ?socket
      ?time_source
      ~on_handler_error
      where_to_listen
      handle_client
  ;;

  module Private = struct
    let fd = fd
  end
end

module Private = struct
  let close_connection_via_reader_and_writer = close_connection_via_reader_and_writer
end