(*----------------------------------------------------------------------------
* Copyright (c) 2017 Inhabited Type LLC.
* Copyright (c) 2019 Antonio N. Monteiro.
*
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* 3. Neither the name of the author nor the names of his contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE CONTRIBUTORS ``AS IS'' AND ANY EXPRESS
* OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE FOR
* ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
* STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*---------------------------------------------------------------------------*)

(* Short aliases for the parsing, serialization and scheduling modules used
 * throughout this file. *)
module AB = Angstrom.Buffered
module Reader = Parse.Reader
module Writer = Serialize.Writer
module StreamsTbl = Scheduler.StreamsTbl

(* The stream scheduler, instantiated over response descriptors ([Respd.t]).
 * Note that this shadows the outer [Scheduler] module from here on. *)
module Scheduler = Scheduler.Make (struct
  include Stream

  type t = Respd.t

  let flush_write_body = Respd.flush_request_body
  let requires_output = Respd.requires_output
end)

(* [Queue], extended with an option-returning [take_opt]. *)
module Queue = struct
  include Queue

  (* [take_opt t] is [None] when [t] is empty and [Some (take t)] otherwise. *)
  let take_opt t = match is_empty t with true -> None | false -> Some (take t)
end

type error = Respd.error
type trailers_handler = Headers.t -> unit
type response_handler = Response.t -> Body.Reader.t -> unit
type error_handler = error -> unit

