open Eio.Std

(* Exception used to fail the sub-switch of a fibre started by
   [fork_with_cancel], so that the fibre stops. *)
exception Cancel

(* Error handler for fibres started by [fork_with_cancel]: a [Cancel] is the
   expected way for such a fibre to end and is dropped; any other exception is
   re-raised. *)
let ignore_cancel = function
  | Cancel -> ()
  | ex -> raise ex

(* Call this to cause the current [Lwt_engine.iter] to return.
   The engine's blocking [iter] replaces this with a lazy value that resolves
   the promise it is waiting on; forcing it when nothing is waiting is a no-op. *)
let ready = ref (lazy ())

(* While the Lwt event loop is running, this is the switch that contains any fibres handling Lwt operations.
   Lwt does not use structured concurrency, so it can spawn background threads without explicitly taking a
   switch argument, which is why we need to use a global variable here. *)
let loop_switch = ref None

(* Force the current [ready] value, waking a blocked [iter] if there is one. *)
let notify () = Lazy.force !ready

(* Run [fn] in a new fibre and return a lazy value that can be forced to cancel it.
   Cancelling fails the fibre's own sub-switch with [Cancel]; an
   [Invalid_argument] from [Switch.fail] is ignored. *)
let fork_with_cancel ~sw fn =
  let cancel = ref None in
  Fibre.fork_sub ~sw ~on_error:ignore_cancel (fun sw ->
      cancel := Some (lazy (
          try Switch.fail sw Cancel
          with Invalid_argument _ -> ()
        ));
      fn ()
    );
  (* The forked fibre runs first, so [cancel] must be set by now. *)
  Option.get !cancel

(* Build an Lwt engine whose readiness and timer events are each handled by a
   fibre forked under [sw]; timers sleep on [clock]. Every event fibre runs the
   Lwt callback and then [notify] inside [Eio.Cancel.protect]. *)
let make_engine ~sw ~clock = object
  inherit Lwt_engine.abstract

  (* Fail [sw] with [Exit] to stop the event fibres. *)
  method private cleanup =
    try Switch.fail sw Exit
    with Invalid_argument _ -> ()       (* Already destroyed *)

  (* Invoke [callback] each time [fd] becomes readable, until cancelled. *)
  method private register_readable fd callback =
    fork_with_cancel ~sw @@ fun () ->
    while true do
      Eio_unix.await_readable fd;
      Eio.Cancel.protect (fun () -> callback (); notify ())
    done

  (* Invoke [callback] each time [fd] becomes writable, until cancelled. *)
  method private register_writable fd callback =
    fork_with_cancel ~sw @@ fun () ->
    while true do
      Eio_unix.await_writable fd;
      Eio.Cancel.protect (fun () -> callback (); notify ())
    done

  (* Invoke [callback] after sleeping for [delay] on [clock]: once if [repeat]
     is false, otherwise after every [delay] until cancelled. *)
  method private register_timer delay repeat callback =
    fork_with_cancel ~sw @@ fun () ->
    if repeat then (
      while true do
        Eio.Time.sleep clock delay;
        Eio.Cancel.protect (fun () -> callback (); notify ())
      done
    ) else (
      Eio.Time.sleep clock delay;
      Eio.Cancel.protect (fun () -> callback (); notify ())
    )

  (* One iteration of the engine. When [block] is true, install a fresh [ready]
     value and wait until something forces it via [notify]; otherwise just
     yield to other fibres. *)
  method iter block =
    if block then (
      let p, r = Promise.create () in
      ready := lazy (Promise.resolve r ());
      Promise.await p
    ) else (
      Fibre.yield ()
    )
end

(* Empty type: a function with this result type cannot return normally. *)
type no_return = |

(* Run an Lwt event loop until [user_promise] resolves. Raises [Exit] when done.
   Raises [Invalid_argument] if [loop_switch] is already set, i.e. a loop is
   already running. *)
let main ~clock user_promise : no_return =
  Switch.run @@ fun sw ->
  if Option.is_some !loop_switch then invalid_arg "Lwt_eio event loop already running";
  (* Clear the global again when this switch is released. *)
  Switch.on_release sw (fun () -> loop_switch := None);
  loop_switch := Some sw;
  Lwt_engine.set (make_engine ~sw ~clock);
  (* An Eio fibre may resume an Lwt thread while in [Lwt_engine.iter] and forget to call [notify].
     If that called [Lwt.pause] then it wouldn't wake up, so handle this common case here. *)
  Lwt.register_pause_notifier (fun _ -> notify ());
  Lwt_main.run user_promise;
  (* Stop any event fibres still running: *)
  raise Exit

(* Run [fn ()] with the Lwt event loop running in a separate fibre.
   When [fn] finishes, whether it returns or raises, the Lwt promise passed to
   [main] is woken and [notify] is called so that the loop can finish. *)
let with_event_loop ~clock fn =
  let p, r = Lwt.wait () in
  Switch.run @@ fun sw ->
  Fibre.fork ~sw (fun () ->
      match main ~clock p with
      | _ -> .                          (* [no_return] has no values *)
      | exception Exit -> ()            (* Normal termination of [main] *)
    );
  Fun.protect fn
    ~finally:(fun () ->
        Lwt.wakeup r ();
        notify ()
      )

(* Return the switch of the running event loop, or fail if [loop_switch] is
   not set. *)
let get_loop_switch () =
  match !loop_switch with
  | Some sw -> sw
  | None -> Fmt.failwith "Must be called from within Lwt_eio.with_event_loop!"

(* Conversions between Lwt promises and Eio promises.
   Inside this structure, [Promise] still refers to the module that was in
   scope before this definition, since the definition is not recursive. *)
module Promise = struct
  (* Wait in the current fibre for [lwt_promise]; return its value or re-raise
     its exception. *)
  let await_lwt lwt_promise =
    let p, r = Promise.create () in
    Lwt.on_any lwt_promise (Promise.resolve_ok r) (Promise.resolve_error r);
    Promise.await_exn p

  (* Return an Lwt promise that is woken with the value of [eio_promise].
     The wait runs in a fibre forked on the event loop's switch. *)
  let await_eio eio_promise =
    let sw = get_loop_switch () in
    let p, r = Lwt.wait () in
    Fibre.fork ~sw (fun () ->
        Lwt.wakeup r (Promise.await eio_promise);
        notify ()
      );
    p

  (* Like [await_eio], for an [eio_promise] carrying a result: [Ok x] wakes the
     Lwt promise with [x], [Error ex] wakes it with the exception [ex]. *)
  let await_eio_result eio_promise =
    let sw = get_loop_switch () in
    let p, r = Lwt.wait () in
    Fibre.fork ~sw (fun () ->
        match Promise.await eio_promise with
        | Ok x -> Lwt.wakeup r x; notify ()
        | Error ex -> Lwt.wakeup_exn r ex; notify ()
      );
    p
end

(* Run [fn ()] in a new fibre on the event loop's switch and return an Lwt
   promise for its outcome: the returned value, or the exception it raised. *)
let run_eio fn =
  let sw = get_loop_switch () in
  let p, r = Lwt.wait () in
  Fibre.fork ~sw (fun () ->
      match fn () with
      | x -> Lwt.wakeup r x; notify ()
      | exception ex -> Lwt.wakeup_exn r ex; notify ()
    );
  p