exception Retry
(** Raised on failed requests that may be safely retried - even on
    non-idempotent requests. Raised for example on timeout or connection
    shutdown by remote end. *)

(** Client-side HTTP connection over the channels provided by [Net].

    A connection owns two queues. [call] appends requests to [waiting]; the
    [writer] task moves them to [in_flight] while sending them; the [reader]
    task resolves and removes them from [in_flight] as responses arrive. *)
module Make (Net : S.Net) : S.Connection with module Net = Net = struct
  module Net = Net
  module IO = Net.IO
  module Response = Make.Response (IO)
  module Request = Make.Request (IO)
  module Header = Cohttp.Header
  open IO

  let src = Logs.Src.create "cohttp.lwt.client" ~doc:"Cohttp Lwt client"

  module Log = (val Logs.src_log src : Logs.LOG)

  exception Retry = Retry

  type state =
    | Connecting of (IO.ic * IO.oc) Lwt.t
    (* Waiting for the TCP handshake / TLS connection setup *)
    | Full of (IO.ic * IO.oc)
    (* "full-duplex". May send requests, may be waiting for responses / EOF. *)
    | Closing of (IO.ic * IO.oc)
    (* still in "full-duplex", but no new requests may be queued.
     * Will shutdown oc as soon as the last request went out. *)
    | Half of IO.ic
    (* oc has been closed, waiting for outstanding responses on ic. *)
    | Closed
    | Failed of exn
  [@@warning "-37"]
  (* enable warning when https://github.com/mirage/ocaml-conduit/pull/319 is released *)

  (* One queued request together with the resolver of the promise that was
   * handed back to the caller of [call]. *)
  type req_resr = {
    uri : Uri.t;
    meth : Cohttp.Code.meth;
    headers : Header.t;
    body : Body.t;
    res_r : (Response.t * Body.t) Lwt.u;
  }

  type persistent = [ `True | `False | `Unknown ]

  type t = {
    mutable state : state;
    mutable persistent : persistent;
    (* keep alive + Chunked supported ? -> essentially HTTP 1.1 *)
    in_flight : req_resr Queue.t;
    (* requests handed to the wire: pushed by the writer, taken and failed by
     * the reader *)
    waiting : req_resr Queue.t;
    (* requests not yet written: pushed by [call], taken and failed by the
     * writer *)
    condition : unit Lwt_condition.t; (* watching queues *)
    finalise : t -> unit Lwt.t;
  }

  (** Number of requests queued on the connection, counting both the ones
      already in flight and the ones still waiting to be written. *)
  let length connection =
    Queue.length connection.in_flight + Queue.length connection.waiting

  (** Promise that resolves on the next broadcast on the connection's
      condition variable, and is rejected when that broadcast carries an
      exception (see [queue_fail]). *)
  let notify connection = Lwt_condition.wait connection.condition

  (* Reject every request of queue [q] with [e], empty the queue, then wake
   * all waiters on the condition with the same exception.
   * NOTE(review): [Lwt.wakeup_later_exn] is applied to every resolver in the
   * queue; the writer's error handler also rejects the [res_r] of a request
   * it has already pushed to [in_flight]. Confirm that a resolver can never
   * be woken twice on that path. *)
  let queue_fail connection q e =
    Queue.iter (fun { res_r; _ } -> Lwt.wakeup_later_exn res_r e) q;
    Queue.clear q;
    Lwt_condition.broadcast_exn connection.condition e

  (* Move the connection to [state] and release whatever channels the
   * current state still holds. Does nothing once the connection is already
   * [Closed] or [Failed]. *)
  let close_with state connection =
    match connection.state with
    | Connecting channels ->
        connection.state <- state;
        Lwt.cancel channels;
        (* If the connection attempt succeeds anyway, close its channels. *)
        Lwt.on_success channels (fun (ic, oc) -> Net.close ic oc);
        Lwt_condition.broadcast connection.condition ()
    | Closing (ic, oc) | Full (ic, oc) ->
        connection.state <- state;
        Net.close ic oc;
        Lwt_condition.broadcast connection.condition ()
    | Half ic ->
        connection.state <- state;
        Net.close_in ic;
        Lwt_condition.broadcast connection.condition ()
    | Closed | Failed _ -> ()

  (** Close the connection immediately; see [close_with]. *)
  let close = close_with Closed

  (** Stop accepting new requests without closing the channels: a [Full]
      connection becomes [Closing]; a [Connecting] one becomes [Closing] once
      its channels are available. Any other state is left untouched. *)
  let shutdown connection =
    match connection.state with
    | Connecting channels ->
        Lwt.async @@ fun () ->
        channels >>= fun channels ->
        connection.state <- Closing channels;
        Lwt.return_unit
    | Full channels -> connection.state <- Closing channels
    | Closing _ | Half _ | Closed | Failed _ -> ()

  (** [true] unless the connection is [Full] or [Connecting], i.e. unless
      [call] would still accept a request. *)
  let is_closed connection =
    match connection.state with
    | Full _ -> false
    | Connecting _ -> false
    | Closing _ | Half _ -> true
    | Closed | Failed _ -> true

  (* Response loop: reads one response at a time from the input channel and
   * resolves the oldest in-flight request with it. The loop ends when the
   * connection is closed, hits EOF, or receives an invalid response. *)
  let rec reader connection =
    match connection.state with
    | Connecting _ | Failed _ -> assert false
    | Closed -> Lwt.return_unit
    | Full (ic, _) | Closing (ic, _) | Half ic -> (
        Response.read ic >>= fun res ->
        match res with
        | `Ok res ->
            if
              connection.persistent = `Unknown
              && Response.version res = `HTTP_1_1
              && not (Header.mem (Response.headers res) "Connection")
            then connection.persistent <- `True;
            (* don't take from queue yet, because body may still be in flight *)
            let { meth; res_r; _ } = Queue.peek connection.in_flight in
            (* A response header to a HEAD request is indistinguishable from a
             * response header to a GET request. Therefore look at the method. *)
            let expects_body =
              match Response.has_body res with
              | _ when meth = `HEAD -> false
              | `No -> false
              | `Yes | `Unknown -> true
            in
            (if expects_body then (
               let stream =
                 Body.create_stream Response.read_body_chunk
                   (Response.make_body_reader res ic)
               in
               (* finalise could run in a thread different from the lwt main thread.
                * You may therefore not call into Lwt from a finaliser. *)
               let closed = ref false in
               Gc.finalise_last
                 (fun () ->
                   if not !closed then
                     Log.warn (fun m ->
                         m
                           "Body not consumed, leaking stream! Refer to \
                            https://github.com/mirage/ocaml-cohttp/issues/730 \
                            for additional details"))
                 stream;
               Lwt.wakeup_later res_r (res, Body.of_stream stream);
               (* Only read the next response once this body stream has been
                * closed. *)
               Lwt_stream.closed stream >>= fun () ->
               closed := true;
               Lwt.return_unit)
             else (
               Lwt.wakeup_later res_r (res, `Empty);
               Lwt.return_unit))
            >>= fun () ->
            Queue.take connection.in_flight |> ignore;
            Lwt_condition.broadcast connection.condition ();
            if connection.persistent = `False then (
              close_with Closed connection;
              Lwt.return_unit)
            else reader connection
        | `Eof ->
            close_with Closed connection;
            connection.finalise connection >>= fun () ->
            (* Requests that never got a response may be retried. *)
            queue_fail connection connection.in_flight Retry;
            Lwt.return_unit
        | `Invalid reason ->
            let e = Failure ("Cohttp_lwt failed to read response: " ^ reason) in
            close_with (Failed e) connection;
            connection.finalise connection >>= fun () ->
            queue_fail connection connection.in_flight e;
            Lwt.return_unit)

  (** Queue a request on the connection and return the promise of its
      response and response body.

      [headers] defaults to [Header.init ()] and [body] to [`Empty].

      @raise Retry
        synchronously (not as a rejected promise) when the connection is
        neither [Connecting] nor [Full]. *)
  let call connection ?headers ?(body = `Empty) meth uri =
    let headers = match headers with Some h -> h | None -> Header.init () in
    match connection.state with
    | Connecting _ | Full _ ->
        let res, res_r = Lwt.wait () in
        Queue.push { uri; meth; headers; body; res_r } connection.waiting;
        Lwt_condition.broadcast connection.condition ();
        res
    | Closing _ | Half _ | Closed | Failed _ -> raise Retry

  (* Request loop: takes requests from [waiting], writes them to the output
   * channel and moves them to [in_flight]. *)
  let rec writer connection =
    match connection.state with
    | Full _
      when Queue.is_empty connection.waiting
           || not
                (Queue.is_empty connection.in_flight
                || connection.persistent = `True) ->
        (* Nothing to send, or requests are still in flight on a connection
         * not known to be persistent: sleep until the condition is
         * signalled, then look at the state again. Both callbacks restart
         * the loop, so an exception broadcast on the condition is ignored
         * here. *)
        Lwt.try_bind
          (fun () -> Lwt_condition.wait connection.condition)
          (fun _ -> writer connection)
          (fun _ -> writer connection)
    | Closing (_ic, _oc) when Queue.is_empty connection.waiting ->
        (* uncomment when https://github.com/mirage/ocaml-conduit/pull/319 is released *)
        (*
        Net.close_out oc;
        connection.state <- Half ic;
        *)
        Lwt.return_unit
    | Full (ic, oc) | Closing (ic, oc) ->
        let ({ uri; meth; headers; body; res_r } as work) =
          Queue.take connection.waiting
        in
        (* select encoding based on (1st) header or (2nd) body *)
        (match Header.get_transfer_encoding headers with
        | Unknown -> (
            match Body.transfer_encoding body with
            | Fixed _ as e -> Lwt.return (e, body)
            | Chunked as e when connection.persistent = `True ->
                Lwt.return (e, body)
            | Chunked (* connection.persistent <> `True *) ->
                (* We don't know yet whether chunked encoding is supported.
                 * Therefore use fixed length encoding. *)
                Body.length body >>= fun (length, body) ->
                Lwt.return (Cohttp.Transfer.Fixed length, body)
            | Unknown -> assert false)
        | e -> Lwt.return (e, body))
        >>= fun (encoding, body) ->
        (* Announce the close when this is the last request that will be sent
         * on the connection. *)
        let is_last_request =
          match connection.state with
          | _ when connection.persistent = `False -> true
          | Closing _ when Queue.is_empty connection.waiting -> true
          | _ -> false
        in
        let headers =
          if is_last_request then
            Header.add_unless_exists headers "Connection" "close"
          else headers
        in
        let req = Request.make ~encoding ~meth ~headers uri in
        (* Registered as in flight before the write starts. *)
        Queue.push work connection.in_flight;
        Lwt.catch
          (fun () ->
            (* try *)
            Request.write ~flush:false
              (fun writer ->
                Body.write_body (Request.write_body writer) body)
              req oc)
          (fun e ->
            (* with *)
            (* uncomment when https://github.com/mirage/ocaml-conduit/pull/319 is released *)
            (*
            (try Net.close_out oc with _ -> ());
            connection.state <- Half ic;
            *)
            connection.state <- Closing (ic, oc);
            Lwt.wakeup_later_exn res_r e;
            queue_fail connection connection.waiting Retry;
            Lwt.return_unit)
        >>= fun () ->
        if connection.persistent = `False then (
          (* uncomment when https://github.com/mirage/ocaml-conduit/pull/319 is released *)
          (*
          Net.close_out oc;
          connection.state <- Half ic;
          *)
          connection.state <- Closing (ic, oc);
          queue_fail connection connection.waiting Retry;
          Lwt.return_unit)
        else writer connection
    | Closed ->
        queue_fail connection connection.waiting Retry;
        Lwt.return_unit
    | Failed e ->
        queue_fail connection connection.waiting e;
        Lwt.return_unit
    | Half _ -> Lwt.return_unit
    | Connecting _ -> assert false

  (** Start connecting to [endp] and return the connection right away, in
      state [Connecting]. Once the channels are available the connection
      becomes [Full] and the reader and writer loops are started.

      [finalise] defaults to a function that does nothing; the reader runs it
      after EOF or an invalid response. [persistent] set to [true] or [false]
      fixes the persistence of the connection up front; when omitted it is
      [`Unknown] until a response settles it. [ctx] defaults to
      [Net.default_ctx]. *)
  let create ?(finalise = fun _ -> Lwt.return_unit) ?persistent
      ?(ctx = Lazy.force Net.default_ctx) endp =
    let persistent =
      match persistent with
      | None -> `Unknown
      | Some true -> `True
      | Some false -> `False
    in
    let channels =
      Net.connect_endp ~ctx endp >>= fun (_, ic, oc) -> return (ic, oc)
    in
    let connection =
      {
        finalise;
        in_flight = Queue.create ();
        waiting = Queue.create ();
        state = Connecting channels;
        condition = Lwt_condition.create ();
        persistent;
      }
    in
    (* NOTE(review): a failure only records [Failed e]; nothing visible here
     * rejects requests that [call] queued in [waiting] while the connection
     * was still [Connecting]. Confirm that they cannot stay pending. *)
    let on_failure e = connection.state <- Failed e in
    Lwt.on_any channels
      (fun channels ->
        connection.state <- Full channels;
        Lwt.dont_wait (fun () -> reader connection) on_failure;
        Lwt.dont_wait (fun () -> writer connection) on_failure)
      on_failure;
    connection

  (** Like [create], but the returned promise resolves only once the
      channels are established. While the connection is still [Connecting]
      the result is bound to the connection attempt itself, so a failed
      attempt rejects the promise. *)
  let connect ?finalise ?persistent ?ctx uri =
    let connection = create ?finalise ?persistent ?ctx uri in
    match connection.state with
    | Connecting channels -> channels >>= fun _ -> Lwt.return connection
    | _ -> Lwt.return connection
end