(* Cancellation contexts.

   Contexts form a tree (one per domain). Cancelling a context marks it and all of its
   non-protected descendants as [Cancelling] and then invokes the cancel function of
   every fiber found in those contexts. *)

exception Cancelled = Exn.Cancelled
exception Cancel_hook_failed = Exn.Cancel_hook_failed

(* Life-cycle of a cancellation context. *)
type state =
  | On                                          (* Active: [check] succeeds. *)
  | Cancelling of exn * Printexc.raw_backtrace  (* Cancelled for this reason; the backtrace is the one
                                                   captured by [cancel_internal] when the state changed. *)
  | Finished                                    (* Not (or no longer) activated; see [create] and [activate]. *)

(* There is a tree of cancellation contexts for each domain.
   A fiber is always in exactly one context, but can move to a new child and back (see [sub]).
   While a fiber is performing a cancellable operation, it sets a cancel function.
   When a context is cancelled, we call each fiber's cancellation function (first replacing it with [ignore]).
   Cancelling always happens from the fiber's own domain.
   An operation may either finish normally or be cancelled (not both).
   If a function can succeed in a separate domain,
   the user's cancel function is responsible for ensuring that this is done atomically. *)
type t = {
  mutable state : state;
  children : t Lwt_dllist.t;              (* Child contexts currently linked under this one *)
  fibers : fiber_context Lwt_dllist.t;    (* Fibers currently running in this context *)
  protected : bool;                       (* If set, cancelling an ancestor does not propagate here *)
  domain : Domain.id;                     (* Prevent access from other domains *)
}
and fiber_context = {
  tid : Trace.id;
  mutable cancel_context : t;
  mutable cancel_node : fiber_context Lwt_dllist.node option; (* Our entry in [cancel_context.fibers] *)
  mutable cancel_fn : exn -> unit;        (* Encourage the current operation to finish *)
  mutable vars : Hmap.t;
}

(* Effect used to obtain the context of the fiber that performs it. *)
type _ Effect.t += Get_context : fiber_context Effect.t

(* Prints the state of [t], followed by " (protected)" if [t.protected] is set. *)
let pp_state f t =
  begin match t.state with
    | On -> Fmt.string f "on"
    | Cancelling (ex, _) -> Fmt.pf f "cancelling(%a)" Fmt.exn ex
    | Finished -> Fmt.string f "finished"
  end;
  if t.protected then Fmt.pf f " (protected)"

(* Prints a fiber as its trace ID. *)
let pp_fiber f fiber =
  Fmt.pf f "%d" (fiber.tid :> int)

(* Prints each item of the list [t] with [pp], calling [sep] between consecutive items. *)
let pp_lwt_dlist ~sep pp f t =
  let first = ref true in
  t |> Lwt_dllist.iter_l (fun item ->
      if !first then first := false
      else sep f ();
      pp f item;
    )

(* Prints the tree rooted at [t]: its state, its fibers, then each child on a new line. *)
let rec dump f t =
  Fmt.pf f "@[<v2>%a [%a]%a@]"
    pp_state t
    (pp_lwt_dlist ~sep:(Fmt.any ",") pp_fiber) t.fibers
    pp_children t.children
and pp_children f ts =
  ts |> Lwt_dllist.iter_l (fun t ->
      Fmt.cut f ();
      dump f t
    )

(* [is_on t] is [true] iff [t] is in the [On] state. *)
let is_on t =
  match t.state with
  | On -> true
  | Cancelling _ | Finished -> false

(* [check t] returns normally if [t] is [On].
   Raises [Cancelled ex] if [t] is being cancelled with [ex],
   and [Invalid_argument] if [t] has finished. *)
let check t =
  match t.state with
  | On -> ()
  | Cancelling (ex, _) -> raise (Cancelled ex)
  | Finished -> invalid_arg "Cancellation context finished!"

(* Like [check], but returns the exception (if any) instead of raising it. *)
let get_error t =
  match t.state with
  | On -> None
  | Cancelling (ex, _) -> Some (Cancelled ex)
  | Finished -> Some (Invalid_argument "Cancellation context finished!")

(* [is_finished t] is [true] iff [t] is in the [Finished] state. *)
let is_finished t =
  match t.state with
  | Finished -> true
  | On | Cancelling _ -> false

(* Moves [fiber] into context [t], unlinking it from the context it was in before. *)
let move_fiber_to t fiber =
  let new_node = Lwt_dllist.add_r fiber t.fibers in     (* Add to new context *)
  fiber.cancel_context <- t;
  Option.iter Lwt_dllist.remove fiber.cancel_node;      (* Remove from old context *)
  fiber.cancel_node <- Some new_node

(* Note: the new value is not linked into the cancellation tree.
   It starts in the [Finished] state and is tied to the calling domain. *)
let create ~protected =
  let children = Lwt_dllist.create () in
  let fibers = Lwt_dllist.create () in
  { state = Finished; children; protected; fibers; domain = Domain.self () }

(* Links [t] into the tree as a child of [parent] and returns a function to remove it again.
   [t] must be [Finished] and [parent] must not be. *)
let activate t ~parent =
  assert (t.state = Finished);
  assert (parent.state <> Finished);
  t.state <- On;
  let node = Lwt_dllist.add_r t parent.children in
  fun () ->
    assert (parent.state <> Finished);
    t.state <- Finished;
    Lwt_dllist.remove node

(* Runs [fn] with a fresh cancellation context.
   The new context is a child of [parent] and the fiber [ctx] is moved into it for the
   duration of [fn]; afterwards the fiber is moved back to [parent] and the new context is
   unlinked, whether [fn] returns or raises.
   Unless [protected] is set, [parent] is checked first (see [check]). *)
let with_cc ~ctx:fiber ~parent ~protected fn =
  if not protected then check parent;
  let t = create ~protected in
  let deactivate = activate t ~parent in
  move_fiber_to t fiber;
  let cleanup () = move_fiber_to parent fiber; deactivate () in
  match fn t with
  | x -> cleanup (); x
  | exception ex ->
    (* Capture the backtrace before cleaning up so that it is preserved on re-raise. *)
    let bt = Printexc.get_raw_backtrace () in
    cleanup ();
    Printexc.raise_with_backtrace ex bt

(* [protect fn] runs [fn ()] in a new protected child of the current fiber's context. *)
let protect fn =
  let ctx = Effect.perform Get_context in
  with_cc ~ctx ~parent:ctx.cancel_context ~protected:true @@ fun _ ->
  (* Note: there is no need to check the new context after [fn] returns;
     the goal of cancellation is only to finish the thread promptly, not to report the error.
     We also do not check the parent context, to make sure the caller has a chance to handle the result. *)
  fn ()

(* Mark the cancellation tree rooted at [t] as Cancelling (stopping at protected sub-contexts),
   and return a list of all fibers in the newly-cancelling contexts. Since modifying the cancellation
   tree can only be done from our domain, this is effectively an atomic operation. Once it returns,
   new (non-protected) fibers cannot be added to any of the cancelling contexts. *)
let rec cancel_internal t ex acc_fibers =
  match t.state with
  | Finished -> invalid_arg "Cancellation context finished!"
  | Cancelling _ -> acc_fibers          (* Already being cancelled: nothing new to collect *)
  | On ->
    let bt = Printexc.get_raw_backtrace () in
    t.state <- Cancelling (ex, bt);
    let acc_fibers = Lwt_dllist.fold_r List.cons t.fibers acc_fibers in
    Lwt_dllist.fold_r (cancel_child ex) t.children acc_fibers
and cancel_child ex t acc =
  if t.protected then acc
  else cancel_internal t ex acc

(* Raises [Invalid_argument] if the calling domain is not the one that created [t]. *)
let check_our_domain t =
  if Domain.self () <> t.domain then invalid_arg "Cancellation context accessed from wrong domain!"

(* [cancel t ex] marks [t] (and its non-protected descendants) as cancelling with reason [ex],
   then calls the cancel function of each affected fiber with [Cancelled ex].
   Every cancel function is called even if an earlier one raised; if any raised,
   [Cancel_hook_failed] is raised afterwards with the collected exceptions.
   Raises [Invalid_argument] if called from the wrong domain or if [t] has finished. *)
let cancel t ex =
  check_our_domain t;
  let fibers = cancel_internal t ex [] in
  let cex = Cancelled ex in
  let rec aux = function
    | [] -> []
    | x :: xs ->
      let fn = x.cancel_fn in
      x.cancel_fn <- ignore;            (* Ensure each cancel function runs at most once *)
      match fn cex with
      | () -> aux xs
      | exception ex2 -> ex2 :: aux xs
  in
  if fibers <> [] then (
    match aux fibers with
    | [] -> ()
    | exns -> raise (Cancel_hook_failed exns)
  )

(* [sub fn] runs [fn t] where [t] is a new, unprotected child of the current fiber's context. *)
let sub fn =
  let ctx = Effect.perform Get_context in
  let parent = ctx.cancel_context in
  with_cc ~ctx ~parent ~protected:false @@ fun t ->
  fn t

(* Like [sub], but it's OK if the new context is cancelled.
   (instead, return the parent context on exit so the caller can check that) *)
let sub_unchecked fn =
  let ctx = Effect.perform Get_context in
  let parent = ctx.cancel_context in
  with_cc ~ctx ~parent ~protected:false @@ fun t ->
  fn t;
  parent

module Fiber_context = struct
  type t = fiber_context

  (* Trace ID of the fiber. *)
  let tid t = t.tid

  (* The cancellation context the fiber is currently in. *)
  let cancellation_context t = t.cancel_context

  (* The error (if any) reported by the fiber's current cancellation context. *)
  let get_error t = get_error t.cancel_context

  (* Install [fn] as the function to call if the fiber's context is cancelled. *)
  let set_cancel_fn t fn =
    t.cancel_fn <- fn

  (* Reset the cancel function to [ignore]. *)
  let clear_cancel_fn t =
    t.cancel_fn <- ignore

  (* Create a fiber context with a freshly minted trace ID and register it in [cc]. *)
  let make ~cc ~vars =
    let tid = Trace.mint_id () in
    Trace.create tid Fiber;
    let t = { tid; cancel_context = cc; cancel_node = None; cancel_fn = ignore; vars } in
    t.cancel_node <- Some (Lwt_dllist.add_r t cc.fibers);
    t

  (* Create a fiber context in a new, unprotected root context that is already [On]
     and has no parent. *)
  let make_root () =
    let cc = create ~protected:false in
    cc.state <- On;
    make ~cc ~vars:Hmap.empty

  (* Unlink the fiber from its cancellation context's fiber list. *)
  let destroy t =
    Option.iter Lwt_dllist.remove t.cancel_node

  (* The fiber's variables. *)
  let vars t = t.vars

  (* The variables of the fiber performing the call. *)
  let get_vars () =
    vars (Effect.perform Get_context)

  (* [with_vars t vars fn] runs [fn ()] with [t.vars] set to [vars],
     restoring the previous value when [fn] returns or raises. *)
  let with_vars t vars fn =
    let old_vars = t.vars in
    t.vars <- vars;
    let cleanup () = t.vars <- old_vars in
    match fn () with
    | x -> cleanup (); x
    | exception ex ->
      (* Capture the backtrace before cleaning up so that it is preserved on re-raise. *)
      let bt = Printexc.get_raw_backtrace () in
      cleanup ();
      Printexc.raise_with_backtrace ex bt
end