(*----------------------------------------------------------------------------
* Copyright (c) 2019 António Nuno Monteiro
*
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* 3. Neither the name of the copyright holder nor the names of its
* contributors may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*---------------------------------------------------------------------------*)

(* TODO(anmonteiro): think about whether we want to expose this module. It
 * might be helpful to expose a way to reset streams, and I think we'd need a
* reference to the Respd *)

module Writer = Serialize.Writer
open Stream

(* Errors that are handed to a stream's [error_handler]. *)
type error =
  [ `Malformed_response of string
  | `Invalid_response_body_length of Response.t
  | `Protocol_error of Error_code.t * string
  | `Exn of exn
  ]

(* Callback invoked with the error reported on a stream. *)
type error_handler = error -> unit

(* Callback receiving a response together with the reader for its body. *)
type response_handler = Response.t -> Body.Reader.t -> unit

(* State tracked for a response that has arrived on a stream.
 * [response_body_bytes] starts at zero and [trailers_parser] starts as [None]
 * (see [create_active_response]); both are mutable and updated elsewhere. *)
type response_info =
  { response : Response.t
  ; response_body : Body.Reader.t
  ; mutable response_body_bytes : int64
  ; mutable trailers_parser : Stream.partial_headers option
  }

(* Callback invoked with trailer headers (see [deliver_trailer_headers]). *)
type trailers_handler = Headers.t -> unit

(* The request side of a stream: the request, the writer for its body and the
 * handlers to call for the response and for its trailers. *)
type active_request =
  { request : Request.t
  ; request_body : Body.Writer.t
  ; response_handler : response_handler
  ; trailers_handler : trailers_handler
  }

type active_state =
  (response_info, response_info remote_state) Stream.active_state

type state =
  ( active_state
  , active_request
  , active_request Stream.remote_state )
  Stream.state

type t = (state, error, error_handler) Stream.stream

(* Builds the [ActiveMessage] value for a freshly received response: no body
 * bytes accounted for yet and no trailers parser installed. *)
let create_active_response response response_body =
  ActiveMessage
    { response
    ; response_body
    ; response_body_bytes = Int64.zero
    ; trailers_parser = None
    }

(* Returns the response body reader of stream [t].
 *
 * Raises [Failure] when no response is available: the stream is idle or
 * reserved, it is active but holds no [ActiveMessage] yet, or it is already
 * closed. The messages name this function so that the failure can be traced
 * back to the right call site. *)
let response_body_exn t =
  match t.state with
  | Idle | Reserved _ ->
    failwith "h2.Respd.response_body_exn: response has not arrived"
  | Active
      ( ( Open (ActiveMessage { response_body; _ })
        | HalfClosed (ActiveMessage { response_body; _ }) )
      , _ ) ->
    response_body
  | Active ((Open _ | HalfClosed _), _) ->
    (* Active stream whose response has not been received yet. *)
    failwith "h2.Respd.response_body_exn: response has not arrived"
  | Closed _ ->
    failwith "h2.Respd.response_body_exn: stream already closed"

(* let close_stream t =
*match t.error_code with
*| _, Some error_code ->
* reset_stream t error_code
*| _, None ->
* (match t.state with
* | HalfClosedLocal (ActiveMessage _) ->
* (* From RFC7540§8.1: A server can send a complete response prior to the
* client sending an entire request if the response does not depend on
* any portion of the request that has not been sent and received. When
* this is true, a server MAY request that the client abort transmission
* of a request without error by sending a RST_STREAM with an error code
* of NO_ERROR after sending a complete response (i.e., a frame with the
* END_STREAM flag). *)
* reset_stream t Error.NoError
* | Open (ActiveMessage _) ->
* Writer.flush t.writer (fun () -> finish_stream t Finished)
* | _ ->
* assert false) *)

(* Finishes stream [t] when it is half-closed. An open stream (request not
 * fully sent yet) and every other state are currently left untouched. *)
let close_stream t =
  (* TODO: reserved *)
  match t.state with
  | Active (HalfClosed _, _) ->
    (* easy case, just transition to the closed state. *)
    finish_stream t Finished
  | Active (Open _, _) ->
    (* Still not done sending, reset stream with no error? *)
    (* TODO: *)
    ()
  | _ -> ()

(* returns whether we should send an RST_STREAM frame or not.
 *
 * Only the first error reported on a stream is acted upon: the optional
 * response body is closed and read once more, the error code is recorded on
 * the stream and the stream error handler is called. Later reports return
 * [false] without any side effect. *)
let _report_error (t : t) ?response_body (error : error) error_code =
  match t.error_code with
  | No_error ->
    (match response_body with
    | Some response_body ->
      Body.Reader.close response_body;
      (* do we even need to execute this read? `close_reader` already does
         it. *)
      Body.Reader.execute_read response_body
    | None -> ());
    t.error_code <- error_to_code error error_code;
    t.error_handler error;
    true
  | Exn _ | Other _ ->
    (* Already handling error.
     * TODO(anmonteiro): Log a message when we add Logs support *)
    false

(* Reports [error] on stream [t]. Where a request body exists it is closed,
 * and the stream is reset with [error_code] if this was the first error
 * reported on it. *)
let report_error (t : t) error error_code =
  match t.state with
  | Active
      ( ( Open (ActiveMessage { response_body; _ })
        | HalfClosed (ActiveMessage { response_body; _ }) )
      , s ) ->
    (* A response has arrived: its body is passed along so it gets closed. *)
    Body.Writer.close s.request_body;
    if _report_error t ~response_body error error_code
    then reset_stream t error_code
  | Reserved (ActiveMessage s) | Active (_, s) ->
    Body.Writer.close s.request_body;
    if _report_error t error error_code then reset_stream t error_code
  | Reserved _ ->
    (* Streams in the reserved state don't yet have a stream-level error
     * handler registered with them *)
    ()
  | Idle | Closed _ ->
    (* Not allowed to send RST_STREAM frames in these states *)
    ignore (_report_error t error error_code)

(* Returns [true] for idle streams and for active streams that are still
 * open; [false] for reserved, half-closed and closed streams. *)
let requires_output t =
  match t.state with
  | Idle -> true
  | Reserved _ -> false
  | Active (Open _, _) -> true
  | Active (HalfClosed _, _) -> false
  | Closed _ -> false

(* Flushes request body data of an open stream to the writer, limited to
 * [max_bytes], and returns the result of [Body.Writer.transfer_to_writer]
 * (0 when nothing is transferred).
 *
 * Once the body is closed and has no pending output, an empty DATA frame
 * carrying END_STREAM is scheduled and the stream becomes half-closed. *)
let flush_request_body t ~max_bytes =
  match t.state with
  | Active (Open active_state, ({ request_body; _ } as s)) ->
    let has_pending = Body.Writer.has_pending_output request_body in
    if has_pending && max_bytes > 0
    then
      Body.Writer.transfer_to_writer
        request_body
        t.writer
        ~max_frame_size:t.max_frame_size
        ~max_bytes
        t.id
    else if Body.Writer.is_closed request_body && not has_pending
    then (
      (* closed and no pending output *)
      (* The pending-output check matters: without it, a closed body that
       * still has output pending while [max_bytes] is 0 would get END_STREAM
       * scheduled here, and the pending bytes would never be flushed because
       * the stream is no longer open afterwards. *)
      (* From RFC7540§6.9.1:
       *   Frames with zero length with the END_STREAM flag set (that is, an
       *   empty DATA frame) MAY be sent if there is no available space in
       *   either flow-control window. *)
      let frame_info =
        Writer.make_frame_info
          ~max_frame_size:t.max_frame_size
          ~flags:Flags.(set_end_stream default_flags)
          t.id
      in
      Writer.schedule_data t.writer frame_info ~len:0 Bigstringaf.empty;
      t.state <- Active (HalfClosed active_state, s);
      0)
    else
      (* Either the body is still open with nothing to send, or output is
       * pending but no bytes may be sent right now. *)
      0
  | _ -> 0

(* Passes [headers] to the trailers handler registered with the request. The
 * stream must be active and hold a response; any other state trips the
 * assertion. *)
let deliver_trailer_headers t headers =
  match t.state with
  | Active
      ( (Open (ActiveMessage _) | HalfClosed (ActiveMessage _))
      , { trailers_handler; _ } ) ->
    trailers_handler headers
  | _ -> assert false

(* Executes a read on the response body when it has pending output. An
 * exception raised by the read is reported on the stream as an [`Exn] error
 * with the [InternalError] code. *)
let flush_response_body t =
  match t.state with
  | Active
      ( ( Open (ActiveMessage { response_body; _ })
        | HalfClosed (ActiveMessage { response_body; _ }) )
      , _ ) ->
    if Body.Reader.has_pending_output response_body
    then (
      try Body.Reader.execute_read response_body with
      | exn -> report_error t (`Exn exn) InternalError)
  | _ -> ()