123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182openCoreopenImportopenRaw_schedulerletdebug=Debug.thread_safeletrun_holding_async_lock(typeab)?(wakeup_scheduler=true)t(f:unit->a)~(finish:(a,exn)Result.t->b):b=ifdebugthenDebug.log"run_holding_async_lock"t[%sexp_of:t];ifnot(am_holding_lockt)thenlockt;protect~finally:(fun()->ifwakeup_schedulerthenthread_safe_wakeup_schedulert;unlockt)~f:(fun()->(* We run [f] within the [main_execution_context] so that any errors are sent to its
monitor, rather than whatever random monitor happened to be in effect. *)finish(with_execution_contexttKernel_scheduler.main_execution_context~f:(fun()->Result.try_withf)));;letensure_in_a_threadtfunction_=ifis_main_thread()thenraise_s[%message"cannot call from the main thread"(function_:string)];ifam_holding_locktthenraise_s[%message"cannot call while holding the async lock"(function_:string)];;letrun_in_async_with_optional_cycle?wakeup_schedulertf=ifdebugthenDebug.log"run_in_async_with_optional_cycle"t[%sexp_of:t];ensure_in_a_threadt"run_in_async_with_optional_cycle";run_holding_async_lock?wakeup_schedulertf~finish:(function|Errorexn->Errorexn|Ok(maybe_run_a_cycle,a)->(matchmaybe_run_a_cyclewith|`Do_not_run_a_cycle->()|`Run_a_cycle->have_lock_do_cyclet);Oka);;letblock_on_asynctf=ifdebugthenDebug.log"block_on_async"t[%sexp_of:t];(* We disallow calling [block_on_async] if the caller is running inside async. This can
happen if one is the scheduler, or if one is in some other thread that has used, e.g.
[run_in_async] to call into async and run a cycle. We do however, want to allow the
main thread to call [block_on_async], in which case it should release the lock and
allow the scheduler, which is running in another thread, to run. *)ifi_am_the_schedulert||(am_holding_lockt&¬(is_main_thread()))thenraise_s[%message"called [block_on_async] from within async"];(* While [block_on_async] is blocked, the Async scheduler may run and set the execution
context. So we save and restore the execution context, if we're in the main thread.
The restoration is necessary because subsequent code in the main thread can do
operations that rely on the execution context. *)letexecution_context=Kernel_scheduler.current_execution_contextt.kernel_schedulerin(* Create a scheduler thread if the scheduler isn't already running. *)ifnott.is_runningthen(t.is_running<-true;(* Release the Async lock if necessary, so that the scheduler can acquire it. *)ifam_holding_locktthenunlockt;letscheduler_ran_a_job=Thread_safe_ivar.create()inupon(return())(fun()->Thread_safe_ivar.fillscheduler_ran_a_job());ignore(Core.Thread.create(fun()->Exn.handle_uncaught~exit:true(fun()->lockt;never_returns(be_the_schedulert)))():Core.Thread.t);(* Block until the scheduler has run the above job. *)Thread_safe_ivar.readscheduler_ran_a_job);letmaybe_blocked=run_holding_async_lockt(fun()->Monitor.try_withf~name:"block_on_async")~finish:(funres->matchreswith|Errorexn->`Available(Errorexn)|Okd->(matchDeferred.peekdwith|Somev->`Availablev|None->have_lock_do_cyclet;(matchDeferred.peekdwith|Somev->`Availablev|None->letq=Squeue.create1inupond(funv->Squeue.push_uncondqv);(* Squeue.pop can block, so we have to do it outside async *)`Blocked_wait_on_squeueq)))inletres=matchmaybe_blockedwith|`Availablev->v|`Blocked_wait_on_squeueq->(* [run_holding_async_lock] released the lock. If the scheduler wasn't already
running when [block_on_async] was called, then we started it above. So, the
scheduler is running, and will eventually run the job to put something on the
squeue. So, it's OK to block waiting for it. *)Squeue.popqin(* If we're the main thread, we should lock the scheduler for the rest of main, to
prevent the scheduler, which is now running in another thread, from interfering with
the main thread. We also restore the execution context, so that the code in the main
thread will be in the same execution context as before it called [block_on_async].
The restored execution context will usually be [Execution_context.main], but need not
be, if the user has done operations that adjust the current execution context,
e.g. [Monitor.within]. If we're not in the main thread, the we don't need to
and cannot restore the execution context, because we do not hold the Async lock. *)ifis_main_thread()then(lockt;(* While [block_on_async] is blocked, Async can run and set the execution context. So
we restore the execution context, if we're in the main thread. The restoration is
necessary because subsequent code in the main thread can do operations that rely on
the execution context. *)Kernel_scheduler.set_execution_contextt.kernel_schedulerexecution_context);res;;letblock_on_async_exntf=Result.ok_exn(block_on_asynctf)letreset_schedulert=ifdebugthenDebug.log"reset_scheduler"t[%sexp_of:t];ifi_am_the_schedulert||(am_holding_lockt&¬(is_main_thread()))thenraise_s[%message"called [reset_scheduler] from within async"];ifam_holding_locktthenunlockt;Raw_scheduler.thread_safe_reset();;letrun_in_async?wakeup_schedulertf=ifdebugthenDebug.log"run_in_async"t[%sexp_of:t];ensure_in_a_threadt"run_in_async";run_holding_async_lock?wakeup_schedulertf~finish:Fn.id;;letrun_in_async_exn?wakeup_schedulertf=Result.ok_exn(run_in_async?wakeup_schedulertf);;letrun_in_async_waittf=ifdebugthenDebug.log"run_in_async_wait"t[%sexp_of:t];ensure_in_a_threadt"run_in_async_wait";block_on_asynctf;;letrun_in_async_wait_exntf=Result.ok_exn(run_in_async_waittf)letdeferredt=letivar=ifam_holding_locktthenIvar.create()elserun_holding_async_locktIvar.create~finish:Result.ok_exninletfillx=run_in_async_exnt(fun()->Ivar.fillivarx)inIvar.readivar,fill;;lett()=the_one_and_only~should_lock:falseletam_holding_async_lock()=am_holding_lock(t())letdeferred()=deferred(t())letrun_in_async_with_optional_cycle?wakeup_schedulerf=run_in_async_with_optional_cycle?wakeup_scheduler(t())f;;letrun_in_async?wakeup_schedulerf=run_in_async?wakeup_scheduler(t())fletrun_in_async_exn?wakeup_schedulerf=run_in_async_exn?wakeup_scheduler(t())fletblock_on_asyncf=block_on_async(t())fletblock_on_async_exnf=block_on_async_exn(t())fletrun_in_async_waitf=run_in_async_wait(t())fletrun_in_async_wait_exnf=run_in_async_wait_exn(t())fletreset_scheduler()=reset_scheduler(t())