(*----------------------------------------------------------------------------
* Copyright (c) 2019 António Nuno Monteiro
*
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* 3. Neither the name of the copyright holder nor the names of its
* contributors may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*---------------------------------------------------------------------------*)

(* Hash table keyed by stream identifier. The hash is seeded (see
 * [Hashtbl.MakeSeeded]); [make_root] below creates the table with
 * [~random:true]. *)
module StreamsTbl = struct
  include Hashtbl.MakeSeeded (struct
    type t = Stream_identifier.t

    let equal = Stream_identifier.( === )
    let hash i k = Hashtbl.seeded_hash i k
  end)

  (* Option-returning lookup: [None] instead of raising [Not_found]. *)
  let[@inline] find_opt h key = try Some (find h key) with Not_found -> None
end

(* What the scheduler needs to know about a stream in order to schedule it. *)
module type StreamDescriptor = sig
  type t

  val id : t -> Stream_identifier.t
  val requires_output : t -> bool
  val flush_write_body : t -> max_bytes:int -> int
  val finish_stream : t -> Stream.closed_reason -> unit
  val is_idle : t -> bool
end

module Make (Streamd : StreamDescriptor) = struct
  module rec PriorityTreeNode : sig
    type root = Root
    type nonroot = NonRoot

    type stream = nonroot node
    and parent = Parent : _ node -> parent

    and _ node =
      (* From RFC7540§5.3.1:
       *   A stream that is not dependent on any other stream is given a stream
       *   dependency of 0x0. In other words, the non-existent stream 0 forms
       *   the root of the tree.
       *
       * Note:
       *   We use a GADT because the root of the tree doesn't have an
       *   associated request descriptor. It has the added advantage of
       *   allowing us to enforce that all (other) streams in the tree are
       *   associated with a request descriptor. *)
      | Connection :
          { all_streams : stream StreamsTbl.t;
            mutable t_last : int;
            mutable children : PriorityQueue.t;
            (* Connection-level flow control window.
             * outbound flow control, what we're allowed to send.
             *
             * From RFC7540§6.9.1:
             *   Two flow-control windows are applicable: the stream
             *   flow-control window and the connection flow-control
             *   window. *)
            mutable flow : Settings.WindowSize.t;
            (* inbound flow control, what the client is allowed to send. *)
            mutable inflow : Settings.WindowSize.t;
            mutable marked_for_removal :
              (Stream_identifier.t * Stream.closed) list
          }
          -> root node
      | Stream :
          { descriptor : Streamd.t;
            mutable t_last : int;
            (* [t] is the key siblings are ordered by in their parent's
             * priority queue (see [PriorityQueue] below). *)
            mutable t : int;
            mutable priority : Priority.t;
            mutable parent : parent;
            mutable children : PriorityQueue.t;
            (* Stream-level flow control window. See connection-level above.
             *
             * From RFC7540§6.9.1:
             *   Two flow-control windows are applicable: the stream
             *   flow-control window and the connection flow-control
             *   window. *)
            mutable flow : Settings.WindowSize.t;
            mutable inflow : Settings.WindowSize.t
          }
          -> nonroot node
  end =
    PriorityTreeNode

  (* Priority search queue of sibling streams: keyed by stream id, ordered by
   * each stream's [t] value. *)
  and PriorityQueue :
    (Psq.S with type k = Int32.t and type p = PriorityTreeNode.stream) =
    Psq.Make
      (Int32)
      (struct
        include PriorityTreeNode

        type t = stream

        let compare (Stream { t = t1; _ }) (Stream { t = t2; _ }) =
          compare t1 t2
      end)

  include PriorityTreeNode

  type t = root node

  (* Creates the root (connection) node with no children, both flow-control
   * windows set to the default initial window size, and an empty stream table
   * sized by [capacity].
   *
   * TODO(anmonteiro): change according to SETTINGS_MAX_CONCURRENT_STREAMS? *)
  let make_root ?(capacity = 65536) () =
    Connection
      { t_last = 0;
        children = PriorityQueue.empty;
        all_streams = StreamsTbl.create ~random:true capacity;
        flow = Settings.WindowSize.default_initial_window_size;
        inflow = Settings.WindowSize.default_initial_window_size;
        marked_for_removal = []
      }

  (* Builds a stream node for [descriptor] under [parent]. Both flow-control
   * windows start at [initial_window_size]. The node is NOT inserted in any
   * queue or table here; see [add]. *)
  let create ~parent ~initial_window_size descriptor =
    Stream
      { descriptor;
        t_last = 0;
        t = 0;
        (* From RFC7540§5.3.5:
         *   All streams are initially assigned a non-exclusive dependency on
         *   stream 0x0. Pushed streams (Section 8.2) initially depend on
         *   their associated stream. In both cases, streams are assigned a
         *   default weight of 16. *)
        priority = Priority.default_priority;
        parent;
        children = PriorityQueue.empty;
        flow = initial_window_size;
        inflow = initial_window_size
      }

  (* Thin wrapper around [PriorityQueue.add]. *)
  let pq_add stream_id node pq = PriorityQueue.add stream_id node pq

  (* Removes the child with identifier [id] from the given parent's queue. *)
  let remove_from_parent (Parent parent) id =
    match parent with
    | Connection root ->
      (* From RFC7540§5.3.1:
       *   A stream that is not dependent on any other stream is given a stream
       *   dependency of 0x0. In other words, the non-existent stream 0 forms
       *   the root of the tree. *)
      root.children <- PriorityQueue.remove id root.children
    | Stream stream ->
      stream.children <- PriorityQueue.remove id stream.children

  (* The children queue of any node, root or stream. *)
  let children : type a. a node -> PriorityQueue.t = function
    | Stream { children; _ } -> children
    | Connection { children; _ } -> children

  (* The identifier of any node; the root maps to the connection identifier. *)
  let stream_id : type a. a node -> int32 = function
    | Connection _ -> Stream_identifier.connection
    | Stream { descriptor; _ } -> Streamd.id descriptor

  (* Detaches [stream_node] from its current parent and attaches it to
   * [new_parent]. With [~exclusive:true] the new parent's existing children
   * are adopted by [stream_node], which becomes the parent's only child. *)
  let set_parent stream_node ~exclusive new_parent =
    let (Stream ({ descriptor; _ } as stream)) = stream_node in
    let (Parent new_parent_node) = new_parent in
    let stream_id = Streamd.id descriptor in
    remove_from_parent stream.parent stream_id;
    stream.parent <- new_parent;
    let new_children =
      (* Read the new parent's children only after the removal above, so that
       * [stream_node] is never counted among its own siblings. *)
      let new_children = children new_parent_node in
      if exclusive
      then (
        (* From RFC7540§5.3.3:
         *   Dependent streams move with their parent stream if the parent is
         *   reprioritized. Setting a dependency with the exclusive flag for a
         *   reprioritized stream causes all the dependencies of the new parent
         *   stream to become dependent on the reprioritized stream. *)
        stream.children <-
          PriorityQueue.fold
            (fun k (Stream p as p_node) pq ->
              p.parent <- Parent stream_node;
              PriorityQueue.add k p_node pq)
            stream.children
            new_children;
        (* From RFC7540§5.3.1:
         *   An exclusive flag allows for the insertion of a new level of
         *   dependencies. The exclusive flag causes the stream to become the
         *   sole dependency of its parent stream, causing other dependencies
         *   to become dependent on the exclusive stream. *)
        PriorityQueue.sg stream_id stream_node)
      else pq_add stream_id stream_node new_children
    in
    match new_parent_node with
    | Stream stream -> stream.children <- new_children
    | Connection root -> root.children <- new_children

  (* [true] when the given stream is found among the ancestors of
   * [new_parent], i.e. when walking parent links from [new_parent] reaches
   * the stream before reaching the root. *)
  let would_create_cycle ~new_parent (Stream { descriptor; _ }) =
    let rec inner : type a. a node -> bool = function
      | Connection _ -> false
      | Stream { parent = Parent parent; _ }
        when Stream_identifier.(stream_id parent === Streamd.id descriptor) ->
        true
      | Stream { parent = Parent parent; _ } -> inner parent
    in
    let (Parent parent_node) = new_parent in
    inner parent_node

  (* Applies [priority] to [stream_node], re-parenting it in the tree when the
   * dependency or the exclusive flag changed. Does nothing when the effective
   * priority equals the stream's current one. *)
  let reprioritize_stream (Connection root as t) ~priority stream_node =
    let (Stream stream) = stream_node in
    let new_parent, new_priority =
      if Stream_identifier.is_connection priority.Priority.stream_dependency
      then Parent t, priority
      else
        match
          StreamsTbl.find_opt root.all_streams priority.stream_dependency
        with
        | Some parent_stream -> Parent parent_stream, priority
        | None ->
          (* From RFC7540§5.3.1:
           *   A dependency on a stream that is not currently in the tree —
           *   such as a stream in the "idle" state — results in that stream
           *   being given a default priority (Section 5.3.5). *)
          Parent t, Priority.default_priority
    in
    (* bail early if trying to set the same priority *)
    if not (Priority.equal stream.priority new_priority)
    then (
      let { Priority.stream_dependency; exclusive; _ } = new_priority in
      let (Parent current_parent_node) = stream.parent in
      let current_parent_id = stream_id current_parent_node in
      (* only need to set a different parent if the parent or exclusive status
       * changed *)
      if (not Stream_identifier.(stream_dependency === current_parent_id))
         || exclusive <> stream.priority.exclusive
      then (
        let (Parent new_parent_node) = new_parent in
        (match new_parent_node with
        | Stream new_parent_stream ->
          if would_create_cycle ~new_parent stream_node
          then (
            (* From RFC7540§5.3.3:
             *   If a stream is made dependent on one of its own dependencies,
             *   the formerly dependent stream is first moved to be dependent
             *   on the reprioritized stream's previous parent. The moved
             *   dependency retains its weight. *)
            set_parent new_parent_node ~exclusive:false stream.parent;
            new_parent_stream.priority <-
              { new_parent_stream.priority with
                stream_dependency = current_parent_id
              })
        | Connection _ ->
          (* The root node cannot be dependent on any other streams, so we
           * don't need to worry about it creating cycles. *)
          ());
        (* From RFC7540§5.3.1:
         *   When assigning a dependency on another stream, the stream is added
         *   as a new dependency of the parent stream. *)
        set_parent stream_node ~exclusive new_parent);
      stream.priority <- priority)

  (* Registers a new stream for [descriptor]: it is recorded in the stream
   * table, queued as a child of the root, and then reprioritized if a
   * [priority] was supplied. *)
  let add (Connection root as t) ?priority ~initial_window_size descriptor =
    let stream = create ~parent:(Parent t) ~initial_window_size descriptor in
    let stream_id = Streamd.id descriptor in
    StreamsTbl.add root.all_streams stream_id stream;
    root.children <- pq_add stream_id stream root.children;
    match priority with
    | Some priority -> reprioritize_stream t ~priority stream
    | None -> ()

  (* Looks up the tree node registered for [stream_id], if any. *)
  let get_node (Connection root) stream_id =
    StreamsTbl.find_opt root.all_streams stream_id

  (* Looks up the descriptor registered for [stream_id], if any. *)
  let find t stream_id =
    match get_node t stream_id with
    | Some (Stream { descriptor; _ }) -> Some descriptor
    | None -> None

  (* Calls [f] on every stream node in the stream table. *)
  let iter (Connection { all_streams; _ }) ~f =
    StreamsTbl.iter (fun _id -> f) all_streams

  (* [true] when both the connection and the stream outbound windows are
   * strictly positive. *)
  let allowed_to_transmit (Connection root) (Stream stream) =
    root.flow > 0 && stream.flow > 0

  (* [true] when a frame of [size] bytes fits in both inbound windows.
   *
   * From RFC7540§6.9.1:
   *   The sender MUST NOT send a flow-controlled frame with a length that
   *   exceeds the space available in either of the flow-control windows
   *   advertised by the receiver.
   *
   * Only frames that EXCEED the window are forbidden, so the comparison is
   * inclusive: a frame that fills the window exactly is accepted, and so is a
   * zero-length frame when the window is exhausted. *)
  let allowed_to_receive (Connection root) (Stream stream) size =
    size <= root.inflow && size <= stream.inflow

  (* Flushes as much of the stream's body as both outbound windows permit and
   * returns the number of bytes written (0 when flow control forbids
   * sending). Both windows are reduced by that amount. *)
  let write (Connection root as t) stream_node =
    let (Stream ({ descriptor; _ } as stream)) = stream_node in
    (* From RFC7540§6.9.1:
     *   Two flow-control windows are applicable: the stream flow-control
     *   window and the connection flow-control window. The sender MUST NOT
     *   send a flow-controlled frame with a length that exceeds the space
     *   available in either of the flow-control windows advertised by the
     *   receiver. *)
    if allowed_to_transmit t stream_node
    then (
      let allowed_bytes = min root.flow stream.flow in
      let written =
        Streamd.flush_write_body ~max_bytes:allowed_bytes descriptor
      in
      (* From RFC7540§6.9.1:
       *   After sending a flow-controlled frame, the sender reduces the space
       *   available in both windows by the length of the transmitted frame. *)
      root.flow <- root.flow - written;
      stream.flow <- stream.flow - written;
      written)
    else 0

  (* Recomputes the stream's queue key [t] from its parent's [t_last], the
   * number of bytes [n] just sent, and the stream's weight. *)
  let update_t stream n =
    let (Stream ({ parent = Parent parent; _ } as stream)) = stream in
    let tlast_p =
      match parent with
      | Connection { t_last; _ } -> t_last
      | Stream { t_last; _ } -> t_last
    in
    stream.t <- tlast_p + (n * 256 / stream.priority.weight)

  (* Queues the stream [id] for removal from the stream table; the actual
   * removal happens in [flush] once [closed]'s TTL reaches zero. *)
  let mark_for_removal (Connection root) id closed =
    root.marked_for_removal <- (id, closed) :: root.marked_for_removal

  (* Finishes the stream behind [descriptor] if it is idle and its identifier
   * is lower than the highest identifier seen for its kind. [max_seen_ids] is
   * the pair (max client stream id, max pushed stream id). *)
  let implicitly_close_idle_stream descriptor max_seen_ids =
    let implicitly_close_stream descriptor =
      if Streamd.is_idle descriptor
      then
        (* From RFC7540§5.1.1:
         *   The first use of a new stream identifier implicitly closes all
         *   streams in the "idle" state that might have been initiated by
         *   that peer with a lower-valued stream identifier. *)
        Streamd.finish_stream descriptor Finished
    in
    let max_client_stream_id, max_pushed_stream_id = max_seen_ids in
    let stream_id = Streamd.id descriptor in
    if Stream_identifier.is_request stream_id
    then (
      if stream_id < max_client_stream_id
      then implicitly_close_stream descriptor)
    else if stream_id < max_pushed_stream_id
    then implicitly_close_stream descriptor

  (* Scheduling algorithm from https://goo.gl/3sSHXJ (based on nghttp2):
   *
   *  1 def schedule(p):
   *  2   if stream #p has data to send:
   *  3     send data for #p, update nsent[p]
   *  4     return
   *  5   if #p's queue is empty:
   *  6     return
   *  7   pop #i from queue
   *  8   update t_last[p] = t[i]
   *  9   schedule(i)
   * 10   if #i or its descendant is "active":
   * 11     update t[i] and push it into queue again
   * 12
   * 13 schedule(0)
   *
   * [flush] runs one scheduling pass from the root, then ages the streams
   * that were marked for removal, dropping from the stream table those whose
   * TTL has reached zero. *)
  let flush t max_seen_ids =
    (* Returns (bytes written, whether the visited subtree is still active). *)
    let rec schedule : type a. a node -> int * bool = function
      | Connection p ->
        (* The root can never send data. *)
        (match PriorityQueue.pop p.children with
        | Some ((id, (Stream i as i_node)), children') ->
          p.t_last <- i.t;
          let written, subtree_is_active = schedule i_node in
          if subtree_is_active
          then (
            update_t i_node written;
            p.children <- PriorityQueue.add id i_node children')
          else (
            implicitly_close_idle_stream i.descriptor max_seen_ids;
            (* XXX(anmonteiro): we may not want to remove from the tree right
             * away. *)
            p.children <- children');
          written, subtree_is_active
        | None ->
          (* Queue is empty, see line 6 above. *)
          0, false)
      | Stream ({ descriptor; _ } as p) as p_node ->
        if Streamd.requires_output descriptor
        then (
          (* In this branch, flow-control has no bearing on activity, otherwise
           * a flow-controlled stream would be considered inactive (because it
           * can't make progress at the moment) and removed from the priority
           * tree altogether. *)
          let written = write t p_node in
          (* We check for activity again, because the stream may have gone
           * inactive after the call to `write` above. *)
          written, Streamd.requires_output descriptor)
        else (
          match PriorityQueue.pop p.children with
          | Some ((id, (Stream i as i_node)), children') ->
            p.t_last <- i.t;
            let written, subtree_is_active = schedule i_node in
            if subtree_is_active
            then (
              update_t i_node written;
              p.children <- PriorityQueue.add id i_node children')
            else (
              implicitly_close_idle_stream i.descriptor max_seen_ids;
              p.children <- children');
            written, subtree_is_active
          | None ->
            (* Queue is empty, see line 6 above. *)
            0, false)
    in
    let (Connection root) = t in
    ignore (schedule t);
    root.marked_for_removal <-
      List.fold_left
        (fun acc (id, closed) ->
          (* When a stream completes, i.e. doesn't require more output and
           * enters the `Closed` state, we set a TTL value which represents
           * the number of writer yields that the stream has before it is
           * removed from the connection Hash Table. By doing this we avoid
           * losing some potentially useful information regarding the
           * stream's state at the cost of keeping it around for a little
           * while longer. *)
          if closed.Stream.ttl = 0
          then (
            StreamsTbl.remove root.all_streams id;
            acc)
          else (
            closed.ttl <- closed.ttl - 1;
            (id, closed) :: acc))
        []
        root.marked_for_removal

  (* Validates a window update: [flow'] is the candidate window obtained by
   * adding [growth] to [flow]. Rejects the update if the addition wrapped
   * around or if the result exceeds the maximum window size. *)
  let check_flow flow growth flow' =
    (* Check for overflow on 32-bit systems. *)
    (flow' > growth) = (flow > 0)
    && flow' <= Settings.WindowSize.max_window_size

  (* Grows the outbound window of a node by [growth]. Returns [false], leaving
   * the window untouched, when the resulting window would be invalid. *)
  let add_flow : type a. a node -> int -> bool =
   fun t growth ->
    match t with
    | Connection ({ flow; _ } as root) ->
      let flow' = flow + growth in
      let valid_flow = check_flow flow growth flow' in
      if valid_flow then root.flow <- flow';
      valid_flow
    | Stream ({ flow; _ } as stream) ->
      let flow' = flow + growth in
      let valid_flow = check_flow flow growth flow' in
      if valid_flow then stream.flow <- flow';
      valid_flow

  (* Grows the inbound window of a node by [growth]. Returns [false], leaving
   * the window untouched, when the resulting window would be invalid. *)
  let add_inflow : type a. a node -> int -> bool =
   fun t growth ->
    match t with
    | Connection ({ inflow; _ } as root) ->
      let inflow' = inflow + growth in
      let valid_inflow = check_flow inflow growth inflow' in
      if valid_inflow then root.inflow <- inflow';
      valid_inflow
    | Stream ({ inflow; _ } as stream) ->
      let inflow' = inflow + growth in
      let valid_inflow = check_flow inflow growth inflow' in
      if valid_inflow then stream.inflow <- inflow';
      valid_inflow

  (* Shrinks the inbound window of a node by [size], unconditionally. *)
  let deduct_inflow : type a. a node -> int -> unit =
   fun t size ->
    match t with
    | Connection ({ inflow; _ } as root) ->
      (* no need to check, we verify that the peer is allowed to send. *)
      root.inflow <- inflow - size
    | Stream ({ inflow; _ } as stream) -> stream.inflow <- inflow - size

  (* Pretty-prints a priority queue and, recursively, the children of each of
   * its entries, indenting two spaces per level. Each entry is rendered as
   * its stream id and [t] value followed by its children in brackets. *)
  let pp_hum fmt t =
    let rec pp_hum_inner level fmt t =
      let pp_binding fmt (i, Stream { children; t; _ }) =
        Format.fprintf
          fmt
          "\n%s%ld, %d -> [%a]"
          (String.make (level * 2) ' ')
          i
          t
          (pp_hum_inner (level + 1))
          children
      in
      PriorityQueue.pp pp_binding fmt t
    in
    pp_hum_inner 0 fmt t
end