(* State of a client connection. *)
type t =
  { mutable settings : Settings.t
  ; reader : Reader.frame
  ; writer : Writer.t
  ; config : Config.t
  ; streams : Scheduler.t
  ; mutable current_stream_id : Stream_identifier.t
  ; mutable max_pushed_stream_id : Stream_identifier.t
  ; mutable current_server_streams : int
        (* Incremented in [handle_headers] and decremented in
         * [on_close_stream]. *)
  ; mutable receiving_headers_for_stream : Stream_identifier.t option
        (* [Some id] while the header block of stream [id] has not yet seen
         * END_HEADERS; reset to [None] once it has. *)
  ; mutable did_send_go_away : bool
        (* Set by [report_error] after a GOAWAY frame has been written. *)
  ; mutable unacked_settings : int
  ; pending_pings : ((unit, [ `EOF ]) result -> unit) Queue.t
  ; error_handler : error -> unit
  ; push_handler : Request.t -> (response_handler, unit) result
        (* From RFC7540§4.3:
         *   Header compression is stateful. One compression context and one
         *   decompression context are used for the entire connection. *)
  ; hpack_encoder : Hpack.Encoder.t
  ; hpack_decoder : Hpack.Decoder.t
  }

(* Push handler that accepts every promised request and ignores the
 * response and its body. *)
let default_push_handler = Sys.opaque_identity (fun _ -> Ok (fun _ _ -> ()))

(* [is_closed t] holds when both the reader and the writer are closed. *)
let is_closed t = Reader.is_closed t.reader && Writer.is_closed t.writer

(* [shutdown_reader t] force-closes the reader. *)
let shutdown_reader t = Reader.force_close t.reader

(* [flush_request_body t] asks the scheduler to flush the streams, passing it
 * the current and max pushed stream identifiers. *)
let flush_request_body t =
  Scheduler.flush t.streams (t.current_stream_id, t.max_pushed_stream_id)

(* [shutdown_writer t] flushes request bodies first and then closes the
 * writer. *)
let shutdown_writer t =
  flush_request_body t;
  Writer.close t.writer

(* [shutdown_rw t] shuts down the reader and then the writer. *)
let shutdown_rw t =
  shutdown_reader t;
  shutdown_writer t

(* Handling frames against closed streams is hard. See:
* https://docs.google.com/presentation/d/1iG_U2bKTc9CnKr0jPTrNfmxyLufx_cK2nNh9VjrKH6s
*)letwas_closed_or_implicitly_closedtstream_id=ifStream_identifier.is_requeststream_idthenStream_identifier.(stream_id<=t.current_stream_id)elseStream_identifier.(stream_id<=t.max_pushed_stream_id)letreport_errort=function|Error.ConnectionError(error,data)->ifnott.did_send_go_awaythen((* From RFC7540§5.4.1:
* An endpoint that encounters a connection error SHOULD first send a
* GOAWAY frame (Section 6.8) with the stream identifier of the last
* stream that it successfully received from its peer. The GOAWAY frame
* includes an error code that indicates why the connection is
* terminating. After sending the GOAWAY frame for an error condition,
* the endpoint MUST close the TCP connection. *)letdebug_data=ifString.lengthdata=0thenBigstringaf.emptyelseBigstringaf.of_string~off:0~len:(String.lengthdata)datainletframe_info=Writer.make_frame_infoStream_identifier.connectionin(* TODO: Only write if not already shutdown. *)Writer.write_go_awayt.writerframe_info~debug_data~last_stream_id:(ifStream_identifier.(t.current_stream_id===-1l)thenStream_identifier.connectionelset.current_stream_id)error;t.did_send_go_away<-true;iferror<>Error_code.NoErrorthent.error_handler(`Protocol_error(error,data));Writer.flusht.writer(fun()->(* XXX: We need to allow lower numbered streams to complete before
* shutting down. *)shutdown_rwt);Writer.wakeupt.writer)|StreamError(stream_id,error)->(matchScheduler.findt.streamsstream_idwith|Somerespd->Respd.report_errorrespd(`Protocol_error(error,""))error|None->ifnot(was_closed_or_implicitly_closedtstream_id)then(* Possible if the stream was going to enter the Idle state (first time
* we saw e.g. a PRIORITY frame for it) but had e.g. a
* FRAME_SIZE_ERROR. *)letframe_info=Writer.make_frame_infostream_idinWriter.write_rst_streamt.writerframe_infoerror);Writer.wakeupt.writerletreport_connection_errort?(reason="")error=report_errort(ConnectionError(error,reason))letreport_stream_errortstream_iderror=report_errort(StreamError(stream_id,error))letshutdownt=(* From RFC7540§6.8:
* A server that is attempting to gracefully shut down a connection SHOULD
* send an initial GOAWAY frame with the last stream identifier set to
* 2^31-1 and a NO_ERROR code. *)report_connection_errortError_code.NoErrorletset_error_and_handletstreamerrorerror_code=Respd.report_errorstreamerrorerror_code;Writer.wakeupt.writerletreport_exntexn=ifnot(is_closedt)thenletreason=Printexc.to_stringexninreport_connection_errort~reasonError_code.InternalErrorletsend_window_update:typea.t->aScheduler.PriorityTreeNode.node->int32->unit=funtstreamn->letsend_window_update_framestream_idn=letvalid_inflow=Scheduler.add_inflowstreamninassertvalid_inflow;letframe_info=Writer.make_frame_infostream_idinWriter.write_window_updatet.writerframe_infoninifInt32.comparen0l>0then(letmax_window_size=Settings.WindowSize.max_window_sizeinletstream_id=Scheduler.stream_idstreaminletrecloopn=ifn>max_window_sizethen(send_window_update_framestream_idmax_window_size;loop(Int32.subnmax_window_size))elsesend_window_update_framestream_idninloopn;Writer.wakeupt.writer)lethandle_push_promise_headerstrespdheaders=(* From RFC7540§8.2.2:
* The header fields in PUSH_PROMISE and any subsequent CONTINUATION frames
* MUST be a valid and complete set of request header fields (Section
* 8.1.2.3). *)matchHeaders.method_path_and_scheme_or_malformedheaderswith|`Malformed->(* From RFC7540§8.2.2:
* If a client receives a PUSH_PROMISE that does not include a complete
* and valid set of header fields [...] it MUST respond with a stream
* error (Section 5.4.2) of type PROTOCOL_ERROR. *)report_stream_errortrespd.Stream.idError_code.ProtocolError|`Valid(meth,path,scheme)->letmeth=Httpaf.Method.of_stringmethin(matchmeth,Headers.get_pseudoheaders"authority",Message.body_lengthheaderswith|(#Httpaf.Method.standardasmeth),_,_whennotHttpaf.Method.(is_cacheablemeth&&is_safemeth)->report_stream_errortrespd.idError_code.ProtocolError|_,_,`Fixedlenwhennot(Int64.equallen0L)->(* From RFC7540§8.2:
* Clients that receive a promised request that is not cacheable,
* that is not known to be safe or that indicates the presence of a
* request body MUST reset the promised stream with a stream error
* (Section 5.4.2) of type PROTOCOL_ERROR.
*
* From RFC7231§4.2.3 (Cacheable Methods):
* [...] this specification defines GET, HEAD, and POST as cacheable
* [...].
*
* From RFC7231§4.2.1 (Safe Methods):
* Of the request methods defined by this specification, the GET, HEAD,
* OPTIONS, and TRACE methods are defined to be safe.
*
* Note: the intersection of safe and cacheable are the GET and HEAD
* methods. *)report_stream_errortrespd.idError_code.ProtocolError(* From RFC7540§8.2:
* The server MUST include a value in the :authority pseudo-header field
* for which the server is authoritative (see Section 10.1). A client
* MUST treat a PUSH_PROMISE for which the server is not authoritative as
* a stream error (Section 5.4.2) of type PROTOCOL_ERROR. *)|_,None,_|_,_,`Error_->report_stream_errortrespd.idError_code.ProtocolError|_->letrequest=Request.create~scheme~headersmethpathin(matcht.push_handlerrequestwith|Okresponse_handler->(* From RFC7540§8.2:
* Promised requests [...] MUST NOT include a request body. *)letrequest_body=Body.Writer.create_empty~writer:t.writerin(* From RFC7540§5.1:
* reserved (remote): [...] Receiving a HEADERS frame causes the
* stream to transition to "half-closed (local)". *)respd.state<-Active(HalfClosedStream.WaitingForPeer,{Respd.request;request_body;response_handler;trailers_handler=ignore})|Error_->(* From RFC7540§6.6:
* Recipients of PUSH_PROMISE frames can choose to reject promised
* streams by returning a RST_STREAM referencing the promised stream
* identifier back to the sender of the PUSH_PROMISE. *)Stream.reset_streamrespdError_code.Cancel))lethandle_response_headerststream~end_streamactive_requestheaders=let(Scheduler.Stream{descriptor=respd;_})=streamin(* From RFC7540§8.1.2.6:
* Clients MUST NOT accept a malformed response.
*
* Note: in the branches where a malformed response is detected, the response
* handler is not called. *)matchHeaders.get_multi_pseudoheaders"status"with|[status]->letresponse=Response.create~headers(Status.of_stringstatus)in(* Note: we don't need to check for `end_stream` flag + a non-zero body
* length, as the spec allows for non-zero content-length headers and no
* DATA frames.
*
* From RFC7540§8.1.2.6:
* A response that is defined to have no payload, as described in
* [RFC7230], Section 3.3.2, can have a non-zero content-length header
* field, even though no content is included in DATA frames. *)(matchMessage.body_lengthheaderswith|`Error_->set_error_and_handletrespd(`Invalid_response_body_lengthresponse)ProtocolError|`Fixed_|`Unknown->letresponse_body=ifend_streamthenBody.Reader.emptyelseBody.Reader.create(Bigstringaf.createt.config.response_body_buffer_size)~done_reading:(funlen->letlen=Int32.of_intleninsend_window_updatett.streamslen;send_window_updatetstreamlen)inletnew_response_state=Respd.create_active_responseresponseresponse_bodyinrespd.state<-Active((ifStream.is_openrespdthenOpennew_response_stateelseHalfClosednew_response_state),active_request);active_request.response_handlerresponseresponse_body;ifend_streamthen((* Deliver EOF to the response body, as the handler might be waiting
* on it to act. *)Body.Reader.closeresponse_body;(* From RFC7540§5.1:
* [...] an endpoint receiving an END_STREAM flag causes the stream
* state to become "half-closed (remote)". *)Respd.close_streamrespd))|_->(* From RFC7540§8.1.2.4:
* For HTTP/2 responses, a single :status pseudo-header field is defined
* that carries the HTTP status code field (see [RFC7231], Section 6).
* This pseudo-header field MUST be included in all responses; otherwise,
* the response is malformed (Section 8.1.2.6). *)letmessage="HTTP/2 responses must include a single `:status` pseudo-header"inset_error_and_handletrespd(`Malformed_responsemessage)ProtocolErrorlethandle_headerst~end_streamstreamheaders=let(Scheduler.Stream{descriptor=respd;_})=streamin(* From RFC7540§5.1.2:
* Endpoints MUST NOT exceed the limit set by their peer. An endpoint that
* receives a HEADERS frame that causes its advertised concurrent stream
* limit to be exceeded MUST treat this as a stream error (Section 5.4.2)
* of type PROTOCOL_ERROR or REFUSED_STREAM. *)ifInt32.(compare(of_int(t.current_server_streams+1))t.config.max_concurrent_streams)>0thenift.unacked_settings>0then(* From RFC7540§8.1.4:
* The REFUSED_STREAM error code can be included in a RST_STREAM frame
* to indicate that the stream is being closed prior to any processing
* having occurred. Any request that was sent on the reset stream can
* be safely retried.
*
* Note: if there are pending SETTINGS to acknowledge, assume there was a
* race condition and let the client retry. *)report_stream_errortrespd.Stream.idError_code.RefusedStreamelsereport_stream_errortrespd.Stream.idError_code.ProtocolErrorelse((* From RFC7540§5.1.2:
* Streams that are in the "open" state or in either of the "half-closed"
* states count toward the maximum number of streams that an endpoint is
* permitted to open. *)t.current_server_streams<-t.current_server_streams+1;matchrespd.statewith|Reserved_->respd.state<-ReservedStream.FullHeaders;handle_push_promise_headerstrespdheaders|Active(active_state,active_request)->(matchactive_statewith|Open_->respd.state<-Active(OpenFullHeaders,active_request)|HalfClosed_->respd.state<-Active(HalfClosedFullHeaders,active_request));handle_response_headerststream~end_streamactive_requestheaders|_->(* Unreachable. This function is only invoked if the stream is active. *)assertfalse)lethandle_headers_blockt?(is_trailers=false)streampartial_headersflagsheaders_block=letopenABinlet(Scheduler.Stream{descriptor=respd;_})=streaminletend_headers=Flags.test_end_headerflagsin(* From RFC7540§6.10:
* An endpoint receiving HEADERS, PUSH_PROMISE, or CONTINUATION
* frames needs to reassemble header blocks and perform decompression
* even if the frames are to be discarded *)letparse_state'=AB.feedpartial_headers.Stream.parse_state(`Bigstringheaders_block)inifend_headersthen(t.receiving_headers_for_stream<-None;letparse_state'=AB.feedparse_state'`Eofinmatchparse_state'with|Done(_,Okheaders)->ifnotis_trailersthen(* `handle_headers` will take care of transitioning the stream state *)letend_stream=partial_headers.end_streaminhandle_headerst~end_streamstreamheaderselseifHeaders.trailers_validheadersthen(Respd.deliver_trailer_headersrespdheaders;letresponse_body=Respd.response_body_exnrespdinBody.Reader.closeresponse_body)else(* From RFC7540§8.1.2.1:
* Pseudo-header fields MUST NOT appear in trailers. Endpoints MUST
* treat a request or response that contains undefined or invalid
* pseudo-header fields as malformed (Section 8.1.2.6). *)letmessage="Pseudo-header fields must not appear in trailers"inset_error_and_handletrespd(`Malformed_responsemessage)ProtocolError(* From RFC7540§4.3:
* A decoding error in a header block MUST be treated as a connection
* error (Section 5.4.1) of type COMPRESSION_ERROR. *)|Done(_,Error_)|Partial_->report_connection_errortError_code.CompressionError|Fail(_,_,message)->report_connection_errort~reason:messageError_code.CompressionError)elsepartial_headers.parse_state<-parse_state'lethandle_trailer_headers=handle_headers_block~is_trailers:trueletcreate_partial_headerstflagsheaders_block=letend_headers=Flags.test_end_headerflagsinletheaders_block_length=Bigstringaf.lengthheaders_blockinletinitial_buffer_size=ifend_headersthenheaders_block_lengthelse(* Conservative estimate that there's only going to be one CONTINUATION
* frame. *)2*headers_block_lengthin{Stream.parse_state=AB.parse~initial_buffer_size(Hpack.Decoder.decode_headerst.hpack_decoder);end_stream=Flags.test_end_streamflags}lethandle_first_response_byteststreamactive_requestframe_headerheaders_block=let(Scheduler.Stream{descriptor;_})=streaminlet{Frame.flags;stream_id;_}=frame_headerinletpartial_headers=create_partial_headerstflagsheaders_blockinletremote_state=Stream.PartialHeaderspartial_headersindescriptor.Stream.state<-(ifStream.is_opendescriptorthenActive(Openremote_state,active_request)elseActive(HalfClosedremote_state,active_request));ifnot(Flags.test_end_headerflags)thent.receiving_headers_for_stream<-Somestream_id;handle_headers_blocktstreampartial_headersflagsheaders_blockletprocess_trailer_headerststreamactive_responseframe_headerheaders_block=let(Scheduler.Stream{descriptor=respd;_})=streaminlet{Frame.stream_id;flags;_}=frame_headerinletend_stream=Flags.test_end_streamflagsinifnotend_streamthen(* From RFC7540§8.1:
* A HEADERS frame (and associated CONTINUATION frames) can only appear
* at the start or end of a stream. An endpoint that receives a HEADERS
* frame without the END_STREAM flag set after receiving a final
* (non-informational) status code MUST treat the corresponding request
* or response as malformed (Section 8.1.2.6). *)letmessage="HEADERS frames containing trailers must set the END_STREAM flag"inset_error_and_handletrespd(`Malformed_responsemessage)ProtocolErrorelseletpartial_headers={Stream.parse_state=AB.parse(Hpack.Decoder.decode_headerst.hpack_decoder)(* obviously true at this point. *);end_stream}inactive_response.Respd.trailers_parser<-Somepartial_headers;ifnotFlags.(test_end_headerflags)thent.receiving_headers_for_stream<-Somestream_id;(* trailer headers: RFC7230§4.4 *)handle_trailer_headerststreampartial_headersflagsheaders_blockletprocess_headers_framet{Frame.frame_header;_}headers_block=let{Frame.stream_id;_}=frame_headerinmatchScheduler.get_nodet.streamsstream_idwith|None->(* If we're receiving a response for a stream that's no longer in the
* priority tree, assume this is a network race - we canceled a request
* but a response was already in flight.
*
* However, if the stream identifier is greater than the largest stream
* identifier we have produced, they should know better. In this case,
* send an RST_STREAM. *)ifStream_identifier.(stream_id>=t.current_stream_id&&is_requeststream_id)thenreport_stream_errortstream_idError_code.StreamClosed|Some(Scheduler.Stream{descriptor;_}asstream)->(matchdescriptor.statewith|Idle->(* From RFC7540§6.2:
* HEADERS frames can be sent on a stream in the "idle", "reserved
* (local)", "open", or "half-closed (remote)" state. *)report_connection_errortError_code.ProtocolError|Active((OpenWaitingForPeer|HalfClosedWaitingForPeer),active_request)->handle_first_response_byteststreamactive_requestframe_headerheaders_block|Active((Open(FullHeaders|PartialHeaders_)|HalfClosed(FullHeaders|PartialHeaders_)),_)->assertfalse(* if we're getting a HEADERS frame at this point, they must be
* trailers, and the END_STREAM flag needs to be set. *)|Active((Open(ActiveMessageactive_response)|HalfClosed(ActiveMessageactive_response)),_)->process_trailer_headerststreamactive_responseframe_headerheaders_block;Stream.finish_streamdescriptorFinished|Closed{reason=ResetByThem_;_}->(* From RFC7540§5.1:
* closed: [...] An endpoint that receives any frame other than
* PRIORITY after receiving a RST_STREAM MUST treat that as a
* stream error (Section 5.4.2) of type STREAM_CLOSED. *)report_stream_errortstream_idError_code.StreamClosed(* From RFC7540§5.1:
* reserved (local): [...] Receiving any type of frame other than
* RST_STREAM, PRIORITY, or WINDOW_UPDATE on a stream in this state
* MUST be treated as a connection error (Section 5.4.1) of type
* PROTOCOL_ERROR. *)|Reserved_|Closed_->(* From RFC7540§5.1:
* Similarly, an endpoint that receives any frames after receiving
* a frame with the END_STREAM flag set MUST treat that as a
* connection error (Section 5.4.1) of type STREAM_CLOSED [...]. *)report_connection_errortError_code.StreamClosed)letprocess_data_framet{Frame.frame_header;_}bstr=letopenSchedulerinlet{Frame.flags;stream_id;payload_length;_}=frame_headerinletpayload_len32=Int32.of_intpayload_lengthinmatchScheduler.get_nodet.streamsstream_idwith|Some(Stream{descriptor;_}asstream)->(matchdescriptor.statewith|Active((Open(ActiveMessageresponse_info)|HalfClosed(ActiveMessageresponse_info)),_)->let{Respd.response;response_body;response_body_bytes;_}=response_infoinresponse_info.response_body_bytes<-Int64.(addresponse_body_bytes(of_int(Bigstringaf.lengthbstr)));(* First, calculate whether we're allowed to receive this frame based
* on the _current_ inflow. *)letallowed_to_receive=Scheduler.(allowed_to_receivet.streamsstreampayload_len32)in(* Then, deduct inflow from the connection flow-control window, as
* mandated by the protocol.
*
* From RFC7540§6.9:
* A receiver that receives a flow-controlled frame MUST always account
* for its contribution against the connection flow-control window,
* unless the receiver treats this as a connection error (Section 5.4.1).
* This is necessary even if the frame is in error. *)Scheduler.deduct_inflowt.streamspayload_len32;ifnotallowed_to_receivethen((* From RFC7540§6.9:
* A receiver MAY respond with a stream error (Section 5.4.2) or
* connection error (Section 5.4.1) of type FLOW_CONTROL_ERROR if it
* is unable to accept a frame. *)send_window_updatett.streamspayload_len32;report_stream_errortstream_idError_code.FlowControlError)else(Scheduler.deduct_inflowstreampayload_len32;matchMessage.body_lengthresponse.headerswith|`Fixedlen(* Getting more than the server declared *)whenInt64.compareresponse_info.response_body_byteslen>0->(* Give back connection-level flow-controlled bytes (we use payload
* length to include any padding bytes that the frame might have
* included - which were ignored at parse time). *)send_window_updatett.streamspayload_len32;(* From RFC7540§8.1.2.6:
* A request or response is also malformed if the value of a
* content-length header field does not equal the sum of the
* DATA frame payload lengths that form the body. *)set_error_and_handletdescriptor(`Invalid_response_body_lengthresponse)ProtocolError|_->letend_stream=Flags.test_end_streamflagsin(* From RFC7540§6.9.1:
* The receiver of a frame sends a WINDOW_UPDATE frame as it
* consumes data and frees up space in flow-control windows.
* Separate WINDOW_UPDATE frames are sent for the stream- and
* connection-level flow-control windows.
*
* Note: we send these WINDOW_UPDATE frames once the body bytes
* have been surfaced to the application. This is done in the
* record field `done_reading` of `Body.t`. *)letfaraday=Body.Reader.unsafe_faradayresponse_bodyinifnot(Faraday.is_closedfaraday)then(Faraday.schedule_bigstringfaradaybstr;ifend_streamthenBody.Reader.closeresponse_body);Respd.flush_response_bodydescriptor;ifend_stream&¬(Respd.requires_outputdescriptor)then(* From RFC7540§6.1:
* When set, bit 0 indicates that this frame is the last that
* the endpoint will send for the identified stream. Setting
* this flag causes the stream to enter one of the
* "half-closed" states or the "closed" state (Section 5.1).
*
* Transition to the "closed" state if this is the last DATA frame
* that the server will send and we're done sending. *)Stream.finish_streamdescriptorFinished)|Idle->(* From RFC7540§5.1:
* idle: [...] Receiving any frame other than HEADERS or PRIORITY on
* a stream in this state MUST be treated as a connection error
* (Section 5.4.1) of type PROTOCOL_ERROR. *)report_connection_errortError_code.ProtocolError(* This is technically in the half-closed (local) state *)|Closed{reason=ResetByUsNoError;_}->(* From RFC7540§6.9:
* A receiver that receives a flow-controlled frame MUST always
* account for its contribution against the connection flow-control
* window, unless the receiver treats this as a connection error
* (Section 5.4.1). This is necessary even if the frame is in
* error. *)send_window_updatett.streamspayload_len32(* From RFC7540§6.4:
* [...] after sending the RST_STREAM, the sending endpoint MUST be
* prepared to receive and process additional frames sent on the
* stream that might have been sent by the peer prior to the arrival
* of the RST_STREAM.
*
* Note: after some writer yields / wake ups, we will have stopped
* keeping state information for the stream. This functions effectively
* as a way of only accepting frames after an RST_STREAM from us up to
* a time limit. *)|_->send_window_updatett.streamspayload_len32;(* From RFC7540§6.1:
* If a DATA frame is received whose stream is not in "open" or
* "half-closed (local)" state, the recipient MUST respond with a
* stream error (Section 5.4.2) of type STREAM_CLOSED. *)report_stream_errortstream_idError_code.StreamClosed)|None->ifnot(was_closed_or_implicitly_closedtstream_id)then(* From RFC7540§5.1:
* idle: [...] Receiving any frame other than HEADERS or PRIORITY on
* a stream in this state MUST be treated as a connection error
* (Section 5.4.1) of type PROTOCOL_ERROR. *)report_connection_errortError_code.ProtocolErrorleton_close_streamtid~activeclosed=ifactivethen(* From RFC7540§5.1.2:
* Streams that are in the "open" state or in either of the "half-closed"
* states count toward the maximum number of streams that an endpoint is
* permitted to open. *)t.current_server_streams<-t.current_server_streams-1;Scheduler.mark_for_removalt.streamsidclosedletprocess_priority_framet{Frame.frame_header;_}priority=let{Frame.stream_id;_}=frame_headerinlet{Priority.stream_dependency;_}=priorityinifStream_identifier.(stream_id===stream_dependency)then(* From RFC7540§5.3.1:
* A stream cannot depend on itself. An endpoint MUST treat this as a
* stream error (Section 5.4.2) of type PROTOCOL_ERROR. *)report_stream_errortstream_idError_code.ProtocolErrorelsematchScheduler.get_nodet.streamsstream_idwith|Somestream->Scheduler.reprioritize_streamt.streams~prioritystream|None->(* From RFC7540§5.3:
* A client can assign a priority for a new stream by including
* prioritization information in the HEADERS frame (Section 6.2) that
* opens the stream. At any other time, the PRIORITY frame (Section
* 6.3) can be used to change the priority of a stream.
*
* Note: The spec mostly only mentions that clients are the endpoints
* that make use of PRIORITY frames. As such, we don't make too
* much of an effort to process PRIORITY frames coming from a
* server. If we know about a stream, we reprioritize it (meaning
* prioritization is an input to the process of allocating
* resources when flushing request bodies). Otherwise, we ignore
* it. We don't, however, report any errors if the frame is
* well-formed, as section 5.1 clearly mentions that PRIORITY frames
* must be accepted in all stream states.
*
* From RFC7540§5.1:
* Note that PRIORITY can be sent and received in any stream state. *)
  ()

(* Handles a RST_STREAM frame received from the peer.
 *
 * The stream named in the frame header is looked up in [t.streams] and the
 * outcome depends on its state: a connection error for idle streams, a quiet
 * finish for active streams reset with NO_ERROR, nothing for closed streams,
 * and otherwise the stream is finished and the error is reported through
 * [set_error_and_handle]. Streams that are no longer tracked are only an
 * error if [was_closed_or_implicitly_closed] says they were never opened. *)
let process_rst_stream_frame t { Frame.frame_header; _ } error_code =
  let { Frame.stream_id; _ } = frame_header in
  match Scheduler.find t.streams stream_id with
  | Some respd ->
    (match respd.state, error_code with
    | Idle, _ ->
      (* From RFC7540§6.4:
       *   RST_STREAM frames MUST NOT be sent for a stream in the "idle"
       *   state. If a RST_STREAM frame identifying an idle stream is
       *   received, the recipient MUST treat this as a connection error
       *   (Section 5.4.1) of type PROTOCOL_ERROR. *)
      report_connection_error t Error_code.ProtocolError
    | Active _, Error_code.NoError ->
      (* If we're active (i.e. not done sending the request body), finish the
       * stream, in order to mark it for cleanup.
       *
       * Note: we don't close the request body here because the client may be
       * in the process of writing to it, and while we're not going to send
       * those bytes to the output channel, we don't want to fail when writing
       * either. *)
      Stream.finish_stream respd (ResetByThem error_code)
    | Closed _, _ ->
      (* From RFC7540§8.1:
       *   A server can send a complete response prior to the client sending an
       *   entire request if the response does not depend on any portion of the
       *   request that has not been sent and received. When this is true, a
       *   server MAY request that the client abort transmission of a request
       *   without error by sending a RST_STREAM with an error code of NO_ERROR
       *   after sending a complete response (i.e., a frame with the END_STREAM
       *   flag).
       *
       * If we're done sending the request there's nothing to do here, allow
       * the stream to finish successfully.
       *
       * From RFC7540§5.1:
       *   Endpoints MUST ignore WINDOW_UPDATE or RST_STREAM frames received
       *   in this state, though endpoints MAY choose to treat frames that
       *   arrive a significant time after sending END_STREAM as a connection
       *   error (Section 5.4.1) of type PROTOCOL_ERROR.
       *
       * We ignore further RST_STREAM frames.
       *)
      (* XXX(anmonteiro): When we add logging support, add something here. *)
      ()
    | _ ->
      (* From RFC7540§6.4:
       *   The RST_STREAM frame fully terminates the referenced stream and
       *   causes it to enter the "closed" state. After receiving a
       *   RST_STREAM on a stream, the receiver MUST NOT send additional
       *   frames for that stream, with the exception of PRIORITY.
       *
       * Note:
       *   `Idle` and `Closed` streams, and active streams reset with
       *   NO_ERROR, were all matched by the arms above. This arm therefore
       *   only sees the remaining combinations: reserved streams, and active
       *   streams reset with an error code other than NO_ERROR. *)
      Stream.finish_stream respd (ResetByThem error_code);
      (* From RFC7540§5.4.2:
       *   To avoid looping, an endpoint MUST NOT send a RST_STREAM in response
       *   to a RST_STREAM frame.
       *
       * Note: the {!Respd.report_error} function does not send an RST_STREAM
       * frame for streams in the closed state. So we close the stream before
       * reporting the error. *)
      set_error_and_handle t respd (`Protocol_error (error_code, "")) error_code)
  | None ->
    (* We might have removed the stream from the hash table. If its stream
     * id is smaller than or equal to the max client stream id we've generated,
     * then it must have been closed. *)
    if not (was_closed_or_implicitly_closed t stream_id)
    then
      (* From RFC7540§6.4:
       *   RST_STREAM frames MUST NOT be sent for a stream in the "idle"
       *   state. If a RST_STREAM frame identifying an idle stream is
       *   received, the recipient MUST treat this as a connection error
       *   (Section 5.4.1) of type PROTOCOL_ERROR.
       *
       * Note:
       *   If we didn't find the stream in the hash table it must be "idle". *)
      report_connection_error t Error_code.ProtocolError

(* Handles a SETTINGS frame received from the peer.
 *
 * An ACK decrements [t.unacked_settings] and is a connection error when the
 * counter goes negative. Any other SETTINGS frame is validated with
 * [Settings.check_settings_list], folded into a new settings record (applying
 * the per-setting side effects as it goes), stored in [t.settings] and
 * acknowledged with an empty SETTINGS frame carrying the ACK flag. *)
let process_settings_frame t { Frame.frame_header; _ } settings =
  let open Scheduler in
  let { Frame.flags; _ } = frame_header in
  (* We already checked that an acked SETTINGS is empty. Don't need to do
   * anything else in that case *)
  if Flags.(test_ack flags)
  then (
    t.unacked_settings <- t.unacked_settings - 1;
    if t.unacked_settings < 0
    then
      (* The server is ACKing a SETTINGS frame that we didn't send *)
      report_connection_error
        t
        ~reason:"Received unexpected SETTINGS frame with acknowledgement"
        Error_code.ProtocolError)
  else
    match Settings.check_settings_list ~is_client:true settings with
    | Ok () ->
      (* From RFC7540§6.5:
       *   Each parameter in a SETTINGS frame replaces any existing value for
       *   that parameter. Parameters are processed in the order in which they
       *   appear, and a receiver of a SETTINGS frame does not need to maintain
       *   any state other than the current value of its parameters. *)
      let new_settings =
        List.fold_left
          (fun (acc : Settings.t) item ->
            match item with
            | Settings.HeaderTableSize x ->
              (* From RFC7540§6.5.2:
               *   Allows the sender to inform the remote endpoint of the
               *   maximum size of the header compression table used to decode
               *   header blocks, in octets. *)
              Hpack.Encoder.set_capacity t.hpack_encoder x;
              { acc with header_table_size = x }
            | EnablePush x ->
              (* We've already verified that this setting is either 0 or 1 in
               * the call to `Settings.check_settings_list` above. *)
              { acc with enable_push = (x = 1) }
            | MaxConcurrentStreams x -> { acc with max_concurrent_streams = x }
            | InitialWindowSize new_val ->
              (* From RFC7540§6.9.2:
               *   In addition to changing the flow-control window for streams
               *   that are not yet active, a SETTINGS frame can alter the
               *   initial flow-control window size for streams with active
               *   flow-control windows (that is, streams in the "open" or
               *   "half-closed (remote)" state). When the value of
               *   SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust
               *   the size of all stream flow-control windows that it
               *   maintains by the difference between the new value and the
               *   old value.
               *
               *   [...] A SETTINGS frame cannot alter the connection
               *   flow-control window. *)
              (* The "old value" is the one accumulated so far while folding
               * this frame (the fold starts from [t.settings]). [t.settings]
               * itself is only replaced after the whole list has been
               * processed, so reading it here would apply a repeated
               * SETTINGS_INITIAL_WINDOW_SIZE entry against a stale value. *)
              let old_val = acc.initial_window_size in
              let growth = Int32.sub new_val old_val in
              let exception Local in
              (match
                 Scheduler.iter
                   ~f:(fun stream ->
                     (* From RFC7540§6.9.2:
                      *   An endpoint MUST treat a change to
                      *   SETTINGS_INITIAL_WINDOW_SIZE that causes any
                      *   flow-control window to exceed the maximum size as a
                      *   connection error (Section 5.4.1) of type
                      *   FLOW_CONTROL_ERROR. *)
                     if not (Scheduler.add_flow stream growth) then raise Local)
                   t.streams
               with
              | () -> ()
              | exception Local ->
                report_connection_error
                  t
                  ~reason:
                    (Format.sprintf
                       "Window size for stream would exceed %ld"
                       Settings.WindowSize.max_window_size)
                  Error_code.FlowControlError);
              { acc with initial_window_size = new_val }
            | MaxFrameSize x ->
              Scheduler.iter
                ~f:(fun (Stream { descriptor; _ }) ->
                  if Respd.requires_output descriptor
                  then descriptor.max_frame_size <- x)
                t.streams;
              { acc with max_frame_size = x }
            | MaxHeaderListSize x -> { acc with max_header_list_size = Some x })
          t.settings
          settings
      in
      t.settings <- new_settings;
      let frame_info =
        Writer.make_frame_info
          ~flags:Flags.(set_ack default_flags)
          Stream_identifier.connection
      in
      (* From RFC7540§6.5:
       *   ACK (0x1): [...] When this bit is set, the payload of the SETTINGS
       *   frame MUST be empty. *)
      Writer.write_settings t.writer frame_info [];
      (* NOTE(review): the counter is bumped after writing an ACK, which the
       * peer does not itself acknowledge. Presumably this balances the
       * SETTINGS sent as part of the connection preface -- confirm against
       * the code that writes the preface before changing it. *)
      t.unacked_settings <- t.unacked_settings + 1;
      Writer.wakeup t.writer
    | Error error -> report_error t error

(* Creates the stream promised by a PUSH_PROMISE frame, registers it in the
 * scheduler as a dependent of the stream the frame arrived on, puts it in the
 * `Reserved` state and starts processing its header block. *)
let reserve_stream t { Frame.frame_header; _ } promised_stream_id headers_block =
  let { Frame.flags; stream_id; _ } = frame_header in
  (* From RFC7540§6.6:
   *   The PUSH_PROMISE frame (type=0x5) is used to notify the peer endpoint in
   *   advance of streams the sender intends to initiate. *)
  let respd =
    Stream.create
      promised_stream_id
      ~max_frame_size:t.settings.max_frame_size
      t.writer
      t.error_handler
      (on_close_stream t promised_stream_id)
  in
  (* From RFC7540§5.3.5:
   *   All streams are initially assigned a non-exclusive dependency on stream
   *   0x0. Pushed streams (Section 8.2) initially depend on their associated
   *   stream. In both cases, streams are assigned a default weight of 16. *)
  let stream : Scheduler.nonroot Scheduler.node =
    Scheduler.add
      t.streams
      ~priority:{ Priority.default_priority with stream_dependency = stream_id }
      ~initial_send_window_size:t.settings.initial_window_size
      ~initial_recv_window_size:t.config.initial_window_size
      respd
  in
  let partial_headers = create_partial_headers t flags headers_block in
  respd.state <- Reserved (PartialHeaders partial_headers);
  (* Without END_HEADERS, remember which stream the following CONTINUATION
   * frames must belong to. *)
  if not (Flags.test_end_header flags)
  then t.receiving_headers_for_stream <- Some promised_stream_id;
  handle_headers_block t stream partial_headers flags headers_block

(* Handles a PUSH_PROMISE frame received from the peer.
 *
 * Reports a connection error when push is disabled, when the promised stream
 * id is not greater than [t.max_pushed_stream_id], or when the frame arrives
 * on a stream that is unknown or not in an open / half-closed active state.
 * Otherwise the promised stream is reserved via [reserve_stream]. *)
let process_push_promise_frame
    t
    ({ Frame.frame_header; _ } as frame)
    promised_stream_id
    headers_block
  =
  let { Frame.stream_id; _ } = frame_header in
  (* At this point, `promised_stream_id` has already been validated by the
   * parser *)
  if not t.settings.enable_push
  then
    (* From RFC7540§6.6:
     *   PUSH_PROMISE MUST NOT be sent if the SETTINGS_ENABLE_PUSH setting of
     *   the peer endpoint is set to 0. An endpoint that has set this setting
     *   and has received acknowledgement MUST treat the receipt of a
     *   PUSH_PROMISE frame as a connection error (Section 5.4.1) of type
     *   PROTOCOL_ERROR. *)
    report_connection_error
      t
      ~reason:"Push is disabled for this connection"
      Error_code.ProtocolError
  else if not Stream_identifier.(promised_stream_id > t.max_pushed_stream_id)
  then
    (* From RFC7540§6.6:
     *   A receiver MUST treat the receipt of a PUSH_PROMISE that promises an
     *   illegal stream identifier (Section 5.1.1) as a connection error
     *   (Section 5.4.1) of type PROTOCOL_ERROR. Note that an illegal stream
     *   identifier is an identifier for a stream that is not currently in the
     *   "idle" state. *)
    report_connection_error
      t
      ~reason:"Illegal stream identifier promised by PUSH_PROMISE"
      Error_code.ProtocolError
  else
    let send_connection_error () =
      report_connection_error
        t
        ~reason:
          "Received PUSH_PROMISE on a stream that is neither open nor \
           half-closed (local)"
        Error_code.ProtocolError
    in
    (* The high-water mark is advanced before the parent stream is checked. *)
    t.max_pushed_stream_id <- promised_stream_id;
    match Scheduler.find t.streams stream_id with
    | None ->
      (* From RFC7540§6.6:
       *   A receiver MUST treat the receipt of a PUSH_PROMISE on a stream that
       *   is neither "open" nor "half-closed (local)" as a connection error
       *   (Section 5.4.1) of type PROTOCOL_ERROR. *)
      send_connection_error ()
    | Some respd ->
      (match respd.state with
      | Active ((Open _ | HalfClosed _), _) ->
        reserve_stream t frame promised_stream_id headers_block
      | _ -> send_connection_error ())

(* Shared success value handed to PING callbacks. *)
let ok = Ok ()

(* Handles a PING frame received from the peer.
 *
 * A PING carrying the ACK flag completes the oldest callback queued in
 * [t.pending_pings] (a connection error if there is none). Any other PING is
 * answered with a PING frame that has the ACK flag set and the same
 * payload. *)
let process_ping_frame t { Frame.frame_header; _ } payload =
  let { Frame.flags; _ } = frame_header in
  (* From RFC7540§6.7:
   *   ACK (0x1): When set, bit 0 indicates that this PING frame is a PING
   *   response. [...] An endpoint MUST NOT respond to PING frames containing
   *   this flag. *)
  if Flags.test_ack flags
  then (
    match Queue.take_opt t.pending_pings with
    | Some callback -> callback ok
    | None ->
      (* server is ACKing a PING that we didn't send? *)
      report_connection_error
        t
        ~reason:"Unexpected PING acknowledgement"
        Error_code.ProtocolError)
  else
    (* From RFC7540§6.7:
     *   Receivers of a PING frame that does not include an ACK flag MUST send
     *   a PING frame with the ACK flag set in response, with an identical
     *   payload. PING responses SHOULD be given higher priority than any other
     *   frame. *)
    let frame_info =
      Writer.make_frame_info
      (* From RFC7540§6.7:
       *   ACK (0x1): When set, bit 0 indicates that this PING frame is a
       *   PING response. An endpoint MUST set this flag in PING
       *   responses. *)
        ~flags:Flags.(set_ack default_flags)
        Stream_identifier.connection
    in
    (* From RFC7540§6.7:
     *   Receivers of a PING frame that does not include an ACK flag MUST send
     *   a PING frame with the ACK flag set in response, with an identical
     *   payload. *)
    Writer.write_ping t.writer frame_info payload;
    Writer.wakeup t.writer

(* Handles a GOAWAY frame received from the peer by shutting the connection
 * down in both directions.
 *
 * The payload is a [(last_stream_id, error_code, debug_data)] triple; none of
 * its components is currently used. The triple is still destructured so that
 * the expected shape of [payload] stays explicit. *)
let process_goaway_frame t _frame payload =
  let _last_stream_id, _error, _debug_data = payload in
  (* TODO(anmonteiro): I think we need to allow lower numbered streams to
   * complete. *)
  shutdown_rw t

(* Adds [increment] to the flow-control window of [stream], which may be
 * either the connection root or an individual stream node.
 *
 * When [Scheduler.add_flow] accepts the increment, the writer is woken up if
 * the resulting window is positive. When it rejects it, a connection error is
 * reported for the connection node and a stream error for any other node. *)
let add_window_increment
    : type a. t -> a Scheduler.PriorityTreeNode.node -> int32 -> unit
  =
 fun t stream increment ->
  let open Scheduler in
  let did_add = Scheduler.add_flow stream increment in
  let stream_id = Scheduler.stream_id stream in
  (* Read after [add_flow] has been called. *)
  let new_flow =
    match stream with
    | Connection { flow; _ } -> flow
    | Stream { flow; _ } -> flow
  in
  if did_add
  then (
    if Int32.compare new_flow 0l > 0
    then
      (* Don't bother waking up the writer if the new flow doesn't allow
       * the stream to write. *)
      Writer.wakeup t.writer)
  else if Stream_identifier.is_connection stream_id
  then
    report_connection_error
      t
      ~reason:
        (Printf.sprintf
           "Window size for stream would exceed %ld"
           Settings.WindowSize.max_window_size)
      Error_code.FlowControlError
  else report_stream_error t stream_id Error_code.FlowControlError

(* Handles a WINDOW_UPDATE frame received from the peer, applying the
 * increment either to the connection or to the stream named in the frame
 * header. *)
let process_window_update_frame t { Frame.frame_header; _ } window_increment =
  let open Scheduler in
  let { Frame.stream_id; _ } = frame_header in
  (* From RFC7540§6.9:
   *   The WINDOW_UPDATE frame can be specific to a stream or to the entire
   *   connection. In the former case, the frame's stream identifier indicates
   *   the affected stream; in the latter, the value "0" indicates that the
   *   entire connection is the subject of the frame. *)
  if Stream_identifier.is_connection stream_id
  then add_window_increment t t.streams window_increment
  else
    match Scheduler.get_node t.streams stream_id with
    | Some (Stream { descriptor; _ } as stream_node) ->
      (match descriptor.state with
      | Idle ->
        (* From RFC7540§5.1:
         *   idle: [...] Receiving any frame other than HEADERS or PRIORITY on
         *   a stream in this state MUST be treated as a connection error
         *   (Section 5.4.1) of type PROTOCOL_ERROR. *)
        report_connection_error t Error_code.ProtocolError
      | Active _
      (* From RFC7540§5.1:
       *   reserved (local): [...] A PRIORITY or WINDOW_UPDATE frame MAY be
       *   received in this state. *)
      | Reserved _ ->
        add_window_increment t stream_node window_increment
      | Closed _ ->
        (* From RFC7540§5.1:
         *   Endpoints MUST ignore WINDOW_UPDATE or RST_STREAM frames received
         *   in this state, though endpoints MAY choose to treat frames that
         *   arrive a significant time after sending END_STREAM as a connection
         *   error (Section 5.4.1) of type PROTOCOL_ERROR. *)
        ())
    | None ->
      if not (was_closed_or_implicitly_closed t stream_id)
      then
        (* From RFC7540§5.1:
         *   idle: [...] Receiving any frame other than HEADERS or PRIORITY on
         *   a stream in this state MUST be treated as a connection error
         *   (Section 5.4.1) of type PROTOCOL_ERROR. *)
        report_connection_error t Error_code.ProtocolError

(* Handles a CONTINUATION frame received from the peer.
 *
 * The header block fragment is fed to whichever partial parser the stream is
 * waiting on: the response headers parser or the trailers parser. Any other
 * stream state, or an unknown stream, is a connection error. *)
let process_continuation_frame t { Frame.frame_header; _ } headers_block =
  let { Frame.stream_id; flags; _ } = frame_header in
  match Scheduler.get_node t.streams stream_id with
  | Some (Scheduler.Stream { descriptor; _ } as stream) ->
    (match descriptor.state with
    | Active
        ( ( Open (PartialHeaders partial_headers)
          | HalfClosed (PartialHeaders partial_headers) )
        , _ ) ->
      handle_headers_block t stream partial_headers flags headers_block
    | Active
        ( ( Open (ActiveMessage { trailers_parser = Some partial_headers; _ })
          | HalfClosed
              (ActiveMessage { trailers_parser = Some partial_headers; _ }) )
        , _ ) ->
      handle_trailer_headers t stream partial_headers flags headers_block
    | _ ->
      (* TODO: maybe need to handle the case where the stream has been closed
       * due to a stream error. *)
      (* From RFC7540§6.10:
       *   A RST_STREAM is the last frame that an endpoint can send on a
       *   stream. The peer that sends the RST_STREAM frame MUST be prepared
       *   to receive any frames that were sent or enqueued for sending by
       *   the remote peer. These frames can be ignored, except where they
       *   modify connection state (such as the state maintained for header
       *   compression (Section 4.3) or flow control). *)
      report_connection_error t Error_code.ProtocolError)
  | None ->
    (* From RFC7540§6.10:
     *   A CONTINUATION frame MUST be preceded by a HEADERS, PUSH_PROMISE or
     *   CONTINUATION frame without the END_HEADERS flag set. A recipient that
     *   observes violation of this rule MUST respond with a connection error
     *   (Section 5.4.1) of type PROTOCOL_ERROR. *)
    report_connection_error t Error_code.ProtocolError

(* From RFC7540§1:
* HTTP/2 [...] allows interleaving of request and response messages on the
* same connection and uses an efficient coding for HTTP header fields. *)
(* Builds a new client connection.
 *
 * [config] defaults to [Config.default]. When no [push_handler] is given,
 * [default_push_handler] is installed and push is turned off in the settings
 * derived from [config]. The connection record, the preface handler and the
 * frame handler refer to each other, so the record is built lazily and forced
 * once all three are defined. Before returning, the client connection preface
 * is written and, if the configured initial window size is above the protocol
 * default, [send_window_update] is called with the difference. *)
let create ?(config = Config.default) ?push_handler ~error_handler () =
  let push_handler =
    match push_handler with
    | None -> default_push_handler
    | Some handler -> handler
  in
  (* If the caller is not going to process PUSH_PROMISE frames, just
   * disable it. *)
  let push_is_handled = push_handler != default_push_handler in
  let settings =
    { (Config.to_settings config) with
      enable_push = config.enable_server_push && push_is_handled
    }
  in
  let rec connection_preface_handler recv_frame settings_list =
    (* Now process the client's SETTINGS frame. `process_settings_frame` will
     * take care of calling `wakeup_writer`. *)
    process_settings_frame (Lazy.force t) recv_frame settings_list
  and frame_handler parsed =
    let t = Lazy.force t in
    match parsed with
    | Error err -> report_error t err
    | Ok ({ Frame.frame_payload; frame_header } as frame) ->
      (match t.receiving_headers_for_stream with
      | Some stream_id
        when (not Stream_identifier.(stream_id === frame_header.stream_id))
             || frame_header.frame_type <> Continuation ->
        (* From RFC7540§6.2:
         *   A HEADERS frame without the END_HEADERS flag set MUST be followed
         *   by a CONTINUATION frame for the same stream. A receiver MUST treat
         *   the receipt of any other type of frame or a frame on a different
         *   stream as a connection error (Section 5.4.1) of type
         *   PROTOCOL_ERROR. *)
        report_connection_error
          t
          ~reason:
            "HEADERS or PUSH_PROMISE without the END_HEADERS flag set must be \
             followed by a CONTINUATION frame for the same stream"
          Error_code.ProtocolError
      | _ ->
        (* Route the frame to the handler for its payload type. *)
        (match frame_payload with
        | Headers (_priority, block) -> process_headers_frame t frame block
        | Data body_fragment -> process_data_frame t frame body_fragment
        | Priority prio -> process_priority_frame t frame prio
        | RSTStream code -> process_rst_stream_frame t frame code
        | Settings peer_settings -> process_settings_frame t frame peer_settings
        | PushPromise (promised_id, block) ->
          process_push_promise_frame t frame promised_id block
        | Ping opaque_data -> process_ping_frame t frame opaque_data
        | GoAway (last_id, code, debug) ->
          process_goaway_frame t frame (last_id, code, debug)
        | WindowUpdate increment -> process_window_update_frame t frame increment
        | Continuation block -> process_continuation_frame t frame block
        | Unknown _ ->
          (* From RFC7540§5.1:
           *   Frames of unknown types are ignored. *)
          ()))
  and t =
    lazy
      { settings
      ; config
        (* From RFC7540§5.1.1:
         *   Streams initiated by a client MUST use odd-numbered stream
         *   identifiers *)
      ; current_stream_id = -1l
      ; max_pushed_stream_id = 0l
      ; current_server_streams = 0
      ; receiving_headers_for_stream = None
      ; did_send_go_away = false
      ; unacked_settings = 0
      ; pending_pings = Queue.create ()
      ; error_handler
      ; push_handler
      ; reader =
          Reader.client_frames
            ~max_frame_size:settings.max_frame_size
            connection_preface_handler
            frame_handler
      ; writer = Writer.create settings.max_frame_size
      ; streams = Scheduler.make_root ()
        (* From RFC7540§4.3:
         *   Header compression is stateful. One compression context and one
         *   decompression context are used for the entire connection. *)
      ; hpack_encoder = Hpack.Encoder.(create settings.header_table_size)
      ; hpack_decoder = Hpack.Decoder.(create settings.header_table_size)
      }
  in
  let t = Lazy.force t in
  (* Check if the settings for the connection are different than the default
   * HTTP/2 settings. In the event that they are, we need to send a non-empty
   * SETTINGS frame advertising our configuration. *)
  let advertised_settings = Settings.settings_for_the_connection t.settings in
  (* From RFC7540§6.9.2:
   *   When an HTTP/2 connection is first established, new streams are created
   *   with an initial flow-control window size of 65,535 octets. The
   *   connection flow-control window is also 65,535 octets.
   *
   * XXX(anmonteiro): the starting setting for the initial window size for
   * _sending_ is the default of 65535 octets. We're effectively overwriting it
   * here to enforce this default after abusing the settings implementation to
   * send our (receiving) in-flow setting to the peer. Throughout other parts
   * of the code we (should) refer to it through
   * [t.config.initial_window_size]. This should probably be cleaned up in the
   * future. *)
  t.settings <-
    { t.settings with
      initial_window_size = Settings.default.initial_window_size
    };
  (* Send the client connection preface *)
  Writer.write_connection_preface t.writer advertised_settings;
  (* If a higher value for initial window size is configured, add more tokens
   * to the connection (we have no streams at this point) -- the peer is
   * allowed to send more than the defaults.
   *
   * From RFC7540§6.9.2:
   *   The connection flow-control window can only be changed using
   *   WINDOW_UPDATE frames. *)
  if t.config.initial_window_size > Settings.default.initial_window_size
  then (
    let extra_window =
      Int32.sub
        t.config.initial_window_size
        Settings.default.initial_window_size
    in
    send_window_update t t.streams extra_window);
  t

(* Allocates the next client stream id (the previous one plus two), creates a
 * stream descriptor for it and registers the descriptor in the scheduler with
 * the default priority. Returns the descriptor. *)
let create_and_add_stream t ~error_handler =
  t.current_stream_id <- Int32.add t.current_stream_id 2l;
  let new_id = t.current_stream_id in
  let respd =
    Stream.create
      new_id
      ~max_frame_size:t.settings.max_frame_size
      t.writer
      error_handler
      (on_close_stream t new_id)
  in
  (* TODO: custom priority *)
  let _node : Scheduler.nonroot Scheduler.node =
    Scheduler.add
      t.streams
      ~priority:Priority.default_priority
      ~initial_send_window_size:t.settings.initial_window_size
      ~initial_recv_window_size:t.config.initial_window_size
      respd
  in
  respd

(* Meant to be called after receiving an HTTP/1.1 `101 Switching_protocols`
* response upgrading to HTTP/2. *)
(* Builds a connection for an HTTP/1.1 request that was upgraded to HTTP/2.
 *
 * The HTTP/1.1 headers are converted with [Headers.of_http1]; a conversion
 * failure is returned as [Error]. On success a connection is created, the
 * first stream is allocated for the already-sent request and placed in the
 * half-closed state, and [Ok t] is returned. *)
let create_h2c
    ?config
    ?push_handler
    ~http_request
    ~error_handler
    (response_handler, response_error_handler)
  =
  match Headers.of_http1 http_request with
  | Error msg -> Error msg
  | Ok headers ->
    let { Httpaf.Request.target; meth; _ } = http_request in
    (* From RFC7540§3.2:
     *   Upon receiving the 101 response, the client MUST send a connection
     *   preface (Section 3.5), which includes a SETTINGS frame. *)
    let t = create ?config ?push_handler ~error_handler () in
    let respd = create_and_add_stream t ~error_handler:response_error_handler in
    (* The upgraded request must land on stream 1. *)
    assert (Stream_identifier.(t.current_stream_id === 1l));
    assert (Stream_identifier.(respd.id === 1l));
    let request = Request.create ~headers ~scheme:"http" meth target in
    (* The request body is no more than a placeholder. The HTTP/1.1
     * connection that we're upgrading from already sent it to the
     * server. Application code knows what it is.
     *
     * From RFC7540§3.2:
     *   Requests that contain a payload body MUST be sent in their
     *   entirety before the client can send HTTP/2 frames. This means
     *   that a large request can block the use of the connection until
     *   it is completely sent. *)
    let request_body = Body.Writer.create_empty ~writer:t.writer in
    (* From RFC7540§3.2:
     *   The HTTP/1.1 request that is sent prior to upgrade is assigned a
     *   stream identifier of 1 (see Section 5.1.1) with default priority
     *   values (Section 5.3.5). Stream 1 is implicitly "half-closed" from the
     *   client toward the server (see Section 5.1), since the request is
     *   completed as an HTTP/1.1 request. *)
    respd.state <-
      Active
        ( HalfClosed WaitingForPeer
        , { request; request_body; response_handler; trailers_handler = ignore }
        );
    Writer.wakeup t.writer;
    Ok t

(* Starts a new request on the connection and returns the writer for its body.
 *
 * A stream is allocated, the request headers are written with the default
 * priority, and the stream is moved to the open state. Unless
 * [flush_headers_immediately] is set, the writer is asked to yield before
 * being woken up. [trailers_handler] defaults to [ignore]. *)
let request
    t
    ?(flush_headers_immediately = false)
    ?(trailers_handler = ignore)
    request
    ~error_handler
    ~response_handler
  =
  let frame_size_limit = t.settings.max_frame_size in
  let respd = create_and_add_stream t ~error_handler in
  let request_body =
    Body.Writer.create (Bigstringaf.create frame_size_limit) ~writer:t.writer
  in
  let headers_frame_info =
    Writer.make_frame_info
      ~max_frame_size:t.settings.max_frame_size
      ~flags:Flags.default_flags
      respd.id
  in
  Writer.write_request_headers
    t.writer
    t.hpack_encoder
    ~priority:
      (* TODO: allow setting the priority of the request. *)
      Priority.default_priority
    headers_frame_info
    request;
  respd.state <-
    Active
      ( Open WaitingForPeer
      , { request; request_body; response_handler; trailers_handler } );
  if not flush_headers_immediately then Writer.yield t.writer;
  Writer.wakeup t.writer;
  (* Closing the request body puts the stream in the half-closed (local) state.
   * This is handled by {!Respd.flush_request_body}, which transitions the
   * state once it verifies that there's no more data to send for the
   * stream. *)
  request_body

(* XXX: we store PING callbacks in FIFO order. Would it ever be the case that
* the receipt of a PING frame acknowledges a later callback? If so, we'd need
* to disallow sending custom PING payloads and generate a random payload that
* we store in a Hashtbl. *)
(* Sends a PING frame and queues [callback] in [t.pending_pings].
 *
 * [payload] defaults to [Serialize.default_ping_payload] and [off] to [0].
 * Raises [Failure] when a caller-supplied payload has fewer than 8 bytes
 * available after [off]. *)
let ping t ?payload ?(off = 0) callback =
  let payload =
    match payload with
    | Some buffer when Bigstringaf.length buffer - off < 8 ->
      failwith "PING payload must have at least 8 octets in length"
    | Some buffer -> buffer
    | None -> Serialize.default_ping_payload
  in
  (* From RFC7540§6.7:
   *   ACK (0x1): When set, bit 0 indicates that this PING frame is a PING
   *   response.
   *
   * Note: this is not a PING response, quite the opposite, so we don't set
   * the ACK flag. *)
  let ping_frame_info = Writer.make_frame_info Stream_identifier.connection in
  Queue.add callback t.pending_pings;
  Writer.write_ping t.writer ping_frame_info ~off payload;
  Writer.wakeup t.writer

(* Tells the runtime what to do next on the read side.
 *
 * If the reader is already closed, [shutdown_reader] is called first. A
 * reader error is reported and then mapped to `Close` for connection errors
 * and `Read` for stream errors. *)
let next_read_operation t =
  if Reader.is_closed t.reader then shutdown_reader t;
  match Reader.next t.reader with
  | `Error err ->
    report_error t err;
    (match err with
    | StreamError _ ->
      (* From RFC7540§5.4.2:
       *   A stream error is an error related to a specific stream that does
       *   not affect processing of other streams. *)
      `Read
    | ConnectionError _ ->
      (* From RFC7540§5.4.1:
       *   A connection error is any error that prevents further processing
       *   of the frame layer or corrupts any connection state. *)
      `Close)
  | (`Read | `Close) as next_op -> next_op

(* Feeds [len] bytes of [bs] starting at [off] to the reader, signalling that
 * more input may follow. Returns the reader's result. *)
let read t bs ~off ~len =
  Reader.read_with_more t.reader bs ~off ~len Incomplete

(* Reacts to the input ending unexpectedly: every active stream is sent a
 * malformed-response error, every pending PING callback is called with
 * [Error `EOF], and a connection error is reported. *)
let unexpected_eof t =
  Scheduler.iter
    ~f:(fun (Stream { descriptor; _ }) ->
      match descriptor.state with
      | Active _ ->
        Respd.report_error
          descriptor
          (`Malformed_response "unexpected eof")
          ProtocolError
      | Idle | Reserved _ | Closed _ -> ())
    t.streams;
  Queue.iter (fun notify -> notify (Error `EOF)) t.pending_pings;
  report_connection_error t ~reason:"unexpected eof" ProtocolError

(* Feeds the final chunk of input to the reader, then runs [unexpected_eof].
 * Returns the reader's result for the chunk. *)
let read_eof t bs ~off ~len =
  let consumed = Reader.read_with_more t.reader bs ~off ~len Complete in
  unexpected_eof t;
  consumed

(* XXX(anmonteiro): this function is here to please the Gluten `RUNTIME`
* interface.
*
* We don't expect this function to ever be called. H2 never issues `Yield`
* commands because the connection is multiplexed, and it's therefore always
* looking to read frames from the peer. *)
let yield_reader _t continue = continue ()

(* Flushes pending request bodies via [flush_request_body], then asks the
 * writer for its next operation. *)
let next_write_operation t =
  let () = flush_request_body t in
  Writer.next t.writer

(* Registers [k] with the writer through [Writer.on_wakeup_writer]. *)
let yield_writer t k = Writer.on_wakeup_writer t.writer k

(* Passes the outcome of a write on to the writer. *)
let report_write_result t write_result =
  Writer.report_result t.writer write_result