123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484(* This file is part of Lwt, released under the MIT license. See LICENSE.md for
details, or visit https://github.com/ocsigen/lwt/blob/master/LICENSE.md. *)openLwt.Infixtype'aevent='aReact.eventtype'asignal='aReact.signalmoduleE=structincludeReact.E(* +---------------------------------------------------------------+
| Lwt-specific utilities |
+---------------------------------------------------------------+ *)letfinalisef_=f()letwith_finaliserfevent=letr=ref()inGc.finalise(finalisef)r;map(funx->ignore(Sys.opaque_identityr);x)eventletnextev=letwaiter,wakener=Lwt.task()inletev=map(funx->Lwt.wakeupwakenerx)(onceev)inLwt.on_cancelwaiter(fun()->stopev);waiterletlimitfe=(* Thread which prevents [e] from occurring while it is sleeping *)letlimiter=refLwt.return_unitin(* The occurrence that is delayed until the limiter returns. *)letdelayed=refNonein(* The resulting event. *)letevent,push=create()inletiter=fmap(funx->ifLwt.is_sleeping!limiterthenbegin(* The limiter is sleeping, we queue the event for later
delivering. *)match!delayedwith|Somecell->(* An occurrence is already queued, replace it. *)cell:=x;None|None->letcell=refxindelayed:=Somecell;Lwt.on_success!limiter(fun()->ifLwt.is_sleeping!limiterthendelayed:=Noneelseletx=!cellindelayed:=None;limiter:=f();pushx);Noneendelsebegin(* Set the limiter for future events. *)limiter:=f();(* Send the occurrence now. *)pushx;Noneend)einselect[iter;event]letcancel_threadt()=Lwt.canceltletfromf=letevent,push=create()inletrecloop()=f()>>=funx->pushx;loop()inlett=Lwt.pause()>>=loopinwith_finaliser(cancel_threadt)eventletto_streamevent=letstream,push,set_ref=Lwt_stream.create_with_reference()inset_ref(map(funx->push(Somex))event);streamletof_streamstream=letevent,push=create()inlett=Lwt.pause()>>=fun()->Lwt_stream.iter(funv->trypushvwithexn->!Lwt.async_exception_hookexn)streaminwith_finaliser(cancel_threadt)eventletdelaythread=matchLwt.pollthreadwith|Somee->e|None->letevent,send=create()inLwt.on_successthread(fune->sende;stopevent);switchnevereventletkeeped=ref[]letkeepe=keeped:=mapignoree::!keeped(* +---------------------------------------------------------------+
| Event transformations |
+---------------------------------------------------------------+ *)letrun_pe=letevent,push=create()inletiter=fmap(funt->Lwt.on_successt(funv->pushv);None)einselect[iter;event]letrun_se=letevent,push=create()inletmutex=Lwt_mutex.create()inletiter=fmap(funt->Lwt.on_success(Lwt_mutex.with_lockmutex(fun()->t))(funv->pushv);None)einselect[iter;event]letmap_pfe=letevent,push=create()inletiter=fmap(funx->Lwt.on_success(fx)(funv->pushv);None)einselect[iter;event]letmap_sfe=letevent,push=create()inletmutex=Lwt_mutex.create()inletiter=fmap(funx->Lwt.on_success(Lwt_mutex.with_lockmutex(fun()->fx))(funv->pushv);None)einselect[iter;event]letapp_pefe=letevent,push=create()inletiter=fmap(fun(f,x)->Lwt.on_success(fx)(funv->pushv);None)(app(map(funfx->(f,x))ef)e)inselect[iter;event]letapp_sefe=letevent,push=create()inletmutex=Lwt_mutex.create()inletiter=fmap(fun(f,x)->Lwt.on_success(Lwt_mutex.with_lockmutex(fun()->fx))(funv->pushv);None)(app(map(funfx->(f,x))ef)e)inselect[iter;event]letfilter_pfe=letevent,push=create()inletiter=fmap(funx->Lwt.on_success(fx)(functiontrue->pushx|false->());None)einselect[iter;event]letfilter_sfe=letevent,push=create()inletmutex=Lwt_mutex.create()inletiter=fmap(funx->Lwt.on_success(Lwt_mutex.with_lockmutex(fun()->fx))(functiontrue->pushx|false->());None)einselect[iter;event]letfmap_pfe=letevent,push=create()inletiter=fmap(funx->Lwt.on_success(fx)(functionSomex->pushx|None->());None)einselect[iter;event]letfmap_sfe=letevent,push=create()inletmutex=Lwt_mutex.create()inletiter=fmap(funx->Lwt.on_success(Lwt_mutex.with_lockmutex(fun()->fx))(functionSomex->pushx|None->());None)einselect[iter;event]letdiff_sfe=letprevious=refNoneinletevent,push=create()inletmutex=Lwt_mutex.create()inletiter=fmap(funx->match!previouswith|None->previous:=Somex;None|Somey->previous:=Somex;Lwt.on_success(Lwt_mutex.with_lockmutex(fun()->fxy))(funv->pushv);None)einselect[iter;event]letaccum_sefacc=letacc=refaccinletevent,push=create()inletmutex=Lwt_mutex.create()inletiter=fmap(funf->Lwt.on_success(Lwt_mutex.with_lockmutex(fun()->f!acc))(funx->acc:=x;pushx);None)efinselect[iter;event]letfold_sfacce=letacc=refaccinletevent,push=create()inletmutex=Lwt_mutex.create()inletiter=fmap(funx->Lwt.on_success(Lwt_mutex.with_lockmutex(fun()->f!accx))(funx->acc:=x;pushx);None)einselect[iter;event]letrecrev_foldfacc=function|[]->Lwt.returnacc|x::l->rev_foldfaccl>>=funacc->faccxletmerge_sfaccel=letevent,push=create()inletmutex=Lwt_mutex.create()inletiter=fmap(funl->Lwt.on_success(Lwt_mutex.with_lockmutex(fun()->rev_foldfaccl))(funv->pushv);None)(merge(funaccx->x::acc)[]el)inselect[iter;event]endmoduleS=structincludeReact.S(* +---------------------------------------------------------------+
| Lwt-specific utilities |
+---------------------------------------------------------------+ *)letfinalisef_=f()letwith_finaliserfsignal=letr=ref()inGc.finalise(finalisef)r;map(funx->ignore(Sys.opaque_identityr);x)signalletlimit?eqfs=(* Thread which prevent [s] to changes while it is sleeping *)letlimiter=ref(f())in(* The occurrence that is delayed until the limiter returns. *)letdelayed=refNonein(* The resulting event. *)letevent,push=E.create()inletiter=E.fmap(funx->ifLwt.is_sleeping!limiterthenbegin(* The limiter is sleeping, we queue the event for later
delivering. *)match!delayedwith|Somecell->(* An occurrence is already queued, replace it. *)cell:=x;None|None->letcell=refxindelayed:=Somecell;Lwt.on_success!limiter(fun()->ifLwt.is_sleeping!limiterthendelayed:=Noneelseletx=!cellindelayed:=None;limiter:=f();pushx);Noneendelsebegin(* Set the limiter for future events. *)limiter:=f();(* Send the occurrence now. *)pushx;Noneend)(changess)inhold?eq(values)(E.select[iter;event])letkeeped=ref[]letkeeps=keeped:=mapignores::!keeped(* +---------------------------------------------------------------+
| Signal transformations |
+---------------------------------------------------------------+ *)letrun_s?eqs=letevent,push=E.create()inletmutex=Lwt_mutex.create()inletiter=E.fmap(funt->Lwt.on_success(Lwt_mutex.with_lockmutex(fun()->t))(funv->pushv);None)(changess)inLwt_mutex.with_lockmutex(fun()->values)>>=funx->Lwt.return(hold?eqx(E.select[iter;event]))letmap_s?eqfs=letevent,push=E.create()inletmutex=Lwt_mutex.create()inletiter=E.fmap(funx->Lwt.on_success(Lwt_mutex.with_lockmutex(fun()->fx))(funv->pushv);None)(changess)inLwt_mutex.with_lockmutex(fun()->f(values))>>=funx->Lwt.return(hold?eqx(E.select[iter;event]))letapp_s?eqsfs=letevent,push=E.create()inletmutex=Lwt_mutex.create()inletiter=E.fmap(fun(f,x)->Lwt.on_success(Lwt_mutex.with_lockmutex(fun()->fx))(funv->pushv);None)(E.app(E.map(funfx->(f,x))(changessf))(changess))inLwt_mutex.with_lockmutex(fun()->(valuesf)(values))>>=funx->Lwt.return(hold?eqx(E.select[iter;event]))letfilter_s?eqfis=letevent,push=E.create()inletmutex=Lwt_mutex.create()inletiter=E.fmap(funx->Lwt.on_success(Lwt_mutex.with_lockmutex(fun()->fx))(functiontrue->pushx|false->());None)(changess)inletx=valuesinLwt_mutex.with_lockmutex(fun()->fx)>>=function|true->Lwt.return(hold?eqx(E.select[iter;event]))|false->Lwt.return(hold?eqi(E.select[iter;event]))letfmap_s?eqfis=letevent,push=E.create()inletmutex=Lwt_mutex.create()inletiter=E.fmap(funx->Lwt.on_success(Lwt_mutex.with_lockmutex(fun()->fx))(functionSomex->pushx|None->());None)(changess)inLwt_mutex.with_lockmutex(fun()->f(values))>>=function|Somex->Lwt.return(hold?eqx(E.select[iter;event]))|None->Lwt.return(hold?eqi(E.select[iter;event]))letdiff_sfs=letprevious=ref(values)inletevent,push=E.create()inletmutex=Lwt_mutex.create()inletiter=E.fmap(funx->lety=!previousinprevious:=x;Lwt.on_success(Lwt_mutex.with_lockmutex(fun()->fxy))(funv->pushv);None)(changess)inE.select[iter;event]letsample_sfes=E.map_s(funx->fx(values))eletaccum_s?eqefi=hold?eqi(E.accum_sefi)letfold_s?eqfie=hold?eqi(E.fold_sfie)letrecrev_foldfacc=function|[]->Lwt.returnacc|x::l->rev_foldfaccl>>=funacc->faccxletmerge_s?eqfaccsl=lets=merge(funaccx->x::acc)[]slinletevent,push=E.create()inletmutex=Lwt_mutex.create()inletiter=E.fmap(funl->Lwt.on_success(Lwt_mutex.with_lockmutex(fun()->rev_foldfaccl))(funv->pushv);None)(changess)inLwt_mutex.with_lockmutex(fun()->rev_foldfacc(values))>>=funx->Lwt.return(hold?eqx(E.select[iter;event]))letl1_s?eqfs1=map_s?eqfs1letl2_s?eqfs1s2=(* Some details about the use of [fun _ _ -> false] on
https://github.com/ocsigen/lwt/pull/893#pullrequestreview-783083496 *)map_s?eq(fun(x1,x2)->fx1x2)(l2~eq:(fun__->false)(funx1x2->(x1,x2))s1s2)letl3_s?eqfs1s2s3=map_s?eq(fun(x1,x2,x3)->fx1x2x3)(l3~eq:(fun__->false)(funx1x2x3->(x1,x2,x3))s1s2s3)letl4_s?eqfs1s2s3s4=map_s?eq(fun(x1,x2,x3,x4)->fx1x2x3x4)(l4~eq:(fun__->false)(funx1x2x3x4->(x1,x2,x3,x4))s1s2s3s4)letl5_s?eqfs1s2s3s4s5=map_s?eq(fun(x1,x2,x3,x4,x5)->fx1x2x3x4x5)(l5~eq:(fun__->false)(funx1x2x3x4x5->(x1,x2,x3,x4,x5))s1s2s3s4s5)letl6_s?eqfs1s2s3s4s5s6=map_s?eq(fun(x1,x2,x3,x4,x5,x6)->fx1x2x3x4x5x6)(l6~eq:(fun__->false)(funx1x2x3x4x5x6->(x1,x2,x3,x4,x5,x6))s1s2s3s4s5s6)(* +---------------------------------------------------------------+
| Monadic interface |
+---------------------------------------------------------------+ *)letreturn=constletbind_s?eqsf=letevent,push=E.create()inletmutex=Lwt_mutex.create()inletiter=E.fmap(funx->Lwt.on_success(Lwt_mutex.with_lockmutex(fun()->fx))(funv->pushv);None)(changess)inLwt_mutex.with_lockmutex(fun()->f(values))>>=funx->Lwt.return(switch?eq(hold~eq:(==)x(E.select[iter;event])))end