1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354(*----------------------------------------------------------------------------
* Copyright (c) 2017 Inhabited Type LLC.
* Copyright (c) 2019 Antonio N. Monteiro.
*
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* 3. Neither the name of the author nor the names of his contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE CONTRIBUTORS ``AS IS'' AND ANY EXPRESS
* OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE FOR
* ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
* STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*---------------------------------------------------------------------------*)moduleAB=Angstrom.BufferedmoduleReader=Parse.ReadermoduleWriter=Serialize.WritermoduleScheduler=Scheduler.Make(structincludeStreamtypet=Respd.tletflush_write_body=Respd.flush_request_bodyletrequires_output=Respd.requires_outputend)moduleQueue=structincludeQueuelettake_optt=matchis_emptytwithtrue->None|false->Some(taket)endtypeerror=[`Malformed_responseofstring|`Invalid_response_body_lengthofResponse.t|`Protocol_error|`Exnofexn]typeresponse_handler=Response.t->[`read]Body.t->unittypeerror_handler=error->unittypet={settings:Settings.t;reader:Reader.frame;writer:Writer.t;config:Config.t;streams:Scheduler.t;mutablecurrent_stream_id:Stream_identifier.t;mutablemax_pushed_stream_id:Stream_identifier.t;mutablecurrent_server_streams:int;mutablereceiving_headers_for_stream:Stream_identifier.toption;mutabledid_send_go_away:bool;mutableunacked_settings:int;pending_pings:(unit->unit)Queue.t;error_handler:error->unit;push_handler:Request.t->(response_handler,unit)result;wakeup_writer:(unit->unit)ref(* From RFC7540§4.3:
* Header compression is stateful. One compression context and one
* decompression context are used for the entire connection. *);hpack_encoder:Hpack.Encoder.t;hpack_decoder:Hpack.Decoder.t}letdefault_push_handler=Sys.opaque_identity(fun_->Ok(fun__->()))letis_shutdownt=Reader.is_closedt.reader&&Writer.is_closedt.writerletis_closedt=Reader.is_closedt.reader&&Writer.is_closedt.writerleton_wakeup_writertk=ifis_shutdowntthenfailwith"on_wakeup_writer on closed conn"elset.wakeup_writer:=kletdefault_wakeup_writer()=()let_wakeup_writerwakeup_ref=letf=!wakeup_refinwakeup_ref:=default_wakeup_writer;f()letwakeup_writert=_wakeup_writert.wakeup_writerletshutdown_readert=Reader.force_closet.readerletflush_request_bodyt=Scheduler.flusht.streams(t.current_stream_id,t.max_pushed_stream_id)letshutdown_writert=flush_request_bodyt;Writer.closet.writerletshutdown_rwt=shutdown_readert;shutdown_writert(* Handling frames against closed streams is hard. See:
* https://docs.google.com/presentation/d/1iG_U2bKTc9CnKr0jPTrNfmxyLufx_cK2nNh9VjrKH6s
*)letwas_closed_or_implicitly_closedtstream_id=ifStream_identifier.is_requeststream_idthenStream_identifier.(stream_id<=t.current_stream_id)elseStream_identifier.(stream_id<=t.max_pushed_stream_id)letreport_errort=function|Error.ConnectionError(error,data)->ifnott.did_send_go_awaythen((* From RFC7540§5.4.1:
* An endpoint that encounters a connection error SHOULD first send a
* GOAWAY frame (Section 6.8) with the stream identifier of the last
* stream that it successfully received from its peer. The GOAWAY frame
* includes an error code that indicates why the connection is
* terminating. After sending the GOAWAY frame for an error condition,
* the endpoint MUST close the TCP connection. *)letdebug_data=ifString.lengthdata==0thenBigstringaf.emptyelseBigstringaf.of_string~off:0~len:(String.lengthdata)datainletframe_info=Writer.make_frame_infoStream_identifier.connectionin(* TODO: Only write if not already shutdown. *)Writer.write_go_awayt.writerframe_info~debug_data~last_stream_id:(ifStream_identifier.(t.current_stream_id===-1l)thenStream_identifier.connectionelset.current_stream_id)error;iferror<>Error.NoErrorthent.error_handler`Protocol_error;Writer.flusht.writer(fun()->(* XXX: We need to allow lower numbered streams to complete before
* shutting down. *)shutdown_rwt);t.did_send_go_away<-true;wakeup_writert)|StreamError(stream_id,error)->(matchScheduler.findt.streamsstream_idwith|Somerespd->Respd.report_errorrespd`Protocol_errorerror|None->ifnot(was_closed_or_implicitly_closedtstream_id)then(* Possible if the stream was going to enter the Idle state (first time
* we saw e.g. a PRIORITY frame for it) but had e.g. a
* FRAME_SIZE_ERROR. *)letframe_info=Writer.make_frame_infostream_idinWriter.write_rst_streamt.writerframe_infoerror);wakeup_writertletreport_connection_errort?(additional_debug_data="")error=report_errort(ConnectionError(error,additional_debug_data))letreport_stream_errortstream_iderror=report_errort(StreamError(stream_id,error))letshutdownt=report_connection_errortError.NoErrorletset_error_and_handletstreamerrorerror_code=Respd.report_errorstreamerrorerror_code;wakeup_writertletreport_exntexn=ifnot(is_shutdownt)thenletadditional_debug_data=Printexc.to_stringexninreport_connection_errort~additional_debug_dataError.InternalErrorlethandle_push_promise_headerstrespdheaders=(* From RFC7540§8.2.2:
* The header fields in PUSH_PROMISE and any subsequent CONTINUATION frames
* MUST be a valid and complete set of request header fields (Section
* 8.1.2.3). *)matchHeaders.method_path_and_scheme_or_malformedheaderswith|`Malformed->(* From RFC7540§8.2.2:
* If a client receives a PUSH_PROMISE that does not include a complete
* and valid set of header fields [...] it MUST respond with a stream
* error (Section 5.4.2) of type PROTOCOL_ERROR. *)report_stream_errortrespd.Stream.idError.ProtocolError|`Valid(meth,path,scheme)->letmeth=Httpaf.Method.of_stringmethin(matchmeth,Headers.get_pseudoheaders"authority",Message.body_lengthheaderswith|(#Httpaf.Method.standardasmeth),_,_whennotHttpaf.Method.(is_cacheablemeth&&is_safemeth)->report_stream_errortrespd.idError.ProtocolError|_,_,`FixedlenwhenInt64.comparelen0L!=0->(* From RFC7540§8.2:
* Clients that receive a promised request that is not cacheable,
* that is not known to be safe or that indicates the presence of a
* request body MUST reset the promised stream with a stream error
* (Section 5.4.2) of type PROTOCOL_ERROR.
*
* From RFC7231§4.2.3 (Cacheable Methods):
* [...] this specification defines GET, HEAD, and POST as cacheable
* [...].
*
* From RFC7231§4.2.1 (Safe Methods):
* Of the request methods defined by this specification, the GET, HEAD,
* OPTIONS, and TRACE methods are defined to be safe.
*
* Note: the intersection of safe and cacheable are the GET and HEAD
* methods. *)report_stream_errortrespd.idError.ProtocolError(* From RFC7540§8.2:
* The server MUST include a value in the :authority pseudo-header field
* for which the server is authoritative (see Section 10.1). A client
* MUST treat a PUSH_PROMISE for which the server is not authoritative as
* a stream error (Section 5.4.2) of type PROTOCOL_ERROR. *)|_,None,_|_,_,`Error_->report_stream_errortrespd.idError.ProtocolError|_->letrequest=Request.create~scheme~headersmethpathin(matcht.push_handlerrequestwith|Okresponse_handler->(* From RFC7540§8.2:
* Promised requests [...] MUST NOT include a request body. *)letrequest_body=Body.emptyin(* From RFC7540§5.1:
* reserved (remote): [...] Receiving a HEADERS frame causes the
* stream to transition to "half-closed (local)". *)respd.state<-Active(HalfClosedStream.WaitingForPeer(* TODO: should we just store a `unit -> unit` in `t` to avoid
* one closure allocation here? *),{Respd.request;request_body;response_handler;wakeup_writer=(fun()->wakeup_writert)})|Error_->(* From RFC7540§6.6:
* Recipients of PUSH_PROMISE frames can choose to reject promised
* streams by returning a RST_STREAM referencing the promised stream
* identifier back to the sender of the PUSH_PROMISE. *)Stream.reset_streamrespdError.Cancel))lethandle_response_headerstrespd~end_streamactive_requestheaders=(* From RFC7540§8.1.2.6:
* Clients MUST NOT accept a malformed response.
*
* Note: in the branches where a malformed response is detected, the response
* handler is not called. *)matchHeaders.get_multi_pseudoheaders"status"with|[status]->letresponse=Response.create~headers(Status.of_stringstatus)in(matchend_stream,Message.body_lengthheaderswith|true,`FixedlenwhenInt64.comparelen0L!=0->(* From RFC7540§8.1.2.6:
* A request or response is also malformed if the value of a
* content-length header field does not equal the sum of the DATA
* frame payload lengths that form the body. *)set_error_and_handletrespd(`Invalid_response_body_lengthresponse)ProtocolError|_,`Error_->set_error_and_handletrespd(`Invalid_response_body_lengthresponse)ProtocolError|_,body_length->letresponse_body=ifend_streamthenBody.emptyelseletbuffer_size=matchbody_lengthwith|`Fixedn->Int64.to_intn|`Error_|`Unknown->(* Not sure how much data we're gonna get. Use the configured
* value for [response_body_buffer_size]. *)t.config.response_body_buffer_sizeinBody.create(Bigstringaf.createbuffer_size)active_request.Respd.wakeup_writerinletnew_response_state=Respd.create_active_responseresponseresponse_bodyinrespd.state<-Stream.(Active((ifis_openrespdthenOpennew_response_stateelseHalfClosednew_response_state),active_request));active_request.response_handlerresponseresponse_body;ifend_streamthen((* Deliver EOF to the response body, as the handler might be waiting
* on it to act. *)Body.close_readerresponse_body;(* From RFC7540§5.1:
* [...] an endpoint receiving an END_STREAM flag causes the stream
* state to become "half-closed (remote)". *)Respd.close_streamrespd))|_->(* From RFC7540§8.1.2.4:
* For HTTP/2 responses, a single :status pseudo-header field is defined
* that carries the HTTP status code field (see [RFC7231], Section 6).
* This pseudo-header field MUST be included in all responses; otherwise,
* the response is malformed (Section 8.1.2.6). *)letmessage="HTTP/2 responses must include a single `:status` pseudo-header"inset_error_and_handletrespd(`Malformed_responsemessage)ProtocolErrorlethandle_headerst~end_streamrespdheaders=(* From RFC7540§5.1.2:
* Endpoints MUST NOT exceed the limit set by their peer. An endpoint that
* receives a HEADERS frame that causes its advertised concurrent stream
* limit to be exceeded MUST treat this as a stream error (Section 5.4.2)
* of type PROTOCOL_ERROR or REFUSED_STREAM. *)ift.current_server_streams+1>t.config.max_concurrent_streamsthenift.unacked_settings>0then(* From RFC7540§8.1.4:
* The REFUSED_STREAM error code can be included in a RST_STREAM frame
* to indicate that the stream is being closed prior to any processing
* having occurred. Any request that was sent on the reset stream can
* be safely retried.
*
* Note: if there are pending SETTINGS to acknowledge, assume there was a
* race condition and let the client retry. *)report_stream_errortrespd.Stream.idError.RefusedStreamelsereport_stream_errortrespd.Stream.idError.ProtocolErrorelse((* From RFC7540§5.1.2:
* Streams that are in the "open" state or in either of the "half-closed"
* states count toward the maximum number of streams that an endpoint is
* permitted to open. *)t.current_server_streams<-t.current_server_streams+1;matchrespd.statewith|Reserved_->respd.state<-ReservedStream.FullHeaders;handle_push_promise_headerstrespdheaders|Active(active_state,active_request)->(matchactive_statewith|Open_->respd.state<-Active(OpenFullHeaders,active_request)|HalfClosed_->respd.state<-Active(Stream.(HalfClosedFullHeaders),active_request));handle_response_headerstrespd~end_streamactive_requestheaders|_->(* Unreachable. This function is only invoked if the stream is active. *)assertfalse)lethandle_headers_blockt?(is_trailers=false)respdpartial_headersflagsheaders_block=letopenABinletend_headers=Flags.test_end_headerflagsin(* From RFC7540§6.10:
* An endpoint receiving HEADERS, PUSH_PROMISE, or CONTINUATION
* frames needs to reassemble header blocks and perform decompression
* even if the frames are to be discarded *)letparse_state'=AB.feedpartial_headers.Stream.parse_state(`Bigstringheaders_block)inifend_headersthen(t.receiving_headers_for_stream<-None;letparse_state'=AB.feedparse_state'`Eofinmatchparse_state'with|Done(_,Okheaders)->ifnotis_trailersthen(* `handle_headers` will take care of transitioning the stream state *)letend_stream=partial_headers.end_streaminhandle_headerst~end_streamrespdheaderselseifHeaders.trailers_validheadersthen(Respd.deliver_trailer_headersrespdheaders;letresponse_body=Respd.response_body_exnrespdinBody.close_readerresponse_body)else(* From RFC7540§8.1.2.1:
* Pseudo-header fields MUST NOT appear in trailers. Endpoints MUST
* treat a request or response that contains undefined or invalid
* pseudo-header fields as malformed (Section 8.1.2.6). *)letmessage="Pseudo-header fields must not appear in trailers"inset_error_and_handletrespd(`Malformed_responsemessage)ProtocolError(* From RFC7540§4.3:
* A decoding error in a header block MUST be treated as a connection
* error (Section 5.4.1) of type COMPRESSION_ERROR. *)|Done(_,Error_)|Partial_->report_connection_errortError.CompressionError|Fail(_,_,message)->report_connection_errort~additional_debug_data:messageError.CompressionError)elsepartial_headers.parse_state<-parse_state'lethandle_trailer_headers=handle_headers_block~is_trailers:trueletcreate_partial_headerstflagsheaders_block=letend_headers=Flags.test_end_headerflagsinletheaders_block_length=Bigstringaf.lengthheaders_blockinletinitial_buffer_size=ifend_headersthenheaders_block_lengthelse(* Conservative estimate that there's only going to be one CONTINUATION
* frame. *)2*headers_block_lengthin{Stream.parse_state=AB.parse~initial_buffer_size(Hpack.Decoder.decode_headerst.hpack_decoder);end_stream=Flags.test_end_streamflags}lethandle_first_response_byteststreamactive_requestframe_header?priorityheaders_block=let(Scheduler.Stream{descriptor;_})=streaminlet{Frame.flags;stream_id;_}=frame_headerinletpartial_headers=create_partial_headerstflagsheaders_blockinletremote_state=Stream.PartialHeaderspartial_headersindescriptor.Stream.state<-(ifStream.is_opendescriptorthenActive(Openremote_state,active_request)elseActive(HalfClosedremote_state,active_request));ifnot(Flags.test_end_headerflags)thent.receiving_headers_for_stream<-Somestream_id;(matchprioritywith|None->()|Somepriority->Scheduler.reprioritize_streamt.streams~prioritystream);handle_headers_blocktdescriptorpartial_headersflagsheaders_blockletprocess_trailer_headerstrespdactive_responseframe_headerheaders_block=let{Frame.stream_id;flags;_}=frame_headerinletend_stream=Flags.test_end_streamflagsinifnotend_streamthen(* From RFC7540§8.1:
* A HEADERS frame (and associated CONTINUATION frames) can only appear
* at the start or end of a stream. An endpoint that receives a HEADERS
* frame without the END_STREAM flag set after receiving a final
* (non-informational) status code MUST treat the corresponding request
* or response as malformed (Section 8.1.2.6). *)letmessage="HEADERS frames containing trailers must set the END_STREAM flag"inset_error_and_handletrespd(`Malformed_responsemessage)ProtocolErrorelseletpartial_headers={Stream.parse_state=AB.parse(Hpack.Decoder.decode_headerst.hpack_decoder)(* obviously true at this point. *);end_stream}inactive_response.Respd.trailers_parser<-Somepartial_headers;ifnotFlags.(test_end_headerflags)thent.receiving_headers_for_stream<-Somestream_id;(* trailer headers: RFC7230§4.4 *)handle_trailer_headerstrespdpartial_headersflagsheaders_blockletprocess_headers_framet{Frame.frame_header;_}?priorityheaders_block=let{Frame.stream_id;_}=frame_headerinmatchprioritywith|Some{Priority.stream_dependency;_}whenStream_identifier.(stream_dependency===stream_id)->(* From RFC7540§5.3.1:
* A stream cannot depend on itself. An endpoint MUST treat this as a
* stream error (Section 5.4.2) of type PROTOCOL_ERROR. *)report_stream_errortstream_idError.ProtocolError|_->(matchScheduler.get_nodet.streamsstream_idwith|None->(* If we're receiving a response for a stream that's no longer in the
* priority tree, assume this is a network race - we canceled a request
* but a responnse was already in flight.
*
* However, if the stream identifer is greater than the largest stream
* identifier we have produced, they should know better. In this case,
* send an RST_STREAM. *)ifStream_identifier.(stream_id>=t.current_stream_id&&is_requeststream_id)thenreport_stream_errortstream_idError.StreamClosed|Some(Scheduler.Stream{descriptor;_}asstream)->(matchdescriptor.statewith|Idle->(* From RFC7540§6.2:
* HEADERS frames can be sent on a stream in the "idle", "reserved
* (local)", "open", or "half-closed (remote)" state. *)report_connection_errortError.ProtocolError|Active((OpenWaitingForPeer|HalfClosedWaitingForPeer),active_request)->handle_first_response_byteststreamactive_requestframe_header?priorityheaders_block|Active((Open(FullHeaders|PartialHeaders_)|HalfClosed(FullHeaders|PartialHeaders_)),_)->assertfalse(* if we're getting a HEADERS frame at this point, they must be
* trailers, and the END_STREAM flag needs to be set. *)|Active((Open(ActiveMessageactive_response)|HalfClosed(ActiveMessageactive_response)),_)->process_trailer_headerstdescriptoractive_responseframe_headerheaders_block|Closed{reason=ResetByThem_;_}->(* From RFC7540§5.1:
* closed: [...] An endpoint that receives any frame other than
* PRIORITY after receiving a RST_STREAM MUST treat that as a
* stream error (Section 5.4.2) of type STREAM_CLOSED. *)report_stream_errortstream_idError.StreamClosed(* From RFC7540§5.1:
* reserved (local): [...] Receiving any type of frame other than
* RST_STREAM, PRIORITY, or WINDOW_UPDATE on a stream in this state
* MUST be treated as a connection error (Section 5.4.1) of type
* PROTOCOL_ERROR. *)|Reserved_|Closed_->(* From RFC7540§5.1:
* Similarly, an endpoint that receives any frames after receiving
* a frame with the END_STREAM flag set MUST treat that as a
* connection error (Section 5.4.1) of type STREAM_CLOSED [...]. *)report_connection_errortError.StreamClosed))letsend_window_update:typea.t->aScheduler.PriorityTreeNode.node->int->unit=funtstreamn->letsend_window_update_framestream_idn=letvalid_inflow=Scheduler.add_inflowstreamninassertvalid_inflow;letframe_info=Writer.make_frame_infostream_idinWriter.write_window_updatet.writerframe_infoninifn>0then(letmax_window_size=Settings.WindowSize.max_window_sizeinletstream_id=Scheduler.stream_idstreaminletrecloopn=ifn>max_window_sizethen(send_window_update_framestream_idmax_window_size;loop(n-max_window_size))elsesend_window_update_framestream_idninloopn;wakeup_writert)letprocess_data_framet{Frame.frame_header;_}bstr=letopenSchedulerinlet{Frame.flags;stream_id;payload_length;_}=frame_headerin(* From RFC7540§6.9:
* A receiver that receives a flow-controlled frame MUST always account
* for its contribution against the connection flow-control window,
* unless the receiver treats this as a connection error (Section 5.4.1).
* This is necessary even if the frame is in error. *)Scheduler.deduct_inflowt.streamspayload_length;matchScheduler.get_nodet.streamsstream_idwith|Some(Stream{descriptor;_}asstream)->(matchdescriptor.Stream.statewith|Active((Open(ActiveMessageresponse_info)|HalfClosed(ActiveMessageresponse_info)),_)->let{Respd.response;response_body;response_body_bytes;_}=response_infoinresponse_info.response_body_bytes<-Int64.(addresponse_body_bytes(of_int(Bigstringaf.lengthbstr)));ifnotScheduler.(allowed_to_receivet.streamsstreampayload_length)then(* From RFC7540§6.9:
* A receiver MAY respond with a stream error (Section 5.4.2) or
* connection error (Section 5.4.1) of type FLOW_CONTROL_ERROR if it
* is unable to accept a frame. *)report_stream_errortstream_idError.FlowControlErrorelse(Scheduler.deduct_inflowstreampayload_length;matchMessage.body_lengthresponse.headerswith|`Fixedlen(* Getting more than the server declared *)whenInt64.compareresponse_info.response_body_byteslen>0->(* Give back connection-level flow-controlled bytes (we use payload
* length to include any padding bytes that the frame might have
* included - which were ignored at parse time). *)send_window_updatett.streamspayload_length;(* From RFC7540§8.1.2.6:
* A request or response is also malformed if the value of a
* content-length header field does not equal the sum of the
* DATA frame payload lengths that form the body. *)set_error_and_handletdescriptor(`Invalid_response_body_lengthresponse)ProtocolError|_->letend_stream=Flags.test_end_streamflagsin(* XXX(anmonteiro): should we only give back flow control after we
* delivered EOF to the response body? There's a potential flow
* control issue right now where we're handing out connection-level
* flow control tokens on the receipt of every DATA frame. This
* might allow servers to send an unbounded number of bytes. Same
* issue on the server (see corresponding comment). *)(* From RFC7540§6.9.1:
* The receiver of a frame sends a WINDOW_UPDATE frame as it
* consumes data and frees up space in flow-control windows.
* Separate WINDOW_UPDATE frames are sent for the stream- and
* connection-level flow-control windows. *)send_window_updatett.streamspayload_length;send_window_updatetstreampayload_length;letfaraday=Body.unsafe_faradayresponse_bodyinifnot(Faraday.is_closedfaraday)then(Faraday.schedule_bigstringfaradaybstr;ifend_streamthenBody.close_readerresponse_body);ifend_stream&¬(Respd.requires_outputdescriptor)then(* From RFC7540§6.1:
* When set, bit 0 indicates that this frame is the last that
* the endpoint will send for the identified stream. Setting
* this flag causes the stream to enter one of the
* "half-closed" states or the "closed" state (Section 5.1).
*
* Transition to the "closed" state if this is the last DATA frame
* that the server will send and we're done sending. *)Stream.finish_streamdescriptorFinished)|Idle->(* From RFC7540§5.1:
* idle: [...] Receiving any frame other than HEADERS or PRIORITY on
* a stream in this state MUST be treated as a connection error
* (Section 5.4.1) of type PROTOCOL_ERROR. *)report_connection_errortError.ProtocolError(* This is technically in the half-closed (local) state *)|Closed{reason=ResetByUsNoError;_}->(* From RFC7540§6.9:
* A receiver that receives a flow-controlled frame MUST always
* account for its contribution against the connection flow-control
* window, unless the receiver treats this as a connection error
* (Section 5.4.1). This is necessary even if the frame is in
* error. *)send_window_updatett.streamspayload_length(* From RFC7540§6.4:
* [...] after sending the RST_STREAM, the sending endpoint MUST be
* prepared to receive and process additional frames sent on the
* stream that might have been sent by the peer prior to the arrival
* of the RST_STREAM.
*
* Note: after some writer yields / wake ups, we will have stopped
* keeping state information for the stream. This functions effectively
* as a way of only accepting frames after an RST_STREAM from us up to
* a time limit. *)|_->send_window_updatett.streamspayload_length;(* From RFC7540§6.1:
* If a DATA frame is received whose stream is not in "open" or
* "half-closed (local)" state, the recipient MUST respond with a
* stream error (Section 5.4.2) of type STREAM_CLOSED. *)report_stream_errortstream_idError.StreamClosed)|None->ifnot(was_closed_or_implicitly_closedtstream_id)then(* From RFC7540§5.1:
* idle: [...] Receiving any frame other than HEADERS or PRIORITY on
* a stream in this state MUST be treated as a connection error
* (Section 5.4.1) of type PROTOCOL_ERROR. *)report_connection_errortError.ProtocolErrorleton_close_streamtid~activeclosed=ifactivethen(* From RFC7540§5.1.2:
* Streams that are in the "open" state or in either of the "half-closed"
* states count toward the maximum number of streams that an endpoint is
* permitted to open. *)t.current_server_streams<-t.current_server_streams-1;Scheduler.mark_for_removalt.streamsidclosedletprocess_priority_framet{Frame.frame_header;_}priority=let{Frame.stream_id;_}=frame_headerinlet{Priority.stream_dependency;_}=priorityinifStream_identifier.(stream_id===stream_dependency)then(* From RFC7540§5.3.1:
* A stream cannot depend on itself. An endpoint MUST treat this as a
* stream error (Section 5.4.2) of type PROTOCOL_ERROR. *)report_stream_errortstream_idError.ProtocolErrorelsematchScheduler.get_nodet.streamsstream_idwith|Somestream->Scheduler.reprioritize_streamt.streams~prioritystream|None->(* From RFC7540§5.3:
* A client can assign a priority for a new stream by including
* prioritization information in the HEADERS frame (Section 6.2) that
* opens the stream. At any other time, the PRIORITY frame (Section
* 6.3) can be used to change the priority of a stream.
*
* Note: The spec mostly only mentions that clients are the endpoints
* that make use of PRIORITY frames. As such, we don't make too
* much of an effort to process PRIORITY frames coming from a
* server. If we know about a stream, we reprioritize it (meaning
* prioritization is an input to the process of allocating
* resources when flushing request bodies). Otherwise, we ignore
* it. We don't, however, report any errors if the frame is
* well-formed, as section 5. clearly mentions that PRIORITY frames
* must be accepted in all stream states.
*
* From RFC7540§5.1:
* Note that PRIORITY can be sent and received in any stream state. *)()letprocess_rst_stream_framet{Frame.frame_header;_}error_code=let{Frame.stream_id;_}=frame_headerinmatchScheduler.findt.streamsstream_idwith|Somerespd->(matchrespd.statewith|Idle->(* From RFC7540§6.4:
* RST_STREAM frames MUST NOT be sent for a stream in the "idle"
* state. If a RST_STREAM frame identifying an idle stream is
* received, the recipient MUST treat this as a connection error
* (Section 5.4.1) of type PROTOCOL_ERROR. *)report_connection_errortError.ProtocolError|_->(* From RFC7540§6.4:
* The RST_STREAM frame fully terminates the referenced stream and
* causes it to enter the "closed" state. After receiving a
* RST_STREAM on a stream, the receiver MUST NOT send additional
* frames for that stream, with the exception of PRIORITY.
*
* Note:
* This match branch also accepts streams in the `Closed` state. We
* do that to comply with the following:
*
* From RFC7540§6.4:
* [...] after sending the RST_STREAM, the sending endpoint MUST be
* prepared to receive and process additional frames sent on the
* stream that might have been sent by the peer prior to the arrival
* of the RST_STREAM. *)Stream.finish_streamrespd(ResetByThemerror_code);(* From RFC7540§5.4.2:
* To avoid looping, an endpoint MUST NOT send a RST_STREAM in response
* to a RST_STREAM frame.
*
* Note: the {!Respd.report_error} function does not send an RST_STREAM
* frame for streams in the closed state. So we close the stream before
* reporting the error. *)set_error_and_handletrespd`Protocol_errorerror_code)|None->(* We might have removed the stream from the hash table. If its stream
* id is smaller than or equal to the max client stream id we've generated,
* then it must have been closed. *)ifnot(was_closed_or_implicitly_closedtstream_id)then(* From RFC7540§6.4:
* RST_STREAM frames MUST NOT be sent for a stream in the "idle"
* state. If a RST_STREAM frame identifying an idle stream is
* received, the recipient MUST treat this as a connection error
* (Section 5.4.1) of type PROTOCOL_ERROR.
*
* Note:
* If we didn't find the stream in the hash table it must be "idle". *)report_connection_errortError.ProtocolErrorletprocess_settings_framet{Frame.frame_header;_}settings=letopenSchedulerinlet{Frame.flags;_}=frame_headerin(* We already checked that an acked SETTINGS is empty. Don't need to do
* anything else in that case *)ifFlags.(test_ackflags)then(t.unacked_settings<-t.unacked_settings-1;ift.unacked_settings<0then(* The server is ACKing a SETTINGS frame that we didn't send *)letadditional_debug_data="Received SETTINGS with ACK but no ACK was pending"inreport_connection_errort~additional_debug_dataError.ProtocolError)elsematchSettings.check_settings_list~is_client:truesettingswith|None->(* From RFC7540§6.5:
* Each parameter in a SETTINGS frame replaces any existing value for
* that parameter. Parameters are processed in the order in which they
* appear, and a receiver of a SETTINGS frame does not need to maintain
* any state other than the current value of its parameters. *)List.iter(function|Settings.HeaderTableSize,x->(* From RFC7540§6.5.2:
* Allows the sender to inform the remote endpoint of the maximum
* size of the header compression table used to decode header
* blocks, in octets. *)t.settings.header_table_size<-x;Hpack.Encoder.set_capacityt.hpack_encoderx|EnablePush,x->(* We've already verified that this setting is either 0 or 1 in the
* call to `Settings.check_settings_list` above. *)t.settings.enable_push<-x==1|MaxConcurrentStreams,x->t.settings.max_concurrent_streams<-x|InitialWindowSize,new_val->(* From RFC7540§6.9.2:
* [...] a SETTINGS frame can alter the initial flow-control
* window size for streams with active flow-control windows (that
* is, streams in the "open" or "half-closed (remote)" state).
* When the value of SETTINGS_INITIAL_WINDOW_SIZE changes, a
* receiver MUST adjust the size of all stream flow-control
* windows that it maintains by the difference between the new
* pvalue and the old value. *)letold_val=t.settings.initial_window_sizeint.settings.initial_window_size<-new_val;letgrowth=new_val-old_valinletexceptionLocalin(matchScheduler.iter~f:(funstream->(* From RFC7540§6.9.2:
* An endpoint MUST treat a change to
* SETTINGS_INITIAL_WINDOW_SIZE that causes any
* flow-control window to exceed the maximum size as a
* connection error (Section 5.4.1) of type
* FLOW_CONTROL_ERROR. *)ifnot(Scheduler.add_flowstreamgrowth)thenraiseLocal)t.streamswith|()->()|exceptionLocal->report_connection_errort~additional_debug_data:(Format.sprintf"Window size for stream would exceed %d"Settings.WindowSize.max_window_size)Error.FlowControlError)|MaxFrameSize,x->t.settings.max_frame_size<-x;Scheduler.iter~f:(fun(Stream{descriptor;_})->ifRespd.requires_outputdescriptorthendescriptor.max_frame_size<-x)t.streams|MaxHeaderListSize,x->t.settings.max_header_list_size<-Somex)settings;letframe_info=Writer.make_frame_info~flags:Flags.(set_ackdefault_flags)Stream_identifier.connectionin(* From RFC7540§6.5:
* ACK (0x1): [...] When this bit is set, the payload of the SETTINGS
* frame MUST be empty. *)Writer.write_settingst.writerframe_info[];t.unacked_settings<-t.unacked_settings+1;wakeup_writert|Someerror->report_errorterrorletreserve_streamt{Frame.frame_header;_}promised_stream_idheaders_block=let{Frame.flags;_}=frame_headerin(* From RFC7540§6.6:
* The PUSH_PROMISE frame (type=0x5) is used to notify the peer endpoint in
* advance of streams the sender intends to initiate. *)letrespd=Stream.createpromised_stream_id~max_frame_size:t.settings.max_frame_sizet.writert.error_handler(on_close_streamtpromised_stream_id)inScheduler.addt.streams~initial_window_size:t.settings.initial_window_sizerespd;letpartial_headers=create_partial_headerstflagsheaders_blockinrespd.state<-Reserved(PartialHeaderspartial_headers);ifnot(Flags.test_end_headerflags)thent.receiving_headers_for_stream<-Somepromised_stream_id;handle_headers_blocktrespdpartial_headersflagsheaders_blockletprocess_push_promise_framet({Frame.frame_header;_}asframe)promised_stream_idheaders_block=let{Frame.stream_id;_}=frame_headerin(* At this point, `promised_stream_id` has already been validated by the
* parser *)ifnott.settings.enable_pushthen(* From RFC7540§6.6:
* PUSH_PROMISE MUST NOT be sent if the SETTINGS_ENABLE_PUSH setting of
* the peer endpoint is set to 0. An endpoint that has set this setting
* and has received acknowledgement MUST treat the receipt of a
* PUSH_PROMISE frame as a connection error (Section 5.4.1) of type
* PROTOCOL_ERROR. *)letadditional_debug_data="Push is not enabled for the connection"inreport_connection_errort~additional_debug_dataError.ProtocolErrorelseifnotStream_identifier.(promised_stream_id>t.max_pushed_stream_id)then(* From RFC7540§6.6:
* A receiver MUST treat the receipt of a PUSH_PROMISE that promises an
* illegal stream identifier (Section 5.1.1) as a connection error
* (Section 5.4.1) of type PROTOCOL_ERROR. Note that an illegal stream
* identifier is an identifier for a stream that is not currently in the
* "idle" state. *)letadditional_debug_data="Illegal stream identifier promised by PUSH_PROMISE"inreport_connection_errort~additional_debug_dataError.ProtocolErrorelseletsend_connection_error()=letadditional_debug_data="Received PUSH_PROMISE on a stream that is neither open nor \
half-closed (local)"inreport_connection_errort~additional_debug_dataError.ProtocolErrorint.max_pushed_stream_id<-promised_stream_id;matchScheduler.findt.streamsstream_idwith|None->(* From RFC7540§6.6:
* A receiver MUST treat the receipt of a PUSH_PROMISE on a stream that
* is neither "open" nor "half-closed (local)" as a connection error
* (Section 5.4.1) of type PROTOCOL_ERROR. *)send_connection_error()|Somerespd->(matchrespd.statewith|Active((Open_|HalfClosed_),_)->reserve_streamtframepromised_stream_idheaders_block|_->send_connection_error())letprocess_ping_framet{Frame.frame_header;_}payload=let{Frame.flags;_}=frame_headerin(* From RFC7540§6.7:
* ACK (0x1): When set, bit 0 indicates that this PING frame is a PING
* response. [...] An endpoint MUST NOT respond to PING frames containing
* this flag. *)ifFlags.test_ackflagsthenmatchQueue.take_optt.pending_pingswith|Somecallback->callback()|None->(* server is ACKing a PING that we didn't send? *)letadditional_debug_data="Unexpected PING acknowledgement"inreport_connection_errort~additional_debug_dataError.ProtocolErrorelse(* From RFC7540§6.7:
* Receivers of a PING frame that does not include an ACK flag MUST send
* a PING frame with the ACK flag set in response, with an identical
* payload. PING responses SHOULD be given higher priority than any other
* frame. *)letframe_info=Writer.make_frame_info(* From RFC7540§6.7:
* ACK (0x1): When set, bit 0 indicates that this PING frame is a
* PING response. An endpoint MUST set this flag in PING
* responses. *)~flags:Flags.(set_ackdefault_flags)Stream_identifier.connectionin(* From RFC7540§6.7:
* Receivers of a PING frame that does not include an ACK flag MUST send
* a PING frame with the ACK flag set in response, with an identical
* payload. *)Writer.write_pingt.writerframe_infopayload;wakeup_writertletprocess_goaway_framet_framepayload=let_last_stream_id,_error,debug_data=payloadinletlen=Bigstringaf.lengthdebug_datainletbytes=Bytes.createleninBigstringaf.unsafe_blit_to_bytesdebug_data~src_off:0bytes~dst_off:0~len;(* TODO(anmonteiro): I think we need to allow lower numbered streams to
* complete. *)shutdown_rwtletadd_window_increment:typea.t->aScheduler.PriorityTreeNode.node->int->unit=funtstreamincrement->letopenSchedulerinletdid_add=Scheduler.add_flowstreamincrementinletstream_id=Scheduler.stream_idstreaminletnew_flow=matchstreamwith|Connection{flow;_}->flow|Stream{flow;_}->flowinifdid_addthen(ifnew_flow>0then(* Don't bother waking up the writer if the new flow doesn't allow
* the stream to write. *)wakeup_writert)elseifStream_identifier.is_connectionstream_idthenreport_connection_errort~additional_debug_data:(Printf.sprintf"Window size for stream would exceed %d"Settings.WindowSize.max_window_size)Error.FlowControlErrorelsereport_stream_errortstream_idError.FlowControlErrorletprocess_window_update_framet{Frame.frame_header;_}window_increment=letopenSchedulerinlet{Frame.stream_id;_}=frame_headerin(* From RFC7540§6.9:
* The WINDOW_UPDATE frame can be specific to a stream or to the entire
* connection. In the former case, the frame's stream identifier indicates
* the affected stream; in the latter, the value "0" indicates that the
* entire connection is the subject of the frame. *)ifStream_identifier.is_connectionstream_idthenadd_window_incrementtt.streamswindow_incrementelsematchScheduler.get_nodet.streamsstream_idwith|Some(Stream{descriptor;_}asstream_node)->(matchdescriptor.statewith|Idle->(* From RFC7540§5.1:
* idle: [...] Receiving any frame other than HEADERS or PRIORITY on
* a stream in this state MUST be treated as a connection error
* (Section 5.4.1) of type PROTOCOL_ERROR. *)report_connection_errortError.ProtocolError|Active_(* From RFC7540§5.1:
* reserved (local): [...] A PRIORITY or WINDOW_UPDATE frame MAY be
* received in this state. *)|Reserved_->add_window_incrementtstream_nodewindow_increment|Closed_->(* From RFC7540§5.1:
* Endpoints MUST ignore WINDOW_UPDATE or RST_STREAM frames received
* in this state, though endpoints MAY choose to treat frames that
* arrive a significant time after sending END_STREAM as a connection
* error (Section 5.4.1) of type PROTOCOL_ERROR. *)())|None->ifnot(was_closed_or_implicitly_closedtstream_id)then(* From RFC7540§5.1:
* idle: [...] Receiving any frame other than HEADERS or PRIORITY on
* a stream in this state MUST be treated as a connection error
* (Section 5.4.1) of type PROTOCOL_ERROR. *)report_connection_errortError.ProtocolErrorletprocess_continuation_framet{Frame.frame_header;_}headers_block=let{Frame.stream_id;flags;_}=frame_headerinmatchScheduler.findt.streamsstream_idwith|Somestream->(matchstream.Stream.statewith|Active((Open(PartialHeaderspartial_headers)|HalfClosed(PartialHeaderspartial_headers)),_)->handle_headers_blocktstreampartial_headersflagsheaders_block|Active((Open(ActiveMessage{trailers_parser=Somepartial_headers;_})|HalfClosed(ActiveMessage{trailers_parser=Somepartial_headers;_})),_)->handle_trailer_headerststreampartial_headersflagsheaders_block|_->(* TODO: maybe need to handle the case where the stream has been closed
* due to a stream error. *)(* From RFC7540§6.10:
* A RST_STREAM is the last frame that an endpoint can send on a
* stream. The peer that sends the RST_STREAM frame MUST be prepared
* to receive any frames that were sent or enqueued for sending by
* the remote peer. These frames can be ignored, except where they
* modify connection state (such as the state maintained for header
* compression (Section 4.3) or flow control). *)report_connection_errortError.ProtocolError)|None->(* From RFC7540§6.10:
* A CONTINUATION frame MUST be preceded by a HEADERS, PUSH_PROMISE or
* CONTINUATION frame without the END_HEADERS flag set. A recipient that
* observes violation of this rule MUST respond with a connection error
* (Section 5.4.1) of type PROTOCOL_ERROR. *)report_connection_errortError.ProtocolError(* Unlike e.g. http/af's current Client implementation (Oneshot) where a new
* connection is created per request, we create a single connection where all
* requests go through. HTTP/2 allows concurrency to exist on the connection
* level.
*
* From RFC7540§1:
* HTTP/2 [...] allows interleaving of request and response messages on the
* same connection and uses an efficient coding for HTTP header fields. *)letcreate?(config=Config.default)?push_handler~error_handler=letpush_handler=matchpush_handlerwith|Somepush_handler->push_handler|None->default_push_handlerinletsettings={Settings.default_settingswithmax_frame_size=config.read_buffer_size;max_concurrent_streams=config.max_concurrent_streams;initial_window_size=config.initial_window_size;enable_push=(* If the caller is not going to process PUSH_PROMISE frames, just
* disable it. *)config.enable_server_push&&push_handler!=default_push_handler}inletrecconnection_preface_handlerrecv_framesettings_list=lett=Lazy.forcetin(* Now process the client's SETTINGS frame. `process_settings_frame` will
* take care of calling `wakeup_writer`. *)process_settings_frametrecv_framesettings_listandframe_handlerr=lett=Lazy.forcetinmatchrwith|Errore->report_errorte|Ok({Frame.frame_payload;frame_header}asframe)->(matcht.receiving_headers_for_streamwith|Somestream_idwhen(notStream_identifier.(stream_id===frame_header.stream_id))||frame_header.frame_type!=Continuation->(* From RFC7540§6.2:
* A HEADERS frame without the END_HEADERS flag set MUST be followed
* by a CONTINUATION frame for the same stream. A receiver MUST treat
* the receipt of any other type of frame or a frame on a different
* stream as a connection error (Section 5.4.1) of type
* PROTOCOL_ERROR. *)report_connection_errort~additional_debug_data:"HEADERS or PUSH_PROMISE without the END_HEADERS flag set must be \
followed by a CONTINUATION frame for the same stream"Error.ProtocolError|_->(matchframe_payloadwith|Headers(priority,headers_block)->process_headers_frametframe?priorityheaders_block|Databs->process_data_frametframebs|Prioritypriority->process_priority_frametframepriority|RSTStreamerror_code->process_rst_stream_frametframeerror_code|Settingssettings->process_settings_frametframesettings|PushPromise(promised_stream_id,bs)->process_push_promise_frametframepromised_stream_idbs|Pingdata->process_ping_frametframedata|GoAway(last_stream_id,error,debug_data)->process_goaway_frametframe(last_stream_id,error,debug_data)|WindowUpdatewindow_size->process_window_update_frametframewindow_size|Continuationheaders_block->process_continuation_frametframeheaders_block|Unknown_->(* From RFC7540§5.1:
* Frames of unknown types are ignored. *)()))andt=lazy{settings;config(* From RFC7540§5.1.1:
* Streams initiated by a client MUST use odd-numbered stream
* identifiers *);current_stream_id=-1l;max_pushed_stream_id=0l;current_server_streams=0;receiving_headers_for_stream=None;did_send_go_away=false;unacked_settings=0;pending_pings=Queue.create();error_handler;push_handler;reader=Reader.client_framesconnection_preface_handlerframe_handler;writer=Writer.createsettings.max_frame_size;streams=Scheduler.make_root();wakeup_writer=refdefault_wakeup_writer(* From RFC7540§4.3:
* Header compression is stateful. One compression context and one
* decompression context are used for the entire connection. *);hpack_encoder=Hpack.Encoder.(createsettings.header_table_size);hpack_decoder=Hpack.Decoder.(createsettings.header_table_size)}inlett=Lazy.forcetin(* Check if the settings for the connection are different than the default
* HTTP/2 settings. In the event that they are, we need to send a non-empty
* SETTINGS frame advertising our configuration. *)letsettings=Settings.settings_for_the_connectiont.settingsin(* Send the client connection preface *)Writer.write_connection_prefacet.writersettings;(* If a higher value for initial window size is configured, add more
* tokens to the connection (we have no streams at this point). *)(ift.settings.initial_window_size>Settings.default_settings.initial_window_sizethenletdiff=t.settings.initial_window_size-Settings.default_settings.initial_window_sizeinsend_window_updatett.streamsdiff);tletrequesttrequest~error_handler~response_handler=letmax_frame_size=t.settings.max_frame_sizeinletwakeup_stream()=wakeup_writertinletrequest_body=Body.create(Bigstringaf.createmax_frame_size)wakeup_streamint.current_stream_id<-Int32.addt.current_stream_id2l;letstream_id=t.current_stream_idinletrespd=Stream.createstream_id~max_frame_sizet.writererror_handler(on_close_streamtstream_id)in(* TODO: priority *)Scheduler.addt.streams(* ?priority *)~initial_window_size:t.settings.initial_window_sizerespd;letframe_info=Writer.make_frame_info~max_frame_size~flags:Flags.default_flagsstream_idinWriter.write_request_headerst.writert.hpack_encoderframe_inforequest;Writer.flusht.writer(fun()->respd.state<-Active(OpenWaitingForPeer,{request;request_body;response_handler;wakeup_writer=wakeup_stream}));wakeup_writert;(* Closing the request body puts the stream in the half-closed (local) state.
* This is handled by {!Respd.flush_request_body}, which transitions the
* state once it verifies that there's no more data to send for the
* stream. *)request_body(* XXX: we store PING callbacks in FIFO order. Would it ever be the case that
* the receipt of a PING frame acknowledges a later callback? If so, we'd need
* to disallow sending custom PING payloads and generate a random payload that
* we store in a Hashtbl. *)letpingt?payload?(off=0)callback=letpayload=matchpayloadwith|None->Serialize.default_ping_payload|Somepayload->ifBigstringaf.lengthpayload-off<8thenfailwith"PING payload must have at least 8 octets in length";payloadin(* From RFC7540§6.7:
* ACK (0x1): When set, bit 0 indicates that this PING frame is a PING
* response.
*
* Note: this is not a PING response, quite the opposite, so we don't set
* the ACK flag. *)letframe_info=Writer.make_frame_infoStream_identifier.connectioninQueue.addcallbackt.pending_pings;Writer.write_pingt.writerframe_info~offpayload;wakeup_writertletnext_read_operationt=ifReader.is_closedt.readerthenshutdown_readert;matchReader.nextt.readerwith|(`Read|`Close)asoperation->operation|`Errore->report_errorte;(matchewith|ConnectionError_->(* From RFC7540§5.4.1:
* A connection error is any error that prevents further processing
* of the frame layer or corrupts any connection state. *)`Close|StreamError_->(* From RFC7540§5.4.2:
* A stream error is an error related to a specific stream that does
* not affect processing of other streams. *)`Read)letreadtbs~off~len=Reader.read_with_moret.readerbs~off~lenIncompleteletread_eoftbs~off~len=Reader.read_with_moret.readerbs~off~lenCompleteletnext_write_operationt=flush_request_bodyt;Writer.nextt.writerletyield_writertk=ifWriter.is_closedt.writerthenk()elseon_wakeup_writertkletreport_write_resulttresult=Writer.report_resultt.writerresult