open! Core_kernel
open! Import
open! Deferred_std
module Deferred = Deferred1
module Scheduler = Scheduler1
module Stream = Async_stream

include (Scheduler :
           module type of Scheduler
         with module Bvar := Scheduler.Bvar
         with module Ivar := Scheduler.Ivar
         with module Synchronous_time_source := Scheduler.Synchronous_time_source)

let t = Scheduler.t

include Monitor.Exported_for_scheduler

let find_local key = Execution_context.find_local (current_execution_context (t ())) key

let with_local key value ~f =
  let t = t () in
  let execution_context =
    Execution_context.with_local (current_execution_context t) key value
  in
  with_execution_context t execution_context ~f
;;
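(* A sketch of typical usage of the two functions above (hypothetical key; not part of
   this module):
   {[
     let request_id = Univ_map.Key.create ~name:"request-id" sexp_of_int in
     with_local request_id (Some 42) ~f:(fun () ->
       (* Jobs created inside [f] inherit the binding via their execution context. *)
       assert (Option.equal Int.equal (find_local request_id) (Some 42)))
   ]} *)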
let main_execution_context = (t ()).main_execution_context
let can_run_a_job t = num_pending_jobs t > 0 || Bvar.has_any_waiters t.yield
let has_upcoming_event t = not (Timing_wheel_ns.is_empty (events t))
let next_upcoming_event t = Timing_wheel_ns.next_alarm_fires_at (events t)
let next_upcoming_event_exn t = Timing_wheel_ns.next_alarm_fires_at_exn (events t)
let event_precision t = Timing_wheel_ns.alarm_precision (events t)
let cycle_start t = t.cycle_start
let run_every_cycle_start t ~f = t.run_every_cycle_start <- f :: t.run_every_cycle_start

let map_cycle_times t ~f =
  Stream.create (fun tail ->
    run_every_cycle_start t ~f:(fun () -> Tail.extend tail (f t.last_cycle_time)))
;;

let long_cycles t ~at_least =
  Stream.create (fun tail ->
    run_every_cycle_start t ~f:(fun () ->
      if Time_ns.Span.( >= ) t.last_cycle_time at_least
      then Tail.extend tail t.last_cycle_time))
;;

let cycle_num_jobs t =
  Stream.create (fun tail ->
    run_every_cycle_start t ~f:(fun () -> Tail.extend tail t.last_cycle_num_jobs))
;;

let cycle_count t = t.cycle_count
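(* For example, an application could log every cycle that takes 100ms or longer (a
   hypothetical sketch; [Stream.iter] is [Async_stream.iter]):
   {[
     Stream.iter
       (long_cycles (t ()) ~at_least:(Time_ns.Span.of_int_ms 100))
       ~f:(fun span -> eprintf "long cycle: %s\n%!" (Time_ns.Span.to_string span))
   ]} *)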
let set_max_num_jobs_per_priority_per_cycle t int =
  t.max_num_jobs_per_priority_per_cycle
  <- Max_num_jobs_per_priority_per_cycle.create_exn int
;;

let max_num_jobs_per_priority_per_cycle t =
  Max_num_jobs_per_priority_per_cycle.raw t.max_num_jobs_per_priority_per_cycle
;;

let set_thread_safe_external_job_hook t f = t.thread_safe_external_job_hook <- f
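(* [thread_safe_enqueue_external_job] may be called from a thread that does not hold the
   Async lock: it pushes the job onto the thread-safe [external_jobs] queue and then
   invokes [thread_safe_external_job_hook], which a threaded scheduler can set (via
   [set_thread_safe_external_job_hook]) to wake the scheduler so the job is dequeued
   promptly. *)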
let thread_safe_enqueue_external_job t execution_context f a =
  Thread_safe_queue.enqueue t.external_jobs (External_job.T (execution_context, f, a));
  t.thread_safe_external_job_hook ()
;;

let set_event_added_hook t f = t.event_added_hook <- Some f
let set_job_queued_hook t f = t.job_queued_hook <- Some f

let create_alarm t f =
  let execution_context = current_execution_context t in
  Gc.Expert.Alarm.create (fun () ->
    thread_safe_enqueue_external_job t execution_context f ())
;;
let add_finalizer t heap_block f =
  let execution_context = current_execution_context t in
  let finalizer heap_block =
    (* Here we can be in any thread, and may not be holding the async lock.  So, we can
       only do thread-safe things.

       By putting [heap_block] in [external_jobs], we keep it alive until the next time
       the async scheduler gets around to dequeueing it.  Calling
       [t.thread_safe_external_job_hook] ensures that will happen in short order.  Thus
       we are not dramatically increasing the lifetime of [heap_block], since the OCaml
       runtime already resurrected [heap_block] so that we could refer to it here.  The
       OCaml runtime already removed the finalizer function when it noticed that
       [heap_block] could be finalized, so there is no infinite loop in which we cause
       the finalizer to run again.  Also, OCaml does not require finalizer functions to
       dispose of the block, so it's fine that we keep [heap_block] around until
       later. *)
    if Debug.finalizers then Debug.log_string "enqueueing finalizer";
    thread_safe_enqueue_external_job t execution_context f heap_block
  in
  if Debug.finalizers then Debug.log_string "adding finalizer";
  (* We use [Caml.Gc.finalise] instead of [Core_kernel.Gc.add_finalizer] because the
     latter has its own wrapper around [Caml.Gc.finalise] to run finalizers
     synchronously. *)
  try Caml.Gc.finalise finalizer heap_block with
  | Invalid_argument _ ->
    (* [Heap_block] ensures that this will only fail for static data, in which case we
       can drop the finalizer since the block will never be collected. *)
    ()
;;
let add_finalizer_exn t x f =
  add_finalizer t (Heap_block.create_exn x) (fun heap_block ->
    f (Heap_block.value heap_block))
;;
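(* A sketch of typical usage (hypothetical table; not part of this module):
   {[
     let table = Hashtbl.create (module Int) in
     add_finalizer_exn (t ()) table (fun table ->
       (* Unlike a raw [Caml.Gc.finalise] callback, this runs as an ordinary Async job,
          in the execution context that was current when the finalizer was added. *)
       eprintf "table died with %d entries\n" (Hashtbl.length table))
   ]} *)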
let add_finalizer_last t heap_block f =
  let execution_context = current_execution_context t in
  let finalizer () =
    (* Here we can be in any thread, and may not be holding the async lock.  So, we can
       only do thread-safe things. *)
    if Debug.finalizers
    then Debug.log_string "enqueueing finalizer (using 'last' semantic)";
    thread_safe_enqueue_external_job t execution_context f ()
  in
  if Debug.finalizers then Debug.log_string "adding finalizer (using 'last' semantic)";
  (* We use [Caml.Gc.finalise_last] instead of [Core_kernel.Gc.add_finalizer_last]
     because the latter has its own wrapper around [Caml.Gc.finalise_last] to run
     finalizers synchronously. *)
  try Caml.Gc.finalise_last finalizer heap_block with
  | Invalid_argument _ ->
    (* [Heap_block] ensures that this will only fail for static data, in which case we
       can drop the finalizer since the block will never be collected. *)
    ()
;;
let add_finalizer_last_exn t x f = add_finalizer_last t (Heap_block.create_exn x) f
(** [force_current_cycle_to_end] sets the number of normal jobs allowed to run in this
    cycle to zero.  Thus, after the currently running job completes, the scheduler will
    switch to low priority jobs and then end the current cycle. *)
let force_current_cycle_to_end t =
  Job_queue.set_jobs_left_this_cycle t.normal_priority_jobs 0
;;
(* We preallocate [send_exn] to avoid allocating it on each call to [advance_clock]. *)
let send_exn = Some Monitor.send_exn

let advance_clock t ~now =
  Synchronous_time_source0.advance t.time_source ~to_:now ~send_exn
;;
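(* [run_cycle] runs one scheduler cycle: it records the cycle start, wakes [yield]
   waiters, runs the per-cycle-start hooks, advances the clock (firing any expired
   timers), runs queued jobs until the per-priority quota is exhausted or no runnable
   jobs remain, and finally records the cycle's timing statistics. *)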
let run_cycle t =
  if debug then Debug.log "run_cycle starting" t [%sexp_of: t];
  t.on_start_of_cycle ();
  let now = Time_ns.now () in
  t.cycle_count <- t.cycle_count + 1;
  t.cycle_start <- now;
  t.in_cycle <- true;
  Bvar.broadcast t.yield ();
  let num_jobs_run_at_start_of_cycle = num_jobs_run t in
  List.iter t.run_every_cycle_start ~f:(fun f -> f ());
  advance_clock t ~now;
  start_cycle t ~max_num_jobs_per_priority:t.max_num_jobs_per_priority_per_cycle;
  let rec run_jobs t =
    match Scheduler.run_jobs t with
    | Ok () -> ()
    | Error (exn, backtrace) ->
      Monitor.send_exn (Monitor.current ()) exn ~backtrace:(`This backtrace);
      (* [run_jobs] stopped due to an exn.  There may still be jobs that could be run
         this cycle, so [run_jobs] again. *)
      run_jobs t
  in
  run_jobs t;
  let cycle_time = Time_ns.diff (Time_ns.now ()) t.cycle_start in
  t.last_cycle_time <- cycle_time;
  t.last_cycle_num_jobs <- num_jobs_run t - num_jobs_run_at_start_of_cycle;
  t.total_cycle_time <- Time_ns.Span.(t.total_cycle_time + cycle_time);
  if Bvar.has_any_waiters t.yield_until_no_jobs_remain
  && Job_queue.length t.normal_priority_jobs + Job_queue.length t.low_priority_jobs = 0
  then Bvar.broadcast t.yield_until_no_jobs_remain ();
  t.in_cycle <- false;
  t.on_end_of_cycle ();
  if debug
  then
    Debug.log
      "run_cycle finished"
      (uncaught_exn t, is_some (next_upcoming_event t))
      [%sexp_of: Error.t option * bool]
;;
let run_cycles_until_no_jobs_remain () =
  if debug then Debug.log_string "run_cycles_until_no_jobs_remain starting";
  let t = t () in
  if is_dead t
  then
    raise_s
      [%message
        "run_cycles_until_no_jobs_remain cannot proceed -- scheduler is dead"
          ~scheduler:(t : t)];
  let rec loop () =
    run_cycle t;
    advance_clock t ~now:(Time_ns.now ());
    if can_run_a_job t then loop ()
  in
  loop ();
  (* Reset the current execution context to maintain the invariant that when we're not
     in a job, [current_execution_context = main_execution_context]. *)
  set_execution_context t t.main_execution_context;
  if debug then Debug.log_string "run_cycles_until_no_jobs_remain finished";
  Option.iter (uncaught_exn t) ~f:Error.raise
;;

let make_async_unusable () =
  let t = !t_ref in
  t.check_access
  <- Some
       (fun () ->
          raise_s [%sexp "Async scheduler is unusable due to [make_async_unusable]"])
;;

let reset_in_forked_process () =
  if debug then Debug.log_string "reset_in_forked_process";
  (* There is no need to empty [main_monitor_hole]. *)
  Scheduler.(t_ref := create ())
;;
let check_invariants t = t.check_invariants
let set_check_invariants t b = t.check_invariants <- b
let set_record_backtraces t b = t.record_backtraces <- b
let set_on_start_of_cycle t f = t.on_start_of_cycle <- f
let set_on_end_of_cycle t f = t.on_end_of_cycle <- f
let yield t = Bvar.wait t.yield
let yield_until_no_jobs_remain t = Bvar.wait t.yield_until_no_jobs_remain
let yield_every ~n =
  if n <= 0
  then raise_s [%message "Scheduler.yield_every got nonpositive count" (n : int)]
  else if n = 1
  then stage (fun t -> yield t)
  else (
    let count_until_yield = ref n in
    stage (fun t ->
      decr count_until_yield;
      if !count_until_yield > 0
      then return ()
      else (
        count_until_yield := n;
        yield t)))
;;
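(* A sketch of how [yield_every] can keep a long-running loop from starving the rest of
   the scheduler (hypothetical [process] function):
   {[
     let yield = unstage (yield_every ~n:1_000) in
     let rec loop i =
       if i >= 1_000_000
       then return ()
       else (
         process i;
         yield (t ()) >>= fun () -> loop (i + 1))
     in
     loop 0
   ]} *)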
let total_cycle_time t =
  (* Adjust for the fact that the caller is probably running in an Async job, i.e., in
     the middle of a cycle whose time is not yet included in [t.total_cycle_time]. *)
  if t.in_cycle
  then (
    let this_cycle_time = Time_ns.(diff (now ()) t.cycle_start) in
    Time_ns.Span.(t.total_cycle_time + this_cycle_time))
  else t.total_cycle_time
;;

module Very_low_priority_work = struct
  module Worker_result = Very_low_priority_worker.Exec_result

  let rec run t = run_workers t ~num_execs_before_yielding:1_000

  and run_workers t ~num_execs_before_yielding =
    if num_execs_before_yielding = 0
    then yield_then_run t
    else if not (Deque.is_empty t.very_low_priority_workers)
    then (
      let worker = Deque.dequeue_front_exn t.very_low_priority_workers in
      set_execution_context t worker.execution_context;
      run_worker t worker ~num_execs_before_yielding)

  and yield_then_run t =
    if not (Deque.is_empty t.very_low_priority_workers)
    then Deferred.upon (yield t) (fun () -> run t)

  and run_worker t worker ~num_execs_before_yielding =
    assert (phys_equal t.current_execution_context worker.execution_context);
    if num_execs_before_yielding = 0
    then (
      Deque.enqueue_front t.very_low_priority_workers worker;
      yield_then_run t)
    else (
      let num_execs_before_yielding = num_execs_before_yielding - 1 in
      match worker.exec () with
      | Finished -> run_workers t ~num_execs_before_yielding
      | Not_finished -> run_worker t worker ~num_execs_before_yielding
      | exception exn ->
        let bt = Backtrace.Exn.most_recent () in
        Monitor.send_exn (Monitor.current ()) exn ~backtrace:(`This bt);
        run_workers t ~num_execs_before_yielding)
  ;;

  let enqueue ~f =
    let t = t () in
    let queue = t.very_low_priority_workers in
    let running = not (Deque.is_empty queue) in
    let execution_context =
      Execution_context.create_like (current_execution_context t) ~priority:Low
    in
    Deque.enqueue_back queue { execution_context; exec = f };
    (* If no worker was already queued, kick off [run] as an ordinary job.  [enqueue]
       here is the scheduler's [enqueue], not this function. *)
    if not running then enqueue t execution_context run t
  ;;
end
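(* A sketch of how [Very_low_priority_work.enqueue] might be used, with a hypothetical
   unit of incremental work:
   {[
     Very_low_priority_work.enqueue ~f:(fun () ->
       if compact_one_bucket () then Finished else Not_finished)
   ]}
   The worker is re-run until it returns [Finished], with the scheduler yielding after
   every 1_000 executions so that ordinary jobs still make progress. *)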
module For_bench = struct
  let advance_clock = advance_clock
end