(*----------------------------------------------------------------------------
* Copyright (c) 2017 Inhabited Type LLC.
* Copyright (c) 2019 Antonio N. Monteiro.
*
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* 3. Neither the name of the author nor the names of his contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE CONTRIBUTORS ``AS IS'' AND ANY EXPRESS
* OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE FOR
* ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
* STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*---------------------------------------------------------------------------*)

module AB = Angstrom.Buffered
module Reader = Parse.Reader
module Writer = Serialize.Writer

(* Scheduler instantiated over [Stream], with [Respd.t] as the stream
 * descriptor and the request body as the body that gets flushed. *)
module Scheduler = Scheduler.Make (struct
  include Stream

  type t = Respd.t

  let flush_write_body = Respd.flush_request_body

  let requires_output = Respd.requires_output
end)

module Queue = struct
  include Queue

  (* [take_opt t] is [None] when [t] is empty, [Some (take t)] otherwise. *)
  let take_opt t = match is_empty t with true -> None | false -> Some (take t)
end

type error = Respd.error

type trailers_handler = Headers.t -> unit

type response_handler = Response.t -> [ `read ] Body.t -> unit

type error_handler = error -> unit

(* Connection-wide state. *)
type t =
  { mutable settings : Settings.t
  ; reader : Reader.frame
  ; writer : Writer.t
  ; config : Config.t
  ; streams : Scheduler.t
  ; mutable current_stream_id : Stream_identifier.t
  ; mutable max_pushed_stream_id : Stream_identifier.t
  ; mutable current_server_streams : int
  ; mutable receiving_headers_for_stream : Stream_identifier.t option
  ; mutable did_send_go_away : bool
  ; mutable unacked_settings : int
  ; pending_pings : (unit -> unit) Queue.t
  ; error_handler : error -> unit
  ; push_handler : Request.t -> (response_handler, unit) result
        (* From RFC7540§4.3:
         * Header compression is stateful. One compression context and one
         * decompression context are used for the entire connection. *)
  ; hpack_encoder : Hpack.Encoder.t
  ; hpack_decoder : Hpack.Decoder.t
  }

(* Push handler that accepts every promised request and ignores the
 * response and its body. *)
let default_push_handler = Sys.opaque_identity (fun _ -> Ok (fun _ _ -> ()))

(* The connection is closed once both the reader and the writer are. *)
let is_closed t = Reader.is_closed t.reader && Writer.is_closed t.writer

let shutdown_reader t = Reader.force_close t.reader

(* Flushes the scheduler's streams, passing the highest client stream id and
 * the highest pushed stream id seen so far. *)
let flush_request_body t =
  Scheduler.flush t.streams (t.current_stream_id, t.max_pushed_stream_id)

(* Flushes pending request bodies before closing the writer. *)
let shutdown_writer t =
  flush_request_body t;
  Writer.close t.writer

let shutdown_rw t =
  shutdown_reader t;
  shutdown_writer t

(* Handling frames against closed streams is hard. See:
* https://docs.google.com/presentation/d/1iG_U2bKTc9CnKr0jPTrNfmxyLufx_cK2nNh9VjrKH6s
*)
let was_closed_or_implicitly_closed t stream_id =
  (* Request stream ids are compared against the highest id this client has
   * used; all other ids against the highest pushed stream id. *)
  if Stream_identifier.is_request stream_id
  then Stream_identifier.(stream_id <= t.current_stream_id)
  else Stream_identifier.(stream_id <= t.max_pushed_stream_id)

(* Reports a connection error (GOAWAY, then shut down after the flush) or a
 * stream error (delegated to the stream descriptor if it is still known,
 * otherwise an RST_STREAM unless the stream was already closed). *)
let report_error t = function
  | Error.ConnectionError (error, data) ->
    if not t.did_send_go_away
    then (
      (* From RFC7540§5.4.1:
       * An endpoint that encounters a connection error SHOULD first send a
       * GOAWAY frame (Section 6.8) with the stream identifier of the last
       * stream that it successfully received from its peer. The GOAWAY frame
       * includes an error code that indicates why the connection is
       * terminating. After sending the GOAWAY frame for an error condition,
       * the endpoint MUST close the TCP connection. *)
      let debug_data =
        if String.length data = 0
        then Bigstringaf.empty
        else Bigstringaf.of_string ~off:0 ~len:(String.length data) data
      in
      let frame_info = Writer.make_frame_info Stream_identifier.connection in
      (* TODO: Only write if not already shutdown. *)
      Writer.write_go_away
        t.writer
        frame_info
        ~debug_data
        ~last_stream_id:
          (if Stream_identifier.(t.current_stream_id === -1l)
          then Stream_identifier.connection
          else t.current_stream_id)
        error;
      (* A NO_ERROR GOAWAY is not surfaced to the error handler. *)
      if error <> Error_code.NoError
      then t.error_handler (`Protocol_error (error, data));
      Writer.flush t.writer (fun () ->
          (* XXX: We need to allow lower numbered streams to complete before
           * shutting down. *)
          shutdown_rw t);
      t.did_send_go_away <- true;
      Writer.wakeup t.writer)
  | StreamError (stream_id, error) ->
    (match Scheduler.find t.streams stream_id with
    | Some respd ->
      Respd.report_error respd (`Protocol_error (error, "")) error
    | None ->
      if not (was_closed_or_implicitly_closed t stream_id)
      then
        (* Possible if the stream was going to enter the Idle state (first time
         * we saw e.g. a PRIORITY frame for it) but had e.g. a
         * FRAME_SIZE_ERROR. *)
        let frame_info = Writer.make_frame_info stream_id in
        Writer.write_rst_stream t.writer frame_info error);
    Writer.wakeup t.writer

let report_connection_error t ?(additional_debug_data = "") error =
  report_error t (ConnectionError (error, additional_debug_data))

let report_stream_error t stream_id error =
  report_error t (StreamError (stream_id, error))

let shutdown t =
  (* From RFC7540§6.8:
* A server that is attempting to gracefully shut down a connection SHOULD
* send an initial GOAWAY frame with the last stream identifier set to
* 2^31-1 and a NO_ERROR code. *)
  report_connection_error t Error_code.NoError

(* Reports [error] on the stream descriptor and wakes up the writer. *)
let set_error_and_handle t stream error error_code =
  Respd.report_error stream error error_code;
  Writer.wakeup t.writer

(* Reports an exception as an INTERNAL_ERROR connection error, carrying the
 * printed exception as debug data. Does nothing once the connection is
 * closed. *)
let report_exn t exn =
  if not (is_closed t)
  then
    let additional_debug_data = Printexc.to_string exn in
    report_connection_error t ~additional_debug_data Error_code.InternalError

(* Credits [n] bytes of inflow to [stream] and writes the matching
 * WINDOW_UPDATE frame(s). Increments larger than the maximum window size are
 * split across several frames. Nothing is sent when [n <= 0]. *)
let send_window_update
    : type a. t -> a Scheduler.PriorityTreeNode.node -> int -> unit
  =
 fun t stream n ->
  let send_window_update_frame stream_id n =
    let valid_inflow = Scheduler.add_inflow stream n in
    assert valid_inflow;
    let frame_info = Writer.make_frame_info stream_id in
    Writer.write_window_update t.writer frame_info n
  in
  if n > 0
  then (
    let max_window_size = Settings.WindowSize.max_window_size in
    let stream_id = Scheduler.stream_id stream in
    let rec loop n =
      if n > max_window_size
      then (
        send_window_update_frame stream_id max_window_size;
        loop (n - max_window_size))
      else send_window_update_frame stream_id n
    in
    loop n;
    Writer.wakeup t.writer)

(* Validates the request headers carried by a PUSH_PROMISE and, when they are
 * acceptable, asks the push handler whether to accept the promised stream. *)
let handle_push_promise_headers t respd headers =
  (* From RFC7540§8.2.2:
* The header fields in PUSH_PROMISE and any subsequent CONTINUATION frames
* MUST be a valid and complete set of request header fields (Section
* 8.1.2.3). *)
  match Headers.method_path_and_scheme_or_malformed headers with
  | `Malformed ->
    (* From RFC7540§8.2.2:
     * If a client receives a PUSH_PROMISE that does not include a complete
     * and valid set of header fields [...] it MUST respond with a stream
     * error (Section 5.4.2) of type PROTOCOL_ERROR. *)
    report_stream_error t respd.Stream.id Error_code.ProtocolError
  | `Valid (meth, path, scheme) ->
    let meth = Httpaf.Method.of_string meth in
    (match
       meth, Headers.get_pseudo headers "authority", Message.body_length headers
     with
    | (#Httpaf.Method.standard as meth), _, _
      when not Httpaf.Method.(is_cacheable meth && is_safe meth) ->
      report_stream_error t respd.id Error_code.ProtocolError
    | _, _, `Fixed len when not (Int64.equal len 0L) ->
      (* From RFC7540§8.2:
       * Clients that receive a promised request that is not cacheable,
       * that is not known to be safe or that indicates the presence of a
       * request body MUST reset the promised stream with a stream error
       * (Section 5.4.2) of type PROTOCOL_ERROR.
       *
       * From RFC7231§4.2.3 (Cacheable Methods):
       * [...] this specification defines GET, HEAD, and POST as cacheable
       * [...].
       *
       * From RFC7231§4.2.1 (Safe Methods):
       * Of the request methods defined by this specification, the GET, HEAD,
       * OPTIONS, and TRACE methods are defined to be safe.
       *
       * Note: the intersection of safe and cacheable are the GET and HEAD
       * methods. *)
      report_stream_error t respd.id Error_code.ProtocolError
    (* From RFC7540§8.2:
     * The server MUST include a value in the :authority pseudo-header field
     * for which the server is authoritative (see Section 10.1). A client
     * MUST treat a PUSH_PROMISE for which the server is not authoritative as
     * a stream error (Section 5.4.2) of type PROTOCOL_ERROR. *)
    | _, None, _ | _, _, `Error _ ->
      report_stream_error t respd.id Error_code.ProtocolError
    | _ ->
      let request = Request.create ~scheme ~headers meth path in
      (match t.push_handler request with
      | Ok response_handler ->
        (* From RFC7540§8.2:
         * Promised requests [...] MUST NOT include a request body. *)
        let request_body = Body.empty in
        (* From RFC7540§5.1:
         * reserved (remote): [...] Receiving a HEADERS frame causes the
         * stream to transition to "half-closed (local)". *)
        respd.state <-
          Active
            ( HalfClosed Stream.WaitingForPeer
            , { Respd.request
              ; request_body
              ; response_handler
              ; trailers_handler = ignore
              } )
      | Error _ ->
        (* From RFC7540§6.6:
         * Recipients of PUSH_PROMISE frames can choose to reject promised
         * streams by returning a RST_STREAM referencing the promised stream
         * identifier back to the sender of the PUSH_PROMISE. *)
        Stream.reset_stream respd Error_code.Cancel))

(* Builds the response from a complete header list, installs the response
 * body on the stream and invokes the request's response handler. Malformed
 * responses are reported on the stream without calling the handler. *)
let handle_response_headers t stream ~end_stream active_request headers =
  let (Scheduler.Stream { descriptor = respd; _ }) = stream in
  (* From RFC7540§8.1.2.6:
* Clients MUST NOT accept a malformed response.
*
* Note: in the branches where a malformed response is detected, the response
* handler is not called. *)
  match Headers.get_multi_pseudo headers "status" with
  | [ status ] ->
    let response = Response.create ~headers (Status.of_string status) in
    (* Note: we don't need to check for `end_stream` flag + a non-zero body
     * length, as the spec allows for non-zero content-length headers and no
     * DATA frames.
     *
     * From RFC7540§8.1.2.6:
     * A response that is defined to have no payload, as described in
     * [RFC7230], Section 3.3.2, can have a non-zero content-length header
     * field, even though no content is included in DATA frames. *)
    (match Message.body_length headers with
    | `Error _ ->
      set_error_and_handle
        t
        respd
        (`Invalid_response_body_length response)
        ProtocolError
    | `Fixed _ | `Unknown ->
      let response_body =
        if end_stream
        then Body.empty
        else
          Body.create_reader
            (Bigstringaf.create t.config.response_body_buffer_size)
            ~done_reading:(fun len ->
              (* Return the consumed bytes to both the connection-level and
               * the stream-level flow-control windows. *)
              send_window_update t t.streams len;
              send_window_update t stream len)
      in
      let new_response_state =
        Respd.create_active_response response response_body
      in
      respd.state <-
        Active
          ( (if Stream.is_open respd
            then Open new_response_state
            else HalfClosed new_response_state)
          , active_request );
      active_request.response_handler response response_body;
      if end_stream
      then (
        (* Deliver EOF to the response body, as the handler might be waiting
         * on it to act. *)
        Body.close_reader response_body;
        (* From RFC7540§5.1:
         * [...] an endpoint receiving an END_STREAM flag causes the stream
         * state to become "half-closed (remote)". *)
        Respd.close_stream respd))
  | _ ->
    (* From RFC7540§8.1.2.4:
     * For HTTP/2 responses, a single :status pseudo-header field is defined
     * that carries the HTTP status code field (see [RFC7231], Section 6).
     * This pseudo-header field MUST be included in all responses; otherwise,
     * the response is malformed (Section 8.1.2.6). *)
    let message =
      "HTTP/2 responses must include a single `:status` pseudo-header"
    in
    set_error_and_handle t respd (`Malformed_response message) ProtocolError

(* Entry point for a fully decoded (non-trailer) header list: enforces the
 * concurrent stream limit, then dispatches on the stream state. *)
let handle_headers t ~end_stream stream headers =
  let (Scheduler.Stream { descriptor = respd; _ }) = stream in
  (* From RFC7540§5.1.2:
* Endpoints MUST NOT exceed the limit set by their peer. An endpoint that
* receives a HEADERS frame that causes its advertised concurrent stream
* limit to be exceeded MUST treat this as a stream error (Section 5.4.2)
* of type PROTOCOL_ERROR or REFUSED_STREAM. *)
  if t.current_server_streams + 1 > t.config.max_concurrent_streams
  then
    if t.unacked_settings > 0
    then
      (* From RFC7540§8.1.4:
       * The REFUSED_STREAM error code can be included in a RST_STREAM frame
       * to indicate that the stream is being closed prior to any processing
       * having occurred. Any request that was sent on the reset stream can
       * be safely retried.
       *
       * Note: if there are pending SETTINGS to acknowledge, assume there was a
       * race condition and let the client retry. *)
      report_stream_error t respd.Stream.id Error_code.RefusedStream
    else report_stream_error t respd.Stream.id Error_code.ProtocolError
  else (
    (* From RFC7540§5.1.2:
     * Streams that are in the "open" state or in either of the "half-closed"
     * states count toward the maximum number of streams that an endpoint is
     * permitted to open. *)
    t.current_server_streams <- t.current_server_streams + 1;
    match respd.state with
    | Reserved _ ->
      respd.state <- Reserved Stream.FullHeaders;
      handle_push_promise_headers t respd headers
    | Active (active_state, active_request) ->
      (match active_state with
      | Open _ -> respd.state <- Active (Open FullHeaders, active_request)
      | HalfClosed _ ->
        respd.state <- Active (HalfClosed FullHeaders, active_request));
      handle_response_headers t stream ~end_stream active_request headers
    | _ ->
      (* Unreachable. This function is only invoked if the stream is active. *)
      assert false)

(* Feeds one header block fragment to the stream's header parser. When the
 * END_HEADERS flag is set the parse is finalised and the decoded headers are
 * handled as a response/push promise or, with [~is_trailers:true], as
 * trailers. Otherwise the intermediate parse state is stored back. *)
let handle_headers_block
    t ?(is_trailers = false) stream partial_headers flags headers_block
  =
  let open AB in
  let (Scheduler.Stream { descriptor = respd; _ }) = stream in
  let end_headers = Flags.test_end_header flags in
  (* From RFC7540§6.10:
* An endpoint receiving HEADERS, PUSH_PROMISE, or CONTINUATION
* frames needs to reassemble header blocks and perform decompression
* even if the frames are to be discarded *)
  let parse_state' =
    AB.feed partial_headers.Stream.parse_state (`Bigstring headers_block)
  in
  if end_headers
  then (
    t.receiving_headers_for_stream <- None;
    let parse_state' = AB.feed parse_state' `Eof in
    match parse_state' with
    | Done (_, Ok headers) ->
      if not is_trailers
      then
        (* `handle_headers` will take care of transitioning the stream state *)
        let end_stream = partial_headers.end_stream in
        handle_headers t ~end_stream stream headers
      else if Headers.trailers_valid headers
      then (
        Respd.deliver_trailer_headers respd headers;
        let response_body = Respd.response_body_exn respd in
        Body.close_reader response_body)
      else
        (* From RFC7540§8.1.2.1:
         * Pseudo-header fields MUST NOT appear in trailers. Endpoints MUST
         * treat a request or response that contains undefined or invalid
         * pseudo-header fields as malformed (Section 8.1.2.6). *)
        let message = "Pseudo-header fields must not appear in trailers" in
        set_error_and_handle t respd (`Malformed_response message) ProtocolError
    (* From RFC7540§4.3:
     * A decoding error in a header block MUST be treated as a connection
     * error (Section 5.4.1) of type COMPRESSION_ERROR. *)
    | Done (_, Error _) | Partial _ ->
      report_connection_error t Error_code.CompressionError
    | Fail (_, _, message) ->
      report_connection_error
        t
        ~additional_debug_data:message
        Error_code.CompressionError)
  else partial_headers.parse_state <- parse_state'

let handle_trailer_headers = handle_headers_block ~is_trailers:true

(* Creates the incremental header parser state for a HEADERS frame, sizing
 * the parser's initial buffer from the first header block fragment. *)
let create_partial_headers t flags headers_block =
  let end_headers = Flags.test_end_header flags in
  let headers_block_length = Bigstringaf.length headers_block in
  let initial_buffer_size =
    if end_headers
    then headers_block_length
    else
      (* Conservative estimate that there's only going to be one CONTINUATION
       * frame. *)
      2 * headers_block_length
  in
  { Stream.parse_state =
      AB.parse
        ~initial_buffer_size
        (Hpack.Decoder.decode_headers t.hpack_decoder)
  ; end_stream = Flags.test_end_stream flags
  }

(* Handles the first HEADERS frame of a response: moves the stream to the
 * partial-headers state, records that a CONTINUATION is expected when
 * END_HEADERS is not set, then processes the header block. *)
let handle_first_response_bytes
    t stream active_request frame_header headers_block
  =
  let (Scheduler.Stream { descriptor; _ }) = stream in
  let { Frame.flags; stream_id; _ } = frame_header in
  let partial_headers = create_partial_headers t flags headers_block in
  let remote_state = Stream.PartialHeaders partial_headers in
  descriptor.Stream.state <-
    (if Stream.is_open descriptor
    then Active (Open remote_state, active_request)
    else Active (HalfClosed remote_state, active_request));
  if not (Flags.test_end_header flags)
  then t.receiving_headers_for_stream <- Some stream_id;
  handle_headers_block t stream partial_headers flags headers_block

(* Handles a HEADERS frame received after the response headers, i.e.
 * trailers. Such a frame must carry the END_STREAM flag. *)
let process_trailer_headers t stream active_response frame_header headers_block
  =
  let (Scheduler.Stream { descriptor = respd; _ }) = stream in
  let { Frame.stream_id; flags; _ } = frame_header in
  let end_stream = Flags.test_end_stream flags in
  if not end_stream
  then
    (* From RFC7540§8.1:
* A HEADERS frame (and associated CONTINUATION frames) can only appear
* at the start or end of a stream. An endpoint that receives a HEADERS
* frame without the END_STREAM flag set after receiving a final
* (non-informational) status code MUST treat the corresponding request
* or response as malformed (Section 8.1.2.6). *)
    let message =
      "HEADERS frames containing trailers must set the END_STREAM flag"
    in
    set_error_and_handle t respd (`Malformed_response message) ProtocolError
  else
    let partial_headers =
      { Stream.parse_state =
          AB.parse (Hpack.Decoder.decode_headers t.hpack_decoder)
          (* obviously true at this point. *)
      ; end_stream
      }
    in
    active_response.Respd.trailers_parser <- Some partial_headers;
    if not Flags.(test_end_header flags)
    then t.receiving_headers_for_stream <- Some stream_id;
    (* trailer headers: RFC7230§4.4 *)
    handle_trailer_headers t stream partial_headers flags headers_block

(* Dispatches an incoming HEADERS frame according to the state of the stream
 * it refers to. *)
let process_headers_frame t { Frame.frame_header; _ } headers_block =
  let { Frame.stream_id; _ } = frame_header in
  match Scheduler.get_node t.streams stream_id with
  | None ->
    (* If we're receiving a response for a stream that's no longer in the
* priority tree, assume this is a network race - we canceled a request
     * but a response was already in flight.
     *
     * However, if the stream identifier is greater than the largest stream
     * identifier we have produced, they should know better. In this case,
     * send an RST_STREAM. *)
    if Stream_identifier.(
         stream_id >= t.current_stream_id && is_request stream_id)
    then report_stream_error t stream_id Error_code.StreamClosed
  | Some (Scheduler.Stream { descriptor; _ } as stream) ->
    (match descriptor.state with
    | Idle ->
      (* From RFC7540§6.2:
       * HEADERS frames can be sent on a stream in the "idle", "reserved
       * (local)", "open", or "half-closed (remote)" state. *)
      report_connection_error t Error_code.ProtocolError
    | Active ((Open WaitingForPeer | HalfClosed WaitingForPeer), active_request)
      ->
      handle_first_response_bytes
        t
        stream
        active_request
        frame_header
        headers_block
    | Active
        ( ( Open (FullHeaders | PartialHeaders _)
          | HalfClosed (FullHeaders | PartialHeaders _) )
        , _ ) ->
      assert false
    (* if we're getting a HEADERS frame at this point, they must be
     * trailers, and the END_STREAM flag needs to be set. *)
    | Active
        ( ( Open (ActiveMessage active_response)
          | HalfClosed (ActiveMessage active_response) )
        , _ ) ->
      process_trailer_headers
        t
        stream
        active_response
        frame_header
        headers_block
    | Closed { reason = ResetByThem _; _ } ->
      (* From RFC7540§5.1:
       * closed: [...] An endpoint that receives any frame other than
       * PRIORITY after receiving a RST_STREAM MUST treat that as a
       * stream error (Section 5.4.2) of type STREAM_CLOSED. *)
      report_stream_error t stream_id Error_code.StreamClosed
    (* From RFC7540§5.1:
     * reserved (local): [...] Receiving any type of frame other than
     * RST_STREAM, PRIORITY, or WINDOW_UPDATE on a stream in this state
     * MUST be treated as a connection error (Section 5.4.1) of type
     * PROTOCOL_ERROR. *)
    | Reserved _ | Closed _ ->
      (* From RFC7540§5.1:
       * Similarly, an endpoint that receives any frames after receiving
       * a frame with the END_STREAM flag set MUST treat that as a
       * connection error (Section 5.4.1) of type STREAM_CLOSED [...]. *)
      report_connection_error t Error_code.StreamClosed)

(* Handles an incoming DATA frame: accounts for flow control, validates the
 * declared body length and hands the payload to the response body. *)
let process_data_frame t { Frame.frame_header; _ } bstr =
  let open Scheduler in
  let { Frame.flags; stream_id; payload_length; _ } = frame_header in
  match Scheduler.get_node t.streams stream_id with
  | Some (Stream { descriptor; _ } as stream) ->
    (match descriptor.state with
    | Active
        ( ( Open (ActiveMessage response_info)
          | HalfClosed (ActiveMessage response_info) )
        , _ ) ->
      let { Respd.response; response_body; response_body_bytes; _ } =
        response_info
      in
      response_info.response_body_bytes <-
        Int64.(add response_body_bytes (of_int (Bigstringaf.length bstr)));
      (* First, calculate whether we're allowed to receive this frame based
* on the _current_ inflow. *)
      let allowed_to_receive =
        Scheduler.(allowed_to_receive t.streams stream payload_length)
      in
      (* Then, deduct inflow from the connection flow-control window, as
       * mandated by the protocol.
       *
       * From RFC7540§6.9:
       * A receiver that receives a flow-controlled frame MUST always account
       * for its contribution against the connection flow-control window,
       * unless the receiver treats this as a connection error (Section 5.4.1).
       * This is necessary even if the frame is in error. *)
      Scheduler.deduct_inflow t.streams payload_length;
      if not allowed_to_receive
      then (
        (* From RFC7540§6.9:
         * A receiver MAY respond with a stream error (Section 5.4.2) or
         * connection error (Section 5.4.1) of type FLOW_CONTROL_ERROR if it
         * is unable to accept a frame. *)
        send_window_update t t.streams payload_length;
        report_stream_error t stream_id Error_code.FlowControlError)
      else (
        Scheduler.deduct_inflow stream payload_length;
        match Message.body_length response.headers with
        | `Fixed len
        (* Getting more than the server declared *)
          when Int64.compare response_info.response_body_bytes len > 0 ->
          (* Give back connection-level flow-controlled bytes (we use payload
           * length to include any padding bytes that the frame might have
           * included - which were ignored at parse time). *)
          send_window_update t t.streams payload_length;
          (* From RFC7540§8.1.2.6:
           * A request or response is also malformed if the value of a
           * content-length header field does not equal the sum of the
           * DATA frame payload lengths that form the body. *)
          set_error_and_handle
            t
            descriptor
            (`Invalid_response_body_length response)
            ProtocolError
        | _ ->
          let end_stream = Flags.test_end_stream flags in
          (* From RFC7540§6.9.1:
           * The receiver of a frame sends a WINDOW_UPDATE frame as it
           * consumes data and frees up space in flow-control windows.
           * Separate WINDOW_UPDATE frames are sent for the stream- and
           * connection-level flow-control windows.
           *
           * Note: we send these WINDOW_UPDATE frames once the body bytes
           * have been surfaced to the application. This is done in the
           * record field `done_reading` of `Body.t`. *)
          let faraday = Body.unsafe_faraday response_body in
          if not (Faraday.is_closed faraday)
          then (
            Faraday.schedule_bigstring faraday bstr;
            if end_stream then Body.close_reader response_body);
          Respd.flush_response_body descriptor;
          if end_stream && not (Respd.requires_output descriptor)
          then
            (* From RFC7540§6.1:
             * When set, bit 0 indicates that this frame is the last that
             * the endpoint will send for the identified stream. Setting
             * this flag causes the stream to enter one of the
             * "half-closed" states or the "closed" state (Section 5.1).
             *
             * Transition to the "closed" state if this is the last DATA frame
             * that the server will send and we're done sending. *)
            Stream.finish_stream descriptor Finished)
    | Idle ->
      (* From RFC7540§5.1:
       * idle: [...] Receiving any frame other than HEADERS or PRIORITY on
       * a stream in this state MUST be treated as a connection error
       * (Section 5.4.1) of type PROTOCOL_ERROR. *)
      report_connection_error t Error_code.ProtocolError
    (* This is technically in the half-closed (local) state *)
    | Closed { reason = ResetByUs NoError; _ } ->
      (* From RFC7540§6.9:
       * A receiver that receives a flow-controlled frame MUST always
       * account for its contribution against the connection flow-control
       * window, unless the receiver treats this as a connection error
       * (Section 5.4.1). This is necessary even if the frame is in
       * error. *)
      send_window_update t t.streams payload_length
    (* From RFC7540§6.4:
     * [...] after sending the RST_STREAM, the sending endpoint MUST be
     * prepared to receive and process additional frames sent on the
     * stream that might have been sent by the peer prior to the arrival
     * of the RST_STREAM.
     *
     * Note: after some writer yields / wake ups, we will have stopped
     * keeping state information for the stream. This functions effectively
     * as a way of only accepting frames after an RST_STREAM from us up to
     * a time limit. *)
    | _ ->
      send_window_update t t.streams payload_length;
      (* From RFC7540§6.1:
       * If a DATA frame is received whose stream is not in "open" or
       * "half-closed (local)" state, the recipient MUST respond with a
       * stream error (Section 5.4.2) of type STREAM_CLOSED. *)
      report_stream_error t stream_id Error_code.StreamClosed)
  | None ->
    if not (was_closed_or_implicitly_closed t stream_id)
    then
      (* From RFC7540§5.1:
       * idle: [...] Receiving any frame other than HEADERS or PRIORITY on
       * a stream in this state MUST be treated as a connection error
       * (Section 5.4.1) of type PROTOCOL_ERROR. *)
      report_connection_error t Error_code.ProtocolError

(* Called when stream [id] closes: decrements the open-stream count when the
 * stream was [active], and always marks the stream for removal from the
 * scheduler. *)
let on_close_stream t id ~active closed =
  if active
  then
    (* From RFC7540§5.1.2:
* Streams that are in the "open" state or in either of the "half-closed"
* states count toward the maximum number of streams that an endpoint is
* permitted to open. *)
    t.current_server_streams <- t.current_server_streams - 1;
  Scheduler.mark_for_removal t.streams id closed

(* Handles a PRIORITY frame: a self-dependency is a stream error; otherwise a
 * known stream is reprioritized and an unknown one is ignored. *)
let process_priority_frame t { Frame.frame_header; _ } priority =
  let { Frame.stream_id; _ } = frame_header in
  let { Priority.stream_dependency; _ } = priority in
  if Stream_identifier.(stream_id === stream_dependency)
  then
    (* From RFC7540§5.3.1:
* A stream cannot depend on itself. An endpoint MUST treat this as a
* stream error (Section 5.4.2) of type PROTOCOL_ERROR. *)
    report_stream_error t stream_id Error_code.ProtocolError
  else
    match Scheduler.get_node t.streams stream_id with
    | Some stream -> Scheduler.reprioritize_stream t.streams ~priority stream
    | None ->
      (* From RFC7540§5.3:
       * A client can assign a priority for a new stream by including
       * prioritization information in the HEADERS frame (Section 6.2) that
       * opens the stream. At any other time, the PRIORITY frame (Section
       * 6.3) can be used to change the priority of a stream.
       *
       * Note: The spec mostly only mentions that clients are the endpoints
       * that make use of PRIORITY frames. As such, we don't make too
       * much of an effort to process PRIORITY frames coming from a
       * server. If we know about a stream, we reprioritize it (meaning
       * prioritization is an input to the process of allocating
       * resources when flushing request bodies). Otherwise, we ignore
       * it. We don't, however, report any errors if the frame is
       * well-formed, as section 5. clearly mentions that PRIORITY frames
       * must be accepted in all stream states.
       *
       * From RFC7540§5.1:
       * Note that PRIORITY can be sent and received in any stream state. *)
      ()

(* Handles an RST_STREAM frame according to the stream's state and the error
 * code carried by the frame. *)
let process_rst_stream_frame t { Frame.frame_header; _ } error_code =
  let { Frame.stream_id; _ } = frame_header in
  match Scheduler.find t.streams stream_id with
  | Some respd ->
    (match respd.state, error_code with
    | Idle, _ ->
      (* From RFC7540§6.4:
* RST_STREAM frames MUST NOT be sent for a stream in the "idle"
* state. If a RST_STREAM frame identifying an idle stream is
* received, the recipient MUST treat this as a connection error
* (Section 5.4.1) of type PROTOCOL_ERROR. *)report_connection_errortError_code.ProtocolError|Closed_,Error_code.NoError->(* From RFC7540§8.1:
* A server can send a complete response prior to the client sending an
* entire request if the response does not depend on any portion of the
* request that has not been sent and received. When this is true, a
* server MAY request that the client abort transmission of a request
* without error by sending a RST_STREAM with an error code of NO_ERROR
* after sending a complete response (i.e., a frame with the END_STREAM
* flag).
*
* If we're done sending the request there's nothing to do here, allow
* the stream to finish successfully.
*)(* XXX(anmonteiro): When we add logging support, add something here. *)()|Active_,Error_code.NoError->(* If we're active (i.e. not done sending the request body), finish the
* stream, in order to mark it for cleanup.
*
* Note: we don't close the request body here because the client may be
* in the process of writing to it, and while we're not going to send
* those bytes to the output channel, we don't want to fail when writing
* either. *)Stream.finish_streamrespd(ResetByThemerror_code)|_->(* From RFC7540§6.4:
* The RST_STREAM frame fully terminates the referenced stream and
* causes it to enter the "closed" state. After receiving a
* RST_STREAM on a stream, the receiver MUST NOT send additional
* frames for that stream, with the exception of PRIORITY.
*
* Note:
* This match branch also accepts streams in the `Closed` state. We
* do that to comply with the following:
*
* From RFC7540§6.4:
* [...] after sending the RST_STREAM, the sending endpoint MUST be
* prepared to receive and process additional frames sent on the
* stream that might have been sent by the peer prior to the arrival
* of the RST_STREAM. *)Stream.finish_streamrespd(ResetByThemerror_code);(* From RFC7540§5.4.2:
* To avoid looping, an endpoint MUST NOT send a RST_STREAM in response
* to a RST_STREAM frame.
*
* Note: the {!Respd.report_error} function does not send an RST_STREAM
* frame for streams in the closed state. So we close the stream before
* reporting the error. *)set_error_and_handletrespd(`Protocol_error(error_code,""))error_code)|None->(* We might have removed the stream from the hash table. If its stream
* id is smaller than or equal to the max client stream id we've generated,
* then it must have been closed. *)ifnot(was_closed_or_implicitly_closedtstream_id)then(* From RFC7540§6.4:
* RST_STREAM frames MUST NOT be sent for a stream in the "idle"
* state. If a RST_STREAM frame identifying an idle stream is
* received, the recipient MUST treat this as a connection error
* (Section 5.4.1) of type PROTOCOL_ERROR.
*
* Note:
(* Handles an incoming SETTINGS frame.
 *
 * - A frame carrying the ACK flag only updates the [t.unacked_settings]
 *   counter; if the counter drops below zero a connection error of type
 *   PROTOCOL_ERROR is reported.
 * - Any other frame is validated, folded into [t.settings] one parameter at
 *   a time, and then acknowledged with an empty SETTINGS frame. *)
let process_settings_frame t { Frame.frame_header; _ } settings =
  let open Scheduler in
  let { Frame.flags; _ } = frame_header in
  if Flags.(test_ack flags)
  then (
    (* An acked SETTINGS frame has already been checked to be empty, so
     * only the bookkeeping is left to do. *)
    t.unacked_settings <- t.unacked_settings - 1;
    if t.unacked_settings < 0
    then
      (* The server is ACKing a SETTINGS frame that we didn't send. *)
      report_connection_error
        t
        ~additional_debug_data:
          "Received SETTINGS with ACK but no ACK was pending"
        Error_code.ProtocolError)
  else
    match Settings.check_settings_list ~is_client:true settings with
    | Error error -> report_error t error
    | Ok () ->
      (* From RFC7540§6.5:
       *   Each parameter in a SETTINGS frame replaces any existing value
       *   for that parameter. Parameters are processed in the order in
       *   which they appear, and a receiver of a SETTINGS frame does not
       *   need to maintain any state other than the current value of its
       *   parameters. *)
      let apply_parameter (current : Settings.t) parameter =
        match parameter with
        | Settings.HeaderTableSize, value ->
          (* From RFC7540§6.5.2:
           *   Allows the sender to inform the remote endpoint of the
           *   maximum size of the header compression table used to decode
           *   header blocks, in octets. *)
          Hpack.Encoder.set_capacity t.hpack_encoder value;
          { current with header_table_size = value }
        | EnablePush, value ->
          (* `Settings.check_settings_list` above has already verified that
           * this setting is either 0 or 1. *)
          { current with enable_push = value = 1 }
        | MaxConcurrentStreams, value ->
          { current with max_concurrent_streams = value }
        | InitialWindowSize, new_size ->
          (* From RFC7540§6.9.2:
           *   [...] When the value of SETTINGS_INITIAL_WINDOW_SIZE changes,
           *   a receiver MUST adjust the size of all stream flow-control
           *   windows that it maintains by the difference between the new
           *   value and the old value.
           *
           *   [...] A SETTINGS frame cannot alter the connection
           *   flow-control window. *)
          let delta = new_size - t.settings.initial_window_size in
          let exception Local in
          (try
             Scheduler.iter
               ~f:(fun stream ->
                 (* From RFC7540§6.9.2:
                  *   An endpoint MUST treat a change to
                  *   SETTINGS_INITIAL_WINDOW_SIZE that causes any
                  *   flow-control window to exceed the maximum size as a
                  *   connection error (Section 5.4.1) of type
                  *   FLOW_CONTROL_ERROR. *)
                 if not (Scheduler.add_flow stream delta) then raise Local)
               t.streams
           with
          | Local ->
            report_connection_error
              t
              ~additional_debug_data:
                (Format.sprintf
                   "Window size for stream would exceed %d"
                   Settings.WindowSize.max_window_size)
              Error_code.FlowControlError);
          { current with initial_window_size = new_size }
        | MaxFrameSize, value ->
          (* Propagate the new frame size to every stream descriptor for
           * which [Respd.requires_output] holds. *)
          Scheduler.iter
            ~f:(fun (Stream { descriptor; _ }) ->
              if Respd.requires_output descriptor
              then descriptor.max_frame_size <- value)
            t.streams;
          { current with max_frame_size = value }
        | MaxHeaderListSize, value ->
          { current with max_header_list_size = Some value }
      in
      t.settings <- List.fold_left apply_parameter t.settings settings;
      (* From RFC7540§6.5:
       *   ACK (0x1): [...] When this bit is set, the payload of the
       *   SETTINGS frame MUST be empty. *)
      let ack_frame_info =
        Writer.make_frame_info
          ~flags:Flags.(set_ack default_flags)
          Stream_identifier.connection
      in
      Writer.write_settings t.writer ack_frame_info [];
      (* NOTE(review): the counter is incremented after *sending* an ACK,
       * not after sending a SETTINGS frame that expects one - confirm that
       * this is the intended bookkeeping. *)
      t.unacked_settings <- t.unacked_settings + 1;
      Writer.wakeup t.writer
(* Creates the stream promised by a PUSH_PROMISE frame, registers it with
 * the scheduler as a dependent of the stream the frame arrived on, puts it
 * in the reserved state and starts processing its header block.
 *
 * From RFC7540§6.6:
 *   The PUSH_PROMISE frame (type=0x5) is used to notify the peer endpoint
 *   in advance of streams the sender intends to initiate. *)
let reserve_stream t { Frame.frame_header; _ } promised_stream_id headers_block =
  let { Frame.flags; stream_id; _ } = frame_header in
  let pushed =
    Stream.create
      promised_stream_id
      ~max_frame_size:t.settings.max_frame_size
      t.writer
      t.error_handler
      (on_close_stream t promised_stream_id)
  in
  (* From RFC7540§5.3.5:
   *   All streams are initially assigned a non-exclusive dependency on
   *   stream 0x0. Pushed streams (Section 8.2) initially depend on their
   *   associated stream. In both cases, streams are assigned a default
   *   weight of 16. *)
  let priority =
    { Priority.default_priority with stream_dependency = stream_id }
  in
  let node : Scheduler.nonroot Scheduler.node =
    Scheduler.add
      t.streams
      ~priority
      ~initial_send_window_size:t.settings.initial_window_size
      ~initial_recv_window_size:t.config.initial_window_size
      pushed
  in
  let partial_headers = create_partial_headers t flags headers_block in
  pushed.state <- Reserved (PartialHeaders partial_headers);
  (* Without END_HEADERS, remember which stream the following CONTINUATION
   * frames have to belong to. *)
  if not (Flags.test_end_header flags)
  then t.receiving_headers_for_stream <- Some promised_stream_id;
  handle_headers_block t node partial_headers flags headers_block
(* Handles an incoming PUSH_PROMISE frame.
 *
 * The frame is rejected with a connection error of type PROTOCOL_ERROR when
 * push is disabled, when the promised stream id is not greater than
 * [t.max_pushed_stream_id], or when the associated stream is neither open
 * nor half-closed. Otherwise the promised stream is reserved.
 *
 * `promised_stream_id` has already been validated by the parser at this
 * point. *)
let process_push_promise_frame
    t
    ({ Frame.frame_header; _ } as frame)
    promised_stream_id
    headers_block
  =
  let { Frame.stream_id; _ } = frame_header in
  let protocol_error additional_debug_data =
    report_connection_error t ~additional_debug_data Error_code.ProtocolError
  in
  if not t.settings.enable_push
  then
    (* From RFC7540§6.6:
     *   PUSH_PROMISE MUST NOT be sent if the SETTINGS_ENABLE_PUSH setting
     *   of the peer endpoint is set to 0. An endpoint that has set this
     *   setting and has received acknowledgement MUST treat the receipt of
     *   a PUSH_PROMISE frame as a connection error (Section 5.4.1) of type
     *   PROTOCOL_ERROR. *)
    protocol_error "Push is not enabled for the connection"
  else if not Stream_identifier.(promised_stream_id > t.max_pushed_stream_id)
  then
    (* From RFC7540§6.6:
     *   A receiver MUST treat the receipt of a PUSH_PROMISE that promises
     *   an illegal stream identifier (Section 5.1.1) as a connection error
     *   (Section 5.4.1) of type PROTOCOL_ERROR. Note that an illegal stream
     *   identifier is an identifier for a stream that is not currently in
     *   the "idle" state. *)
    protocol_error "Illegal stream identifier promised by PUSH_PROMISE"
  else (
    (* From RFC7540§6.6:
     *   A receiver MUST treat the receipt of a PUSH_PROMISE on a stream
     *   that is neither "open" nor "half-closed (local)" as a connection
     *   error (Section 5.4.1) of type PROTOCOL_ERROR. *)
    let reject_associated_stream () =
      protocol_error
        "Received PUSH_PROMISE on a stream that is neither open nor \
         half-closed (local)"
    in
    t.max_pushed_stream_id <- promised_stream_id;
    match Scheduler.find t.streams stream_id with
    | Some associated ->
      (match associated.state with
      | Active ((Open _ | HalfClosed _), _) ->
        reserve_stream t frame promised_stream_id headers_block
      | _ -> reject_associated_stream ())
    | None -> reject_associated_stream ())
(* Handles an incoming PING frame.
 *
 * A PING without the ACK flag is echoed back with the ACK flag set. A PING
 * with the ACK flag runs the oldest callback queued in [t.pending_pings];
 * if no callback is pending, a connection error is reported. *)
let process_ping_frame t { Frame.frame_header; _ } payload =
  let { Frame.flags; _ } = frame_header in
  if not (Flags.test_ack flags)
  then (
    (* From RFC7540§6.7:
     *   Receivers of a PING frame that does not include an ACK flag MUST
     *   send a PING frame with the ACK flag set in response, with an
     *   identical payload. PING responses SHOULD be given higher priority
     *   than any other frame.
     *
     *   ACK (0x1): When set, bit 0 indicates that this PING frame is a
     *   PING response. An endpoint MUST set this flag in PING
     *   responses. *)
    let ack_frame_info =
      Writer.make_frame_info
        ~flags:Flags.(set_ack default_flags)
        Stream_identifier.connection
    in
    Writer.write_ping t.writer ack_frame_info payload;
    Writer.wakeup t.writer)
  else
    (* From RFC7540§6.7:
     *   ACK (0x1): When set, bit 0 indicates that this PING frame is a PING
     *   response. [...] An endpoint MUST NOT respond to PING frames
     *   containing this flag. *)
    match Queue.take_opt t.pending_pings with
    | Some on_pong -> on_pong ()
    | None ->
      (* Is the server ACKing a PING that we didn't send? *)
      report_connection_error
        t
        ~additional_debug_data:"Unexpected PING acknowledgement"
        Error_code.ProtocolError
(* Handles an incoming GOAWAY frame by shutting down both directions of the
 * connection.
 *
 * [payload] is the [(last_stream_id, error_code, debug_data)] triple carried
 * by the frame. None of its components is used yet. The previous version
 * allocated a [Bytes.t] as large as the peer-supplied debug data and copied
 * the data into it, only to drop the copy; that dead allocation and copy
 * are removed here. The type annotation keeps the function's inferred type
 * unchanged. *)
let process_goaway_frame t _frame payload =
  let _last_stream_id, _error, (_debug_data : Bigstringaf.t) = payload in
  (* TODO(anmonteiro): I think we need to allow lower numbered streams to
   * complete. *)
  shutdown_rw t
(* Applies a WINDOW_UPDATE increment to a scheduler node, which may be
 * either the connection or a single stream.
 *
 * When [Scheduler.add_flow] rejects the increment, a FLOW_CONTROL_ERROR is
 * reported: as a connection error for the connection node, as a stream
 * error otherwise. When the increment is accepted, the writer is woken up
 * only if the resulting window is positive. *)
let add_window_increment
    : type a. t -> a Scheduler.PriorityTreeNode.node -> int -> unit
  =
 fun t stream increment ->
  let open Scheduler in
  let accepted = Scheduler.add_flow stream increment in
  let id = Scheduler.stream_id stream in
  (* The window is read after [Scheduler.add_flow] has been applied. *)
  let window =
    match stream with
    | Stream { flow; _ } -> flow
    | Connection { flow; _ } -> flow
  in
  if not accepted
  then (
    if Stream_identifier.is_connection id
    then
      report_connection_error
        t
        ~additional_debug_data:
          (Printf.sprintf
             "Window size for stream would exceed %d"
             Settings.WindowSize.max_window_size)
        Error_code.FlowControlError
    else report_stream_error t id Error_code.FlowControlError)
  else if window > 0
  then
    (* Don't bother waking up the writer if the new flow doesn't allow the
     * stream to write. *)
    Writer.wakeup t.writer
(* Handles an incoming WINDOW_UPDATE frame.
 *
 * From RFC7540§6.9:
 *   The WINDOW_UPDATE frame can be specific to a stream or to the entire
 *   connection. In the former case, the frame's stream identifier
 *   indicates the affected stream; in the latter, the value "0" indicates
 *   that the entire connection is the subject of the frame. *)
let process_window_update_frame t { Frame.frame_header; _ } window_increment =
  let open Scheduler in
  let { Frame.stream_id; _ } = frame_header in
  if Stream_identifier.is_connection stream_id
  then add_window_increment t t.streams window_increment
  else
    match Scheduler.get_node t.streams stream_id with
    | None ->
      (* An unknown stream that was not closed (explicitly or implicitly)
       * is treated like an idle one.
       *
       * From RFC7540§5.1:
       *   idle: [...] Receiving any frame other than HEADERS or PRIORITY
       *   on a stream in this state MUST be treated as a connection error
       *   (Section 5.4.1) of type PROTOCOL_ERROR. *)
      if was_closed_or_implicitly_closed t stream_id
      then ()
      else report_connection_error t Error_code.ProtocolError
    | Some (Stream { descriptor; _ } as node) ->
      (match descriptor.state with
      | Closed _ ->
        (* From RFC7540§5.1:
         *   Endpoints MUST ignore WINDOW_UPDATE or RST_STREAM frames
         *   received in this state, though endpoints MAY choose to treat
         *   frames that arrive a significant time after sending END_STREAM
         *   as a connection error (Section 5.4.1) of type
         *   PROTOCOL_ERROR. *)
        ()
      | Idle ->
        (* Same RFC7540§5.1 rule as for unknown streams: only HEADERS and
         * PRIORITY may be received on an idle stream. *)
        report_connection_error t Error_code.ProtocolError
      | Active _
      (* From RFC7540§5.1:
       *   reserved (local): [...] A PRIORITY or WINDOW_UPDATE frame MAY be
       *   received in this state. *)
      | Reserved _ ->
        add_window_increment t node window_increment)
(* Handles an incoming CONTINUATION frame.
 *
 * The header block fragment is handed to whichever parser the stream has
 * pending: the one for regular headers, or the one for trailers. A
 * CONTINUATION frame for a stream with no pending header block, or for an
 * unknown stream, is a connection error of type PROTOCOL_ERROR. *)
let process_continuation_frame t { Frame.frame_header; _ } headers_block =
  let { Frame.stream_id; flags; _ } = frame_header in
  let protocol_error () =
    report_connection_error t Error_code.ProtocolError
  in
  match Scheduler.get_node t.streams stream_id with
  | None ->
    (* From RFC7540§6.10:
     *   A CONTINUATION frame MUST be preceded by a HEADERS, PUSH_PROMISE or
     *   CONTINUATION frame without the END_HEADERS flag set. A recipient
     *   that observes violation of this rule MUST respond with a
     *   connection error (Section 5.4.1) of type PROTOCOL_ERROR. *)
    protocol_error ()
  | Some (Scheduler.Stream { descriptor; _ } as node) ->
    (match descriptor.state with
    | Active
        ( ( Open (PartialHeaders pending)
          | HalfClosed (PartialHeaders pending) )
        , _ ) ->
      handle_headers_block t node pending flags headers_block
    | Active
        ( ( Open (ActiveMessage { trailers_parser = Some pending; _ })
          | HalfClosed (ActiveMessage { trailers_parser = Some pending; _ }) )
        , _ ) ->
      handle_trailer_headers t node pending flags headers_block
    | _ ->
      (* TODO: maybe need to handle the case where the stream has been
       * closed due to a stream error. *)
      (* From RFC7540§6.10:
       *   A RST_STREAM is the last frame that an endpoint can send on a
       *   stream. The peer that sends the RST_STREAM frame MUST be prepared
       *   to receive any frames that were sent or enqueued for sending by
       *   the remote peer. These frames can be ignored, except where they
       *   modify connection state (such as the state maintained for header
       *   compression (Section 4.3) or flow control). *)
      protocol_error ())
(* Creates a client connection, writes the connection preface and returns
 * the connection.
 *
 * [push_handler] defaults to [default_push_handler]; in that case server
 * push is disabled in the advertised settings.
 *
 * From RFC7540§1:
 *   HTTP/2 [...] allows interleaving of request and response messages on
 *   the same connection and uses an efficient coding for HTTP header
 *   fields. *)
let[@ocaml.warning "-16"] create
    ?(config = Config.default)
    ?push_handler
    ~error_handler
  =
  let push_handler =
    match push_handler with
    | None -> default_push_handler
    | Some handler -> handler
  in
  let settings =
    { (Config.to_settings config) with
      (* If the caller is not going to process PUSH_PROMISE frames, just
       * disable it. *)
      enable_push =
        config.enable_server_push && push_handler != default_push_handler
    }
  in
  (* The frame handlers and the connection record refer to each other, hence
   * the recursive definition through a lazy value. *)
  let rec connection_preface_handler recv_frame settings_list =
    let t = Lazy.force connection in
    (* Now process the client's SETTINGS frame. `process_settings_frame`
     * will take care of calling `wakeup_writer`. *)
    process_settings_frame t recv_frame settings_list
  and frame_handler r =
    let t = Lazy.force connection in
    match r with
    | Error e -> report_error t e
    | Ok ({ Frame.frame_payload; frame_header } as frame) ->
      (* While a header block is pending, only a CONTINUATION frame on the
       * very same stream is acceptable. *)
      let breaks_header_block =
        match t.receiving_headers_for_stream with
        | None -> false
        | Some expected_id ->
          (not Stream_identifier.(expected_id === frame_header.stream_id))
          || frame_header.frame_type <> Continuation
      in
      if breaks_header_block
      then
        (* From RFC7540§6.2:
         *   A HEADERS frame without the END_HEADERS flag set MUST be
         *   followed by a CONTINUATION frame for the same stream. A
         *   receiver MUST treat the receipt of any other type of frame or
         *   a frame on a different stream as a connection error (Section
         *   5.4.1) of type PROTOCOL_ERROR. *)
        report_connection_error
          t
          ~additional_debug_data:
            "HEADERS or PUSH_PROMISE without the END_HEADERS flag set must be \
             followed by a CONTINUATION frame for the same stream"
          Error_code.ProtocolError
      else (
        match frame_payload with
        | Headers (_priority, headers_block) ->
          process_headers_frame t frame headers_block
        | Data bs -> process_data_frame t frame bs
        | Priority priority -> process_priority_frame t frame priority
        | RSTStream error_code -> process_rst_stream_frame t frame error_code
        | Settings settings -> process_settings_frame t frame settings
        | PushPromise (promised_stream_id, bs) ->
          process_push_promise_frame t frame promised_stream_id bs
        | Ping data -> process_ping_frame t frame data
        | GoAway (last_stream_id, error, debug_data) ->
          process_goaway_frame t frame (last_stream_id, error, debug_data)
        | WindowUpdate window_size ->
          process_window_update_frame t frame window_size
        | Continuation headers_block ->
          process_continuation_frame t frame headers_block
        | Unknown _ ->
          (* From RFC7540§5.1:
           *   Frames of unknown types are ignored. *)
          ())
  and connection =
    lazy
      { settings
      ; config
        (* From RFC7540§5.1.1:
         *   Streams initiated by a client MUST use odd-numbered stream
         *   identifiers *)
      ; current_stream_id = -1l
      ; max_pushed_stream_id = 0l
      ; current_server_streams = 0
      ; receiving_headers_for_stream = None
      ; did_send_go_away = false
      ; unacked_settings = 0
      ; pending_pings = Queue.create ()
      ; error_handler
      ; push_handler
      ; reader =
          Reader.client_frames
            ~max_frame_size:settings.max_frame_size
            connection_preface_handler
            frame_handler
      ; writer = Writer.create settings.max_frame_size
      ; streams = Scheduler.make_root ()
        (* From RFC7540§4.3:
         *   Header compression is stateful. One compression context and
         *   one decompression context are used for the entire
         *   connection. *)
      ; hpack_encoder = Hpack.Encoder.(create settings.header_table_size)
      ; hpack_decoder = Hpack.Decoder.(create settings.header_table_size)
      }
  in
  let t = Lazy.force connection in
  (* Check if the settings for the connection are different than the default
   * HTTP/2 settings. In the event that they are, we need to send a
   * non-empty SETTINGS frame advertising our configuration. *)
  let advertised_settings = Settings.settings_for_the_connection t.settings in
  (* From RFC7540§6.9.2:
   *   When an HTTP/2 connection is first established, new streams are
   *   created with an initial flow-control window size of 65,535 octets.
   *   The connection flow-control window is also 65,535 octets.
   *
   * XXX(anmonteiro): the starting setting for the initial window size for
   * _sending_ is the default of 65535 octets. We're effectively overwriting
   * it here to enforce this default after abusing the settings
   * implementation to send our (receiving) in-flow setting to the peer.
   * Throughout other parts of the code we (should) refer to it through
   * [t.config.initial_window_size]. This should probably be cleaned up in
   * the future. *)
  t.settings <-
    { t.settings with
      initial_window_size = Settings.default.initial_window_size
    };
  (* Send the client connection preface *)
  Writer.write_connection_preface t.writer advertised_settings;
  (* If a higher value for initial window size is configured, add more
   * tokens to the connection (we have no streams at this point) -- the peer
   * is allowed to send more than the defaults.
   *
   * From RFC7540§6.9.2:
   *   The connection flow-control window can only be changed using
   *   WINDOW_UPDATE frames. *)
  (if t.config.initial_window_size > Settings.default.initial_window_size
  then
    let extra =
      t.config.initial_window_size - Settings.default.initial_window_size
    in
    send_window_update t t.streams extra);
  t
(* Allocates the next client stream id (the current id advanced by 2),
 * creates the stream descriptor for it, registers the descriptor with the
 * scheduler using the default priority, and returns the descriptor. *)
let create_and_add_stream t ~error_handler =
  let max_frame_size = t.settings.max_frame_size in
  let stream_id = Int32.add t.current_stream_id 2l in
  t.current_stream_id <- stream_id;
  let descriptor =
    Stream.create
      stream_id
      ~max_frame_size
      t.writer
      error_handler
      (on_close_stream t stream_id)
  in
  (* TODO: custom priority *)
  let (_ : Scheduler.nonroot Scheduler.node) =
    Scheduler.add
      t.streams
      ~priority:Priority.default_priority
      ~initial_send_window_size:t.settings.initial_window_size
      ~initial_recv_window_size:t.config.initial_window_size
      descriptor
  in
  descriptor
(* Meant to be called after receiving an HTTP/1.1 `101 Switching_protocols`
 * response upgrading to HTTP/2.
 *
 * Returns [Error msg] when the HTTP/1.1 request headers cannot be
 * converted; otherwise returns [Ok t], where stream 1 of [t] stands for the
 * request that was already sent over HTTP/1.1. *)
let create_h2c
    ?config
    ?push_handler
    ~http_request
    ~error_handler
    (response_handler, response_error_handler)
  =
  let { Httpaf.Request.target; meth; _ } = http_request in
  match Headers.of_http1 http_request with
  | Error msg -> Error msg
  | Ok headers ->
    (* From RFC7540§3.2:
     *   Upon receiving the 101 response, the client MUST send a connection
     *   preface (Section 3.5), which includes a SETTINGS frame. *)
    let t = create ?config ?push_handler ~error_handler in
    let upgraded =
      create_and_add_stream t ~error_handler:response_error_handler
    in
    assert (Stream_identifier.(t.current_stream_id === 1l));
    assert (Stream_identifier.(upgraded.id === 1l));
    let request = Request.create ~headers ~scheme:"http" meth target in
    (* From RFC7540§3.2:
     *   The HTTP/1.1 request that is sent prior to upgrade is assigned a
     *   stream identifier of 1 (see Section 5.1.1) with default priority
     *   values (Section 5.3.5). Stream 1 is implicitly "half-closed" from
     *   the client toward the server (see Section 5.1), since the request
     *   is completed as an HTTP/1.1 request. *)
    upgraded.state <-
      Active
        ( HalfClosed WaitingForPeer
        , { request
            (* The request body is no more than a placeholder. The HTTP/1.1
             * connection that we're upgrading from already sent it to the
             * server. Application code knows what it is.
             *
             * From RFC7540§3.2:
             *   Requests that contain a payload body MUST be sent in their
             *   entirety before the client can send HTTP/2 frames. This
             *   means that a large request can block the use of the
             *   connection until it is completely sent. *)
          ; request_body = Body.empty
          ; response_handler
          ; trailers_handler = ignore
          } );
    Writer.wakeup t.writer;
    Ok t
(* Starts a new request on the connection and returns the body writer that
 * the caller uses to send the request body.
 *
 * The request headers are written immediately. The stream only moves to
 * the open state from the callback passed to [Writer.flush]. *)
let request
    t
    ?(trailers_handler = ignore)
    request
    ~error_handler
    ~response_handler
  =
  let max_frame_size = t.settings.max_frame_size in
  let descriptor = create_and_add_stream t ~error_handler in
  let wake_writer () = Writer.wakeup t.writer in
  let request_body =
    Body.create_writer
      (Bigstringaf.create max_frame_size)
      ~ready_to_write:wake_writer
  in
  let frame_info =
    Writer.make_frame_info
      ~max_frame_size
      ~flags:Flags.default_flags
      descriptor.id
  in
  Writer.write_request_headers
    t.writer
    t.hpack_encoder
    ~priority:
      (* TODO: allow setting the priority of the request. *)
      Priority.default_priority
    frame_info
    request;
  Writer.flush t.writer (fun () ->
      descriptor.state <-
        Active
          ( Open WaitingForPeer
          , { request; request_body; response_handler; trailers_handler } ));
  Writer.wakeup t.writer;
  (* Closing the request body puts the stream in the half-closed (local)
   * state. This is handled by {!Respd.flush_request_body}, which
   * transitions the state once it verifies that there's no more data to
   * send for the stream. *)
  request_body
(* Sends a PING frame and queues [callback] in [t.pending_pings].
 *
 * Raises [Failure] when a custom [payload] has fewer than 8 octets
 * available starting at [off].
 *
 * XXX: we store PING callbacks in FIFO order. Would it ever be the case
 * that the receipt of a PING frame acknowledges a later callback? If so,
 * we'd need to disallow sending custom PING payloads and generate a random
 * payload that we store in a Hashtbl. *)
let ping t ?payload ?(off = 0) callback =
  let ping_payload =
    match payload with
    | Some custom ->
      if Bigstringaf.length custom - off < 8
      then failwith "PING payload must have at least 8 octets in length";
      custom
    | None -> Serialize.default_ping_payload
  in
  (* From RFC7540§6.7:
   *   ACK (0x1): When set, bit 0 indicates that this PING frame is a PING
   *   response.
   *
   * Note: this is not a PING response, quite the opposite, so the ACK flag
   * is not set. *)
  let frame_info = Writer.make_frame_info Stream_identifier.connection in
  Queue.add callback t.pending_pings;
  Writer.write_ping t.writer frame_info ~off ping_payload;
  Writer.wakeup t.writer
(* Returns the next operation for the read loop: `Read or `Close.
 *
 * A reader error is reported first and then mapped to `Close for
 * connection errors and to `Read for stream errors. *)
let next_read_operation t =
  if Reader.is_closed t.reader then shutdown_reader t;
  match Reader.next t.reader with
  | `Error e ->
    report_error t e;
    (match e with
    | StreamError _ ->
      (* From RFC7540§5.4.2:
       *   A stream error is an error related to a specific stream that
       *   does not affect processing of other streams. *)
      `Read
    | ConnectionError _ ->
      (* From RFC7540§5.4.1:
       *   A connection error is any error that prevents further
       *   processing of the frame layer or corrupts any connection
       *   state. *)
      `Close)
  | (`Read | `Close) as operation -> operation
(* Feeds [len] bytes of [bs], starting at [off], to the reader, signalling
 * that more input may follow. *)
let read t bs ~off ~len =
  let reader = t.reader in
  Reader.read_with_more reader bs ~off ~len Incomplete

(* Same as [read], but signals that no more input will follow. *)
let read_eof t bs ~off ~len =
  let reader = t.reader in
  Reader.read_with_more reader bs ~off ~len Complete
(* XXX(anmonteiro): this function is here to please the Gluten `RUNTIME`
 * interface.
 *
 * We don't expect this function to ever be called. H2 never issues `Yield`
 * commands because the connection is multiplexed, and it's therefore always
 * looking to read frames from the peer. If it is called anyway, the
 * continuation runs right away. *)
let yield_reader _t resume = resume ()

(* Flushes pending request bodies, then asks the writer for its next
 * operation. *)
let next_write_operation t =
  flush_request_body t;
  Writer.next t.writer

(* Registers [k] through [Writer.on_wakeup_writer]. *)
let yield_writer t k =
  let writer = t.writer in
  Writer.on_wakeup_writer writer k

(* Forwards the outcome of a write to the writer. *)
let report_write_result t result =
  let writer = t.writer in
  Writer.report_result writer result