123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376(*****************************************************************************)(* *)(* Open Source License *)(* Copyright (c) 2022 Nomadic Labs, <contact@nomadic-labs.com> *)(* *)(* Permission is hereby granted, free of charge, to any person obtaining a *)(* copy of this software and associated documentation files (the "Software"),*)(* to deal in the Software without restriction, including without limitation *)(* the rights to use, copy, modify, merge, publish, distribute, sublicense, *)(* and/or sell copies of the Software, and to permit persons to whom the *)(* Software is furnished to do so, subject to the following conditions: *)(* *)(* The above copyright notice and this permission notice shall be included *)(* in all copies or substantial portions of the Software. *)(* *)(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*)(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *)(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL *)(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*)(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING *)(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER *)(* DEALINGS IN THE SOFTWARE. *)(* *)(*****************************************************************************)(* Main functor that is exported to the outside *)moduleMake(R:Rache.TRANSFER):Sigs.MAPwithtypekey=R.key=structtypekey=R.keytype'apromise={p:'aLwt.t;mutablebinded:bool;}letcancel{p;binded}=ifbindedthen()elseLwt.cancelpletof_lwtp={p;binded=false}letto_lwt{p;_}=ptype'at='apromiseR.tletcreaten=(* Promises are resources which are destroyed by cancelation *)R.create(fun_kp->cancelp)nletputckr=R.putck(of_lwtr)lettakeck=Option.mapto_lwt(R.takeck)lettake_allc=List.map(fun(k,r)->(k,to_lwtr))(R.take_allc)lettake_somecf=List.map(fun(k,r)->(k,to_lwtr))(* You can't meaningfully test on the promise so we only offer the key *)(R.take_somec(funk_p->fk))letbindckf=R.borrowck(funr->r.binded<-true;letp=to_lwtrinletq=Lwt.bindpfin(* NOTE: you might be thinking that the code is missing
[r.binded <- false] to signify the promise being returned. This
could happen when [q] resolves (which in some sense is when the
function [f] finally "returns"). (Or actually when [f] is called.)
However, note that in this (these) cases, the promise [p] is already
resolved and [Lwt.cancel] is a no-op. *)q)letbind_or_putckmakef=matchbindckfwith|Someq->q|None->letp=makekinletr=of_lwtpinR.putckr;r.binded<-true;letq=Lwt.bindpfinqletfoldfcacc=R.fold(funkracc->r.binded<-true;letopenLwt.Syntaxinlet*acc=accinlet*x=to_lwtrinfkxacc)c(Lwt.returnacc)letfold_oldest_firstfcacc=R.fold_oldest_first(funkracc->r.binded<-true;letopenLwt.Syntaxinlet*acc=accinlet*x=to_lwtrinfkxacc)c(Lwt.returnacc)letremoveck=R.removeckletclearc=R.clearcletfiltercf=R.filterc(funk_r->fk)letlengthc=R.lengthcletcapacityc=R.capacitycendmoduleMake_option(R:Rache.TRANSFER):Sigs.MAP_OPTIONwithtypekey=R.key=structtypekey=R.keytype'apromise={p:'aoptionLwt.t;mutablebinded:bool;on_take:unitLwt.t;resolve_on_take:unitLwt.u;}letcancel{p;binded;_}=ifbindedthen()elseLwt.cancelpletof_lwtp=let(on_take,resolve_on_take)=Lwt.wait()in{p;binded=false;on_take;resolve_on_take;}letto_lwt{p;_}=pletmonitor_noneck{p;on_take;_}=Lwt.on_failure(Lwt.choose[on_take;(* on_take is successfully resolved when the promise is taken
out of the cache so this callback is called and can be
garbage collected. *)(Lwt.try_bind(fun()->p)(functionNone->raiseExit|Some_->Lwt.return_unit)(fun(_:exn)->Lwt.return_unit))])(fun(_:exn)->R.removeck)type'at='apromiseR.tletcreaten=(* Promises are resources which are destroyed by cancelation *)R.create(fun_kp->cancelp)nletputckp=matchLwt.statepwith|Lwt.ReturnNone->()|_->letr=of_lwtpinmonitor_noneckr;R.putckrlettakeck=matchR.takeckwith|None->None|Somer->Lwt.wakeupr.resolve_on_take();Some(to_lwtr)lettake_allc=List.map(fun(k,r)->Lwt.wakeupr.resolve_on_take();(k,to_lwtr))(R.take_allc)lettake_somecf=List.map(fun(k,r)->Lwt.wakeupr.resolve_on_take();(k,to_lwtr))(* You can't meaningfully test on the promise so we only offer the key *)(R.take_somec(funk_p->fk))letbindckf=R.borrowck(funr->r.binded<-true;letp=to_lwtrinletq=Lwt.bindpfin(* NOTE: you might be thinking that the code is missing
[r.binded <- false] to signify the promise being returned. This
could happen when [q] resolves (which in some sense is when the
function [f] finally "returns"). (Or actually when [f] is called.)
However, note that in this (these) cases, the promise [p] is already
resolved and [Lwt.cancel] is a no-op. *)q)letbind_or_putckmakef=matchbindckfwith|Someq->q|None->letp=makekinmatchLwt.statepwith|Lwt.ReturnNone->fNone|_->letr=of_lwtpinmonitor_noneckr;R.putckr;r.binded<-true;letq=Lwt.bindpfinqletfoldfcacc=R.fold(funkracc->r.binded<-true;letopenLwt.Syntaxinlet*acc=accinlet*x=to_lwtrinmatchxwith|None->Lwt.returnacc|Somex->fkxacc)c(Lwt.returnacc)letfold_oldest_firstfcacc=R.fold_oldest_first(funkracc->r.binded<-true;letopenLwt.Syntaxinlet*acc=accinlet*x=to_lwtrinmatchxwith|None->Lwt.returnacc|Somex->fkxacc)c(Lwt.returnacc)letremoveck=R.removeckletclearc=R.clearcletfiltercf=R.filterc(funk_r->fk)letlengthc=R.lengthcletcapacityc=R.capacitycendmoduleMake_result(R:Rache.TRANSFER):Sigs.MAP_RESULTwithtypekey=R.key=structtypekey=R.keytype('a,'err)promise={p:('a,'err)resultLwt.t;mutablebinded:bool;on_take:unitLwt.t;resolve_on_take:unitLwt.u;}letcancel{p;binded;_}=ifbindedthen()elseLwt.cancelpletof_lwtp=let(on_take,resolve_on_take)=Lwt.wait()in{p;binded=false;on_take;resolve_on_take;}letto_lwt{p;_}=pletmonitor_errorck{p;on_take;_}=Lwt.on_failure(Lwt.choose[on_take;(* on_take is successfully resolved when the promise is taken
out of the cache so this callback is called and can be
garbage collected. *)(Lwt.try_bind(fun()->p)(functionError_->raiseExit|Ok_->Lwt.return_unit)(fun(_:exn)->Lwt.return_unit))])(fun(_:exn)->R.removeck)type('a,'err)t=('a,'err)promiseR.tletcreaten=(* Promises are resources which are destroyed by cancelation *)R.create(fun_kp->cancelp)nletputckp=matchLwt.statepwith|Lwt.Return(Error_)->()|_->letr=of_lwtpinmonitor_errorckr;R.putckrlettakeck=matchR.takeckwith|None->None|Somer->Lwt.wakeupr.resolve_on_take();Some(to_lwtr)lettake_allc=List.map(fun(k,r)->Lwt.wakeupr.resolve_on_take();(k,to_lwtr))(R.take_allc)lettake_somecf=List.map(fun(k,r)->Lwt.wakeupr.resolve_on_take();(k,to_lwtr))(* You can't meaningfully test on the promise so we only offer the key *)(R.take_somec(funk_p->fk))letbindckf=R.borrowck(funr->r.binded<-true;letp=to_lwtrinletq=Lwt.bindpfin(* NOTE: you might be thinking that the code is missing
[r.binded <- false] to signify the promise being returned. This
could happen when [q] resolves (which in some sense is when the
function [f] finally "returns"). (Or actually when [f] is called.)
However, note that in this (these) cases, the promise [p] is already
resolved and [Lwt.cancel] is a no-op. *)q)letbind_or_putckmakef=matchbindckfwith|Someq->q|None->letp=makekinmatchLwt.statepwith|Lwt.Return(Error_aserror)->ferror|_->letr=of_lwtpinmonitor_errorckr;R.putckr;r.binded<-true;letq=Lwt.bindpfinqletfoldfcacc=R.fold(funkracc->r.binded<-true;letopenLwt.Syntaxinlet*acc=accinlet*x=to_lwtrinmatchxwith|Error_->Lwt.returnacc|Okx->fkxacc)c(Lwt.returnacc)letfold_oldest_firstfcacc=R.fold_oldest_first(funkracc->r.binded<-true;letopenLwt.Syntaxinlet*acc=accinlet*x=to_lwtrinmatchxwith|Error_->Lwt.returnacc|Okx->fkxacc)c(Lwt.returnacc)letremoveck=R.removeckletclearc=R.clearcletfiltercf=R.filterc(funk_r->fk)letlengthc=R.lengthcletcapacityc=R.capacitycend