open! Core_kernel
open! Import
module Scheduler = Scheduler0

(* Placeholder values used to overwrite a job's three slots once it has been dequeued,
   so the dequeued values aren't kept live (see the comment in [run_jobs]). *)
let dummy_e = Execution_context.main
let dummy_f : Obj.t -> unit = ignore
let dummy_a : Obj.t = Obj.repr ()

(* Each job occupies three consecutive slots in [jobs]: execution context, closure,
   argument. *)
let slots_per_elt = 3

module A = Uniform_array

(* This is essentially a specialized [Flat_queue], done for reasons of speed. *)
type t = Types.Job_queue.t =
  { mutable num_jobs_run : int
  ; mutable jobs_left_this_cycle : int
  (* [jobs] is an array of length [capacity t * slots_per_elt], where each elt has the
     three components of a job ([execution_context], [f], [a]) in consecutive spots in
     [jobs].  [enqueue] doubles the length of [jobs] if [jobs] is full.  [jobs] never
     shrinks.  [jobs] is somewhat like a [Core_kernel.Pool] specialized to 3-tuples; we
     don't use [Pool] because that implements a set, where [jobs] is a queue. *)
  ; mutable jobs : Obj.t A.t sexp_opaque
  (* [mask] is [capacity t - 1], and is used for quickly computing [i mod (capacity
     t)] *)
  ; mutable mask : int
  (* [front] is the index of the first job in the queue.  The array index of that job's
     execution context is [front * slots_per_elt]. *)
  ; mutable front : int
  ; mutable length : int
  }
[@@deriving fields, sexp_of]

(* Array index of the first slot of the [i]'th queued job.  Note that in OCaml [land]
   and [*] share precedence and associate left, so this is
   [((t.front + i) land t.mask) * slots_per_elt]. *)
let offset t i = (t.front + i) land t.mask * slots_per_elt
let capacity t = t.mask + 1

let invariant t : unit =
  Invariant.invariant [%here] t [%sexp_of: t] (fun () ->
    let check f = Invariant.check_field t f in
    Fields.iter
      ~num_jobs_run:(check (fun num_jobs_run -> assert (num_jobs_run >= 0)))
      ~jobs_left_this_cycle:
        (check (fun jobs_left_this_cycle -> assert (jobs_left_this_cycle >= 0)))
      ~jobs:
        (check (fun jobs ->
           for i = 0 to t.length - 1 do
             Execution_context.invariant
               (Obj.obj (A.get jobs (offset t i)) : Execution_context.t)
           done))
      ~mask:
        (check (fun mask ->
           let capacity = mask + 1 in
           (* Capacity must be a power of two for the [land]-based modulus in
              [offset] to be valid. *)
           assert (Int.is_pow2 capacity);
           assert (capacity * slots_per_elt = A.length t.jobs)))
      ~front:
        (check (fun front ->
           assert (front >= 0);
           assert (front < capacity t)))
      ~length:
        (check (fun length ->
           assert (length >= 0);
           assert (length <= capacity t))))
;;

let create_array ~capacity = A.create_obj_array ~len:(capacity * slots_per_elt)

let create () =
  let capacity = 1 in
  { num_jobs_run = 0
  ; jobs_left_this_cycle = 0
  ; jobs = create_array ~capacity
  ; mask = capacity - 1
  ; front = 0
  ; length = 0
  }
;;

(* Empty the queue.  [jobs] is not shrunk or zeroed; only the bookkeeping is reset. *)
let clear t =
  t.front <- 0;
  t.length <- 0;
  t.jobs_left_this_cycle <- 0
;;

(* Double the capacity, copying the (possibly wrapped) queue contents so that the
   front of the queue lands at index 0 of the new array. *)
let grow t =
  let old_capacity = capacity t in
  let new_capacity = old_capacity * 2 in
  let old_jobs = t.jobs in
  let old_front = t.front in
  (* [len1] is the contiguous run from [front] to the end of the old array; [len2] is
     the wrapped-around remainder at the start of the old array. *)
  let len1 = Int.min t.length (old_capacity - old_front) * slots_per_elt in
  let len2 = (t.length * slots_per_elt) - len1 in
  let new_jobs = create_array ~capacity:new_capacity in
  A.blit
    ~len:len1
    ~src:old_jobs
    ~src_pos:(old_front * slots_per_elt)
    ~dst:new_jobs
    ~dst_pos:0;
  A.blit ~len:len2 ~src:old_jobs ~src_pos:0 ~dst:new_jobs ~dst_pos:len1;
  t.mask <- new_capacity - 1;
  t.jobs <- new_jobs;
  t.front <- 0
;;

(* Store the three components of a job into the [i]'th element's slots. *)
let set (type a) t i execution_context f a =
  let offset = offset t i in
  A.unsafe_set t.jobs offset (Obj.repr (execution_context : Execution_context.t));
  A.unsafe_set t.jobs (offset + 1) (Obj.repr (f : a -> unit));
  A.unsafe_set t.jobs (offset + 2) (Obj.repr (a : a))
;;

let enqueue t execution_context f a =
  if t.length = capacity t then grow t;
  set t t.length execution_context f a;
  t.length <- t.length + 1
;;

let set_jobs_left_this_cycle t n =
  if n < 0
  then
    raise_s
      [%message "Jobs.set_jobs_left_this_cycle got negative number" (n : int) (t : t)];
  t.jobs_left_this_cycle <- n
;;

let can_run_a_job t = t.length > 0 && t.jobs_left_this_cycle > 0

let run_job t (scheduler : Scheduler.t) execution_context f a =
  t.num_jobs_run <- t.num_jobs_run + 1;
  Scheduler.set_execution_context scheduler execution_context;
  f a
;;

(* Drain the scheduler's thread-safe queue of externally-enqueued jobs, running each
   in its saved execution context. *)
let run_external_jobs t (scheduler : Scheduler.t) =
  let external_jobs = scheduler.external_jobs in
  while Thread_safe_queue.length external_jobs > 0 do
    let (External_job.T (execution_context, f, a)) =
      Thread_safe_queue.dequeue_exn external_jobs
    in
    run_job t scheduler execution_context f a
  done
;;

(* Run queued jobs until the queue is empty or [jobs_left_this_cycle] is exhausted.
   Returns [Ok ()] on normal completion, or [Error (exn, backtrace)] if a job
   raised. *)
let run_jobs (type a) t scheduler =
  (* We do the [try-with] outside of the [while] because it is cheaper than doing a
     [try-with] for each job. *)
  try
    (* [run_external_jobs] before entering the loop, since it might enqueue a job,
       changing [t.length]. *)
    run_external_jobs t scheduler;
    while can_run_a_job t do
      let this_job = offset t 0 in
      let execution_context : Execution_context.t =
        Obj.obj (A.unsafe_get t.jobs this_job)
      in
      let f : a -> unit = Obj.obj (A.unsafe_get t.jobs (this_job + 1)) in
      let a : a = Obj.obj (A.unsafe_get t.jobs (this_job + 2)) in
      (* We clear out the job right now so that it isn't live at the next minor
         collection.  We tried not doing this and saw significant (15% or so)
         performance hits due to spurious promotion. *)
      set t 0 dummy_e dummy_f dummy_a;
      t.front <- (t.front + 1) land t.mask;
      t.length <- t.length - 1;
      t.jobs_left_this_cycle <- t.jobs_left_this_cycle - 1;
      (* It is OK if [run_job] or [run_external_jobs] raises, in which case the exn is
         handled by the outer try-with.  The only side effects we have done are to take
         the job out of the queue and decrement [jobs_left_this_cycle].  [run_job] or
         [run_external_jobs] may side effect [t], either by enqueueing jobs, or by
         clearing [t]. *)
      run_job t scheduler execution_context f a;
      (* [run_external_jobs] at each iteration of the [while] loop, for fairness. *)
      run_external_jobs t scheduler
    done;
    Result.ok_unit
  with
  | exn ->
    (* We call [Exn.backtrace] immediately after catching an unhandled exception, to
       ensure there is no intervening code that interferes with the global backtrace
       state. *)
    let backtrace = Backtrace.Exn.most_recent () in
    Error (exn, backtrace)
;;