[@@@ocaml.warning "+A-44-48"]

(* Resource pool built on Lwt promises.

   A pool hands out at most [max] members at a time.  Free members are kept
   in a FIFO queue; callers that arrive when the limit is reached are parked
   in a waiter sequence (together with the time they started waiting) and are
   woken when a member is released. *)

(* this module is a copy of Lwt_sequence from
   Lwt version 17c8d5e2071f3690850a99c3ce0f2e6a79a8ac7f *)
module Sequence = struct
  [@@@ocaml.warning "-32"]

  (* start copy & paste ------------------------------------------------------- *)

  (* This file is part of Lwt, released under the MIT license. See LICENSE.md for
     details, or visit https://github.com/ocsigen/lwt/blob/master/LICENSE.md. *)

  exception Empty

  type 'a t = {
    mutable prev : 'a t;
    mutable next : 'a t;
  }

  type 'a node = {
    mutable node_prev : 'a t;
    mutable node_next : 'a t;
    mutable node_data : 'a;
    mutable node_active : bool;
  }

  external seq_of_node : 'a node -> 'a t = "%identity"
  external node_of_seq : 'a t -> 'a node = "%identity"

  (* +-----------------------------------------------------------------+
     | Operations on nodes                                             |
     +-----------------------------------------------------------------+ *)

  let get node =
    node.node_data

  let set node data =
    node.node_data <- data

  let remove node =
    if node.node_active then begin
      node.node_active <- false;
      let seq = seq_of_node node in
      seq.prev.next <- seq.next;
      seq.next.prev <- seq.prev
    end

  (* +-----------------------------------------------------------------+
     | Operations on sequences                                         |
     +-----------------------------------------------------------------+ *)

  let create () =
    let rec seq = { prev = seq; next = seq } in
    seq

  let is_empty seq = seq.next == seq

  let length seq =
    let rec loop curr len =
      if curr == seq then
        len
      else
        let node = node_of_seq curr in
        loop node.node_next (len + 1)
    in
    loop seq.next 0

  let add_l data seq =
    let node =
      { node_prev = seq;
        node_next = seq.next;
        node_data = data;
        node_active = true }
    in
    seq.next.prev <- seq_of_node node;
    seq.next <- seq_of_node node;
    node

  let add_r data seq =
    let node =
      { node_prev = seq.prev;
        node_next = seq;
        node_data = data;
        node_active = true }
    in
    seq.prev.next <- seq_of_node node;
    seq.prev <- seq_of_node node;
    node

  let take_l seq =
    if is_empty seq then
      raise Empty
    else begin
      let node = node_of_seq seq.next in
      remove node;
      node.node_data
    end

  let take_r seq =
    if is_empty seq then
      raise Empty
    else begin
      let node = node_of_seq seq.prev in
      remove node;
      node.node_data
    end

  let take_opt_l seq =
    if is_empty seq then
      None
    else begin
      let node = node_of_seq seq.next in
      remove node;
      Some node.node_data
    end

  let take_opt_r seq =
    if is_empty seq then
      None
    else begin
      let node = node_of_seq seq.prev in
      remove node;
      Some node.node_data
    end

  let peek_opt_l seq =
    if is_empty seq then
      None
    else begin
      let node = node_of_seq seq.next in
      Some node.node_data
    end

  let transfer_l s1 s2 =
    s2.next.prev <- s1.prev;
    s1.prev.next <- s2.next;
    s2.next <- s1.next;
    s1.next.prev <- s2;
    s1.prev <- s1;
    s1.next <- s1

  let transfer_r s1 s2 =
    s2.prev.next <- s1.next;
    s1.next.prev <- s2.prev;
    s2.prev <- s1.prev;
    s1.prev.next <- s2;
    s1.prev <- s1;
    s1.next <- s1

  let iter_l f seq =
    let rec loop curr =
      if curr != seq then begin
        let node = node_of_seq curr in
        if node.node_active then f node.node_data;
        loop node.node_next
      end
    in
    loop seq.next

  let iter_r f seq =
    let rec loop curr =
      if curr != seq then begin
        let node = node_of_seq curr in
        if node.node_active then f node.node_data;
        loop node.node_prev
      end
    in
    loop seq.prev

  let iter_node_l f seq =
    let rec loop curr =
      if curr != seq then begin
        let node = node_of_seq curr in
        if node.node_active then f node;
        loop node.node_next
      end
    in
    loop seq.next

  let iter_node_r f seq =
    let rec loop curr =
      if curr != seq then begin
        let node = node_of_seq curr in
        if node.node_active then f node;
        loop node.node_prev
      end
    in
    loop seq.prev

  let fold_l f seq acc =
    let rec loop curr acc =
      if curr == seq then
        acc
      else
        let node = node_of_seq curr in
        if node.node_active then
          loop node.node_next (f node.node_data acc)
        else
          loop node.node_next acc
    in
    loop seq.next acc

  let fold_r f seq acc =
    let rec loop curr acc =
      if curr == seq then
        acc
      else
        let node = node_of_seq curr in
        if node.node_active then
          loop node.node_prev (f node.node_data acc)
        else
          loop node.node_prev acc
    in
    loop seq.prev acc

  let find_node_l f seq =
    let rec loop curr =
      if curr != seq then
        let node = node_of_seq curr in
        if node.node_active then
          if f node.node_data then
            node
          else
            loop node.node_next
        else
          loop node.node_next
      else
        raise Not_found
    in
    loop seq.next

  let find_node_r f seq =
    let rec loop curr =
      if curr != seq then
        let node = node_of_seq curr in
        if node.node_active then
          if f node.node_data then
            node
          else
            loop node.node_prev
        else
          loop node.node_prev
      else
        raise Not_found
    in
    loop seq.prev

  let find_node_opt_l f seq =
    try Some (find_node_l f seq) with Not_found -> None

  let find_node_opt_r f seq =
    try Some (find_node_r f seq) with Not_found -> None

  (* end copy & paste --------------------------------------------------------- *)
