123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151openLwt.InfixmoduleHeader=Cohttp.HeadermoduleMake(IO:S.IO)(Net:S.NetwithmoduleIO=IO)=structmoduleIO=IOmoduleResponse=Make.Response(IO)moduleRequest=Make.Request(IO)typectx=Net.ctxletread_body~closefnicres=matchResponse.has_bodyreswith|`Yes|`Unknown->letreader=Response.make_body_readerresicinletstream=Body.create_streamResponse.read_body_chunkreaderinletbody=Body.of_streamstreaminletclosed=reffalsein(* Lwt.on_success registers a callback in the stream.
* The GC will still be able to collect stream. *)Lwt.on_success(Lwt_stream.closedstream)(fun()->closed:=true;closefn());(* finalise could run in a thread different from the lwt main thread.
* You may therefore not call into Lwt from a finaliser. *)Gc.finalise_last(fun()->ifnot!closedthenprerr_endline"Cohttp_lwt: body not consumed - leaking stream!")stream;body|`No->closefn();`Emptyletis_meth_chunked=function|`HEAD->false|`GET->false|`DELETE->false|_->trueletcall?(ctx=Net.default_ctx)?headers?(body=`Empty)?chunkedmethuri=letheaders=matchheaderswithNone->Header.init()|Someh->hinNet.connect_uri~ctxuri>>=fun(_conn,ic,oc)->letclosefn()=Net.closeicocinletchunked=matchchunkedwithNone->is_meth_chunkedmeth|Somev->vinletsent=matchchunkedwith|true->letreq=Request.make_for_client~headers~chunkedmethuriinRequest.write(funwriter->Body.write_body(Request.write_bodywriter)body)reqoc|false->(* If chunked is not allowed, then obtain the body length and
insert header *)Body.lengthbody>>=fun(body_length,buf)->letreq=Request.make_for_client~headers~chunked~body_lengthmethuriinRequest.write(funwriter->Body.write_body(Request.write_bodywriter)buf)reqocinsent>>=fun()->(Response.readic>>=function|`Invalidreason->Lwt.fail(Failure("Failed to read response: "^reason))|`Eof->Lwt.fail(Failure"Server closed connection prematurely.")|`Okres->(matchmethwith|`HEAD->closefn();Lwt.return(res,`Empty)|_->letbody=read_body~closefnicresinLwt.return(res,body)))|>funt->Lwt.on_canceltclosefn;Lwt.on_failuret(fun_exn->closefn());t(* The HEAD should not have a response body *)lethead?ctx?headersuri=call?ctx?headers`HEADuri>|=fstletget?ctx?headersuri=call?ctx?headers`GETuriletdelete?ctx?body?chunked?headersuri=call?ctx?headers?body?chunked`DELETEuriletpost?ctx?body?chunked?headersuri=call?ctx?headers?body?chunked`POSTuriletput?ctx?body?chunked?headersuri=call?ctx?headers?body?chunked`PUTuriletpatch?ctx?body?chunked?headersuri=call?ctx?headers?body?chunked`PATCHuriletpost_form?ctx?headers~paramsuri=letheaders=Header.add_opt_unless_existsheaders"content-type""application/x-www-form-urlencoded"inletbody=Body.of_string(Uri.encoded_of_queryparams)inpost?ctx~chunked:false~headers~bodyuriletcallv?(ctx=Net.default_ctx)urireqs=Net.connect_uri~ctxuri>>=fun(_conn,ic,oc)->(* Serialise the requests out to the wire *)letmeth_stream=Lwt_stream.map_s(fun(req,body)->Request.write(funwriter->Body.write_body(Request.write_bodywriter)body)reqoc>>=fun()->Lwt.return(Request.methreq))reqsin(* Read the responses. For each response, ensure that the previous
response has consumed the body before continuing to the next
response because HTTP/1.1-pipelining cannot be interleaved. *)letread_m=Lwt_mutex.create()inletclosefn()=Lwt_mutex.unlockread_minletresps=Lwt_stream.map_s(funmeth->Lwt_mutex.with_lockread_m(fun()->(Response.readic>>=function|`Invalidreason->Lwt.fail(Failure("Failed to read response: "^reason))|`Eof->Lwt.fail(Failure"Server closed connection prematurely.")|`Okres->(matchmethwith|`HEAD->closefn();Lwt.return(res,`Empty)|_->letbody=read_body~closefnicresinLwt.return(res,body)))|>funt->Lwt.on_canceltclosefn;Lwt.on_failuret(fun_exn->closefn());t))meth_streaminLwt.on_success(Lwt_stream.closedresps)(fun()->Net.closeicoc);Lwt.returnrespsend