123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391(*----------------------------------------------------------------------------
Copyright (c) 2017 Inhabited Type LLC.
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
1. Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
3. Neither the name of the author nor the names of his contributors
may be used to endorse or promote products derived from this software
without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE CONTRIBUTORS ``AS IS'' AND ANY EXPRESS
OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE FOR
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
----------------------------------------------------------------------------*)moduleReader=Parse.ReadermoduleWriter=Serialize.Writertyperequest_handler=Reqd.t->unittypeerror=[`Bad_gateway|`Bad_request|`Internal_server_error|`Exnofexn]typeerror_handler=?request:Request.t->error->(Headers.t->Body.Writer.t)->unittypeerror_code=|No_error|Errorof{request:Request.toption;mutableresponse_state:Response_state.t}typet={reader:Reader.request;writer:Writer.t;response_body_buffer:Bigstringaf.t;request_handler:request_handler;error_handler:error_handler;request_queue:Reqd.tQueue.t(* invariant: If [request_queue] is not empty, then the head of the queue
has already had [request_handler] called on it. *);mutableerror_code:error_code(* Represents an unrecoverable error that will cause the connection to
* shutdown. Holds on to the response body created by the error handler
* that might be streaming to the client. *)}letis_closedt=Reader.is_closedt.reader&&Writer.is_closedt.writerletis_activet=not(Queue.is_emptyt.request_queue)letcurrent_reqd_exnt=Queue.peekt.request_queueletyield_readertk=Reader.on_wakeupt.readerkletwakeup_readert=ifis_activetthenbeginletreqd=current_reqd_exntin(* Before going through another read loop, give the body a chance to flush
its buffered bytes to the application. This fixes a pathological case
where the body could buffer too much without a chance of executing
scheduled reads. *)Reqd.flush_request_bodyreqd;end;Reader.wakeupt.readerletyield_writertk=Writer.on_wakeupt.writerk;;letwakeup_writert=Writer.wakeupt.writerletdefault_error_handler?request:_errorhandle=letmessage=matcherrorwith|`Exnexn->Printexc.to_stringexn|(#Status.client_error|#Status.server_error)aserror->Status.to_stringerrorinletbody=handleHeaders.emptyinBody.Writer.write_stringbodymessage;Body.Writer.closebody;;letcreate?(config=Config.default)?(error_handler=default_error_handler)request_handler=let{Config.response_buffer_size;response_body_buffer_size;_}=configinletwriter=Writer.create~buffer_size:response_buffer_size()inletrequest_queue=Queue.create()inletresponse_body_buffer=Bigstringaf.createresponse_body_buffer_sizeinletrecreader=lazy(Reader.request~wakeup:(fun()->wakeup_reader(Lazy.forcet))handler)andhandlerrequestrequest_body=letreqd=Reqd.createerror_handlerrequestrequest_body(Lazy.forcereader)writerresponse_body_bufferinQueue.pushreqdrequest_queue;andt=lazy{reader=Lazy.forcereader;writer;response_body_buffer;request_handler=request_handler;error_handler=error_handler;request_queue;error_code=No_error}inLazy.forcetletshutdown_readert=Reader.force_closet.reader;ifis_activetthenReqd.close_request_body(current_reqd_exnt)elsewakeup_readertletshutdown_writert=Writer.closet.writer;ifis_activetthenReqd.close_request_body(current_reqd_exnt)elsewakeup_writertleterror_codet=ifis_activetthenReqd.error_code(current_reqd_exnt)elseNoneletshutdownt=Queue.iterReqd.close_request_bodyt.request_queue;Queue.cleart.request_queue;shutdown_readert;shutdown_writert;wakeup_readert;wakeup_writertletset_error_and_handle?requestterror=ifis_activetthenbeginassert(request=None);letreqd=current_reqd_exntinReqd.report_errorreqderrorendelsebeginletstatus=match(error:>[error|Status.standard])with|`Exn_->`Internal_server_error|#Status.standardasstatus->statusinshutdown_readert;letwriter=t.writerinmatcht.error_codewith|No_error->t.error_code<-Error{request;response_state=Waiting};t.error_handler?requesterror(funheaders->letresponse=Response.create~headersstatusinletencoding=(* If we haven't parsed the request method, just use GET as a standard
placeholder. The method is only used for edge cases, like HEAD or
CONNECT. *)letrequest_method=matchrequestwith|None->`GET|Some(request:Request.t)->request.methinmatchResponse.body_length~request_methodresponsewith|`Fixed_|`Close_delimited|`Chunkedasencoding->encoding|`Error(`Bad_gateway|`Internal_server_error)->failwith"httpaf.Server_connection.error_handler: invalid response body length"inletresponse_body=(* The (shared) response body buffer can be used in this case
* because in this conditional branch we're not sending a response
* (is_active t == false), and are therefore not making use of that
* buffer. *)Body.Writer.createt.response_body_buffer~encoding~when_ready_to_write:(Optional_thunk.some(fun()->wakeup_writert))inWriter.write_responsewriterresponse;t.error_code<-Error{request;response_state=Streaming(response,response_body)};wakeup_writert;response_body)|Error_->(* When reading, this should be impossible: even if we try to read more,
* the parser does not ingest it, and even if someone attempts to feed
* more bytes to the parser when we already told them to [`Close], that's
* really their own fault.
*
* We do, however, need to handle this case if any other exception is
* reported (we're already handling an error and e.g. the writing channel
* is closed). Just shut down the connection in that case.
*)Writer.close_and_draint.writer;shutdowntendletreport_exntexn=set_error_and_handlet(`Exnexn)letadvance_request_queuet=ignore(Queue.taket.request_queue);ifnot(Queue.is_emptyt.request_queue)thent.request_handler(Queue.peekt.request_queue);;;letrec_next_read_operationt=ifnot(is_activet)then(letnext=Reader.nextt.readerinbeginmatchnextwith|`Error_->(* Don't tear down the whole connection if we saw an unrecoverable
* parsing error, as we might be in the process of streaming back the
* error response body to the client. *)shutdown_readert|`Close->(matcht.error_codewith|No_error->shutdownt|Error_->())|_->()end;next)else(letreqd=current_reqd_exntinmatchReqd.input_statereqdwith|Wait->beginmatchReqd.output_statereqdwith|Complete->(* this branch happens if the writer has completed sending the response
and there are still bytes remaining to be read in the request body.
*)Reader.nextt.reader|Waiting|Ready->(* `Wait` signals that we should add backpressure to the read channel,
* meaning the reader should tell the runtime to yield.
*
* The exception here is if there has been an error in the parser; in
* that case, we need to return that exception and signal the runtime to
* close. *)beginmatchReader.nextt.readerwith|`Error_asoperation->operation|_->`Yieldendend|Ready->Reader.nextt.reader|Complete->_final_read_operation_fortreqd)and_final_read_operation_fortreqd=ifReader.is_closedt.reader||not(Reqd.persistent_connectionreqd)then(shutdown_readert;Reader.nextt.reader;)elsematchReqd.output_statereqdwith|Waiting|Ready->`Yield|Complete->(* The "final read" operation for a request descriptor that is
* `Complete` from both input and output perspectives needs to account
* for the fact that the reader may not have finished reading the
* request body.
* It's important that we don't advance the request queue in this case
* for persistent connections, or we'd break the invariant that a
* non-empty `request_queue` has had the request handler called on its
* head element. *)matchReader.nextt.readerwith|`Error_asop->(* Keep reading when in a "partial" state (`Read).
* Don't advance the request queue if in an error state. *)op|`Readasop->(* we just don't advance the request queue in the case of a parser
error. *)advance_request_queuet;op|_->advance_request_queuet;_next_read_operationtletnext_read_operationt=match_next_read_operationtwith|`Error(`Parse_)->set_error_and_handlet`Bad_request;`Close|`Error(`Bad_requestrequest)->set_error_and_handle~requestt`Bad_request;`Close|`Start|`Read->`Read|(`Yield|`Close)asoperation->operationletread_with_moretbs~off~lenmore=letcall_handler=Queue.is_emptyt.request_queueinletconsumed=Reader.read_with_moret.readerbs~off~lenmoreinifis_activetthen(letreqd=current_reqd_exntinifcall_handlerthent.request_handlerreqd;Reqd.flush_request_bodyreqd;);consumed;;letreadtbs~off~len=read_with_moretbs~off~lenIncompleteletread_eoftbs~off~len=read_with_moretbs~off~lenCompleteletflush_response_error_bodytresponse_state=Response_state.flush_response_bodyresponse_statet.writerletrec_next_write_operationt=ifnot(is_activet)then(matcht.error_codewith|No_error->ifReader.is_closedt.readerthenshutdownt;Writer.nextt.writer|Error{response_state;_}->matchResponse_state.output_stateresponse_statewith|Waiting->`Yield|Ready->flush_response_error_bodytresponse_state;Writer.nextt.writer|Complete->shutdown_writert;Writer.nextt.writer)else(letreqd=current_reqd_exntinmatchReqd.output_statereqdwith|Waiting->Writer.nextt.writer|Ready->Reqd.flush_response_bodyreqd;Writer.nextt.writer|Complete->_final_write_operation_fortreqd)and_final_write_operation_fortreqd=ifnot(Reqd.persistent_connectionreqd)then(shutdown_writert;wakeup_readert;Writer.nextt.writer;)else(matchReqd.input_statereqdwith|Wait->wakeup_readert;Writer.nextt.writer|Ready->(* we can't close the request body here, otherwise the reader loop is
going to think that its "input state" is complete, and remove the
request descriptor from the request queue, when in fact it needs to
read the remainder of the request body. It needs to hang around
because there could be a sudden EOF while discarding the request body,
which we need to handle. *)wakeup_readert;Writer.nextt.writer|Complete->matchReader.nextt.readerwith|`Error_|`Read->Writer.nextt.writer|_->advance_request_queuet;wakeup_readert;_next_write_operationt);;letnext_write_operationt=_next_write_operationtletreport_write_resulttresult=Writer.report_resultt.writerresult