end
[@@@ocaml.warning "+32"]

open Lwt.Infix

type 'a t = {
  create : unit -> 'a Lwt.t;
  (* Create a new pool member. *)
  check : exn -> 'a -> bool Lwt.t;
  (* Check validity of a pool member when use resulted in failed promise. *)
  validate : 'a -> bool Lwt.t;
  (* Validate an existing free pool member before use. *)
  dispose : 'a -> unit Lwt.t;
  (* Dispose of a pool member. *)
  cleared : bool ref ref;
  (* Have the current pool elements been cleared out? *)
  mutable max : int;
  (* Size of the pool. *)
  mutable count : int;
  (* Number of elements in the pool. *)
  list : 'a Queue.t;
  (* Available pool members. *)
  waiters : ('a Lwt.u * float) Sequence.t;
  (* Promise resolvers waiting for a free member. *)
}

(* Build an empty pool holding at most [max] members, each produced by
   [create].  The optional callbacks default to "always valid" for [validate]
   and [check], and to a no-op for [dispose]. *)
let create
    ?(validate = fun _ -> Lwt.return_true)
    ?(check = fun _ _ -> Lwt.return_true)
    ?(dispose = fun _ -> Lwt.return_unit)
    max
    create =
  { max; create; validate; check; dispose;
    cleared = ref (ref false);
    count = 0;
    list = Queue.create ();
    waiters = Sequence.create () }

(* Change the pool's size limit.  Only the limit is updated; existing members
   and waiters are left untouched. *)
let set_max p n = p.max <- n

