open! Core
open! Import

module Time_ns = struct
  include Time_ns

  external format : float -> string -> string = "core_time_ns_format"

  (* We use a more pleasant format than [Core.Time_ns.sexp_of_t],
     which has to be messier for round trippability. *)
  let sexp_of_t t =
    [%sexp
      (format (t |> to_span_since_epoch |> Span.to_sec) "%Y-%m-%dT%H:%M:%S%z" : string)]
  ;;
end

module Alarm = struct
  include Timing_wheel.Alarm

  (* [is_null t] is true iff [t] is physically equal to [null ()]. *)
  let is_null t = phys_equal t (null ())
end

module Alarm_precision = Timing_wheel.Alarm_precision

let default_timing_wheel_config =
  (* 1/8th of a millisecond alarm_precision seems sufficient to avoid having many alarms
     in the same interval, which avoids quadratic insertion sort when firing alarms. And
     the level bits give us levels of >1s, >1m, >1h, >1d. See test in
     [../test/test_synchronous_time_source.ml]. *)
  Timing_wheel.Config.create
    ~alarm_precision:Alarm_precision.(div about_one_millisecond ~pow2:3)
    ~level_bits:(Timing_wheel.Level_bits.create_exn [ 13; 6; 6; 5 ])
    ()
;;

type callback = unit -> unit

module Id = Types.Time_source_id

