123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306(*----------------------------------------------------------------------------
Copyright (c) 2017 Inhabited Type LLC.
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
1. Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
3. Neither the name of the author nor the names of his contributors
may be used to endorse or promote products derived from this software
without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE CONTRIBUTORS ``AS IS'' AND ANY EXPRESS
OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE FOR
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
----------------------------------------------------------------------------*)openHttpun_typesmoduleQueue=structincludeQueueletpeek_exn=peekletpeekt=ifis_emptytthenNoneelseSome(peek_exnt)endmoduleReader=Parse.ReadermoduleWriter=Serialize.Writertyperequest_handler=Reqd.t->unittypeerror=[`Bad_gateway|`Bad_request|`Internal_server_error|`Exnofexn]typeerror_handler=?request:Request.t->error->(Headers.t->Body.Writer.t)->unittypet={reader:Reader.request;writer:Writer.t;response_body_buffer:Bigstringaf.t;request_handler:request_handler;error_handler:error_handler;request_queue:Reqd.tQueue.t;(* invariant: If [request_queue] is not empty, then the head of the queue
has already had [request_handler] called on it. *)mutableis_errored:bool;(* if there is a parse or connection error, we invoke the [error_handler]
and set [is_errored] to indicate we should not close the writer yet. *)mutablewakeup_reader:Optional_thunk.t;}letis_closedt=Reader.is_closedt.reader&&Writer.is_closedt.writerletis_activet=not(Queue.is_emptyt.request_queue)letcurrent_reqd_exnt=Queue.peek_exnt.request_queueletyield_readertk=ifis_closedtthenfailwith"yield_reader on closed conn"elseifOptional_thunk.is_somet.wakeup_readerthenfailwith"yield_reader: only one callback can be registered at a time"elset.wakeup_reader<-Optional_thunk.somekletwakeup_readert=letf=t.wakeup_readerint.wakeup_reader<-Optional_thunk.none;Optional_thunk.call_if_somefletyield_writertk=ifWriter.is_closedt.writerthenk()elseWriter.on_wakeupt.writerkletwakeup_writert=Writer.wakeupt.writerletdefault_error_handler?request:_errorhandle=letmessage=matcherrorwith|`Exnexn->Printexc.to_stringexn|(#Status.client_error|#Status.server_error)aserror->Status.to_stringerrorinletbody=handleHeaders.emptyinBody.Writer.write_stringbodymessage;Body.Writer.closebodyletcreate?(config=Config.default)?(error_handler=default_error_handler)request_handler=let{Config.response_buffer_size;response_body_buffer_size;_}=configinletwriter=Writer.create~buffer_size:response_buffer_size()inletrequest_queue=Queue.create()inletresponse_body_buffer=Bigstringaf.createresponse_body_buffer_sizeinlethandlerrequestrequest_body=letreqd=Reqd.createerror_handlerrequestrequest_bodywriterresponse_body_bufferinQueue.pushreqdrequest_queuein{reader=Reader.requesthandler;writer;response_body_buffer;request_handler;error_handler;request_queue;is_errored=false;wakeup_reader=Optional_thunk.none;}letshutdown_readert=ifis_activetthenReqd.close_request_body(current_reqd_exnt);Reader.force_closet.reader;wakeup_readertletshutdown_writert=ifis_activetthen(letreqd=current_reqd_exntin(* XXX(dpatti): I'm not sure I understand why we close the *request* body
here. Maybe we can write a test such that removing this line causes it to
fail? *)Reqd.close_request_bodyreqd;Reqd.flush_response_bodyreqd);Writer.closet.writer;wakeup_writertleterror_codet=ifis_activetthenReqd.error_code(current_reqd_exnt)elseNoneletshutdownt=shutdown_readert;shutdown_writertletset_error_and_handle?requestterror=ifis_activetthen(assert(request=None);letreqd=current_reqd_exntinReqd.report_errorreqderror)else(t.is_errored<-true;letstatus=match(error:>[error|Status.standard])with|`Exn_->`Internal_server_error|#Status.standardasstatus->statusinshutdown_readert;letwriter=t.writerint.error_handler?requesterror(funheaders->letresponse=Response.create~headersstatusinWriter.write_responsewriterresponse;letencoding=(* If we haven't parsed the request method, just use GET as a standard
placeholder. The method is only used for edge cases, like HEAD or
CONNECT. *)letrequest_method=matchrequestwithNone->`GET|Somerequest->request.methinmatchResponse.body_length~request_methodresponsewith|(`Fixed_|`Close_delimited)asencoding->encoding|`Chunked->(* XXX(dpatti): Because we pass the writer's faraday directly to the
new body, we don't write the chunked encoding. A client won't be
able to interpret this. *)`Close_delimited|`Error(`Bad_gateway|`Internal_server_error)->failwith"H1.Server_connection.error_handler: invalid response body \
length"inBody.Writer.of_faraday(Writer.faradaywriter)writer~encoding))letreport_exntexn=set_error_and_handlet(`Exnexn)letadvance_request_queuet=ignore(Queue.taket.request_queue);ifnot(Queue.is_emptyt.request_queue)thent.request_handler(Queue.peek_exnt.request_queue)letrec_next_read_operationt=ifnot(is_activet)then((* If the request queue is empty, there is no connection error, and the
reader is closed, then we can assume that no more user code will be able
to write. *)ifReader.is_closedt.reader&¬t.is_erroredthenshutdown_writert;Reader.nextt.reader)elseletreqd=current_reqd_exntinmatchReqd.input_statereqdwith|Waiting->_yield_readert|Ready->Reader.nextt.reader|Complete->_final_read_operation_fortreqd|Upgraded->`Upgradeand_final_read_operation_fortreqd=ifnot(Reqd.persistent_connectionreqd)then(shutdown_readert;Reader.nextt.reader;)else(matchReqd.output_statereqdwith|Waiting|Ready->_yield_readert|Upgraded->(* If the input state is not [Upgraded], the output state cannot be
either. *)assertfalse|Complete->advance_request_queuet;_next_read_operationt;)and_yield_readert=(* XXX(dpatti): This is a way in which the reader and writer are not
parallel -- we tell the writer when it needs to yield but the reader is
always asking for more data. This is the only branch in either
operation function that does not return `(Reader|Writer).next`, which
means there are surprising states you can get into. For example, we ask
the runtime to yield but then raise when it tries to because the reader
is closed. I think this can be avoided if we allow this module to tell the
reader when it should yield/resume, then we'd just do an inlined
`Reader.next` call instead. I put this function here to describe why this
is subtle. *)ifReader.is_closedt.readerthenReader.nextt.readerelse`Yield;;letnext_read_operationt=match_next_read_operationtwith|`Error(`Parse_)->set_error_and_handlet`Bad_request;`Close|`Error(`Bad_requestrequest)->set_error_and_handle~requestt`Bad_request;`Close|(`Read|`Yield|`Close|`Upgrade)asoperation->operationletrecread_with_moretbs~off~lenmore=letcall_handler=Queue.is_emptyt.request_queueinletconsumed=Reader.read_with_moret.readerbs~off~lenmoreinifis_activetthen(letreqd=current_reqd_exntinifcall_handlerthent.request_handlerreqd;Reqd.flush_request_bodyreqd);(* Keep consuming input as long as progress is made and data is
available, in case multiple requests were received at once. *)ifconsumed>0&&consumed<lenthenletoff=off+consumedandlen=len-consumedinconsumed+read_with_moretbs~off~lenmoreelseconsumedletreadtbs~off~len=read_with_moretbs~off~lenIncompleteletread_eoftbs~off~len=read_with_moretbs~off~lenCompleteletrec_next_write_operationt=ifnot(is_activet)thenWriter.nextt.writerelse(letreqd=current_reqd_exntinmatchReqd.output_statereqdwith|Waiting->(* XXX(dpatti): I don't think we should need to call this, but it is
necessary in the case of a streaming, non-chunked body so that you can
set the appropriate flag. *)Reqd.flush_response_bodyreqd;Writer.nextt.writer|Ready->Reqd.flush_response_bodyreqd;Writer.nextt.writer|Complete->_final_write_operation_fortreqd|Upgraded->wakeup_readert;(* Even in the Upgrade case, we're still responsible for writing the
response header, so we might have work to do. *)ifWriter.has_pending_outputt.writerthenWriter.nextt.writerelse`Upgrade)and_final_write_operation_fortreqd=letnext=ifnot(Reqd.persistent_connectionreqd)then(shutdown_writert;Writer.nextt.writer)elsematchReqd.input_statereqdwith|Waiting->`Yield|Ready->Writer.nextt.writer;|Upgraded->`Upgrade|Complete->advance_request_queuet;_next_write_operationtinwakeup_readert;nextletnext_write_operationt=_next_write_operationtletreport_write_resulttresult=Writer.report_resultt.writerresult