(*----------------------------------------------------------------------------
* Copyright (c) 2017 Inhabited Type LLC.
* Copyright (c) 2019 Antonio N. Monteiro.
*
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* 3. Neither the name of the author nor the names of his contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE CONTRIBUTORS ``AS IS'' AND ANY EXPRESS
* OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE FOR
* ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
* STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*---------------------------------------------------------------------------*)

module Writer = Serialize.Writer

(* Errors that can be reported on a request descriptor. *)
type error =
  [ `Bad_request
  | `Internal_server_error
  | `Exn of exn
  ]

(* User-supplied callback invoked when an error is reported before a response
 * has been started. The last argument starts a streaming error response with
 * the given headers and returns its body writer. *)
type error_handler =
  ?request:Request.t -> error -> (Headers.t -> Body.Writer.t) -> unit

(* Progress of the response on a stream:
 * - [Waiting]: no response has been started yet.
 * - [Fixed]: headers were written; [iovec] holds the body bytes that still
 *   have to be written.
 * - [Streaming]: headers were written; the body is produced incrementally
 *   through [response_body], optionally followed by [trailers].
 * - [Complete]: the final frame (carrying END_STREAM) has been written. *)
type response_state =
  | Waiting
  | Fixed of
      { response : Response.t
      ; mutable iovec :
          [ `String of string | `Bigstring of Bigstringaf.t ]
          Httpun_types.IOVec.t
      }
  | Streaming of
      { response : Response.t
      ; response_body : Body.Writer.t
      ; trailers : Headers.t
      }
  | Complete of Response.t

type request_info =
  { request : Request.t
  ; request_body : Body.Reader.t
  ; mutable request_body_bytes : int64
  }

type active_state = (request_info, request_info) Stream.active_state

type active_stream =
  { body_buffer_size : int
  ; encoder : Hpack.Encoder.t
  ; mutable response_state : response_state
  ; (* We're not doing anything with these yet, we could probably have a
     * `Reqd.schedule_read_trailers` function that would be called once
     * trailer headers are emitted. *)
    mutable trailers_parser : Stream.partial_headers option
  ; mutable trailers : Headers.t option
  ; create_push_stream :
      Stream_identifier.t
      -> (t, [ `Push_disabled | `Stream_ids_exhausted ]) result
  }

and state =
  (active_state, active_stream, request_info * active_stream) Stream.state

and t = (state, error, error_handler) Stream.t

(* Build the per-request record; the received byte counter starts at zero. *)
let create_active_request request request_body =
  { request; request_body; request_body_bytes = Int64.zero }

(* Build the per-stream record for a stream whose response has not started. *)
let create_active_stream encoder body_buffer_size create_push_stream =
  { body_buffer_size
  ; encoder
  ; response_state = Waiting
  ; trailers_parser = None
  ; trailers = None
  ; create_push_stream
  }

(* Return the request associated with [t]. Asserts if the stream has no fully
 * received request yet, or if it is already closed. *)
let request (t : t) =
  match t.state with
  | Idle | Active (Open (WaitingForPeer | PartialHeaders _ | FullHeaders), _) ->
    assert false
  | Active ((Open (ActiveMessage { request; _ }) | HalfClosed { request; _ }), _)
  | Reserved ({ request; _ }, _) ->
    request
  | Closed _ -> assert false

(* Return the request body reader of [t]. Raises [Failure] for promised
 * (reserved) streams and for closed streams. *)
let request_body (t : t) =
  match t.state with
  | Idle | Active (Open (WaitingForPeer | PartialHeaders _ | FullHeaders), _) ->
    assert false
  | Active
      ( ( Open (ActiveMessage { request_body; _ })
        | HalfClosed { request_body; _ } )
      , _ ) ->
    request_body
  | Reserved _ ->
    (* From RFC7540§8.2:
     *   Promised requests MUST NOT include a request body. *)
    failwith
      "h2.Reqd.request_body: Promised requests must not include a request body"
  | Closed _ -> failwith "h2.Reqd.request_body: Stream already closed"

(* Return [Some response] once a response has been started on [t], [None]
 * otherwise (including when the stream is closed). *)
let response (t : t) =
  match t.state with
  | Idle | Active (Open (WaitingForPeer | PartialHeaders _), _) -> None
  | Active
      ( (Open (FullHeaders | ActiveMessage _) | HalfClosed _)
      , { response_state; _ } )
  | Reserved (_, { response_state; _ }) ->
    (match response_state with
    | Waiting -> None
    | Streaming { response; _ } | Fixed { response; _ } | Complete response ->
      Some response)
  | Closed _ -> None

(* Like [response], but raises [Failure] when no response has been started.
 * Asserts on a closed stream. *)
let response_exn (t : t) =
  match t.state with
  | Idle | Active (Open (WaitingForPeer | PartialHeaders _), _) ->
    failwith "h2.Reqd.response_exn: response has not started"
  | Active
      ( (Open (FullHeaders | ActiveMessage _) | HalfClosed _)
      , { response_state; _ } )
  | Reserved (_, { response_state; _ }) ->
    (match response_state with
    | Waiting -> failwith "h2.Reqd.response_exn: response has not started"
    | Streaming { response; _ } | Fixed { response; _ } | Complete response ->
      response)
  | Closed _ -> assert false

(* Write the response headers for a fixed-size body and record the body bytes
 * in [s.response_state] so that [flush_response_body] can write them later.
 * An empty body sets END_STREAM on the HEADERS frame and goes straight to
 * [Complete]. Raises [Failure] if a response was already started. *)
let send_fixed_response (t : t) s response data =
  match s.response_state with
  | Waiting ->
    let iovec, length =
      match data with
      | `String str ->
        let len = String.length str in
        let iovec =
          { Httpun_types.IOVec.buffer = `String str; off = 0; len }
        in
        iovec, len
      | `Bigstring b ->
        let len = Bigstringaf.length b in
        let iovec = { Httpun_types.IOVec.buffer = `Bigstring b; off = 0; len } in
        iovec, len
    in
    let should_send_data = length <> 0 in
    let frame_info =
      Writer.make_frame_info
        ~max_frame_size:t.max_frame_size
        ~flags:
          (if should_send_data
           then Flags.default_flags
           else Flags.(set_end_stream default_flags))
        t.id
    in
    Writer.write_response_headers t.writer s.encoder frame_info response;
    (* From RFC7540§8.1:
     *   An HTTP request/response exchange fully consumes a single stream.
     *   [...] A response starts with a HEADERS frame and ends with a frame
     *   bearing END_STREAM, which places the stream in the "closed" state. *)
    if should_send_data
    then s.response_state <- Fixed { response; iovec }
    else s.response_state <- Complete response;
    Writer.wakeup t.writer
  | Streaming _ -> failwith "h2.Reqd.respond_with_*: response already started"
  | Fixed _ -> failwith "h2.Reqd.respond_with_*: response already sent"
  | Complete _ -> failwith "h2.Reqd.respond_with_*: response already complete"

(* Register [new_trailers] to be sent after the streaming response body.
 * Raises [Failure] if the stream is closed or reserved, if the response is
 * not in [Streaming] mode, or if trailers were already scheduled. *)
let schedule_trailers (t : t) new_trailers =
  match t.state with
  | Idle | Active (Open (WaitingForPeer | PartialHeaders _), _) -> assert false
  | Closed _ -> failwith "h2.Reqd.schedule_trailers: stream already closed"
  | Reserved _ -> failwith "h2.Reqd.schedule_trailers: response not started"
  | Active ((Open (FullHeaders | ActiveMessage _) | HalfClosed _), stream) ->
    (match stream.response_state with
    | Streaming { response; response_body; trailers = old_trailers } ->
      if old_trailers <> Headers.empty
      then failwith "h2.Reqd.schedule_trailers: trailers already scheduled";
      stream.response_state <-
        Streaming { response; response_body; trailers = new_trailers }
    | _ ->
      failwith
        "h2.Reqd.schedule_trailers: can only send trailers in Streaming mode")

(* Start a fixed-body response without checking [t.error_code]. For a
 * reserved (pushed) stream, the state moves to half-closed once the writer
 * has flushed. *)
let unsafe_respond_with_data (t : t) response data =
  match t.state with
  | Idle | Active (Open (WaitingForPeer | PartialHeaders _), _) -> assert false
  | Active ((Open (FullHeaders | ActiveMessage _) | HalfClosed _), stream) ->
    send_fixed_response t stream response data
  | Reserved (request_info, stream) ->
    send_fixed_response t stream response data;
    (* From RFC7540§5.1:
     *   reserved (local): [...] In this state, only the following transitions
     *   are possible: The endpoint can send a HEADERS frame. This causes the
     *   stream to open in a "half-closed (remote)" state. *)
    Writer.flush t.writer (fun _reason ->
      (* TODO(anmonteiro): different if closed? *)
      t.state <- Active (HalfClosed request_info, stream))
  | Closed _ -> assert false

(* Respond with a string body. Raises [Failure] if an error is currently being
 * handled on the stream. *)
let respond_with_string (t : t) response str =
  match t.error_code with
  | No_error -> unsafe_respond_with_data t response (`String str)
  | _ ->
    failwith
      "h2.Reqd.respond_with_string: invalid state, currently handling error"

(* Respond with a bigstring body. Raises [Failure] if an error is currently
 * being handled on the stream. *)
let respond_with_bigstring (t : t) response bstr =
  match t.error_code with
  | No_error -> unsafe_respond_with_data t response (`Bigstring bstr)
  | _ ->
    failwith
      "h2.Reqd.respond_with_bigstring: invalid state, currently handling error"

(* Write the response headers, switch [s] to [Streaming] mode and return the
 * body writer. Unless [flush_headers_immediately] is set, the writer is
 * yielded once before being woken up. Raises [Failure] if a response was
 * already started. *)
let send_streaming_response ~flush_headers_immediately (t : t) s response =
  let wait_for_first_flush = not flush_headers_immediately in
  match s.response_state with
  | Waiting ->
    let frame_info =
      Writer.make_frame_info ~max_frame_size:t.max_frame_size t.id
    in
    let response_body_buffer = Bigstringaf.create s.body_buffer_size in
    let response_body =
      Body.Writer.create response_body_buffer ~writer:t.writer
    in
    Writer.write_response_headers t.writer s.encoder frame_info response;
    if wait_for_first_flush then Writer.yield t.writer;
    s.response_state <-
      Streaming { response; response_body; trailers = Headers.empty };
    Writer.wakeup t.writer;
    response_body
  | Streaming _ ->
    failwith "h2.Reqd.respond_with_streaming: response already started"
  | Fixed _ | Complete _ ->
    failwith "h2.Reqd.respond_with_streaming: response already complete"

(* Start a streaming response without checking [t.error_code]. For a reserved
 * (pushed) stream, the state moves to half-closed once the writer has
 * flushed. *)
let unsafe_respond_with_streaming (t : t) ~flush_headers_immediately response =
  match t.state with
  | Idle | Active (Open (WaitingForPeer | PartialHeaders _), _) -> assert false
  | Active ((Open (FullHeaders | ActiveMessage _) | HalfClosed _), stream) ->
    send_streaming_response ~flush_headers_immediately t stream response
  | Reserved (request_info, stream) ->
    let response_body =
      send_streaming_response ~flush_headers_immediately t stream response
    in
    (* From RFC7540§5.1:
     *   reserved (local): [...] In this state, only the following transitions
     *   are possible: The endpoint can send a HEADERS frame. This causes the
     *   stream to open in a "half-closed (remote)" state. *)
    Writer.flush t.writer (fun _reason ->
      (* TODO(anmonteiro): different if closed? *)
      t.state <- Active (HalfClosed request_info, stream));
    response_body
  | Closed _ -> assert false

(* Start a streaming response and return its body writer. Raises [Failure] if
 * an error is currently being handled on the stream. *)
let respond_with_streaming
    (t : t)
    ?(flush_headers_immediately = false)
    response
  =
  match t.error_code with
  | No_error ->
    unsafe_respond_with_streaming ~flush_headers_immediately t response
  | _ ->
    failwith
      "h2.Reqd.respond_with_streaming: invalid state, currently handling error"

(* Ask [s.create_push_stream] for a promised stream, write the PUSH_PROMISE
 * frame on [t] and put the promised stream in the [Reserved] state. Returns
 * the promised request descriptor, or the error from [create_push_stream]. *)
let start_push_stream (t : t) s request =
  match s.create_push_stream t.id with
  | Ok promised_reqd ->
    let frame_info =
      Writer.make_frame_info ~max_frame_size:t.max_frame_size t.id
    in
    Writer.write_push_promise
      t.writer
      s.encoder
      frame_info
      ~promised_id:promised_reqd.id
      request;
    let { encoder; body_buffer_size; create_push_stream; _ } = s in
    (* From RFC7540§8.2:
     *   Promised requests [...] MUST NOT include a request body. *)
    let request_info = create_active_request request Body.Reader.empty in
    let active_stream =
      create_active_stream encoder body_buffer_size create_push_stream
    in
    (* From RFC7540§8.2.1:
     *   Sending a PUSH_PROMISE frame creates a new stream and puts the stream
     *   into the "reserved (local)" state for the server and the "reserved
     *   (remote)" state for the client.
     *
     * Note: we do this before flushing the writer because request handlers
     * might immediately call one of the `respond_with` functions and expect
     * the stream to be in the `Reserved` state. *)
    promised_reqd.state <- Reserved (request_info, active_stream);
    Writer.wakeup t.writer;
    Ok promised_reqd
  | Error e ->
    Error
      (e :> [ `Push_disabled | `Stream_cant_push | `Stream_ids_exhausted ])

(* TODO: We could easily allow the priority of the PUSH request to be
 * configurable. We should allow users of this API to define the weight (maybe
 * not strictly), dependency on the current Reqd, and exclusivity *)

(* Push [request] on [t] without checking [t.error_code] or whether [t] is
 * itself a pushed stream. *)
let unsafe_push (t : t) request =
  match t.state with
  | Idle | Active (Open (WaitingForPeer | PartialHeaders _), _) -> assert false
  | Active ((Open (FullHeaders | ActiveMessage _) | HalfClosed _), stream) ->
    start_push_stream t stream request
  (* Already checked in `push` *)
  | Reserved _ | Closed _ -> assert false

(* Promise a pushed response for [request] on [t]. Returns
 * [Error `Stream_cant_push] when [t] is itself a pushed stream, and raises
 * [Failure] if an error is currently being handled on the stream. *)
let push (t : t) request =
  match t.error_code with
  | No_error ->
    if Stream_identifier.is_pushed t.id
    then
      (* From RFC7540§6.6:
       *   PUSH_PROMISE frames MUST only be sent on a peer-initiated stream that
       *   is in either the "open" or "half-closed (remote)" state. *)
      Error `Stream_cant_push
    else unsafe_push t request
  | _ -> failwith "h2.Reqd.push: invalid state, currently handling error"

(* Handle [error] according to how far the response has progressed: invoke the
 * error handler when nothing was sent yet, otherwise reset the stream with
 * [error_code]. Does nothing if the response is complete or a non-exception
 * error is already being handled. *)
let _report_error ?request (t : t) s (error : error) error_code =
  match s.response_state, t.error_code with
  | Waiting, No_error ->
    t.error_code <- Stream.error_to_code error error_code;
    let status =
      match (error :> [ error | Status.standard ]) with
      | `Exn _ -> `Internal_server_error
      | #Status.standard as status -> status
    in
    t.error_handler ?request error (fun headers ->
      let response = Response.create ~headers status in
      unsafe_respond_with_streaming ~flush_headers_immediately:true t response)
  | Streaming { response_body; _ }, No_error ->
    Body.Writer.close response_body;
    t.error_code <- Stream.error_to_code error error_code;
    Stream.reset_stream t error_code
  | Fixed _, No_error ->
    (* Still need to send an RST_STREAM frame. Set t.error_code with
     * `error_code` and `flush_response_body` below will reset the stream after
     * flushing any remaining body bytes. *)
    t.error_code <- Stream.error_to_code error error_code;
    Stream.reset_stream t error_code
  | (Waiting | Fixed _ | Streaming _), Exn _ ->
    (* XXX(seliopou): Decide what to do in this unlikely case. There is an
     * outstanding call to the [error_handler], but an intervening exception
     * has been reported as well. *)
    failwith "h2.Reqd.report_exn: NYI"
  | (Waiting | Streaming _ | Fixed _ | Complete _), _ -> ()

(* Report an error on [t]. When a request body exists it is closed first and
 * the request is passed along to the error handler. A closed stream is left
 * untouched. *)
let report_error (t : t) exn error_code =
  match t.state with
  | Idle | Reserved _ | Active (Open (WaitingForPeer | PartialHeaders _), _) ->
    assert false
  | Active (Open FullHeaders, stream) -> _report_error t stream exn error_code
  | Active
      ( ( Open (ActiveMessage { request; request_body; _ })
        | HalfClosed { request; request_body; _ } )
      , stream ) ->
    Body.Reader.close request_body;
    _report_error t stream ~request exn error_code
  | Closed _ -> ()

(* Report an exception on [t] as an internal error. *)
let report_exn t exn = report_error t (`Exn exn) Error_code.InternalError

(* Run [f ()]. Any exception it raises is reported on [t] and returned as
 * [Error exn]; otherwise the result is [Ok ()]. *)
let try_with t f : (unit, exn) result =
  try
    f ();
    Ok ()
  with
  | exn ->
    report_exn t exn;
    Error exn

let error_code = Stream.error_code

(* Private API, not exposed to the user through h2.mli *)

(* Whether the stream still has (or may still produce) response output. *)
let requires_output (t : t) =
  match t.state with
  | Idle -> false
  | Reserved _ -> true
  | Active (Open (WaitingForPeer | PartialHeaders _), _) -> false
  | Active
      ( (Open (FullHeaders | ActiveMessage _) | HalfClosed _)
      , { response_state; _ } ) ->
    (* From RFC7540§8.1:
     *   A server can send a complete response prior to the client sending an
     *   entire request if the response does not depend on any portion of the
     *   request that has not been sent and received. *)
    (match response_state with
    | Complete _ -> false
    | Fixed { iovec = { len; _ }; _ } -> len > 0
    | Streaming _ -> true
    | Waiting -> true)
  | Closed _ -> false

(* Run the scheduled read on the request body if it has pending output;
 * exceptions raised by the read are reported on [t]. *)
let flush_request_body (t : t) =
  match t.state with
  | Active
      ( ( Open (ActiveMessage { request_body; _ })
        | HalfClosed { request_body; _ } )
      , _ ) ->
    if Body.Reader.has_pending_output request_body
    then (
      try Body.Reader.execute_read request_body with
      | exn -> report_exn t exn)
  | _ -> ()

(* Hand a slice of a fixed response body to the writer: strings go through
 * [Writer.write_data], bigstrings through [Writer.schedule_data]. *)
let write_buffer_data writer ~off ~len frame_info buffer =
  match buffer with
  | `String str -> Writer.write_data writer ~off ~len frame_info str
  | `Bigstring bstr -> Writer.schedule_data writer ~off ~len frame_info bstr

(* Close the stream after the response has been fully written, or reset it
 * with the pending error code if an error is being handled. *)
let close_stream (t : t) =
  match t.error_code with
  | No_error ->
    (match t.state with
    | Active (Open (FullHeaders | ActiveMessage _), _) ->
      (* From RFC7540§8.1:
       *   A server can send a complete response prior to the client sending an
       *   entire request if the response does not depend on any portion of the
       *   request that has not been sent and received. When this is true, a
       *   server MAY request that the client abort transmission of a request
       *   without error by sending a RST_STREAM with an error code of NO_ERROR
       *   after sending a complete response (i.e., a frame with the END_STREAM
       *   flag). *)
      Stream.reset_stream t Error_code.NoError
    | Active (HalfClosed _, _) ->
      Writer.flush t.writer (fun _reason -> Stream.finish_stream t Finished)
    | _ -> assert false)
  | Exn _ -> Stream.reset_stream t InternalError
  | Other { code; _ } -> Stream.reset_stream t code

(* Write up to [max_bytes] of pending response body to the writer and return
 * the number of bytes written. When the body is finished, the final frame
 * carries END_STREAM and the stream is closed via [close_stream]. *)
let flush_response_body (t : t) ~max_bytes =
  match t.state with
  | Active ((Open _ | HalfClosed _), stream) ->
    (match stream.response_state with
    | Streaming { response; response_body; trailers } ->
      if Body.Writer.has_pending_output response_body && max_bytes > 0
      then
        Body.Writer.transfer_to_writer
          response_body
          t.writer
          ~max_frame_size:t.max_frame_size
          ~max_bytes
          t.id
      else if Body.Writer.is_closed response_body
      then (
        (* no pending output and closed, we can finalize the message and close
           the stream *)
        let frame_info =
          Writer.make_frame_info
            ~max_frame_size:t.max_frame_size
            ~flags:Flags.(set_end_stream default_flags)
            t.id
        in
        match trailers with
        | _ :: _ ->
          Writer.write_response_trailers
            t.writer
            stream.encoder
            frame_info
            trailers;
          close_stream t;
          stream.response_state <- Complete response;
          0
        | [] ->
          (* From RFC7540§6.9.1:
           *   Frames with zero length with the END_STREAM flag set (that is,
           *   an empty DATA frame) MAY be sent if there is no available space
           *   in either flow-control window. *)
          Writer.schedule_data t.writer frame_info ~len:0 Bigstringaf.empty;
          close_stream t;
          stream.response_state <- Complete response;
          0)
      else
        (* no pending output but Body is still open *)
        0
    | Fixed ({ iovec = { buffer; off; len } as iovec; _ } as r)
      when max_bytes > 0 ->
      (* A partial flush writes only [max_bytes] and leaves END_STREAM unset;
       * the remainder stays in [r.iovec] for a later call. *)
      let is_partial_flush = max_bytes < len in
      let frame_info =
        let flags =
          if is_partial_flush
          then Flags.default_flags
          else Flags.(set_end_stream default_flags)
        in
        Writer.make_frame_info ~max_frame_size:t.max_frame_size ~flags t.id
      in
      let len_to_write = if is_partial_flush then max_bytes else len in
      write_buffer_data t.writer ~off ~len:len_to_write frame_info buffer;
      r.iovec <- Httpun_types.IOVec.shift iovec len_to_write;
      if not is_partial_flush then close_stream t;
      len_to_write
    | Fixed _ | Waiting | Complete _ -> 0)
  | _ -> 0

(* Store the trailer headers received from the peer on the stream. Asserts if
 * the stream is not currently receiving (or done receiving) a message. *)
let deliver_trailer_headers (t : t) headers =
  match t.state with
  | Active (Open (PartialHeaders _ | FullHeaders), _) -> assert false
  | Active ((Open (ActiveMessage _) | HalfClosed _), stream) ->
    (* TODO: call the schedule_trailers callback *)
    stream.trailers <- Some headers
  | _ -> assert false