open! Core_kernel
open! Import
open! Deferred_std
module Deferred = Deferred1

(* The final state of a job handed to a throttle: it ran and produced a value ([`Ok]), it
   was aborted before being started ([`Aborted]), or it raised ([`Raised]). *)
type 'a outcome =
  [ `Ok of 'a
  | `Aborted
  | `Raised of exn
  ]
[@@deriving sexp_of]

(* An [Internal_job.t] wraps a user function so that it can be started with a resource
   (via [run]) or cancelled (via [abort]) at a later time.  [create] also returns the
   deferred that carries the full outcome of the wrapped function. *)
module Internal_job : sig
  type 'a t [@@deriving sexp_of]

  val create : ('a -> 'b Deferred.t) -> 'a t * 'b outcome Deferred.t

  (* Every internal job will eventually be either [run] or [abort]ed, but not both. *)

  val run : 'a t -> 'a -> [ `Ok | `Raised ] Deferred.t
  val abort : _ t -> unit
end = struct
  type 'a t =
    { start : [ `Abort | `Start of 'a ] Ivar.t
    ; outcome : [ `Ok | `Aborted | `Raised ] Deferred.t
    }
  [@@deriving sexp_of]

  (* [create work] does not run [work]; it waits for [start] to be filled.  [result]
     carries the value or exception, while [outcome] is the same information with the
     payload dropped, so that [t] does not depend on the job's result type. *)
  let create work =
    let start = Ivar.create () in
    let result =
      match%bind Ivar.read start with
      | `Abort -> return `Aborted
      | `Start a ->
        (match%map Monitor.try_with (fun () -> work a) with
         | Ok a -> `Ok a
         | Error exn -> `Raised exn)
    in
    let outcome =
      match%map result with
      | `Ok _ -> `Ok
      | `Aborted -> `Aborted
      | `Raised _ -> `Raised
    in
    let t = { start; outcome } in
    t, result
  ;;

  (* [run t a] starts the job with resource [a].  [`Aborted] is impossible here because
     [start] was just filled with [`Start]. *)
  let run t a =
    Ivar.fill t.start (`Start a);
    match%map t.outcome with
    | `Aborted -> assert false
    | (`Ok | `Raised) as x -> x
  ;;

  (* [abort t] fills [start] with [`Abort], which makes the job's result [`Aborted]. *)
  let abort t = Ivar.fill t.start `Abort
end

type 'a t =
  { continue_on_error : bool
  ; max_concurrent_jobs : int
  (* [job_resources_not_in_use] holds resources that are not currently in use by a
     running job. *)
  ; job_resources_not_in_use : 'a Stack_or_counter.t
  (* [jobs_waiting_to_start] is the queue of jobs that haven't yet started. *)
  ; jobs_waiting_to_start : 'a Internal_job.t Queue.t
  (* [0 <= num_jobs_running <= max_concurrent_jobs]. *)
  ; mutable num_jobs_running : int
  (* [capacity_available] is [Some ivar] if user code has called [capacity_available t] and
     is waiting to be notified when capacity is available in the throttle.
     [start_job] will fill [ivar] when capacity becomes available, i.e. when
     [jobs_waiting_to_start] is empty and [num_jobs_running < max_concurrent_jobs]. *)
  ; mutable capacity_available : unit Ivar.t option
  (* [is_dead] is true if [t] was killed due to a job raising an exception or [kill t]
     being called. *)
  ; mutable is_dead : bool
  (* [cleans] holds functions that will be called to clean each resource when [t] is
     killed. *)
  ; mutable cleans : ('a -> unit Deferred.t) list
  (* [num_resources_not_cleaned] is the number of resources whose clean functions have not
     yet completed.  While [t] is alive, [num_resources_not_cleaned =
     max_concurrent_jobs].  Once [t] is killed, [num_resources_not_cleaned] decreases to
     zero over time as the clean functions complete. *)
  ; mutable num_resources_not_cleaned : int
  (* [cleaned] becomes determined when [num_resources_not_cleaned] reaches zero,
     i.e. after [t] is killed and all its clean functions complete. *)
  ; cleaned : unit Ivar.t
  }
[@@deriving fields, sexp_of]

(* [invariant invariant_a t] checks the relationships between the fields of [t] that are
   described on the record above, applying [invariant_a] to every idle resource.  Any
   failure is re-raised wrapped in a message that includes [t]. *)
let invariant invariant_a t : unit =
  try
    let check f field = f (Field.get field t) in
    Fields.iter
      ~continue_on_error:ignore
      ~max_concurrent_jobs:
        (check (fun max_concurrent_jobs -> assert (max_concurrent_jobs > 0)))
      ~job_resources_not_in_use:
        (check (fun job_resources_not_in_use ->
           Stack_or_counter.iter job_resources_not_in_use ~f:invariant_a;
           assert (
             Stack_or_counter.length job_resources_not_in_use
             = if t.is_dead then 0 else t.max_concurrent_jobs - t.num_jobs_running)))
      ~jobs_waiting_to_start:
        (check (function jobs_waiting_to_start ->
           if t.is_dead then assert (Queue.is_empty jobs_waiting_to_start)))
      ~num_jobs_running:
        (check (fun num_jobs_running ->
           assert (num_jobs_running >= 0);
           assert (num_jobs_running <= t.max_concurrent_jobs);
           (* Jobs only wait when every slot is busy. *)
           if num_jobs_running < t.max_concurrent_jobs
           then assert (Queue.is_empty t.jobs_waiting_to_start)))
      ~capacity_available:
        (check (function
           | None -> ()
           | Some ivar -> assert (Ivar.is_empty ivar)))
      ~is_dead:ignore
      ~cleans:ignore
      ~num_resources_not_cleaned:
        (check (fun num_resources_not_cleaned ->
           assert (num_resources_not_cleaned >= 0);
           assert (num_resources_not_cleaned <= t.max_concurrent_jobs);
           if num_resources_not_cleaned < t.max_concurrent_jobs then assert t.is_dead))
      ~cleaned:
        (check (fun cleaned ->
           if Ivar.is_full cleaned then assert (t.num_resources_not_cleaned = 0)))
  with
  | exn -> raise_s [%message "Throttle.invariant failed" (exn : exn) (t : _ t)]
;;

(* [T2] exposes the throttle type with an additional phantom ['kind] parameter; its
   [invariant] ignores the argument supplied for that parameter. *)
module T2 = struct
  type nonrec ('a, 'kind) t = 'a t [@@deriving sexp_of]

  let invariant invariant_a _ t = invariant invariant_a t
end

(* The number of jobs that have been enqueued but not yet started. *)
let num_jobs_waiting_to_start t = Queue.length t.jobs_waiting_to_start

(* [clean_resource t a] runs every function in [t.cleans] on [a].  Once they have all
   finished it decrements [num_resources_not_cleaned], and fills [t.cleaned] when the
   count reaches zero. *)
let clean_resource t a =
  Deferred.all_unit (List.map t.cleans ~f:(fun f -> f a))
  >>> fun () ->
  t.num_resources_not_cleaned <- t.num_resources_not_cleaned - 1;
  if t.num_resources_not_cleaned = 0 then Ivar.fill t.cleaned ()
;;

(* [kill t] marks [t] dead, aborts and discards every job that has not started, and
   cleans every resource that is currently idle.  Resources held by running jobs are
   cleaned by [start_job] when those jobs finish.  Calling [kill] on a dead throttle does
   nothing. *)
let kill t =
  if not t.is_dead
  then (
    t.is_dead <- true;
    Queue.iter t.jobs_waiting_to_start ~f:Internal_job.abort;
    Queue.clear t.jobs_waiting_to_start;
    Stack_or_counter.iter t.job_resources_not_in_use ~f:(fun a -> clean_resource t a);
    Stack_or_counter.clear t.job_resources_not_in_use)
;;

(* [at_kill t f] registers [f] to be run on each resource when it is cleaned. *)
let at_kill t f =
  (* We preserve the execution context so that exceptions raised by [f] go to the monitor
     in effect when [at_kill] was called. *)
  let f = unstage (Monitor.Exported_for_scheduler.preserve_execution_context' f) in
  t.cleans <- f :: t.cleans
;;

(* [cleaned t] becomes determined once [t.cleaned] has been filled. *)
let cleaned t = Ivar.read t.cleaned

(* [start_job t] dequeues the next waiting job and runs it with a resource popped from
   [job_resources_not_in_use].  The caller must ensure that [t] is alive, has a free slot
   and has a waiting job; all three are asserted.

   When the job finishes: the running count is decremented; a raising job kills [t]
   unless [continue_on_error]; then either the resource is cleaned (dead throttle), or it
   is returned to the pool and the next waiting job is started, or, if nothing is
   waiting, a pending [capacity_available] ivar is filled and cleared. *)
let rec start_job t =
  assert (not t.is_dead);
  assert (t.num_jobs_running < t.max_concurrent_jobs);
  assert (not (Queue.is_empty t.jobs_waiting_to_start));
  let job = Queue.dequeue_exn t.jobs_waiting_to_start in
  t.num_jobs_running <- t.num_jobs_running + 1;
  let job_resource = Stack_or_counter.pop_exn t.job_resources_not_in_use in
  Internal_job.run job job_resource
  >>> fun res ->
  t.num_jobs_running <- t.num_jobs_running - 1;
  (match res with
   | `Ok -> ()
   | `Raised -> if not t.continue_on_error then kill t);
  if t.is_dead
  then clean_resource t job_resource
  else (
    Stack_or_counter.push t.job_resources_not_in_use job_resource;
    if not (Queue.is_empty t.jobs_waiting_to_start)
    then start_job t
    else (
      match t.capacity_available with
      | None -> ()
      | Some ivar ->
        Ivar.fill ivar ();
        t.capacity_available <- None))
;;

(* [create_internal ~continue_on_error job_resources] builds a live, idle throttle whose
   [max_concurrent_jobs] is the number of resources supplied. *)
let create_internal ~continue_on_error job_resources =
  let max_concurrent_jobs = Stack_or_counter.length job_resources in
  { continue_on_error
  ; max_concurrent_jobs
  ; job_resources_not_in_use = job_resources
  ; jobs_waiting_to_start = Queue.create ()
  ; num_jobs_running = 0
  ; capacity_available = None
  ; is_dead = false
  ; cleans = []
  ; num_resources_not_cleaned = max_concurrent_jobs
  ; cleaned = Ivar.create ()
  }
;;

(* [create_with ~continue_on_error job_resources] builds a throttle from an explicit list
   of resources, one per concurrent job. *)
let create_with ~continue_on_error job_resources =
  create_internal ~continue_on_error (Stack_or_counter.of_list job_resources)
;;

(* A [Sequencer] is a throttle created with exactly one resource. *)
module Sequencer = struct
  type nonrec 'a t = 'a t [@@deriving sexp_of]

  let create ?(continue_on_error = false) a = create_with ~continue_on_error [ a ]
end

(* [create ~continue_on_error ~max_concurrent_jobs] builds a throttle backed by a counter
   of [max_concurrent_jobs] resources.  Raises if [max_concurrent_jobs <= 0]. *)
let create ~continue_on_error ~max_concurrent_jobs =
  if max_concurrent_jobs <= 0
  then
    raise_s
      [%message
        "Throttle.create requires positive max_concurrent_jobs, but got"
          (max_concurrent_jobs : int)];
  create_internal
    ~continue_on_error
    (Stack_or_counter.create_counter ~length:max_concurrent_jobs)
;;

(* A [Job.t] pairs an [Internal_job.t] with the deferred that carries its outcome. *)
module Job = struct
  type ('a, 'b) t =
    { internal_job : 'a Internal_job.t
    ; result : [ `Ok of 'b | `Aborted | `Raised of exn ] Deferred.t
    }

  let result t = t.result
  let abort t = Internal_job.abort t.internal_job

  let create f =
    let internal_job, result = Internal_job.create f in
    { internal_job; result }
  ;;
end

(* [enqueue' t f] submits [f] and returns its outcome.  On a dead throttle the job is
   aborted immediately; otherwise it is queued, and started right away if a slot is
   free. *)
let enqueue' t f =
  let job = Job.create f in
  if t.is_dead
  then Job.abort job
  else (
    Queue.enqueue t.jobs_waiting_to_start job.internal_job;
    if t.num_jobs_running < t.max_concurrent_jobs then start_job t);
  Job.result job
;;

(* [enqueue t f] is like [enqueue'] but returns the job's value directly: it raises
   "throttle aborted job" on [`Aborted] and re-raises the job's exception on [`Raised]. *)
let enqueue t f =
  match%map enqueue' t f with
  | `Ok a -> a
  | `Aborted -> raise_s [%message "throttle aborted job"]
  | `Raised exn -> raise exn
;;

(* [monad_sequence_how ?how ~f] returns a staged version of [f].  With [`Parallel], [f]
   is returned unchanged.  Otherwise a fresh throttle is created (one job at a time for
   [`Sequential], the given limit for [`Max_concurrent_jobs]) and each call to the
   returned function is enqueued on it. *)
let monad_sequence_how ?(how = `Sequential) ~f =
  stage
    (match how with
     | `Parallel -> f
     | (`Sequential | `Max_concurrent_jobs _) as how ->
       let max_concurrent_jobs =
         match how with
         | `Sequential -> 1
         | `Max_concurrent_jobs max_concurrent_jobs -> max_concurrent_jobs
       in
       let t = create ~continue_on_error:false ~max_concurrent_jobs in
       fun a -> enqueue t (fun () -> f a))
;;

(* Same as [monad_sequence_how], for a two-argument [f]. *)
let monad_sequence_how2 ?(how = `Sequential) ~f =
  stage
    (match how with
     | `Parallel -> f
     | (`Sequential | `Max_concurrent_jobs _) as how ->
       let max_concurrent_jobs =
         match how with
         | `Sequential -> 1
         | `Max_concurrent_jobs max_concurrent_jobs -> max_concurrent_jobs
       in
       let t = create ~continue_on_error:false ~max_concurrent_jobs in
       fun a1 a2 -> enqueue t (fun () -> f a1 a2))
;;

(* [prior_jobs_done t] becomes determined when all [t.max_concurrent_jobs] dummy jobs it
   enqueues are running at the same time. *)
let prior_jobs_done t =
  (* We queue [t.max_concurrent_jobs] dummy jobs and when they are all started we know
     that all prior jobs finished.  We make sure that all dummy jobs wait for the last one
     to get started before finishing. *)
  Deferred.create (fun all_dummy_jobs_running ->
    let dummy_jobs_running = ref 0 in
    for _ = 1 to t.max_concurrent_jobs do
      don't_wait_for
        (enqueue t (fun _ ->
           incr dummy_jobs_running;
           if !dummy_jobs_running = t.max_concurrent_jobs
           then Ivar.fill all_dummy_jobs_running ();
           Ivar.read all_dummy_jobs_running))
    done)
;;

(* [capacity_available t] is already determined when fewer than [max_concurrent_jobs]
   jobs are running.  Otherwise it returns a deferred backed by the ivar stored in
   [t.capacity_available], creating and storing that ivar if there is none, so that
   concurrent callers share a single ivar. *)
let capacity_available t =
  if num_jobs_running t < max_concurrent_jobs t
  then return ()
  else (
    match t.capacity_available with
    | Some ivar -> Ivar.read ivar
    | None -> Deferred.create (fun ivar -> t.capacity_available <- Some ivar))
;;