123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767openCoreopenAsync_kernelopenRpcmoduleVersioned_direct_stream_writer=structmoduleDirect_stream_writer=Pipe_rpc.Direct_stream_writertype'inputt=|T:{convert:'input->'output;writer:'outputDirect_stream_writer.t}->'inputtletcreate~convert~writer=T{convert;writer}letwrite(T{convert;writer})input=Direct_stream_writer.writewriter(convertinput);;letwrite_without_pushback(T{convert;writer})input=Direct_stream_writer.write_without_pushbackwriter(convertinput);;letclose(T{convert=_;writer})=Direct_stream_writer.closewriterletis_closed(T{convert=_;writer})=Direct_stream_writer.is_closedwriterletclosed(T{convert=_;writer})=Direct_stream_writer.closedwriterendletfailed_conversionx=Error.create"type conversion failure"x[%sexp_of:[`Msg|`Query|`Response|`Error|`State|`Update]*[`Rpcofstring]*[`Versionofint]*exn];;letmultiple_registrationsx=Error.create"multiple rpc registrations"x[%sexp_of:[`Rpcofstring]*[`Versionofint]];;letunknown_versionx=Error.create"unknown rpc version"x[%sexp_of:string*int]moduleCallee_converts=structmoduleRpc=structmoduleSimple=structtype('query,'response)adapter={adapt:'state.('state->'query->'responseDeferred.t)->'stateImplementation.t}type('query,'response)t={name:string;adapters:('query,'response)adapterInt.Map.t}[@@derivingfields]letcreate~name={name;adapters=Int.Map.empty}letwrap_errorfnstatequery=match%mapfnstatequerywith|Okvalue->Okvalue|Errorerror->Error(Error.to_string_humerror);;letadd{name;adapters}rpcadapter=ifString.(<>)name(Rpc.namerpc)thenOr_error.error"Rpc names don't agree"(name,Rpc.namerpc)[%sexp_of:string*string]else(letversion=Rpc.versionrpcinmatchMap.findadaptersversionwith|Some_->Or_error.error"Version already exists"(name,version)[%sexp_of:string*int]|None->letadapters=Map.setadapters~key:version~data:adapterinOk{name;adapters});;letadd_rpc_versiontold_rpcupgradedowngrade=letadaptfn=letadaptedstateold_query=let%mapresult=fnstate(upgradeold_query)indowngraderesultinRpc.implementold_rpcadaptedinaddtold_rpc{adapt};;letadd_rpc_version_with_failuretold_rpcupgrade_or_errordowngrade_or_error=letadaptfn=letadaptedstateold_query=letopenDeferred.Result.Let_syntaxinlet%bindquery=Deferred.return(upgrade_or_errorold_query)inlet%bindresponse=fnstatequeryinDeferred.return(downgrade_or_errorresponse)inRpc.implementold_rpc(wrap_erroradapted)inaddtold_rpc{adapt};;letadd_versiont~version~bin_query~bin_responseupgradedowngrade=letrpc=Rpc.create~name:t.name~version~bin_query~bin_responseinadd_rpc_versiontrpcupgradedowngrade;;letadd_version_with_failuret~version~bin_query~bin_responseupgradedowngrade=letrpc=Rpc.create~name:t.name~version~bin_query~bin_responseinadd_rpc_version_with_failuretrpcupgradedowngrade;;letimplementtfn=Map.datat.adapters|>List.map~f:(fun{adapt}->adaptfn)endmoduletypeS=sigtypequerytyperesponsevalimplement_multi:?log_not_previously_seen_version:(name:string->int->unit)->('state->version:int->query->responseDeferred.t)->'stateImplementation.tlistvalrpcs:unit->Any.tlistvalversions:unit->Int.Set.tvalname:stringendmoduleMake(Model:sigvalname:stringtypequerytyperesponseend)=structletname=Model.nametype'simpl='s->version:int->Model.query->Model.responseDeferred.ttypeimplementer={implement:'s.log_version:(int->unit)->'simpl->'sImplementation.t}letregistry:(int,implementer*Any.t)Hashtbl.t=Int.Table.create~size:1()letimplement_multi?log_not_previously_seen_versionf=letlog_version=matchlog_not_previously_seen_versionwith|None->ignore(* prevent calling [f] more than once per version *)|Somef->Memo.general(f~name)inList.map(Hashtbl.dataregistry)~f:(fun(i,_rpc)->i.implement~log_versionf);;letrpcs()=List.map(Hashtbl.dataregistry)~f:(fun(_,rpc)->rpc)letversions()=Int.Set.of_list(Hashtbl.keysregistry)moduleRegister(Version_i:sigtypequery[@@derivingbin_io]typeresponse[@@derivingbin_io]valversion:intvalmodel_of_query:query->Model.queryvalresponse_of_model:Model.response->responseend)=structopenVersion_iletrpc=Rpc.create~name~version~bin_query~bin_responselet()=letimplement~log_versionf=Rpc.implementrpc(funsq->log_versionversion;matchResult.try_with(fun()->Version_i.model_of_queryq)with|Errorexn->Error.raise(failed_conversion(`Query,`Rpcname,`Versionversion,exn))|Okq->let%mapr=fs~versionqin(matchResult.try_with(fun()->Version_i.response_of_modelr)with|Okr->r|Errorexn->Error.raise(failed_conversion(`Response,`Rpcname,`Versionversion,exn))))inmatchHashtbl.findregistryversionwith|None->Hashtbl.setregistry~key:version~data:({implement},Any.Rpcrpc)|Some_->Error.raise(multiple_registrations(`Rpcname,`Versionversion));;endendendmodulePipe_rpc=structmoduletypeS=sigtypequerytyperesponsetypeerrorvalimplement_multi:?log_not_previously_seen_version:(name:string->int->unit)->('state->version:int->query->(responsePipe.Reader.t,error)Result.tDeferred.t)->'stateImplementation.tlistvalimplement_direct_multi:?log_not_previously_seen_version:(name:string->int->unit)->('state->version:int->query->responseVersioned_direct_stream_writer.t->(unit,error)Result.tDeferred.t)->'stateImplementation.tlistvalrpcs:unit->Any.tlistvalversions:unit->Int.Set.tvalname:stringendmoduleMake(Model:sigvalname:stringtypequerytyperesponsetypeerrorend)=structletname=Model.nametype'simpl=|Pipeof('s->version:int->Model.query->(Model.responsePipe.Reader.t,Model.error)Result.tDeferred.t)|Directof('s->version:int->Model.query->Model.responseVersioned_direct_stream_writer.t->(unit,Model.error)Result.tDeferred.t)typeimplementer={implement:'s.log_version:(int->unit)->'simpl->'sImplementation.t}letregistry=Int.Table.create~size:1()letimplement_multi_gen?log_not_previously_seen_versionimpl=letlog_version=matchlog_not_previously_seen_versionwith|None->ignore(* prevent calling [f] more than once per version *)|Somef->Memo.general(f~name)inList.map(Hashtbl.dataregistry)~f:(fun(i,_)->i.implement~log_versionimpl);;letimplement_multi?log_not_previously_seen_versionf=implement_multi_gen?log_not_previously_seen_version(Pipef);;letimplement_direct_multi?log_not_previously_seen_versionf=implement_multi_gen?log_not_previously_seen_version(Directf);;letrpcs()=List.map(Hashtbl.dataregistry)~f:(fun(_,rpc)->rpc)letversions()=Int.Set.of_list(Int.Table.keysregistry)moduletypeVersion_shared=sigtypequery[@@derivingbin_io]typeresponse[@@derivingbin_io]typeerror[@@derivingbin_io]valversion:intvalmodel_of_query:query->Model.queryvalerror_of_model:Model.error->errorvalclient_pushes_back:boolendmoduleMake_shared(Version_i:Version_shared)(Convert:sigvalconvert_elt:(Model.response->Version_i.response)Or_error.tvalconvert_pipe:Model.responsePipe.Reader.t->Version_i.responsePipe.Reader.tend)=structopenVersion_iopenConvertletrpc=Pipe_rpc.create~name~version~bin_query~bin_response~bin_error?client_pushes_back:(Option.some_ifclient_pushes_back())();;letwrapped_model_of_queryq=matchVersion_i.model_of_queryqwith|exceptionexn->Error.raise(failed_conversion(`Response,`Rpcname,`Versionversion,exn))|q->q;;letwrapped_error_of_modelerror=matchVersion_i.error_of_modelerrorwith|error->Errorerror|exceptionexn->Error.raise(failed_conversion(`Error,`Rpcname,`Versionversion,exn));;letimplement~log_versionimpl=matchimplwith|Pipef->Pipe_rpc.implementrpc(funsq->log_versionversion;match%bindfs~version(wrapped_model_of_queryq)with|Okpipe->Monitor.handle_errors(fun()->return(Ok(convert_pipepipe)))(funexn->Error.raise(failed_conversion(`Response,`Rpcname,`Versionversion,exn)))|Errorerror->return(wrapped_error_of_modelerror))|Directf->letconvert_elt=Or_error.ok_exnconvert_eltinPipe_rpc.implement_directrpc(funsqdsw->letwriter=Versioned_direct_stream_writer.create~convert:convert_elt~writer:dswinmatch%mapfs~version(wrapped_model_of_queryq)writerwith|Ok()->Ok()|Errorerror->wrapped_error_of_modelerror);;let()=matchHashtbl.findregistryversionwith|None->Hashtbl.setregistry~key:version~data:({implement},Any.Piperpc)|Some_->Error.raise(multiple_registrations(`Rpcname,`Versionversion));;endmoduleRegister_raw(Version_i:sigincludeVersion_sharedvalresponse_of_model:Model.responsePipe.Reader.t->responsePipe.Reader.tend)=Make_shared(Version_i)(structletconvert_elt=Or_error.error_string"cannot use direct interface with Register_raw";;letconvert_pipe=Version_i.response_of_modelend)moduleRegister(Version_i:sigincludeVersion_sharedvalresponse_of_model:Model.response->responseend)=Make_shared(Version_i)(structletconvert_elt=OkVersion_i.response_of_modelletconvert_pipepipe=Pipe.mappipe~f:Version_i.response_of_modelend)endendmoduleState_rpc=structmoduletypeS=sigtypequerytypestatetypeupdatetypeerrorvalimplement_multi:?log_not_previously_seen_version:(name:string->int->unit)->('connection_state->version:int->query->(state*updatePipe.Reader.t,error)Result.tDeferred.t)->'connection_stateImplementation.tlistvalrpcs:unit->Any.tlistvalversions:unit->Int.Set.tvalname:stringendmoduleMake(Model:sigvalname:stringtypequerytypestatetypeupdatetypeerrorend)=structletname=Model.nametype'simpl='s->version:int->Model.query->(Model.state*Model.updatePipe.Reader.t,Model.error)Result.tDeferred.ttypeimplementer={implement:'s.log_version:(int->unit)->'simpl->'sImplementation.t}letregistry=Int.Table.create~size:1()letimplement_multi?log_not_previously_seen_versionf=letlog_version=matchlog_not_previously_seen_versionwith|None->ignore(* prevent calling [f] more than once per version *)|Somef->Memo.general(f~name)inList.map(Hashtbl.dataregistry)~f:(fun(i,_)->i.implement~log_versionf);;letrpcs()=List.map(Hashtbl.dataregistry)~f:(fun(_,rpc)->rpc)letversions()=Int.Set.of_list(Int.Table.keysregistry)moduletypeVersion_shared=sigtypequery[@@derivingbin_io]typestate[@@derivingbin_io]typeupdate[@@derivingbin_io]typeerror[@@derivingbin_io]valversion:intvalmodel_of_query:query->Model.queryvalstate_of_model:Model.state->statevalerror_of_model:Model.error->errorvalclient_pushes_back:boolendmoduleRegister_raw(Version_i:sigincludeVersion_sharedvalupdate_of_model:Model.state->Model.updatePipe.Reader.t->updatePipe.Reader.tend)=structopenVersion_iletrpc=State_rpc.create~name~version~bin_query~bin_state~bin_update~bin_error?client_pushes_back:(Option.some_ifclient_pushes_back())();;let()=letimplement~log_versionf=State_rpc.implementrpc(funsq->log_versionversion;matchVersion_i.model_of_queryqwith|exceptionexn->Error.raise(failed_conversion(`Response,`Rpcname,`Versionversion,exn))|q->(match%bindfs~versionqwith|Ok(model_state,pipe)->letstate=matchVersion_i.state_of_modelmodel_statewith|state->state|exceptionexn->Error.raise(failed_conversion(`State,`Rpcname,`Versionversion,exn))inMonitor.handle_errors(fun()->return(Ok(state,Version_i.update_of_modelmodel_statepipe)))(funexn->Error.raise(failed_conversion(`Update,`Rpcname,`Versionversion,exn)))|Errorerror->return(matchVersion_i.error_of_modelerrorwith|error->Errorerror|exceptionexn->Error.raise(failed_conversion(`Error,`Rpcname,`Versionversion,exn)))))inmatchHashtbl.findregistryversionwith|None->Hashtbl.setregistry~key:version~data:({implement},Any.Staterpc)|Some_->Error.raise(multiple_registrations(`Rpcname,`Versionversion));;endmoduleRegister(Version_i:sigincludeVersion_sharedvalupdate_of_model:Model.update->updateend)=structincludeRegister_raw(structincludeVersion_iletupdate_of_model_statepipe=Pipe.map~f:update_of_modelpipeend)endendendmoduleOne_way=structmoduletypeS=sigtypemsgvalimplement_multi:?log_not_previously_seen_version:(name:string->int->unit)->('state->version:int->msg->unit)->'stateImplementation.tlistvalrpcs:unit->Any.tlistvalversions:unit->Int.Set.tvalname:stringendmoduleMake(Model:sigvalname:stringtypemsgend)=structletname=Model.nametype'simpl='s->version:int->Model.msg->unittypeimplementer={implement:'s.log_version:(int->unit)->'simpl->'sImplementation.t}letregistry:(int,implementer*Any.t)Hashtbl.t=Int.Table.create~size:1()letimplement_multi?log_not_previously_seen_versionf=letlog_version=matchlog_not_previously_seen_versionwith|None->ignore(* prevent calling [f] more than once per version *)|Somef->Memo.general(f~name)inList.map(Hashtbl.dataregistry)~f:(fun(i,_rpc)->i.implement~log_versionf);;letrpcs()=List.map(Hashtbl.dataregistry)~f:(fun(_,rpc)->rpc)letversions()=Int.Set.of_list(Hashtbl.keysregistry)moduleRegister(Version_i:sigtypemsg[@@derivingbin_io]valversion:intvalmodel_of_msg:msg->Model.msgend)=structopenVersion_iletrpc=One_way.create~name~version~bin_msglet()=letimplement~log_versionf=One_way.implementrpc(funsq->log_versionversion;matchResult.try_with(fun()->Version_i.model_of_msgq)with|Errorexn->Error.raise(failed_conversion(`Msg,`Rpcname,`Versionversion,exn))|Okq->fs~versionq)inmatchHashtbl.findregistryversionwith|None->Hashtbl.setregistry~key:version~data:({implement},Any.One_wayrpc)|Some_->Error.raise(multiple_registrations(`Rpcname,`Versionversion));;endendendendmoduleMenu=struct(***************** some prohibitions for this module ******************
(1) !!! never prune old versions of this rpc !!!
It is too fundamental to the workings of various versioning
schemes and it probably won't change very much anyway.
(2) !!! only ever say "with bin_io" on built-in ocaml types !!!
Examples of built-in types are int, list, string, etc.
This is to protect ourselves against changes to Core data
structures, for example.
*********************************************************************)moduleModel=structletname="__Versioned_rpc.Menu"typequery=unittyperesponse=Description.tlistendincludeCallee_converts.Rpc.Make(Model)letrpc_name=Model.namemoduleV1=structmoduleT=structletversion=1typequery=unit[@@derivingbin_io]typeresponse=(string*int)list[@@derivingbin_io]letmodel_of_queryq=qletresponse_of_model=List.map~f:(fun{Description.name;version}->name,version);;endincludeTincludeRegister(T)endmoduleCurrent_version=V1letaddimpls=letmenu=List.mapimpls~f:Implementation.descriptioninletmenu_impls=implement_multi(fun_~version:_()->returnmenu)inimpls@menu_impls;;typet=Int.Set.tString.Table.t[@@derivingsexp_of]letsupported_rpcst=letopenList.Let_syntaxinlet%bindname,versions=String.Table.to_alisttinlet%mapversion=Int.Set.to_listversionsin{Description.name;version};;letsupported_versionst~rpc_name=Option.value~default:Int.Set.empty(Hashtbl.findtrpc_name);;letof_entriesentries=Hashtbl.map~f:Int.Set.of_list(String.Table.of_alist_multientries);;letrequestconn=let%mapresult=Rpc.dispatchCurrent_version.rpcconn()inResult.mapresult~f:of_entries;;letcreatedescriptions=List.mapdescriptions~f:(fun{Description.name;version}->name,version)|>of_entries;;endmoduleConnection_with_menu=structtypet={connection:Connection.t;menu:Menu.t}[@@derivingfields]letcreateconnection=letopenDeferred.Or_error.Let_syntaxinlet%mapmenu=Menu.requestconnectionin{connection;menu};;letcreate_directlyconnectionmenu={connection;menu}endmoduleCaller_converts=structletmost_recent_common_version~rpc_name~caller_versions~callee_versions~callee_menu=matchSet.max_elt(Set.intercallee_versionscaller_versions)with|Someversion->Okversion|None->error_s[%message"caller and callee share no common versions for rpc"(rpc_name:string)(caller_versions:Int.Set.t)(callee_versions:Int.Set.t)(callee_menu:Menu.t)];;let%expect_test"highest version number is taken in most_recent_common_version"=letrpc_name="the-rpc"inletmenu=Menu.of_entries[rpc_name,2]inletresult=most_recent_common_version~rpc_name~caller_versions:(Int.Set.of_list[1;2;3])~callee_versions:(Int.Set.of_list[2])~callee_menu:menuinprint_s[%sexp(result:intOr_error.t)];[%expect{| (Ok 2) |}];;let%expect_test"error from most_recent_common_version looks reasonable"=letthe_rpc="the-rpc"inletnot_the_rpc="other-rpc"inletmenu=Menu.of_entries[not_the_rpc,1;not_the_rpc,2]inletresult=most_recent_common_version~rpc_name:the_rpc~caller_versions:(Int.Set.of_list[1;2;3])~callee_versions:(Menu.supported_versionsmenu~rpc_name:the_rpc)~callee_menu:menuinprint_s[%sexp(result:intOr_error.t)];[%expect{|
(Error
("caller and callee share no common versions for rpc" (rpc_name the-rpc)
(caller_versions (1 2 3)) (callee_versions ())
(callee_menu ((other-rpc (1 2))))))|}];;moduleDispatch=structmoduleMake(M:Monad)=structopenMletwith_specific_version~version~connection~name~query~dispatcher~registry=matchHashtbl.findregistryversionwith|None->return(Error(unknown_version(name,version)))|Some(dispatch,_rpc)->dispatcherdispatchconnectionquery;;letwith_version_menu{Connection_with_menu.connection;menu}query~name~versions~registry~dispatcher=letcallee_versions=Menu.supported_versionsmenu~rpc_name:nameinletcaller_versions=versions()inmatchmost_recent_common_version~rpc_name:name~caller_versions~callee_versions~callee_menu:menuwith|Errore->return(Errore)|Okversion->with_specific_version~version~connection~name~query~registry~dispatcher;;endmoduleAsync=Make(Deferred)moduleDirect=Make(Monad.Ident)endmoduleRpc=structmoduletypeS=sigtypequerytyperesponsevaldispatch_multi:Connection_with_menu.t->query->responseOr_error.tDeferred.tvalrpcs:unit->Any.tlistvalversions:unit->Int.Set.tvalname:stringendmoduleMake(Model:sigvalname:stringtypequerytyperesponseend)=structletname=Model.nameletregistry=Int.Table.create~size:1()letrpcs()=List.map(Hashtbl.dataregistry)~f:(fun(_,rpc)->rpc)letversions()=Int.Set.of_list(Int.Table.keysregistry)letdispatch_multiconn_with_menuquery=Dispatch.Async.with_version_menuconn_with_menuquery~name~versions~registry~dispatcher:Fn.id;;moduleRegister'(Version_i:sigtypequery[@@derivingbin_io]typeresponse[@@derivingbin_io]valversion:intvalquery_of_model:Model.query->queryvalmodel_of_response:Model.query->response->Model.responseend)=structopenVersion_iletrpc=Rpc.create~name~version~bin_query~bin_responselet()=letdispatchconnmq=matchResult.try_with(fun()->Version_i.query_of_modelmq)with|Errorexn->return(Error(failed_conversion(`Query,`Rpcname,`Versionversion,exn)))|Okq->let%mapresult=Rpc.dispatchrpcconnqinResult.bindresult~f:(funr->matchResult.try_with(fun()->Version_i.model_of_responsemqr)with|Okr->Okr|Errorexn->Error(failed_conversion(`Response,`Rpcname,`Versionversion,exn)))inmatchHashtbl.findregistryversionwith|None->Hashtbl.setregistry~key:version~data:(dispatch,Any.Rpcrpc)|Some_->Error.raise(multiple_registrations(`Rpcname,`Versionversion));;endmoduleRegister(Version_i:sigtypequery[@@derivingbin_io]typeresponse[@@derivingbin_io]valversion:intvalquery_of_model:Model.query->queryvalmodel_of_response:response->Model.responseend)=Register'(structincludeVersion_iletmodel_of_response_r=model_of_responserend)endendmodulePipe_rpc=structmoduletypeS=sigtypequerytyperesponsetypeerrorvaldispatch_multi:Connection_with_menu.t->query->(responseOr_error.tPipe.Reader.t*Pipe_rpc.Metadata.t,error)Result.tOr_error.tDeferred.tvaldispatch_iter_multi:Connection_with_menu.t->query->f:(responsePipe_rpc.Pipe_message.t->Pipe_rpc.Pipe_response.t)->(Pipe_rpc.Id.t,error)Result.tOr_error.tDeferred.tvalabort_multi:Connection_with_menu.t->Pipe_rpc.Id.t->unitOr_error.tvalrpcs:unit->Any.tlistvalversions:unit->Int.Set.tvalname:stringendmoduleMake(Model:sigvalname:stringtypequerytyperesponsetypeerrorend)=structtypedispatcher={abort:Connection.t->Pipe_rpc.Id.t->unit;dispatch:Connection.t->Model.query->(Model.responseOr_error.tPipe.Reader.t*Pipe_rpc.Metadata.t,Model.error)Result.tOr_error.tDeferred.t;dispatch_iter:Connection.t->Model.query->f:(Model.responsePipe_rpc.Pipe_message.t->Pipe_rpc.Pipe_response.t)->(Pipe_rpc.Id.t,Model.error)Result.tOr_error.tDeferred.t}letname=Model.nameletregistry:(dispatcher*Any.t)Int.Table.t=Int.Table.create~size:1()letrpcs()=List.map(Hashtbl.dataregistry)~f:(fun(_,rpc)->rpc)letversions()=Int.Set.of_list(Int.Table.keysregistry)letdispatch_iter_multiconn_with_menuquery~f=Dispatch.Async.with_version_menuconn_with_menuquery~name~versions~registry~dispatcher:(fun{dispatch_iter;_}connquery->dispatch_iterconnquery~f);;letdispatch_multiconn_with_menuquery=Dispatch.Async.with_version_menuconn_with_menuquery~name~versions~registry~dispatcher:(fun{dispatch;_}connquery->dispatchconnquery);;letabort_multiconn_with_menuid=Dispatch.Direct.with_version_menuconn_with_menuid~name~versions~registry~dispatcher:(fun{abort;_}connid->abortconnid;Ok());;moduletypeVersion_shared=sigtypequery[@@derivingbin_io]typeresponse[@@derivingbin_io]typeerror[@@derivingbin_io]valversion:intvalquery_of_model:Model.query->queryvalmodel_of_error:error->Model.errorvalclient_pushes_back:boolendmoduleMake_shared(Version_i:Version_shared)(Convert:sigvalconvert_elt:(Version_i.response->Model.response)Or_error.tvalconvert_pipe:Version_i.responsePipe.Reader.t->Model.responseOr_error.tPipe.Reader.tend)=structopenVersion_iopenConvertletrpc=Pipe_rpc.create~name~version~bin_query~bin_response~bin_error?client_pushes_back:(Option.some_ifclient_pushes_back())();;letwrapped_query_of_modelq=matchVersion_i.query_of_modelqwith|exceptionexn->return(Error(failed_conversion(`Query,`Rpcname,`Versionversion,exn)))|q->return(Okq);;letconvert_resultresult~convert_ok=matchresultwith|Error_ase->e|Ok(Errore)->(matchVersion_i.model_of_errorewith|e'->Ok(Errore')|exceptionexn->Error(failed_conversion(`Error,`Rpcname,`Versionversion,exn)))|Ok(Okok)->Ok(Ok(convert_okok));;letdispatchconnq=wrapped_query_of_modelq>>=?funq->let%mapresult=Pipe_rpc.dispatchrpcconnqinconvert_resultresult~convert_ok:(fun(pipe,id)->convert_pipepipe,id);;letdispatch_iterconnq~f=letconvert_elt=Or_error.ok_exnconvert_eltinwrapped_query_of_modelq>>=?funq->letconvert_message(m:_Pipe_rpc.Pipe_message.t)=matchmwith|Closed_asclosed->closed|Updateu->Update(convert_eltu)inlet%mapresult=Pipe_rpc.dispatch_iterrpcconnq~f:(funmessage->f(convert_messagemessage))inconvert_resultresult~convert_ok:Fn.id;;letabortconnid=Pipe_rpc.abortrpcconnidlet()=matchHashtbl.findregistryversionwith|None->Hashtbl.setregistry~key:version~data:({abort;dispatch;dispatch_iter},Any.Piperpc)|Some_->Error.raise(multiple_registrations(`Rpcname,`Versionversion));;endmoduleRegister_raw(Version_i:sigincludeVersion_sharedvalmodel_of_response:responsePipe.Reader.t->Model.responseOr_error.tPipe.Reader.tend)=Make_shared(Version_i)(structletconvert_elt=Or_error.error_string"Cannot use Direct with Register_raw"letconvert_pipe=Version_i.model_of_responseend)moduleRegister(Version_i:sigincludeVersion_sharedvalmodel_of_response:response->Model.responseend)=Make_shared(Version_i)(structletconvert_elt=OkVersion_i.model_of_responseletconvert_pipers=Pipe.maprs~f:(funr->matchVersion_i.model_of_responserwith|r->Okr|exceptionexn->Error(failed_conversion(`Response,`Rpcname,`VersionVersion_i.version,exn)));;end)endendmoduleState_rpc=structmoduletypeS=sigtypequerytypestatetypeupdatetypeerrorvaldispatch_multi:Connection_with_menu.t->query->(state*updateOr_error.tPipe.Reader.t*State_rpc.Metadata.t,error)Result.tOr_error.tDeferred.tvalrpcs:unit->Any.tlistvalversions:unit->Int.Set.tvalname:stringendmoduleMake(Model:sigvalname:stringtypequerytypestatetypeupdatetypeerrorend)=structletname=Model.nameletregistry=Int.Table.create~size:1()letrpcs()=List.map(Hashtbl.dataregistry)~f:(fun(_,rpc)->rpc)letversions()=Int.Set.of_list(Int.Table.keysregistry)letdispatch_multiconn_with_menuquery=Dispatch.Async.with_version_menuconn_with_menuquery~name~versions~registry~dispatcher:Fn.id;;moduletypeVersion_shared=sigtypequery[@@derivingbin_io]typestate[@@derivingbin_io]typeupdate[@@derivingbin_io]typeerror[@@derivingbin_io]valversion:intvalquery_of_model:Model.query->queryvalmodel_of_state:state->Model.statevalmodel_of_error:error->Model.errorvalclient_pushes_back:boolendmoduleRegister_raw(Version_i:sigincludeVersion_sharedvalmodel_of_update:updatePipe.Reader.t->Model.updateOr_error.tPipe.Reader.tend)=structopenVersion_iletrpc=State_rpc.create~name~version~bin_query~bin_state~bin_update~bin_error?client_pushes_back:(Option.some_ifclient_pushes_back())();;let()=letdispatchconnq=matchVersion_i.query_of_modelqwith|exceptionexn->return(Error(failed_conversion(`Query,`Rpcname,`Versionversion,exn)))|q->let%mapresult=State_rpc.dispatchrpcconnqin(matchresultwith|Errorexn->Errorexn|Ok(Errore)->(matchVersion_i.model_of_errorewith|e'->Ok(Errore')|exceptionexn->Error(failed_conversion(`Error,`Rpcname,`Versionversion,exn)))|Ok(Ok(state,pipe,id))->(matchVersion_i.model_of_statestatewith|exceptionexn->Error(failed_conversion(`State,`Rpcname,`Versionversion,exn))|state->Ok(Ok(state,Version_i.model_of_updatepipe,id))))inmatchHashtbl.findregistryversionwith|None->Hashtbl.setregistry~key:version~data:(dispatch,Any.Staterpc)|Some_->Error.raise(multiple_registrations(`Rpcname,`Versionversion));;endmoduleRegister(Version_i:sigincludeVersion_sharedvalmodel_of_update:update->Model.updateend)=structincludeRegister_raw(structincludeVersion_iletmodel_of_updaters=Pipe.maprs~f:(funr->matchVersion_i.model_of_updaterwith|r->Okr|exceptionexn->Error(failed_conversion(`Update,`Rpcname,`Versionversion,exn)));;end)endendendmoduleOne_way=structmoduletypeS=sigtypemsgvaldispatch_multi:Connection_with_menu.t->msg->unitOr_error.tvalrpcs:unit->Any.tlistvalversions:unit->Int.Set.tvalname:stringendmoduleMake(Model:sigvalname:stringtypemsgend)=structletname=Model.nameletregistry=Int.Table.create~size:1()letrpcs()=List.map(Hashtbl.dataregistry)~f:(fun(_,rpc)->rpc)letversions()=Int.Set.of_list(Int.Table.keysregistry)letdispatch_multiconn_with_menumsg=Dispatch.Direct.with_version_menuconn_with_menumsg~name~versions~registry~dispatcher:Fn.id;;moduleRegister(Version_i:sigtypemsg[@@derivingbin_io]valversion:intvalmsg_of_model:Model.msg->msgend)=structopenVersion_iletrpc=One_way.create~name~version~bin_msglet()=letdispatchconnq=matchResult.try_with(fun()->Version_i.msg_of_modelq)with|Errorexn->Error(failed_conversion(`Msg,`Rpcname,`Versionversion,exn))|Okq->One_way.dispatchrpcconnqinmatchHashtbl.findregistryversionwith|None->Hashtbl.setregistry~key:version~data:(dispatch,Any.One_wayrpc)|Some_->Error.raise(multiple_registrations(`Rpcname,`Versionversion));;endendendendmoduleBoth_convert=structmodulePlain=structmoduletypeS=sigtypecaller_querytypecallee_querytypecaller_responsetypecallee_responsevaldispatch_multi:Connection_with_menu.t->caller_query->caller_responseOr_error.tDeferred.tvalimplement_multi:?log_not_previously_seen_version:(name:string->int->unit)->('state->version:int->callee_query->callee_responseDeferred.t)->'stateImplementation.tlistvalrpcs:unit->Any.tlistvalversions:unit->Int.Set.tvalname:stringendmoduleMake(Model:sigvalname:stringmoduleCaller:sigtypequerytyperesponseendmoduleCallee:sigtypequerytyperesponseendend)=structopenModelletname=namemoduleCaller=Caller_converts.Rpc.Make(structletname=nameincludeCallerend)moduleCallee=Callee_converts.Rpc.Make(structletname=nameincludeCalleeend)let%test_=Int.Set.equal(Caller.versions())(Callee.versions())moduleRegister(Version:sigopenModelvalversion:inttypequery[@@derivingbin_io]typeresponse[@@derivingbin_io]valquery_of_caller_model:Caller.query->queryvalcallee_model_of_query:query->Callee.queryvalresponse_of_callee_model:Callee.response->responsevalcaller_model_of_response:response->Caller.responseend)=structincludeCallee.Register(structincludeVersionletmodel_of_query=callee_model_of_queryletresponse_of_model=response_of_callee_modelend)includeCaller.Register(structincludeVersionletquery_of_model=query_of_caller_modelletmodel_of_response=caller_model_of_responseend)let%test_=Int.Set.equal(Caller.versions())(Callee.versions())endletdispatch_multi=Caller.dispatch_multiletimplement_multi=Callee.implement_multi(* Note: Caller.versions is the same as Callee.versions, so it doesn't matter which
one we call here. Same for [rpcs]. *)letversions()=Caller.versions()letrpcs()=Caller.rpcs()endendmodulePipe_rpc=structmoduletypeS=sigtypecaller_querytypecallee_querytypecaller_responsetypecallee_responsetypecaller_errortypecallee_errorvaldispatch_multi:Connection_with_menu.t->caller_query->(caller_responseOr_error.tPipe.Reader.t*Pipe_rpc.Metadata.t,caller_error)Result.tOr_error.tDeferred.tvaldispatch_iter_multi:Connection_with_menu.t->caller_query->f:(caller_responsePipe_rpc.Pipe_message.t->Pipe_rpc.Pipe_response.t)->(Pipe_rpc.Id.t,caller_error)Result.tOr_error.tDeferred.tvalabort_multi:Connection_with_menu.t->Pipe_rpc.Id.t->unitOr_error.tvalimplement_multi:?log_not_previously_seen_version:(name:string->int->unit)->('state->version:int->callee_query->(callee_responsePipe.Reader.t,callee_error)Result.tDeferred.t)->'stateImplementation.tlistvalimplement_direct_multi:?log_not_previously_seen_version:(name:string->int->unit)->('state->version:int->callee_query->callee_responseVersioned_direct_stream_writer.t->(unit,callee_error)Result.tDeferred.t)->'stateImplementation.tlistvalrpcs:unit->Any.tlistvalversions:unit->Int.Set.tvalname:stringendmoduleMake(Model:sigvalname:stringmoduleCaller:sigtypequerytyperesponsetypeerrorendmoduleCallee:sigtypequerytyperesponsetypeerrorendend)=structopenModelletname=namemoduleCaller=Caller_converts.Pipe_rpc.Make(structletname=nameincludeCallerend)moduleCallee=Callee_converts.Pipe_rpc.Make(structletname=nameincludeCalleeend)let%test_=Int.Set.equal(Caller.versions())(Callee.versions())moduletypeVersion_shared=sigvalversion:inttypequery[@@derivingbin_io]typeresponse[@@derivingbin_io]typeerror[@@derivingbin_io]valquery_of_caller_model:Model.Caller.query->queryvalcallee_model_of_query:query->Model.Callee.queryvalerror_of_callee_model:Model.Callee.error->errorvalcaller_model_of_error:error->Model.Caller.errorvalclient_pushes_back:boolendmoduleRegister_raw(Version_i:sigincludeVersion_sharedvalresponse_of_callee_model:Model.Callee.responsePipe.Reader.t->responsePipe.Reader.tvalcaller_model_of_response:responsePipe.Reader.t->Model.Caller.responseOr_error.tPipe.Reader.tend)=structincludeCallee.Register_raw(structincludeVersion_iletmodel_of_query=callee_model_of_queryletresponse_of_model=response_of_callee_modelleterror_of_model=error_of_callee_modelend)includeCaller.Register_raw(structincludeVersion_iletquery_of_model=query_of_caller_modelletmodel_of_response=caller_model_of_responseletmodel_of_error=caller_model_of_errorend)endmoduleRegister(Version_i:sigincludeVersion_sharedvalresponse_of_callee_model:Model.Callee.response->responsevalcaller_model_of_response:response->Model.Caller.responseend)=structincludeCallee.Register(structincludeVersion_iletmodel_of_query=callee_model_of_queryletresponse_of_model=response_of_callee_modelleterror_of_model=error_of_callee_modelend)includeCaller.Register(structincludeVersion_iletquery_of_model=query_of_caller_modelletmodel_of_response=caller_model_of_responseletmodel_of_error=caller_model_of_errorend)endletdispatch_multi=Caller.dispatch_multiletdispatch_iter_multi=Caller.dispatch_iter_multiletabort_multi=Caller.abort_multiletimplement_multi=Callee.implement_multiletimplement_direct_multi=Callee.implement_direct_multiletversions()=Caller.versions()letrpcs()=Caller.rpcs()endendmoduleState_rpc=structmoduletypeS=sigtypecaller_querytypecallee_querytypecaller_statetypecallee_statetypecaller_updatetypecallee_updatetypecaller_errortypecallee_errorvaldispatch_multi:Connection_with_menu.t->caller_query->(caller_state*caller_updateOr_error.tPipe.Reader.t*State_rpc.Metadata.t,caller_error)Result.tOr_error.tDeferred.tvalimplement_multi:?log_not_previously_seen_version:(name:string->int->unit)->('state->version:int->callee_query->(callee_state*callee_updatePipe.Reader.t,callee_error)Result.tDeferred.t)->'stateImplementation.tlistvalrpcs:unit->Any.tlistvalversions:unit->Int.Set.tvalname:stringendmoduleMake(Model:sigvalname:stringmoduleCaller:sigtypequerytypestatetypeupdatetypeerrorendmoduleCallee:sigtypequerytypestatetypeupdatetypeerrorendend)=structopenModelletname=namemoduleCaller=Caller_converts.State_rpc.Make(structletname=nameincludeCallerend)moduleCallee=Callee_converts.State_rpc.Make(structletname=nameincludeCalleeend)let%test_=Int.Set.equal(Caller.versions())(Callee.versions())moduletypeVersion_shared=sigvalversion:inttypequery[@@derivingbin_io]typestate[@@derivingbin_io]typeupdate[@@derivingbin_io]typeerror[@@derivingbin_io]valquery_of_caller_model:Model.Caller.query->queryvalcallee_model_of_query:query->Model.Callee.queryvalcaller_model_of_state:state->Model.Caller.statevalstate_of_callee_model:Model.Callee.state->statevalcaller_model_of_error:error->Model.Caller.errorvalerror_of_callee_model:Model.Callee.error->errorvalclient_pushes_back:boolendmoduleRegister_raw(Version_i:sigincludeVersion_sharedvalcaller_model_of_update:updatePipe.Reader.t->Model.Caller.updateOr_error.tPipe.Reader.tvalupdate_of_callee_model:Model.Callee.state->Model.Callee.updatePipe.Reader.t->updatePipe.Reader.tend)=structincludeCallee.Register_raw(structincludeVersion_iletmodel_of_query=callee_model_of_queryletstate_of_model=state_of_callee_modelletupdate_of_model=update_of_callee_modelleterror_of_model=error_of_callee_modelend)includeCaller.Register_raw(structincludeVersion_iletquery_of_model=query_of_caller_modelletmodel_of_state=caller_model_of_stateletmodel_of_update=caller_model_of_updateletmodel_of_error=caller_model_of_errorend)endmoduleRegister(Version_i:sigincludeVersion_sharedvalupdate_of_callee_model:Model.Callee.update->updatevalcaller_model_of_update:update->Model.Caller.updateend)=structincludeCallee.Register(structincludeVersion_iletmodel_of_query=callee_model_of_queryletstate_of_model=state_of_callee_modelletupdate_of_model=update_of_callee_modelleterror_of_model=error_of_callee_modelend)includeCaller.Register(structincludeVersion_iletquery_of_model=query_of_caller_modelletmodel_of_state=caller_model_of_stateletmodel_of_update=caller_model_of_updateletmodel_of_error=caller_model_of_errorend)endletdispatch_multi=Caller.dispatch_multiletimplement_multi=Callee.implement_multiletversions()=Caller.versions()letrpcs()=Caller.rpcs()endendmoduleOne_way=structmoduletypeS=sigtypecaller_msgtypecallee_msgvaldispatch_multi:Connection_with_menu.t->caller_msg->unitOr_error.tvalimplement_multi:?log_not_previously_seen_version:(name:string->int->unit)->('state->version:int->callee_msg->unit)->'stateImplementation.tlistvalrpcs:unit->Any.tlistvalversions:unit->Int.Set.tvalname:stringendmoduleMake(Model:sigvalname:stringmoduleCaller:sigtypemsgendmoduleCallee:sigtypemsgendend)=structopenModelletname=namemoduleCaller=Caller_converts.One_way.Make(structletname=nameincludeCallerend)moduleCallee=Callee_converts.One_way.Make(structletname=nameincludeCalleeend)let%test_=Int.Set.equal(Caller.versions())(Callee.versions())moduleRegister(Version:sigopenModelvalversion:inttypemsg[@@derivingbin_io]valmsg_of_caller_model:Caller.msg->msgvalcallee_model_of_msg:msg->Callee.msgend)=structincludeCallee.Register(structincludeVersionletmodel_of_msg=callee_model_of_msgend)includeCaller.Register(structincludeVersionletmsg_of_model=msg_of_caller_modelend)let%test_=Int.Set.equal(Caller.versions())(Callee.versions())endletdispatch_multi=Caller.dispatch_multiletimplement_multi=Callee.implement_multi(* Note: Caller.versions is the same as Callee.versions, so it doesn't matter which
one we call here. Same for [rpcs]. *)letversions()=Caller.versions()letrpcs()=Caller.rpcs()endendend