module Log = Debug.Log

(* Struct-promise proxies.

   [Make (C)] provides a virtual class [t] representing the (possibly not yet
   known) result of a call, plus per-field capability proxies ([field]) that
   can be handed out before the result arrives. When the promise is resolved,
   it forwards to the new target and transfers its ref-counts there. *)
module Make (C : S.CORE_TYPES) = struct
  open C

  (* Only used internally to detect cycles. It is stored in the
     [cycle_detector] array and compared against with [=] in
     [field_blocker]. *)
  let cycle_marker = C.broken_cap (Exception.v "<cycle marker>")

  (* [cycle_err fmt ...] formats a message describing an attempt to create a
     cycle, logs it at info level, and returns a broken struct carrying that
     message as an exception. *)
  let cycle_err fmt =
    "@[<v>Attempt to create a cycle detected:@," ^^ fmt ^^ "@]" |> Fmt.kstr @@ fun msg ->
    Log.info (fun f -> f "%s" msg);
    C.broken_struct (`Exception (Exception.v msg))

  (* The interface a field proxy uses to talk to the struct promise that owns
     it. Each [field_*] method takes the path of the field concerned. *)
  class type struct_ref_internal = object
    inherit struct_resolver

    method pipeline : Wire.Path.t -> C.struct_resolver -> Wire.Request.t -> unit

    method field_update_rc : Wire.Path.t -> int -> unit

    method field_blocker : Wire.Path.t -> base_ref option

    method field_when_resolved : Wire.Path.t -> (cap -> unit) -> unit
    (* (can't use [when_resolved] because that checks the promise isn't finished) *)

    method field_check_invariants : Wire.Path.t -> unit

    method field_sealed_dispatch : 'a. Wire.Path.t -> 'a S.brand -> 'a option

    method field_pp : Wire.Path.t -> Format.formatter -> unit
  end

  (* A placeholder capability. Every method except [pp] raises
     [Failure "invalid_cap"]. [resolve] (in class [t] below) uses it as the
     target for field proxies that have no users other than the promise. *)
  let invalid_cap = object (_ : C.cap)
    method call _ _ = failwith "invalid_cap"
    method update_rc = failwith "invalid_cap"
    method shortest = failwith "invalid_cap"
    method when_more_resolved _ = failwith "invalid_cap"
    method pp f = Fmt.string f "(invalid cap)"
    method blocker = failwith "invalid_cap"
    method check_invariants = failwith "invalid_cap"
    method sealed_dispatch _ = failwith "invalid_cap"
    method problem = failwith "invalid_cap"
    method when_released = failwith "invalid_cap"
  end

  (* Maps from field paths to per-field data. *)
  module Field_map = Map.Make(Wire.Path)

  (* A capability that can additionally be told what it resolved to. *)
  class type field_cap = object
    inherit cap
    method resolve : cap -> unit
  end

  (* A field proxy together with its ref-count. The count includes the one
     reference held by the promise's field map (see [cap] in class [t]). *)
  type field = {
    cap : field_cap;
    mutable ref_count : RC.t;
  }

  (* The data held by a promise while it is unresolved. *)
  type 'a unresolved = {
    mutable target : 'a;
    mutable rc : RC.t;
    mutable fields : field Field_map.t;
    when_resolved : (struct_ref -> unit) Queue.t;

    (* This is non-None only while we are resolving. Then, it initially contains the fields
       we're resolving to. Asking for the blocker of a field returns it, but also updates the
       array so you can't ask again. *)
    mutable cycle_detector : (Wire.Response.t * cap array) option;
  }

  (* The life-cycle of a promise: still waiting, forwarding to another
     struct_ref, or finished (set by [update_rc] when the count reaches zero). *)
  type 'a state =
    | Unresolved of 'a unresolved
    | Forwarding of struct_ref
    | Finished

  (* Print " (blocked on ...)" if there is a blocker, and nothing otherwise. *)
  let pp_opt_blocked_on f = function
    | None -> ()
    | Some b -> Fmt.pf f " (blocked on %t)" b#pp

  (* Print a state, using [pp_promise] for the target of an unresolved one. *)
  let pp_state ~pp_promise f = function
    | Unresolved {target; _} -> Fmt.pf f "%a" pp_promise target
    | Forwarding p -> p#pp f
    | Finished -> Fmt.pf f "(finished)"

  (* [dispatch state ~unresolved ~forwarding] runs the handler matching
     [state]. Raises [Failure "Already finished"] if the state is [Finished]. *)
  let dispatch state ~unresolved ~forwarding =
    match state with
    | Finished -> failwith "Already finished"
    | Unresolved x -> unresolved x
    | Forwarding x -> forwarding x

  (* What a field proxy holds while its promise is still unresolved. *)
  type promise_field = {
    sr : struct_ref_internal;
    path : Wire.Path.t;
    on_release : (unit -> unit) Queue.t;
    (* Note: currently, a field can never be released while unresolved.
       Possibly fields should have their own ref-counts.
       However, this doesn't matter for the only user of [on_release], which
       is the restorer system (that just needs to know if something becomes
       invalid, so it doesn't keep it cached). *)
  }

  (* A field proxy either still depends on its promise or forwards to a cap. *)
  type field_state =
    | PromiseField of promise_field
    | ForwardingField of cap

  (* [field path p] is a capability proxy for the field at [path] of the
     promise [p]. While in the [PromiseField] state it delegates to the
     [field_*] methods of [p]; after [resolve] it delegates to the given cap. *)
  let field path (p:#struct_ref_internal) = object (self : #field_cap)
    val mutable state = PromiseField {sr = p; path; on_release = Queue.create ()}

    val id = Debug.OID.next ()

    method call results msg =
      match state with
      | PromiseField p -> p.sr#pipeline p.path results msg
      | ForwardingField c -> c#call results msg

    method pp f =
      match state with
      | PromiseField p -> Fmt.pf f "field(%a)%t" Debug.OID.pp id (p.sr#field_pp p.path)
      | ForwardingField c -> Fmt.pf f "field(%a) -> %t" Debug.OID.pp id c#pp

    method update_rc d =
      match state with
      | ForwardingField c -> c#update_rc d
      | PromiseField p -> p.sr#field_update_rc p.path d

    (* Before resolution the callback is only queued; [resolve] passes the
       queued callbacks on to the target. *)
    method when_released fn =
      match state with
      | PromiseField p -> Queue.add fn p.on_release
      | ForwardingField x -> x#when_released fn

    (* Start forwarding to [cap] and hand it the queued release callbacks.
       Raises [Failure] if this field was already resolved. *)
    method resolve cap =
      Log.debug (fun f -> f "Resolved field(%a) to %t" Debug.OID.pp id cap#pp);
      match state with
      | ForwardingField _ -> failwith "Field already resolved!"
      | PromiseField p ->
        state <- ForwardingField cap;
        Queue.iter (fun fn -> cap#when_released fn) p.on_release

    method shortest =
      match state with
      | ForwardingField c -> c#shortest
      | PromiseField _ -> (self :> cap)

    method blocker =
      match state with
      | ForwardingField c -> c#blocker
      | PromiseField p -> p.sr#field_blocker p.path

    method problem =
      match state with
      | ForwardingField c -> c#problem
      | PromiseField _ -> None

    method when_more_resolved fn =
      match state with
      | ForwardingField c -> c#when_more_resolved fn
      | PromiseField p -> p.sr#field_when_resolved p.path fn

    method check_invariants =
      match state with
      | ForwardingField c -> c#check_invariants
      | PromiseField p -> p.sr#field_check_invariants p.path

    method sealed_dispatch brand =
      match state with
      | ForwardingField _ -> None
      | PromiseField p -> p.sr#field_sealed_dispatch p.path brand
  end

  (* A promise for a struct. ['promise] is the subclass-specific data kept
     while unresolved; [init] is its initial value. The promise starts
     [Unresolved] with a ref-count of one and no fields. Subclasses supply
     [name] and the virtual methods below. *)
  class virtual ['promise] t init = object (self : 'self)
    constraint 'self = #C.struct_ref
    constraint 'self = #C.struct_resolver

    val mutable state =
      Unresolved {
        target = init;
        fields = Field_map.empty;
        cycle_detector = None;
        when_resolved = Queue.create ();
        rc = RC.one;
      }

    val virtual name : string
    (* e.g. "local-promise" *)

    (* Set by [set_blocker] and cleared by [clear_blocker]. *)
    val mutable blocker = None

    val id = Debug.OID.next ()

    method private virtual pp_unresolved : 'promise Fmt.t

    method private virtual do_pipeline : 'promise -> Wire.Path.t -> C.struct_resolver -> Wire.Request.t -> unit

    method private virtual on_resolve : 'promise -> struct_ref -> unit
    (* We have just started forwarding. Send any queued data onwards. *)

    method private virtual send_cancel : 'promise -> unit
    (* There is no longer a need for this (unresolved) proxy's result. *)

    method virtual field_sealed_dispatch : 'a. Wire.Path.t -> 'a S.brand -> 'a option

    method private field_resolved _f = ()
    (** [field_resolved f] is called when [f] has been resolved. *)

    (* Send [msg] to the field at [path]: via [do_pipeline] while unresolved,
       or by calling the target's cap for that path once forwarding. *)
    method pipeline path results msg =
      dispatch state
        ~unresolved:(fun x -> self#do_pipeline x.target path results msg)
        ~forwarding:(fun x -> (x#cap path)#call results msg)

    (* [None] while unresolved; otherwise whatever the target reports. *)
    method response =
      dispatch state
        ~unresolved:(fun _ -> None)
        ~forwarding:(fun x -> x#response)

    (* While unresolved, an unblocked promise reports itself as the blocker;
       otherwise the answer comes from the recorded blocker (or the target). *)
    method blocker =
      dispatch state
        ~unresolved:(fun _ ->
            match blocker with
            | None -> Some (self :> base_ref)
            | Some x -> x#blocker
          )
        ~forwarding:(fun x -> x#blocker)

    (* Record that we are blocked on [b]. Returns [Error `Cycle], without
       recording anything, if [b] reports us as its own blocker. *)
    method set_blocker (b : C.base_ref) =
      if b#blocker = Some (self :> C.base_ref) then Error `Cycle
      else (
        blocker <- Some b;
        Ok ()
      )

    method clear_blocker =
      blocker <- None

    (* Return the cap for the field at [path]. While unresolved this is a
       field proxy, created on first use and shared by later requests. *)
    method cap path =
      dispatch state
        ~unresolved:(fun u ->
            let field =
              match Field_map.find_opt path u.fields with
              | Some f -> f
              | None ->
                let cap = field path (self :> struct_ref_internal) in
                let field = {cap; ref_count = RC.one} in
                u.fields <- Field_map.add path field u.fields; (* Map takes initial ref *)
                C.inc_ref self; (* Field takes ref on us too *)
                field
            in
            field.ref_count <- RC.succ field.ref_count ~pp:self#pp; (* Ref for user *)
            (field.cap :> cap)
          )
        ~forwarding:(fun x -> x#cap path)

    method pp f =
      match state with
      | Unresolved u ->
        Fmt.pf f "%s(%a, %a) -> %a%a"
          name
          Debug.OID.pp id
          RC.pp u.rc
          self#pp_unresolved u.target
          pp_opt_blocked_on blocker
      | Forwarding x ->
        Fmt.pf f "%s(%a) -> %t" name Debug.OID.pp id x#pp
      | Finished ->
        Fmt.pf f "%s(%a) (finished)" name Debug.OID.pp id

    (* Resolve this promise to [x].
       - If we are already [Finished], [x] is simply released.
       - If we are already forwarding, raises [Failure].
       - Otherwise we check for cycles, start forwarding to the (possibly
         repaired) target, transfer our ref-counts to it, resolve every field
         proxy, call [on_resolve] and then the queued [when_resolved]
         callbacks.
       The statement order below matters for the ref-count bookkeeping. *)
    method resolve x =
      Log.debug (fun f -> f "@[Updating: %t@\n\
                             @ to: -> %t" self#pp x#pp);
      match state with
      | Finished -> dec_ref x (* Cancelled *)
      | Forwarding x -> failwith (Fmt.str "Already forwarding (to %t)!" x#pp)
      | Unresolved u ->
        (* Check for cycles *)
        let x =
          let blocked_on_us r = r#blocker = Some (self :> base_ref) in
          if blocked_on_us x then
            cycle_err "Resolving:@, %t@,with:@, %t" self#pp x#pp
          else match x#response with
            | Some (Error _) | None -> x
            | Some (Ok payload) ->
              (* Only break the fields which have cycles, not the whole promise.
                 Otherwise, it can lead to out-of-order delivery where a message
                 that should have been delivered to a working target instead gets
                 dropped. Note also that fields can depend on other fields. *)
              let caps = C.Response_payload.snapshot_caps payload in
              let detector_caps = Array.make (RO_array.length caps) cycle_marker in
              u.cycle_detector <- Some (payload, detector_caps);
              let break_cycles c =
                (* Refill the detector array before each check, since
                   [field_blocker] overwrites the entries it visits. *)
                for i = 0 to Array.length detector_caps - 1 do
                  detector_caps.(i) <- RO_array.get_exn caps i
                done;
                if c#blocker = Some (self :> C.base_ref) then
                  C.broken_cap (Exception.v (Fmt.str "<cycle: %t>" c#pp))
                else c
              in
              let fixed_caps = RO_array.map break_cycles caps in
              if RO_array.equal (=) fixed_caps caps then x
              else (
                (* At least one cap was replaced: build a new response from
                   the repaired caps and drop our reference to the old [x]. *)
                RO_array.iter C.inc_ref fixed_caps;
                C.dec_ref x;
                C.return @@ C.Response_payload.with_caps fixed_caps payload
              )
        in
        state <- Forwarding x;
        begin match RC.to_int u.rc with
          | None -> assert false (* Can't happen; we don't detect leaks *)
          | Some rc -> x#update_rc rc; (* Transfer our ref-count to [x]. *)
        end;
        u.fields |> Field_map.iter (fun path f ->
            let pp = self#field_pp path in
            let ref_count = RC.pred f.ref_count ~pp in (* Count excluding our ref *)
            f.ref_count <- RC.zero;
            begin match RC.to_int ref_count with
              | None (* Field was leaked; shouldn't happen since we hold a copy anyway. *)
              | Some 0 -> f.cap#resolve invalid_cap (* No other users *)
              | Some ref_count -> (* Someone else is using it too *)
                let c = x#cap path in (* Increases ref by one *)
                (* Transfer our refs to the new target, excluding the one already added by [x#cap]. *)
                c#update_rc (ref_count - 1);
                f.cap#resolve c
            end;
            self#field_resolved (f.cap :> cap)
          );
        self#on_resolve u.target x;
        Queue.iter (fun fn -> fn x) u.when_resolved;
        let refs_dropped = Field_map.cardinal u.fields in
        x#update_rc (-(refs_dropped + 1)) (* Also, we take ownership of [x]. *)

    method resolver = (self :> C.struct_resolver)

    (* Add [d] to our ref-count. If an unresolved promise reaches zero it
       becomes [Finished], the queued [when_resolved] callbacks are given a
       cancelled struct, and [send_cancel] is called. *)
    method update_rc d =
      dispatch state
        ~unresolved:(fun u ->
            let { target; rc; fields; when_resolved; cycle_detector = _ } = u in
            u.rc <- RC.sum rc d ~pp:self#pp;
            if RC.is_zero u.rc then (
              assert (Field_map.is_empty fields);
              state <- Finished;
              let err = C.broken_struct `Cancelled in
              Queue.iter (fun fn -> fn err) when_resolved;
              self#send_cancel target;
            )
          )
        ~forwarding:(fun x -> x#update_rc d)

    (* While unresolved, [fn] is queued and later registered on whatever we
       resolve to; once forwarding it is passed straight to the target. *)
    method when_resolved fn =
      dispatch state
        ~unresolved:(fun u -> Queue.add (fun p -> p#when_resolved fn) u.when_resolved)
        ~forwarding:(fun x -> x#when_resolved fn)

    (* The blocker of the field at [path]. Outside of a resolution in progress
       this is just our own blocker. *)
    method field_blocker path =
      match state with
      | Unresolved { cycle_detector = Some (msg, caps); _ } ->
        begin match Wire.Response.cap_index msg path with
          | Some i when i >= 0 && i < Array.length caps ->
            (* We're in the process of resolving.
               Pretend that we've already resolved for the purpose of finding the blocker,
               because one field might depend on another, and that's OK. But also disable
               this path from being followed again, in case there's a cycle. *)
            let cap = caps.(i) in
            if cap = cycle_marker then Some (self :> C.base_ref)
            else (
              caps.(i) <- cycle_marker;
              cap#blocker
            )
          | _ -> None
        end
      | _ -> self#blocker

    (* Call [fn] with the cap for field [i] once the struct is resolved:
       a broken cap if the struct failed or was cancelled, [C.null] if the
       payload has no such field, and the field's cap otherwise. *)
    method field_when_resolved i fn =
      let fn : Response_payload.t or_error -> unit = function
        | Error (`Exception e) -> fn (C.broken_cap e)
        | Error `Cancelled -> fn (C.broken_cap Exception.cancelled)
        | Ok payload ->
          match C.Response_payload.field payload i with
          | None -> fn C.null
          | Some cap -> fn cap
      in
      dispatch state
        ~unresolved:(fun u -> Queue.add (fun p -> p#when_resolved fn) u.when_resolved)
        ~forwarding:(fun x -> x#when_resolved fn)

    (* Adjust the ref-count of the field at [path] by [d]. Raises [Not_found]
       (from [Field_map.find]) if no proxy exists for [path] while unresolved. *)
    method field_update_rc path d =
      dispatch state
        ~unresolved:(fun u ->
            (* When we resolve, we'll be holding references to all the caps in the resolution,
               so they must still be alive by the time we pass on any extra inc or dec refs. *)
            let f = Field_map.find path u.fields in
            assert (f.ref_count > RC.one); (* rc can't be one because that's our reference *)
            let pp = self#field_pp path in
            f.ref_count <- RC.sum f.ref_count d ~pp
          )
        ~forwarding:(fun x ->
            let c = x#cap path in (* Increases rc by one *)
            c#update_rc (d - 1)
          )

    (* Drop one reference to the field at [path]. *)
    method field_dec_ref path =
      dispatch state
        ~unresolved:(fun u ->
            let f = Field_map.find path u.fields in
            assert (f.ref_count > RC.one); (* rc can't be one because that's our reference *)
            let pp = self#field_pp path in
            f.ref_count <- RC.pred f.ref_count ~pp
          )
        ~forwarding:(fun x ->
            let c = x#cap path in (* Increases ref by one *)
            c#update_rc (-2)
          )

    (* Replace the unresolved target. Raises [Failure] once forwarding. *)
    method private update_target target =
      dispatch state
        ~unresolved:(fun u -> u.target <- target)
        ~forwarding:(fun _ -> failwith "Already forwarding!")

    (* An unresolved field proxy is only valid while its promise is itself
       unresolved, so the forwarding case is reported as a failure. *)
    method field_check_invariants i =
      dispatch state
        ~unresolved:(fun u ->
            let f = Field_map.find i u.fields in
            assert (f.ref_count > RC.one);
            self#check_invariants
          )
        ~forwarding:(fun _ -> Fmt.failwith "Promise is resolved, but field %a isn't!" Wire.Path.pp i)

    method field_pp path f =
      match state with
      | Finished -> Fmt.pf f "Promise is finished, but field %a isn't!" Wire.Path.pp path
      | Forwarding _ -> Fmt.pf f "Promise is resolved, but field %a isn't!" Wire.Path.pp path
      | Unresolved u ->
        let field = Field_map.find path u.fields in
        match RC.to_int field.ref_count with
        | None ->
          Fmt.pf f "(rc=LEAKED) -> #%a -> %t" Wire.Path.pp path self#pp
        | Some rc ->
          (* (separate the ref-count that we hold on it) *)
          Fmt.pf f "(rc=1+%d) -> #%a -> %t" (rc - 1) Wire.Path.pp path self#pp

    (* Check our own ref-count and those of all fields, and that any recorded
       blocker is itself still blocked. *)
    method check_invariants =
      dispatch state
        ~unresolved:(fun u ->
            RC.check ~pp:self#pp u.rc;
            Field_map.iter (fun i f -> RC.check f.ref_count ~pp:(self#field_pp i)) u.fields;
            match blocker with
            | Some x when x#blocker = None ->
              Debug.invariant_broken (fun f ->
                  Fmt.pf f "%t is blocked on %t, but that isn't blocked!" self#pp x#pp
                )
            | _ -> ()
          )
        ~forwarding:(fun x -> x#check_invariants)

    method sealed_dispatch _ = None

    initializer
      Log.debug (fun f -> f "Created %t" self#pp)
  end
end