123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125openLwt.InfixmoduleHeader=Cohttp.HeadermoduleMake(IO:S.IO)(Net:S.NetwithmoduleIO=IO)=structmoduleIO=IOmoduleResponse=Make.Response(IO)moduleRequest=Make.Request(IO)typectx=Net.ctx[@@derivingsexp_of]letdefault_ctx=Net.default_ctxletread_response~closefnic_ocmeth=Response.readic>>=beginfunction|`Invalidreason->Lwt.fail(Failure("Failed to read response: "^reason))|`Eof->Lwt.fail(Failure"Client connection was closed")|`Okres->beginlethas_body=matchmethwith|`HEAD->`No|_->Response.has_bodyresinmatchhas_bodywith|`Yes|`Unknown->letreader=Response.make_body_readerresicinletstream=Body.create_streamResponse.read_body_chunkreaderinletclosefn=closefninLwt.on_success(Lwt_stream.closedstream)closefn;letgcfn_st=closefn()inGc.finalisegcfnstream;letbody=Body.of_streamstreaminLwt.return(res,body)|`No->closefn();Lwt.return(res,`Empty)endend|>funt->Lwt.on_canceltclosefn;Lwt.on_failuret(fun_exn->closefn());tletis_meth_chunked=function|`HEAD->false|`GET->false|`DELETE->false|_->trueletcall?(ctx=default_ctx)?headers?(body=`Empty)?chunkedmethuri=letheaders=matchheaderswithNone->Header.init()|Someh->hinNet.connect_uri~ctxuri>>=fun(_conn,ic,oc)->letclosefn()=Net.closeicocinletchunked=matchchunkedwith|None->is_meth_chunkedmeth|Somev->vinletsent=matchchunkedwith|true->letreq=Request.make_for_client~headers~chunkedmethuriinRequest.write(funwriter->Body.write_body(Request.write_bodywriter)body)reqoc|false->(* If chunked is not allowed, then obtain the body length and
insert header *)Body.lengthbody>>=fun(body_length,buf)->letreq=Request.make_for_client~headers~chunked~body_lengthmethuriinRequest.write(funwriter->Body.write_body(Request.write_bodywriter)buf)reqocinsent>>=fun()->read_response~closefnicocmeth(* The HEAD should not have a response body *)lethead?ctx:_?headersuri=call?headers`HEADuri>|=fstletget?ctx?headersuri=call?ctx?headers`GETuriletdelete?ctx?body?chunked?headersuri=call?ctx?headers?body?chunked`DELETEuriletpost?ctx?body?chunked?headersuri=call?ctx?headers?body?chunked`POSTuriletput?ctx?body?chunked?headersuri=call?ctx?headers?body?chunked`PUTuriletpatch?ctx?body?chunked?headersuri=call?ctx?headers?body?chunked`PATCHuriletpost_form?ctx?headers~paramsuri=letheaders=Header.add_opt_unless_existsheaders"content-type""application/x-www-form-urlencoded"inletbody=Body.of_string(Uri.encoded_of_queryparams)inpost?ctx~chunked:false~headers~bodyuriletcallv?(ctx=default_ctx)urireqs=Net.connect_uri~ctxuri>>=fun(_conn,ic,oc)->(* Serialise the requests out to the wire *)letmeth_stream=Lwt_stream.map_s(fun(req,body)->Request.write(funwriter->Body.write_body(Request.write_bodywriter)body)reqoc>>=fun()->Lwt.return(Request.methreq))reqsin(* Read the responses. For each response, ensure that the previous
response has consumed the body before continuing to the next
response because HTTP/1.1-pipelining cannot be interleaved. *)letread_m=Lwt_mutex.create()inletlast_body=refNoneinletclosefn()=Lwt_mutex.unlockread_minletresps=Lwt_stream.map_s(funmeth->beginmatch!last_bodywith|None->Lwt.return_unit|Somebody->Body.drain_bodybodyend>>=fun()->Lwt_mutex.with_lockread_m(fun()->read_response~closefnicocmeth)>|=(fun((_,body)asx)->last_body:=Somebody;x))meth_streaminLwt.on_success(Lwt_stream.closedresps)(fun()->Net.closeicoc);Lwt.returnrespsend