(*----------------------------------------------------------------------------
* Copyright (c) 2017 Inhabited Type LLC.
* Copyright (c) 2019 Antonio N. Monteiro.
*
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* 3. Neither the name of the author nor the names of his contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE CONTRIBUTORS ``AS IS'' AND ANY EXPRESS
* OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE FOR
* ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
* STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*---------------------------------------------------------------------------*)

module Writer = Serialize.Writer
open Stream

(* Errors that can be reported against a request descriptor. *)
type error =
  [ `Bad_request
  | `Internal_server_error
  | `Exn of exn
  ]

(* User-supplied callback invoked when an error is reported before a response
 * has been started. The last argument starts a streaming error response with
 * the given headers and returns its writable body. *)
type error_handler =
  ?request:Request.t -> error -> (Headers.t -> [ `write ] Body.t) -> unit

(* Progress of the response on a stream:
 * - [Waiting]: no response started yet; holds a callback that is run once a
 *   response is started (see [done_waiting]).
 * - [Fixed]: headers written, a fixed payload remains to be written.
 * - [Streaming]: headers written, body is produced incrementally.
 * - [Complete]: headers were written with END_STREAM and no payload. *)
type response_state =
  | Waiting of (unit -> unit) ref
  | Fixed of
      { response : Response.t
      ; mutable iovec :
          [ `String of string | `Bigstring of Bigstringaf.t ] Httpaf.IOVec.t
      }
  | Streaming of Response.t * [ `write ] Body.t
  | Complete of Response.t

type request_info =
  { request : Request.t
  ; request_body : [ `read ] Body.t
  ; mutable request_body_bytes : int64
  }

type active_state = (request_info, request_info) Stream.active_state

type active_stream =
  { body_buffer_size : int
  ; encoder : Hpack.Encoder.t
  ; mutable response_state : response_state
  ; mutable wait_for_first_flush : bool
        (* We're not doing anything with the trailer fields below yet, we
         * could probably have a `Reqd.schedule_read_trailers` function that
         * would be called once trailer headers are emitted. *)
  ; mutable trailers_parser : partial_headers option
  ; mutable trailers : Headers.t option
  ; create_push_stream :
      unit
      -> ( t * (unit -> unit)
         , [ `Push_disabled | `Stream_ids_exhausted ] )
         result
  }

and state =
  (active_state, active_stream, request_info * active_stream) Stream.state

