(* This file is part of Lwt, released under the MIT license. See LICENSE.md for
   details, or visit https://github.com/ocsigen/lwt/blob/master/LICENSE.md. *)

(* [Lwt_sequence] is deprecated – we don't want users outside Lwt using it.
   However, it is still used internally by Lwt. So, briefly disable warning 3
   ("deprecated"), and create a local, non-deprecated alias for
   [Lwt_sequence] that can be referred to by the rest of the code in this
   module without triggering any more warnings. *)
[@@@ocaml.warning "-3"]
module Lwt_sequence = Lwt_sequence
[@@@ocaml.warning "+3"]

open Lwt.Infix

(* A pool of reusable members of type ['a]. Free members sit in [list];
   callers that cannot get a member while [count] has reached [max] park a
   resolver in [waiters]. *)
type 'a t = {
  create : unit -> 'a Lwt.t;
  (* Create a new pool member. *)
  check : 'a -> (bool -> unit) -> unit;
  (* Check validity of a pool member when use resulted in failed promise. *)
  validate : 'a -> bool Lwt.t;
  (* Validate an existing free pool member before use. *)
  dispose : 'a -> unit Lwt.t;
  (* Dispose of a pool member. *)
  cleared : bool ref ref;
  (* Have the current pool elements been cleared out? The outer reference is
     swapped for a fresh inner one by [clear]; members in use keep the inner
     reference they captured, which [clear] sets to [true]. *)
  max : int;
  (* Size of the pool. *)
  mutable count : int;
  (* Number of elements in the pool. *)
  list : 'a Queue.t;
  (* Available pool members. *)
  waiters : 'a Lwt.u Lwt_sequence.t;
  (* Promise resolvers waiting for a free member. *)
}

(* [create m ?validate ?check ?dispose create] builds an empty pool holding at
   most [m] members, each produced by calling [create ()].

   Defaults: [validate] always answers [true], [check] immediately reports
   [true] to its callback, and [dispose] does nothing. *)
let create m
    ?(validate = fun _ -> Lwt.return_true)
    ?(check = fun _ f -> f true)
    ?(dispose = fun _ -> Lwt.return_unit)
    create =
  { max = m;
    create = create;
    validate = validate;
    check = check;
    dispose = dispose;
    cleared = ref (ref false);
    count = 0;
    list = Queue.create ();
    waiters = Lwt_sequence.create () }

(* Create a pool member. *)
let create_member p =
  Lwt.catch
    (fun () ->
       (* Must be done before p.create to prevent other resolvers from
          creating new members if the limit is reached. *)
       p.count <- p.count + 1;
       p.create ())
    (fun exn ->
       (* Creation failed, so undo the increment made above. *)
       p.count <- p.count - 1;
       Lwt.fail exn)

(* Release a pool member. *)
let release p c =
  match Lwt_sequence.take_opt_l p.waiters with
  | Some wakener ->
    (* A promise resolver is waiting, give it the pool member. *)
    Lwt.wakeup_later wakener c
  | None ->
    (* No one is waiting, queue it. *)
    Queue.push c p.list

(* Dispose of a pool member. [p.count] is only decremented once the
   user-supplied [p.dispose] promise has resolved successfully. *)
let dispose p c =
  p.dispose c >>= fun () ->
  p.count <- p.count - 1;
  Lwt.return_unit

(* Create a new member when one is thrown away. The new member, or the
   creation failure, is handed to the first waiting resolver. *)
let replace_disposed p =
  match Lwt_sequence.take_opt_l p.waiters with
  | None ->
    (* No one is waiting, do not create a new member to avoid
       losing an error if creation fails. *)
    ()
  | Some wakener ->
    (* The disposed member was already subtracted from [p.count] by
       [dispose], so the replacement has to be counted here. As in
       [create_member], count it before calling [p.create] so that other
       callers cannot exceed the limit while creation is pending. *)
    p.count <- p.count + 1;
    Lwt.on_any
      (Lwt.apply p.create ())
      (fun c ->
         Lwt.wakeup_later wakener c)
      (fun exn ->
         (* Creation failed: undo the increment made above and notify the
            waiter of the failure. *)
         p.count <- p.count - 1;
         Lwt.wakeup_later_exn wakener exn)

(* Verify a member is still valid before using it. *)
let validate_and_return p c =
  Lwt.try_bind
    (fun () ->
       p.validate c)
    (function
      | true ->
        Lwt.return c
      | false ->
        (* Remove this member and create a new one. *)
        dispose p c >>= fun () ->
        create_member p)
    (fun e ->
       (* Validation failed: create a new member if at least one
          resolver is waiting. *)
       dispose p c >>= fun () ->
       replace_disposed p;
       Lwt.reraise e)

(* Acquire a pool member. *)
let acquire p =
  if Queue.is_empty p.list then
    (* No more available member. *)
    if p.count < p.max then
      (* Limit not reached: create a new one. *)
      create_member p
    else
      (* Limit reached: wait for a free one. *)
      (Lwt.add_task_r [@ocaml.warning "-3"]) p.waiters >>= validate_and_return p
  else
    (* Take the first free member and validate it. *)
    let c = Queue.take p.list in
    validate_and_return p c

(* Release a member when use resulted in failed promise if the member
   is still valid. [ok] is read as soon as [p.check] returns, so a [check]
   that has not called its callback by then counts as "not ok". *)
let check_and_release p c cleared =
  let ok = ref false in
  p.check c (fun result -> ok := result);
  if cleared || not !ok then (
    (* Element is not ok or the pool was cleared - dispose of it *)
    dispose p c
  )
  else (
    (* Element is ok - release it back to the pool *)
    release p c;
    Lwt.return_unit
  )

(* [use p f] acquires a member, runs [f] on it and resolves with the result of
   [f]. Afterwards the member goes back to the pool, or is disposed of if the
   pool was cleared in the meantime. When [f] fails, [check_and_release]
   decides whether to keep or dispose of the member, and the failure is then
   re-raised. *)
let use p f =
  acquire p >>= fun c ->
  (* Capture the current cleared state so we can see if it changes while this
     element is in use *)
  let cleared = !(p.cleared) in
  let promise =
    Lwt.catch
      (fun () -> f c)
      (fun e ->
         check_and_release p c !cleared >>= fun () ->
         Lwt.fail e)
  in
  promise >>= fun _ ->
  if !cleared then (
    (* p was cleared while promise was resolving - dispose of this element *)
    dispose p c >>= fun () ->
    promise
  )
  else (
    release p c;
    promise
  )

(* [clear p] empties the queue of free members and disposes of them one after
   another. Members currently in use are not touched here: they are flagged
   through the captured [cleared] reference. *)
let clear p =
  let elements = Queue.fold (fun l element -> element :: l) [] p.list in
  Queue.clear p.list;
  (* Indicate to any currently in-use elements that we cleared the pool *)
  let old_cleared = !(p.cleared) in
  old_cleared := true;
  p.cleared := ref false;
  Lwt_list.iter_s (dispose p) elements

(* Number of resolvers currently waiting for a free member. *)
let wait_queue_length p = Lwt_sequence.length p.waiters