open Import
open Types

(* Errors produced when version negotiation cannot resolve a method: a
   human-readable message plus an optional s-expression payload. *)
module Version_error = struct
  type t =
    { payload : Csexp.t option
    ; message : string
    }

  let payload t = t.payload
  let message t = t.message

  let to_dyn { payload; message } =
    Dyn.record
      [ "message", Dyn.string message; "payload", Dyn.(option Sexp.to_dyn) payload ]
  ;;

  let create ?payload ~message () = { payload; message }

  exception E of t

  (* Register a printer so that an uncaught [E] is rendered as
     "Version_error: <message>" followed by the payload, when there is one. *)
  let () =
    Printexc.register_printer (function
      | E { payload; message } ->
        Some
          (let messages =
             match payload with
             | None -> []
             | Some payload -> [ Sexp.pp payload ]
           in
           Format.asprintf "%a@." Pp.to_fmt
           @@ Pp.concat
           @@ (Pp.textf "Version_error: %s" message :: messages))
      | _ -> None)
  ;;

  (* Convert to a response error of kind [Invalid_request], carrying over the
     message and the payload. *)
  let to_response_error { payload; message } =
    Response.Error.create ~kind:Invalid_request ?payload ~message ()
  ;;
end

(* Encoders/decoders obtained once a method has been resolved against a
   version menu (see [prepare_request] and [prepare_notification] below). *)
module Staged = struct
  type ('req, 'resp) request =
    { encode_req : 'req -> Call.t
    ; decode_resp : Csexp.t -> ('resp, Response.Error.t) result
    }

  type 'payload notification = { encode : 'payload -> Call.t }
end

(* Raise a [Code_error] reporting that the negotiated version [selected] of
   [method_] is not among the [known] versions. [verb] describes what is
   missing for that version (the callers below pass "unimplemented" or
   "undeclared"). *)
let raise_version_bug ~method_ ~selected ~verb ~known =
  Code_error.raise
    "bug with version negotiation; selected bad method version"
    [ "message", Dyn.String ("version is " ^ verb)
    ; "method", Dyn.String method_
    ; "implemented versions", Dyn.list Dyn.int known
    ; "selected version", Dyn.Int selected
    ]
;;

(* Pack a universal map key. See below. We can afford to erase the type of
   the key, because we only care about the keyset of the stored generation
   listing. *)
type packed = T : 'a Method.Version.Map.t Univ_map.Key.t -> packed

