(** A timer that runs scheduled jobs from a dedicated thread.

    Jobs are kept in a min-heap ordered by their due time. A worker thread
    (see [serve]) sleeps until the earliest job is due, or until it is woken
    through a pipe because the schedule changed or the timer was stopped. *)

(** A job: the absolute time (in seconds, as returned by
    [Unix.gettimeofday]) at which it is due, and the function to run.
    The result type of the function is existentially hidden. *)
type job = Job : float * (unit -> 'a) -> job

(* Float-specialised comparisons; they shadow the polymorphic ones for the
   rest of this file. *)
let ( <= ) (a : float) b = Stdlib.( <= ) a b
let ( >= ) (a : float) b = Stdlib.( >= ) a b
let ( < ) (a : float) b = Stdlib.( < ) a b
let ( > ) (a : float) b = Stdlib.( > ) a b

(** Heap of jobs; the minimum is the job with the smallest due time. *)
module TaskHeap = CCHeap.Make (struct
  type t = job

  let leq (Job (f1, _)) (Job (f2, _)) = f1 <= f2
end)

(** Raised by {!at} (and therefore {!after} and {!every}) when the timer has
    been stopped. *)
exception Stopped

type t = {
  mutable stop : bool;  (** set once by {!stop}; never reset *)
  mutable tasks : TaskHeap.t;  (** pending jobs *)
  mutable exn_handler : exn -> unit;
      (** receives exceptions raised by jobs *)
  t_mutex : Mutex.t;  (** taken by [with_lock_] *)
  fifo_in : Unix.file_descr;  (** read end of the wake-up pipe *)
  fifo_out : Unix.file_descr;  (** write end of the wake-up pipe *)
}

(** [set_exn_handler timer f] makes [f] the function that is called with any
    exception raised by a job of [timer]. *)
let set_exn_handler timer f = timer.exn_handler <- f

(** When no task is scheduled, this is the amount of time (in seconds) the
    worker thread waits in a row for something to happen before it checks
    its state again. *)
let standby_wait = 10.

(** Accepted time difference for actions: a job due less than [epsilon]
    seconds in the future is run right away. *)
let epsilon = 0.0001

(** [with_lock_ t f] runs [f t] while holding [t.t_mutex]. The mutex is
    released exactly once, whether [f t] returns or raises; an exception
    raised by [f] is re-raised after unlocking. *)
let with_lock_ t f =
  Mutex.lock t.t_mutex;
  match f t with
  | x ->
    Mutex.unlock t.t_mutex;
    x
  | exception e ->
    Mutex.unlock t.t_mutex;
    raise e

(** What the worker thread must do next. *)
type command =
  | Quit  (** the timer was stopped: terminate *)
  | Run : (unit -> _) -> command  (** run this job now *)
  | Wait of float  (** sleep for at most that many seconds *)

(** Remove the minimal job from the heap.
    Calls [TaskHeap.take_exn], so the heap must not be empty. *)
let pop_task_ t =
  let tasks, _ = TaskHeap.take_exn t.tasks in
  t.tasks <- tasks

(** [call_ timer f] runs [f ()], discarding its result. Any exception it
    raises is passed to [timer.exn_handler] instead of propagating. *)
let call_ timer f = try ignore (f ()) with e -> timer.exn_handler e

(** Check the next task and decide what the worker must do. Reads and
    updates [timer.tasks]; [serve] calls it through [with_lock_]. *)
let next_task_ timer =
  match TaskHeap.find_min timer.tasks with
  | _ when timer.stop -> Quit
  | None -> Wait standby_wait
  | Some (Job (time, f)) ->
    let now = Unix.gettimeofday () in
    if now +. epsilon > time then (
      (* now! *)
      pop_task_ timer;
      Run f)
    else Wait (time -. now)

(** The main thread function: wait for the next event, run it, and loop.
    Returns once the timer has been stopped, after closing both ends of the
    wake-up pipe. *)
let serve timer =
  let buf = Bytes.make 1 '_' in
  (* [true] iff [fifo_in] became readable within [delay] seconds.
     [Unix.select] replaces [Thread.wait_timed_read]. An interrupted wait
     (EINTR) is reported as "nothing to read": the caller then loops and
     recomputes the delay instead of letting the thread die. *)
  let wait_readable delay =
    match Unix.select [ timer.fifo_in ] [] [] delay with
    | [], _, _ -> false
    | _ :: _, _, _ -> true
    | exception Unix.Unix_error (Unix.EINTR, _, _) -> false
  in
  (* Release the pipe. Writes to [fifo_out] only happen under the lock while
     [timer.stop] is false, or inside [stop] itself before the lock is
     released; [Quit] is only observed under the lock after that. *)
  let close_pipe () =
    let close_noerr fd = try Unix.close fd with Unix.Unix_error _ -> () in
    close_noerr timer.fifo_in;
    close_noerr timer.fifo_out
  in
  (* acquire lock, call [next_task_] and do as it commands *)
  let rec next () =
    match with_lock_ timer next_task_ with
    | Quit -> close_pipe ()
    | Run f ->
      call_ timer f;
      (* call outside of any lock *)
      next ()
    | Wait delay -> wait delay
  (* wait for [delay] seconds, or until something happens on [fifo_in] *)
  and wait delay =
    (* remove char from fifo, so that next write can happen *)
    if wait_readable delay then ignore (Unix.read timer.fifo_in buf 0 1);
    next ()
  in
  next ()

(** Default exception handler: ignores the exception. *)
let nop_handler_ _ = ()

(** [create ()] makes a new timer with no pending job and starts the thread
    that processes its tasks. *)
let create () =
  let fifo_in, fifo_out = Unix.pipe () in
  let timer =
    {
      stop = false;
      tasks = TaskHeap.empty;
      exn_handler = nop_handler_;
      t_mutex = Mutex.create ();
      fifo_in;
      fifo_out;
    }
  in
  (* start a thread to process tasks *)
  let (_ : Thread.t) = Thread.create serve timer in
  timer

(* The single byte written to the pipe to wake the worker. *)
let underscore_ = Bytes.make 1 '_'

(** Wake the worker thread by writing one byte to the pipe. *)
let awaken_ timer =
  ignore (Unix.single_write timer.fifo_out underscore_ 0 1)

(** [at timer t ~f] will run [f ()] at the Unix epoch time [t] (seconds, as
    returned by [Unix.gettimeofday]). If [t] is not in the future, [f ()]
    is run immediately in the calling thread; its exceptions go to the
    timer's exception handler.
    @raise Stopped if the timer has been stopped. *)
let at timer time ~f =
  if timer.stop then raise Stopped;
  let now = Unix.gettimeofday () in
  if now >= time then call_ timer f
  else
    with_lock_ timer (fun timer ->
        (* check again, now that the lock is held *)
        if timer.stop then raise Stopped;
        (* time of the next scheduled event *)
        let next_time =
          match TaskHeap.find_min timer.tasks with
          | None -> max_float
          | Some (Job (d, _)) -> d
        in
        (* insert task *)
        timer.tasks <- TaskHeap.insert (Job (time, f)) timer.tasks;
        (* see if the timer thread needs to be woken earlier *)
        if time < next_time then awaken_ timer)

(** [after timer delay ~f] runs [f ()] [delay] seconds from now, through
    {!at}. [delay] must not be negative (checked with [assert]).
    @raise Stopped if the timer has been stopped. *)
let after timer delay ~f =
  assert (delay >= 0.);
  let now = Unix.gettimeofday () in
  at timer (now +. delay) ~f

(** Raise this from the function given to {!every} to end the repetition. *)
exception ExitEvery

(** [every ?delay timer d ~f] runs [f ()] and then schedules the next run
    [d] seconds later, until [f] raises {!ExitEvery}. Without [delay] the
    first run happens immediately in the calling thread; with [~delay] it is
    scheduled [delay] seconds from now. Any other exception raised by [f]
    also skips the rescheduling. *)
let every ?delay timer d ~f =
  let rec run () =
    try
      ignore (f ());
      schedule ()
    with ExitEvery -> () (* stop *)
  and schedule () = after timer d ~f:run in
  match delay with
  | None -> run ()
  | Some first -> after timer first ~f:run

(** [active timer] is [true] until the timer is stopped. *)
let active timer = not timer.stop

(** Stop the given timer, cancelling pending tasks. Stopping an already
    stopped timer does nothing. The worker thread is woken so that it can
    terminate and close the wake-up pipe. *)
let stop timer =
  with_lock_ timer (fun timer ->
      if not timer.stop then (
        timer.stop <- true;
        (* empty heap of tasks *)
        timer.tasks <- TaskHeap.empty;
        (* tell the thread to stop *)
        awaken_ timer))