module T1 = struct
  module Event = struct
    module Status = struct
      type t = Types.Event.Status.t =
        | Fired (* in [fired_events], ready to run *)
        | Happening_periodic_event
        (* currently running the callback (for a periodic event) *)
        | Scheduled (* in the timing wheel *)
        | Unscheduled (* not in timing wheel or [fired_events] *)
      [@@deriving compare, equal, sexp_of]

      (* [transition_is_allowed ~from ~to_] is the whitelist of status changes that
         [set_status] accepts; every pair not listed here is rejected. *)
      let transition_is_allowed ~from ~to_ =
        match from, to_ with
        | ( Fired
          , Happening_periodic_event
            (* started running callback (for a periodic event) *) )
        | ( Fired
          , Unscheduled
            (* aborted, or started running callback (for a non-periodic event) *) )
        (* [reschedule_*] goes through an intermediate [Fired, Unscheduled] state,
           so we never transition from [Fired] directly to [Scheduled]. *)
        | ( Happening_periodic_event
          , Scheduled (* scheduled next iteration of a periodic event *) )
        | Happening_periodic_event, Unscheduled (* aborted *)
        | Scheduled, Fired (* moved from timing wheel to [fired_events] *)
        | Scheduled, Unscheduled (* aborted *)
        | Unscheduled, Fired (* event scheduled in the past *)
        | Unscheduled, Scheduled (* event scheduled in the future *) -> true
        | (Fired | Happening_periodic_event | Scheduled | Unscheduled), _ -> false
      ;;
    end

    type event = Types.Event.t

    (* Only [status], [at] and [interval] are rendered; every other field is ignored. *)
    let sexp_of_event
          ({ alarm = _
           ; at
           ; callback = _
           ; execution_context = _
           ; interval
           ; next_fired = _
           ; prev_fired = _
           ; status
           } :
            event)
      =
      [%sexp
        { status : Status.t
        ; at : Time_ns.t
        ; interval : (Time_ns.Span.t option[@sexp.option])
        }]
    ;;

    module Option = struct
      (* This redefinition of [Event] is here so the type checks are right next
         to [Obj.magic]s. *)
      module Event_is_block : sig end = struct
        open Types
        open Event

        type _t = t =
          { (* must never be immediate *)
            mutable alarm : Job_or_event.t Timing_wheel.Alarm.t
          ; mutable at : Time_ns.t
          ; callback : unit -> unit
          ; execution_context : Execution_context.t
          ; mutable interval : Time_ns.Span.t option
          ; mutable next_fired : Option.t
          ; mutable prev_fired : Option.t
          ; mutable status : Status.t
          }
      end

      type t = Types.Event.Option.t

      (* Using an immediate rather than a statically-allocated record here seems to
         improve performance noticeably ([../bench/bin/bench_time_source.exe] benchmark
         is faster by ~10ns per alarm), presumably because it avoids the expensive
         parts of caml_modify. *)
      let none = (Obj.magic None : t) (* an arbitrary immediate *)
      let some = (Obj.magic : Types.Event.t -> t)
      let is_none t = phys_equal t none
      let is_some t = not (is_none t)
      let first_some t1 t2 = if is_some t1 then t1 else t2
      let unsafe_value = (Obj.magic : t -> Types.Event.t)

      (* Provides what [match%optional] needs to destructure a [t]. *)
      module Optional_syntax = struct
        module Optional_syntax = struct
          let is_none = is_none
          let unsafe_value = unsafe_value
        end
      end

      open Optional_syntax

      let sexp_of_t t =
        match%optional t with
        | None -> [%sexp ()]
        | Some event -> [%sexp (event : event)]
      ;;

      let value t ~default = Bool.select (is_none t) default (unsafe_value t)

      let value_exn t =
        match%optional t with
        | None -> raise_s [%sexp "[Synchronous_time_source.Event.Option.value_exn None]"]
        | Some event -> event
      ;;

      let to_option t =
        match%optional t with
        | None -> None
        | Some event -> Some event
      ;;

      let of_option = function
        | None -> none
        | Some event -> some event
      ;;
    end

    type t = Types.Event.t =
      { (* [alarm] is non-null iff the event is in the timing wheel. *)
        mutable alarm : Job_or_event.t Alarm.t
      ; mutable at : Time_ns.t
      ; callback : unit -> unit
      ; execution_context : Execution_context.t
      ; (* [interval] is the period for the periodic events. *)
        mutable interval : Time_ns.Span.t option
      ; (* [next_fired] and [prev_fired] create a doubly-linked (non-circular) list of
           fired events, linked via these fields. An event is added to the list when
           it fires, either because it is added with a time in the past, or
           because time advances. [advance_by_alarms] iterates over the events
           in [next_fired] and runs them, emptying the list. [none] is used to
           indicate the end of the linked list of fired events. *)
        mutable next_fired : Option.t
      ; mutable prev_fired : Option.t
      ; mutable status : Status.t
      }
    [@@deriving fields]

    let sexp_of_t = [%sexp_of: event]

    (* Checks that [alarm] is null exactly when the event is not [Scheduled], that the
       fired-list links are mutually consistent, and that an event whose callback is
       currently running as a periodic event has an [interval]. *)
    let invariant t =
      Invariant.invariant [%here] t [%sexp_of: t] (fun () ->
        let check f = Invariant.check_field t f in
        Fields.iter
          ~alarm:
            (check (fun alarm ->
               [%test_result: bool]
                 (Alarm.is_null alarm)
                 ~expect:
                   (match t.status with
                    | Fired | Happening_periodic_event | Unscheduled -> true
                    | Scheduled -> false)))
          ~at:ignore
          ~callback:ignore
          ~execution_context:ignore
          ~interval:ignore
          ~next_fired:
            (check (fun next_fired ->
               match%optional (next_fired : Option.t) with
               | None ->
                 (* [next_fired] can be [None] even if the event status is Fired, assuming
                    it's at the end of the fired events list *)
                 ()
               | Some next_fired ->
                 [%test_result: Status.t] t.status ~expect:Fired;
                 assert (phys_equal (Option.some t) next_fired.prev_fired)))
          ~prev_fired:
            (check (fun prev_fired ->
               match%optional (prev_fired : Option.t) with
               | None ->
                 (* [prev_fired] can be [None] even if the event status is Fired, assuming
                    it's at the beginning of the fired events list *)
                 ()
               | Some prev_fired ->
                 [%test_result: Status.t] t.status ~expect:Fired;
                 assert (phys_equal (Option.some t) prev_fired.next_fired)))
          ~status:
            (check (fun (status : Status.t) ->
               match status with
               | Happening_periodic_event -> assert (Core.Option.is_some t.interval)
               | Fired | Unscheduled | Scheduled -> ())))
    ;;

    (* [set_status t to_] updates [t.status], raising if the change is not permitted by
       [Status.transition_is_allowed]. *)
    let set_status t to_ =
      let from = t.status in
      if not (Status.transition_is_allowed ~from ~to_)
      then
        raise_s
          [%message
            [%here]
              "bug -- set_status transition not allowed"
              (from : Status.t)
              (to_ : Status.t)
              ~event:(t : t)];
      t.status <- to_
    ;;

    (* [set_status_if ~is t to_] does nothing unless [t.status] currently equals [is]. *)
    let set_status_if ~is t to_ = if Status.equal is t.status then set_status t to_
    let scheduled_at = at
  end

  module Job_or_event = struct
    include Job_or_event

    let sexp_of_t t =
      let open Job_or_event.Match in
      let (K k) = kind t in
      match k, project k t with
      | Event, event -> [%sexp (event : Event.t)]
      | Job, _ ->
        (* We don't display the [Job.t]s in [events] because those are
           pool pointers, which are uninformative. *)
        [%message "<Job.t>"]
    ;;
  end

  type -'rw t = 'rw Types.Time_source.t1 =
    { id : Id.t
    ; (* [advance_errors] accumulates errors raised by alarms run by
         [advance_by_alarms]. *)
      mutable advance_errors : Error.t list
    ; (* [am_advancing] is true only during [advance_by_alarms], and is used to cause
         callbacks to raise if they call [advance_by_alarms]. *)
      mutable am_advancing : bool
    ; events : Job_or_event.t Timing_wheel.t
    ; (* [fired_events] is the front of the doubly-linked list of fired events,
         which is stored in increasing order of [Event.at]. *)
      mutable fired_events : Event.Option.t
    ; (* [most_recently_fired] is the event that was most recently inserted into
         [fired_events]. It is used as an optimization to allow insertion of
         subsequent events to start later in the list rather than at the beginning.
         It specifically avoids quadratic behavior when inserting multiple events
         that have exactly the same time -- the time source fires such events in
         the order they were added, and we want them to be in that same order in
         [fired_events]. *)
      mutable most_recently_fired : Event.Option.t
    ; (* We store [handle_fired] in [t] to avoid allocating it every time we call
         [advance_clock]. *)
      handle_fired : Job_or_event.t Alarm.t -> unit
    ; is_wall_clock : bool
    ; scheduler : Scheduler0.t
    }
  [@@deriving fields]

  (* We don't include the [id] in the sexp because the user (rightly) can't control it, so
     it's hard to make it deterministic in tests. *)
  let sexp_of_t
        _
        { id = _
        ; advance_errors = _
        ; am_advancing = _
        ; events
        ; fired_events = _
        ; handle_fired = _
        ; is_wall_clock
        ; most_recently_fired = _
        ; scheduler = _
        }
    =
    let now = Timing_wheel.now events in
    if is_wall_clock
    then [%message "wall_clock" (now : Time_ns.t)]
    else (
      (* Collect every alarm in the wheel, then display their values sorted by time. *)
      let all_events = ref [] in
      Timing_wheel.iter events ~f:(fun alarm ->
        all_events := (Alarm.at events alarm, Alarm.value events alarm) :: !all_events);
      let events =
        List.sort !all_events ~compare:(fun (at1, _) (at2, _) -> Time_ns.compare at1 at2)
        |> List.map ~f:snd
      in
      [%message "" (now : Time_ns.t) (events : Job_or_event.t list)])
  ;;

  let timing_wheel_now t = Timing_wheel.now t.events

  (* [is_in_fired_events t event] walks [t.fired_events] via [next_fired], looking for an
     entry physically equal to [event]. *)
  let is_in_fired_events =
    let rec search current ~target_event =
      match%optional (current : Event.Option.t) with
      | None -> false
      | Some current ->
        phys_equal current target_event || search current.next_fired ~target_event
    in
    fun t target_event -> search t.fired_events ~target_event
  ;;

  let invariant_with_jobs (type rw) ~job:(job_invariant : Job.t -> unit) (t : rw t) =
    Invariant.invariant [%here] t [%sexp_of: _ t] (fun () ->
      let check f = Invariant.check_field t f in
      Fields.iter
        ~id:ignore
        ~advance_errors:ignore
        ~am_advancing:ignore
        ~events:
          (check (fun events ->
             Timing_wheel.invariant ignore events;
             Timing_wheel.iter events ~f:(fun alarm ->
               let job_or_event = Alarm.value events alarm in
               let open Job_or_event.Match in
               let (K k) = kind job_or_event in
               match k, project k job_or_event with
               | Job, job -> job_invariant job
               | Event, event ->
                 assert (phys_equal alarm event.alarm);
                 [%test_result: Time_ns.t] event.at ~expect:(Alarm.at events alarm);
                 [%test_result: Event.Status.t] event.status ~expect:Scheduled;
                 Event.invariant event)))
        ~fired_events:
          (check (fun (fired_events : Event.Option.t) ->
             (* Every fired event is at or before the wheel's [now], and the list is in
                non-decreasing [at] order. *)
             let rec check_event (current : Event.t) =
               assert (Time_ns.( <= ) current.at (timing_wheel_now t));
               match%optional.Event.Option current.next_fired with
               | None -> ()
               | Some next ->
                 assert (Time_ns.( <= ) current.at next.at);
                 check_event next
             in
             match%optional.Event.Option fired_events with
             | None -> ()
             | Some event -> check_event event))
        ~handle_fired:ignore
        ~is_wall_clock:ignore
        ~most_recently_fired:
          (check (fun most_recently_fired ->
             match%optional (most_recently_fired : Event.Option.t) with
             | None -> ()
             | Some event -> assert (is_in_fired_events t event)))
        ~scheduler:ignore)
  ;;

  let invariant t = invariant_with_jobs ~job:(fun _ -> ()) t
end

open T1

type t = read T1.t [@@deriving sexp_of]

let invariant = invariant
let invariant_with_jobs = invariant_with_jobs

module Read_write = struct
  type t = read_write T1.t [@@deriving sexp_of]

  let invariant = invariant
  let invariant_with_jobs = invariant_with_jobs
end

let id t = t.id
let is_wall_clock t = t.is_wall_clock
let length t = Timing_wheel.length t.events
let max_allowed_alarm_time t = Timing_wheel.max_allowed_alarm_time t.events
let read_only (t : [> read ] T1.t) = (t :> t)

(* [fire t event] sets [event.status = Fired] and inserts [event] into
   [t.fired_events] in sorted time order. *)
let fire t (event : Event.t) =
  Event.set_status event Fired;
  event.alarm <- Alarm.null ();
  let () =
    (* If [event] belongs after [t.most_recently_fired], then we start the
       insertion there rather than at the front of [t.fired_events]. This works
       nicely if we're getting the alarms in non-decreasing time order, which is
       close to what [Timing_wheel] provides (although [Timing_wheel] doesn't
       guarantee time ordering for times in the same interval). *)
    match%optional (t.most_recently_fired : Event.Option.t) with
    | Some most_recently_fired when Time_ns.( <= ) most_recently_fired.at event.at ->
      event.prev_fired <- Event.Option.some most_recently_fired;
      event.next_fired <- most_recently_fired.next_fired
    | _ ->
      event.prev_fired <- Event.Option.none;
      event.next_fired <- t.fired_events
  in
  t.most_recently_fired <- Event.Option.some event;
  (* We use [Time_ns.( <= )] rather than [<] so that [event] is added after other
     events at the same time. Since [Timing_wheel] fires alarms in a bucket in
     the order in which they were added, using [<=] keeps events at the same
     time in the order in which they were added. *)
  while
    match%optional (event.next_fired : Event.Option.t) with
    | None -> false
    | Some next ->
      let continue = Time_ns.( <= ) next.at event.at in
      if continue
      then (
        event.prev_fired <- event.next_fired;
        event.next_fired <- next.next_fired);
      continue
  do
    ()
  done;
  (* Splice [event] in between the neighbours found above. *)
  let () =
    match%optional (event.next_fired : Event.Option.t) with
    | None -> ()
    | Some next -> next.prev_fired <- Event.Option.some event
  in
  match%optional (event.prev_fired : Event.Option.t) with
  | None -> t.fired_events <- Event.Option.some event
  | Some prev -> prev.next_fired <- Event.Option.some event
;;

let alarm_precision t = Timing_wheel.alarm_precision t.events
let next_alarm_fires_at t = Timing_wheel.next_alarm_fires_at t.events

(* If there are already fired events waiting to run, the answer is the wheel's current
   time; otherwise defer to the timing wheel. *)
let next_alarm_runs_at t =
  if Event.Option.is_some t.fired_events
  then Some (timing_wheel_now t)
  else Timing_wheel.next_alarm_fires_at t.events
;;

let now t = if t.is_wall_clock then Time_ns.now () else timing_wheel_now t
let timing_wheel_now = timing_wheel_now

(* [schedule t event] sets [event.status = Scheduled] and adds [event] to the timing
   wheel at [event.at], recording the resulting alarm in [event.alarm]. *)
let schedule t (event : Event.t) =
  Event.set_status event Scheduled;
  event.alarm <- Timing_wheel.add t.events ~at:event.at (event |> Job_or_event.of_event)
;;

(* [remove_from_fired t event ~new_status] unlinks [event] from [t.fired_events], fixes
   up [t.most_recently_fired] if it pointed at [event], clears [event]'s links and sets
   its status to [new_status]. *)
let remove_from_fired t (event : Event.t) ~new_status =
  let () =
    match%optional (t.most_recently_fired : Event.Option.t) with
    | None -> ()
    | Some most_recently_fired ->
      if phys_equal event most_recently_fired
      then
        t.most_recently_fired <- Event.Option.first_some event.next_fired event.prev_fired
  in
  let () =
    match%optional (event.prev_fired : Event.Option.t) with
    | None -> t.fired_events <- event.next_fired
    | Some prev -> prev.next_fired <- event.next_fired
  in
  let () =
    match%optional (event.next_fired : Event.Option.t) with
    | None -> ()
    | Some next -> next.prev_fired <- event.prev_fired
  in
  event.next_fired <- Event.Option.none;
  event.prev_fired <- Event.Option.none;
  Event.set_status event new_status
;;

module Event = struct
  include Event

  (* Builds an [Unscheduled] event that is in neither the timing wheel nor the fired
     list. The execution context is captured from the scheduler at creation time. *)
  let create_internal t ~at ~interval ~callback =
    { alarm = Alarm.null ()
    ; at
    ; callback
    ; execution_context = t.scheduler.current_execution_context
    ; interval
    ; next_fired = Event.Option.none
    ; prev_fired = Event.Option.none
    ; status = Unscheduled
    }
  ;;

  (* [add t event] fires [event] immediately if [event.at] is not after the timing
     wheel's current time, and otherwise puts it in the timing wheel. *)
  let add t event =
    if Time_ns.( <= ) event.at (timing_wheel_now t) then fire t event else schedule t event
  ;;

  let create_and_add t ~at ~interval ~callback =
    let event = create_internal t ~at ~interval ~callback in
    add t event;
    event
  ;;

  let at t at callback = create_and_add t ~at ~interval:None ~callback

  let after t span callback =
    create_and_add t ~at:(Time_ns.after (now t) span) ~interval:None ~callback
  ;;

  (* Raises if [span] is strictly smaller than [alarm_precision t]. *)
  let require_span_at_least_alarm_precision t span =
    let alarm_precision = alarm_precision t in
    if Time_ns.Span.( < ) span alarm_precision
    then
      raise_s
        [%message
          "interval span smaller than alarm precision"
            (span : Time_ns.Span.t)
            (alarm_precision : Time_ns.Span.t)]
  ;;

  let at_intervals t span callback =
    require_span_at_least_alarm_precision t span;
    create_and_add t ~at:(now t) ~interval:(Some span) ~callback
  ;;

  module Abort_result = struct
    type t =
      | Ok
      | Previously_unscheduled
    [@@deriving sexp_of]
  end

  (* [abort t event] takes [event] out of whichever structure currently holds it and
     leaves it [Unscheduled]. Returns [Previously_unscheduled] if there was nothing to
     abort. *)
  let abort t (event : t) : Abort_result.t =
    match event.status with
    | Happening_periodic_event ->
      (match event.interval with
       | None -> assert false
       | Some (_ : Time_ns.Span.t) ->
         event.interval <- None;
         (* Go through [set_status], like every other branch, so the transition is
            checked. *)
         Event.set_status event Unscheduled;
         Ok)
    | Fired ->
      remove_from_fired t event ~new_status:Unscheduled;
      Ok
    | Scheduled ->
      Event.set_status event Unscheduled;
      Timing_wheel.remove t.events event.alarm;
      event.alarm <- Alarm.null ();
      Ok
    | Unscheduled -> Previously_unscheduled
  ;;

  let abort_if_possible t event = ignore (abort t event : Abort_result.t)

  let abort_exn t event =
    match abort t event with
    | Ok -> ()
    | reason ->
      raise_s
        [%message
          "[Synchronous_time_source.abort_exn] cannot abort event"
            (reason : Abort_result.t)]
  ;;

  let create t callback = create_internal t ~at:Time_ns.epoch ~interval:None ~callback

  let is_scheduled (event : t) =
    match event.status with
    | Happening_periodic_event | Scheduled | Fired -> true
    | Unscheduled -> false
  ;;

  (* Only an [Unscheduled] event may be scheduled; any other status yields an error. *)
  let schedule_at_internal t (event : t) at ~interval =
    match event.status with
    | (Happening_periodic_event | Scheduled | Fired) as status ->
      Or_error.error_s
        [%sexp "cannot schedule an event with status", (status : Event.Status.t)]
    | Unscheduled ->
      event.at <- at;
      event.interval <- interval;
      add t event;
      Ok ()
  ;;

  let schedule_at t event at = schedule_at_internal t event at ~interval:None
  let schedule_after t event span = schedule_at t event (Time_ns.after (now t) span)

  let schedule_at_intervals t event span =
    require_span_at_least_alarm_precision t span;
    schedule_at_internal t event (now t) ~interval:(Some span)
  ;;

  (* [reschedule_at t event at] moves [event] to time [at] whatever its current status.
     An [Unscheduled] event additionally has its [interval] cleared. *)
  let reschedule_at t event at : unit =
    match event.status with
    | Fired ->
      remove_from_fired t event ~new_status:Unscheduled;
      event.at <- at;
      add t event
    | Happening_periodic_event ->
      (* Happening_periodic_event events have already been removed from [fired]. We
         first move to [Unscheduled] because [add] may call [fire], and
         [Happening_periodic_event -> Fired] is not an allowed transition, whereas
         both [Unscheduled -> Fired] and [Unscheduled -> Scheduled] are. *)
      Event.set_status event Unscheduled;
      event.at <- at;
      add t event
    | Scheduled ->
      event.at <- at;
      if Time_ns.( > ) at (timing_wheel_now t)
      then Timing_wheel.reschedule t.events event.alarm ~at
      else (
        Timing_wheel.remove t.events event.alarm;
        fire t event)
    | Unscheduled ->
      event.at <- at;
      event.interval <- None;
      add t event
  ;;

  let reschedule_after t event span = reschedule_at t event (Time_ns.after (now t) span)
end

let run_after t span callback = ignore (Event.after t span callback : Event.t)
let run_at t at callback = ignore (Event.at t at callback : Event.t)

let run_at_intervals t span callback =
  ignore (Event.at_intervals t span callback : Event.t)
;;

type send_exn = Monitor0.t -> ?backtrace:[ `Get | `This of Backtrace.t ] -> exn -> unit

(* [run_fired_events t ~send_exn] repeatedly pops the front of [t.fired_events] and runs
   its callback until the list is empty. An exception from a callback is passed to
   [send_exn] if one is supplied, and otherwise accumulated in [t.advance_errors]. The
   scheduler's execution context is restored once the list is empty. *)
let run_fired_events t ~(send_exn : send_exn option) =
  let current_execution_context = t.scheduler.current_execution_context in
  while
    match%optional (t.fired_events : Event.Option.t) with
    | None -> false
    | Some event ->
      (match event.status with
       | Happening_periodic_event | Scheduled | Unscheduled -> assert false
       | Fired ->
         let new_status =
           match event.interval with
           | None -> (Unscheduled : Event.Status.t)
           | Some _ -> (Happening_periodic_event : Event.Status.t)
         in
         remove_from_fired t event ~new_status;
         (* We set the execution context so that [event.callback] runs in the same context
            that was in place when [event] was created. *)
         Scheduler0.set_execution_context t.scheduler event.execution_context;
         (* Any modification of [status] below needs to first check that the event is
            still [Happening_periodic_event]. If the event status is not
            [Happening_periodic_event] then the event's callback must have rescheduled the
            event. In that case, do not set the status or attempt to reschedule a
            repeating event.
            This code could be much simpler if we immediately rescheduled the event before
            running the callback (no need for the Happening_periodic_event state then).
            One reason we don't do that is that we don't want to automatically reschedule
            a periodic event if its callback raises. *)
         (match event.callback () with
          | exception exn ->
            (match send_exn with
             | None -> t.advance_errors <- Error.of_exn exn :: t.advance_errors
             | Some send_exn ->
               let backtrace = Backtrace.Exn.most_recent () in
               send_exn event.execution_context.monitor exn ~backtrace:(`This backtrace));
            Event.set_status_if ~is:Happening_periodic_event event Unscheduled
          | () ->
            (match event.interval with
             | None -> Event.set_status_if ~is:Happening_periodic_event event Unscheduled
             | Some interval ->
               if Event.Status.equal Happening_periodic_event event.status
               then (
                 (* The event's callback did not reschedule the event. So reschedule the
                    repeating timer based on the last [at] time. *)
                 event.at
                   <- Time_ns.next_multiple
                        ()
                        ~base:event.at
                        ~after:(timing_wheel_now t)
                        ~interval;
                 schedule t event)));
         true)
  do
    ()
  done;
  Scheduler0.set_execution_context t.scheduler current_execution_context
;;

let any_fired_events_to_run t = Event.Option.is_some (t.fired_events : Event.Option.t)

let advance_clock t ~to_ ~send_exn =
  Timing_wheel.advance_clock t.events ~to_ ~handle_fired:t.handle_fired;
  run_fired_events t ~send_exn
;;

let fire_past_alarms t ~send_exn =
  Timing_wheel.fire_past_alarms t.events ~handle_fired:t.handle_fired;
  run_fired_events t ~send_exn
;;

let advance_internal t ~to_ ~send_exn =
  advance_clock t ~to_ ~send_exn;
  fire_past_alarms t ~send_exn
;;

(* Marks [t] as advancing (raising if it already is), discards any stale
   [advance_errors], and runs whatever is already in [t.fired_events]. *)
let prepare_to_advance t ~send_exn =
  if t.am_advancing
  then raise_s [%sexp "cannot call [advance_by_alarms] or [advance_directly] from callback"];
  t.am_advancing <- true;
  (match t.advance_errors with
   | [] -> ()
   | _ -> t.advance_errors <- []);
  run_fired_events t ~send_exn
;;

(* Clears [am_advancing] and returns the accumulated [advance_errors], if any, as a
   single error, emptying the accumulator. *)
let finish_advancing t =
  t.am_advancing <- false;
  match t.advance_errors with
  | [] -> Ok ()
  | errors ->
    t.advance_errors <- [];
    Error (Error.of_list errors)
;;

let advance_by_alarms t ~to_ =
  let send_exn = None in
  prepare_to_advance t ~send_exn;
  let continue = ref true in
  while !continue do
    if Timing_wheel.is_empty t.events
    then continue := false
    else (
      let min_alarm_time = Timing_wheel.min_alarm_time_in_min_interval_exn t.events in
      if Time_ns.( >= ) min_alarm_time to_
      then continue := false
      else
        (* We use the actual alarm time, rather than [next_alarm_fires_at], so as not to
           expose (or accumulate errors associated with) the precision of
           [Timing_wheel]. *)
        advance_internal t ~to_:min_alarm_time ~send_exn)
  done;
  advance_internal t ~to_ ~send_exn;
  finish_advancing t
;;

let advance_by_alarms_by t by = advance_by_alarms t ~to_:(Time_ns.after (now t) by)

let advance_by_max_alarms_in_each_timing_wheel_interval t ~to_ =
  let send_exn = None in
  prepare_to_advance t ~send_exn;
  let continue = ref true in
  while !continue do
    if Timing_wheel.is_empty t.events
    then continue := false
    else (
      let next_alarm_fires_at = Timing_wheel.next_alarm_fires_at_exn t.events in
      if Time_ns.( >= ) next_alarm_fires_at to_
      then continue := false
      else
        (* We advance to an actual alarm time (the max alarm time in the min interval),
           rather than [next_alarm_fires_at], so as not to expose (or accumulate errors
           associated with) the precision of [Timing_wheel]. *)
        advance_internal
          t
          ~to_:(Timing_wheel.max_alarm_time_in_min_interval_exn t.events)
          ~send_exn)
  done;
  advance_internal t ~to_ ~send_exn;
  finish_advancing t
;;

let advance_directly t ~to_ =
  let send_exn = None in
  prepare_to_advance t ~send_exn;
  advance_internal t ~to_ ~send_exn;
  finish_advancing t
;;

let advance_directly_by t by = advance_directly t ~to_:(Time_ns.after (now t) by)

(* [duration_of t f] runs [f ()] and returns its result paired with how much [now t]
   changed while it ran. *)
let duration_of t f =
  let start = now t in
  let result = f () in
  let duration = Time_ns.diff (now t) start in
  result, duration
;;

let max_alarm_time_in_min_timing_wheel_interval t =
  Timing_wheel.max_alarm_time_in_min_interval t.events
;;

let has_events_to_run t = Event.Option.is_some t.fired_events