123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170exceptionRetry=Connection.Retry(** This functor establishes a new connection for each request. *)moduleMake_no_cache(Connection:S.Connection):sigincludeS.Connection_cachevalcreate:?ctx:Connection.Net.ctx->unit->t(** [create ?ctx ()] creates a connection for handling a single request. The
connection accepts only a single request and will automatically be closed
as soon as possible.
@param ctx See {!Connection.Net.ctx} *)end=structmoduleNet=Connection.NetmoduleIO=Net.IOopenIOtypet=S.callletcall=Fun.idletcreate?(ctx=Lazy.forceNet.default_ctx)()?headers?bodymethuri=Net.resolve~ctxuri(* TODO: Support chunked encoding without ~persistent:true ? *)>>=Connection.connect~ctx~persistent:true>>=funconnection->letres=Connection.callconnection?headers?bodymethuriin(* this can be simplified when https://github.com/mirage/ocaml-conduit/pull/319 is released. *)Lwt.dont_wait(fun()->res>>=fun(_,body)->(matchbodywith|`Empty|`String_|`Strings_->Lwt.return_unit|`Streamstream->Lwt_stream.closedstream)>>=fun()->Connection.closeconnection;Lwt.return_unit)(functionRetry->()|e->raisee);resend(** This functor keeps a cache of connections for reuse. Connections are reused
based on their remote {!type:Conduit.endp} (effectively IP / port). *)moduleMake(Connection:S.Connection)(Sleep:S.Sleep):sigincludeS.Connection_cachevalcreate:?ctx:Connection.Net.ctx->?keep:int64->?retry:int->?parallel:int->?depth:int->unit->t(** Create a new connection cache
@param ctx Conduit context to use. See {!type:Connection.Net.ctx}.
@param keep Number of nanoseconds to keep an idle connection around.
@param retry
Number of times a {e gracefully} failed request is automatically
retried. {e graceful} means failed with {!exception:Connection.Retry}.
Requests with a [`Stream] {!module:Body} cannot be retried
automatically. Such requests will fail with
{!exception:Connection.Retry} and a new {!module:Body} will need to be
provided to retry.
@param parallel
maximum number of connections to establish to a single endpoint. Beware:
A single hostname may resolve to multiple endpoints. In such a case
connections may be created in excess to what was intended.
@param depth
maximum number of requests to queue and / or send on a single
connection. *)end=structmoduleNet=Connection.NetmoduleIO=Net.IOopenIOtypectx=Net.ctxtypet={cache:(Net.endp,Connection.t)Hashtbl.t;ctx:ctx;keep:int64;retry:int;parallel:int;depth:int;}letcreate?(ctx=Lazy.forceNet.default_ctx)?(keep=60_000_000_000L)?(retry=2)?(parallel=4)?(depth=100)()={cache=Hashtbl.create~random:true10;ctx;keep;retry;parallel;depth;}letrecget_connectionselfendp=letfinaliseconnection=letrecremovekeep=letcurrent=Hashtbl.findself.cacheendpinHashtbl.removeself.cacheendp;ifcurrent==connectionthenList.iter(Hashtbl.addself.cacheendp)keepelseremove(current::keep)inremove[];Lwt.return_unitinletcreate()=letconnection=Connection.create~finalise~ctx:self.ctxendpandtimeout=refLwt.return_unitinletrecbusy()=Lwt.cancel!timeout;ifConnection.lengthconnection=0then(timeout:=Sleep.sleep_nsself.keep>>=fun()->Connection.closeconnection;(* failure is ignored *)Lwt.return_unit);Lwt.on_termination(Connection.notifyconnection)busyinbusy();connectioninmatchHashtbl.find_allself.cacheendpwith|[]->letconnection=create()inHashtbl.addself.cacheendpconnection;Lwt.returnconnection|conns->(letrecsearchlength=function|[a]->(a,length+1)|a::b::tlwhenConnection.lengtha<Connection.lengthb->search(length+1)(a::tl)|_::tl->search(length+1)tl|[]->assertfalseinmatchsearch0connswith|shallowest,_whenConnection.lengthshallowest=0->Lwt.returnshallowest|_,lengthwhenlength<self.parallel->letconnection=create()inHashtbl.addself.cacheendpconnection;Lwt.returnconnection|shallowest,_whenConnection.lengthshallowest<self.depth->Lwt.returnshallowest|_->Lwt.try_bind(fun()->Lwt.choose(List.mapConnection.notifyconns))(fun_->get_connectionselfendp)(fun_->get_connectionselfendp))letcallself?headers?bodymethuri=Net.resolve~ctx:self.ctxuri>>=funendp->letrecrequestretry=get_connectionselfendp>>=funconn->Lwt.catch(fun()->Connection.callconn?headers?bodymethuri)(function|Retry->(matchbodywith|Some(`Stream_)->raiseRetry|None|Some`Empty|Some(`String_)|Some(`Strings_)->ifretry<=0thenraiseRetryelserequest(retry-1))|e->Lwt.reraisee)inrequestself.retryend