1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846moduleLog=Debug.LogmoduleIntMap=Map.Make(structtypet=intletcompare(a:int)b=compareabend)letrecfilter_mapf=function|[]->[]|x::xs->matchfxwith|None->filter_mapfxs|Somey->y::filter_mapfxsletpp_checkcheckf(k,v)=trycheckkvwithex->Fmt.pff"@,[%a] %a"Fmt.(styled`Redstring)"ERROR"Debug.pp_exnexletpp_weakf=function|None->Fmt.pff"(GC'd weak pointer)"|Somex->x#ppflethashtbl_dump~keyppfitems=letaddkvresults=(k,v)::resultsinHashtbl.foldadditems[]|>List.sort(funab->compare(key(snda))(key(sndb)))|>Fmt.Dump.listppfmoduleEmbargoId=Message_types.EmbargoIdmoduleMake(EP:Message_types.ENDPOINT)=structmoduleCore_types=EP.Core_typesmoduleWire=Core_types.WiremoduleCap_proxy=Cap_proxy.Make(Core_types)moduleStruct_proxy=Struct_proxy.Make(Core_types)moduleLocal_struct_promise=Local_struct_promise.Make(Core_types)moduleOut=EP.OutmoduleIn=EP.Inletinc_ref=Core_types.inc_refletdec_ref=Core_types.dec_refletwith_inc_refx=Core_types.inc_refx;xopenEP.TablemodulePathSet=Set.Make(Wire.Path)moduleEmbargoes=Table.Allocating(EmbargoId)(* Maps used cap indexes to their paths *)letcaps_used~msgpaths_used=PathSet.elementspaths_used|>filter_map(funpath->matchWire.Response.cap_indexmsgpathwith|None->None|Somei->Some(i,path))|>IntMap.of_listmoduleQuestions=Table.Allocating(QuestionId)moduleAnswers=Table.Tracking(AnswerId)moduleExports=Table.Allocating(ExportId)moduleImports=Table.Tracking(ImportId)moduleImport=struct(* An entry in the imports table has a corresponding switchable, which the user of the library holds.
There are three possible events:
- The user reduces the switchable's ref-count to zero, indicating that
they don't need the import any longer.
- The peer resolves the import to something else (or the connection is
lost, resolving it to an exception).
- The peer quotes the same import ID again.
If we hadn't sent a release before this, it must be for the same object.
If we had sent a release then it might or might not be the same object.
We hold a weak-ref to the switchable so that if the user leaks it we will notice.
[ref_count] is zero iff [count] is zero:
- Initially, both are one.
- If we set [ref_count] to zero, we send a release and set [count] to zero.
- There is no other way for [count] to become zero.
*)typet={id:ImportId.t;mutableref_count:RC.t;(* The switchable holds one until resolved, plus each [resolve_target] adds one. *)mutablecount:int;(* Number of times remote sent us this. *)mutableused:bool;(* We have sent a message to this target (embargo needed on resolve). *)settled:bool;(* This was a SenderHosted - it can't resolve except to an exception. *)mutableresolution:disembargo_info;proxy:Cap_proxy.resolver_capWeak_ptr.t;(* Our switchable ([Weak_ptr.t] is mutable). *)strong_proxy:<>optionref;(* Keeps the switchable alive if there are callbacks registered. *)}anddisembargo_info=[|`Unresolved|`Local|`Importoft(* Holds ref *)|`Error]letidt=t.idletppft=Fmt.pff"i%a"ImportId.ppt.idletdumpft=Fmt.pff"%a -> %a"RC.ppt.ref_countpp_weak(Weak_ptr.gett.proxy)letget_proxyt=Weak_ptr.gett.proxy(* A new export from the peer. *)letinc_countt=assert(t.count>0);t.count<-t.count+1(* A new local reference (resolve_target). *)letinc_reft=assert(t.count>0);letppf=ppftint.ref_count<-RC.succ~ppt.ref_countletclear_proxyt=Weak_ptr.cleart.proxy(* A [resolve_target] or our switchable no longer needs us. *)letrecdec_reft=letppf=ppftint.ref_count<-RC.pred~ppt.ref_count;ifRC.is_zerot.ref_countthen(assert(t.count>0);letcount=t.countint.used<-false;t.count<-0;letfree_resolution=matcht.resolutionwith|`Importi->dec_refi|`Unresolved|`Local|`Error->[]in`Release(t,count)::free_resolution)else[]letmark_usedt=t.used<-trueletusedt=t.usedletmark_resolvedt~get_importresult=ift.resolution<>`UnresolvedthenFmt.failwith"Got Resolve for already-resolved import %a"pptelsematchresultwith|Error_->t.resolution<-`Error|Okdesc->letinfo=matchdescwith|`None->`Error(* Probably shouldn't happen *)|`Error_->`Error(* Don't need to embargo errors *)|`SenderPromiseid|`SenderHostedid->`Import(get_importid)|`ThirdPartyHosted_->failwith"todo: disembargo_reply: ThirdPartyHosted"|`ReceiverAnswer_|`ReceiverHosted_->`Localint.resolution<-infoletmessage_targett=`ReceiverHostedt.idletembargo_patht=ift.usedthenSome(message_targett)elseNone(* We have a reference pointing at this target and may need to perform an embargo.
This could be either because we pipelined messages to this import, or we pipelined
messages to another remote object which we know forwarded them to this one. *)letdisembargo_targett=matcht.resolutionwith|`Unresolved->None(* No embargoes needed yet *)|`Error->None(* Don't need to embargo errors *)|`Import_asi->Somei|`Local->(* The import resolved to a local service. Send a loopback disembargo to flush the path. *)Some(message_targett)letinit_proxytproxy=assert(get_proxyt=None);Weak_ptr.sett.proxyproxyletcheckt=letppf=ppftinRC.check~ppt.ref_count;(* Count starts at one and is only incremented, or set to zero when ref_count is zero. *)ift.count<1thenDebug.invariant_broken(funf->Fmt.pff"Import remote count < 1, but still in table: %a"dumpt);matchget_proxytwith|Somex->x#check_invariants|None->()letlost_connectiont~broken_cap=matchget_proxytwith|Someswitchablewhenswitchable#problem=None->switchable#resolvebroken_cap|_->()letv~mark_dirty~settledid={ref_count=RC.one;count=1;id=id;proxy=Weak_ptr.empty();strong_proxy=refNone;settled;resolution=`Unresolved;used=mark_dirty;}endmoduleQuestion=struct(* State model for questions:
1. waiting: returned=false, rc>0 (initial state)
We're waiting for some results. (Initially rc=1, from our remote promise)
On rc=0: send Finish, goto cancelled
On return: goto lingering
2. cancelled: returned=false, rc=0
We told the peer we don't care about the results.
It may return them anyway or return "cancelled".
We will never send another message referring to this question.
On return: goto complete (with results=cancelled)
3. lingering: returned=true, rc>0
We've got the results and want to finish, but we can't yet because rc>0.
On rc=0: send Finish, goto complete
4: complete: returned=true, rc=0 (final state)
Finish has been sent and we are no longer in the questions table.
We will never send or receive another message referring to this question.
The user releases the remote promise to indicate that they no longer care
about the result. The remote promise will then decrement rc.
The "return" event means the peer sent us a Return message.
rc>0 means that we may still want to send messages about this question.
This could be either messages sent via the remote promise, or disembargo
replies.
The disembargo case happens when e.g. we told the peer that a capability
in the answer to a question they asked us will come from this question
that we asked them.
Initially, our [answer] for the peer's question holds a reference to us
via our remote promise so that it can forward any calls. This will
prevent us from reaching rc=0. Once we get the results, the answer will
update to point directly at the new target and the remote promise may be
released then.
However, the peer may still need to send a disembargo request to the
answer, which must be sent back to its *original* target (this
questions's answer). For this case, we must keep our question alive a
little longer, until the peer finishes their question.
We expect this to happen soon because there's no point in the peer
holding on to their question now that they've got the answer.
Every path contains exactly one Finish, exactly one Return, and exactly
one release (rc=0) event.
*)typedisembargo_info=[|`Error|`Elsewhere|`Local|`ResultsofWire.Response.t*[|`Local|`ImportofImport.t(* Holds ref *)|`None]RO_array.t]typestate=|Waiting|Cancelled|Lingeringofdisembargo_info|Completetypet={id:QuestionId.t;mutableremote_promise:[`ResolverofCore_types.struct_resolver|`Released];mutableref_count:RC.t;mutablestate:state;params_for_release:ExportId.tlist;(* For level 0 peers *)mutablepipelined_fields:PathSet.t;(* Fields used while unresolved; will need embargoes *)}letidt=t.idletppfq=Fmt.pff"q%a"QuestionId.ppq.idletpp_disembargo_descf=function|`Local->Fmt.stringf"local"|`Importi->Import.ppfi|`None->Fmt.stringf"none"letpp_disembargo_infof:disembargo_info->unit=function|`Error->Fmt.stringf"error"|`Elsewhere->Fmt.stringf"elsewhere"|`Local->Fmt.stringf"local"|`Results(_,descs)->(RO_array.pppp_disembargo_desc)fdescsletfree_disembargo_info=function|`Error->[]|`Elsewhere->[]|`Local->[]|`Results(_,descs)->RO_array.fold_left(funacc->function|`None|`Local->acc|`Importi->`Release_importi::acc)[]descsletpp_promisefq=matchq.remote_promisewith|`Released->Fmt.stringf"(released)"|`Resolverp->Fmt.pff"%t"p#ppletpp_statef=function|Waiting->Fmt.stringf"waiting"|Cancelled->Fmt.stringf"cancelled"|Lingeringi->Fmt.pff"lingering:%a"pp_disembargo_infoi|Complete->Fmt.stringf"complete"letdumpft=Fmt.pff"(%a) %a"pp_statet.statepp_promisetletsent_finisht=matcht.statewith|Waiting|Lingering_->false|Cancelled|Complete->trueletinc_reft=assert(not(sent_finisht));letppf=ppftint.ref_count<-RC.succt.ref_count~ppletdec_reft=letppf=ppftint.ref_count<-RC.predt.ref_count~pp;ifRC.is_zerot.ref_countthenmatcht.statewith|Waiting->t.state<-Cancelled;[`Send_finish]|Lingeringret->t.state<-Complete;`Send_finish::`Release_table_entry::free_disembargo_inforet|Complete->[](* (only happens with lost_connection) *)|Cancelled->failwith"Can't hold refs while cancelled!"else[]letrelease_proxyt=t.remote_promise<-`Released;dec_reftletset_cap_usedtpath=t.pipelined_fields<-PathSet.addpatht.pipelined_fieldsletcap_usedtpath=PathSet.mempatht.pipelined_fieldsletmessage_targettpath=`ReceiverAnswer(t.id,path)(* Extract some useful parts of the response that we may need for embargoes. *)letextract_resolution~get_import=function|#Error.t->`Error|`ResultsSentElsewhere->`Elsewhere(* We don't care about the result, so should never embargo *)|`TakeFromOtherQuestion_->`Local(* Remote will forward back to our answer *)|`AcceptFromThirdParty->failwith"todo: answer_cap_needs_disembargo: AcceptFromThirdParty"|`Results(msg,descs)->letextract_cap=function|`ReceiverAnswer_|`ReceiverHosted_->`Local(* Remote will forward back to us *)|`SenderPromiseid|`SenderHostedid->`Import(get_importid)(* Disembargo via import *)|`ThirdPartyHosted_->failwith"todo: answer_cap_needs_disembargo: ThirdPartyHosted"|`None->`Nonein`Results(msg,RO_array.mapextract_capdescs)(* Something resolved to a promised answer, and we sent a disembargo (because the promise was local).
When the disembargo response arrived, the local promise had resolved to this question.
Do we need another embargo? Return the path down which to send the disembargo request, if so. *)letanswer_cap_disembargotpath=ifcap_usedtpaththen(matcht.statewith|Waiting|Cancelled->None(* Can't disembargo if not returned *)|Complete->failwith"Already finished!"(* Previous disembargo should have kept us alive *)|Lingeringret->matchretwith|`Error->None|`Results(msg,descs)->(* If the answer was local, we need to embargo. *)beginmatchCore_types.Wire.Response.cap_indexmsgpathwith|None->None|Somei->matchRO_array.get~oob:`Nonedescsiwith|`Local->Some(message_targettpath)|`Importi->Some(`Importi)|`None->Noneend;|`Elsewhere->None(* We don't care about the result (except for forwarding) *)|`Local->Some(message_targettpath))elseNone(* Not used, so no embargo needed *)(* A [Return] message has arrived. *)letreturntret=matcht.statewith|Waiting->t.state<-Lingeringret;[]|Cancelled->t.state<-Complete;`Release_table_entry::free_disembargo_inforet|Lingering_|Complete->failwith"Already returned!"letpaths_usedt=t.pipelined_fieldsletcheckt=letppf=ppftinmatcht.statewith|Waiting|Lingering_->RC.check~ppt.ref_count(* rc > 0 *)|Cancelled->assert(RC.is_zerot.ref_count)|Complete->Debug.invariant_broken(funf->Fmt.pff"Complete question %a still in table!"dumpt)letlost_connectiont~ex=(* The resolve might get a "release" event, but we'll ignore it in the Complete state.
We don't care about the final state, as the whole table is being dropped anyway. *)t.state<-Complete;matcht.remote_promisewith|`Released->()|`Resolverp->Core_types.resolve_payloadp(Error(`Exceptionex))letresolvetpayload=matcht.remote_promisewith|`Resolverp->Core_types.resolve_payloadppayload|`Released->matchpayloadwith|Okpayload->Core_types.Response_payload.releasepayload|Error_->()letv~params_for_release~remote_promiseid={id;remote_promise=`Resolverremote_promise;ref_count=RC.one;(* Held by [remote_promise] *)state=Waiting;params_for_release;pipelined_fields=PathSet.empty}endtypemessage_target_cap=[|`ImportofImport.t|`QuestionCapofQuestion.t*Wire.Path.t]letpp_message_target_capf=function|`Importi->Import.ppfi|`QuestionCap(q,p)->Fmt.pff"%a[%a]"Question.ppqWire.Path.ppp(* When we answer a question or provide an export, we may tell the peer that the
resource is local to it. In that case, we need to keep a reference to the remote
object so that we can forward disembargo requests. *)typeresolve_target=[message_target_cap|`Local(* Resolved to a local object, not one at the peer. *)|`None(* This capability is not resolved yet *)]moduleAnswer=struct(* State model for answers:
When a call (or bootstrap) arrives, we add a new answer to the answers table.
It is removed from the table when we receive a Finish message for it.
While in the table, any of the following may happen:
- We calculate the answer. We send it back to the peer, and also keep it locally
so that we can forward any pipelined messages or disembargo requests.
- We get a Finish from the peer. The question is cancelled and removed from the table.
We reply with a cancelled message.
- We tell the peer to take the answer from some question we asked.
We forward any pipelined messages to the new question until we get a Finish.
We respond to disembargo requests by sending the response back to the new question.
*)typeactive={answer:Core_types.struct_ref;resolution:[|`Unresolved|`Resolvedofresolve_targetRO_array.t*(* Targets for disembargo replies and forwarding *)ExportId.tlist(* Caps to release if we get releaseResultCaps *)|`ForwardedofQuestion.t(* We replied with TakeFromOtherQuestion *)];}typet={id:AnswerId.t;mutablestate:[|`Activeofactive(* No Finish message has arrived yet. *)|`Finished(* We got a Finish. A return has been sent. *)]}letidt=t.idletanswer_structt=matcht.statewith|`Finished->`Finished|`Activex->`Promisex.answerletppft=Fmt.pff"a%a"AnswerId.ppt.idletpp_resolutionf=function|`Unresolved->Fmt.stringf"(unresolved)"|`Resolved_->Fmt.stringf"(resolved)"|`Forwardedq->Fmt.pff"(forwarded to %a)"Question.ppqletdumpft=matcht.statewith|`Finished->Fmt.pff"(finished)"|`Active{resolution;answer}->Fmt.pff"%a %t"pp_resolutionresolutionanswer#ppletcheckt=matcht.statewith|`Finished->failwith"Finished answer still in table!"|`Active{answer;resolution}->answer#check_invariants;matchresolutionwith|`Forwardedq->Question.checkq|`Unresolved|`Resolved_->()letneeds_returnt=t.state<>`Finished(* We're sending a return message. *)letreturn_resolvedt~exports_for_release~resolve_targets=matcht.statewith|`Finished->Fmt.failwith"Can't return finished answer %a!"ppt|`Activex->assert(x.resolution=`Unresolved);t.state<-`Active{xwithresolution=`Resolved(resolve_targets,exports_for_release)}(* We're sending a TakeFromOtherQuestion return message. *)letreturn_take_from_questiontquestion=matcht.statewith|`Finished->Fmt.failwith"Can't return finished answer %a!"ppt|`Activex->assert(x.resolution=`Unresolved);Question.inc_refquestion;t.state<-`Active{xwithresolution=`Forwardedquestion}(* Remove from Answers table after calling this. *)letfinisht~release_result_caps=matcht.statewith|`Finished->Fmt.failwith"Can't finish already-finished answer %a"ppt|`Active{answer;resolution}->t.state<-`Finished;dec_refanswer;matchresolutionwith|`Unresolved->[`Return_cancelled]|`Resolved(resolve_targets,exports_for_release)->`Release_resolve_targetsresolve_targets::(ifrelease_result_capsthen[`Release_exportsexports_for_release]else[])|`Forwardedq->beginmatchq.Question.remote_promisewith|`Resolverr->(* Notify anyone waiting on the results. *)Core_types.resolve_exnr(Exception.v"Results sent elsewhere")|`Released->()end;[`Question_actions(q,Question.dec_refq)]letlost_connectiont=finish~release_result_caps:truet|>filter_map(function|`Question_actions_|`Release_resolve_targets_asaction->Someaction|`Return_cancelled->None|`Release_exports(_:ExportId.tlist)->None(* There are local to the connection anyway *))(* The target of [path] that we returned as the answer.
[None] if we didn't return yet. *)letresolve_targettpath=matcht.statewith|`Finished->Fmt.failwith"Answer %a is finished!"ppt|`Active{answer;resolution}->matchresolutionwith|`Unresolved->None|`Forwardedq->Some(Ok(`QuestionCap(q,path)))|`Resolved(resolve_targets,_)->matchanswer#responsewith|None->Fmt.failwith"Answer %a is resolved, but no response recorded!"ppt|Some(Error_)ase->e|Some(Okmsg)->matchCore_types.Wire.Response.cap_indexmsgpathwith|Someiwheni>=0&&i<RO_array.lengthresolve_targets->Some(Ok(RO_array.get_exnresolve_targetsi))|Somei->Some(Error(Error.exn"Invalid answer cap index %d"i))|None->Some(Ok`None)letdisembargo_targettpath=matchresolve_targettpathwith|None->Fmt.failwith"Got disembargo request for unresolved answer %a!"ppt|Some(Error_)->failwith"Got disembargo for an exception!"|Some(Oktarget)->targetletcreateaid~answer={id=aid;state=`Active{answer;resolution=`Unresolved};}letcreate_uninitialisedaid={id=aid;state=`Finished;}letinittanswer=matcht.statewith|`Finished->t.state<-`Active{answer;resolution=`Unresolved}|`Active_->Fmt.failwith"Answer %a already initialised!"pptendmoduleExport=structtypet={id:ExportId.t;mutablecount:int;(* Number of times sent to remote and not yet released *)mutableservice:Core_types.cap;mutableresolve_target:resolve_target;(* [`None] if not yet resolved *)}letdumpft=Fmt.pff"%t"t.service#ppletppft=Fmt.pff"e%a"ExportId.ppt.idletinc_reft=t.count<-t.count+1letidt=t.idletcountt=t.countletresolve_targett=t.resolve_targetletservicet=t.serviceletresolvettarget=t.resolve_target<-targetletreleased=Core_types.broken_cap(Exception.v"(released)")letreleasetref_count=assert(t.count>=ref_count);letcount=t.count-ref_countint.count<-count;ifcount>0then`Do_nothingelse(letservice=t.serviceint.service<-released;`Send_release(service,t.resolve_target))letlost_connectiont=dec_reft.service;t.service<-released;t.count<-0letcheckt=t.service#check_invariantsletv~serviceid={count=1;service;id;resolve_target=`None}endtypedescr=[message_target_cap|`ThirdPartyHostedofOut.third_party_desc|`LocalofCore_types.cap]letpp_cap:[<descr]Fmt.t=funf->function|`Importimport->Fmt.pff"%a"Import.ppimport|`QuestionCap(question,p)->Fmt.pff"%a[%a]"Question.ppquestionWire.Path.ppp|`ThirdPartyHosted_third_party_desc->Fmt.pff"ThirdPartyHosted"|`Locallocal->Fmt.pff"local:%t"local#pptyperestorer=((Core_types.cap,Exception.t)result->unit)->string->unittypet={mutablequeue_send:(EP.Out.t->unit);(* (mutable for shutdown) *)tags:Logs.Tag.set;embargoes:(EmbargoId.t*Cap_proxy.resolver_cap)Embargoes.t;restore:restorer;fork:(unit->unit)->unit;questions:Question.tQuestions.t;answers:Answer.tAnswers.t;exports:Export.tExports.t;imports:Import.tImports.t;exported_caps:(Core_types.cap,ExportId.t)Hashtbl.t;mutabledisconnected:Exception.toption;(* If set, this connection is finished. *)}type'aS.brand+=CapTP:(t*message_target_cap)S.brand(* The [CapTP] message asks a capability to tell us its CapTP target, if any. *)type'aS.brand+=CapTP_results:(t*Answer.t)S.brand(* The [CapTP_results] message asks a struct_resolver to tell us if it's for a CapTP answer.
If so, we may be able to send the results directly. *)lettarget_of(x:#Core_types.cap)=x#shortest#sealed_dispatchCapTPletmy_target_oft(x:#Core_types.cap)=matchtarget_ofxwith|Some(t',target)whent==t'->Sometarget|_->Noneletstatst={Stats.n_questions=Questions.activet.questions;n_answers=Answers.activet.answers;n_imports=Imports.activet.imports;n_exports=Exports.activet.exports;}letdefault_restorek_object_id=k@@Error(Exception.v"This vat has no restorer")letcreate?(restore=default_restore)~tags~fork~queue_send={queue_send=(queue_send:>EP.Out.t->unit);tags;restore=restore;fork;questions=Questions.make();answers=Answers.make();imports=Imports.make();exports=Exports.make();embargoes=Embargoes.make();exported_caps=Hashtbl.create30;disconnected=None;}[@@ocaml.warning"-16"](* Too late to change the API now. *)letwith_qidqidt=Logs.Tag.addDebug.qid_tag(QuestionId.uint32qid)t.tagsletwith_aidaidt=Logs.Tag.addDebug.qid_tag(EP.Table.AnswerId.uint32aid)t.tagslettagst=t.tagsletpp_promisef=function|Someq->Question.ppfq|None->Fmt.stringf"(not initialised)"letcheck_connectedt=matcht.disconnectedwith|None->()|Someex->Fmt.failwith"CapTP connection is disconnected (%a)"Exception.ppexmoduleSend:sig(** Converts struct pointers into integer table indexes, ready for sending.
The indexes are only valid until the next message is sent. *)openEP.Core_typesvalbootstrap:t->struct_resolver->Question.tvalcall:t->struct_resolver->message_target_cap->Wire.Request.t->results_to:EP.Out.send_results_to->Question.tvalreturn:t->Answer.t->Core_types.Response_payload.tCore_types.or_error->unitvalrelease:t->Import.t->int->unit(** [release t i count] tells the peer that [i] is no longer needed by us,
decreasing the ref-count by [count]. *)valfinish:t->Question.t->unit(** [finish t q] sends a Finish message (with [releaseResultCaps=false]). *)end=struct(** We are sending [cap] to the client, either in a Return or Resolve message.
Return the [resolve_target] for it, and increment any required
ref-count to keep it alive. *)letget_resolve_targettcap=matchtarget_ofcapwith|Some(t',target)whent==t'->beginmatchtargetwith|`QuestionCap(question,_)astarget->Question.inc_refquestion;target|`Importimportastarget->Import.inc_refimport;targetend|Some_(* todo: third-party *)|None->`Local(** [export ~broken_caps t cap] is a descriptor for [cap].
If [cap] is a proxy object for a service at the peer, tell the peer the target directly.
Otherwise, export it to the peer (reusing an existing export, if any).
If the cap is broken and needs a fresh export, we queue up a suitable resolve message
on [broken_caps]. This is needed for return messages. *)letrecexport?broken_caps:t->Core_types.cap->Out.desc=funtcap->letcap=cap#shortestinmatchmy_target_oftcapwith|Some(`Importimport)->Import.message_targetimport|Some(`QuestionCap(question,i))->Question.message_targetquestioni|None->letproblem=cap#problemin(* Exceptions are considered unsettled because we need to resolve them to
an error later (due to a protocol limitation). *)letsettled=problem=None&&cap#blocker=Noneinletex=matchHashtbl.findt.exported_capscapwith|id->letex=Exports.find_exnt.exportsidinExport.inc_refex;ex|exceptionNot_found->Core_types.inc_refcap;letex=Exports.alloct.exports(Export.v~service:cap)inletid=Export.idexinHashtbl.addt.exported_capscapid;beginmatchproblem,broken_capswith|Someproblem,Somebroken_caps->Queue.add(ex,problem)broken_caps|Some_,_->failwith"Cap is broken, but [broken_caps] not provided!"|None,_whensettled->()|None,_->Log.debug(funf->f~tags:t.tags"Monitoring promise export %a -> %a"Export.ppexExport.dumpex);cap#when_more_resolved(funx->ifExport.countex>0then(letx=x#shortestinmatchx#problemwith|Someproblem->Log.debug(funf->f~tags:t.tags"Export %a resolved to %t - sending exception"Export.ppexx#pp);t.queue_send(`Resolve(Export.idex,Errorproblem));|None->letnew_export=exporttxinLog.debug(funf->f~tags:t.tags"Export %a resolved to %t - sending notification to use %a"Export.ppexx#ppOut.pp_descnew_export);Export.resolveex(get_resolve_targettx);t.queue_send(`Resolve(Export.idex,Oknew_export)););(* else: no longer an export *)Core_types.dec_refx)end;exinletid=Export.idexinifsettledthen`SenderHostedidelse`SenderPromiseidletbootstraptremote_promise=Questions.alloct.questions(Question.v~params_for_release:[]~remote_promise)(* This is for level 0 implementations, which don't understand about releasing caps. *)letexports_of=RO_array.fold_left(funacc->function|`SenderPromiseid|`SenderHostedid|`ThirdPartyHosted(_,id)->id::acc|`None|`ReceiverAnswer_|`ReceiverHosted_->acc)[](* For some reason, we can't send broken caps in a payload message. So, if
any of them are broken, we send a resolve for them immediately afterwards. *)letresolve_brokent=Queue.iter@@fun(ex,problem)->Log.debug(funf->f~tags:t.tags"Sending resolve for already-broken export %a : %a"Export.ppexExport.dumpex);t.queue_send(`Resolve(Export.idex,Errorproblem))letcalltremote_promise(target:message_target_cap)msg~results_to=letbroken_caps=Queue.create()inletcaps=Core_types.Request_payload.snapshot_capsmsginletdescs=RO_array.map(export~broken_capst)capsinletquestion=Questions.alloct.questions(Question.v~params_for_release:(exports_ofdescs)~remote_promise)inletmessage_target=matchtargetwith|`Importimport->Import.mark_usedimport;Import.message_targetimport|`QuestionCap(question,i)->Question.set_cap_usedquestioni;Question.message_targetquestioniinletqid=Question.idquestioninLog.debug(funf->f~tags:(with_qidqidt)"Sending: (%a).call %a"pp_captargetCore_types.Request_payload.ppmsg);t.queue_send(`Call(qid,message_target,msg,descs,results_to));resolve_brokentbroken_caps;questionletreturn_resultstanswermsg=letcaps=Core_types.Response_payload.snapshot_capsmsginletaid=Answer.idanswerinletcaps=RO_array.map(func->c#shortest)capsinLog.debug(funf->f~tags:(with_aidaidt)"Returning results: %a"Core_types.Response_payload.ppmsg);RO_array.iterCore_types.inc_refcaps;(* Copy everything stored in [msg]. *)letbroken_caps=Queue.create()inletdescs=RO_array.map(export~broken_capst)capsinletexports_for_release=exports_ofdescsinletresolve_targets=RO_array.map(get_resolve_targett)capsinAnswer.return_resolvedanswer~exports_for_release~resolve_targets;RO_array.iterCore_types.dec_refcaps;letret=`Results(msg,descs)inLog.debug(funf->f~tags:(with_aidaidt)"Wire results: %a"Out.pp_returnret);t.queue_send(`Return(aid,ret,false));resolve_brokentbroken_capsletreturntanswerret=letaid=Answer.idanswerinmatchretwith|Okpayload->return_resultstanswerpayload|Errorerr->letret=(err:Error.t:>Out.return)inLog.debug(funf->f~tags:(with_aidaidt)"Returning error: %a"Error.pperr);Answer.return_resolvedanswer~exports_for_release:[]~resolve_targets:RO_array.empty;t.queue_send(`Return(aid,ret,false))letreleasetimportcount=Imports.releaset.imports(Import.idimport);Log.debug(funf->f~tags:t.tags"Sending release of %a"Import.ppimport);t.queue_send(`Release(Import.idimport,count))letfinishtquestion=letqid=Question.idquestioninLog.debug(funf->f~tags:(with_qidqidt)"Send finish %a"Question.pp_promisequestion);t.queue_send(`Finish(qid,false))endletapply_import_actionst=List.iter@@function|`Release(i,count)->Send.releaseticountletapply_question_actionstq=List.iter@@function|`Send_finish->Send.finishtq|`Release_table_entry->Questions.releaset.questions(Question.idq)|`Release_importi->Import.dec_refi|>apply_import_actionstletrelease_resolve_targett=function|`None|`Local->()|`QuestionCap(q,_)->Question.dec_refq|>apply_question_actionstq;|`Importi->Import.dec_refi|>apply_import_actionstletreleasetexport_id~ref_count=assert(ref_count>0);letexport=Exports.find_exnt.exportsexport_idinmatchExport.releaseexportref_countwith|`Do_nothing->()|`Send_release(service,resolve_target)->Log.debug(funf->f~tags:t.tags"Releasing export %a"Export.ppexport);Hashtbl.removet.exported_capsservice;Exports.releaset.exportsexport_id;dec_refservice;release_resolve_targettresolve_targetletapply_answer_actionstanswer=List.iter@@function|`Return_cancelled->letaid=Answer.idanswerinLog.debug(funf->f~tags:(with_aidaidt)"Returning cancelled");t.queue_send(`Return(aid,`Cancelled,false));|`Release_exportsexports->List.iter(releaset~ref_count:1)exports|`Release_resolve_targetstargets->RO_array.iter(release_resolve_targett)targets|`Question_actions(q,actions)->apply_question_actionstqactions(* Note: takes ownership of [msg] *)letrecsend_callttarget(results:Core_types.struct_resolver)msg=matcht.disconnectedwith|Someex->results#resolve(Core_types.broken_struct(`Exceptionex))|None->matchresults#sealed_dispatchCapTP_resultswith|Some(_,answer)whennot(Answer.needs_returnanswer)->(* The only reason for making this call is to answer a cancelled question (we already
replied to it to confirm the cancellation). No-one cares about the answer, so do nothing. *)Core_types.Request_payload.releasemsg;Core_types.resolve_exnresultsException.cancelled(* (might be useful for debugging) *)|Some(t',answer)whent==t'->letremote_promise=make_remote_promisetin(* [results] is at [target]. We can tell the peer to send the results to itself directly. *)letquestion=Send.callt(remote_promise:>Core_types.struct_resolver)targetmsg~results_to:`YourselfinCore_types.Request_payload.releasemsg;remote_promise#set_questionquestion;Answer.return_take_from_questionanswerquestion;letaid=Answer.idanswerinLog.debug(funf->f~tags:(with_aidaidt)"Returning take-from-other-question %a"Question.ppquestion);t.queue_send(`Return(aid,`TakeFromOtherQuestion(Question.idquestion),false));(* Hook this up so we still forward any pipelined calls *)results#resolve(remote_promise:>Core_types.struct_ref)|_->letremote_promise=make_remote_promisetinletquestion=Send.callt(remote_promise:>Core_types.struct_resolver)targetmsg~results_to:`CallerinCore_types.Request_payload.releasemsg;remote_promise#set_questionquestion;results#resolve(remote_promise:>Core_types.struct_ref)(* A cap that sends to a promised answer's cap at other *)andmake_remote_promiset=object(self:#Core_types.struct_resolver)inherit[Question.toption]Struct_proxy.tNonevalmutablereleased_question=false(* We send Finish in two cases:
- We are unresolved and the user wants to cancel.
- We got the result and want the server to free up the question.
Send Finish on whichever happens first, but not both.
*)methodprivateensure_releasedq=ifnotreleased_questionthen(released_question<-true;Question.release_proxyq|>apply_question_actionstq)methoddo_pipelinequestioniresultsmsg=matchquestionwith|Sometarget_q->lettarget=`QuestionCap(target_q,i)insend_callttargetresultsmsg|None->failwith"Not initialised!"methodon_resolveq_=matchqwith|None->failwith"Not initialised!"|Someq->self#ensure_releasedqvalname="remote-promise"methodpp_unresolved=pp_promisemethodset_questionq=self#update_target(Someq)methodsend_cancel=function|None->failwith"Not initialised!"|Someq->self#ensure_releasedqmethodfield_sealed_dispatch:typea.Wire.Path.t->aS.brand->aoption=funpath->function|CapTP->beginmatchstatewith|Unresolvedu->beginmatchu.targetwith|None->failwith"Not intialised!"|Sometarget_q->Some(t,`QuestionCap(target_q,path))end|_->failwith"Not a promise!"end;|_->Noneendletdisembargotrequest=Log.debug(funf->f~tags:t.tags"Sending disembargo %a"EP.Out.pp_disembargo_requestrequest);t.queue_send(`Disembargo_requestrequest)letbootstraptobject_id=matcht.disconnectedwith|Someex->Core_types.broken_capex|None->letresult=make_remote_promisetinletquestion=Send.bootstrapt(result:>Core_types.struct_resolver)inresult#set_questionquestion;letqid=Question.idquestioninLog.debug(funf->f~tags:(with_qidqidt)"Sending: bootstrap");t.queue_send(`Bootstrap(qid,object_id));letservice=result#capWire.Path.rootindec_refresult;servicemoduleSwitchable=structclasstypehandler=objectmethodpp:Format.formatter->unitmethodsealed_dispatch:'a.'aS.brand->'aoptionmethodcall:Core_types.struct_resolver->Wire.Request.t->unitendtypeunset={mutablerc:RC.t;handler:handler;(* Will forward calls here until set *)on_set:(Core_types.cap->unit)Queue.t;on_release:(unit->unit)Queue.t;}typestate=|Unsetofunset|SetofCore_types.capletreleased=Core_types.broken_cap(Exception.v"(released)")letpp_statef=function|Unsetx->Fmt.pff"(unset, %a) -> %t"RC.ppx.rcx.handler#pp|Setx->Fmt.pff"(set) -> %t"x#pplettarget=function|Unsetx->x.handler|Setx->(x:>handler)type'aS.brand+=Gc:unitS.brand(* Forwards messages to [init] until resolved.
Forces [release] when resolved or released.
Stores a reference to itself in [strong_proxy] when unresolved but with queued callbacks. *)letmake~(release:unitLazy.t)~settled~strong_proxyinit=object(self:#Core_types.cap)valid=Debug.OID.next()valthread_id=Thread.(id(self()))valmutablestate=Unset{rc=RC.one;handler=init;on_set=Queue.create();on_release=Queue.create()}methodcallmsgcaps=(targetstate)#callmsgcapsmethodupdate_rcd=matchstatewith|Unsetu->u.rc<-RC.sumu.rcd~pp:(funf->self#ppf);ifRC.is_zerou.rcthen(Lazy.forcerelease;letold_state=stateinstrong_proxy:=None;state<-Setreleased;matchold_statewith|Unsetu->Queue.iter(funf->f())u.on_release|Setx->dec_refx)|Setx->x#update_rcdmethodresolvecap=matchstatewith|Set_->Fmt.failwith"Can't resolve already-set switchable %t to %t!"self#ppcap#pp|Unset{handler=_;rc;on_set;on_release}->letppf=self#ppfinRC.check~pprc;state<-Setcap;strong_proxy:=None;beginmatchRC.to_intrcwith|Somerc->cap#update_rc(rc-1);(* Transfer our ref-count *)|None->()end;Queue.iter(funf->f(with_inc_refcap))on_set;Queue.iter(funf->cap#when_releasedf)on_release;Lazy.forcereleasemethodbreakex=self#resolve(Core_types.broken_capex)methodshortest=matchstatewith|Unset_->(self:>Core_types.cap)(* Can't shorten, as we may change later *)|Setx->x#shortestmethodblocker=matchstatewith|Unset_whensettled->None|Unset_->Some(self:>Core_types.base_ref)|Setx->x#blockermethodproblem=matchstatewith|Unset_->None|Setx->x#problemmethodwhen_more_resolvedfn=matchstatewith|Unsetx->Queue.addfnx.on_set;strong_proxy:=Some(self:><>)|Setx->x#when_more_resolvedfnmethodwhen_releasedfn=matchstatewith|Unsetx->Queue.addfnx.on_release;strong_proxy:=Some(self:><>)|Setx->x#when_releasedfn(* When trying to find the target for export, it's OK to expose our current
target, even though [shortest] shouldn't.
In particular, we need this to deal with disembargo requests. *)methodsealed_dispatch:typea.aS.brand->aoption=function|CapTP->beginmatchstatewith|Unsetx->x.handler#sealed_dispatchCapTP|Setx->x#shortest#sealed_dispatchCapTPend|Gc->beginmatchstatewith|Unsetx->Core_types.Wire.ref_leak_detectedthread_id(fun()->ifRC.is_zerox.rcthen(Log.warn(funf->f"@[<v2>Reference GC'd with non-zero ref-count!@,%t@,\
But, ref-count is now zero, so a previous GC leak must have fixed it.@]"self#pp);)else(Log.warn(funf->f"@[<v2>Reference GC'd with %a!@,%t@]"RC.ppx.rcself#pp);x.rc<-RC.leaked;state<-Setreleased;(* Don't call resolve; rc is now invalid *)assert(!strong_proxy=None);(* Otherwise we wouldn't have been GC'd *)Queue.iter(funf->freleased)x.on_set;Lazy.forcerelease));|Set_->()end;Some()|_->Nonemethodcheck_invariants=matchstatewith|Unsetu->letppf=self#ppfinassert((!strong_proxy=None)=(Queue.is_emptyu.on_set&&Queue.is_emptyu.on_release));RC.check~ppu.rc|Setx->assert(!strong_proxy=None);x#check_invariantsmethodppf=Fmt.pff"switchable(%a) %a"Debug.OID.ppidpp_statestateinitializerGc.finalise(fun(self:#Core_types.base_ref)->ignore(self#sealed_dispatchGc))selfendendmoduleInput:sigopenEP.Core_typesvalcall:t->In.QuestionId.t->In.message_target->Wire.Request.t->In.descRO_array.t->results_to:In.send_results_to->unitvalbootstrap:t->In.QuestionId.t->string->unitvalreturn:t->In.AnswerId.t->In.return->release_param_caps:bool->unitvalfinish:t->In.QuestionId.t->release_result_caps:bool->unitvalrelease:t->In.ImportId.t->ref_count:int->unitvaldisembargo_request:t->In.disembargo_request->unitvaldisembargo_reply:t->In.message_target->Message_types.EmbargoId.t->unitvalresolve:t->In.ExportId.t->(In.desc,Exception.t)result->unitend=structletrelease=releaseletset_import_proxyt~settledimport=letmessage_target=`Importimportinletcap=object(_:Switchable.handler)valid=Debug.OID.next()methodcallresultsmsg=send_calltmessage_targetresultsmsgmethodppf=ifsettledthenFmt.pff"far-ref(%a) -> %a"Debug.OID.ppidpp_capmessage_targetelseFmt.pff"remote-promise(%a) -> %a"Debug.OID.ppidpp_capmessage_targetmethodsealed_dispatch:typea.aS.brand->aoption=function|CapTP->Some(t,message_target)|_->Noneendinletrelease=lazy(Import.clear_proxyimport;Import.dec_refimport|>apply_import_actionst;)in(* Imports can resolve to another cap (if unsettled) or break. *)letswitchable=Switchable.make~release~settled~strong_proxy:import.strong_proxycapinImport.init_proxyimportswitchable;switchableletimport_sendert~mark_dirty~settledid=letnew_import()=letimport=Import.vid~mark_dirty~settledinImports.sett.importsidimport;(set_import_proxyt~settledimport:>Core_types.cap)inmatchImports.findt.importsidwith|None->new_import()|Someimport->Import.inc_countimport;ifmark_dirtythenImport.mark_usedimport;matchImport.get_proxyimportwith|Someproxy->Core_types.inc_refproxy;(proxy:>Core_types.cap)|None->(* The switchable got GC'd. It has already dec-ref'd the import.
Make a new proxy. *)Import.inc_refimport;(set_import_proxyt~settledimport:>Core_types.cap)(* We previously pipelined messages to [old_path], which now turns out to be
local object [x]. Create an embargo proxy for [x] and send a disembargo
for it down [old_path]. *)letlocal_embargot~old_pathx=ifx#problem<>Nonethenx(* Don't embargo errors *)else(letembargo=Cap_proxy.local_promise()in(* Store in table *)inc_refembargo;let(embargo_id,_)=Embargoes.alloct.embargoes(funid->(id,embargo))in(* Send disembargo request *)letdisembargo_request=`Loopback(old_path,embargo_id)inLog.debug(funf->f~tags:t.tags"Embargo %t until %a is delivered"x#ppEP.Out.pp_disembargo_requestdisembargo_request);disembargotdisembargo_request;(* We don't store [x], because when the reply comes back further disembargoes
may be needed. *)dec_refx;(embargo:>Core_types.cap))letmaybe_embargot~old_pathx=matchold_pathwith|None->x|Someold_path->local_embargot~old_pathx(* Turn a connection-scoped cap reference received from our peer into a general-purpose
cap for users. The caller owns the new reference and should [dec_ref] it when done.
This is used in several places:
- When we get a Resolve message (which must be for a SenderPromise or SenderHosted)
- When we get a Return message (resolving each PromisedAnswer from the question)
- When we receive a Call (no embargoes needed in that case; you can't pipeline over the
arguments of a call you haven't yet received)
- When we get a Disembargo_reply (might need another embargo)
If [embargo_path] is passed then we have already pipelined messages over this cap, and
may therefore need to embargo it (sending the disembargo via the old [embargo_path]).
This can only happen when a local promise at the peer has resolved, and we are given its
new target. *)letimportt?embargo_path:In.desc->Core_types.cap=function|`SenderPromiseid->import_sendertid~settled:false~mark_dirty:(embargo_path<>None)|`SenderHostedid->(* We don't have to worry about embargoes here because anything we've already sent to the peer will
arrive before anything we send in the future, and it can't change. *)import_sendertid~settled:true~mark_dirty:false|`ReceiverHostedid->letexport=Exports.find_exnt.exportsidin(* We host the target (which may be another promise).
We need to flush any pipelined calls still in flight before we can use it. *)maybe_embargot~old_path:embargo_path(with_inc_ref(Export.serviceexport))|`ReceiverAnswer(id,path)->letanswer=Answers.find_exnt.answersidinbeginmatchAnswer.answer_structanswerwith|`Finished->failwith"Got ReceiverAnswer for a finished promise!"|`Promiseanswer_promise->matchanswer_promise#responsewith|None->(* We don't know the answer, so we can't have replied yet.
We can send a disembargo request now and the peer will get it before
any answer and return it to us. *)maybe_embargot~old_path:embargo_path(answer_promise#cappath)|Some(Error_)->answer_promise#cappath(* No need to embargo errors *)|Some(Okpayload)->(* We've already replied to this question. If we returned a capability at the
requested path then send a disembargo request to it. If not, we can't embargo,
because the peer has no way to return the response, but we don't need to for errors. *)matchCore_types.Response_payload.fieldpayloadpathwith|None->answer_promise#cappath(* Don't embargo None *)|Somec->dec_refc;maybe_embargot~old_path:embargo_path(answer_promise#cappath)end|`None->Core_types.null|`ThirdPartyHosted_->failwith"todo: import"letmake_answer_promisetanswer~results_to=letaid=Answer.idanswerinletpromise,resolver=Local_struct_promise.make()inlet()=promise#when_resolved@@funx->ifAnswer.needs_returnanswer&&t.disconnected=Nonethen(matchresults_towith|`Caller->Send.returntanswerx|`Yourself->Log.debug(funf->f~tags:(with_aidaidt)"Returning results-sent-elsewhere");Answer.return_resolvedanswer~exports_for_release:[]~resolve_targets:RO_array.empty;t.queue_send(`Return(aid,`ResultsSentElsewhere,false))|`ThirdParty_->failwith"todo: handle call by sending results to ThirdParty")(* (else cancelled; reply already sent) *)inletresolver=ifresults_to=`Callerthen(object(_:Core_types.struct_resolver)methodppf=Fmt.pff"answer %a <- %t"AnswerId.ppaidresolver#ppmethodresolve=resolver#resolvemethodset_blocker=resolver#set_blockermethodclear_blocker=resolver#clear_blockermethodsealed_dispatch:typea.aS.brand->aoption=function|CapTP_results->Some(t,answer)|_->Noneend)elseresolverinpromise,resolverletcalltaid(message_target:In.message_target)msgdescs~results_to=(* todo: allowThirdPartyTailCall *)letanswer=Answer.create_uninitialisedaidinletanswer_promise,answer_resolver=make_answer_promisetanswer~results_toinAnswer.initansweranswer_promise;Answers.sett.answersaidanswer;lettarget=matchmessage_targetwith|`ReceiverHostedid->letexport=Exports.find_exnt.exportsidinbeginmatchExport.resolve_targetexportwith|#message_target_capastarget->target|`Local|`None->`Local(with_inc_ref(Export.serviceexport))end|`ReceiverAnswer(id,path)->letanswer=Answers.find_exnt.answersidinmatchAnswer.resolve_targetanswerpathwith|Some(Ok(#message_target_capastarget))->target|Some(Ok(`Local|`None))|Some(Error_)|None->(* Unresolved, error or local - queue on our promise *)matchAnswer.answer_structanswerwith|`Finished->failwith"Call to finished answer (shouldn't be in table!)"|`Promiseanswer_promise->`Local(answer_promise#cappath)inletcaps=RO_array.map(importt)descsinletmsg=Core_types.Request_payload.with_capscapsmsginmatchtargetwith|`Localtarget->Log.debug(funf->f~tags:t.tags"Handling call: (%t).call %a"target#ppCore_types.Request_payload.ppmsg);t.fork(fun()->target#callanswer_resolvermsg;(* Takes ownership of [caps]. *)dec_reftarget)|#message_target_capastarget->Log.debug(funf->f~tags:t.tags"Forwarding call: (%a).call %a"pp_message_target_captargetCore_types.Request_payload.ppmsg);send_callttargetanswer_resolvermsgletbootstraptidobject_id=letpromise,answer_resolver=Local_struct_promise.make()inletanswer=Answer.createid~answer:promiseinAnswers.sett.answersidanswer;t.fork@@fun()->object_id|>t.restore@@funservice->ifAnswer.needs_returnanswer&&t.disconnected=Nonethen(letresults=matchservicewith|Errorex->Error(`Exceptionex)|Okservice->letmsg=Wire.Response.bootstrap()|>Core_types.Response_payload.with_caps(RO_array.of_list[service])inOkmsginCore_types.resolve_payloadanswer_resolverresults;Send.returntanswerresults)else(Result.iterdec_refservice)letreturn_resultstquestionmsgdescrs=letcaps_used=Question.paths_usedquestion|>caps_used~msginletimport_with_embargoescap_indexd=letembargo_path=matchIntMap.find_optcap_indexcaps_usedwith|None->None|Somepath->Some(Question.message_targetquestionpath)inimporttd?embargo_pathinRO_array.mapiimport_with_embargoesdescrs(* Turn wire descriptors into local objects, creating embargoes and sending
disembargo messages where necessary. *)letimport_return_capstquestionret=matchretwith|`Results(msg,descs)->letcaps=return_resultstquestionmsgdescsin`Results(Core_types.Response_payload.with_capscapsmsg)|#Error.taserr->err|`ResultsSentElsewhere->`ResultsSentElsewhere|`AcceptFromThirdParty->failwith"todo: AcceptFromThirdParty"|`TakeFromOtherQuestionaid->matchAnswer.answer_struct(Answers.find_exnt.answersaid)with|`Finished->Fmt.failwith"Can't take from answer %a - it's already finished!"AnswerId.ppaid|`Promiseother->matchquestion.remote_promisewith|`Released->`TakeFromCancelledQuestion|`Resolverremote_promise->inc_refother;letpaths_used=question.Question.pipelined_fieldsinifPathSet.is_emptypaths_usedthen`TakeFromOtherQuestion(other,remote_promise)else((* Embargoes needed for paths that were used when we got the original return. *)Question.inc_refquestion;`TakeFromOtherQuestionAndEmbargo(other,remote_promise,paths_used)(* Note: ref's question *))letreturntqidret~release_param_caps=letquestion=Questions.find_exnt.questionsqidinifrelease_param_capsthenList.iter(releaset~ref_count:1)question.params_for_release;letret2=import_return_capstquestionretin(* Any disembargo requests have now been sent, so we may finish the question. *)letget_importid=leti=Imports.find_exnt.importsidinImport.inc_refi;iinletdisembargo_info=Question.extract_resolution~get_importretinQuestion.returnquestiondisembargo_info|>apply_question_actionstquestion;beginmatchret2with|`Resultsmsg->Log.debug(funf->f~tags:(with_qidqidt)"Got results: %a"Core_types.Response_payload.ppmsg);Question.resolvequestion(Okmsg)|`TakeFromCancelledQuestion->()(* Ignore response to cancelled Question *)|`TakeFromOtherQuestion(other,remote_promise)->(* (we have added a ref on [other]) *)remote_promise#resolve(other:>Core_types.struct_ref)|`TakeFromOtherQuestionAndEmbargo(other,remote_promise,paths_used)->(* (we have added a ref on [other] and on [question]) *)(* Embargoes needed for paths that were used when we got the original return. *)(* We must resolve the remote promise to a local embargo now, so nothing else
gets sent to it. *)letembargo,embargo_resolver=Local_struct_promise.make()inbeginmatchembargo_resolver#set_blocker(other:>Core_types.base_ref)with|Error`Cycle->assertfalse(* Can't happen; we just made it! *)|Ok()->remote_promise#resolveembargo;other#when_resolved(funpayload->(* XXX: Do we need a when_more_resolved here instead?
We might need multiple disembargoes as things resolve further... *)embargo_resolver#clear_blocker;beginmatchpayloadwith|Error_ase->Core_types.resolve_payloadembargo_resolvere;|Okmsg->letembargoes_needed=caps_used~msgpaths_usedinletmaybe_embargocap_indexcap=inc_refcap;matchIntMap.find_optcap_indexembargoes_neededwith|None->cap|Somepath->letold_path=Question.message_targetquestionpathinlocal_embargot~old_pathcapinletcaps=Core_types.Response_payload.snapshot_capsmsg|>RO_array.mapimaybe_embargoinCore_types.resolve_okembargo_resolver(Core_types.Response_payload.with_capscapsmsg)end;dec_refother;Question.dec_refquestion|>apply_question_actionstquestion)end;|`ResultsSentElsewhere->(* Keep unresolved; we must continue forwarding *)()|#Error.taserr->Question.resolvequestion(Errorerr)endletfinishtaid~release_result_caps=letanswer=Answers.find_exnt.answersaidinLog.debug(funf->f~tags:(with_aidaidt)"Received finish for %a"Answer.dumpanswer);Answers.releaset.answersaid;Answer.finishanswer~release_result_caps|>apply_answer_actionstanswerletsend_disembargotembargo_idtarget=letdesc=matchtargetwith|`None->Fmt.failwith"Protocol error: disembargo request for None cap"|`Local->Fmt.failwith"Protocol error: disembargo request for local target"|`QuestionCap(question,path)->Question.message_targetquestionpath|`Importimport->Import.message_targetimportinLog.debug(funf->f~tags:t.tags"Sending disembargo response to %a"EP.Out.pp_descdesc);t.queue_send(`Disembargo_reply(desc,embargo_id))(* If we're trying to disembargo something that resolved to an import, try to disembargo on
that instead. *)letrecdisembargo_imports=function|None|Some#EP.Out.message_targetasx->x|Some(`Importi)->disembargo_imports(Import.disembargo_targeti)letdisembargo_requesttrequest=Log.debug(funf->f~tags:t.tags"Received disembargo request %a"EP.In.pp_disembargo_requestrequest);matchrequestwith|`Loopback(old_path,embargo_id)->matchold_pathwith|`ReceiverHostedeid->send_disembargotembargo_id(Exports.find_exnt.exportseid|>Export.resolve_target);|`ReceiverAnswer(aid,path)->letanswer=Answers.find_exnt.answersaidinsend_disembargotembargo_id(Answer.disembargo_targetanswerpath)(* [double_disembargo_path resolve_target] is used to check whether we need a second embargo
after a previous disembargo arrived back at an answer or export which is now resolved to
[resolve_target]. If [resolve_target] is set and is remote and we have pipelined messages
to it then we return the path to use for the next disembargo step.
Returns [None] if no embargo is needed at the moment. *)letdouble_disembargo_path=function|`None->(* We haven't resolved yet. Any pipelined messages have been queued up locally and will
be forwarded when the answer or export does resolve. We can safely make calls directly
on the local promise. *)None|`Local->(* We resolved to a local service (possibly another promise).
Any pipelined messages have already been delivered to it and we can therefore send it
messages immediately. *)None|`QuestionCap(q,path)->(* We resolved to a question. If we have pipelined messages to the question and it has
resolved to a local service then we may need another embargo before using the new
local target. *)Question.answer_cap_disembargoqpath|>disembargo_imports|`Importi->(* We resolved to an import. If we have pipelined messages to the remote export and it
has resolved to a local service then we will need another embargo before using the new
local target. *)ifImport.usedithendisembargo_imports(Import.disembargo_targeti)elseNoneletdisembargo_replyttargetembargo_id=letembargo=snd(Embargoes.find_exnt.embargoesembargo_id)inLog.debug(funf->f~tags:t.tags"Received disembargo response %a -> %t"EP.In.pp_desctargetembargo#pp);(* A remote export or answer resolved to [target] at some point in the
past and we sent a disembargo request to it to flush any pipelined
messages. This is now done - all pipelined messages that we care about
have been delivered to [target]. However, [target] may have forwarded
them on in turn, so another disembargo may be called for.
Release the old embargo, so that the cap now points at [target]'s
initial resolution. If [target] is already resolved, attempt to update
to its resolution, performing another embargo if necessary.
*)letembargo_path=matchtargetwith|`ReceiverHostedid->letexport=Exports.find_exnt.exportsidindouble_disembargo_path(Export.resolve_targetexport)|`ReceiverAnswer(id,path)->letanswer=Answers.find_exnt.answersidinmatchAnswer.resolve_targetanswerpathwith|None->None(* No embargoes as still unresolved (target still remote) *)|Some(Error_)->None(* No embargoes for errors *)|Some(Okresolve_target)->double_disembargo_pathresolve_targetinletcap=importt?embargo_path(target:>EP.In.desc)inLog.debug(funf->f"Disembargo target is %t"cap#pp);Embargoes.releaset.embargoesembargo_id;embargo#resolvecap;dec_refembargoletresolvetimport_idnew_target=Log.debug(funf->f~tags:t.tags"Received resolve of import %a to %a"ImportId.ppimport_id(Fmt.result~ok:In.pp_desc~error:Exception.pp)new_target);letimport_new_target~embargo_path=matchnew_targetwith|Errore->Core_types.broken_cape|Okdesc->importtdesc?embargo_pathinmatchImports.findt.importsimport_idwith|None->letnew_target=import_new_target~embargo_path:NoneinLog.debug(funf->f~tags:t.tags"Import %a no longer used - releasing new resolve target %t"ImportId.ppimport_idnew_target#pp);dec_refnew_target|Someim->(* Check we're not resolving a settled import. *)ifim.Import.settledthen(letnew_target=import_new_target~embargo_path:Noneinletmsg=Fmt.str"Got a Resolve (to %t) for settled import %a!"new_target#ppImport.dumpimindec_refnew_target;failwithmsg);letget_importid=leti=Imports.find_exnt.importsidinImport.inc_refi;iinmatchImport.get_proxyimwith|Somex->(* This will also dec_ref the old remote-promise and the import. *)lettarget=import_new_target~embargo_path:(Import.embargo_pathim)inImport.mark_resolvedim~get_importnew_target;x#resolvetarget|None->(* If we get here:
- The user released the switchable, but
- Some [resolve_target] kept the import in the table. *)lettarget=import_new_target~embargo_path:NoneinImport.mark_resolvedim~get_importnew_target;Log.debug(funf->f~tags:t.tags"Ignoring resolve of import %a, which we no longer need (to %t)"ImportId.ppimport_idtarget#pp);dec_reftargetendlethandle_unimplementedt(msg:Out.t)=matchmsgwith|`Resolve(_,Error_)->()|`Resolve(_,Oknew_target)->(* If the peer doesn't understand resolve messages, we can just release target. *)beginmatchnew_targetwith|`None|`ReceiverHosted_|`ReceiverAnswer_->()|`SenderHostedid|`SenderPromiseid|`ThirdPartyHosted(_,id)->Input.releasetid~ref_count:1end|`Bootstrap(qid,_)->(* If the peer didn't understand our question, pretend it returned an exception. *)Input.returntqid~release_param_caps:true(Error.exn~ty:`Unimplemented"Bootstrap message not implemented by peer")|`Call(qid,_,_,_,_)->(* This could happen if we asked for the bootstrap object from a peer that doesn't
offer any services, and then tried to pipeline on the result. *)Input.returntqid~release_param_caps:true(Error.exn~ty:`Unimplemented"Call message not implemented by peer!")|_->failwith"Protocol error: peer unexpectedly responded with Unimplemented"letdisconnecttex=ift.disconnected=Nonethen(t.disconnected<-Someex;t.queue_send<-ignore;Exports.drop_allt.exports(fun_->Export.lost_connection);Hashtbl.cleart.exported_caps;Questions.drop_allt.questions(fun_->Question.lost_connection~ex);Answers.drop_allt.answers(fun_a->Answer.lost_connectiona|>apply_answer_actionsta);letbroken_cap=Core_types.broken_capexinImports.drop_allt.imports(fun_->Import.lost_connection~broken_cap);Embargoes.drop_allt.embargoes(fun_(_,e)->e#breakex;dec_refe))lethandle_msgt(msg:[<In.t|`UnimplementedofOut.t])=check_connectedt;matchmsgwith|`Abortex->disconnecttex|`Call(aid,target,msg,descs,results_to)->Input.calltaidtargetmsgdescs~results_to|`Bootstrap(qid,oid)->Input.bootstraptqidoid|`Return(aid,ret,release)->Input.returntaidret~release_param_caps:release|`Finish(aid,release)->Input.finishtaid~release_result_caps:release|`Release(id,count)->Input.releasetid~ref_count:count|`Disembargo_requestreq->Input.disembargo_requesttreq|`Disembargo_reply(target,id)->Input.disembargo_replyttargetid|`Resolve(id,target)->Input.resolvetidtarget|`Unimplementedx->handle_unimplementedtxletdump_embargof(id,proxy)=Fmt.pff"%a: @[%t@]"EmbargoId.ppidproxy#ppletcheck_embargox=(sndx)#check_invariantsletcheck_exported_captcapexport_id=matchExports.find_exnt.exportsexport_idwith|export->ifExport.serviceexport<>capthen(Debug.invariant_broken@@funf->Fmt.pff"export_caps maps %t to export %a back to different cap %a!"cap#ppExport.ppexportExport.dumpexport)|exceptionex->Debug.invariant_broken@@funf->Fmt.pff"exported_caps for %t: %a"cap#ppDebug.pp_exnexletexported_sort_keyexport_id=export_idletpp_exported_captf(cap,export_id)=Fmt.pff"%t => export %a%a"cap#ppExportId.ppexport_id(pp_check(check_exported_capt))(cap,export_id)letdumpft=matcht.disconnectedwith|Somereason->Fmt.pff"Disconnected: %a"Exception.ppreason|None->Fmt.pff"@[<v2>Questions:@,%a@]@,\
@[<v2>Answers:@,%a@]@,\
@[<v2>Exports:@,%a@]@,\
@[<v2>Imports:@,%a@]@,\
@[<v2>Embargoes:@,%a@]@,\
@[<v2>Exported caps:@,%a@]@,"(Questions.dump~check:Question.checkQuestion.dump)t.questions(Answers.dump~check:Answer.checkAnswer.dump)t.answers(Exports.dump~check:Export.checkExport.dump)t.exports(Imports.dump~check:Import.checkImport.dump)t.imports(Embargoes.dump~check:check_embargodump_embargo)t.embargoes(hashtbl_dump~key:exported_sort_key(pp_exported_capt))t.exported_capsletcheckt=Questions.iter(fun_->Question.check)t.questions;Answers.iter(fun_->Answer.check)t.answers;Imports.iter(fun_->Import.check)t.imports;Exports.iter(fun_->Export.check)t.exports;Embargoes.iter(fun_->check_embargo)t.embargoes;Hashtbl.iter(check_exported_capt)t.exported_capsend