12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523openCore_kernelopenAsync_kernelopenRpcmoduleVersioned_direct_stream_writer=structmoduleDirect_stream_writer=Pipe_rpc.Direct_stream_writertype'inputt=|T:{convert:('input->'output);writer:'outputDirect_stream_writer.t}->'inputtletcreate~convert~writer=T{convert;writer}letwrite(T{convert;writer})input=Direct_stream_writer.writewriter(convertinput)letwrite_without_pushback(T{convert;writer})input=Direct_stream_writer.write_without_pushbackwriter(convertinput)letclose(T{convert=_;writer})=Direct_stream_writer.closewriterletis_closed(T{convert=_;writer})=Direct_stream_writer.is_closedwriterletclosed(T{convert=_;writer})=Direct_stream_writer.closedwriterendletfailed_conversionx=Error.create"type conversion failure"x[%sexp_of:[`Msg|`Query|`Response|`Error|`State|`Update]*[`Rpcofstring]*[`Versionofint]*exn]letmultiple_registrationsx=Error.create"multiple rpc registrations"x[%sexp_of:[`Rpcofstring]*[`Versionofint]]letunknown_versionx=Error.create"unknown rpc version"x[%sexp_of:string*int]moduleCallee_converts=structmoduleRpc=structmoduleSimple=structtype('query,'response)adapter={adapt:'state.('state->'query->'responseDeferred.t)->'stateImplementation.t}type('query,'response)t={name:string;adapters:('query,'response)adapterInt.Map.t}[@@derivingfields]letcreate~name={name;adapters=Int.Map.empty}letwrap_errorfn=(funstatequery->fnstatequery>>|function|Okvalue->Okvalue|Errorerror->Error(Error.to_string_humerror))letadd{name;adapters}rpcadapter=ifString.(<>)name(Rpc.namerpc)thenOr_error.error"Rpc names don't agree"(name,Rpc.namerpc)[%sexp_of:string*string]elseletversion=Rpc.versionrpcinmatchMap.findadaptersversionwith|Some_->Or_error.error"Version already exists"(name,version)[%sexp_of:string*int]|None->letadapters=Map.setadapters~key:version~data:adapterinOk{name;adapters}letadd_rpc_versiontold_rpcupgradedowngrade=letadaptfn=letadaptedstateold_query=fnstate(upgradeold_query)>>|funresult->downgraderesultinRpc.implementold_rpcadaptedinaddtold_rpc{adapt}letadd_rpc_version_with_failuretold_rpcupgrade_or_errordowngrade_or_error=letadaptfn=letadaptedstateold_query=letopenDeferred.Result.Monad_infixinreturn(upgrade_or_errorold_query)>>=funquery->fnstatequery>>=funresponse->return(downgrade_or_errorresponse)inRpc.implementold_rpc(wrap_erroradapted)inaddtold_rpc{adapt}letadd_versiont~version~bin_query~bin_responseupgradedowngrade=letrpc=Rpc.create~name:t.name~version~bin_query~bin_responseinadd_rpc_versiontrpcupgradedowngradeletadd_version_with_failuret~version~bin_query~bin_responseupgradedowngrade=letrpc=Rpc.create~name:t.name~version~bin_query~bin_responseinadd_rpc_version_with_failuretrpcupgradedowngradeletimplementtfn=Map.datat.adapters|>List.map~f:(fun{adapt}->adaptfn)endmoduletypeS=sigtypequerytyperesponsevalimplement_multi:?log_not_previously_seen_version:(name:string->int->unit)->('state->version:int->query->responseDeferred.t)->'stateImplementation.tlistvalrpcs:unit->Any.tlistvalversions:unit->Int.Set.tvalname:stringendmoduleMake(Model:sigvalname:stringtypequerytyperesponseend)=structletname=Model.nametype'simpl='s->version:int->Model.query->Model.responseDeferred.ttypeimplementer={implement:'s.log_version:(int->unit)->'simpl->'sImplementation.t}letregistry:(int,implementer*Any.t)Hashtbl.t=Int.Table.create~size:1()letimplement_multi?log_not_previously_seen_versionf=letlog_version=matchlog_not_previously_seen_versionwith|None->ignore(* prevent calling [f] more than once per version *)|Somef->Memo.general(f~name)inList.map(Hashtbl.dataregistry)~f:(fun(i,_rpc)->i.implement~log_versionf)letrpcs()=List.map(Hashtbl.dataregistry)~f:(fun(_,rpc)->rpc)letversions()=Int.Set.of_list(Hashtbl.keysregistry)moduleRegister(Version_i:sigtypequery[@@derivingbin_io]typeresponse[@@derivingbin_io]valversion:intvalmodel_of_query:query->Model.queryvalresponse_of_model:Model.response->responseend)=structopenVersion_iletrpc=Rpc.create~name~version~bin_query~bin_responselet()=letimplement~log_versionf=Rpc.implementrpc(funsq->log_versionversion;matchResult.try_with(fun()->Version_i.model_of_queryq)with|Errorexn->Error.raise(failed_conversion(`Query,`Rpcname,`Versionversion,exn))|Okq->fs~versionq>>|funr->matchResult.try_with(fun()->Version_i.response_of_modelr)with|Okr->r|Errorexn->Error.raise(failed_conversion(`Response,`Rpcname,`Versionversion,exn)))inmatchHashtbl.findregistryversionwith|None->Hashtbl.setregistry~key:version~data:({implement},Any.Rpcrpc)|Some_->Error.raise(multiple_registrations(`Rpcname,`Versionversion))endendendmodulePipe_rpc=structmoduletypeS=sigtypequerytyperesponsetypeerrorvalimplement_multi:?log_not_previously_seen_version:(name:string->int->unit)->('state->version:int->query->(responsePipe.Reader.t,error)Result.tDeferred.t)->'stateImplementation.tlistvalimplement_direct_multi:?log_not_previously_seen_version:(name:string->int->unit)->('state->version:int->query->responseVersioned_direct_stream_writer.t->(unit,error)Result.tDeferred.t)->'stateImplementation.tlistvalrpcs:unit->Any.tlistvalversions:unit->Int.Set.tvalname:stringendmoduleMake(Model:sigvalname:stringtypequerytyperesponsetypeerrorend)=structletname=Model.nametype'simpl=|Pipeof('s->version:int->Model.query->(Model.responsePipe.Reader.t,Model.error)Result.tDeferred.t)|Directof('s->version:int->Model.query->Model.responseVersioned_direct_stream_writer.t->(unit,Model.error)Result.tDeferred.t)typeimplementer={implement:'s.log_version:(int->unit)->'simpl->'sImplementation.t}letregistry=Int.Table.create~size:1()letimplement_multi_gen?log_not_previously_seen_versionimpl=letlog_version=matchlog_not_previously_seen_versionwith|None->ignore(* prevent calling [f] more than once per version *)|Somef->Memo.general(f~name)inList.map(Hashtbl.dataregistry)~f:(fun(i,_)->i.implement~log_versionimpl)letimplement_multi?log_not_previously_seen_versionf=implement_multi_gen?log_not_previously_seen_version(Pipef)letimplement_direct_multi?log_not_previously_seen_versionf=implement_multi_gen?log_not_previously_seen_version(Directf)letrpcs()=List.map(Hashtbl.dataregistry)~f:(fun(_,rpc)->rpc)letversions()=Int.Set.of_list(Int.Table.keysregistry)moduletypeVersion_shared=sigtypequery[@@derivingbin_io]typeresponse[@@derivingbin_io]typeerror[@@derivingbin_io]valversion:intvalmodel_of_query:query->Model.queryvalerror_of_model:Model.error->errorvalclient_pushes_back:boolendmoduleMake_shared(Version_i:Version_shared)(Convert:sigvalconvert_elt:(Model.response->Version_i.response)Or_error.tvalconvert_pipe:Model.responsePipe.Reader.t->Version_i.responsePipe.Reader.tend)=structopenVersion_iopenConvertletrpc=Pipe_rpc.create~name~version~bin_query~bin_response~bin_error?client_pushes_back:(Option.some_ifclient_pushes_back())()letwrapped_model_of_queryq=matchVersion_i.model_of_queryqwith|exceptionexn->Error.raise(failed_conversion(`Response,`Rpcname,`Versionversion,exn))|q->qletwrapped_error_of_modelerror=matchVersion_i.error_of_modelerrorwith|error->Errorerror|exceptionexn->Error.raise(failed_conversion(`Error,`Rpcname,`Versionversion,exn))letimplement~log_versionimpl=matchimplwith|Pipef->Pipe_rpc.implementrpc(funsq->log_versionversion;fs~version(wrapped_model_of_queryq)>>=function|Okpipe->Monitor.handle_errors(fun()->return(Ok(convert_pipepipe)))(funexn->Error.raise(failed_conversion(`Response,`Rpcname,`Versionversion,exn)))|Errorerror->return(wrapped_error_of_modelerror))|Directf->letconvert_elt=Or_error.ok_exnconvert_eltinPipe_rpc.implement_directrpc(funsqdsw->letwriter=Versioned_direct_stream_writer.create~convert:convert_elt~writer:dswinfs~version(wrapped_model_of_queryq)writer>>|function|Ok()->Ok()|Errorerror->wrapped_error_of_modelerror)let()=matchHashtbl.findregistryversionwith|None->Hashtbl.setregistry~key:version~data:({implement},Any.Piperpc)|Some_->Error.raise(multiple_registrations(`Rpcname,`Versionversion))endmoduleRegister_raw(Version_i:sigincludeVersion_sharedvalresponse_of_model:Model.responsePipe.Reader.t->responsePipe.Reader.tend)=Make_shared(Version_i)(structletconvert_elt=Or_error.error_string"cannot use direct interface with Register_raw"letconvert_pipe=Version_i.response_of_modelend)moduleRegister(Version_i:sigincludeVersion_sharedvalresponse_of_model:Model.response->responseend)=Make_shared(Version_i)(structletconvert_elt=OkVersion_i.response_of_modelletconvert_pipepipe=Pipe.mappipe~f:Version_i.response_of_modelend)endendmoduleState_rpc=structmoduletypeS=sigtypequerytypestatetypeupdatetypeerrorvalimplement_multi:?log_not_previously_seen_version:(name:string->int->unit)->('connection_state->version:int->query->(state*updatePipe.Reader.t,error)Result.tDeferred.t)->'connection_stateImplementation.tlistvalrpcs:unit->Any.tlistvalversions:unit->Int.Set.tvalname:stringendmoduleMake(Model:sigvalname:stringtypequerytypestatetypeupdatetypeerrorend)=structletname=Model.nametype'simpl='s->version:int->Model.query->(Model.state*Model.updatePipe.Reader.t,Model.error)Result.tDeferred.ttypeimplementer={implement:'s.log_version:(int->unit)->'simpl->'sImplementation.t}letregistry=Int.Table.create~size:1()letimplement_multi?log_not_previously_seen_versionf=letlog_version=matchlog_not_previously_seen_versionwith|None->ignore(* prevent calling [f] more than once per version *)|Somef->Memo.general(f~name)inList.map(Hashtbl.dataregistry)~f:(fun(i,_)->i.implement~log_versionf)letrpcs()=List.map(Hashtbl.dataregistry)~f:(fun(_,rpc)->rpc)letversions()=Int.Set.of_list(Int.Table.keysregistry)moduletypeVersion_shared=sigtypequery[@@derivingbin_io]typestate[@@derivingbin_io]typeupdate[@@derivingbin_io]typeerror[@@derivingbin_io]valversion:intvalmodel_of_query:query->Model.queryvalstate_of_model:Model.state->statevalerror_of_model:Model.error->errorvalclient_pushes_back:boolendmoduleRegister_raw(Version_i:sigincludeVersion_sharedvalupdate_of_model:Model.state->Model.updatePipe.Reader.t->updatePipe.Reader.tend)=structopenVersion_iletrpc=State_rpc.create~name~version~bin_query~bin_state~bin_update~bin_error?client_pushes_back:(Option.some_ifclient_pushes_back())()let()=letimplement~log_versionf=State_rpc.implementrpc(funsq->log_versionversion;matchVersion_i.model_of_queryqwith|exceptionexn->Error.raise(failed_conversion(`Response,`Rpcname,`Versionversion,exn))|q->fs~versionq>>=function|Ok(model_state,pipe)->letstate=matchVersion_i.state_of_modelmodel_statewith|state->state|exceptionexn->Error.raise(failed_conversion(`State,`Rpcname,`Versionversion,exn))inMonitor.handle_errors(fun()->return(Ok(state,Version_i.update_of_modelmodel_statepipe)))(funexn->Error.raise(failed_conversion(`Update,`Rpcname,`Versionversion,exn)))|Errorerror->return(matchVersion_i.error_of_modelerrorwith|error->Errorerror|exceptionexn->Error.raise(failed_conversion(`Error,`Rpcname,`Versionversion,exn))))inmatchHashtbl.findregistryversionwith|None->Hashtbl.setregistry~key:version~data:({implement},Any.Staterpc)|Some_->Error.raise(multiple_registrations(`Rpcname,`Versionversion))endmoduleRegister(Version_i:sigincludeVersion_sharedvalupdate_of_model:Model.update->updateend)=structincludeRegister_raw(structincludeVersion_iletupdate_of_model_statepipe=Pipe.map~f:update_of_modelpipeend)endendendmoduleOne_way=structmoduletypeS=sigtypemsgvalimplement_multi:?log_not_previously_seen_version:(name:string->int->unit)->('state->version:int->msg->unit)->'stateImplementation.tlistvalrpcs:unit->Any.tlistvalversions:unit->Int.Set.tvalname:stringendmoduleMake(Model:sigvalname:stringtypemsgend)=structletname=Model.nametype'simpl='s->version:int->Model.msg->unittypeimplementer={implement:'s.log_version:(int->unit)->'simpl->'sImplementation.t;}letregistry:(int,implementer*Any.t)Hashtbl.t=Int.Table.create~size:1()letimplement_multi?log_not_previously_seen_versionf=letlog_version=matchlog_not_previously_seen_versionwith|None->ignore(* prevent calling [f] more than once per version *)|Somef->Memo.general(f~name)inList.map(Hashtbl.dataregistry)~f:(fun(i,_rpc)->i.implement~log_versionf)letrpcs()=List.map(Hashtbl.dataregistry)~f:(fun(_,rpc)->rpc)letversions()=Int.Set.of_list(Hashtbl.keysregistry)moduleRegister(Version_i:sigtypemsg[@@derivingbin_io]valversion:intvalmodel_of_msg:msg->Model.msgend)=structopenVersion_iletrpc=One_way.create~name~version~bin_msglet()=letimplement~log_versionf=One_way.implementrpc(funsq->log_versionversion;matchResult.try_with(fun()->Version_i.model_of_msgq)with|Errorexn->Error.raise(failed_conversion(`Msg,`Rpcname,`Versionversion,exn))|Okq->fs~versionq)inmatchHashtbl.findregistryversionwith|None->Hashtbl.setregistry~key:version~data:({implement},Any.One_wayrpc)|Some_->Error.raise(multiple_registrations(`Rpcname,`Versionversion))endendendendmoduleMenu=struct(***************** some prohibitions for this module ******************
(1) !!! never prune old versions of this rpc !!!
It is too fundamental to the workings of various versioning
schemes and it probably won't change very much anyway.
(2) !!! only ever say "with bin_io" on built-in ocaml types !!!
Examples of built-in types are int, list, string, etc.
This is to protect ourselves against changes to Core data
structures, for example.
*********************************************************************)moduleModel=structletname="__Versioned_rpc.Menu"typequery=unittyperesponse=Description.tlistendincludeCallee_converts.Rpc.Make(Model)letrpc_name=Model.namemoduleV1=structmoduleT=structletversion=1typequery=unit[@@derivingbin_io]typeresponse=(string*int)list[@@derivingbin_io]letmodel_of_queryq=qletresponse_of_model=List.map~f:(fun{Description.name;version}->(name,version))endincludeTincludeRegister(T)endmoduleCurrent_version=V1letaddimpls=letmenu=List.mapimpls~f:Implementation.descriptioninletmenu_impls=implement_multi(fun_~version:_()->returnmenu)inimpls@menu_implstypet=Int.Set.tString.Table.t[@@derivingsexp_of]letsupported_rpcst=letopenList.Monad_infixinString.Table.to_alistt>>=fun(name,versions)->Int.Set.to_listversions>>|funversion->{Description.name;version}letsupported_versionst~rpc_name=Option.value~default:Int.Set.empty(Hashtbl.findtrpc_name)letof_entriesentries=Hashtbl.map~f:Int.Set.of_list(String.Table.of_alist_multientries)letrequestconn=Rpc.dispatchCurrent_version.rpcconn()>>|funresult->Result.mapresult~f:of_entriesletcreatedescriptions=List.mapdescriptions~f:(fun{Description.name;version}->(name,version))|>of_entriesendmoduleConnection_with_menu=structtypet={connection:Connection.t;menu:Menu.t}[@@derivingfields]letcreateconnection=letopenDeferred.Or_error.Monad_infixinMenu.requestconnection>>|funmenu->{connection;menu}letcreate_directlyconnectionmenu={connection;menu}endmoduleCaller_converts=structletmost_recent_common_version~rpc_name~caller_versions~callee_versions~callee_menu=matchSet.max_elt(Set.intercallee_versionscaller_versions)with|Someversion->Okversion|None->error_s[%message"caller and callee share no common versions for rpc"(rpc_name:string)(caller_versions:Int.Set.t)(callee_versions:Int.Set.t)(callee_menu:Menu.t)]let%expect_test"highest version number is taken in most_recent_common_version"=letrpc_name="the-rpc"inletmenu=Menu.of_entries[rpc_name,2]inletresult=most_recent_common_version~rpc_name~caller_versions:(Int.Set.of_list[1;2;3])~callee_versions:(Int.Set.of_list[2])~callee_menu:menuinprint_s[%sexp(result:intOr_error.t)];[%expect{| (Ok 2) |}];;let%expect_test"error from most_recent_common_version looks reasonable"=letthe_rpc="the-rpc"inletnot_the_rpc="other-rpc"inletmenu=Menu.of_entries[not_the_rpc,1;not_the_rpc,2]inletresult=most_recent_common_version~rpc_name:the_rpc~caller_versions:(Int.Set.of_list[1;2;3])~callee_versions:(Menu.supported_versionsmenu~rpc_name:the_rpc)~callee_menu:menuinprint_s[%sexp(result:intOr_error.t)];[%expect{|
(Error
("caller and callee share no common versions for rpc" (rpc_name the-rpc)
(caller_versions (1 2 3)) (callee_versions ())
(callee_menu ((other-rpc (1 2))))))|}];;moduleDispatch=structmoduleMake(M:Monad)=structopenMletwith_specific_version~version~connection~name~query~dispatcher~registry=matchHashtbl.findregistryversionwith|None->return(Error(unknown_version(name,version)))|Some(dispatch,_rpc)->dispatcherdispatchconnectionqueryletwith_version_menu{Connection_with_menu.connection;menu}query~name~versions~registry~dispatcher=letcallee_versions=Menu.supported_versionsmenu~rpc_name:nameinletcaller_versions=versions()inmatchmost_recent_common_version~rpc_name:name~caller_versions~callee_versions~callee_menu:menuwith|Errore->return(Errore)|Okversion->with_specific_version~version~connection~name~query~registry~dispatcherendmoduleAsync=Make(Deferred)moduleDirect=Make(Monad.Ident)endmoduleRpc=structmoduletypeS=sigtypequerytyperesponsevaldispatch_multi:Connection_with_menu.t->query->responseOr_error.tDeferred.tvalrpcs:unit->Any.tlistvalversions:unit->Int.Set.tvalname:stringendmoduleMake(Model:sigvalname:stringtypequerytyperesponseend)=structletname=Model.nameletregistry=Int.Table.create~size:1()letrpcs()=List.map(Hashtbl.dataregistry)~f:(fun(_,rpc)->rpc)letversions()=Int.Set.of_list(Int.Table.keysregistry)letdispatch_multiconn_with_menuquery=Dispatch.Async.with_version_menuconn_with_menuquery~name~versions~registry~dispatcher:Fn.idmoduleRegister'(Version_i:sigtypequery[@@derivingbin_io]typeresponse[@@derivingbin_io]valversion:intvalquery_of_model:Model.query->queryvalmodel_of_response:Model.query->response->Model.responseend)=structopenVersion_iletrpc=Rpc.create~name~version~bin_query~bin_responselet()=letdispatchconnmq=matchResult.try_with(fun()->Version_i.query_of_modelmq)with|Errorexn->return(Error(failed_conversion(`Query,`Rpcname,`Versionversion,exn)))|Okq->Rpc.dispatchrpcconnq>>|funresult->Result.bindresult~f:(funr->matchResult.try_with(fun()->Version_i.model_of_responsemqr)with|Okr->Okr|Errorexn->Error(failed_conversion(`Response,`Rpcname,`Versionversion,exn)))inmatchHashtbl.findregistryversionwith|None->Hashtbl.setregistry~key:version~data:(dispatch,Any.Rpcrpc)|Some_->Error.raise(multiple_registrations(`Rpcname,`Versionversion))endmoduleRegister(Version_i:sigtypequery[@@derivingbin_io]typeresponse[@@derivingbin_io]valversion:intvalquery_of_model:Model.query->queryvalmodel_of_response:response->Model.responseend)=Register'(structincludeVersion_iletmodel_of_response_r=model_of_responserend)endendmodulePipe_rpc=structmoduletypeS=sigtypequerytyperesponsetypeerrorvaldispatch_multi:Connection_with_menu.t->query->(responseOr_error.tPipe.Reader.t*Pipe_rpc.Metadata.t,error)Result.tOr_error.tDeferred.tvaldispatch_iter_multi:Connection_with_menu.t->query->f:(responsePipe_rpc.Pipe_message.t->Pipe_rpc.Pipe_response.t)->(Pipe_rpc.Id.t,error)Result.tOr_error.tDeferred.tvalrpcs:unit->Any.tlistvalversions:unit->Int.Set.tvalname:stringendmoduleMake(Model:sigvalname:stringtypequerytyperesponsetypeerrorend)=structtypedispatcher={dispatch:Connection.t->Model.query->(Model.responseOr_error.tPipe.Reader.t*Pipe_rpc.Metadata.t,Model.error)Result.tOr_error.tDeferred.t;dispatch_iter:Connection.t->Model.query->f:(Model.responsePipe_rpc.Pipe_message.t->Pipe_rpc.Pipe_response.t)->(Pipe_rpc.Id.t,Model.error)Result.tOr_error.tDeferred.t}letname=Model.nameletregistry:(dispatcher*Any.t)Int.Table.t=Int.Table.create~size:1()letrpcs()=List.map(Hashtbl.dataregistry)~f:(fun(_,rpc)->rpc)letversions()=Int.Set.of_list(Int.Table.keysregistry)letdispatch_iter_multiconn_with_menuquery~f=Dispatch.Async.with_version_menuconn_with_menuquery~name~versions~registry~dispatcher:(fun{dispatch_iter;_}connquery->dispatch_iterconnquery~f)letdispatch_multiconn_with_menuquery=Dispatch.Async.with_version_menuconn_with_menuquery~name~versions~registry~dispatcher:(fun{dispatch;_}connquery->dispatchconnquery)moduletypeVersion_shared=sigtypequery[@@derivingbin_io]typeresponse[@@derivingbin_io]typeerror[@@derivingbin_io]valversion:intvalquery_of_model:Model.query->queryvalmodel_of_error:error->Model.errorvalclient_pushes_back:boolendmoduleMake_shared(Version_i:Version_shared)(Convert:sigvalconvert_elt:(Version_i.response->Model.response)Or_error.tvalconvert_pipe:Version_i.responsePipe.Reader.t->Model.responseOr_error.tPipe.Reader.tend)=structopenVersion_iopenConvertletrpc=Pipe_rpc.create~name~version~bin_query~bin_response~bin_error?client_pushes_back:(Option.some_ifclient_pushes_back())()letwrapped_query_of_modelq=matchVersion_i.query_of_modelqwith|exceptionexn->return(Error(failed_conversion(`Query,`Rpcname,`Versionversion,exn)))|q->return(Okq)letconvert_resultresult~convert_ok=matchresultwith|Error_ase->e|Ok(Errore)->(matchVersion_i.model_of_errorewith|e'->Ok(Errore')|exceptionexn->Error(failed_conversion(`Error,`Rpcname,`Versionversion,exn)))|Ok(Okok)->Ok(Ok(convert_okok))letdispatchconnq=wrapped_query_of_modelq>>=?funq->Pipe_rpc.dispatchrpcconnq>>|funresult->convert_resultresult~convert_ok:(fun(pipe,id)->(convert_pipepipe,id))letdispatch_iterconnq~f=letconvert_elt=Or_error.ok_exnconvert_eltinwrapped_query_of_modelq>>=?funq->letconvert_message(m:_Pipe_rpc.Pipe_message.t)=matchmwith|Closed_asclosed->closed|Updateu->Update(convert_eltu)inPipe_rpc.dispatch_iterrpcconnq~f:(funmessage->f(convert_messagemessage))>>|funresult->convert_resultresult~convert_ok:Fn.idlet()=matchHashtbl.findregistryversionwith|None->Hashtbl.setregistry~key:version~data:({dispatch;dispatch_iter},Any.Piperpc)|Some_->Error.raise(multiple_registrations(`Rpcname,`Versionversion))endmoduleRegister_raw(Version_i:sigincludeVersion_sharedvalmodel_of_response:responsePipe.Reader.t->Model.responseOr_error.tPipe.Reader.tend)=Make_shared(Version_i)(structletconvert_elt=Or_error.error_string"Cannot use Direct with Register_raw"letconvert_pipe=Version_i.model_of_responseend)moduleRegister(Version_i:sigincludeVersion_sharedvalmodel_of_response:response->Model.responseend)=Make_shared(Version_i)(structletconvert_elt=OkVersion_i.model_of_responseletconvert_pipers=Pipe.maprs~f:(funr->matchVersion_i.model_of_responserwith|r->Okr|exceptionexn->Error(failed_conversion(`Response,`Rpcname,`VersionVersion_i.version,exn)))end)endendmoduleState_rpc=structmoduletypeS=sigtypequerytypestatetypeupdatetypeerrorvaldispatch_multi:Connection_with_menu.t->query->(state*updateOr_error.tPipe.Reader.t*State_rpc.Metadata.t,error)Result.tOr_error.tDeferred.tvalrpcs:unit->Any.tlistvalversions:unit->Int.Set.tvalname:stringendmoduleMake(Model:sigvalname:stringtypequerytypestatetypeupdatetypeerrorend)=structletname=Model.nameletregistry=Int.Table.create~size:1()letrpcs()=List.map(Hashtbl.dataregistry)~f:(fun(_,rpc)->rpc)letversions()=Int.Set.of_list(Int.Table.keysregistry)letdispatch_multiconn_with_menuquery=Dispatch.Async.with_version_menuconn_with_menuquery~name~versions~registry~dispatcher:Fn.idmoduletypeVersion_shared=sigtypequery[@@derivingbin_io]typestate[@@derivingbin_io]typeupdate[@@derivingbin_io]typeerror[@@derivingbin_io]valversion:intvalquery_of_model:Model.query->queryvalmodel_of_state:state->Model.statevalmodel_of_error:error->Model.errorvalclient_pushes_back:boolendmoduleRegister_raw(Version_i:sigincludeVersion_sharedvalmodel_of_update:updatePipe.Reader.t->Model.updateOr_error.tPipe.Reader.tend)=structopenVersion_iletrpc=State_rpc.create~name~version~bin_query~bin_state~bin_update~bin_error?client_pushes_back:(Option.some_ifclient_pushes_back())()let()=letdispatchconnq=matchVersion_i.query_of_modelqwith|exceptionexn->return(Error(failed_conversion(`Query,`Rpcname,`Versionversion,exn)))|q->State_rpc.dispatchrpcconnq>>|funresult->matchresultwith|Errorexn->Errorexn|Ok(Errore)->(matchVersion_i.model_of_errorewith|e'->Ok(Errore')|exceptionexn->Error(failed_conversion(`Error,`Rpcname,`Versionversion,exn)))|Ok(Ok(state,pipe,id))->matchVersion_i.model_of_statestatewith|exceptionexn->Error(failed_conversion(`State,`Rpcname,`Versionversion,exn))|state->Ok(Ok(state,Version_i.model_of_updatepipe,id))inmatchHashtbl.findregistryversionwith|None->Hashtbl.setregistry~key:version~data:(dispatch,Any.Staterpc)|Some_->Error.raise(multiple_registrations(`Rpcname,`Versionversion))endmoduleRegister(Version_i:sigincludeVersion_sharedvalmodel_of_update:update->Model.updateend)=structincludeRegister_raw(structincludeVersion_iletmodel_of_updaters=Pipe.maprs~f:(funr->matchVersion_i.model_of_updaterwith|r->Okr|exceptionexn->Error(failed_conversion(`Update,`Rpcname,`Versionversion,exn)))end)endendendmoduleOne_way=structmoduletypeS=sigtypemsgvaldispatch_multi:Connection_with_menu.t->msg->unitOr_error.tvalrpcs:unit->Any.tlistvalversions:unit->Int.Set.tvalname:stringendmoduleMake(Model:sigvalname:stringtypemsgend)=structletname=Model.nameletregistry=Int.Table.create~size:1()letrpcs()=List.map(Hashtbl.dataregistry)~f:(fun(_,rpc)->rpc)letversions()=Int.Set.of_list(Int.Table.keysregistry)letdispatch_multiconn_with_menumsg=Dispatch.Direct.with_version_menuconn_with_menumsg~name~versions~registry~dispatcher:Fn.idmoduleRegister(Version_i:sigtypemsg[@@derivingbin_io]valversion:intvalmsg_of_model:Model.msg->msgend)=structopenVersion_iletrpc=One_way.create~name~version~bin_msglet()=letdispatchconnq=matchResult.try_with(fun()->Version_i.msg_of_modelq)with|Errorexn->Error(failed_conversion(`Msg,`Rpcname,`Versionversion,exn))|Okq->One_way.dispatchrpcconnqinmatchHashtbl.findregistryversionwith|None->Hashtbl.setregistry~key:version~data:(dispatch,Any.One_wayrpc)|Some_->Error.raise(multiple_registrations(`Rpcname,`Versionversion))endendendendmoduleBoth_convert=structmodulePlain=structmoduletypeS=sigtypecaller_querytypecallee_querytypecaller_responsetypecallee_responsevaldispatch_multi:Connection_with_menu.t->caller_query->caller_responseOr_error.tDeferred.tvalimplement_multi:?log_not_previously_seen_version:(name:string->int->unit)->('state->version:int->callee_query->callee_responseDeferred.t)->'stateImplementation.tlistvalrpcs:unit->Any.tlistvalversions:unit->Int.Set.tvalname:stringendmoduleMake(Model:sigvalname:stringmoduleCaller:sigtypequerytyperesponseendmoduleCallee:sigtypequerytyperesponseendend)=structopenModelletname=namemoduleCaller=Caller_converts.Rpc.Make(structletname=nameincludeCallerend)moduleCallee=Callee_converts.Rpc.Make(structletname=nameincludeCalleeend)let%test_=Int.Set.equal(Caller.versions())(Callee.versions())moduleRegister(Version:sigopenModelvalversion:inttypequery[@@derivingbin_io]typeresponse[@@derivingbin_io]valquery_of_caller_model:Caller.query->queryvalcallee_model_of_query:query->Callee.queryvalresponse_of_callee_model:Callee.response->responsevalcaller_model_of_response:response->Caller.responseend)=structincludeCallee.Register(structincludeVersionletmodel_of_query=callee_model_of_queryletresponse_of_model=response_of_callee_modelend)includeCaller.Register(structincludeVersionletquery_of_model=query_of_caller_modelletmodel_of_response=caller_model_of_responseend)let%test_=Int.Set.equal(Caller.versions())(Callee.versions())endletdispatch_multi=Caller.dispatch_multiletimplement_multi=Callee.implement_multi(* Note: Caller.versions is the same as Callee.versions, so it doesn't matter which
one we call here. Same for [rpcs]. *)letversions()=Caller.versions()letrpcs()=Caller.rpcs()endendmodulePipe_rpc=structmoduletypeS=sigtypecaller_querytypecallee_querytypecaller_responsetypecallee_responsetypecaller_errortypecallee_errorvaldispatch_multi:Connection_with_menu.t->caller_query->(caller_responseOr_error.tPipe.Reader.t*Pipe_rpc.Metadata.t,caller_error)Result.tOr_error.tDeferred.tvaldispatch_iter_multi:Connection_with_menu.t->caller_query->f:(caller_responsePipe_rpc.Pipe_message.t->Pipe_rpc.Pipe_response.t)->(Pipe_rpc.Id.t,caller_error)Result.tOr_error.tDeferred.tvalimplement_multi:?log_not_previously_seen_version:(name:string->int->unit)->('state->version:int->callee_query->(callee_responsePipe.Reader.t,callee_error)Result.tDeferred.t)->'stateImplementation.tlistvalimplement_direct_multi:?log_not_previously_seen_version:(name:string->int->unit)->('state->version:int->callee_query->callee_responseVersioned_direct_stream_writer.t->(unit,callee_error)Result.tDeferred.t)->'stateImplementation.tlistvalrpcs:unit->Any.tlistvalversions:unit->Int.Set.tvalname:stringendmoduleMake(Model:sigvalname:stringmoduleCaller:sigtypequerytyperesponsetypeerrorendmoduleCallee:sigtypequerytyperesponsetypeerrorendend)=structopenModelletname=namemoduleCaller=Caller_converts.Pipe_rpc.Make(structletname=nameincludeCallerend)moduleCallee=Callee_converts.Pipe_rpc.Make(structletname=nameincludeCalleeend)let%test_=Int.Set.equal(Caller.versions())(Callee.versions())moduletypeVersion_shared=sigvalversion:inttypequery[@@derivingbin_io]typeresponse[@@derivingbin_io]typeerror[@@derivingbin_io]valquery_of_caller_model:Model.Caller.query->queryvalcallee_model_of_query:query->Model.Callee.queryvalerror_of_callee_model:Model.Callee.error->errorvalcaller_model_of_error:error->Model.Caller.errorvalclient_pushes_back:boolendmoduleRegister_raw(Version_i:sigincludeVersion_sharedvalresponse_of_callee_model:Model.Callee.responsePipe.Reader.t->responsePipe.Reader.tvalcaller_model_of_response:responsePipe.Reader.t->Model.Caller.responseOr_error.tPipe.Reader.tend)=structincludeCallee.Register_raw(structincludeVersion_iletmodel_of_query=callee_model_of_queryletresponse_of_model=response_of_callee_modelleterror_of_model=error_of_callee_modelend)includeCaller.Register_raw(structincludeVersion_iletquery_of_model=query_of_caller_modelletmodel_of_response=caller_model_of_responseletmodel_of_error=caller_model_of_errorend)endmoduleRegister(Version_i:sigincludeVersion_sharedvalresponse_of_callee_model:Model.Callee.response->responsevalcaller_model_of_response:response->Model.Caller.responseend)=structincludeCallee.Register(structincludeVersion_iletmodel_of_query=callee_model_of_queryletresponse_of_model=response_of_callee_modelleterror_of_model=error_of_callee_modelend)includeCaller.Register(structincludeVersion_iletquery_of_model=query_of_caller_modelletmodel_of_response=caller_model_of_responseletmodel_of_error=caller_model_of_errorend)endletdispatch_multi=Caller.dispatch_multiletdispatch_iter_multi=Caller.dispatch_iter_multiletimplement_multi=Callee.implement_multiletimplement_direct_multi=Callee.implement_direct_multiletversions()=Caller.versions()letrpcs()=Caller.rpcs()endendmoduleState_rpc=structmoduletypeS=sigtypecaller_querytypecallee_querytypecaller_statetypecallee_statetypecaller_updatetypecallee_updatetypecaller_errortypecallee_errorvaldispatch_multi:Connection_with_menu.t->caller_query->(caller_state*caller_updateOr_error.tPipe.Reader.t*State_rpc.Metadata.t,caller_error)Result.tOr_error.tDeferred.tvalimplement_multi:?log_not_previously_seen_version:(name:string->int->unit)->('state->version:int->callee_query->(callee_state*callee_updatePipe.Reader.t,callee_error)Result.tDeferred.t)->'stateImplementation.tlistvalrpcs:unit->Any.tlistvalversions:unit->Int.Set.tvalname:stringendmoduleMake(Model:sigvalname:stringmoduleCaller:sigtypequerytypestatetypeupdatetypeerrorendmoduleCallee:sigtypequerytypestatetypeupdatetypeerrorendend)=structopenModelletname=namemoduleCaller=Caller_converts.State_rpc.Make(structletname=nameincludeCallerend)moduleCallee=Callee_converts.State_rpc.Make(structletname=nameincludeCalleeend)let%test_=Int.Set.equal(Caller.versions())(Callee.versions())moduletypeVersion_shared=sigvalversion:inttypequery[@@derivingbin_io]typestate[@@derivingbin_io]typeupdate[@@derivingbin_io]typeerror[@@derivingbin_io]valquery_of_caller_model:Model.Caller.query->queryvalcallee_model_of_query:query->Model.Callee.queryvalcaller_model_of_state:state->Model.Caller.statevalstate_of_callee_model:Model.Callee.state->statevalcaller_model_of_error:error->Model.Caller.errorvalerror_of_callee_model:Model.Callee.error->errorvalclient_pushes_back:boolendmoduleRegister_raw(Version_i:sigincludeVersion_sharedvalcaller_model_of_update:updatePipe.Reader.t->Model.Caller.updateOr_error.tPipe.Reader.tvalupdate_of_callee_model:Model.Callee.state->Model.Callee.updatePipe.Reader.t->updatePipe.Reader.tend)=structincludeCallee.Register_raw(structincludeVersion_iletmodel_of_query=callee_model_of_queryletstate_of_model=state_of_callee_modelletupdate_of_model=update_of_callee_modelleterror_of_model=error_of_callee_modelend)includeCaller.Register_raw(structincludeVersion_iletquery_of_model=query_of_caller_modelletmodel_of_state=caller_model_of_stateletmodel_of_update=caller_model_of_updateletmodel_of_error=caller_model_of_errorend)endmoduleRegister(Version_i:sigincludeVersion_sharedvalupdate_of_callee_model:Model.Callee.update->updatevalcaller_model_of_update:update->Model.Caller.updateend)=structincludeCallee.Register(structincludeVersion_iletmodel_of_query=callee_model_of_queryletstate_of_model=state_of_callee_modelletupdate_of_model=update_of_callee_modelleterror_of_model=error_of_callee_modelend)includeCaller.Register(structincludeVersion_iletquery_of_model=query_of_caller_modelletmodel_of_state=caller_model_of_stateletmodel_of_update=caller_model_of_updateletmodel_of_error=caller_model_of_errorend)endletdispatch_multi=Caller.dispatch_multiletimplement_multi=Callee.implement_multiletversions()=Caller.versions()letrpcs()=Caller.rpcs()endendmoduleOne_way=structmoduletypeS=sigtypecaller_msgtypecallee_msgvaldispatch_multi:Connection_with_menu.t->caller_msg->unitOr_error.tvalimplement_multi:?log_not_previously_seen_version:(name:string->int->unit)->('state->version:int->callee_msg->unit)->'stateImplementation.tlistvalrpcs:unit->Any.tlistvalversions:unit->Int.Set.tvalname:stringendmoduleMake(Model:sigvalname:stringmoduleCaller:sigtypemsgendmoduleCallee:sigtypemsgendend)=structopenModelletname=namemoduleCaller=Caller_converts.One_way.Make(structletname=nameincludeCallerend)moduleCallee=Callee_converts.One_way.Make(structletname=nameincludeCalleeend)let%test_=Int.Set.equal(Caller.versions())(Callee.versions())moduleRegister(Version:sigopenModelvalversion:inttypemsg[@@derivingbin_io]valmsg_of_caller_model:Caller.msg->msgvalcallee_model_of_msg:msg->Callee.msgend)=structincludeCallee.Register(structincludeVersionletmodel_of_msg=callee_model_of_msgend)includeCaller.Register(structincludeVersionletmsg_of_model=msg_of_caller_modelend)let%test_=Int.Set.equal(Caller.versions())(Callee.versions())endletdispatch_multi=Caller.dispatch_multiletimplement_multi=Callee.implement_multi(* Note: Caller.versions is the same as Callee.versions, so it doesn't matter which
one we call here. Same for [rpcs]. *)letversions()=Caller.versions()letrpcs()=Caller.rpcs()endendend