open Core
open Async_kernel
open Rpc

(** A direct stream writer paired with a conversion function, so that callers can write
    values of the model type ['input] while the underlying writer receives the converted
    (version-specific) type. The output type is hidden existentially. *)
module Versioned_direct_stream_writer = struct
  module Direct_stream_writer = Pipe_rpc.Direct_stream_writer

  type 'input t =
    | T :
        { convert : 'input -> 'output
        ; writer : 'output Direct_stream_writer.t
        }
        -> 'input t

  let create ~convert ~writer = T { convert; writer }

  (* Both write functions convert [input] first and then forward to the wrapped writer. *)
  let write (T { convert; writer }) input =
    Direct_stream_writer.write writer (convert input)
  ;;

  let write_without_pushback (T { convert; writer }) input =
    Direct_stream_writer.write_without_pushback writer (convert input)
  ;;

  let close (T { convert = _; writer }) = Direct_stream_writer.close writer
  let is_closed (T { convert = _; writer }) = Direct_stream_writer.is_closed writer
  let closed (T { convert = _; writer }) = Direct_stream_writer.closed writer
end

(** Error describing an exception raised by a user-supplied conversion function. The first
    component says which kind of value was being converted. *)
let failed_conversion x =
  Error.create
    "type conversion failure"
    x
    [%sexp_of:
      [ `Msg | `Query | `Response | `Error | `State | `Update ]
      * [ `Rpc of string ]
      * [ `Version of int ]
      * exn]
;;

(** Error raised when the same (rpc, version) pair is registered twice. *)
let multiple_registrations x =
  Error.create
    "multiple rpc registrations"
    x
    [%sexp_of: [ `Rpc of string ] * [ `Version of int ]]
;;

(** Error returned when a dispatch is attempted for a version missing from a registry. *)
let unknown_version x = Error.create "unknown rpc version" x [%sexp_of: string * int]

(** Implementations where the callee converts between each wire version and one model
    type. *)
module Callee_converts = struct
  module Rpc = struct
    (** A purely functional collection of per-version adapters, keyed by version. *)
    module Simple = struct
      type ('query, 'response) adapter =
        { adapt :
            'state.
              ('state -> 'query -> 'response Deferred.t) -> 'state Implementation.t
        }

      type ('query, 'response) t =
        { name : string
        ; adapters : ('query, 'response) adapter Int.Map.t
        }
      [@@deriving fields]

      let create ~name = { name; adapters = Int.Map.empty }

      (* Turns an [Error.t] failure into its human-readable string. *)
      let wrap_error fn state query =
        match%map fn state query with
        | Ok value -> Ok value
        | Error error -> Error (Error.to_string_hum error)
      ;;

      (* Returns an error if [rpc]'s name differs from [name] or if its version is already
         present; otherwise returns a new [t] with [adapter] added. *)
      let add { name; adapters } rpc adapter =
        if String.( <> ) name (Rpc.name rpc)
        then
          Or_error.error
            "Rpc names don't agree"
            (name, Rpc.name rpc)
            [%sexp_of: string * string]
        else (
          let version = Rpc.version rpc in
          match Map.find adapters version with
          | Some _ ->
            Or_error.error
              "Version already exists"
              (name, version)
              [%sexp_of: string * int]
          | None ->
            let adapters = Map.set adapters ~key:version ~data:adapter in
            Ok { name; adapters })
      ;;

      let add_rpc_version t old_rpc upgrade downgrade =
        let adapt fn =
          let adapted state old_query =
            let%map result = fn state (upgrade old_query) in
            downgrade result
          in
          Rpc.implement old_rpc adapted
        in
        add t old_rpc { adapt }
      ;;

      let add_rpc_version_with_failure t old_rpc upgrade_or_error downgrade_or_error =
        let adapt fn =
          let adapted state old_query =
            let open Deferred.Result.Let_syntax in
            let%bind query = Deferred.return (upgrade_or_error old_query) in
            let%bind response = fn state query in
            Deferred.return (downgrade_or_error response)
          in
          Rpc.implement old_rpc (wrap_error adapted)
        in
        add t old_rpc { adapt }
      ;;

      let add_version t ~version ~bin_query ~bin_response upgrade downgrade =
        let rpc = Rpc.create ~name:t.name ~version ~bin_query ~bin_response in
        add_rpc_version t rpc upgrade downgrade
      ;;

      let add_version_with_failure t ~version ~bin_query ~bin_response upgrade downgrade =
        let rpc = Rpc.create ~name:t.name ~version ~bin_query ~bin_response in
        add_rpc_version_with_failure t rpc upgrade downgrade
      ;;

      (* One implementation per registered adapter. *)
      let implement t fn = Map.data t.adapters |> List.map ~f:(fun { adapt } -> adapt fn)
    end

    module type S = sig
      type query
      type response

      val implement_multi
        :  ?log_not_previously_seen_version:(name:string -> int -> unit)
        -> ('state -> version:int -> query -> response Deferred.t)
        -> 'state Implementation.t list

      val rpcs : unit -> Any.t list
      val versions : unit -> Int.Set.t
      val name : string
    end

    module Make (Model : sig
        val name : string

        type query
        type response
      end) =
    struct
      let name = Model.name

      type 's impl = 's -> version:int -> Model.query -> Model.response Deferred.t

      type implementer =
        { implement : 's. log_version:(int -> unit) -> 's impl -> 's Implementation.t }

      (* Mutable table of registered versions, filled in by applications of [Register]. *)
      let registry : (int, implementer * Any.t) Hashtbl.t = Int.Table.create ~size:1 ()

      let implement_multi ?log_not_previously_seen_version f =
        let log_version =
          match log_not_previously_seen_version with
          | None -> ignore
          (* prevent calling [f] more than once per version *)
          | Some f -> Memo.general (f ~name)
        in
        List.map (Hashtbl.data registry) ~f:(fun (i, _rpc) -> i.implement ~log_version f)
      ;;

      let rpcs () = List.map (Hashtbl.data registry) ~f:(fun (_, rpc) -> rpc)
      let versions () = Int.Set.of_list (Hashtbl.keys registry)

      (** Applying this functor registers [Version_i.version] as a side effect; it raises
          if that version is already registered. *)
      module Register (Version_i : sig
          type query [@@deriving bin_io]
          type response [@@deriving bin_io]

          val version : int
          val model_of_query : query -> Model.query
          val response_of_model : Model.response -> response
        end) =
      struct
        open Version_i

        let rpc = Rpc.create ~name ~version ~bin_query ~bin_response

        let () =
          let implement ~log_version f =
            Rpc.implement rpc (fun s q ->
              log_version version;
              match Result.try_with (fun () -> Version_i.model_of_query q) with
              | Error exn ->
                Error.raise (failed_conversion (`Query, `Rpc name, `Version version, exn))
              | Ok q ->
                let%map r = f s ~version q in
                (match Result.try_with (fun () -> Version_i.response_of_model r) with
                 | Ok r -> r
                 | Error exn ->
                   Error.raise
                     (failed_conversion (`Response, `Rpc name, `Version version, exn))))
          in
          match Hashtbl.find registry version with
          | None -> Hashtbl.set registry ~key:version ~data:({ implement }, Any.Rpc rpc)
          | Some _ -> Error.raise (multiple_registrations (`Rpc name, `Version version))
        ;;
      end
    end
  end

  module Pipe_rpc = struct
    module type S = sig
      type query
      type response
      type error

      val implement_multi
        :  ?log_not_previously_seen_version:(name:string -> int -> unit)
        -> ('state
            -> version:int
            -> query
            -> (response Pipe.Reader.t, error) Result.t Deferred.t)
        -> 'state Implementation.t list

      val implement_direct_multi
        :  ?log_not_previously_seen_version:(name:string -> int -> unit)
        -> ('state
            -> version:int
            -> query
            -> response Versioned_direct_stream_writer.t
            -> (unit, error) Result.t Deferred.t)
        -> 'state Implementation.t list

      val rpcs : unit -> Any.t list
      val versions : unit -> Int.Set.t
      val name : string
    end

    module Make (Model : sig
        val name : string

        type query
        type response
        type error
      end) =
    struct
      let name = Model.name

      (* The two ways a pipe rpc can be implemented: returning a pipe, or writing to a
         direct stream writer. *)
      type 's impl =
        | Pipe of
            ('s
             -> version:int
             -> Model.query
             -> (Model.response Pipe.Reader.t, Model.error) Result.t Deferred.t)
        | Direct of
            ('s
             -> version:int
             -> Model.query
             -> Model.response Versioned_direct_stream_writer.t
             -> (unit, Model.error) Result.t Deferred.t)

      type implementer =
        { implement : 's. log_version:(int -> unit) -> 's impl -> 's Implementation.t }

      let registry = Int.Table.create ~size:1 ()

      let implement_multi_gen ?log_not_previously_seen_version impl =
        let log_version =
          match log_not_previously_seen_version with
          | None -> ignore
          (* prevent calling [f] more than once per version *)
          | Some f -> Memo.general (f ~name)
        in
        List.map (Hashtbl.data registry) ~f:(fun (i, _) -> i.implement ~log_version impl)
      ;;

      let implement_multi ?log_not_previously_seen_version f =
        implement_multi_gen ?log_not_previously_seen_version (Pipe f)
      ;;

      let implement_direct_multi ?log_not_previously_seen_version f =
        implement_multi_gen ?log_not_previously_seen_version (Direct f)
      ;;

      let rpcs () = List.map (Hashtbl.data registry) ~f:(fun (_, rpc) -> rpc)
      let versions () = Int.Set.of_list (Int.Table.keys registry)

      module type Version_shared = sig
        type query [@@deriving bin_io]
        type response [@@deriving bin_io]
        type error [@@deriving bin_io]

        val version : int
        val model_of_query : query -> Model.query
        val error_of_model : Model.error -> error
        val client_pushes_back : bool
      end

      (** Shared body of [Register] and [Register_raw]. [Convert.convert_elt] is an
          [Or_error.t] because element-wise conversion may be unavailable; it is only
          forced when a [Direct] implementation is requested. *)
      module Make_shared
          (Version_i : Version_shared)
          (Convert : sig
             val convert_elt : (Model.response -> Version_i.response) Or_error.t

             val convert_pipe
               :  Model.response Pipe.Reader.t
               -> Version_i.response Pipe.Reader.t
           end) =
      struct
        open Version_i
        open Convert

        let rpc =
          Pipe_rpc.create
            ~name
            ~version
            ~bin_query
            ~bin_response
            ~bin_error
            ?client_pushes_back:(Option.some_if client_pushes_back ())
            ()
        ;;

        (* A failure here is a failure to convert the query, so it is tagged [`Query]
           (consistent with [Callee_converts.Rpc] and with [Caller_converts]). *)
        let wrapped_model_of_query q =
          match Version_i.model_of_query q with
          | exception exn ->
            Error.raise (failed_conversion (`Query, `Rpc name, `Version version, exn))
          | q -> q
        ;;

        let wrapped_error_of_model error =
          match Version_i.error_of_model error with
          | error -> Error error
          | exception exn ->
            Error.raise (failed_conversion (`Error, `Rpc name, `Version version, exn))
        ;;

        let implement ~log_version impl =
          match impl with
          | Pipe f ->
            Pipe_rpc.implement rpc (fun s q ->
              log_version version;
              match%bind f s ~version (wrapped_model_of_query q) with
              | Ok pipe ->
                Monitor.handle_errors
                  (fun () -> return (Ok (convert_pipe pipe)))
                  (fun exn ->
                     Error.raise
                       (failed_conversion (`Response, `Rpc name, `Version version, exn)))
              | Error error -> return (wrapped_error_of_model error))
          | Direct f ->
            (* Raises here, at implementation time, if element-wise conversion is not
               available (see [Register_raw]). *)
            let convert_elt = Or_error.ok_exn convert_elt in
            Pipe_rpc.implement_direct rpc (fun s q dsw ->
              (* Log the version exactly as the [Pipe] branch does, so that
                 [?log_not_previously_seen_version] is honoured by
                 [implement_direct_multi] too. *)
              log_version version;
              let writer =
                Versioned_direct_stream_writer.create ~convert:convert_elt ~writer:dsw
              in
              match%map f s ~version (wrapped_model_of_query q) writer with
              | Ok () -> Ok ()
              | Error error -> wrapped_error_of_model error)
        ;;

        let () =
          match Hashtbl.find registry version with
          | None -> Hashtbl.set registry ~key:version ~data:({ implement }, Any.Pipe rpc)
          | Some _ -> Error.raise (multiple_registrations (`Rpc name, `Version version))
        ;;
      end

      (** Registration with a whole-pipe conversion; the direct interface is not
          available for versions registered this way. *)
      module Register_raw (Version_i : sig
          include Version_shared

          val response_of_model
            :  Model.response Pipe.Reader.t
            -> response Pipe.Reader.t
        end) =
        Make_shared
          (Version_i)
          (struct
            let convert_elt =
              Or_error.error_string "cannot use direct interface with Register_raw"
            ;;

            let convert_pipe = Version_i.response_of_model
          end)

      (** Registration with an element-wise conversion. *)
      module Register (Version_i : sig
          include Version_shared

          val response_of_model : Model.response -> response
        end) =
        Make_shared
          (Version_i)
          (struct
            let convert_elt = Ok Version_i.response_of_model
            let convert_pipe pipe = Pipe.map pipe ~f:Version_i.response_of_model
          end)
    end
  end

  module State_rpc = struct
    module type S = sig
      type query
      type state
      type update
      type error

      val implement_multi
        :  ?log_not_previously_seen_version:(name:string -> int -> unit)
        -> ('connection_state
            -> version:int
            -> query
            -> (state * update Pipe.Reader.t, error) Result.t Deferred.t)
        -> 'connection_state Implementation.t list

      val rpcs : unit -> Any.t list
      val versions : unit -> Int.Set.t
      val name : string
    end

    module Make (Model : sig
        val name : string

        type query
        type state
        type update
        type error
      end) =
    struct
      let name = Model.name

      type 's impl =
        's
        -> version:int
        -> Model.query
        -> (Model.state * Model.update Pipe.Reader.t, Model.error) Result.t Deferred.t

      type implementer =
        { implement : 's. log_version:(int -> unit) -> 's impl -> 's Implementation.t }

      let registry = Int.Table.create ~size:1 ()

      let implement_multi ?log_not_previously_seen_version f =
        let log_version =
          match log_not_previously_seen_version with
          | None -> ignore
          (* prevent calling [f] more than once per version *)
          | Some f -> Memo.general (f ~name)
        in
        List.map (Hashtbl.data registry) ~f:(fun (i, _) -> i.implement ~log_version f)
      ;;

      let rpcs () = List.map (Hashtbl.data registry) ~f:(fun (_, rpc) -> rpc)
      let versions () = Int.Set.of_list (Int.Table.keys registry)

      module type Version_shared = sig
        type query [@@deriving bin_io]
        type state [@@deriving bin_io]
        type update [@@deriving bin_io]
        type error [@@deriving bin_io]

        val version : int
        val model_of_query : query -> Model.query
        val state_of_model : Model.state -> state
        val error_of_model : Model.error -> error
        val client_pushes_back : bool
      end

      module Register_raw (Version_i : sig
          include Version_shared

          val update_of_model
            :  Model.state
            -> Model.update Pipe.Reader.t
            -> update Pipe.Reader.t
        end) =
      struct
        open Version_i

        let rpc =
          State_rpc.create
            ~name
            ~version
            ~bin_query
            ~bin_state
            ~bin_update
            ~bin_error
            ?client_pushes_back:(Option.some_if client_pushes_back ())
            ()
        ;;

        let () =
          let implement ~log_version f =
            State_rpc.implement rpc (fun s q ->
              log_version version;
              match Version_i.model_of_query q with
              | exception exn ->
                (* Query conversion failed, so tag the error [`Query]. *)
                Error.raise (failed_conversion (`Query, `Rpc name, `Version version, exn))
              | q ->
                (match%bind f s ~version q with
                 | Ok (model_state, pipe) ->
                   let state =
                     match Version_i.state_of_model model_state with
                     | state -> state
                     | exception exn ->
                       Error.raise
                         (failed_conversion (`State, `Rpc name, `Version version, exn))
                   in
                   Monitor.handle_errors
                     (fun () ->
                        return (Ok (state, Version_i.update_of_model model_state pipe)))
                     (fun exn ->
                        Error.raise
                          (failed_conversion (`Update, `Rpc name, `Version version, exn)))
                 | Error error ->
                   return
                     (match Version_i.error_of_model error with
                      | error -> Error error
                      | exception exn ->
                        Error.raise
                          (failed_conversion (`Error, `Rpc name, `Version version, exn)))))
          in
          match Hashtbl.find registry version with
          | None -> Hashtbl.set registry ~key:version ~data:({ implement }, Any.State rpc)
          | Some _ -> Error.raise (multiple_registrations (`Rpc name, `Version version))
        ;;
      end

      module Register (Version_i : sig
          include Version_shared

          val update_of_model : Model.update -> update
        end) =
      struct
        include Register_raw (struct
            include Version_i

            (* Element-wise conversion; the model state is not needed. *)
            let update_of_model _state pipe = Pipe.map ~f:update_of_model pipe
          end)
      end
    end
  end

  module One_way = struct
    module type S = sig
      type msg

      val implement_multi
        :  ?log_not_previously_seen_version:(name:string -> int -> unit)
        -> ('state -> version:int -> msg -> unit)
        -> 'state Implementation.t list

      val rpcs : unit -> Any.t list
      val versions : unit -> Int.Set.t
      val name : string
    end

    module Make (Model : sig
        val name : string

        type msg
      end) =
    struct
      let name = Model.name

      type 's impl = 's -> version:int -> Model.msg -> unit

      type implementer =
        { implement : 's. log_version:(int -> unit) -> 's impl -> 's Implementation.t }

      let registry : (int, implementer * Any.t) Hashtbl.t = Int.Table.create ~size:1 ()

      let implement_multi ?log_not_previously_seen_version f =
        let log_version =
          match log_not_previously_seen_version with
          | None -> ignore
          (* prevent calling [f] more than once per version *)
          | Some f -> Memo.general (f ~name)
        in
        List.map (Hashtbl.data registry) ~f:(fun (i, _rpc) -> i.implement ~log_version f)
      ;;

      let rpcs () = List.map (Hashtbl.data registry) ~f:(fun (_, rpc) -> rpc)
      let versions () = Int.Set.of_list (Hashtbl.keys registry)

      module Register (Version_i : sig
          type msg [@@deriving bin_io]

          val version : int
          val model_of_msg : msg -> Model.msg
        end) =
      struct
        open Version_i

        let rpc = One_way.create ~name ~version ~bin_msg

        let () =
          let implement ~log_version f =
            One_way.implement rpc (fun s q ->
              log_version version;
              match Result.try_with (fun () -> Version_i.model_of_msg q) with
              | Error exn ->
                Error.raise (failed_conversion (`Msg, `Rpc name, `Version version, exn))
              | Ok q -> f s ~version q)
          in
          match Hashtbl.find registry version with
          | None ->
            Hashtbl.set registry ~key:version ~data:({ implement }, Any.One_way rpc)
          | Some _ -> Error.raise (multiple_registrations (`Rpc name, `Version version))
        ;;
      end
    end
  end
end

module Menu = struct
  (***************** some prohibitions for this module ******************

     (1) !!! never prune old versions of this rpc !!!

     It is too fundamental to the workings of various versioning
     schemes and it probably won't change very much anyway.

     (2) !!! only ever say "with bin_io" on built-in ocaml types !!!

     Examples of built-in types are int, list, string, etc.

     This is to protect ourselves against changes to Core data
     structures, for example.

   *********************************************************************)

  module Model = struct
    let name = "__Versioned_rpc.Menu"

    type query = unit
    type response = Description.t list
  end

  include Callee_converts.Rpc.Make (Model)

  let rpc_name = Model.name

  module V1 = struct
    module T = struct
      let version = 1

      type query = unit [@@deriving bin_io]
      type response = (string * int) list [@@deriving bin_io]

      let model_of_query q = q

      let response_of_model =
        List.map ~f:(fun { Description.name; version } -> name, version)
      ;;
    end

    include T
    include Register (T)
  end

  module Current_version = V1

  (* Appends the menu rpc's own implementations to [impls]; the menu lists the
     descriptions of [impls] only. *)
  let add impls =
    let menu = List.map impls ~f:Implementation.description in
    let menu_impls = implement_multi (fun _ ~version:_ () -> return menu) in
    impls @ menu_impls
  ;;

  (* Maps an rpc name to the set of versions supported for it. *)
  type t = Int.Set.t String.Table.t [@@deriving sexp_of]

  let supported_rpcs t =
    let open List.Let_syntax in
    let%bind name, versions = String.Table.to_alist t in
    let%map version = Int.Set.to_list versions in
    { Description.name; version }
  ;;

  (* The empty set when [rpc_name] is not in the menu. *)
  let supported_versions t ~rpc_name =
    Option.value ~default:Int.Set.empty (Hashtbl.find t rpc_name)
  ;;

  let of_entries entries =
    Hashtbl.map ~f:Int.Set.of_list (String.Table.of_alist_multi entries)
  ;;

  let request conn =
    let%map result = Rpc.dispatch Current_version.rpc conn () in
    Result.map result ~f:of_entries
  ;;

  let create descriptions =
    List.map descriptions ~f:(fun { Description.name; version } -> name, version)
    |> of_entries
  ;;
end

(** A connection together with the callee's menu. *)
module Connection_with_menu = struct
  type t =
    { connection : Connection.t
    ; menu : Menu.t
    }
  [@@deriving fields]

  (* Requests the menu over [connection]. *)
  let create connection =
    let open Deferred.Or_error.Let_syntax in
    let%map menu = Menu.request connection in
    { connection; menu }
  ;;

  let create_directly connection menu = { connection; menu }
end

(** Dispatching where the caller converts between its model type and whichever wire
    version it shares with the callee. *)
module Caller_converts = struct
  (* Picks the largest version present in both sets. [callee_menu] is used only to make
     the error message more informative. *)
  let most_recent_common_version ~rpc_name ~caller_versions ~callee_versions ~callee_menu =
    match Set.max_elt (Set.inter callee_versions caller_versions) with
    | Some version -> Ok version
    | None ->
      error_s
        [%message
          "caller and callee share no common versions for rpc"
            (rpc_name : string)
            (caller_versions : Int.Set.t)
            (callee_versions : Int.Set.t)
            (callee_menu : Menu.t)]
  ;;

  let%expect_test "highest version number is taken in most_recent_common_version" =
    let rpc_name = "the-rpc" in
    let menu = Menu.of_entries [ rpc_name, 2 ] in
    let result =
      most_recent_common_version
        ~rpc_name
        ~caller_versions:(Int.Set.of_list [ 1; 2; 3 ])
        ~callee_versions:(Int.Set.of_list [ 2 ])
        ~callee_menu:menu
    in
    print_s [%sexp (result : int Or_error.t)];
    [%expect {| (Ok 2) |}]
  ;;

  let%expect_test "error from most_recent_common_version looks reasonable" =
    let the_rpc = "the-rpc" in
    let not_the_rpc = "other-rpc" in
    let menu = Menu.of_entries [ not_the_rpc, 1; not_the_rpc, 2 ] in
    let result =
      most_recent_common_version
        ~rpc_name:the_rpc
        ~caller_versions:(Int.Set.of_list [ 1; 2; 3 ])
        ~callee_versions:(Menu.supported_versions menu ~rpc_name:the_rpc)
        ~callee_menu:menu
    in
    print_s [%sexp (result : int Or_error.t)];
    [%expect
      {|
      (Error
       ("caller and callee share no common versions for rpc" (rpc_name the-rpc)
        (caller_versions (1 2 3)) (callee_versions ())
        (callee_menu ((other-rpc (1 2))))))|}]
  ;;

  (** Version selection shared by every rpc kind, parameterised over the monad in which
      the dispatch result lives. *)
  module Dispatch = struct
    module Make (M : Monad) = struct
      open M

      let with_specific_version ~version ~connection ~name ~query ~dispatcher ~registry =
        match Hashtbl.find registry version with
        | None -> return (Error (unknown_version (name, version)))
        | Some (dispatch, _rpc) -> dispatcher dispatch connection query
      ;;

      let with_version_menu
            { Connection_with_menu.connection; menu }
            query
            ~name
            ~versions
            ~registry
            ~dispatcher
        =
        let callee_versions = Menu.supported_versions menu ~rpc_name:name in
        let caller_versions = versions () in
        match
          most_recent_common_version
            ~rpc_name:name
            ~caller_versions
            ~callee_versions
            ~callee_menu:menu
        with
        | Error e -> return (Error e)
        | Ok version ->
          with_specific_version ~version ~connection ~name ~query ~registry ~dispatcher
      ;;
    end

    module Async = Make (Deferred)
    module Direct = Make (Monad.Ident)
  end

  module Rpc = struct
    module type S = sig
      type query
      type response

      val dispatch_multi
        :  Connection_with_menu.t
        -> query
        -> response Or_error.t Deferred.t

      val rpcs : unit -> Any.t list
      val versions : unit -> Int.Set.t
      val name : string
    end

    module Make (Model : sig
        val name : string

        type query
        type response
      end) =
    struct
      let name = Model.name
      let registry = Int.Table.create ~size:1 ()
      let rpcs () = List.map (Hashtbl.data registry) ~f:(fun (_, rpc) -> rpc)
      let versions () = Int.Set.of_list (Int.Table.keys registry)

      let dispatch_multi conn_with_menu query =
        Dispatch.Async.with_version_menu
          conn_with_menu
          query
          ~name
          ~versions
          ~registry
          ~dispatcher:Fn.id
      ;;

      (** Like [Register], but the response conversion also receives the model query. *)
      module Register' (Version_i : sig
          type query [@@deriving bin_io]
          type response [@@deriving bin_io]

          val version : int
          val query_of_model : Model.query -> query
          val model_of_response : Model.query -> response -> Model.response
        end) =
      struct
        open Version_i

        let rpc = Rpc.create ~name ~version ~bin_query ~bin_response

        let () =
          let dispatch conn mq =
            match Result.try_with (fun () -> Version_i.query_of_model mq) with
            | Error exn ->
              return
                (Error (failed_conversion (`Query, `Rpc name, `Version version, exn)))
            | Ok q ->
              let%map result = Rpc.dispatch rpc conn q in
              Result.bind result ~f:(fun r ->
                match Result.try_with (fun () -> Version_i.model_of_response mq r) with
                | Ok r -> Ok r
                | Error exn ->
                  Error (failed_conversion (`Response, `Rpc name, `Version version, exn)))
          in
          match Hashtbl.find registry version with
          | None -> Hashtbl.set registry ~key:version ~data:(dispatch, Any.Rpc rpc)
          | Some _ -> Error.raise (multiple_registrations (`Rpc name, `Version version))
        ;;
      end

      module Register (Version_i : sig
          type query [@@deriving bin_io]
          type response [@@deriving bin_io]

          val version : int
          val query_of_model : Model.query -> query
          val model_of_response : response -> Model.response
        end) =
        Register' (struct
          include Version_i

          let model_of_response _ r = model_of_response r
        end)
    end
  end

  module Pipe_rpc = struct
    module type S = sig
      type query
      type response
      type error

      val dispatch_multi
        :  Connection_with_menu.t
        -> query
        -> ( response Or_error.t Pipe.Reader.t * Pipe_rpc.Metadata.t
           , error )
             Result.t
             Or_error.t
             Deferred.t

      val dispatch_iter_multi
        :  Connection_with_menu.t
        -> query
        -> f:(response Pipe_rpc.Pipe_message.t -> Pipe_rpc.Pipe_response.t)
        -> (Pipe_rpc.Id.t, error) Result.t Or_error.t Deferred.t

      val abort_multi : Connection_with_menu.t -> Pipe_rpc.Id.t -> unit Or_error.t
      val rpcs : unit -> Any.t list
      val versions : unit -> Int.Set.t
      val name : string
    end

    module Make (Model : sig
        val name : string

        type query
        type response
        type error
      end) =
    struct
      (* The per-version operations stored in the registry. *)
      type dispatcher =
        { abort : Connection.t -> Pipe_rpc.Id.t -> unit
        ; dispatch :
            Connection.t
            -> Model.query
            -> ( Model.response Or_error.t Pipe.Reader.t * Pipe_rpc.Metadata.t
               , Model.error )
                 Result.t
                 Or_error.t
                 Deferred.t
        ; dispatch_iter :
            Connection.t
            -> Model.query
            -> f:(Model.response Pipe_rpc.Pipe_message.t -> Pipe_rpc.Pipe_response.t)
            -> (Pipe_rpc.Id.t, Model.error) Result.t Or_error.t Deferred.t
        }

      let name = Model.name
      let registry : (dispatcher * Any.t) Int.Table.t = Int.Table.create ~size:1 ()
      let rpcs () = List.map (Hashtbl.data registry) ~f:(fun (_, rpc) -> rpc)
      let versions () = Int.Set.of_list (Int.Table.keys registry)

      let dispatch_iter_multi conn_with_menu query ~f =
        Dispatch.Async.with_version_menu
          conn_with_menu
          query
          ~name
          ~versions
          ~registry
          ~dispatcher:(fun { dispatch_iter; _ } conn query ->
            dispatch_iter conn query ~f)
      ;;

      let dispatch_multi conn_with_menu query =
        Dispatch.Async.with_version_menu
          conn_with_menu
          query
          ~name
          ~versions
          ~registry
          ~dispatcher:(fun { dispatch; _ } conn query -> dispatch conn query)
      ;;

      let abort_multi conn_with_menu id =
        Dispatch.Direct.with_version_menu
          conn_with_menu
          id
          ~name
          ~versions
          ~registry
          ~dispatcher:(fun { abort; _ } conn id ->
            abort conn id;
            Ok ())
      ;;

      module type Version_shared = sig
        type query [@@deriving bin_io]
        type response [@@deriving bin_io]
        type error [@@deriving bin_io]

        val version : int
        val query_of_model : Model.query -> query
        val model_of_error : error -> Model.error
        val client_pushes_back : bool
      end

      (** Shared body of [Register] and [Register_raw]. [Convert.convert_elt] is only
          forced by [dispatch_iter]. *)
      module Make_shared
          (Version_i : Version_shared)
          (Convert : sig
             val convert_elt : (Version_i.response -> Model.response) Or_error.t

             val convert_pipe
               :  Version_i.response Pipe.Reader.t
               -> Model.response Or_error.t Pipe.Reader.t
           end) =
      struct
        open Version_i
        open Convert

        let rpc =
          Pipe_rpc.create
            ~name
            ~version
            ~bin_query
            ~bin_response
            ~bin_error
            ?client_pushes_back:(Option.some_if client_pushes_back ())
            ()
        ;;

        let wrapped_query_of_model q =
          match Version_i.query_of_model q with
          | exception exn ->
            return (Error (failed_conversion (`Query, `Rpc name, `Version version, exn)))
          | q -> return (Ok q)
        ;;

        (* Converts the rpc-level error with [model_of_error] and the success payload
           with [convert_ok]; an outer [Error] is passed through unchanged. *)
        let convert_result result ~convert_ok =
          match result with
          | Error _ as e -> e
          | Ok (Error e) ->
            (match Version_i.model_of_error e with
             | e' -> Ok (Error e')
             | exception exn ->
               Error (failed_conversion (`Error, `Rpc name, `Version version, exn)))
          | Ok (Ok ok) -> Ok (Ok (convert_ok ok))
        ;;

        let dispatch conn q =
          wrapped_query_of_model q
          >>=? fun q ->
          let%map result = Pipe_rpc.dispatch rpc conn q in
          convert_result result ~convert_ok:(fun (pipe, id) -> convert_pipe pipe, id)
        ;;

        let dispatch_iter conn q ~f =
          (* Raises if element-wise conversion is not available (see [Register_raw]). *)
          let convert_elt = Or_error.ok_exn convert_elt in
          wrapped_query_of_model q
          >>=? fun q ->
          let convert_message (m : _ Pipe_rpc.Pipe_message.t) =
            match m with
            | Closed _ as closed -> closed
            | Update u -> Update (convert_elt u)
          in
          let%map result =
            Pipe_rpc.dispatch_iter rpc conn q ~f:(fun message ->
              f (convert_message message))
          in
          convert_result result ~convert_ok:Fn.id
        ;;

        let abort conn id = Pipe_rpc.abort rpc conn id

        let () =
          match Hashtbl.find registry version with
          | None ->
            Hashtbl.set
              registry
              ~key:version
              ~data:({ abort; dispatch; dispatch_iter }, Any.Pipe rpc)
          | Some _ -> Error.raise (multiple_registrations (`Rpc name, `Version version))
        ;;
      end

      module Register_raw (Version_i : sig
          include Version_shared

          val model_of_response
            :  response Pipe.Reader.t
            -> Model.response Or_error.t Pipe.Reader.t
        end) =
        Make_shared
          (Version_i)
          (struct
            let convert_elt = Or_error.error_string "Cannot use Direct with Register_raw"
            let convert_pipe = Version_i.model_of_response
          end)

      module Register (Version_i : sig
          include Version_shared

          val model_of_response : response -> Model.response
        end) =
        Make_shared
          (Version_i)
          (struct
            let convert_elt = Ok Version_i.model_of_response

            (* A conversion failure becomes an [Error] element of the resulting pipe. *)
            let convert_pipe rs =
              Pipe.map rs ~f:(fun r ->
                match Version_i.model_of_response r with
                | r -> Ok r
                | exception exn ->
                  Error
                    (failed_conversion
                       (`Response, `Rpc name, `Version Version_i.version, exn)))
            ;;
          end)
    end
  end

  module State_rpc = struct
    module type S = sig
      type query
      type state
      type update
      type error

      val dispatch_multi
        :  Connection_with_menu.t
        -> query
        -> ( state * update Or_error.t Pipe.Reader.t * State_rpc.Metadata.t
           , error )
             Result.t
             Or_error.t
             Deferred.t

      val rpcs : unit -> Any.t list
      val versions : unit -> Int.Set.t
      val name : string
    end

    module Make (Model : sig
        val name : string

        type query
        type state
        type update
        type error
      end) =
    struct
      let name = Model.name
      let registry = Int.Table.create ~size:1 ()
      let rpcs () = List.map (Hashtbl.data registry) ~f:(fun (_, rpc) -> rpc)
      let versions () = Int.Set.of_list (Int.Table.keys registry)

      let dispatch_multi conn_with_menu query =
        Dispatch.Async.with_version_menu
          conn_with_menu
          query
          ~name
          ~versions
          ~registry
          ~dispatcher:Fn.id
      ;;

      module type Version_shared = sig
        type query [@@deriving bin_io]
        type state [@@deriving bin_io]
        type update [@@deriving bin_io]
        type error [@@deriving bin_io]

        val version : int
        val query_of_model : Model.query -> query
        val model_of_state : state -> Model.state
        val model_of_error : error -> Model.error
        val client_pushes_back : bool
      end

      module Register_raw (Version_i : sig
          include Version_shared

          val model_of_update
            :  update Pipe.Reader.t
            -> Model.update Or_error.t Pipe.Reader.t
        end) =
      struct
        open Version_i

        let rpc =
          State_rpc.create
            ~name
            ~version
            ~bin_query
            ~bin_state
            ~bin_update
            ~bin_error
            ?client_pushes_back:(Option.some_if client_pushes_back ())
            ()
        ;;

        let () =
          let dispatch conn q =
            match Version_i.query_of_model q with
            | exception exn ->
              return
                (Error (failed_conversion (`Query, `Rpc name, `Version version, exn)))
            | q ->
              let%map result = State_rpc.dispatch rpc conn q in
              (match result with
               | Error exn -> Error exn
               | Ok (Error e) ->
                 (match Version_i.model_of_error e with
                  | e' -> Ok (Error e')
                  | exception exn ->
                    Error (failed_conversion (`Error, `Rpc name, `Version version, exn)))
               | Ok (Ok (state, pipe, id)) ->
                 (match Version_i.model_of_state state with
                  | exception exn ->
                    Error (failed_conversion (`State, `Rpc name, `Version version, exn))
                  | state -> Ok (Ok (state, Version_i.model_of_update pipe, id))))
          in
          match Hashtbl.find registry version with
          | None -> Hashtbl.set registry ~key:version ~data:(dispatch, Any.State rpc)
          | Some _ -> Error.raise (multiple_registrations (`Rpc name, `Version version))
        ;;
      end

      module Register (Version_i : sig
          include Version_shared

          val model_of_update : update -> Model.update
        end) =
      struct
        include Register_raw (struct
            include Version_i

            (* A conversion failure becomes an [Error] element of the resulting pipe. *)
            let model_of_update rs =
              Pipe.map rs ~f:(fun r ->
                match Version_i.model_of_update r with
                | r -> Ok r
                | exception exn ->
                  Error (failed_conversion (`Update, `Rpc name, `Version version, exn)))
            ;;
          end)
      end
    end
  end

  module One_way = struct
    module type S = sig
      type msg

      val dispatch_multi : Connection_with_menu.t -> msg -> unit Or_error.t
      val rpcs : unit -> Any.t list
      val versions : unit -> Int.Set.t
      val name : string
    end

    module Make (Model : sig
        val name : string

        type msg
      end) =
    struct
      let name = Model.name
      let registry = Int.Table.create ~size:1 ()
      let rpcs () = List.map (Hashtbl.data registry) ~f:(fun (_, rpc) -> rpc)
      let versions () = Int.Set.of_list (Int.Table.keys registry)

      let dispatch_multi conn_with_menu msg =
        Dispatch.Direct.with_version_menu
          conn_with_menu
          msg
          ~name
          ~versions
          ~registry
          ~dispatcher:Fn.id
      ;;

      module Register (Version_i : sig
          type msg [@@deriving bin_io]

          val version : int
          val msg_of_model : Model.msg -> msg
        end) =
      struct
        open Version_i

        let rpc = One_way.create ~name ~version ~bin_msg

        let () =
          let dispatch conn q =
            match Result.try_with (fun () -> Version_i.msg_of_model q) with
            | Error exn ->
              Error (failed_conversion (`Msg, `Rpc name, `Version version, exn))
            | Ok q -> One_way.dispatch rpc conn q
          in
          match Hashtbl.find registry version with
          | None -> Hashtbl.set registry ~key:version ~data:(dispatch, Any.One_way rpc)
          | Some _ -> Error.raise (multiple_registrations (`Rpc name, `Version version))
        ;;
      end
    end
  end
end

(** Rpcs where both sides convert: each [Register] application registers the same version
    with a [Caller_converts] module and a [Callee_converts] module. *)
module Both_convert = struct
  module Plain = struct
    module type S = sig
      type caller_query
      type callee_query
      type caller_response
      type callee_response

      val dispatch_multi
        :  Connection_with_menu.t
        -> caller_query
        -> caller_response Or_error.t Deferred.t

      val implement_multi
        :  ?log_not_previously_seen_version:(name:string -> int -> unit)
        -> ('state -> version:int -> callee_query -> callee_response Deferred.t)
        -> 'state Implementation.t list

      val rpcs : unit -> Any.t list
      val versions : unit -> Int.Set.t
      val name : string
    end

    module Make (Model : sig
        val name : string

        module Caller : sig
          type query
          type response
        end

        module Callee : sig
          type query
          type response
        end
      end) =
    struct
      open Model

      let name = name

      module Caller = Caller_converts.Rpc.Make (struct
          let name = name

          include Caller
        end)

      module Callee = Callee_converts.Rpc.Make (struct
          let name = name

          include Callee
        end)

      let%test _ = Int.Set.equal (Caller.versions ()) (Callee.versions ())

      module Register (Version : sig
          open Model

          val version : int

          type query [@@deriving bin_io]
          type response [@@deriving bin_io]

          val query_of_caller_model : Caller.query -> query
          val callee_model_of_query : query -> Callee.query
          val response_of_callee_model : Callee.response -> response
          val caller_model_of_response : response -> Caller.response
        end) =
      struct
        include Callee.Register (struct
            include Version

            let model_of_query = callee_model_of_query
            let response_of_model = response_of_callee_model
          end)

        include Caller.Register (struct
            include Version

            let query_of_model = query_of_caller_model
            let model_of_response = caller_model_of_response
          end)

        let%test _ = Int.Set.equal (Caller.versions ()) (Callee.versions ())
      end

      let dispatch_multi = Caller.dispatch_multi
      let implement_multi = Callee.implement_multi

      (* Note: Caller.versions is the same as Callee.versions, so it doesn't matter which
         one we call here. Same for [rpcs]. *)
      let versions () = Caller.versions ()
      let rpcs () = Caller.rpcs ()
    end
  end

  module Pipe_rpc = struct
    module type S = sig
      type caller_query
      type callee_query
      type caller_response
      type callee_response
      type caller_error
      type callee_error

      val dispatch_multi
        :  Connection_with_menu.t
        -> caller_query
        -> ( caller_response Or_error.t Pipe.Reader.t * Pipe_rpc.Metadata.t
           , caller_error )
             Result.t
             Or_error.t
             Deferred.t

      val dispatch_iter_multi
        :  Connection_with_menu.t
        -> caller_query
        -> f:(caller_response Pipe_rpc.Pipe_message.t -> Pipe_rpc.Pipe_response.t)
        -> (Pipe_rpc.Id.t, caller_error) Result.t Or_error.t Deferred.t

      val abort_multi : Connection_with_menu.t -> Pipe_rpc.Id.t -> unit Or_error.t

      val implement_multi
        :  ?log_not_previously_seen_version:(name:string -> int -> unit)
        -> ('state
            -> version:int
            -> callee_query
            -> (callee_response Pipe.Reader.t, callee_error) Result.t Deferred.t)
        -> 'state Implementation.t list

      val implement_direct_multi
        :  ?log_not_previously_seen_version:(name:string -> int -> unit)
        -> ('state
            -> version:int
            -> callee_query
            -> callee_response Versioned_direct_stream_writer.t
            -> (unit, callee_error) Result.t Deferred.t)
        -> 'state Implementation.t list

      val rpcs : unit -> Any.t list
      val versions : unit -> Int.Set.t
      val name : string
    end

    module Make (Model : sig
        val name : string

        module Caller : sig
          type query
          type response
          type error
        end

        module Callee : sig
          type query
          type response
          type error
        end
      end) =
    struct
      open Model

      let name = name

      module Caller = Caller_converts.Pipe_rpc.Make (struct
          let name = name

          include Caller
        end)

      module Callee = Callee_converts.Pipe_rpc.Make (struct
          let name = name

          include Callee
        end)

      let%test _ = Int.Set.equal (Caller.versions ()) (Callee.versions ())

      (* [Model.Caller] / [Model.Callee] are spelled out in full below because [Caller]
         and [Callee] now refer to the functor applications above. *)
      module type Version_shared = sig
        val version : int

        type query [@@deriving bin_io]
        type response [@@deriving bin_io]
        type error [@@deriving bin_io]

        val query_of_caller_model : Model.Caller.query -> query
        val callee_model_of_query : query -> Model.Callee.query
        val error_of_callee_model : Model.Callee.error -> error
        val caller_model_of_error : error -> Model.Caller.error
        val client_pushes_back : bool
      end

      module Register_raw (Version_i : sig
          include Version_shared

          val response_of_callee_model
            :  Model.Callee.response Pipe.Reader.t
            -> response Pipe.Reader.t

          val caller_model_of_response
            :  response Pipe.Reader.t
            -> Model.Caller.response Or_error.t Pipe.Reader.t
        end) =
      struct
        include Callee.Register_raw (struct
            include Version_i

            let model_of_query = callee_model_of_query
            let response_of_model = response_of_callee_model
            let error_of_model = error_of_callee_model
          end)

        include Caller.Register_raw (struct
            include Version_i

            let query_of_model = query_of_caller_model
            let model_of_response = caller_model_of_response
            let model_of_error = caller_model_of_error
          end)
      end

      module Register (Version_i : sig
          include Version_shared

          val response_of_callee_model : Model.Callee.response -> response
          val caller_model_of_response : response -> Model.Caller.response
        end) =
      struct
        include Callee.Register (struct
            include Version_i

            let model_of_query = callee_model_of_query
            let response_of_model = response_of_callee_model
            let error_of_model = error_of_callee_model
          end)

        include Caller.Register (struct
            include Version_i

            let query_of_model = query_of_caller_model
            let model_of_response = caller_model_of_response
            let model_of_error = caller_model_of_error
          end)
      end

      let dispatch_multi = Caller.dispatch_multi
      let dispatch_iter_multi = Caller.dispatch_iter_multi
      let abort_multi = Caller.abort_multi
      let implement_multi = Callee.implement_multi
      let implement_direct_multi = Callee.implement_direct_multi
      let versions () = Caller.versions ()
      let rpcs () = Caller.rpcs ()
    end
  end

  module State_rpc = struct
    module type S = sig
      type caller_query
      type callee_query
      type caller_state
      type callee_state
      type caller_update
      type callee_update
      type caller_error
      type callee_error

      val dispatch_multi
        :  Connection_with_menu.t
        -> caller_query
        -> ( caller_state * caller_update Or_error.t Pipe.Reader.t * State_rpc.Metadata.t
           , caller_error )
             Result.t
             Or_error.t
             Deferred.t

      val implement_multi
        :  ?log_not_previously_seen_version:(name:string -> int -> unit)
        -> ('state
            -> version:int
            -> callee_query
            -> (callee_state * callee_update Pipe.Reader.t, callee_error) Result.t
                 Deferred.t)
        -> 'state Implementation.t list

      val rpcs : unit -> Any.t list
      val versions : unit -> Int.Set.t
      val name : string
    end

    module Make (Model : sig
        val name : string

        module Caller : sig
          type query
          type state
          type update
          type error
        end

        module Callee : sig
          type query
          type state
          type update
          type error
        end
      end) =
    struct
      open Model

      let name = name

      module Caller = Caller_converts.State_rpc.Make (struct
          let name = name

          include Caller
        end)

      module Callee = Callee_converts.State_rpc.Make (struct
          let name = name

          include Callee
        end)

      let%test _ = Int.Set.equal (Caller.versions ()) (Callee.versions ())

      module type Version_shared = sig
        val version : int

        type query [@@deriving bin_io]
        type state [@@deriving bin_io]
        type update [@@deriving bin_io]
        type error [@@deriving bin_io]

        val query_of_caller_model : Model.Caller.query -> query
        val callee_model_of_query : query -> Model.Callee.query
        val caller_model_of_state : state -> Model.Caller.state
        val state_of_callee_model : Model.Callee.state -> state
        val caller_model_of_error : error -> Model.Caller.error
        val error_of_callee_model : Model.Callee.error -> error
        val client_pushes_back : bool
      end

      module Register_raw (Version_i : sig
          include Version_shared

          val caller_model_of_update
            :  update Pipe.Reader.t
            -> Model.Caller.update Or_error.t Pipe.Reader.t

          val update_of_callee_model
            :  Model.Callee.state
            -> Model.Callee.update Pipe.Reader.t
            -> update Pipe.Reader.t
        end) =
      struct
        include Callee.Register_raw (struct
            include Version_i

            let model_of_query = callee_model_of_query
            let state_of_model = state_of_callee_model
            let update_of_model = update_of_callee_model
            let error_of_model = error_of_callee_model
          end)

        include Caller.Register_raw (struct
            include Version_i

            let query_of_model = query_of_caller_model
            let model_of_state = caller_model_of_state
            let model_of_update = caller_model_of_update
            let model_of_error = caller_model_of_error
          end)
      end

      module Register (Version_i : sig
          include Version_shared

          val update_of_callee_model : Model.Callee.update -> update
          val caller_model_of_update : update -> Model.Caller.update
        end) =
      struct
        include Callee.Register (struct
            include Version_i

            let model_of_query = callee_model_of_query
            let state_of_model = state_of_callee_model
            let update_of_model = update_of_callee_model
            let error_of_model = error_of_callee_model
          end)

        include Caller.Register (struct
            include Version_i

            let query_of_model = query_of_caller_model
            let model_of_state = caller_model_of_state
            let model_of_update = caller_model_of_update
            let model_of_error = caller_model_of_error
          end)
      end

      let dispatch_multi = Caller.dispatch_multi
      let implement_multi = Callee.implement_multi
      let versions () = Caller.versions ()
      let rpcs () = Caller.rpcs ()
    end
  end

  module One_way = struct
    module type S = sig
      type caller_msg
      type callee_msg

      val dispatch_multi : Connection_with_menu.t -> caller_msg -> unit Or_error.t

      val implement_multi
        :  ?log_not_previously_seen_version:(name:string -> int -> unit)
        -> ('state -> version:int -> callee_msg -> unit)
        -> 'state Implementation.t list

      val rpcs : unit -> Any.t list
      val versions : unit -> Int.Set.t
      val name : string
    end

    module Make (Model : sig
        val name : string

        module Caller : sig
          type msg
        end

        module Callee : sig
          type msg
        end
      end) =
    struct
      open Model

      let name = name

      module Caller = Caller_converts.One_way.Make (struct
          let name = name

          include Caller
        end)

      module Callee = Callee_converts.One_way.Make (struct
          let name = name

          include Callee
        end)

      let%test _ = Int.Set.equal (Caller.versions ()) (Callee.versions ())

      module Register (Version : sig
          open Model

          val version : int

          type msg [@@deriving bin_io]

          val msg_of_caller_model : Caller.msg -> msg
          val callee_model_of_msg : msg -> Callee.msg
        end) =
      struct
        include Callee.Register (struct
            include Version

            let model_of_msg = callee_model_of_msg
          end)

        include Caller.Register (struct
            include Version

            let msg_of_model = msg_of_caller_model
          end)

        let%test _ = Int.Set.equal (Caller.versions ()) (Callee.versions ())
      end

      let dispatch_multi = Caller.dispatch_multi
      let implement_multi = Callee.implement_multi

      (* Note: Caller.versions is the same as Callee.versions, so it doesn't matter which
         one we call here. Same for [rpcs]. *)
      let versions () = Caller.versions ()
      let rpcs () = Caller.rpcs ()
    end
  end
end