and t = (state, [ `Ok | error ], error_handler) Stream.stream

(* No-op callback used as the placeholder stored in [Waiting]. *)
let default_waiting = Sys.opaque_identity (fun () -> ())

(* Build a [request_info] with a zeroed body byte counter. *)
let create_active_request request request_body =
  { request; request_body; request_body_bytes = Int64.zero }

(* Build an [active_stream] whose response has not started yet. *)
let create_active_stream encoder body_buffer_size create_push_stream =
  { body_buffer_size
  ; encoder
  ; response_state = Waiting (ref default_waiting)
  ; wait_for_first_flush = true
  ; trailers_parser = None
  ; trailers = None
  ; create_push_stream
  }

(* Run the callback currently stored in [when_done_waiting]. The reference is
 * reset to [default_waiting] *before* the callback runs, so a callback that
 * registers a new one is not overwritten afterwards. *)
let done_waiting when_done_waiting =
  let f = !when_done_waiting in
  when_done_waiting := default_waiting;
  f ()

(* Return the request associated with [t]. Only valid once the request message
 * is active (or the stream is reserved for a push); any other state trips an
 * assertion. *)
let request t =
  match t.state with
  | Idle | Active (Open (WaitingForPeer | PartialHeaders _ | FullHeaders), _) ->
    assert false
  | Active ((Open (ActiveMessage { request; _ }) | HalfClosed { request; _ }), _)
  | Reserved ({ request; _ }, _) ->
    request
  | Closed _ ->
    assert false

(* Return the readable request body of [t].
 * Raises [Failure] for reserved (pushed) streams, which have no request
 * body; other states without an active message trip an assertion. *)
let request_body t =
  match t.state with
  | Idle | Active (Open (WaitingForPeer | PartialHeaders _ | FullHeaders), _) ->
    assert false
  | Active
      ( ( Open (ActiveMessage { request_body; _ })
        | HalfClosed { request_body; _ } )
      , _ ) ->
    request_body
  | Reserved _ ->
    (* From RFC7540§8.2:
     *   Promised requests [...] MUST NOT include a request body. *)
    failwith
      "h2.Reqd.request_body: Promised requests must not include a request body"
  | Closed _ ->
    assert false

(* Return [Some response] if a response has been started on [t], [None]
 * otherwise (including when the stream is idle or closed). *)
let response t =
  match t.state with
  | Idle | Active (Open (WaitingForPeer | PartialHeaders _), _) ->
    None
  | Active
      ( (Open (FullHeaders | ActiveMessage _) | HalfClosed _)
      , { response_state; _ } )
  | Reserved (_, { response_state; _ }) ->
    (match response_state with
    | Waiting _ ->
      None
    | Streaming (response, _) | Fixed { response; _ } | Complete response ->
      Some response)
  | Closed _ ->
    None

(* Like [response], but raises [Failure] when no response has been started.
 * A closed stream trips an assertion. *)
let response_exn t =
  match t.state with
  | Idle | Active (Open (WaitingForPeer | PartialHeaders _), _) ->
    failwith "h2.Reqd.response_exn: response has not started"
  | Active
      ( (Open (FullHeaders | ActiveMessage _) | HalfClosed _)
      , { response_state; _ } )
  | Reserved (_, { response_state; _ }) ->
    (match response_state with
    | Waiting _ ->
      failwith "h2.Reqd.response_exn: response has not started"
    | Streaming (response, _) | Fixed { response; _ } | Complete response ->
      response)
  | Closed _ ->
    assert false

(* Write the response headers for a fixed-payload response on stream [s] and
 * record the payload (if any) to be flushed later.
 * Raises [Failure] if a response was already started or completed. *)
let send_fixed_response t s response data =
  match s.response_state with
  | Waiting when_done_waiting ->
    let iovec, length =
      match data with
      | `String str ->
        let len = String.length str in
        let iovec = { Httpaf.IOVec.buffer = `String str; off = 0; len } in
        iovec, len
      | `Bigstring bstr ->
        let len = Bigstringaf.length bstr in
        let iovec = { Httpaf.IOVec.buffer = `Bigstring bstr; off = 0; len } in
        iovec, len
    in
    (* Structural comparison: an empty payload means the HEADERS frame itself
     * must carry END_STREAM. *)
    let should_send_data = length <> 0 in
    let frame_info =
      Writer.make_frame_info
        ~max_frame_size:t.max_frame_size
        ~flags:
          (if should_send_data
          then Flags.default_flags
          else Flags.(set_end_stream default_flags))
        t.id
    in
    Writer.write_response_headers t.writer s.encoder frame_info response;
    (* From RFC7540§8.1:
     *   An HTTP request/response exchange fully consumes a single stream.
     *   [...] A response starts with a HEADERS frame and ends with a frame
     *   bearing END_STREAM, which places the stream in the "closed" state. *)
    if should_send_data
    then s.response_state <- Fixed { response; iovec }
    else s.response_state <- Complete response;
    done_waiting when_done_waiting
  | Streaming _ ->
    failwith "h2.Reqd.respond_with_*: response already started"
  | Fixed _ | Complete _ ->
    failwith "h2.Reqd.respond_with_*: response already complete"

(* Send a fixed response without checking whether an error is currently being
 * handled. For reserved (pushed) streams, the stream is moved to half-closed
 * once the writer has flushed. *)
let unsafe_respond_with_data t response data =
  match t.state with
  | Idle | Active (Open (WaitingForPeer | PartialHeaders _), _) ->
    assert false
  | Active ((Open (FullHeaders | ActiveMessage _) | HalfClosed _), stream) ->
    send_fixed_response t stream response data
  | Reserved (request_info, stream) ->
    send_fixed_response t stream response data;
    (* From RFC7540§5.1:
     *   reserved (local): [...] In this state, only the following transitions
     *   are possible: The endpoint can send a HEADERS frame. This causes the
     *   stream to open in a "half-closed (remote)" state. *)
    Writer.flush t.writer (fun () ->
        t.state <- Active (HalfClosed request_info, stream))
  | Closed _ ->
    assert false

(* Respond with a string payload. Raises [Failure] if an error is currently
 * being handled on [t] or a response was already started. *)
let respond_with_string t response str =
  if fst t.error_code <> `Ok
  then
    failwith
      "h2.Reqd.respond_with_string: invalid state, currently handling error";
  unsafe_respond_with_data t response (`String str)

(* Respond with a bigstring payload. Raises [Failure] if an error is currently
 * being handled on [t] or a response was already started. *)
let respond_with_bigstring t response bstr =
  if fst t.error_code <> `Ok
  then
    failwith
      "h2.Reqd.respond_with_bigstring: invalid state, currently handling error";
  unsafe_respond_with_data t response (`Bigstring bstr)

(* Write the response headers for a streaming response on stream [s] and
 * return the writable response body.
 * Raises [Failure] if a response was already started or completed. *)
let send_streaming_response ~flush_headers_immediately t s response =
  s.wait_for_first_flush <- not flush_headers_immediately;
  match s.response_state with
  | Waiting when_done_waiting ->
    let frame_info =
      Writer.make_frame_info ~max_frame_size:t.max_frame_size t.id
    in
    let response_body_buffer = Bigstringaf.create s.body_buffer_size in
    let response_body = Body.create response_body_buffer in
    Writer.write_response_headers t.writer s.encoder frame_info response;
    if s.wait_for_first_flush then Writer.yield t.writer;
    s.response_state <- Streaming (response, response_body);
    done_waiting when_done_waiting;
    response_body
  | Streaming _ ->
    failwith "h2.Reqd.respond_with_streaming: response already started"
  | Fixed _ | Complete _ ->
    failwith "h2.Reqd.respond_with_streaming: response already complete"

(* Start a streaming response without checking whether an error is currently
 * being handled. For reserved (pushed) streams, the stream is moved to
 * half-closed once the writer has flushed. *)
let unsafe_respond_with_streaming t ~flush_headers_immediately response =
  match t.state with
  | Idle | Active (Open (WaitingForPeer | PartialHeaders _), _) ->
    assert false
  | Active ((Open (FullHeaders | ActiveMessage _) | HalfClosed _), stream) ->
    send_streaming_response ~flush_headers_immediately t stream response
  | Reserved (request_info, stream) ->
    let response_body =
      send_streaming_response ~flush_headers_immediately t stream response
    in
    (* From RFC7540§5.1:
     *   reserved (local): [...] In this state, only the following transitions
     *   are possible: The endpoint can send a HEADERS frame. This causes the
     *   stream to open in a "half-closed (remote)" state. *)
    Writer.flush t.writer (fun () ->
        t.state <- Active (HalfClosed request_info, stream));
    response_body
  | Closed _ ->
    assert false

(* Start a streaming response and return its writable body. Raises [Failure]
 * if an error is currently being handled on [t] or a response was already
 * started. *)
let respond_with_streaming t ?(flush_headers_immediately = false) response =
  if fst t.error_code <> `Ok
  then
    failwith
      "h2.Reqd.respond_with_streaming: invalid state, currently handling error";
  unsafe_respond_with_streaming ~flush_headers_immediately t response

(* Ask the connection for a new pushed stream, write the PUSH_PROMISE frame on
 * [t], and put the promised stream in the [Reserved] state. Returns the
 * promised request descriptor, or the error from [create_push_stream]. *)
let start_push_stream t s request =
  match s.create_push_stream () with
  | Ok (promised_reqd, wakeup_writer) ->
    let frame_info =
      Writer.make_frame_info ~max_frame_size:t.max_frame_size t.id
    in
    Writer.write_push_promise
      t.writer
      s.encoder
      frame_info
      ~promised_id:promised_reqd.id
      request;
    let { encoder; body_buffer_size; create_push_stream; _ } = s in
    (* From RFC7540§8.2:
     *   Promised requests [...] MUST NOT include a request body. *)
    let request_info = create_active_request request Body.empty in
    let active_stream =
      create_active_stream encoder body_buffer_size create_push_stream
    in
    (* From RFC7540§8.2.1:
     *   Sending a PUSH_PROMISE frame creates a new stream and puts the stream
     *   into the "reserved (local)" state for the server and the "reserved
     *   (remote)" state for the client.
     *
     * Note: we do this before flushing the writer because request handlers
     * might immediately call one of the `respond_with` functions and expect
     * the stream to be in the `Reserved` state. *)
    promised_reqd.state <- Reserved (request_info, active_stream);
    wakeup_writer ();
    Ok promised_reqd
  | Error e ->
    Error
      (e :> [ `Push_disabled | `Stream_cant_push | `Stream_ids_exhausted ])

(* TODO: We could easily allow the priority of the PUSH request to be
 * configurable. We should allow users of this API to define the weight (maybe
 * not strictly), dependency on the current Reqd, and exclusivity *)
let unsafe_push t request =
  match t.state with
  | Idle | Active (Open (WaitingForPeer | PartialHeaders _), _) ->
    assert false
  | Active ((Open (FullHeaders | ActiveMessage _) | HalfClosed _), stream) ->
    start_push_stream t stream request
  (* Already checked in `push` *)
  | Reserved _ | Closed _ ->
    assert false

(* Promise a pushed request on [t]. Raises [Failure] if an error is currently
 * being handled; returns [Error `Stream_cant_push] when [t] is itself a
 * pushed stream. *)
let push t request =
  if fst t.error_code <> `Ok
  then failwith "h2.Reqd.push: invalid state, currently handling error";
  if Stream_identifier.is_pushed t.id
  then
    (* From RFC7540§6.6:
     *   PUSH_PROMISE frames MUST only be sent on a peer-initiated stream that
     *   is in either the "open" or "half-closed (remote)" state. *)
    Error `Stream_cant_push
  else unsafe_push t request

(* Close the stream after the response has been fully written: reset it with
 * the recorded error code if there is one, otherwise reset with NO_ERROR (if
 * the request is still open) or finish it once the writer has flushed (if
 * half-closed). *)
let close_stream t =
  match t.error_code with
  | _, Some error_code ->
    reset_stream t error_code
  | _, None ->
    (match t.state with
    | Active (Open (FullHeaders | ActiveMessage _), _) ->
      (* From RFC7540§8.1:
       *   A server can send a complete response prior to the client sending an
       *   entire request if the response does not depend on any portion of the
       *   request that has not been sent and received. When this is true, a
       *   server MAY request that the client abort transmission of a request
       *   without error by sending a RST_STREAM with an error code of NO_ERROR
       *   after sending a complete response (i.e., a frame with the END_STREAM
       *   flag). *)
      reset_stream t Error.NoError
    | Active (HalfClosed _, _) ->
      Writer.flush t.writer (fun () -> Stream.finish_stream t Finished)
    | _ ->
      assert false)

(* Report [exn] on stream [s] of [t], dispatching on how far the response has
 * progressed and on whether an error was already recorded. *)
let _report_error ?request t s exn error_code =
  match s.response_state, fst t.error_code with
  | Waiting _, `Ok ->
    t.error_code <- ((exn :> [ `Ok | error ]), Some error_code);
    let status =
      match (exn :> [ error | Status.standard ]) with
      | `Exn _ ->
        `Internal_server_error
      | #Status.standard as status ->
        status
    in
    t.error_handler ?request exn (fun headers ->
        let response = Response.create ~headers status in
        unsafe_respond_with_streaming
          ~flush_headers_immediately:true
          t
          response)
  | Waiting _, `Exn _ ->
    (* XXX(seliopou): Decide what to do in this unlikely case. There is an
     * outstanding call to the [error_handler], but an intervening exception
     * has been reported as well. *)
    failwith "h2.Reqd.report_exn: NYI"
  | Streaming (_response, response_body), `Ok ->
    Body.close_writer response_body;
    t.error_code <- ((exn :> [ `Ok | error ]), Some error_code);
    reset_stream t error_code
  | Streaming (_response, response_body), `Exn _ ->
    Body.close_writer response_body;
    t.error_code <- (fst t.error_code, Some error_code);
    reset_stream t error_code;
    Writer.close_and_drain t.writer
  | (Fixed _ | Complete _ | Streaming _ | Waiting _), _ ->
    (* XXX(seliopou): Once additional logging support is added, log the error
     * in case it is not spurious. *)
    (* Still need to send an RST_STREAM frame: record `error_code` and reset
     * the stream. *)
    t.error_code <- (fst t.error_code, Some error_code);
    reset_stream t error_code

(* Report an error on [t]. Closes the request body reader when a request
 * message is active; does nothing on a closed stream. *)
let report_error t exn error_code =
  match t.state with
  | Idle | Reserved _ | Active (Open (WaitingForPeer | PartialHeaders _), _) ->
    assert false
  | Active (Open FullHeaders, stream) ->
    _report_error t stream exn error_code
  | Active
      ( ( Open (ActiveMessage { request; request_body; _ })
        | HalfClosed { request; request_body; _ } )
      , stream ) ->
    Body.close_reader request_body;
    _report_error t stream ~request exn error_code
  | Closed _ ->
    ()

(* Report an exception on [t] as an internal error. *)
let report_exn t exn = report_error t (`Exn exn) Error.InternalError

(* Run [f]; if it raises, report the exception on [t] and return it as
 * [Error]. *)
let try_with t f : (unit, exn) Result.result =
  try
    f ();
    Ok ()
  with
  | exn ->
    report_exn t exn;
    Error exn

(* Private API, not exposed to the user through h2.mli *)

(* Close the reader side of the request body. *)
let close_request_body { request_body; _ } = Body.close_reader request_body

(* Return the error recorded on [t], if any. *)
let error_code t =
  match fst t.error_code with #error as error -> Some error | `Ok -> None

(* Register [f] to be called when the stream has more output to write. *)
let on_more_output_available t f =
  match t.state with
  | Idle | Reserved _ | Active (Open (WaitingForPeer | PartialHeaders _), _) ->
    assert false
  | Active
      ( (Open (FullHeaders | ActiveMessage _) | HalfClosed _)
      , { response_state; _ } ) ->
    (match response_state with
    | Waiting when_done_waiting ->
      (* Due to the flow-control window, this function might be called when
       * another callback is already stored. We don't enforce that the
       * currently stored callback is equal to `default_waiting` because it is
       * OK for that not to happen. *)
      when_done_waiting := f
    | Streaming (_, response_body) ->
      Body.when_ready_to_write response_body f
    | Fixed _ ->
      ()
    | Complete _ ->
      failwith "h2.Reqd.on_more_output_available: response already complete")
  | Closed _ ->
    assert false

(* A streaming body still requires output while it is open or has pending
 * output. *)
let response_body_requires_output response_body =
  (not (Body.is_closed response_body))
  || Body.has_pending_output response_body

(* Whether the stream still has (or may still produce) output to write. *)
let requires_output t =
  match t.state with
  | Idle ->
    false
  | Reserved _ ->
    true
  (* From RFC7540§8.1:
   *   A server can send a complete response prior to the client sending an
   *   entire request if the response does not depend on any portion of the
   *   request that has not been sent and received. *)
  | Active (Open (WaitingForPeer | PartialHeaders _), _) ->
    false
  | Active
      ( (Open (FullHeaders | ActiveMessage _) | HalfClosed _)
      , { response_state; _ } ) ->
    (match response_state with
    | Complete _ ->
      false
    | Fixed { iovec = { len; _ }; _ } ->
      len > 0
    | Streaming (_, response_body) ->
      response_body_requires_output response_body
    | Waiting _ ->
      true)
  | Closed _ ->
    false

(* Run the pending read on the request body, if any, reporting any exception
 * it raises on [t]. *)
let flush_request_body t =
  let request_body = request_body t in
  if Body.has_pending_output request_body
  then try Body.execute_read request_body with exn -> report_exn t exn

(* Hand [len] bytes of [buffer] starting at [off] to the writer: strings go
 * through [Writer.write_data], bigstrings through [Writer.schedule_data]. *)
let write_buffer_data writer ~off ~len frame_info buffer =
  match buffer with
  | `String str ->
    Writer.write_data writer ~off ~len frame_info str
  | `Bigstring bstr ->
    Writer.schedule_data writer ~off ~len frame_info bstr

(* Write at most [max_bytes] of the response body to the writer and return the
 * number of bytes handed over. Closes the stream once the body has been fully
 * written. Returns 0 when there is nothing to write. *)
let flush_response_body t ~max_bytes =
  match t.state with
  | Active ((Open _ | HalfClosed _), { response_state; _ }) ->
    (match response_state with
    | Streaming (_, response_body) ->
      let written =
        Body.transfer_to_writer
          response_body
          t.writer
          ~max_frame_size:t.max_frame_size
          ~max_bytes
          t.id
      in
      if not (response_body_requires_output response_body)
      then close_stream t;
      written
    | Fixed r ->
      (match r.iovec with
      | { buffer; off; len } as iovec ->
        if max_bytes < len
        then (
          (* Only part of the payload fits: write it and remember the rest. *)
          let frame_info =
            Writer.make_frame_info ~max_frame_size:t.max_frame_size t.id
          in
          write_buffer_data t.writer ~off ~len:max_bytes frame_info buffer;
          r.iovec <- Httpaf.IOVec.shift iovec max_bytes;
          max_bytes)
        else
          (* The remaining payload fits: send it with END_STREAM set. *)
          let frame_info =
            Writer.make_frame_info
              ~max_frame_size:t.max_frame_size
              ~flags:Flags.(set_end_stream default_flags)
              t.id
          in
          write_buffer_data t.writer ~off ~len frame_info buffer;
          close_stream t;
          len)
    | Waiting _ | Complete _ ->
      0)
  | _ ->
    0

(* Store received trailer headers on the stream. Only valid once the request
 * message is active; any other state trips an assertion. *)
let deliver_trailer_headers t headers =
  match t.state with
  | Active (Open (PartialHeaders _ | FullHeaders), _) ->
    assert false
  | Active ((Open (ActiveMessage _) | HalfClosed _), stream) ->
    (* TODO: call the schedule_trailers callback *)
    stream.trailers <- Some headers
  | _ ->
    assert false