1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889openPicostypet={fiber:Fiber.t;mutex:Mutex.t;condition:Condition.t}letcreate_packed~forbidpacked=letfiber=Fiber.create_packed~forbidpackedinletmutex=Mutex.create()inletcondition=Condition.create()in{fiber;mutex;condition}letrecblocktriggert=ifnot(Trigger.is_signaledtrigger)thenbegin(* We block fibers (or threads) on a per thread mutex and condition. *)Mutex.lockt.mutex;matchifnot(Trigger.is_signaledtrigger)then(* We assume that there is no poll point after the above [Mutex.lock]
and before the below [Condition.wait] is ready to be woken up by a
[Condition.broadcast]. *)Condition.waitt.conditiont.mutexwith|()->Mutex.unlockt.mutex;blocktriggert|exceptionexn->(* Condition.wait may be interrupted by asynchronous exceptions and we
must make sure to unlock even in that case. *)Mutex.unlockt.mutex;raiseexnendletresumetriggert_=let_is_canceled:bool=Fiber.unsuspendt.fibertriggerin(* This will be called when the trigger is signaled. We simply broadcast on
the per thread condition variable. *)beginmatchMutex.lockt.mutexwith|()->Mutex.unlockt.mutex|exceptionSys_error_->(* This should mean that [resume] was called from a signal handler
running on the scheduler thread. If the assumption about not having
poll points holds, the [Condition.broadcast] should now be able to
wake up the [Condition.wait] in the scheduler. *)()end;Condition.broadcastt.conditionlet[@alert"-handler"]recawaitttrigger=ifFiber.try_suspendt.fibertriggerttresumethenblocktriggert;Fiber.canceledt.fiberandcurrentt=(* The current handler must never propagate cancelation, but it would be
possible to yield here to run some other fiber before resuming the current
fiber. *)t.fiberandyieldt=(* In other handlers we need to account for cancelation. *)Fiber.checkt.fiber;Thread.yield()andcancel_after:typea._->aComputation.t->_=(* We need an explicit type signature to allow OCaml to generalize the tyoe as
all of the handlers are in a single recursive definition. *)funtcomputation~secondsexn_bt->Fiber.checkt.fiber;Select.cancel_aftercomputation~secondsexn_btandspawn:typea._->forbid:bool->aComputation.t->_=funt~forbidcomputationmains->Fiber.checkt.fiber;letpacked=Computation.Packedcomputationinmains|>List.iter@@funmain->Thread.create(fun()->(* We need to (recursively) install the handler on each new thread
that we create. *)Handler.usinghandler(create_packed~forbidpacked)main)()|>ignoreandhandler=Handler.{current;spawn;yield;cancel_after;await}letrun~forbidmain=Select.check_configured();letpacked=Computation.Packed(Computation.create())inHandler.usinghandler(create_packed~forbidpacked)main