123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138openLwt.Infix(** {1 Basic signal} *)typehandler_response=|ContinueListening|StopListeningtype'at={mutablen:int;(* how many handlers? *)mutablehandlers:('a->handler_responseLwt.t)array;mutablealive:keepalive;(* keep some signal alive *)}(** Signal of type 'a *)andkeepalive=|Keep:'at->keepalive|NotAlive:keepalivetype'asignal='atlet_exn_handler=ref(fun_->())letnop_handler_x=Lwt.returnContinueListeningletcreate()=lets={n=0;handlers=Array.make3nop_handler;alive=NotAlive;}ins(* remove handler at index i *)letremovesi=assert(s.n>0&&i>=0);ifi<s.n-1(* erase handler with the last one *)thens.handlers.(i)<-s.handlers.(s.n-1);s.handlers.(s.n-1)<-nop_handler;(* free handler *)s.n<-s.n-1;()letsendsx=letrecloopi=Lwt.catch(fun()->s.handlers.(i)x>>=function|ContinueListening->Lwt.returnfalse|StopListening->Lwt.returntrue)(funexn->!_exn_handlerexn;Lwt.returnfalse(* be conservative, keep... *))>>=funb->ifbthen(removesi;(* i-th handler is done, remove it *)loopi)elseifi<s.nthen(loop(i+1))else(Lwt.return())inloop0letonsf=(* resize handlers if needed *)ifs.n=Array.lengths.handlersthenbeginlethandlers=Array.make(s.n+4)nop_handlerinArray.blits.handlers0handlers0s.n;s.handlers<-handlersend;s.handlers.(s.n)<-f;s.n<-s.n+1leton'sf=ons(funx->fx>>=fun_->Lwt.returnContinueListening)letoncesf=ons(funx->fx>>=fun_->Lwt.returnStopListening)letpropagateab=ona(funx->sendbx>>=fun()->Lwt.returnContinueListening)(** {2 Combinators} *)letmapsignalf=letsignal'=create()in(* weak ref *)letr=Weak.create1inWeak.setr0(Somesignal');onsignal(funx->matchWeak.getr0with|None->Lwt.returnStopListening|Somesignal'->sendsignal'(fx)>>=fun()->Lwt.returnContinueListening);signal'.alive<-Keepsignal;signal'letfiltersignalp=letsignal'=create()in(* weak ref *)letr=Weak.create1inWeak.setr0(Somesignal');onsignal(funx->matchWeak.getr0with|None->Lwt.returnStopListening|Somesignal'->(ifpxthensendsignal'xelseLwt.return())>>=fun()->Lwt.returnContinueListening);signal'.alive<-Keepsignal;signal'letfilter_mapsignalf=letsignal'=create()in(* weak ref *)letr=Weak.create1inWeak.setr0(Somesignal');onsignal(funx->matchWeak.getr0with|None->Lwt.returnStopListening|Somesignal'->beginmatchfxwith|None->Lwt.return()|Somex->sendsignal'xend>>=fun()->Lwt.returnContinueListening);signal'.alive<-Keepsignal;signal'letset_exn_handlerh=_exn_handler:=h(** {2 Send-only View} *)(** Can be used only for sending *)moduleSend_ref=structtype'at='asignalletmakes=sletsend=sendend