123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829openCore_kernelopenAsync_kernelopenUtilmoduleP=ProtocolmoduleDescription=DescriptionmoduleImplementation=ImplementationmoduleImplementations=ImplementationsmoduleTransport=TransportmoduleConnection=Connection(* The Result monad is also used. *)let(>>=~)=Result.(>>=)let(>>|~)=Result.(>>|)moduleRpc_common=structletdispatch_raw'conn~tag~version~bin_writer_query~query~query_id~response_handler=letquery={P.Query.tag;version;id=query_id;data=query}inmatchConnection.dispatchconn~response_handler~bin_writer_query~querywith|Ok()->Ok()|Error`Closed->ErrorRpc_error.Connection_closedletdispatch_rawconn~tag~version~bin_writer_query~query~query_id~f=letresponse_ivar=Ivar.create()inbeginmatchdispatch_raw'conn~tag~version~bin_writer_query~query~query_id~response_handler:(Some(fresponse_ivar))with|Ok()->()|Error_ase->Ivar.fillresponse_ivareend;Ivar.readresponse_ivarendmoduleRpc=structtype('query,'response)t={tag:P.Rpc_tag.t;version:int;bin_query:'queryBin_prot.Type_class.t;bin_response:'responseBin_prot.Type_class.t}letcreate~name~version~bin_query~bin_response={tag=P.Rpc_tag.of_stringname;version;bin_query;bin_response}letnamet=P.Rpc_tag.to_stringt.tagletversiont=t.versionletdescriptiont={Description.name=namet;version=versiont}letbin_queryt=t.bin_queryletbin_responset=t.bin_responseletimplementtf={Implementation.tag=t.tag;version=t.version;f=Rpc(t.bin_query.reader,t.bin_response.writer,f,Deferred)}letimplement'tf={Implementation.tag=t.tag;version=t.version;f=Rpc(t.bin_query.reader,t.bin_response.writer,f,Blocking)}letdispatch'tconnquery=letresponse_handlerivar=fun(response:_P.Response.t)~read_buffer~read_buffer_pos_ref->letresponse=response.data>>=~funlen->bin_read_from_bigstringt.bin_response.readerread_buffer~pos_ref:read_buffer_pos_ref~len~location:"client-side rpc response un-bin-io'ing"inIvar.fillivarresponse;`remove(Ok())inletquery_id=P.Query_id.create()inRpc_common.dispatch_rawconn~tag:t.tag~version:t.version~bin_writer_query:t.bin_query.writer~query~query_id~f:response_handlerletrpc_result_to_or_errortconnresult=Rpc_result.or_errorresult~rpc_tag:t.tag~rpc_version:t.version~connection_description:(Connection.descriptionconn)~connection_close_started:(Connection.close_reason~on_close:`startedconn)letdispatchtconnquery=dispatch'tconnquery>>|funresult->rpc_result_to_or_errortconnresultletdispatch_exntconnquery=dispatchtconnquery>>|Or_error.ok_exnmoduleExpert=structmoduleResponder=Implementations.Expert.Rpc_responderletmake_dispatchdo_dispatchconn~rpc_tag~versionbuf~pos~len~handle_response~handle_error=letresponse_handler:Connection.response_handler=funresponse~read_buffer~read_buffer_pos_ref->matchresponse.datawith|Errore->handle_error(Error.t_of_sexp(Rpc_error.sexp_of_t~get_connection_close_reason:(fun()->[%sexp(Deferred.peek(Connection.close_reason~on_close:`startedconn):Info.toption)])e));`remove(Ok())|Oklen->letlen=(len:Nat0.t:>int)inletd=handle_responseread_buffer~pos:!read_buffer_pos_ref~leninread_buffer_pos_ref:=!read_buffer_pos_ref+len;ifDeferred.is_determineddthen`remove(Ok())else`remove_and_waitdindo_dispatchconn~tag:(P.Rpc_tag.of_stringrpc_tag)~versionbuf~pos~len~response_handler:(Someresponse_handler)letdispatchconn~rpc_tag~versionbuf~pos~len~handle_response~handle_error=matchmake_dispatchConnection.dispatch_bigstringconn~rpc_tag~versionbuf~pos~len~handle_response~handle_errorwith|Ok()->`Ok|Error`Closed->`Connection_closedletschedule_dispatchconn~rpc_tag~versionbuf~pos~len~handle_response~handle_error=matchmake_dispatchConnection.schedule_dispatch_bigstringconn~rpc_tag~versionbuf~pos~len~handle_response~handle_errorwith|Okd->`Flushedd|Error`Closed->`Connection_closedtypeimplementation_result=Implementation.Expert.implementation_result=|Replied|Delayed_responseofunitDeferred.tletimplementtf={Implementation.tag=t.tag;version=t.version;f=Rpc_expert(f,Deferred)}letimplement'tf={Implementation.tag=t.tag;version=t.version;f=Rpc_expert(f,Blocking)}endendmoduleOne_way=structtype'msgt={tag:P.Rpc_tag.t;version:int;bin_msg:'msgBin_prot.Type_class.t}[@@derivingfields]letnamet=P.Rpc_tag.to_stringt.tagletcreate~name~version~bin_msg={tag=P.Rpc_tag.of_stringname;version;bin_msg}letdescriptiont={Description.name=namet;version=versiont}letimplementtf={Implementation.tag=t.tag;version=t.version;f=One_way(t.bin_msg.reader,f)}letdispatch'tconnquery=letquery_id=P.Query_id.create()inRpc_common.dispatch_raw'conn~tag:t.tag~version:t.version~bin_writer_query:t.bin_msg.writer~query~query_id~response_handler:Noneletrpc_result_to_or_errortconnresult=Rpc_result.or_errorresult~rpc_tag:t.tag~rpc_version:t.version~connection_description:(Connection.descriptionconn)~connection_close_started:(Connection.close_reason~on_close:`startedconn)letdispatchtconnquery=dispatch'tconnquery|>funresult->rpc_result_to_or_errortconnresultletdispatch_exntconnquery=Or_error.ok_exn(dispatchtconnquery)moduleExpert=structletimplementtf={Implementation.tag=t.tag;version=t.version;f=One_way_expertf;}letdispatch{tag;version;bin_msg=_}connbuf~pos~len=matchConnection.dispatch_bigstringconn~tag~versionbuf~pos~len~response_handler:Nonewith|Ok()->`Ok|Error`Closed->`Connection_closedletschedule_dispatch{tag;version;bin_msg=_}connbuf~pos~len=matchConnection.schedule_dispatch_bigstringconn~tag~versionbuf~pos~len~response_handler:Nonewith|Okflushed->`Flushedflushed|Error`Closed->`Connection_closedendendmodulePipe_close_reason=structtypet=|Closed_locally|Closed_remotely|ErrorofError.t[@@derivingbin_io,compare,sexp]moduleStable=structmoduleV1=structtypenonrect=t=|Closed_locally|Closed_remotely|ErrorofError.Stable.V2.t[@@derivingbin_io,compare,sexp]endendend(* the basis of the implementations of Pipe_rpc and State_rpc *)moduleStreaming_rpc=structmoduleInitial_message=P.Stream_initial_messagetype('query,'initial_response,'update_response,'error_response)t={tag:P.Rpc_tag.t;version:int;bin_query:'queryBin_prot.Type_class.t;bin_initial_response:'initial_responseBin_prot.Type_class.t;bin_update_response:'update_responseBin_prot.Type_class.t;bin_error_response:'error_responseBin_prot.Type_class.t;client_pushes_back:bool}letcreate?client_pushes_back~name~version~bin_query~bin_initial_response~bin_update_response~bin_error()=letclient_pushes_back=matchclient_pushes_backwith|None->false|Some()->truein{tag=P.Rpc_tag.of_stringname;version;bin_query;bin_initial_response;bin_update_response;bin_error_response=bin_error;client_pushes_back}letmake_initial_messagex={Initial_message.unused_query_id=P.Unused_query_id.t;initial=x}letimplement_gentimpl=letbin_init_writer=Initial_message.bin_writer_tt.bin_initial_response.writert.bin_error_response.writerin{Implementation.tag=t.tag;version=t.version;f=Streaming_rpc(t.bin_query.reader,bin_init_writer,t.bin_update_response.writer,impl);};;letimplementtf=letfcquery=fcquery>>|function|Errorerr->Error(make_initial_message(Errorerr))|Ok(initial,pipe)->Ok(make_initial_message(Okinitial),pipe)inimplement_gent(Pipef);;letimplement_directtf=letfcquerywriter=fcquerywriter>>|function|Error_asx->Error(make_initial_messagex)|Ok_asx->Ok(make_initial_messagex)inimplement_gent(Directf);;letaborttconnid=letquery={P.Query.tag=t.tag;version=t.version;id;data=`Abort}inignore(Connection.dispatchconn~bin_writer_query:P.Stream_query.bin_writer_nat0_t~query~response_handler:None:(unit,[`Closed])Result.t)modulePipe_message=structtype'at=|Updateof'a|Closedof[`By_remote_side|`ErrorofError.t]endmodulePipe_response=structtypet=|Continue|WaitofunitDeferred.tendmodulePipe_metadata=structtypet={query_id:P.Query_id.t;close_reason:Pipe_close_reason.tDeferred.t;}letidt=t.query_idletclose_reasont=t.close_reasonendmoduleResponse_state=structmoduleUpdate_handler=structtype'at='aPipe_message.t->Pipe_response.tendmoduleInitial=structtypenonrec('q,'i,'u,'e,'extra)t={rpc:('q,'i,'u,'e)t;query_id:P.Query_id.t;make_update_handler:(unit->'extra*'uUpdate_handler.t);ivar:(P.Query_id.t*'i*'extra,'e)Result.tRpc_result.tIvar.t;connection:Connection.t;}endmoduleState=structtype'at=|Waiting_for_initial_response:('q,'i,'u,'e,'extra)Initial.t->'ut|Writing_updatesof'aBin_prot.Type_class.reader*'aUpdate_handler.tendtype'at={mutablestate:'aState.t}endletread_error~get_connection_close_reason(handler:_Response_state.Update_handler.t)err=letcore_err=Error.t_of_sexp(Rpc_error.sexp_of_t~get_connection_close_reasonerr)inignore(handler(Closed(`Errorcore_err)):Pipe_response.t);`remove(Errorerr)leteof(handler:_Response_state.Update_handler.t)=ignore(handler(Closed`By_remote_side):Pipe_response.t);`remove(Ok())letresponse_handler~get_connection_close_reasoninitial_state:Connection.response_handler=letopenResponse_stateinletstate={state=Waiting_for_initial_responseinitial_state}infunresponse~read_buffer~read_buffer_pos_ref->matchstate.statewith|Writing_updates(bin_reader_update,handler)->beginmatchresponse.datawith|Errorerr->read_error~get_connection_close_reasonhandlererr|Oklen->letdata=bin_read_from_bigstringP.Stream_response_data.bin_reader_nat0_tread_buffer~pos_ref:read_buffer_pos_ref~len~location:"client-side streaming_rpc response un-bin-io'ing"~add_len:(function`Eof->0|`Ok(len:Nat0.t)->(len:>int))inmatchdatawith|Errorerr->read_error~get_connection_close_reasonhandlererr|Ok`Eof->eofhandler|Ok(`Oklen)->letdata=bin_read_from_bigstringbin_reader_updateread_buffer~pos_ref:read_buffer_pos_ref~len~location:"client-side streaming_rpc response un-bin-io'ing"inmatchdatawith|Errorerr->read_error~get_connection_close_reasonhandlererr|Okdata->matchhandler(Updatedata)with|Continue->`keep|Waitd->`waitdend|State.Waiting_for_initial_responseinitial_handler->(* We never use [`remove (Error _)] here, since that indicates that the
connection should be closed, and these are "normal" errors. (In contrast, the
errors we get in the [Writing_updates_to_pipe] case indicate more serious
problems.) Instead, we just put errors in [ivar]. *)leterrorerr=Ivar.fillinitial_handler.ivar(Errorerr);`remove(Ok())inbeginmatchresponse.datawith|Errorerr->errorerr|Oklen->letinitial=bin_read_from_bigstring(Initial_message.bin_reader_tinitial_handler.rpc.bin_initial_response.readerinitial_handler.rpc.bin_error_response.reader)read_buffer~pos_ref:read_buffer_pos_ref~len~location:"client-side streaming_rpc initial_response un-bin-io'ing"inbeginmatchinitialwith|Errorerr->errorerr|Okinitial_msg->beginmatchinitial_msg.initialwith|Errorerr->Ivar.fillinitial_handler.ivar(Ok(Errorerr));`remove(Ok())|Okinitial->let(extra,handler)=initial_handler.make_update_handler()inIvar.fillinitial_handler.ivar(Ok(Ok(initial_handler.query_id,initial,extra)));state.state<-Writing_updates(initial_handler.rpc.bin_update_response.reader,handler);`keependendend;;letdispatch_gentconnquerymake_update_handler=letbin_writer_query=P.Stream_query.bin_writer_needs_length(Writer_with_length.of_type_classt.bin_query)inletquery=`Queryqueryinletquery_id=P.Query_id.create()inRpc_common.dispatch_rawconn~query_id~tag:t.tag~version:t.version~bin_writer_query~query~f:(funivar->response_handler~get_connection_close_reason:(fun()->[%sexp(Deferred.peek(Connection.close_reason~on_close:`startedconn):Info.toption)]){rpc=t;query_id;connection=conn;ivar;make_update_handler;})>>|Rpc_result.or_error~rpc_tag:t.tag~rpc_version:t.version~connection_description:(Connection.descriptionconn)~connection_close_started:(Connection.close_reason~on_close:`startedconn);;letdispatch_itertconnquery~f=dispatch_gentconnquery(fun()->(),f)>>|function|Error_|Ok(Error_)ase->e|Ok(Ok(id,init,()))->Ok(Ok(id,init));;letdispatchtconnquery=dispatch_gentconnquery(fun()->let(pipe_r,pipe_w)=Pipe.create()in(* Set a small buffer to reduce the number of pushback events *)Pipe.set_size_budgetpipe_w100;letclose_reason:Pipe_close_reason.tIvar.t=Ivar.create()inletf:_Response_state.Update_handler.t=function|Updatedata->ifnot(Pipe.is_closedpipe_w)thenbeginPipe.write_without_pushbackpipe_wdata;ift.client_pushes_back&&Pipe.lengthpipe_w>=Pipe.size_budgetpipe_wthenWait(Pipe.downstream_flushedpipe_w>>|function|`Ok|`Reader_closed->())elseContinueendelseContinue|Closedreason->Ivar.fill_if_emptyclose_reason(matchreasonwith|`By_remote_side->Closed_remotely|`Errorerr->Errorerr);Pipe.closepipe_w;Continuein((pipe_r,close_reason),f))>>|function|Error_|Ok(Error_)ase->e|Ok(Ok(id,init,(pipe_r,close_reason)))->upon(Pipe.closedpipe_r)(fun()->ifnot(Ivar.is_fullclose_reason)thenbeginaborttconnid;Ivar.fillclose_reasonClosed_locally;end);letpipe_metadata:Pipe_metadata.t={query_id=id;close_reason=Ivar.readclose_reason;}inOk(Ok(pipe_metadata,init,pipe_r));;end(* A Pipe_rpc is like a Streaming_rpc, except we don't care about initial state - thus
it is restricted to unit and ultimately ignored *)modulePipe_rpc=structtype('query,'response,'error)t=('query,unit,'response,'error)Streaming_rpc.tmoduleId=P.Query_idmoduleMetadata=Streaming_rpc.Pipe_metadataletcreate?client_pushes_back~name~version~bin_query~bin_response~bin_error()=Streaming_rpc.create?client_pushes_back~name~version~bin_query~bin_initial_response:Unit.bin_t~bin_update_response:bin_response~bin_error()letbin_queryt=t.Streaming_rpc.bin_queryletbin_responset=t.Streaming_rpc.bin_update_responseletbin_errort=t.Streaming_rpc.bin_error_responseletclient_pushes_backt=t.Streaming_rpc.client_pushes_backletimplementtf=Streaming_rpc.implementt(funaquery->faquery>>|funx->x>>|~funx->(),x)moduleDirect_stream_writer=structincludeImplementations.Direct_stream_writermoduleGroup=structmoduleBuffer=structtypet=Bigstring.trefletcreate?(initial_size=4096)()=ifinitial_size<0thenfailwiths"Rpc.Pipe_rpc.Direct_stream_writer.Group.Buffer.create \
got negative buffer size"initial_sizeInt.sexp_of_t;ref(Bigstring.createinitial_size)endtype'adirect_stream_writer='atmoduleT=Implementation_types.Direct_stream_writertype'at='aT.Group.t={mutablecomponents:'adirect_stream_writerBag.t;components_by_id:'acomponentId.Table.t;buffer:Bigstring.tref}and'acomponent='aT.Group.component={writer_element_in_group:'adirect_stream_writerBag.Elt.t;group_element_in_writer:'aT.group_entryBag.Elt.t}letcreate?buffer()=letbuffer=matchbufferwith|None->Buffer.create()|Someb->bin{components=Bag.create();components_by_id=Id.Table.create();buffer};;letlengtht=Bag.lengtht.componentsletadd_exnt(writer:_Implementations.Direct_stream_writer.t)=ifis_closedwriterthenfailwith"Rpc.Pipe_rpc.Direct_stream_writer.Group.add_exn: \
cannot add a closed direct stream writer";ifHashtbl.memt.components_by_idwriter.idthenfailwith"Rpc.Pipe_rpc.Direct_stream_writer.Group.add_exn: \
trying to add a direct stream writer that is already present \
in the group";(matchBag.chooset.componentswith|None->()|Someone->letone=Bag.Elt.valueoneinifnot(phys_equal(bin_writerone)(bin_writerwriter))thenfailwith"Rpc.Pipe_rpc.Direct_stream_writer.Group.add: \
cannot add a direct stream writer with a different bin_writer");letwriter_element_in_group=Bag.addt.componentswriterinletgroup_element_in_writer=Bag.addwriter.groups{group=t;element_in_group=writer_element_in_group}inHashtbl.add_exnt.components_by_id~key:writer.id~data:{writer_element_in_group;group_element_in_writer};;;letremovet(writer:_Implementations.Direct_stream_writer.t)=matchHashtbl.find_and_removet.components_by_idwriter.idwith|None->()|Some{writer_element_in_group;group_element_in_writer}->Bag.removet.componentswriter_element_in_group;Bag.removewriter.groupsgroup_element_in_writer;;;letto_listt=Bag.to_listt.componentsletflushedt=Deferred.all_unit(List.map(to_listt)~f:flushed)moduleExpert=structletwrite_without_pushbackt~buf~pos~len=Bag.itert.components~f:(fundirect_stream_writer->(* Writers are automatically scheduled to be removed from their groups when
closed, so [`Closed] here just means that the removal didn't happen yet. *)ignore(Expert.write_without_pushbackdirect_stream_writer~buf~pos~len:[`Ok|`Closed]));;letwritet~buf~pos~len=write_without_pushbackt~buf~pos~len;flushedt;;endletwrite_without_pushbacktx=matchBag.chooset.componentswith|None->()|Someone->letone=Bag.Elt.valueoneinlet{Bin_prot.Type_class.write;size}=bin_writeroneinletbuffer=!(t.buffer)in(* Optimistic first try *)matchwritebuffer~pos:0xwith|len->Expert.write_without_pushbackt~buf:buffer~pos:0~len|exception_->(* It's likely that the exception is due to a buffer overflow, so resize the
internal buffer and try again. Technically we could match on
[Bin_prot.Common.Buffer_short] only, however we can't easily enforce that
custom bin_write_xxx functions raise this particular exception and not
[Invalid_argument] or [Failure] for instance. *)letlen=sizexinBigstring.unsafe_destroybuffer;letbuffer=Bigstring.create(Int.ceil_pow2len)int.buffer:=buffer;letlen=writebuffer~pos:0xinExpert.write_without_pushbackt~buf:buffer~pos:0~len;;letwritetx=write_without_pushbacktx;flushedt;;endendletimplement_directtf=Streaming_rpc.implement_directtfletdispatchtconnquery=Streaming_rpc.dispatchtconnquery>>|funresponse->response>>|~funx->x>>|~fun(metadata,(),pipe_r)->pipe_r,metadataexceptionPipe_rpc_failedletdispatch_exntconnquery=dispatchtconnquery>>|funresult->matchresultwith|Errorrpc_error->raise(Error.to_exnrpc_error)|Ok(Error_)->raisePipe_rpc_failed|Ok(Okpipe_and_id)->pipe_and_idmodulePipe_message=Streaming_rpc.Pipe_messagemodulePipe_response=Streaming_rpc.Pipe_responseletdispatch_itertconnquery~f=Streaming_rpc.dispatch_itertconnquery~f>>|funresponse->response>>|~funx->x>>|~fun(id,())->idletabort=Streaming_rpc.abortletclose_reason=Streaming_rpc.Pipe_metadata.close_reasonletnamet=P.Rpc_tag.to_stringt.Streaming_rpc.tagletversiont=t.Streaming_rpc.versionletdescriptiont={Description.name=namet;version=versiont}endmoduleState_rpc=structtype('query,'initial,'response,'error)t=('query,'initial,'response,'error)Streaming_rpc.tmoduleId=P.Query_idmoduleMetadata=Streaming_rpc.Pipe_metadataletcreate?client_pushes_back~name~version~bin_query~bin_state~bin_update~bin_error()=Streaming_rpc.create?client_pushes_back~name~version~bin_query~bin_initial_response:bin_state~bin_update_response:bin_update~bin_error()letbin_queryt=t.Streaming_rpc.bin_queryletbin_statet=t.Streaming_rpc.bin_initial_responseletbin_updatet=t.Streaming_rpc.bin_update_responseletbin_errort=t.Streaming_rpc.bin_error_responseletimplement=Streaming_rpc.implementletdispatchtconnquery=Streaming_rpc.dispatchtconnquery>>|funresponse->response>>|~funx->x>>|~fun(metadata,state,update_r)->state,update_r,metadataletabort=Streaming_rpc.abortletclose_reason=Streaming_rpc.Pipe_metadata.close_reasonletnamet=P.Rpc_tag.to_stringt.Streaming_rpc.tagletversiont=t.Streaming_rpc.versionletdescriptiont={Description.name=namet;version=versiont}endmoduleAny=structtypet=|Rpc:('q,'r)Rpc.t->t|Pipe:('q,'r,'e)Pipe_rpc.t->t|State:('q,'s,'u,'e)State_rpc.t->t|One_way:'mOne_way.t->tend