(* This file is free software, part of containers. See file "license" for more details. *)

(** {1 Event timer} *)

(* A scheduled task: the absolute time (seconds, compared against
   [Unix.gettimeofday ()]) at which it should run, and the thunk to run.
   The thunk's result type is hidden; its result is always discarded. *)
type job =
  | Job : float * (unit -> 'a) -> job

(* Comparison operators specialised to [float], shadowing the polymorphic
   ones for the rest of this file. *)
let (<=) (a : float) b = Stdlib.(<=) a b
let (>=) (a : float) b = Stdlib.(>=) a b
let (<) (a : float) b = Stdlib.(<) a b
let (>) (a : float) b = Stdlib.(>) a b

(* Heap of jobs, ordered by scheduled time. *)
module TaskHeap = CCHeap.Make (struct
    type t = job
    let leq (Job (f1, _)) (Job (f2, _)) = f1 <= f2
  end)

(** Raised by {!at} (and thus {!after}) when the timer has been stopped. *)
exception Stopped

type t = {
  mutable stop : bool;  (* set once the timer has been stopped *)
  mutable tasks : TaskHeap.t;  (* pending jobs *)
  mutable exn_handler : (exn -> unit);  (* called on exceptions raised by jobs *)
  t_mutex : Mutex.t;  (* protects [stop] and [tasks] *)
  fifo_in : Unix.file_descr;  (* read end of the wake-up pipe *)
  fifo_out : Unix.file_descr;  (* write end of the wake-up pipe *)
}

(** [set_exn_handler timer f] makes [f] the function that receives any
    exception raised by a job run by [timer]. *)
let set_exn_handler timer f = timer.exn_handler <- f

let standby_wait = 10.
(* when no task is scheduled, this is the amount of time that is waited
   in a row for something to happen. This is also the maximal delay
   between the call to {!stop} and the actual termination of the
   thread. *)

let epsilon = 0.0001
(* accepted time diff for actions. *)

(* [with_lock_ t f] runs [f t] while holding [t.t_mutex]; the mutex is
   released whether [f] returns or raises (the exception is re-raised). *)
let with_lock_ t f =
  Mutex.lock t.t_mutex;
  try
    let x = f t in
    Mutex.unlock t.t_mutex;
    x
  with e ->
    Mutex.unlock t.t_mutex;
    raise e

(* What the timer thread must do next. *)
type command =
  | Quit
  | Run : (unit -> _) -> command
  | Wait of float

(* Remove the first job of the heap. Raises if the heap is empty. *)
let pop_task_ t =
  let tasks, _ = TaskHeap.take_exn t.tasks in
  t.tasks <- tasks

(* Run [f ()], discarding its result and forwarding any exception to the
   timer's exception handler. *)
let call_ timer f =
  try ignore (f ())
  with e -> timer.exn_handler e

(* check next task *)
let next_task_ timer =
  match TaskHeap.find_min timer.tasks with
  | _ when timer.stop -> Quit
  | None -> Wait standby_wait
  | Some (Job (time, f)) ->
    let now = Unix.gettimeofday () in
    if now +. epsilon > time then (
      (* now! *)
      pop_task_ timer;
      Run f
    ) else
      Wait (time -. now)

(* The main thread function: wait for next event, run it, and loop *)
let serve timer =
  let buf = Bytes.make 1 '_' in
  (* best-effort close of one end of the wake-up pipe *)
  let close_noerr fd =
    try Unix.close fd with Unix.Unix_error _ -> ()
  in
  (* acquire lock, call [next_task_] and do as it commands *)
  let rec next () =
    match with_lock_ timer next_task_ with
    | Quit ->
      (* [stop] is set under the lock, and every write to [fifo_out] is
         made under the lock after checking [stop]: nobody uses the pipe
         past this point, so release both descriptors instead of leaking
         them. *)
      close_noerr timer.fifo_in;
      close_noerr timer.fifo_out
    | Run f ->
      call_ timer f; (* call outside of any lock *)
      next ()
    | Wait delay -> wait delay
  (* wait for [delay] seconds, or until something happens on [fifo_in] *)
  and wait delay =
    let read =
      (* [Unix.select] replaces the deprecated [Thread.wait_timed_read].
         An interrupted wait is treated like a timeout: the schedule is
         simply examined again. *)
      match Unix.select [ timer.fifo_in ] [] [] delay with
      | [], _, _ -> false
      | _ :: _, _, _ -> true
      | exception Unix.Unix_error (Unix.EINTR, _, _) -> false
    in
    (* remove char from fifo, so that next write can happen *)
    if read then ignore (Unix.read timer.fifo_in buf 0 1);
    next ()
  in
  next ()

(* Default exception handler: ignore the exception. *)
let nop_handler_ _ = ()

(** [create ()] makes a new timer with no pending task, and starts the
    thread that runs its jobs. *)
let create () =
  let fifo_in, fifo_out = Unix.pipe () in
  let timer = {
    stop = false;
    tasks = TaskHeap.empty;
    exn_handler = nop_handler_;
    t_mutex = Mutex.create ();
    fifo_in;
    fifo_out;
  } in
  (* start a thread to process tasks *)
  let _t = Thread.create serve timer in
  timer

(* The single byte written on the pipe to wake the timer thread up. *)
let underscore_ = Bytes.make 1 '_'

(* awake the thread *)
let awaken_ timer =
  ignore (Unix.single_write timer.fifo_out underscore_ 0 1)

(** [at timer time ~f] will run [f ()] at the Unix epoch time [time]
    (same scale as [Unix.gettimeofday]). If [time] is already past, [f ()]
    is run immediately, in the calling thread. Exceptions raised by [f]
    are passed to the timer's exception handler.
    @raise Stopped if the timer has been stopped. *)
let at timer time ~f =
  if timer.stop then raise Stopped;
  let now = Unix.gettimeofday () in
  if now >= time then
    call_ timer f
  else
    with_lock_ timer
      (fun timer ->
         (* check again, this time under the lock *)
         if timer.stop then raise Stopped;
         (* time of the next scheduled event *)
         let next_time =
           match TaskHeap.find_min timer.tasks with
           | None -> max_float
           | Some (Job (d, _)) -> d
         in
         (* insert task *)
         timer.tasks <- TaskHeap.insert (Job (time, f)) timer.tasks;
         (* see if the timer thread needs to be awaken earlier *)
         if time < next_time then awaken_ timer)

(** [after timer delay ~f] will run [f ()] [delay] seconds from now.
    [delay] must not be negative (checked with [assert]).
    @raise Stopped if the timer has been stopped. *)
let after timer delay ~f =
  assert (delay >= 0.);
  let now = Unix.gettimeofday () in
  at timer (now +. delay) ~f

(** Raised by the function given to {!every} to stop the repetition. *)
exception ExitEvery

(** [every ?delay timer d ~f] calls [f ()], then schedules the next call
    [d] seconds later, and so on until [f] raises {!ExitEvery}. If [delay]
    is given, the first call happens after [delay] seconds; otherwise it
    happens immediately, in the calling thread. *)
let every ?delay timer d ~f =
  let rec run () =
    try
      ignore (f ());
      schedule ()
    with ExitEvery -> () (* stop *)
  and schedule () = after timer d ~f:run in
  match delay with
  | None -> run ()
  | Some first_delay -> after timer first_delay ~f:run

(*$R
let start = Unix.gettimeofday() in
let timer = create() in
let res = CCLock.create 0 in
let sem = CCSemaphore.create 1 in
CCSemaphore.acquire 1 sem;
let stop = ref 0. in
every timer 0.1
~f:(fun () ->
if CCLock.incr_then_get res > 5 then (
stop := Unix.gettimeofday();
CCSemaphore.release 1 sem;
raise ExitEvery
));
CCSemaphore.acquire 1 sem; (* wait *)
OUnit.assert_equal ~printer:CCInt.to_string 6 (CCLock.get res);
OUnit.assert_bool "estimate delay" (abs_float (!stop -. start -. 0.5) < 0.2);
*)

(** [active timer] is [true] as long as the [stop] flag of [timer] has not
    been set. *)
let active timer = not timer.stop

(** Stop the given timer, cancelling pending tasks *)
let stop timer =
  (* runs with the timer's lock held; does nothing if already stopped *)
  let shutdown tm =
    if tm.stop then ()
    else begin
      tm.stop <- true;
      (* empty heap of tasks *)
      tm.tasks <- TaskHeap.empty;
      (* tell the thread to stop *)
      awaken_ tm
    end
  in
  with_lock_ timer shutdown

(*$R
(* scenario: n := 1; n := n*4 ; n := n+2; res := n *)
let timer = create () in
let n = CCLock.create 1 in
let res = CCLock.create 0 in
after timer 0.3
~f:(fun () -> CCLock.update n (fun x -> x+2));
ignore (Thread.create
(fun _ -> Thread.delay 0.4; CCLock.set res (CCLock.get n)) ());
after timer 0.2
~f:(fun () -> CCLock.update n (fun x -> x * 4));
Thread.delay 0.6 ;
OUnit.assert_equal 6 (CCLock.get res);
*)