123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281(*----------------------------------------------------------------------------
* Copyright (c) 2019 António Nuno Monteiro
*
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* 3. Neither the name of the copyright holder nor the names of its
* contributors may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*---------------------------------------------------------------------------*)(* TODO(anmonteiro): think about whether we wanna expose this module. it might
* be helpful to expose a way to reset streams, and I think we'd need a
* reference to the Respd *)moduleWriter=Serialize.WriteropenStreamtypeerror=[`Malformed_responseofstring|`Invalid_response_body_lengthofResponse.t|`Protocol_error|`Exnofexn]typeerror_handler=error->unittyperesponse_handler=Response.t->[`read]Body.t->unittyperesponse_info={response:Response.t;response_body:[`read]Body.t;mutableresponse_body_bytes:int64(* We're not doing anything with these yet, we could probably have a
* `Reqd.schedule_read_trailers` function that would be called once
* trailer headers are emitted. *);mutabletrailers_parser:Stream.partial_headersoption;mutabletrailers:Headers.toption}typeactive_request={request:Request.t;request_body:[`read]Body.t;response_handler:response_handler;wakeup_writer:unit->unit}typeactive_state=(response_info,response_inforemote_state)Stream.active_statetypestate=(active_state,active_request,active_requestStream.remote_state)Stream.statetypet=(state,[`Ok|error],error_handler)Stream.streamletcreate_active_responseresponseresponse_body=ActiveMessage{response;response_body;response_body_bytes=Int64.zero;trailers_parser=None;trailers=None}letrequestt=matcht.statewith|Idle|Reserved(WaitingForPeer|PartialHeaders_|FullHeaders)->failwith"h2.Respd.request: request is not active"|Reserved(ActiveMessage{request;_})->request|Active(_,{request;_})->request|Closed_->failwith"h2.Respd.request: request has ended"letrequest_bodyt=matcht.statewith|Idle|Reserved(WaitingForPeer|PartialHeaders_|FullHeaders)->failwith"h2.Respd.request: request is not active"|Reserved(ActiveMessage{request_body;_})->request_body|Active(_,{request_body;_})->request_body|Closed_->failwith"h2.Respd.request: request has ended"letresponset=matcht.statewith|Idle->None|Reserved_->None|Active((Open(ActiveMessage{response;_})|HalfClosed(ActiveMessage{response;_})),_)->Someresponse|Active((Open_|HalfClosed_),_)->None|Closed_->Noneletresponse_exnt=matcht.statewith|Idle|Reserved_->failwith"h2.Respd.response_exn: response has not arrived"|Active((Open(ActiveMessage{response;_})|HalfClosed(ActiveMessage{response;_})),_)->response|Active((Open_|HalfClosed_),_)->failwith"h2.Respd.response_exn: response has not arrived"|Closed_->assertfalseletresponse_body_exnt=matcht.statewith|Idle|Reserved_->failwith"h2.Respd.response_exn: response has not arrived"|Active((Open(ActiveMessage{response_body;_})|HalfClosed(ActiveMessage{response_body;_})),_)->response_body|Active((Open_|HalfClosed_),_)->failwith"h2.Respd.response_exn: response has not arrived"|Closed_->assertfalse(* let close_stream t =
*match t.error_code with
*| _, Some error_code ->
* reset_stream t error_code
*| _, None ->
* (match t.state with
* | HalfClosedLocal (ActiveMessage _) ->
* (* From RFC7540§8.1: A server can send a complete response prior to the
* client sending an entire request if the response does not depend on
* any portion of the request that has not been sent and received. When
* this is true, a server MAY request that the client abort transmission
* of a request without error by sending a RST_STREAM with an error code
* of NO_ERROR after sending a complete response (i.e., a frame with the
* END_STREAM flag). *)
* reset_stream t Error.NoError
* | Open (ActiveMessage _) ->
* Writer.flush t.writer (fun () -> finish_stream t Finished)
* | _ ->
* assert false) *)letclose_streamt=(* TODO: reserved *)matcht.statewith|Active(HalfClosed_,_)->(* easy case, just transition to the closed state. *)finish_streamtFinished|Active(Open_,_)->(* Still not done sending, reset stream with no error? *)(* TODO: *)()|_->()(* returns whether we should send an RST_STREAM frame or not. *)let_report_errort?response_bodyexnerror_code=matchfstt.error_codewith|`Ok->(matchresponse_bodywith|Someresponse_body->Body.close_readerresponse_body;(* do we even need to execute this read? `close_reader` already does it. *)Body.execute_readresponse_body|None->());t.error_code<-(exn:>[`Ok|error]),Someerror_code;t.error_handlerexn;true|`Exn_|`Protocol_error|`Invalid_response_body_length_|`Malformed_response_->(* XXX: Is this even possible? *)failwith"h2.Reqd.report_exn: NYI"letreport_errortexnerror_code=matcht.statewith|Active((Open(ActiveMessage{response_body;_})|HalfClosed(ActiveMessage{response_body;_})),s)->Body.close_writers.request_body;if_report_errort~response_bodyexnerror_codethenreset_streamterror_code|Reserved(ActiveMessages)|Active(_,s)->Body.close_writers.request_body;if_report_errortexnerror_codethenreset_streamterror_code|Reserved_->(* Streams in the reserved state don't yet have a stream-level error
* handler registered with them *)()|Idle|Closed_->(* Not allowed to send RST_STREAM frames in these states *)ignore(_report_errortexnerror_code)letclose_request_body{request_body;_}=Body.close_readerrequest_bodyleterror_codet=matchfstt.error_codewith#erroraserror->Someerror|`Ok->Noneletrequest_body_requires_outputresponse_body=(not(Body.is_closedresponse_body))||Body.has_pending_outputresponse_bodyletrequires_outputt=matcht.statewith|Idle->true|Reserved_->false|Active(Open_,{request_body;_})->request_body_requires_outputrequest_body|Active(HalfClosed_,_)->false|Closed_->falseletwrite_buffer_datawriter~off~lenframe_infobuffer=matchbufferwith|`Stringstr->Writer.write_datawriter~off~lenframe_infostr|`Bigstringbstr->Writer.schedule_datawriter~off~lenframe_infobstrletflush_request_bodyt~max_bytes=matcht.statewith|Active(Openactive_state,({request_body;_}ass))->letwritten=Body.transfer_to_writerrequest_bodyt.writer~max_frame_size:t.max_frame_size~max_bytest.idinifnot(request_body_requires_outputrequest_body)thent.state<-Active(HalfClosedactive_state,s);written|_->0letdeliver_trailer_headerstheaders=matcht.statewith|Active((Open(ActiveMessages)|HalfClosed(ActiveMessages)),_)->(* TODO: call the schedule_trailers callback *)s.trailers<-Someheaders|_->assertfalse