123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140openPicostypet={fiber:Fiber.t;mutex:Mutex.t;condition:Condition.t}letcreate_packed~forbidpacked=letfiber=Fiber.create_packed~forbidpackedinletmutex=Mutex.create()inletcondition=Condition.create()in{fiber;mutex;condition}letrecblocktriggert=ifnot(Trigger.is_signaledtrigger)thenbegin(* We block fibers (or threads) on a per thread mutex and condition. *)Mutex.lockt.mutex;matchifnot(Trigger.is_signaledtrigger)then(* We assume that there is no poll point after the above [Mutex.lock]
and before the below [Condition.wait] is ready to be woken up by a
[Condition.broadcast]. *)Condition.waitt.conditiont.mutexwith|()->Mutex.unlockt.mutex;blocktriggert|exceptionasync_exn->(* Condition.wait may be interrupted by asynchronous exceptions and we
must make sure to unlock even in that case. *)Mutex.unlockt.mutex;raiseasync_exnendletresumetriggert_=let_is_canceled:bool=Fiber.unsuspendt.fibertriggerin(* This will be called when the trigger is signaled. We simply broadcast on
the per thread condition variable. *)beginmatchMutex.lockt.mutexwith|()->Mutex.unlockt.mutex|exceptionSys_error_->(* This should mean that [resume] was called from a signal handler
running on the scheduler thread. If the assumption about not having
poll points holds, the [Condition.broadcast] should now be able to
wake up the [Condition.wait] in the scheduler. *)()end;Condition.broadcastt.conditionlet[@alert"-handler"]recawaitttrigger=ifFiber.try_suspendt.fibertriggerttresumethenblocktriggert;Fiber.canceledt.fiberandcurrentt=(* The current handler must never propagate cancelation, but it would be
possible to yield here to run some other fiber before resuming the current
fiber. *)t.fiberandyieldt=(* In other handlers we need to account for cancelation. *)Fiber.checkt.fiber;Thread.yield()andcancel_after:typea._->aComputation.t->_=(* We need an explicit type signature to allow OCaml to generalize the tyoe as
all of the handlers are in a single recursive definition. *)funtcomputation~secondsexn_bt->Fiber.checkt.fiber;Select.cancel_aftercomputation~secondsexn_btandspawn:typea._->forbid:bool->aComputation.t->_=funt~forbidcomputationmains->Fiber.checkt.fiber;letpacked=Computation.Packedcomputationinmatchmainswith|[main]->Thread.create(fun()->(* We need to (recursively) install the handler on each new thread
that we create. *)Handler.usinghandler(create_packed~forbidpacked)main)()|>ignore|mains->begin(* We try to be careful to implement the all-or-nothing behaviour based on
the assumption that we may run out of threads well before we run out of
memory. In a thread pool based scheduler this should actually not
require special treatment. *)letall_or_nothing=ref`Waitinmatchmains|>List.iter@@funmain->Thread.create(fun()->if!all_or_nothing==`WaitthenbeginMutex.lockt.mutex;matchwhilematch!all_or_nothingwith|`Wait->Condition.waitt.conditiont.mutex;true|`All|`Nothing->falsedo()donewith|()->Mutex.unlockt.mutex|exceptionasync_exn->(* Condition.wait may be interrupted by asynchronous
exceptions and we must make sure to unlock even in that
case. *)Mutex.unlockt.mutex;raiseasync_exnend;if!all_or_nothing==`Allthen(* We need to (recursively) install the handler on each new
thread that we create. *)Handler.usinghandler(create_packed~forbidpacked)main)()|>ignorewith|()->Mutex.lockt.mutex;all_or_nothing:=`All;Mutex.unlockt.mutex;Condition.broadcastt.condition|exceptionexn->Mutex.lockt.mutex;all_or_nothing:=`Nothing;Mutex.unlockt.mutex;Condition.broadcastt.condition;raiseexnendandhandler=Handler.{current;spawn;yield;cancel_after;await}letrun?(forbid=false)main=Select.check_configured();letpacked=Computation.Packed(Computation.create())inHandler.usinghandler(create_packed~forbidpacked)main