123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102openCoreopenAsyncopen!Int.Replace_polymorphic_compare(* Why don't we implement this using [Sequencer] (and thereby get its nice exception
handling, notably)?
Well, you end up writing some loop like this
{[
let rec run t =
match Throttle.num_jobs_running s with
| 0 -> when_idle etc.
| _ ->
(* wait for job to finish. *)
run t
]}
the problem is that "wait for job to finish" is hard. You can enqueue a job in the
sequencer, or use [Throttle.prior_jobs_done], but both of those things modify
[Throttle.num_jobs_running] and there are no guarantees about the race between
[num_jobs_running] changing back to [0] and the result of the dummy job you enqueued
becoming determined. In practice async-callbacks waiting on dummy job's deferred run
before [num_jobs_running] decreases, and you end up in an infinite loop.
We could probably hack around that with [Scheduler.yield_until_no_jobs_remain ()] or
something but besides being slow, I claim this ultimately ends up being far more
complicated than that which you see below. *)typejob=|Job:{start:unitIvar.t;finished:'aDeferred.t}->job[@@derivingsexp_of]typewhen_idle_next_step=|Call_me_when_idle_again|Finishedtypet={jobs_waiting:jobQueue.t;any_work_added:(unit,read_write)Bvar.t;mutablewhen_idle:(unit->when_idle_next_stepDeferred.t)option}[@@derivingsexp_of]letrecrunt=let%bind()=matchQueue.dequeuet.jobs_waitingwith|Some(Job{start;finished})->Ivar.fillstart();let%bind_=finishedinreturn()|None->(matcht.when_idlewith|None->Bvar.waitt.any_work_added|Somefunc->(match%bindfunc()with|Finished->t.when_idle<-None;return()|Call_me_when_idle_again->return()))inrunt;;letcreate()=lett={jobs_waiting=Queue.create();any_work_added=Bvar.create();when_idle=None}indon't_wait_for(runt);t;;letenqueuetjob:_Deferred.t=letstart=Ivar.create()inletfinished=let%bind()=Ivar.readstartinjob()inQueue.enqueuet.jobs_waiting(Job{start;finished});Bvar.broadcastt.any_work_added();finished;;letwhen_idletcallback=matcht.when_idlewith|Some_->failwith"Query_scheduler.when_idle: already have a callback"|None->t.when_idle<-Somecallback;Bvar.broadcastt.any_work_added();;letrecother_jobs_are_waitingt=matchQueue.is_emptyt.jobs_waitingwith|false->return()|true->let%bind()=Bvar.waitt.any_work_addedinother_jobs_are_waitingt;;