(* A minimal single-domain scheduler for Eio whose only notion of time is a
   [Clock.Mono.t] that is advanced explicitly when nothing else can run. *)

module Fiber_context = Eio.Private.Fiber_context
module Trace = Eio.Private.Trace
module Lf_queue = Eio_utils.Lf_queue
module Suspended = Eio_utils.Suspended  (* Adds tracing to continuations *)

(* Raised by [run_full] when the scheduler exits before the main function
   has stored a result. *)
exception Deadlock_detected

(* The scheduler could just return [unit], but this is clearer. *)
type exit = [`Exit_scheduler]

(* The environment object handed to the main function by [run_full]. *)
type stdenv = <
  clock : Clock.t;
  mono_clock : Clock.Mono.t;
  debug : Eio.Debug.t;
  backend_id : string;
>

(* Scheduler state. *)
type t = {
  (* Suspended fibers waiting to run again.
     [Lf_queue] is like [Stdlib.Queue], but is thread-safe (lock-free) and
     allows pushing items to the head too, which we need. *)
  run_q : (unit -> exit) Lf_queue.t;

  (* The clock that [schedule] tries to advance when [run_q] is empty. *)
  mono_clock : Clock.Mono.t;
}

(* A clock implementation that reports time as a float number of seconds,
   backed by a [Clock.Mono.t]. *)
module Wall_clock = struct
  type t = Clock.Mono.t
  type time = float

  (* Convert an [Mtime.t] (nanoseconds) to float seconds. *)
  let wall_of_mtime m = Int64.to_float (Mtime.to_uint64_ns m) /. 1e9

  (* Convert float seconds to an [Mtime.t] (nanoseconds). The fractional
     nanosecond part is dropped by [Int64.of_float]. *)
  let wall_to_mtime w = Mtime.of_uint64_ns (Int64.of_float (w *. 1e9))

  let now t = wall_of_mtime (Eio.Time.Mono.now t)
  let sleep_until t time = Eio.Time.Mono.sleep_until t (wall_to_mtime time)
end

(* [wall_clock mono_clock] wraps [mono_clock] as a resource using the
   [Wall_clock] implementation. The handler is built once, when this module
   is initialised, and shared by every call. *)
let wall_clock =
  let handler = Eio.Time.Pi.clock (module Wall_clock) in
  fun mono_clock -> Eio.Resource.T (mono_clock, handler)

(* Resume the next runnable fiber, if any. *)
let rec schedule t : exit =
  match Lf_queue.pop t.run_q with
  | Some f -> f ()
  | None ->
    (* Nothing is runnable. Try advancing the clock. *)
    if Clock.Mono.try_advance t.mono_clock then schedule t
    else `Exit_scheduler      (* Finished (or deadlocked) *)

(* Run [main] in an Eio main loop.
   [main] receives a [stdenv] object. Returns whatever [main] returns.
   Raises [Deadlock_detected] if the scheduler exits without [main] having
   produced a result. An exception escaping a fiber is re-raised out of the
   effect handler with its original backtrace. *)
let run_full main =
  let mono_clock = Clock.Mono.make () in
  let clock = wall_clock mono_clock in
  let stdenv = object (_ : stdenv)
    method clock = clock
    method mono_clock = mono_clock
    method debug = Eio.Private.Debug.v
    method backend_id = "mock"
  end in
  let t = { run_q = Lf_queue.create (); mono_clock } in
  let rec fork ~new_fiber:fiber fn =
    Trace.fiber (Fiber_context.tid fiber);
    (* Create a new fiber and run [fn] in it. *)
    Effect.Deep.match_with fn ()
      { retc = (fun () ->
            (* The fiber finished normally: release its context and move on
               to the next runnable fiber. *)
            Fiber_context.destroy fiber;
            schedule t
          );
        exnc = (fun ex ->
            (* Grab the backtrace before doing anything else, then release
               the fiber context and re-raise with that backtrace. *)
            let bt = Printexc.get_raw_backtrace () in
            Fiber_context.destroy fiber;
            Printexc.raise_with_backtrace ex bt
          );
        effc = fun (type a) (e : a Effect.t) : ((a, exit) Effect.Deep.continuation -> exit) option ->
          match e with
          | Eio.Private.Effects.Suspend f -> Some (fun k ->
              let k = { Suspended.k; fiber } in
              (* Ask [f] to register whatever callbacks are needed to resume the fiber.
                 e.g. it might register a callback with a promise, for when that's resolved. *)
              f fiber (fun result ->
                  (* The fiber is ready to run again. Add it to the queue. *)
                  Lf_queue.push t.run_q (fun () ->
                      (* Resume the fiber. *)
                      Fiber_context.clear_cancel_fn fiber;
                      match result with
                      | Ok v -> Suspended.continue k v
                      | Error ex -> Suspended.discontinue k ex
                    )
                );
              (* Switch to the next runnable fiber while this one's blocked. *)
              schedule t
            )
          | Eio.Private.Effects.Fork (new_fiber, f) -> Some (fun k ->
              let k = { Suspended.k; fiber } in
              (* Arrange for the forking fiber to run immediately after the new one. *)
              Lf_queue.push_head t.run_q (Suspended.continue k);
              (* Create and run the new fiber (using fiber context [new_fiber]). *)
              fork ~new_fiber f
            )
          | Eio.Private.Effects.Get_context -> Some (fun k ->
              (* Answer immediately with the current fiber's context. *)
              Effect.Deep.continue k fiber
            )
          | _ -> None   (* Not one of ours: leave it for an outer handler. *)
      } in
  let new_fiber = Fiber_context.make_root () in
  (* Filled in by the root fiber if [main] returns. *)
  let result = ref None in
  let `Exit_scheduler =
    Domain_local_await.using
      ~prepare_for_await:Eio_utils.Dla.prepare_for_await
      ~while_running:(fun () ->
          fork ~new_fiber (fun () -> result := Some (main stdenv))) in
  match !result with
  | None -> raise Deadlock_detected
  | Some x -> x

(* [run fn] is [run_full] for a main function that ignores the environment. *)
let run fn =
  run_full (fun _ -> fn ())