(*
* Copyright (C) 2023 Thomas Leonard
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 *)

module Suspended = Eio_utils.Suspended
module Zzz = Eio_utils.Zzz
module Lf_queue = Eio_utils.Lf_queue
module Fiber_context = Eio.Private.Fiber_context
module Trace = Eio.Private.Trace
module Rcfd = Eio_unix.Private.Rcfd

module Poll = Iomux.Poll

type exit = [`Exit_scheduler]
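(* Overview: this is a poll(2)/ppoll(2)-based scheduler backend (via [Iomux.Poll]).
   Runnable fibers live in [run_q]; fibers blocked on file descriptors are
   registered in [fd_map] and mirrored into the [poll] table; fibers blocked on
   timers live in [sleep_q]. A pipe ([eventfd] for writing, [eventfd_r] for
   reading) is used to wake this domain when another systhread or domain
   enqueues work while we may be sleeping in [poll]. *)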
(* The type of items in the run queue. *)
type runnable =
  | IO : runnable                                    (* Reminder to check for IO *)
  | Thread : 'a Suspended.t * 'a -> runnable         (* Resume a fiber with a result value *)
  | Failed_thread : 'a Suspended.t * exn -> runnable (* Resume a fiber with an exception *)

(* For each FD we track which fibers are waiting for it to become readable/writeable. *)
type fd_event_waiters = {
  read : unit Suspended.t Lwt_dllist.t;
  write : unit Suspended.t Lwt_dllist.t;
}

type t = {
  (* The queue of runnable fibers ready to be resumed.
     Note: other domains can also add work items here. *)
  run_q : runnable Lf_queue.t;

  poll : Poll.t;
  mutable poll_maxi : int;              (* The highest index currently in use in [poll], or -1 if none. *)
  fd_map : (Unix.file_descr, fd_event_waiters) Hashtbl.t;

  (* When adding to [run_q] from another domain, this domain may be sleeping and so won't see the event.
     In that case, [need_wakeup = true] and you must signal using [eventfd]. *)
  eventfd : Rcfd.t;                     (* For sending events. *)
  eventfd_r : Unix.file_descr;          (* For reading events. *)

  mutable active_ops : int;             (* Exit when this is zero and [run_q] and [sleep_q] are empty. *)

  (* If [false], the main thread will check [run_q] before sleeping again
     (possibly because an event has been or will be sent to [eventfd]).
     It can therefore be set to [false] in either of these cases:
     - By the receiving thread because it will check [run_q] before sleeping, or
     - By the sending thread because it will signal the main thread later *)
  need_wakeup : bool Atomic.t;

  sleep_q : Zzz.t;                      (* Fibers waiting for timers. *)
}

(* The message to send to [eventfd] (any character would do). *)
let wake_buffer = Bytes.of_string "!"
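(* How work arrives from outside the scheduler: a suspended fiber's [enqueue]
   function (see [get_enqueue] below) may be called from another systhread,
   domain, signal handler or GC finalizer. It pushes onto [run_q] and, if
   [need_wakeup] is set, calls [wakeup] to write a byte to the pipe so that a
   sleeping [poll] call returns. *)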
(* This can be called from any systhread (including ones not running Eio),
   and also from signal handlers or GC finalizers. It must not take any locks. *)
let wakeup t =
  Atomic.set t.need_wakeup false;       (* [t] will check [run_q] after getting the event below *)
  Rcfd.use t.eventfd
    ~if_closed:ignore                   (* Domain has shut down (presumably after handling the event) *)
    (fun fd ->
       try ignore (Unix.single_write fd wake_buffer 0 1 : int)
       with
       | Unix.Unix_error ((Unix.EAGAIN | EWOULDBLOCK), _, _) ->
         (* If the pipe is full then a wake up is pending anyway. *)
         ()
       | Unix.Unix_error (Unix.EPIPE, _, _) ->
         (* We're shutting down; the event has already been processed. *)
         ()
    )

(* Safe to call from anywhere (other systhreads, domains, signal handlers, GC finalizers) *)
let enqueue_thread t k x =
  Lf_queue.push t.run_q (Thread (k, x));
  if Atomic.get t.need_wakeup then wakeup t

(* Safe to call from anywhere (other systhreads, domains, signal handlers, GC finalizers) *)
let enqueue_failed_thread t k ex =
  Lf_queue.push t.run_q (Failed_thread (k, ex));
  if Atomic.get t.need_wakeup then wakeup t

(* Can only be called from our own domain, so no need to check for wakeup. *)
let enqueue_at_head t k =
  Lf_queue.push_head t.run_q (Thread (k, ()))

let get_waiters t fd =
  match Hashtbl.find_opt t.fd_map fd with
  | Some x -> x
  | None ->
    let x = { read = Lwt_dllist.create (); write = Lwt_dllist.create () } in
    Hashtbl.add t.fd_map fd x;
    x

(* The OS told us that the event pipe is ready. Remove events. *)
let clear_event_fd t =
  let buf = Bytes.create 8 in           (* Read up to 8 events at a time *)
  let got = Unix.read t.eventfd_r buf 0 (Bytes.length buf) in
  assert (got > 0)
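(* Poll registration policy (implemented by [update] below):

     readers?  writers?  flags requested
     no        no        none (the slot is invalidated and the fd forgotten)
     yes       no        POLLIN
     no        yes       POLLOUT
     yes       yes       POLLIN + POLLOUT

   [poll_maxi] tracks the highest slot in use; when the highest slot is freed
   we scan downwards for the next used one. *)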
(* Update [t.poll]'s entry for [fd] to match [waiters]. *)
let update t waiters fd =
  let fdi = Iomux.Util.fd_of_unix fd in
  let flags =
    match not (Lwt_dllist.is_empty waiters.read),
          not (Lwt_dllist.is_empty waiters.write) with
    | false, false -> Poll.Flags.empty
    | true, false -> Poll.Flags.pollin
    | false, true -> Poll.Flags.pollout
    | true, true -> Poll.Flags.(pollin + pollout)
  in
  if flags = Poll.Flags.empty then (
    Poll.invalidate_index t.poll fdi;
    (* Find the new maxi: scan back from the index until we find the next
       used slot; -1 means none in use. *)
    let rec lower_maxi = function
      | -1 -> t.poll_maxi <- -1
      | index ->
        if Poll.((get_fd t.poll index) <> invalid_fd) then
          t.poll_maxi <- index
        else
          lower_maxi (pred index)
    in
    if fdi = t.poll_maxi then lower_maxi (pred fdi);
    Hashtbl.remove t.fd_map fd
  ) else (
    Poll.set_index t.poll fdi fd flags;
    if fdi > t.poll_maxi then t.poll_maxi <- fdi
  )

let resume t node =
  t.active_ops <- t.active_ops - 1;
  let k : unit Suspended.t = Lwt_dllist.get node in
  Fiber_context.clear_cancel_fn k.fiber;
  enqueue_thread t k ()

(* Called when poll indicates that an event we requested for [fd] is ready. *)
let ready t _index fd revents =
  assert (not Poll.Flags.(mem revents pollnval));
  if fd == t.eventfd_r then (
    clear_event_fd t
    (* The scheduler will now look at the run queue again and notice any new items. *)
  ) else (
    let waiters = Hashtbl.find t.fd_map fd in
    let pending = Lwt_dllist.create () in
    if Poll.Flags.(mem revents (pollout + pollhup + pollerr)) then
      Lwt_dllist.transfer_l waiters.write pending;
    if Poll.Flags.(mem revents (pollin + pollhup + pollerr)) then
      Lwt_dllist.transfer_l waiters.read pending;
    (* If [pending] is non-empty then we removed some waiters, so refresh the poll entry. *)
    if not (Lwt_dllist.is_empty pending) then update t waiters fd;
    Lwt_dllist.iter_node_r (resume t) pending
  )
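(* Design note: on POLLHUP/POLLERR, [ready] wakes both read and write waiters.
   The expectation (not enforced here) is that the resumed fibers simply retry
   their I/O, so any error or EOF is reported by the actual syscall. *)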
(* Switch control to the next ready continuation.
   If none is ready, wait until we get an event to wake one and then switch.
   Returns only if there is nothing to do and no active operations. *)
let rec next t : [`Exit_scheduler] =
  (* Wakeup any paused fibers *)
  match Lf_queue.pop t.run_q with
  | None -> assert false        (* We should always have an IO job, at least *)
  | Some Thread (k, v) ->       (* We already have a runnable task *)
    Fiber_context.clear_cancel_fn k.fiber;
    Suspended.continue k v
  | Some Failed_thread (k, ex) ->
    Fiber_context.clear_cancel_fn k.fiber;
    Suspended.discontinue k ex
  | Some IO ->                  (* Note: be sure to re-inject the IO task before continuing! *)
    (* This is not a fair scheduler: timers always run before all other IO *)
    let now = Mtime_clock.now () in
    match Zzz.pop ~now t.sleep_q with
    | `Due k ->
      Lf_queue.push t.run_q IO;         (* Re-inject IO job in the run queue *)
      Suspended.continue k ()           (* A sleeping task is now due *)
    | `Wait_until _ | `Nothing as next_due ->
      let timeout =
        match next_due with
        | `Wait_until time ->
          let time = Mtime.to_uint64_ns time in
          let now = Mtime.to_uint64_ns now in
          let diff_ns = Int64.sub time now in
          Poll.Nanoseconds diff_ns
        | `Nothing -> Poll.Infinite
      in
      if timeout = Infinite && t.active_ops = 0 then (
        (* Nothing further can happen at this point. *)
        Lf_queue.close t.run_q;         (* Just to catch bugs if something tries to enqueue later *)
        `Exit_scheduler
      ) else (
        Atomic.set t.need_wakeup true;
        if Lf_queue.is_empty t.run_q then (
          (* At this point we're not going to check [run_q] again before sleeping.
             If [need_wakeup] is still [true], this is fine because we don't promise to do that.
             If [need_wakeup = false], a wake-up event will arrive and wake us up soon. *)
          Trace.suspend Begin;
          let nready =
            try Poll.ppoll_or_poll t.poll (t.poll_maxi + 1) timeout
            with Unix.Unix_error (Unix.EINTR, _, "") -> 0
          in
          Trace.suspend End;
          Atomic.set t.need_wakeup false;
          Lf_queue.push t.run_q IO;     (* Re-inject IO job in the run queue *)
          Poll.iter_ready t.poll nready (ready t);
          next t
        ) else (
          (* Someone added a new job while we were setting [need_wakeup] to [true].
             They might or might not have seen that, so we can't be sure they'll send an event. *)
          Atomic.set t.need_wakeup false;
          Lf_queue.push t.run_q IO;     (* Re-inject IO job in the run queue *)
          next t
        )
      )

let with_sched fn =
  let run_q = Lf_queue.create () in
  Lf_queue.push run_q IO;
  let sleep_q = Zzz.create () in
  let eventfd_r, eventfd_w = Unix.pipe ~cloexec:true () in
  Unix.set_nonblock eventfd_r;
  Unix.set_nonblock eventfd_w;
  let eventfd = Rcfd.make eventfd_w in
  let cleanup () =
    Unix.close eventfd_r;
    let was_open = Rcfd.close eventfd in
    assert was_open
  in
  let poll = Poll.create () in
  let fd_map = Hashtbl.create 10 in
  let t = { run_q; poll; poll_maxi = (-1); fd_map; eventfd; eventfd_r;
            active_ops = 0; need_wakeup = Atomic.make false; sleep_q } in
  let eventfd_ri = Iomux.Util.fd_of_unix eventfd_r in
  Poll.set_index t.poll eventfd_ri eventfd_r Poll.Flags.pollin;
  if eventfd_ri > t.poll_maxi then t.poll_maxi <- eventfd_ri;
  match fn t with
  | x -> cleanup (); x
  | exception ex ->
    let bt = Printexc.get_raw_backtrace () in
    cleanup ();
    Printexc.raise_with_backtrace ex bt
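(* The await functions below all follow the same pattern: check for an
   already-pending cancellation first, then register the fiber (in an fd's
   waiter list or in [sleep_q]), install a cancel function that undoes the
   registration and reports the failure, and finally switch to [next]. *)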
let await_readable t (k : unit Suspended.t) fd =
  match Fiber_context.get_error k.fiber with
  | Some e -> Suspended.discontinue k e
  | None ->
    t.active_ops <- t.active_ops + 1;
    let waiters = get_waiters t fd in
    let was_empty = Lwt_dllist.is_empty waiters.read in
    let node = Lwt_dllist.add_l k waiters.read in
    if was_empty then update t waiters fd;
    Fiber_context.set_cancel_fn k.fiber (fun ex ->
        Lwt_dllist.remove node;
        if Lwt_dllist.is_empty waiters.read then update t waiters fd;
        t.active_ops <- t.active_ops - 1;
        enqueue_failed_thread t k ex
      );
    next t

let await_writable t (k : unit Suspended.t) fd =
  match Fiber_context.get_error k.fiber with
  | Some e -> Suspended.discontinue k e
  | None ->
    t.active_ops <- t.active_ops + 1;
    let waiters = get_waiters t fd in
    let was_empty = Lwt_dllist.is_empty waiters.write in
    let node = Lwt_dllist.add_l k waiters.write in
    if was_empty then update t waiters fd;
    Fiber_context.set_cancel_fn k.fiber (fun ex ->
        Lwt_dllist.remove node;
        if Lwt_dllist.is_empty waiters.write then update t waiters fd;
        t.active_ops <- t.active_ops - 1;
        enqueue_failed_thread t k ex
      );
    next t

let get_enqueue t k = function
  | Ok v -> enqueue_thread t k v
  | Error ex -> enqueue_failed_thread t k ex

let await_timeout t (k : unit Suspended.t) time =
  match Fiber_context.get_error k.fiber with
  | Some e -> Suspended.discontinue k e
  | None ->
    let node = Zzz.add t.sleep_q time k in
    Fiber_context.set_cancel_fn k.fiber (fun ex ->
        Zzz.remove t.sleep_q node;
        enqueue_failed_thread t k ex
      );
    next t

let with_op t fn x =
  t.active_ops <- t.active_ops + 1;
  match fn x with
  | r ->
    t.active_ops <- t.active_ops - 1;
    r
  | exception ex ->
    t.active_ops <- t.active_ops - 1;
    raise ex

[@@@alert "-unstable"]

type _ Effect.t += Enter : (t -> 'a Eio_utils.Suspended.t -> [`Exit_scheduler]) -> 'a Effect.t
let enter fn = Effect.perform (Enter fn)
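(* Illustrative sketch (not part of this module's interface): a backend's main
   loop is expected to combine [with_sched] and [run] along these lines, where
   [extra_effects] handles any backend-specific effects not matched below, and
   [main_function] and [argument] are placeholders:

   {[
     with_sched @@ fun t ->
     run ~extra_effects t main_function argument
   ]}
*)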
let run ~extra_effects t main x =
  let rec fork ~new_fiber:fiber fn =
    let open Effect.Deep in
    Trace.fiber (Fiber_context.tid fiber);
    match_with fn ()
      { retc = (fun () -> Fiber_context.destroy fiber; next t);
        exnc = (fun ex ->
            Fiber_context.destroy fiber;
            Printexc.raise_with_backtrace ex (Printexc.get_raw_backtrace ())
          );
        effc = fun (type a) (e : a Effect.t) ->
          match e with
          | Enter fn -> Some (fun k ->
              match Fiber_context.get_error fiber with
              | Some e -> discontinue k e
              | None -> fn t { Suspended.k; fiber }
            )
          | Eio.Private.Effects.Get_context -> Some (fun k -> continue k fiber)
          | Eio.Private.Effects.Suspend f -> Some (fun k ->
              let k = { Suspended.k; fiber } in
              let enqueue = get_enqueue t k in
              f fiber enqueue;
              next t
            )
          | Eio.Private.Effects.Fork (new_fiber, f) -> Some (fun k ->
              let k = { Suspended.k; fiber } in
              enqueue_at_head t k;
              fork ~new_fiber f
            )
          | Eio_unix.Private.Await_readable fd -> Some (fun k -> await_readable t { Suspended.k; fiber } fd)
          | Eio_unix.Private.Await_writable fd -> Some (fun k -> await_writable t { Suspended.k; fiber } fd)
          | e -> extra_effects.Effect.Deep.effc e
      }
  in
  let result = ref None in
  let `Exit_scheduler =
    let new_fiber = Fiber_context.make_root () in
    Domain_local_await.using
      ~prepare_for_await:Eio.Private.Dla.prepare_for_await
      ~while_running:(fun () ->
          fork ~new_fiber (fun () ->
              result := Some (with_op t main x);
            ))
  in
  match !result with
  | Some x -> x
  | None -> failwith "BUG in scheduler: deadlock detected"