12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273moduleFiber_context=Eio.Private.Fiber_contextmoduleLf_queue=Eio_utils.Lf_queueexceptionDeadlock_detected(* The scheduler could just return [unit], but this is clearer. *)typeexit=Exit_schedulertypet={(* Suspended fibers waiting to run again.
[Lf_queue] is like [Stdlib.Queue], but is thread-safe (lock-free) and
allows pushing items to the head too, which we need. *)run_q:(unit->exit)Lf_queue.t;}(* Resume the next runnable fiber, if any. *)letschedulet:exit=matchLf_queue.popt.run_qwith|Somef->f()|None->Exit_scheduler(* Finished (or deadlocked) *)(* Run [main] in an Eio main loop. *)letrunmain=lett={run_q=Lf_queue.create()}inletrecfork~new_fiber:fiberfn=(* Create a new fiber and run [fn] in it. *)Effect.Deep.match_withfn(){retc=(fun()->Fiber_context.destroyfiber;schedulet);exnc=(funex->letbt=Printexc.get_raw_backtrace()inFiber_context.destroyfiber;Printexc.raise_with_backtraceexbt);effc=fun(typea)(e:aEffect.t):((a,exit)Effect.Deep.continuation->exit)option->matchewith|Eio.Private.Effects.Suspendf->Some(funk->(* Ask [f] to register whatever callbacks are needed to resume the fiber.
e.g. it might register a callback with a promise, for when that's resolved. *)ffiber(funresult->(* The fiber is ready to run again. Add it to the queue. *)Lf_queue.pusht.run_q(fun()->(* Resume the fiber. *)Fiber_context.clear_cancel_fnfiber;matchresultwith|Okv->Effect.Deep.continuekv|Errorex->Effect.Deep.discontinuekex));(* Switch to the next runnable fiber while this one's blocked. *)schedulet)|Eio.Private.Effects.Fork(new_fiber,f)->Some(funk->(* Arrange for the forking fiber to run immediately after the new one. *)Lf_queue.push_headt.run_q(Effect.Deep.continuek);(* Create and run the new fiber (using fiber context [new_fiber]). *)fork~new_fiberf)|Eio.Private.Effects.Get_context->Some(funk->Effect.Deep.continuekfiber)|_->None}inletnew_fiber=Fiber_context.make_root()inletresult=refNoneinletExit_scheduler=Domain_local_await.using~prepare_for_await:Eio.Private.Dla.prepare_for_await~while_running:(fun()->fork~new_fiber(fun()->result:=Some(main())))inmatch!resultwith|None->raiseDeadlock_detected|Somex->x