open Core
open Import
open Raw_scheduler
module Priority = Linux_ext.Priority

(* How a thread-pool thread hands its result back to Async once the user's function has
   returned.  [run_after_scheduler_is_started] shows what each case does. *)
module When_finished = struct
  type t =
    | Notify_the_scheduler
    | Take_the_async_lock
    | Try_to_take_the_async_lock
  [@@deriving enumerate, sexp_of]

  (* The policy [run] uses when its caller does not pass [when_finished]. *)
  let default = ref Try_to_take_the_async_lock
end

(* Hands [f] to scheduler [t]'s thread pool -- to the helper thread [thread] if one is
   given, otherwise to the pool at large -- and returns a deferred that is determined
   with [f]'s result.  An exception raised by [f] is captured by [Result.try_with] and
   re-raised by [Result.ok_exn] when the result is mapped.

   [priority] and [name] are options and are forwarded to the thread pool as optional
   arguments. *)
let run_after_scheduler_is_started
      ~priority
      ~thread
      ~(when_finished : When_finished.t)
      ~name
      ~t
      f
  =
  let result_ivar = Ivar.create () in
  let work () =
    (* From here on we are executing in a thread-pool thread, not in the async thread. *)
    let outcome = Result.try_with f in
    let publish () = Ivar.fill result_ivar outcome in
    let holding_async_lock =
      match when_finished with
      | Notify_the_scheduler -> false
      | Take_the_async_lock ->
        lock t;
        true
      | Try_to_take_the_async_lock ->
        (match thread_pool_cpu_affinity t with
         | Inherit -> try_lock t
         | Cpuset _ ->
           (* A user who set an affinity for the thread pool presumably wants Async
              jobs affinitized differently from thread-pool threads, so we do not even
              try to run Async jobs on this thread-pool thread. *)
           false)
    in
    if not holding_async_lock
    then
      (* Without the lock, ask the scheduler to fill the ivar as an external job. *)
      thread_safe_enqueue_external_job t (current_execution_context t) publish ()
    else
      (* With the lock held, fill the ivar and run a cycle right here; the lock is
         released even if that raises. *)
      protect
        ~f:(fun () ->
          publish ();
          have_lock_do_cycle t)
        ~finally:(fun () -> unlock t)
  in
  (match thread with
   | Some helper_thread ->
     ok_exn
       (Thread_pool.add_work_for_helper_thread
          t.thread_pool
          helper_thread
          work
          ?name
          ?priority)
   | None ->
     ok_exn (Thread_pool.add_work t.thread_pool work ?name ?priority);
     (* The work was accepted, but if the pool has no thread at all then nothing can
        ever execute it, so fail loudly. *)
     if Thread_pool.num_threads t.thread_pool = 0
     then
       raise_s
         [%message
           "Async's thread pool was unable to create a single thread"
             ~_:
               (Thread_pool.last_thread_creation_failure t.thread_pool
                : (Sexp.t option[@sexp.option]))]);
  Deferred.map (Ivar.read result_ivar) ~f:Result.ok_exn
;;

(* Runs [f] in a thread-pool thread.  If the scheduler is already initialized and
   running, the work is submitted immediately; otherwise submission is postponed, as
   explained below. *)
let run ?priority ?thread ?(when_finished = !When_finished.default) ?name f =
  let start t =
    run_after_scheduler_is_started ~priority ~thread ~when_finished ~name ~t f
  in
  match !Raw_scheduler.the_one_and_only_ref with
  | Initialized t when t.is_running -> start t
  | _ ->
    (* Going through [bind (return ()) ...] makes the call to
       [run_after_scheduler_is_started] wait until the scheduler has started.
       [run_after_scheduler_is_started] causes code to run in other threads, and a job
       finishing in another thread tries to acquire the async lock and manipulate
       async data structures.  That is hard to reason about if async has not even
       started yet. *)
    Deferred.bind (return ()) ~f:(fun () -> start (Raw_scheduler.t ()))
;;

module Helper_thread = struct
  (* Wraps a [Thread_pool] helper thread so that a finalizer can be attached to it. *)
  type t = { thread_pool_helper_thread : Thread_pool.Helper_thread.t }
  [@@deriving fields, sexp_of]

  (* [create] and [create_now] both attach an Async finalizer to the helper thread they
     return, so that the thread can rejoin the set of worker threads once nothing
     references the helper thread and it has no pending work.

     [Thread_pool.finished_with_helper_thread] must acquire the thread pool lock, so it
     cannot run inside an ordinary finalizer: the GC could invoke it while the
     interrupted code already holds the thread pool lock, which would deadlock.  An
     Async finalizer avoids this -- the GC merely schedules an Async job that calls
     [Thread_pool.finished_with_helper_thread].

     The finalizer is attached here rather than inside [Thread_pool] because the thread
     pool does not know about Async, and in particular does not know about Async
     finalizers. *)
  let create_internal scheduler thread_pool_helper_thread =
    let wrapped = { thread_pool_helper_thread } in
    add_finalizer_exn scheduler wrapped (fun released ->
      Thread_pool.finished_with_helper_thread
        scheduler.thread_pool
        released.thread_pool_helper_thread);
    wrapped
  ;;

  (* Creates the helper thread directly via [Thread_pool.create_helper_thread], without
     going through [run]; a failure is returned as the [Error] case. *)
  let create_now ?priority ?name () =
    let scheduler = the_one_and_only ~should_lock:true in
    Thread_pool.create_helper_thread scheduler.thread_pool ?name ?priority
    |> Result.map ~f:(create_internal scheduler)
  ;;

  (* Uses [run] to call [Thread_pool.become_helper_thread] from a thread-pool thread,
     then wraps the result; [ok_exn] raises if that call returned an error. *)
  let create ?priority ?name () =
    let scheduler = the_one_and_only ~should_lock:true in
    let become () =
      Thread_pool.become_helper_thread scheduler.thread_pool ?name ?priority
    in
    run become
    >>| fun helper_thread -> create_internal scheduler (ok_exn helper_thread)
  ;;
end

(* Shadows the [run] above: this version takes a wrapped [Helper_thread.t] for
   [?thread] and unwraps it before delegating. *)
let run ?priority ?thread ?when_finished ?name f =
  let raw_thread = Option.map thread ~f:Helper_thread.thread_pool_helper_thread in
  run ?priority ?thread:raw_thread ?when_finished ?name f
;;

(* Runs [f] through [Syscall.syscall] in a thread-pool thread and returns its result. *)
let syscall ~name f =
  let call () = Syscall.syscall f in
  run ~name call
;;

(* Like [syscall], but applies [Result.ok_exn] to the result inside the thread. *)
let syscall_exn ~name f =
  let call () = Result.ok_exn (Syscall.syscall f) in
  run ~name call
;;

(* Returns the reader end of a pipe that is fed with the elements taken from [sq]. *)
let pipe_of_squeue sq =
  let reader, writer = Pipe.create () in
  (* These are named functions, rather than inline closures, to avoid unnecessary
     allocation. *)
  let drain () =
    let batch = Linked_queue.create () in
    Squeue.transfer_queue sq batch;
    batch
  in
  let rec forward batch =
    Linked_queue.iter batch ~f:(Pipe.write_without_pushback writer);
    Pipe.pushback writer >>> pump
  (* [run drain] executes [drain] in a thread because [Squeue.transfer_queue] can
     block. *)
  and pump () = run drain >>> forward in
  pump ();
  reader
;;