123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373openCore_kernelopenImportincludeScheduler0moduleSynchronous_time_source=Synchronous_time_source0moduleEvent=Synchronous_time_source.EventmoduleAlarm=Timing_wheel.AlarmmoduleJob_or_event=Synchronous_time_source.T1.Job_or_eventletdebug=Debug.schedulermoduleIvar=structopenTypes.Ivarletcreate_with_cellcell={cell}letcreate()=create_with_cellEmptyletcreate_full(typea)(a:a)=(* We allocate an immutable ivar and then cast it to a mutable ivar. The immutability
allows OCaml to statically allocate the ivar if [a] is constant. This cast is safe
because a full ivar is never mutated. We also believe that we will not trigger
flambda to spuriously repor warning 59, mutation of known immutable data. All
mutations of an ivar cell, i.e. [foo.cell <- ...], are directly preceded by a
[match foo.cell] that prevents the [Full] case from reaching the modification. So
flambda should always eliminate the [foo.cell <- ...] of a constant [Full] ivar,
and not warn. *)(Obj.magic:aImmutable.t->at){cell=Fulla};;endmoduleBvar=structopenTypes.Bvarletcreate()=of_repr{has_any_waiters=false;ivar=Ivar.create()}endmoduleVery_low_priority_worker=structmoduleExec_result=structtypet=Types.Very_low_priority_worker.Exec_result.t=|Finished|Not_finished[@@derivingsexp_of]endtypet=Types.Very_low_priority_worker.t={execution_context:Execution_context.t;exec:unit->Exec_result.t}[@@derivingfields,sexp_of]letinvariantt=Invariant.invariant[%here]t[%sexp_of:t](fun()->letcheckf=Invariant.check_fieldtfinFields.iter~execution_context:(checkExecution_context.invariant)~exec:ignore);;endtypet=Scheduler0.t={(* [check_access] optionally holds a function to run to check whether access to [t] is
currently allowed. It is used to detect invalid access to the scheduler from a
thread. *)mutablecheck_access:(unit->unit)option;mutablejob_pool:Job_pool.t;normal_priority_jobs:Job_queue.t;low_priority_jobs:Job_queue.t;very_low_priority_workers:Very_low_priority_worker.tDeque.t;mutablemain_execution_context:Execution_context.t;mutablecurrent_execution_context:Execution_context.t(* The scheduler calls [got_uncaught_exn] when an exception bubbles to the top of the
monitor tree without being handled. This function guarantees to never run another
job after this by calling [clear] and because [enqueue_job] will never add another
job. *);mutableuncaught_exn:(Exn.t*Sexp.t)option;mutablecycle_count:int;mutablecycle_start:Time_ns.t;mutablein_cycle:bool;mutablerun_every_cycle_start:(unit->unit)list;mutablerun_every_cycle_end:(unit->unit)list;mutablelast_cycle_time:Time_ns.Span.t;mutablelast_cycle_num_jobs:int;mutabletotal_cycle_time:Time_ns.Span.t;mutabletime_source:read_writeSynchronous_time_source.T1.t(* [external_jobs] is a queue of actions sent from outside of async. This is for the
case where we want to schedule a job or fill an ivar from a context where it is not
safe to run async code, because the async lock isn't held. For instance: - in an
OCaml finalizer, as they can run at any time in any thread.
The way to do it is to queue a thunk in [external_jobs] and call
[thread_safe_external_job_hook], which is responsible for notifying the scheduler
that new actions are available.
When using Async on unix, [thread_safe_external_job_hook] is set in [Async_unix]
to call [Interruptor.thread_safe_interrupt], which will wake up the
[Async_unix] scheduler and run a cycle.
Note that this hook might be used in other context (js_of_ocaml, mirage).
When running a cycle, we pull external actions at every job and perform them
immediately. *);external_jobs:External_job.tThread_safe_queue.t;mutablethread_safe_external_job_hook:unit->unit(* [job_queued_hook] and [event_added_hook] aim to be used by js_of_ocaml. *)(* We use [_ option] here because those hooks will not be set in the common case
and we want to avoid extra function calls. *);mutablejob_queued_hook:(Priority.t->unit)option;mutableevent_added_hook:(Time_ns.t->unit)option;mutableyield:((unit,read_write)Types.Bvar.t[@sexp.opaque]);mutableyield_until_no_jobs_remain:((unit,read_write)Types.Bvar.t[@sexp.opaque](* configuration*));mutablecheck_invariants:bool;mutablemax_num_jobs_per_priority_per_cycle:Max_num_jobs_per_priority_per_cycle.t;mutablerecord_backtraces:bool;mutableon_start_of_cycle:unit->unit;mutableon_end_of_cycle:unit->unit}[@@derivingfields,sexp_of]letuncaught_exn_unwrapped=uncaught_exnletuncaught_exnt=matcht.uncaught_exnwith|None->None|Some(exn,sexp)->Some(Error.create"unhandled exception"(exn,sexp)[%sexp_of:Exn.t*Sexp.t]);;letnum_pending_jobst=Job_queue.lengtht.normal_priority_jobs+Job_queue.lengtht.low_priority_jobs;;letnum_jobs_runt=Job_queue.num_jobs_runt.normal_priority_jobs+Job_queue.num_jobs_runt.low_priority_jobs;;letlast_cycle_num_jobst=t.last_cycle_num_jobsletinvariantt:unit=tryletcheckffield=f(Field.getfieldt)inFields.iter~check_access:ignore~job_pool:(checkJob_pool.invariant)~normal_priority_jobs:(checkJob_queue.invariant)~low_priority_jobs:(checkJob_queue.invariant)~very_low_priority_workers:(check(funq->Deque.iterq~f:Very_low_priority_worker.invariant))~main_execution_context:(checkExecution_context.invariant)~current_execution_context:(checkExecution_context.invariant)~uncaught_exn:(check(fununcaught_exn->ifis_someuncaught_exnthenassert(num_pending_jobst=0)))~cycle_count:(check(funcycle_count->assert(cycle_count>=0)))~cycle_start:ignore~in_cycle:ignore~run_every_cycle_start:ignore~run_every_cycle_end:ignore~last_cycle_time:ignore~total_cycle_time:ignore~last_cycle_num_jobs:(check(funlast_cycle_num_jobs->assert(last_cycle_num_jobs>=0)))~time_source:(check(Synchronous_time_source.Read_write.invariant_with_jobs~job:(funjob->assert(Pool.pointer_is_validt.job_pooljob))))~external_jobs:ignore~thread_safe_external_job_hook:ignore~job_queued_hook:ignore~event_added_hook:ignore~yield:ignore~yield_until_no_jobs_remain:ignore~check_invariants:ignore~max_num_jobs_per_priority_per_cycle:ignore~record_backtraces:ignore~on_start_of_cycle:ignore~on_end_of_cycle:ignorewith|exn->raise_s[%message"Scheduler.invariant failed"(exn:exn)(t:t)];;letfree_jobtjob=Pool.freet.job_pooljobletenqueuet(execution_context:Execution_context.t)fa=(* If there's been an uncaught exn, we don't add the job, since we don't want any jobs
to run once there's been an uncaught exn. *)ifis_nonet.uncaught_exnthen(letpriority=execution_context.priorityinletjob_queue=matchprioritywith|Normal->t.normal_priority_jobs|Low->t.low_priority_jobsinJob_queue.enqueuejob_queueexecution_contextfa;matcht.job_queued_hookwith|None->()|Somef->fpriority);;letenqueue_jobtjob~free_job=letjob_pool=t.job_poolinenqueuet(Pool.getjob_pooljobPool.Slot.t0)(Pool.getjob_pooljobPool.Slot.t1)(Pool.getjob_pooljobPool.Slot.t2);iffree_jobthenPool.freet.job_pooljob;;lethandle_fired(time_source:_Synchronous_time_source.T1.t)job_or_event=letopenJob_or_event.Matchinlet(Kk)=kindjob_or_eventinmatchk,projectkjob_or_eventwith|Job,job->enqueue_jobtime_source.schedulerjob~free_job:true|Event,event->Synchronous_time_source.firetime_sourceevent;;letcreate()=letnow=Time_ns.now()inletrect={check_access=None;job_pool=Job_pool.create();normal_priority_jobs=Job_queue.create();low_priority_jobs=Job_queue.create();very_low_priority_workers=Deque.create();main_execution_context=Execution_context.main;current_execution_context=Execution_context.main;uncaught_exn=None;cycle_start=now;cycle_count=0;in_cycle=false;run_every_cycle_start=[];run_every_cycle_end=[];last_cycle_time=sec0.;last_cycle_num_jobs=0;total_cycle_time=sec0.;time_source;external_jobs=Thread_safe_queue.create();thread_safe_external_job_hook=ignore;job_queued_hook=None;event_added_hook=None;yield=Bvar.create();yield_until_no_jobs_remain=Bvar.create()(* configuration *);check_invariants=Async_kernel_config.check_invariants;max_num_jobs_per_priority_per_cycle=Async_kernel_config.max_num_jobs_per_priority_per_cycle;record_backtraces=Async_kernel_config.record_backtraces;on_start_of_cycle=Fn.id;on_end_of_cycle=Fn.id}andevents=Timing_wheel.create~config:Async_kernel_config.timing_wheel_config~start:nowandtime_source:_Synchronous_time_source.T1.t={id=Types.Time_source_id.create();advance_errors=[];am_advancing=false;events;handle_fired=(funalarm->handle_firedtime_source(Alarm.valueeventsalarm));fired_events=Event.none;is_wall_clock=true;most_recently_fired=Event.none;scheduler=t}int;;letis_deadt=is_somet.uncaught_exnletset_check_accesstf=t.check_access<-flett_ref=matchResult.try_withcreatewith|Okt->reft|Errorexn->Debug.log"Async cannot create its raw scheduler"exn[%sexp_of:exn];exit1;;letcheck_accesst=matcht.check_accesswith|None->()|Somef->f();;lett()=lett=!t_refincheck_accesst;t;;letcurrent_execution_contextt=ift.record_backtracesthenExecution_context.record_backtracet.current_execution_contextelset.current_execution_context;;letwith_execution_contextttmp_context~f=letold_context=current_execution_contexttinset_execution_contextttmp_context;protect~f~finally:(fun()->set_execution_contexttold_context);;letcreate_job(typea)texecution_contextfa=ifPool.is_fullt.job_poolthent.job_pool<-Pool.growt.job_pool;Pool.new3t.job_poolexecution_context(Obj.magic(f:a->unit):Obj.t->unit)(Obj.repr(a:a));;letgot_uncaught_exntexnsexp=ifdebugthenDebug.log"got_uncaught_exn"(exn,sexp)[%sexp_of:Exn.t*Sexp.t];List.iter[t.normal_priority_jobs;t.low_priority_jobs]~f:Job_queue.clear;t.uncaught_exn<-Some(exn,sexp);;(* [start_cycle t ~max_num_jobs_per_priority] enables subsequent calls of [run_jobs]
to run up to [max_num_jobs_per_priority] jobs of each priority level. *)letstart_cyclet~max_num_jobs_per_priority=letn=Max_num_jobs_per_priority_per_cycle.rawmax_num_jobs_per_priorityinJob_queue.set_jobs_left_this_cyclet.normal_priority_jobsn;Job_queue.set_jobs_left_this_cyclet.low_priority_jobsn;;(* [run_jobs t] removes jobs from [t] one at a time and runs them, stopping as soon
as an unhandled exception is raised, or when no more jobs can be run at any priority,
as per [~max_num_jobs_per_priority]. *)letrecrun_jobst=matchJob_queue.run_jobst.normal_priority_jobstwith|Error_ase->e|Ok()->(matchJob_queue.run_jobst.low_priority_jobstwith|Error_ase->e|Ok()->ifJob_queue.can_run_a_jobt.normal_priority_jobs||Job_queue.can_run_a_jobt.low_priority_jobsthenrun_jobstelseOk());;letstabilizet=start_cyclet~max_num_jobs_per_priority:(Max_num_jobs_per_priority_per_cycle.create_exnInt.max_value);matchrun_jobstwith|Ok()->Ok()|Error(exn,_backtrace)->Errorexn;;letcreate_time_source?(timing_wheel_config=Async_kernel_config.timing_wheel_config)~now()=lett=t()inletevents=Timing_wheel.create~config:timing_wheel_config~start:nowinletrectime_source:_Synchronous_time_source.T1.t={id=Types.Time_source_id.create();advance_errors=[];am_advancing=false;events;handle_fired=(funalarm->handle_firedtime_source(Alarm.valueeventsalarm));fired_events=Event.none;is_wall_clock=false;most_recently_fired=Event.none;scheduler=t}intime_source;;letwall_clock()=Synchronous_time_source.read_only(t()).time_source