module type S = sig
  type 'a fiber

  module Handler : sig
    type 'state t

    val handle_request : 'state t -> 'state -> Request.t -> Response.t fiber

    val handle_notification
      :  'state t
      -> 'state
      -> Call.t
      -> (unit, Response.Error.t) result fiber

    val prepare_request
      :  'a t
      -> ('req, 'resp) Decl.Request.witness
      -> (('req, 'resp) Staged.request, Version_error.t) result

    val prepare_notification
      :  'a t
      -> 'payload Decl.Notification.witness
      -> ('payload Staged.notification, Version_error.t) result
  end

  module Builder : sig
    type 'state t

    val to_handler
      :  'state t
      -> session_version:('state -> Version.t)
      -> menu:Menu.t
      -> 'state Handler.t

    val create : unit -> 'state t
    val registered_procedures : 'a t -> (Method.Name.t * Method.Version.t list) list
    val declare_notification : 'state t -> 'payload Decl.notification -> unit
    val declare_request : 'state t -> ('req, 'resp) Decl.request -> unit

    val implement_notification
      :  'state t
      -> 'payload Decl.notification
      -> ('state -> 'payload -> unit fiber)
      -> unit

    val implement_request
      :  'state t
      -> ('req, 'resp) Decl.request
      -> ('state -> 'req -> 'resp fiber)
      -> unit
  end
end

module Make (Fiber : Fiber_intf.S) = struct
  (* A handler bundles a version menu with the four dispatch/preparation
     functions; each accessor below simply applies the stored function to the
     stored menu. *)
  module Handler = struct
    type 'state t =
      { menu : Menu.t
      ; handle_request : Menu.t -> 'state -> Types.Request.t -> Response.t Fiber.t
      ; handle_notification :
          Menu.t -> 'state -> Call.t -> (unit, Response.Error.t) result Fiber.t
      ; prepare_request :
          'req 'resp.
          Menu.t
          -> ('req, 'resp) Decl.Request.witness
          -> (('req, 'resp) Staged.request, Version_error.t) result
      ; prepare_notification :
          'a.
          Menu.t
          -> 'a Decl.Notification.witness
          -> ('a Staged.notification, Version_error.t) result
      }

    let handle_request t = t.handle_request t.menu
    let handle_notification t = t.handle_notification t.menu
    let prepare_request t = t.prepare_request t.menu
    let prepare_notification t = t.prepare_notification t.menu
  end

  (* TODO: This module involves some convoluted and difficult-to-understand
     types, with multiple levels of GADTs and type packing, in the
     (possibly-misguided) twin aims of ensuring type safety and maximizing reuse
     of the actual generation management code. *)
  module Builder = struct
    open Decl

    (* A [('req, 'resp) Decl.Generation.t] contains the information necessary to
       convert from a [Csexp.t] to a ['req]. The [_handler] packings are to
       enable storing the callbacks in a homogeneous data structure (namely, the
       [Method.Name.Table.t]). It's alright to erase these types, because these
       callbacks are intended to be used by the receiving endpoint, which only
       sees a [Csexp.t], and we only discover the correct type to deserialize to
       at runtime. *)
    type 's r_handler =
      | R :
          ('s -> 'req -> 'resp Fiber.t) * ('req, 'resp) Decl.Generation.t
          -> 's r_handler

    type 's n_handler =
      | N :
          ('s -> 'payload -> unit Fiber.t) * ('payload, unit) Decl.Generation.t
          -> 's n_handler

    (* The declarations and implementations serve dual purposes with dual
       requirements.

       When storing implementations, we erase the type of the callback, because
       we cannot know what type to deserialize to until runtime, and so all that
       matters is whether some handler with the correct type exists.

       On the other hand, declarations must keep some type information in an
       externally-retrievable way. This is because when invoking an RPC of type
       [('req, 'resp)], we are *given* a value of type ['req], so the object
       being stored in the map cannot have its type erased. Instead, we use a
       [Univ_map] (with the key being stored in the [Decl.t]) so we can retrieve
       a correctly-typed [Generation.t] mapping later.

       However, unlike a string table, the use of a [Univ_map.t] means that we
       cannot examine the map alone to get a list of all declared procedures and
       versions. This is bad, because we need that information to perform
       version negotiation for the session. To resolve this, we also keep a
       mapping of all known keys and their associated method names, which we use
       to construct the initial version menu, then discard. *)
    type 'state t =
      { mutable declared_requests : packed list Method.Name.Map.t * Univ_map.t
      ; mutable declared_notifications : packed list Method.Name.Map.t * Univ_map.t
      ; implemented_requests :
          'state r_handler Method.Version.Map.t Method.Name.Table.t
      ; implemented_notifications :
          'state n_handler Method.Version.Map.t Method.Name.Table.t
      }

    (* A [('state, 'key, 'output) field_witness] is a first-class representation
       of a field of a ['state t]. Each field is morally a mutable table holding
       ['output Method.Version.Map.t]s (mapping generation numbers to
       ['output]s), indexed by ['key]s.

       The mental model isn't strictly correct (mostly due to needing the "all
       known registered keys" hack described above), but is accurate enough that
       the types of [get] and [set] below should become readable.

       By doing things this way, we can abstract away the logic of

       - Checking the corresponding registry (the declarations table when
         implementing, and vice versa) for duplicate entries
       - Checking the provided generation listings for overlap

       and

       - Looking up a method name and generation number

       from the type-erasure implementation shenanigans described above, letting
       all related operations (declaring, implementing, dispatching) share
       uniform implementations as much as possible. *)
    type (_, _, _) field_witness =
      | Declared_requests :
          ( _
            , Method.Name.t
              * ('req, 'resp) Decl.Generation.t Method.Version.Map.t Univ_map.Key.t
            , ('req, 'resp) Decl.Generation.t )
            field_witness
      | Declared_notifs :
          ( _
            , Method.Name.t
              * ('a, unit) Decl.Generation.t Method.Version.Map.t Univ_map.Key.t
            , ('a, unit) Decl.Generation.t )
            field_witness
      | Impl_requests : ('state, string, 'state r_handler) field_witness
      | Impl_notifs : ('state, string, 'state n_handler) field_witness

    (* Look up the generation map stored under [key] in the field selected by
       [witness]. For declarations only the [Univ_map] key half of the pair is
       used; the method name half is ignored. *)
    let get (type st a b) (t : st t) (witness : (st, a, b) field_witness) (key : a)
      : b Method.Version.Map.t option
      =
      match witness with
      | Declared_requests ->
        let _, key = key in
        let _, table = t.declared_requests in
        Univ_map.find table key
      | Declared_notifs ->
        let _, key = key in
        let _, table = t.declared_notifications in
        Univ_map.find table key
      | Impl_requests -> Method.Name.Table.find t.implemented_requests key
      | Impl_notifs -> Method.Name.Table.find t.implemented_notifications key
    ;;

    (* Store [value] under [key] in the field selected by [witness]. For
       declarations this also records the packed key under the method name, so
       that [registered_procedures] can enumerate the declared methods. *)
    let set
      (type st a b)
      (t : st t)
      (witness : (st, a, b) field_witness)
      (key : a)
      (value : b Method.Version.Map.t)
      =
      match witness with
      | Declared_requests ->
        let name, key = key in
        let known_keys, table = t.declared_requests in
        t.declared_requests
          <- ( Method.Name.Map.add_multi known_keys name (T key)
             , Univ_map.set table key value )
      | Declared_notifs ->
        let name, key = key in
        let known_keys, table = t.declared_notifications in
        t.declared_notifications
          <- ( Method.Name.Map.add_multi known_keys name (T key)
             , Univ_map.set table key value )
      | Impl_requests -> Method.Name.Table.set t.implemented_requests key value
      | Impl_notifs -> Method.Name.Table.set t.implemented_notifications key value
    ;;

    (* List every method name known to the builder together with its
       registered versions: declared requests, declared notifications,
       implemented requests and implemented notifications, in that order.
       Raises a [Code_error] if a recorded key has no entry in the matching
       declaration table. *)
    let registered_procedures
      { declared_requests = declared_request_keys, declared_request_table
      ; declared_notifications = declared_notification_keys, declared_notification_table
      ; implemented_requests
      ; implemented_notifications
      }
      =
      let batch_declarations which declared_keys declaration_table =
        Method.Name.Map.foldi declared_keys ~init:[] ~f:(fun name keys acc ->
          let generations =
            List.fold_left keys ~init:[] ~f:(fun acc (T key) ->
              match Univ_map.find declaration_table key with
              | Some listing -> Method.Version.Map.keys listing @ acc
              | None ->
                Code_error.raise
                  "versioning: method found in versioning table without actually being \
                   declared"
                  [ "method_", Dyn.String name
                  ; "table", Dyn.String ("known_" ^ which ^ "_table")
                  ])
          in
          (name, generations) :: acc)
      in
      let declared_requests =
        batch_declarations "request" declared_request_keys declared_request_table
      in
      let declared_notifications =
        batch_declarations
          "notification"
          declared_notification_keys
          declared_notification_table
      in
      let batch_implementations table =
        Method.Name.Table.foldi table ~init:[] ~f:(fun name listing acc ->
          (name, Method.Version.Map.keys listing) :: acc)
      in
      let implemented_requests = batch_implementations implemented_requests in
      let implemented_notifications = batch_implementations implemented_notifications in
      List.concat
        [ declared_requests
        ; declared_notifications
        ; implemented_requests
        ; implemented_notifications
        ]
    ;;

    (* A fresh builder with nothing declared or implemented. *)
    let create () =
      let declared_requests = Method.Name.Map.empty, Univ_map.empty in
      let declared_notifications = Method.Name.Map.empty, Univ_map.empty in
      let implemented_requests = Method.Name.Table.create 16 in
      let implemented_notifications = Method.Name.Table.create 16 in
      { declared_requests
      ; declared_notifications
      ; implemented_requests
      ; implemented_notifications
      }
    ;;

    (* Shared registration logic for declaring and implementing.

       - [registry]/[registry_key] select where the generations are stored.
       - [other]/[other_key] select the opposite registry; finding an entry
         there raises a [Code_error], since a method may not be both declared
         and implemented.
       - [pack] wraps each generation into the value stored in [registry].

       The new generations are merged into those already registered. If any
       version number is already present, nothing is stored and a [Code_error]
       listing the duplicated versions is raised. *)
    let register_generic
      t
      ~method_
      ~generations
      ~registry
      ~registry_key
      ~other
      ~other_key
      ~pack
      =
      let () =
        get t other other_key
        |> Option.iter ~f:(fun _ ->
          Code_error.raise
            "attempted to implement and declare method"
            [ "method", Dyn.String method_ ])
      in
      let prior_registered_generations =
        get t registry registry_key |> Option.value ~default:Method.Version.Map.empty
      in
      let all_generations, duplicate_generations =
        List.fold_left
          generations
          ~init:(prior_registered_generations, Method.Version.Set.empty)
          ~f:(fun (acc, dups) (n, gen) ->
            match Method.Version.Map.add acc n (pack gen) with
            | Error _ -> acc, Method.Version.Set.add dups n
            | Ok acc' -> acc', dups)
      in
      if Method.Version.Set.is_empty duplicate_generations
      then set t registry registry_key all_generations
      else
        Code_error.raise
          "attempted to register duplicate generations for RPC method"
          [ "method", Dyn.String method_
          ; "duplicated", Method.Version.Set.to_dyn duplicate_generations
          ]
    ;;

    let declare_request t proc =
      register_generic
        t
        ~method_:proc.Request.decl.method_
        ~generations:proc.Request.generations
        ~registry:Declared_requests
        ~other:Impl_requests
        ~registry_key:(proc.Request.decl.method_, proc.decl.key)
        ~other_key:proc.Request.decl.method_
        ~pack:Fun.id
    ;;

    let declare_notification t (proc : _ notification) =
      register_generic
        t
        ~method_:proc.decl.method_
        ~generations:proc.generations
        ~registry:Declared_notifs
        ~other:Impl_notifs
        ~registry_key:(proc.decl.method_, proc.decl.key)
        ~other_key:proc.decl.method_
        ~pack:Fun.id
    ;;

    let implement_request t (proc : _ request) f =
      register_generic
        t
        ~method_:proc.decl.method_
        ~generations:proc.generations
        ~registry:Impl_requests
        ~other:Declared_requests
        ~registry_key:proc.decl.method_
        ~other_key:(proc.decl.method_, proc.decl.key)
        ~pack:(fun r -> R (f, r))
    ;;

    let implement_notification t (proc : _ notification) f =
      register_generic
        t
        ~method_:proc.decl.method_
        ~generations:proc.generations
        ~registry:Impl_notifs
        ~other:Declared_notifs
        ~registry_key:proc.decl.method_
        ~other_key:(proc.decl.method_, proc.decl.key)
        ~pack:(fun n -> N (f, n))
    ;;

    (* Resolve [method_] both in the registry selected by [table]/[key] and in
       [menu]. On success the continuation [s] receives the generation map and
       the version found in the menu. Otherwise [k] receives a
       [Version_error.t]: "invalid method" when the registry has no entry (this
       case takes precedence), or a no-common-version error when only the menu
       lookup fails. *)
    let lookup_method_generic t ~menu ~table ~key ~method_ k s =
      match get t table key, Menu.find menu method_ with
      | Some subtable, Some version -> s (subtable, version)
      | None, _ ->
        let payload = Sexp.record [ "method", Atom method_ ] in
        k (Version_error.create ~message:"invalid method" ~payload ())
      | _, None ->
        let payload = Sexp.record [ "method", Atom method_ ] in
        k
          (Version_error.create
             ~message:"remote and local have no common version for method"
             ~payload
             ())
    ;;

    (* Turn the builder into a [Handler.t] once a [menu] is supplied.
       [session_version] extracts from the state the version passed to
       [Conv.of_sexp] when decoding incoming parameters. *)
    let to_handler t ~session_version =
      let open Fiber.O in
      (* Dispatch an incoming request: decode the parameters with the selected
         generation, upgrade them, run the implementation, then downgrade and
         encode the response. *)
      let handle_request menu state (_id, (n : Call.t)) =
        lookup_method_generic
          t
          ~menu
          ~table:Impl_requests
          ~key:n.method_
          ~method_:n.method_
          (fun e -> Fiber.return (Error (Version_error.to_response_error e)))
          (fun (handlers, version) ->
            match Method.Version.Map.find handlers version with
            | None ->
              raise_version_bug
                ~method_:n.method_
                ~selected:version
                ~verb:"unimplemented"
                ~known:(Method.Version.Map.keys handlers)
            | Some (R (f, T gen)) ->
              (match Conv.of_sexp gen.req ~version:(session_version state) n.params with
               | Error e -> Fiber.return (Error (Response.Error.of_conv e))
               | Ok req ->
                 let+ resp = f state (gen.upgrade_req req) in
                 Ok (Conv.to_sexp gen.resp (gen.downgrade_resp resp))))
      in
      (* Dispatch an incoming notification: same as above, but the
         implementation returns unit and nothing is encoded back. *)
      let handle_notification menu state (n : Call.t) =
        lookup_method_generic
          t
          ~menu
          ~table:Impl_notifs
          ~key:n.method_
          ~method_:n.method_
          (fun e -> Fiber.return (Error (Version_error.to_response_error e)))
          (fun (handlers, version) ->
            match Method.Version.Map.find handlers version with
            | None ->
              raise_version_bug
                ~method_:n.method_
                ~selected:version
                ~verb:"unimplemented"
                ~known:(Method.Version.Map.keys handlers)
            | Some (N (f, T gen)) ->
              (match Conv.of_sexp gen.req ~version:(session_version state) n.params with
               | Error e -> Fiber.return (Error (Response.Error.of_conv e))
               | Ok req ->
                 let+ () = f state (gen.upgrade_req req) in
                 Ok ()))
      in
      (* Build the encoder/decoder pair for a declared request at the version
         selected by the menu. *)
      let prepare_request (type a b) menu (decl : (a, b) Decl.Request.witness)
        : ((a, b) Staged.request, Version_error.t) result
        =
        let method_ = decl.method_ in
        lookup_method_generic
          t
          ~menu
          ~table:Declared_requests
          ~key:(method_, decl.key)
          ~method_
          (fun e -> Error e)
          (fun (decls, version) ->
            match Method.Version.Map.find decls version with
            | None ->
              raise_version_bug
                ~method_
                ~selected:version
                ~verb:"undeclared"
                ~known:(Method.Version.Map.keys decls)
            | Some (T gen) ->
              let encode_req (req : a) =
                { Call.method_; params = Conv.to_sexp gen.req (gen.downgrade_req req) }
              in
              let decode_resp sexp =
                (* NOTE(review): the response is decoded with a hard-coded
                   version (3, 0), whereas the handlers above use
                   [session_version state]; no state is available here.
                   Confirm that this is intentional. *)
                match Conv.of_sexp gen.resp ~version:(3, 0) sexp with
                | Ok resp -> Ok (gen.upgrade_resp resp)
                | Error e -> Error (Response.Error.of_conv e)
              in
              Ok { Staged.encode_req; decode_resp })
      in
      (* Build the encoder for a declared notification at the version selected
         by the menu. *)
      let prepare_notification (type a) menu (decl : a Decl.Notification.witness)
        : (a Staged.notification, Version_error.t) result
        =
        let method_ = decl.method_ in
        lookup_method_generic
          t
          ~menu
          ~table:Declared_notifs
          ~key:(method_, decl.key)
          ~method_
          (fun e -> Error e)
          (fun (decls, version) ->
            match Method.Version.Map.find decls version with
            | None ->
              raise_version_bug
                ~method_
                ~selected:version
                ~verb:"undeclared"
                ~known:(Method.Version.Map.keys decls)
            | Some (T gen) ->
              let encode (req : a) =
                { Call.method_; params = Conv.to_sexp gen.req (gen.downgrade_req req) }
              in
              Ok { Staged.encode })
      in
      fun ~menu ->
        { Handler.menu
        ; handle_request
        ; handle_notification
        ; prepare_request
        ; prepare_notification
        }
    ;;
  end
end