(* A cache of reusable resources, indexed by key.

   Layout of this file:
   - [Stable]: serializable snapshots of the cache status.
   - [Make_wrapped]: the cache implementation, parameterised over a resource module [R].
     - [Job]: a pending request waiting for a resource.
     - [Resource]: a single [R.t] together with its busy/idle/closing state machine.
     - [Global_resource_limiter]: bounds the total number of live resources.
     - [Resource_list]: the resources and the job queue belonging to a single key.
   - [Make] / [Make_simple]: convenience instantiations of [Make_wrapped]. *)

module Stable = struct
  open! Core_kernel.Core_kernel_stable

  module Resource = struct
    module V1 = struct
      type t =
        { state : [ `Busy | `Closing | `Idle ]
        ; since : Time_ns.Span.V2.t
        }
      [@@deriving sexp, bin_io]
    end
  end

  module type Key = sig
    type t [@@deriving sexp, bin_io]
  end

  module Resource_list = struct
    module V1 = struct
      type 'key t =
        { key : 'key
        ; resources : Resource.V1.t list
        ; queue_length : int
        ; max_time_on_queue : Time_ns.Span.V2.t option
        }
      [@@deriving sexp, bin_io]
    end
  end

  module Status = struct
    module V1 = struct
      type 'key t =
        { resource_lists : 'key Resource_list.V1.t list
        ; num_jobs_in_cache : int
        }
      [@@deriving sexp, bin_io]
    end
  end
end

open! Core_kernel
open! Async_kernel
open! Import
include Resource_cache_intf
module Uid = Unique_id.Int ()

module Make_wrapped (R : Resource.S_wrapped) = struct
  (* Unstable views of the [Stable] status types. The record types are re-exported with
     type equalities so that values can be converted to the stable types for free. *)
  module Status = struct
    module Key = R.Key

    module Resource = struct
      type state =
        [ `Busy
        | `Idle
        | `Closing
        ]
      [@@deriving sexp_of, compare]

      type t = Stable.Resource.V1.t =
        { state : state
        ; since : Time_ns.Span.t
        }
      [@@deriving fields, sexp_of, compare]
    end

    module Resource_list = struct
      type 'key t_ = 'key Stable.Resource_list.V1.t =
        { key : 'key
        ; resources : Resource.t list
        ; queue_length : int
        ; max_time_on_queue : Time_ns.Span.t option
        }
      [@@deriving fields, sexp_of, compare]

      type t = Key.t t_ [@@deriving sexp_of, compare]
    end

    type 'key t_ = 'key Stable.Status.V1.t =
      { resource_lists : 'key Resource_list.t_ list
      ; num_jobs_in_cache : int
      }
    [@@deriving fields, sexp_of, compare]

    type t = Key.t t_ [@@deriving sexp_of, compare]

    module Make_stable = struct
      module V1 (Key : Stable.Key with type t = Key.t) = struct
        type t = Key.t Stable.Status.V1.t [@@deriving sexp, bin_io]
      end
    end
  end

  (* Failures that are reported through the result deferred rather than synchronously. *)
  module Delayed_failures = struct
    type t =
      [ `Error_opening_resource of R.Key.t * Error.t
      | `Cache_is_closed
      ]
  end

  (* A [Job.t] is a request to run [f] on some resource. Its result is delivered through
     an ivar that is filled at most once: by [give_up], by a resource becoming available,
     by a resource being created for it, or by the cache closing. *)
  module Job : sig
    type 'a t

    val create
      :  ?open_timeout:Time_ns.Span.t
      -> give_up:unit Deferred.t
      -> f:(R.t -> 'a Deferred.t)
      -> 'a t

    (* Use [has_result t] instead of [Deferred.is_determined (result t)] to prevent a race
       condition. It is possible that the result ivar was filled but [result] is not yet
       determined. *)
    val has_result : _ t -> bool

    val result
      :  'a t
      -> [ `Ok of R.Key.t * 'a
         | `Gave_up_waiting_for_resource
         | Delayed_failures.t
         ]
           Deferred.t

    val f : 'a t -> R.t -> 'a Deferred.t
    val open_timeout : 'a t -> Time_ns.Span.t option
    val created_at : 'a t -> Time_ns.t

    val mark_result_from_available_resource
      :  'a t
      -> R.Key.t
      -> result:'a Deferred.t
      -> unit

    val mark_result_from_resource_creation
      :  'a t
      -> result:[ `Ok of R.Key.t * 'a
                | Delayed_failures.t
                | (* This case is not possible, but the compiler gets mad otherwise *)
                  `Gave_up_waiting_for_resource
                ]
                  Deferred.t
      -> unit

    val mark_cache_closed : 'a t -> unit
  end = struct
    type 'a t =
      { f : R.t -> 'a Deferred.t
      ; result_ivar :
          [ `Ok of R.Key.t * 'a
          | `Gave_up_waiting_for_resource
          | Delayed_failures.t
          ]
            Deferred.t
            Ivar.t
      ; open_timeout : Time_ns.Span.t option
      ; created_at : Time_ns.t
      }
    [@@deriving fields]

    let create ?open_timeout ~give_up ~f =
      let result_ivar = Ivar.create () in
      (* [fill_if_empty]: the job may already have been given a result by the time
         [give_up] becomes determined. *)
      upon give_up (fun () ->
        Ivar.fill_if_empty result_ivar (return `Gave_up_waiting_for_resource));
      { f; result_ivar; open_timeout; created_at = Time_ns.now () }
    ;;

    let mark_result_from_available_resource t args ~result =
      Ivar.fill
        t.result_ivar
        (let%map res = result in
         `Ok (args, res))
    ;;

    let mark_result_from_resource_creation t ~result = Ivar.fill t.result_ivar result
    let mark_cache_closed t = Ivar.fill_if_empty t.result_ivar (return `Cache_is_closed)
    let has_result t = Ivar.is_full t.result_ivar

    (* The ivar holds a deferred, so [result] is determined only once both the ivar has
       been filled and the inner deferred is determined. *)
    let result t =
      let%bind result = Ivar.read t.result_ivar in
      result
    ;;
  end

  (* [Resource] wraps [R] taking care that uses of [with_] don't cross paths, and that
     [close] and [close_finished] are well behaved.

     It will trigger [close] once the [max_resource_reuse] or [idle_cleanup_after] are
     exceeded. *)
  module Resource : sig
    type t

    (* [create] will immediately produce a [Resource.t] that is initially
       busy with:
       - calling [R.open_]
       - calling [immediate ~f:with_] with the resource created (if successful)

       If [R.open_] fails, this resource is immediately closed
       otherwise the resource will become idle after the initial use.

       @see [immediate]. *)
    val create
      :  ?open_timeout:Time_ns.Span.t
      -> Config.t
      -> R.Key.t
      -> R.Common_args.t
      -> with_:(R.t -> 'a Deferred.t)
      -> log_error:(string -> unit)
      -> t * [> `Ok of R.Key.t * 'a | Delayed_failures.t ] Deferred.t

    val status : t -> Status.Resource.t

    (* [close_when_idle] forces the resource to shutdown either now or when the currently
       running [f] completes *)
    val close_when_idle : t -> unit Deferred.t

    (* [close_finished] becomes determined when this [Resource] has been closed.
       We guarantee that this will become determined, even if the underlying
       resource implementation is not well behaved. *)
    val close_finished : t -> unit Deferred.t

    (* Acquire an exclusive lock on this resource and call [f]. If [f] fails, or if the
       number of calls exceeds [max_resource_reuse] this resource will be closed.
       Otherwise this resource will be marked as idle and will close if not used again
       within a predefined timeout. *)
    val immediate
      :  t
      -> f:(R.t -> 'a Deferred.t)
      -> [ `Ok of 'a Deferred.t
         | `Resource_unavailable_until of unit Deferred.t
         | `Resource_closed
         ]

    val equal : t -> t -> bool
  end = struct
    type t =
      { uid : Uid.t
      ; key : R.Key.t
      ; args : R.Common_args.t
      ; (* Empty until [R.open_] has succeeded. *)
        resource : R.t Set_once.t
      ; (* The ivar carried by [`In_use_until] is filled when the current use ends,
           either in [set_idle] or once [close] has finished. *)
        mutable state : [ `Idle | `In_use_until of unit Ivar.t | `Closing ]
      ; mutable in_state_since : Time_ns.t
      ; config : Config.t
      ; mutable remaining_uses : int
      ; close_finished : unit Ivar.t
      ; log_error : string -> unit
      }

    let equal a b = Uid.equal a.uid b.uid

    let status t =
      let state =
        match t.state with
        | `Idle -> `Idle
        | `In_use_until _ -> `Busy
        | `Closing -> `Closing
      in
      { Status.Resource.state; since = Time_ns.diff (Time_ns.now ()) t.in_state_since }
    ;;

    (* Every state transition goes through here so that [in_state_since] stays accurate. *)
    let set_state t state =
      t.state <- state;
      t.in_state_since <- Time_ns.now ()
    ;;

    let close_finished t = Ivar.read t.close_finished

    (* Close the underlying resource (if one was ever opened) and fill [close_finished].
       Exceptions raised by [R.close] are logged rather than propagated, and the wait for
       [R.close] is capped at 10 seconds, so [close_finished] is always filled. *)
    let close t =
      let really_close () =
        set_state t `Closing;
        let closed =
          match Set_once.get t.resource with
          | None -> Deferred.unit
          | Some r ->
            (match%map
               Monitor.try_with (fun () ->
                 if R.has_close_started r then Deferred.unit else R.close r)
             with
             | Ok () -> ()
             | Error exn ->
               t.log_error (sprintf !"Exception closing resource: %{Exn}" exn))
        in
        match%map Clock_ns.with_timeout (Time_ns.Span.of_sec 10.) closed with
        | `Result () | `Timeout -> Ivar.fill t.close_finished ()
      in
      match t.state with
      | `Closing -> close_finished t
      | `Idle -> really_close ()
      | `In_use_until done_ ->
        assert (not (Ivar.is_full done_));
        (* Anyone waiting for the current use to end is released once the close is done. *)
        close_finished t >>> Ivar.fill done_;
        really_close ()
    ;;

    let close_when_idle t =
      match t.state with
      | `Closing -> close_finished t
      | `Idle -> close t
      | `In_use_until _ ->
        (* This will trigger a [close] when the current task completes. *)
        t.remaining_uses <- 0;
        close_finished t
    ;;

    (* Called when a use completes successfully: either close the resource because it has
       no uses left, or mark it idle and schedule the idle-timeout check. *)
    let set_idle t =
      match t.state with
      | `Closing -> failwith "Impossible, can't set a closed resource to idle"
      | `Idle -> failwith "Impossible, already marked as idle"
      | `In_use_until done_ ->
        assert (Ivar.is_empty done_);
        if t.remaining_uses <= 0
        then don't_wait_for (close t)
        else (
          set_state t `Idle;
          Ivar.fill done_ ();
          Clock_ns.after t.config.idle_cleanup_after
          >>> fun () ->
          match t.state with
          | `Closing | `In_use_until _ -> ()
          | `Idle ->
            (* The resource may have been used and become idle again since this check
               was scheduled, so compare against the time it entered its current state. *)
            let idle_time = Time_ns.diff (Time_ns.now ()) t.in_state_since in
            if Time_ns.Span.( >= ) idle_time t.config.idle_cleanup_after
            then don't_wait_for (close t))
    ;;

    (* Run [f] on the resource. The caller must already have put [t] in the
       [`In_use_until] state and [t.resource] must be set. *)
    let unsafe_immediate t ~f =
      match t.state with
      | `Closing -> failwith "Can't [unsafe_immediate] a closed resource"
      | `Idle -> failwith "Can't [unsafe_immediate] an idle resource"
      | `In_use_until done_ ->
        assert (Ivar.is_empty done_);
        assert (t.remaining_uses > 0);
        t.remaining_uses <- t.remaining_uses - 1;
        (* deliberately not filling [done_] here.
           It is filled in [set_idle] or [close]. *)
        (match%map
           Monitor.try_with (fun () -> f (Set_once.get_exn t.resource [%here]))
         with
         | Ok res ->
           set_idle t;
           res
         | Error exn ->
           don't_wait_for (Deferred.ignore (close t));
           raise exn)
    ;;

    let immediate t ~f =
      match t.state with
      | `Closing -> `Resource_closed
      | `In_use_until done_ -> `Resource_unavailable_until (Ivar.read done_)
      | `Idle ->
        (* It is possible that [R.close] was called but [R.close_finished] is not
           determined yet. Use [R.has_close_started] to prevent this race. *)
        if R.has_close_started (Set_once.get_exn t.resource [%here])
        then `Resource_closed
        else (
          set_state t (`In_use_until (Ivar.create ()));
          `Ok (unsafe_immediate t ~f))
    ;;

    let create ?open_timeout config key args ~with_ ~log_error =
      let t =
        { uid = Uid.create ()
        ; key
        ; args
        ; resource = Set_once.create ()
        ; state = `In_use_until (Ivar.create ())
        ; in_state_since = Time_ns.now ()
        ; config
        ; remaining_uses = config.Config.max_resource_reuse
        ; close_finished = Ivar.create ()
        ; log_error
        }
      in
      let res =
        match%bind
          Deferred.Or_error.try_with_join (fun () ->
            match open_timeout with
            | None -> R.open_ key args
            | Some timeout ->
              let resource_ivar = Ivar.create () in
              (match%map
                 Clock_ns.with_timeout
                   timeout
                   (let%map r = R.open_ key args in
                    Ivar.fill resource_ivar r;
                    r)
               with
               | `Result r -> r
               | `Timeout ->
                 (* In case we timeout, make sure we cleanup after ourselves *)
                 (Ivar.read resource_ivar
                  >>> function
                  | Error _ -> ()
                  | Ok r -> don't_wait_for (R.close r));
                 Or_error.error_string "Exceeded open timeout while creating resource"))
        with
        | Ok res ->
          (* A call to [close_and_flush] might have occurred *)
          if t.remaining_uses > 0
          then (
            (* If the underlying resource closes on its own, close this wrapper too. *)
            don't_wait_for
              (let%bind () = R.close_finished res in
               close_when_idle t);
            Set_once.set_exn t.resource [%here] res;
            let%map r = unsafe_immediate t ~f:with_ in
            `Ok (key, r))
          else (
            (* [close_when_idle] was called while [R.open_] was still running: it only
               zeroed [remaining_uses] and is waiting on [close_finished]. Nothing else
               will close this resource, so record it and close it here. Otherwise the
               freshly opened [res] leaks and [close_finished] is never filled. *)
            Set_once.set_exn t.resource [%here] res;
            don't_wait_for (close t);
            return `Cache_is_closed)
        | Error err ->
          (* Ensure [close_finished] gets filled *)
          don't_wait_for (close t);
          return (`Error_opening_resource (key, err))
      in
      t, res
    ;;
  end

  (* Limit the number of concurrent [Resource.t]s globally *)
  module Global_resource_limiter : sig
    type t

    val create : Config.t -> t

    (* create a single resource, and block a slot until the resource has been cleaned
       up *)
    val create_resource
      :  ?open_timeout:Time_ns.Span.t
      -> t
      -> R.Key.t
      -> R.Common_args.t
      -> with_:(R.t -> 'a Deferred.t)
      -> log_error:(string -> unit)
      -> [ `Ok of Resource.t * [> `Ok of R.Key.t * 'a | Delayed_failures.t ] Deferred.t
         | `Cache_is_closed
         | `No_resource_available_until of unit Deferred.t
         ]

    val close_and_flush : t -> unit Deferred.t
  end = struct
    (* Each live resource occupies one throttle slot: the throttle job for a resource is
       simply waiting for that resource's [close_finished]. *)
    type t =
      { config : Config.t
      ; throttle : unit Throttle.t
      }

    let create config =
      { config
      ; throttle =
          Throttle.create
            ~continue_on_error:true
            ~max_concurrent_jobs:config.max_resources
      }
    ;;

    let create_resource ?open_timeout { config; throttle } key args ~with_ ~log_error =
      if Throttle.is_dead throttle
      then `Cache_is_closed
      else if Throttle.num_jobs_running throttle < Throttle.max_concurrent_jobs throttle
      then (
        (* Jobs are only enqueued when there is a free slot, so none should ever be
           waiting to start. *)
        assert (Throttle.num_jobs_waiting_to_start throttle = 0);
        let r, v = Resource.create ?open_timeout config key args ~with_ ~log_error in
        don't_wait_for (Throttle.enqueue throttle (fun () -> Resource.close_finished r));
        `Ok (r, v))
      else
        `No_resource_available_until
          (Deferred.any
             [ Throttle.capacity_available throttle; Throttle.cleaned throttle ])
    ;;

    let close_and_flush t =
      Throttle.kill t.throttle;
      Throttle.cleaned t.throttle
    ;;
  end

  (* Limit the number of concurrent [Resource.t]s locally *)
  module Resource_list : sig
    type t

    val create
      :  Config.t
      -> Global_resource_limiter.t
      -> R.Key.t
      -> R.Common_args.t
      -> log_error:(string -> unit)
      -> t

    val status : t -> Status.Resource_list.t

    (* [is_empty] is true iff there are no currently connected/connecting resources. *)
    val is_empty : t -> bool

    (* [close_and_flush'] will mark this resource list for removal and start tearing down
       all its resources. *)
    val close_and_flush' : t -> unit

    (* [close_finished] becomes determined after [close_and_flush'] was called and all
       resources have been closed. *)
    val close_finished : t -> unit Deferred.t

    (* [find_available_resource] and [create_resource] can be used to bypass [enqueue] in
       the case where there is an idle resource or an available slot. *)
    val find_available_resource
      :  t
      -> f:(R.t -> 'a Deferred.t)
      -> [ `Immediate of 'a Deferred.t | `None_until of unit Deferred.t ]

    val create_resource
      :  ?open_timeout:Time_ns.Span.t
      -> t
      -> f:(R.t -> 'a Deferred.t)
      -> [> `Ok of R.Key.t * 'a | Delayed_failures.t ] Deferred.t option

    val enqueue : t -> 'a Job.t -> unit
  end = struct
    (* Existential wrapper so that jobs with different result types share one queue. *)
    type job = T : 'a Job.t -> job

    type t =
      { config : Config.t
      ; key : R.Key.t
      ; args : R.Common_args.t
      ; global_resource_limiter : Global_resource_limiter.t
      ; mutable resources : Resource.t list
      ; waiting_jobs : job Queue.t
      ; (* Set whenever something happens that may let a queued job make progress; the
           background allocator loop takes from it. *)
        trigger_queue_manager : unit Mvar.Read_write.t
      ; mutable close_started : bool
      ; close_finished : unit Ivar.t
      ; log_error : string -> unit
      }

    let status t =
      let max_time_on_queue =
        Queue.peek t.waiting_jobs
        |> Option.map ~f:(fun (T job) ->
          Time_ns.diff (Time_ns.now ()) (Job.created_at job))
      in
      { Status.Resource_list.key = t.key
      ; resources = List.map t.resources ~f:Resource.status
      ; queue_length = Queue.length t.waiting_jobs
      ; max_time_on_queue
      }
    ;;

    (* Try each resource in turn. If none can run [f] right now, return a deferred that
       becomes determined as soon as any of the busy ones finishes its current use. *)
    let find_available_resource t ~f =
      let rec loop ~until = function
        | [] -> `None_until (Deferred.any until)
        | r :: rs ->
          (match Resource.immediate r ~f with
           | `Ok r -> `Immediate r
           | `Resource_unavailable_until u -> loop ~until:(u :: until) rs
           | `Resource_closed -> loop ~until rs)
      in
      loop t.resources ~until:[]
    ;;

    (* Returns [None] when the per-key limit or the global limit is reached, or when the
       cache is closed. *)
    let create_resource ?open_timeout t ~f =
      if List.length t.resources >= t.config.max_resources_per_id
      then None
      else (
        match
          Global_resource_limiter.create_resource
            ?open_timeout
            t.global_resource_limiter
            t.key
            t.args
            ~with_:f
            ~log_error:t.log_error
        with
        | `Cache_is_closed -> None
        | `No_resource_available_until u ->
          (* Trigger when there is global capacity available *)
          upon u (Mvar.set t.trigger_queue_manager);
          None
        | `Ok (resource, response) ->
          t.resources <- resource :: t.resources;
          (Resource.close_finished resource
           >>> fun () ->
           t.resources
           <- List.filter t.resources ~f:(fun r -> not (Resource.equal resource r));
           (* Trigger that capacity is now available *)
           Mvar.set t.trigger_queue_manager ();
           if t.close_started && List.is_empty t.resources
           then Ivar.fill t.close_finished ());
          (* Trigger when this resource is now available. This is needed because
             [create_resource] is called from outside this module *)
          upon response (fun _ -> Mvar.set t.trigger_queue_manager ());
          Some response)
    ;;

    (* Serve queued jobs from the front of the queue for as long as a resource can be
       found or created for them. Stops at the first job that cannot be served. *)
    let allocate_resources t =
      let rec loop () =
        match Queue.peek t.waiting_jobs with
        | None -> ()
        | Some (T job) ->
          (* Skip if this job has a result already *)
          if Job.has_result job
          then (
            let (_ : _) = Queue.dequeue_exn t.waiting_jobs in
            loop ())
          else (
            match find_available_resource t ~f:(Job.f job) with
            | `Immediate result ->
              Job.mark_result_from_available_resource job t.key ~result;
              let (_ : _) = Queue.dequeue_exn t.waiting_jobs in
              loop ()
            | `None_until until ->
              (* Trigger when a resource is available *)
              upon until (Mvar.set t.trigger_queue_manager);
              (match
                 create_resource ?open_timeout:(Job.open_timeout job) t ~f:(Job.f job)
               with
               | Some result ->
                 Job.mark_result_from_resource_creation job ~result;
                 let (_ : _) = Queue.dequeue_exn t.waiting_jobs in
                 loop ()
               | None -> ()))
      in
      loop ()
    ;;

    (* Runs until the list is closed: each trigger causes one allocation pass. Once
       [close_started] is set, all waiting jobs are failed with [`Cache_is_closed] and
       the loop ends. *)
    let start_background_resource_allocator t =
      let rec loop () =
        let%bind () = Mvar.take t.trigger_queue_manager in
        if t.close_started
        then (
          Queue.iter t.waiting_jobs ~f:(fun (T job) -> Job.mark_cache_closed job);
          Queue.clear t.waiting_jobs;
          Deferred.unit)
        else (
          allocate_resources t;
          loop ())
      in
      loop ()
    ;;

    let create config global_resource_limiter key args ~log_error =
      let t =
        { config
        ; key
        ; args
        ; global_resource_limiter
        ; resources = []
        ; waiting_jobs = Queue.create ()
        ; trigger_queue_manager = Mvar.create ()
        ; close_started = false
        ; close_finished = Ivar.create ()
        ; log_error
        }
      in
      don't_wait_for (start_background_resource_allocator t);
      t
    ;;

    let enqueue t job =
      Queue.enqueue t.waiting_jobs (T job);
      (* Trigger that a new job is on the queue *)
      Mvar.set t.trigger_queue_manager ();
      (* The job may get its result elsewhere (it can sit on several lists' queues, or
         [give_up] may fire), so drop it from this queue once it has one. *)
      upon (Job.result job) (fun _ ->
        Queue.filter_inplace t.waiting_jobs ~f:(fun (T job') ->
          not (phys_same job job'));
        (* Trigger that a resource is now available *)
        Mvar.set t.trigger_queue_manager ())
    ;;

    let is_empty t = List.is_empty t.resources && Queue.is_empty t.waiting_jobs
    let close_finished t = Ivar.read t.close_finished

    let close_and_flush' t =
      if not t.close_started
      then (
        t.close_started <- true;
        if List.is_empty t.resources
        then Ivar.fill t.close_finished ()
        else (
          (* Wake the allocator loop so that it fails the waiting jobs. [close_finished]
             is filled when the last resource reports that it has closed. *)
          Mvar.set t.trigger_queue_manager ();
          List.iter t.resources ~f:(fun r ->
            don't_wait_for (Resource.close_when_idle r))))
    ;;
  end

  type t =
    { config : Config.t
    ; global_resource_limiter : Global_resource_limiter.t
    ; cache : Resource_list.t R.Key.Table.t
    ; args : R.Common_args.t
    ; mutable num_jobs_in_cache : int
    ; mutable close_started : bool
    ; close_finished : unit Ivar.t
    ; log_error : string -> unit
    }

  let status t =
    let resource_lists = List.map (Hashtbl.data t.cache) ~f:Resource_list.status in
    { Status.resource_lists; num_jobs_in_cache = t.num_jobs_in_cache }
  ;;

  (* Look up the [Resource_list.t] for [key], creating it on first use. *)
  let get_resource_list t key =
    Hashtbl.find_or_add t.cache key ~default:(fun () ->
      Resource_list.create
        t.config
        t.global_resource_limiter
        key
        t.args
        ~log_error:t.log_error)
  ;;

  (* Run [f] on the first key, in order, that has a resource available right now. *)
  let find_any_available_resource t keys ~f =
    List.find_map keys ~f:(fun key ->
      let res_list = get_resource_list t key in
      match Resource_list.find_available_resource res_list ~f with
      | `Immediate res -> Some (key, res)
      | `None_until _ -> None)
  ;;

  (* Create a resource for the first key, in order, that has capacity for one. *)
  let create_any_resource ?open_timeout t keys ~f =
    List.find_map keys ~f:(fun key ->
      let res_list = get_resource_list t key in
      Resource_list.create_resource ?open_timeout res_list ~f)
  ;;

  (* Put a single job on the queue of every key; whichever list serves it first wins. *)
  let enqueue_all ?open_timeout t ~give_up keys ~f =
    let job = Job.create ?open_timeout ~give_up ~f in
    List.iter keys ~f:(fun key ->
      let res_list = get_resource_list t key in
      Resource_list.enqueue res_list job);
    Job.result job
  ;;

  (* Strategy, in order: reuse an available resource, else create a new one, else queue
     the job on every key (unless [give_up] is already determined). *)
  let with_any' ?open_timeout ?(give_up = Deferred.never ()) t keys ~f =
    let f resource = f (R.underlying resource) in
    t.num_jobs_in_cache <- t.num_jobs_in_cache + 1;
    let result =
      if t.close_started
      then return `Cache_is_closed
      else (
        match find_any_available_resource t keys ~f with
        | Some (args, res) ->
          let%map res = res in
          `Ok (args, res)
        | None ->
          (match create_any_resource ?open_timeout t keys ~f with
           | Some res -> res
           | None ->
             if Deferred.is_determined give_up
             then return `Gave_up_waiting_for_resource
             else enqueue_all ?open_timeout ~give_up t keys ~f))
    in
    upon result (fun _ -> t.num_jobs_in_cache <- t.num_jobs_in_cache - 1);
    result
  ;;

  (* [Or_error] flavour of [with_any']. *)
  let with_any ?open_timeout ?give_up t keys ~f =
    match%map with_any' ?open_timeout t ?give_up keys ~f with
    | `Ok args_and_res -> Ok args_and_res
    | `Error_opening_resource (key, err) ->
      let tag = sprintf !"Error creating required resource: %{sexp:R.Key.t}" key in
      Error (Error.tag ~tag err)
    | `Cache_is_closed -> Or_error.error_string "Cache is closed"
    | `Gave_up_waiting_for_resource ->
      Or_error.error_string "Gave up waiting for resource"
  ;;

  let with_ ?open_timeout ?give_up t key ~f =
    match%map with_any ?open_timeout ?give_up t [ key ] ~f with
    | Ok (_args, res) -> Ok res
    | Error e -> Error e
  ;;

  let with_' ?open_timeout ?give_up t key ~f =
    match%map with_any' ?open_timeout ?give_up t [ key ] ~f with
    | `Ok (_args, res) -> `Ok res
    | `Error_opening_resource (_args, err) -> `Error_opening_resource err
    | `Cache_is_closed -> `Cache_is_closed
    | `Gave_up_waiting_for_resource -> `Gave_up_waiting_for_resource
  ;;

  (* Like [with_any'], but when opening a resource for some key fails, retry with the
     remaining keys. Fails only once every key has failed to open, reporting the failures
     in the order they occurred. *)
  let with_any_loop ?open_timeout ?give_up t keys ~f =
    let rec loop ~failed = function
      | [] -> return (`Error_opening_all_resources (List.rev failed))
      | keys ->
        (match%bind with_any' ?open_timeout ?give_up t keys ~f with
         | (`Ok _ | `Gave_up_waiting_for_resource | `Cache_is_closed) as res ->
           return res
         | `Error_opening_resource (failed_key, e) ->
           let remaining =
             List.filter keys ~f:(fun key -> not (R.Key.equal key failed_key))
           in
           loop ~failed:((failed_key, e) :: failed) remaining)
    in
    loop ~failed:[] keys
  ;;

  let init ~config ~log_error args =
    let t =
      { config
      ; global_resource_limiter = Global_resource_limiter.create config
      ; cache = R.Key.Table.create ()
      ; args
      ; num_jobs_in_cache = 0
      ; close_started = false
      ; close_finished = Ivar.create ()
      ; log_error
      }
    in
    (* Periodically drop resource lists that have neither resources nor waiting jobs so
       that the table does not grow with keys that are no longer in use. *)
    Clock_ns.every
      ~stop:(Ivar.read t.close_finished)
      config.idle_cleanup_after
      (fun () ->
         Hashtbl.filter_inplace t.cache ~f:(fun d ->
           if Resource_list.is_empty d
           then (
             Resource_list.close_and_flush' d;
             false)
           else true));
    t
  ;;

  (* Idempotent: later calls just wait for the first call to finish. *)
  let close_and_flush t =
    if not t.close_started
    then (
      t.close_started <- true;
      let%map () =
        Deferred.all_unit
          (Global_resource_limiter.close_and_flush t.global_resource_limiter
           :: List.map (Hashtbl.data t.cache) ~f:(fun r ->
             Resource_list.close_and_flush' r;
             Resource_list.close_finished r))
      in
      Ivar.fill t.close_finished ())
    else Ivar.read t.close_finished
  ;;

  let config t = t.config
  let close_started t = t.close_started
  let close_finished t = Ivar.read t.close_finished
end

(* A cache whose callbacks receive the resource itself. *)
module Make (R : Resource.S) = struct
  include Make_wrapped (struct
      include R

      type resource = t

      let underlying t = t
    end)
end

module Make_simple (R : Resource.Simple) = struct
  include Make_wrapped (struct
      include Resource.Make_simple (R)
    end)
end