(* Create a pool member. *)
let create_member p =
  Lwt.catch
    (fun () ->
       (* Must be done before p.create to prevent other resolvers from
          creating new members if the limit is reached. *)
       p.count <- p.count + 1;
       p.create ())
    (fun exn ->
       (* Creation failed, so don't increment count. *)
       p.count <- p.count - 1;
       Lwt.fail exn)

(* Release a pool member. *)
let release p c =
  match Sequence.take_opt_l p.waiters with
  | Some (wakener, _) ->
    (* A promise resolver is waiting, give it the pool member. *)
    Lwt.wakeup_later wakener c
  | None ->
    (* No one is waiting, queue it. *)
    Queue.push c p.list

exception Resource_limit_exceeded

(* Register an externally created member [c] with the pool.
   Raises [Resource_limit_exceeded] when the pool already holds [max] members,
   unless [omit_max_check] is [true]. *)
let add ?(omit_max_check = false) p c =
  if not omit_max_check && p.count >= p.max then raise Resource_limit_exceeded;
  p.count <- p.count + 1;
  release p c

(* Dispose of a pool member. *)
let dispose p c =
  p.dispose c >>= fun () ->
  p.count <- p.count - 1;
  Lwt.return_unit

(* Create a new member when one is thrown away. *)
let replace_disposed p =
  match Sequence.take_opt_l p.waiters with
  | None ->
    (* No one is waiting, do not create a new member to avoid
       losing an error if creation fails. *)
    ()
  | Some (wakener, _) ->
    (* Go through [create_member] rather than calling [p.create] directly:
       the member being replaced has already been removed from [p.count] by
       [dispose], so the replacement must be counted (and un-counted again if
       its creation fails) to keep [p.count] equal to the number of live
       members. *)
    Lwt.on_any
      (create_member p)
      (fun c -> Lwt.wakeup_later wakener c)
      (fun exn ->
         (* Creation failed, notify the waiter of the failure. *)
         Lwt.wakeup_later_exn wakener exn)

(* Verify a member is still valid before using it. *)
let validate_and_return p c =
  Lwt.try_bind
    (fun () -> p.validate c)
    (function
      | true ->
        Lwt.return c
      | false ->
        (* Remove this member and create a new one. *)
        dispose p c >>= fun () ->
        create_member p)
    (fun e ->
       (* Validation failed: create a new member if at least one
          resolver is waiting. *)
       dispose p c >>= fun () ->
       replace_disposed p;
       Lwt.fail e)

exception Resource_invalid of {safe : bool}

(* Append a new waiter to [sequence], stamped with the current time, and
   return its promise.  Cancelling the promise removes the waiter from the
   sequence. *)
let add_task_r sequence =
  let p, r = Lwt.task () in
  let node = Sequence.add_r (r, Unix.gettimeofday ()) sequence in
  Lwt.on_cancel p (fun () -> Sequence.remove node);
  p

(* Acquire a pool member. *)
let acquire ~attempts p =
  assert (attempts > 0);
  let once () =
    if Queue.is_empty p.list then
      (* No more available member. *)
      if p.count < p.max then
        (* Limit not reached: create a new one. *)
        create_member p
      else
        (* Limit reached: wait for a free one. *)
        (add_task_r [@ocaml.warning "-3"]) p.waiters >>= validate_and_return p
    else
      (* Take the first free member and validate it. *)
      let c = Queue.take p.list in
      validate_and_return p c
  in
  (* Retry [once] while it fails with [Resource_invalid {safe = true}]; any
     other failure is propagated immediately.  When the attempts run out, the
     last such exception is the one reported. *)
  let rec keep_trying ?(e = Resource_invalid {safe = true}) attempts =
    if attempts > 0 then
      Lwt.catch once (fun e ->
        match e with
        | Resource_invalid {safe = true} -> keep_trying ~e (attempts - 1)
        | e -> Lwt.fail e)
    else
      Lwt.fail e
  in
  keep_trying attempts

(* Release a member when use resulted in failed promise if the member
   is still valid. *)
let check_and_release e p c cleared =
  p.check e c >>= fun ok ->
  if !cleared || not ok then (
    (* Element is not ok or the pool was cleared - dispose of it *)
    dispose p c
  )
  else (
    (* Element is ok - release it back to the pool *)
    release p c;
    Lwt.return_unit
  )

(* Acquire a member, run [f] on it and give the member back afterwards.
   [creation_attempts] bounds the retries of each acquisition and
   [usage_attempts] bounds how many times [f] is retried; in both cases only a
   [Resource_invalid {safe = true}] failure triggers a retry. *)
let use ?(creation_attempts = 1) ?(usage_attempts = 1) p f =
  assert (usage_attempts > 0);
  let cleared = !(p.cleared) in
  (* Capture the current cleared state so we can see if it changes while this
     element is in use *)
  let rec make_promise attempts =
    if attempts <= 0 then
      Lwt.fail @@ Resource_invalid {safe = true}
    else
      acquire ~attempts:creation_attempts p >>= fun c ->
      Lwt.catch
        (fun () -> f c >>= fun res -> Lwt.return (c, res))
        (fun e ->
           check_and_release e p c cleared >>= fun () ->
           match e with
           | Resource_invalid {safe = true} -> make_promise (attempts - 1)
           | e -> Lwt.fail e)
  in
  let promise = make_promise usage_attempts in
  promise >>= fun (c, _) ->
  if !cleared then (
    (* p was cleared while promise was resolving - dispose of this element *)
    dispose p c >>= fun () ->
    Lwt.map snd promise
  )
  else (
    release p c;
    Lwt.map snd promise
  )

(* Dispose of every member currently sitting in the free queue and flag the
   members that are in use so they are disposed of when they come back. *)
let clear p =
  let elements = Queue.fold (fun l element -> element :: l) [] p.list in
  Queue.clear p.list;
  (* Indicate to any currently in-use elements that we cleared the pool *)
  let old_cleared = !(p.cleared) in
  old_cleared := true;
  p.cleared := ref false;
  Lwt_list.iter_s (dispose p) elements

(* Number of entries in the waiter sequence. *)
let wait_queue_length p = Sequence.length p.waiters

(* Seconds elapsed since the waiter at the head of the sequence was enqueued,
   or [0.] when nobody is waiting. *)
let wait_queue_delay p =
  match Sequence.peek_opt_l p.waiters with
  | None -> 0.
  | Some (_, d) -> Unix.gettimeofday () -. d