123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659open!Core_kernelopen!ImportmoduleTime_ns=structincludeTime_nsexternalformat:float->string->string="core_kernel_time_ns_format"(* We use a more pleasant format than [Core_kernel.Time_ns.sexp_of_t],
which has to be messier for round trippability. *)letsexp_of_tt=[%sexp(format(t|>to_span_since_epoch|>Span.to_sec)"%Y-%m-%dT%H:%M:%S%z":string)];;endmoduleAlarm=structincludeTiming_wheel.Alarmletis_nullt=phys_equalt(null())endmoduleAlarm_precision=Timing_wheel.Alarm_precisionletdefault_timing_wheel_config=(* 1/8th of a millisecond alarm_precision seems sufficient to avoid having many alarms
in the same interval, which avoids quadratic insertion sort when firing alarms. And
the level bits give us levels of >1s, >1m, >1h, >1d. See test in
[../test/test_synchronous_time_source.ml]. *)Timing_wheel.Config.create~alarm_precision:Alarm_precision.(divabout_one_millisecond~pow2:3)~level_bits:(Timing_wheel.Level_bits.create_exn[13;6;6;5])();;typecallback=unit->unitmoduleId=Types.Time_source_idmoduleT1=structmoduleEvent=structmoduleStatus=structtypet=Types.Event.Status.t=|Aborted(* in [fired_events], must not run *)|Fired(* in [fired_events], ready to run *)|Happening(* currently running the callback *)|Scheduled(* in the timing wheel *)|Unscheduled(* not in timing wheel or [fired_events] *)[@@derivingcompare,sexp_of]lettransition_is_allowed~from~to_=matchfrom,to_with|Aborted,Unscheduled(* skipped running callback *)|Fired,Happening(* started running callback *)|Fired,Aborted(* aborted *)|Happening,Scheduled(* for repeating events *)|Happening,Unscheduled(* event callback finished *)|Scheduled,Fired(* moved from timing wheel to [fired_events] *)|Scheduled,Unscheduled(* aborted *)|Unscheduled,Fired(* event scheduled in the past *)|Unscheduled,Scheduled(* event scheduled in the future *)->true|(Aborted|Fired|Happening|Scheduled|Unscheduled),_->false;;endtypet=Types.Event.t={(* [alarm] is non-null iff the event is in the timing wheel. *)mutablealarm:Job_or_event.tAlarm.t;mutableat:Time_ns.t;callback:unit->unit;execution_context:Execution_context.t;(* [interval] is the period for the periodic events *)mutableinterval:Time_ns.Span.toption;(* [next_fired] is a singly-linked list of fired events, linked via [next_fired].
An event is added to the list when it fires, either because it is added with a
time in the past, or because time advances. [advance_by_alarms] iterates over
the events in [next_fired] and runs them, emptying the list. *)mutablenext_fired:t;mutablestatus:Status.t}[@@derivingfields](* [none] is used to indicate the end of the singly-linked list of fired events. *)letrecnone={alarm=Alarm.null();at=Time_ns.min_value_for_1us_rounding;callback=(fun()->assertfalse);execution_context=Execution_context.main;interval=None;next_fired=none;status=Unscheduled};;letis_nonet=phys_equaltnoneletis_somet=not(is_nonet)letsexp_of_t({alarm=_;at;callback=_;execution_context=_;interval;next_fired=_;status}ast)=ifis_nonetthen[%sexp"none"]else[%message""(status:Status.t)(at:Time_ns.t)(interval:Time_ns.Span.toption)];;letinvariantt=Invariant.invariant[%here]t[%sexp_of:t](fun()->letcheckf=Invariant.check_fieldtfinFields.iter~alarm:(check(funalarm->[%test_result:bool](Alarm.is_nullalarm)~expect:(matcht.statuswith|Aborted|Fired|Happening|Unscheduled->true|Scheduled->false)))~at:ignore~callback:ignore~execution_context:ignore~interval:ignore~next_fired:(check(funnext_fired->ifis_somenext_firedthen(matcht.statuswith|Aborted|Fired->()|Happening|Scheduled|Unscheduled->assertfalse)))~status:ignore);;letcompare_att1t2=Time_ns.comparet1.att2.atletset_statustto_=letfrom=t.statusinifnot(Status.transition_is_allowed~from~to_)thenraise_s[%message[%here]"bug -- set_status transition not allowed"(from:Status.t)(to_:Status.t)~event:(t:t)];t.status<-to_;;endmoduleJob_or_event=structincludeJob_or_eventletsexp_of_tt=letopenJob_or_event.Matchinlet(Kk)=kindtinmatchk,projectktwith|Event,event->[%sexp(event:Event.t)]|Job,_->(* We don't display the [Job.t]s in [events] because those are
pool pointers, which are uninformative. *)[%message"<Job.t>"];;endtype-'rwt='rwTypes.Time_source.t1={id:Id.t;(* [advance_errors] accumulates errors raised by alarms run by
[advance_by_alarms]. *)mutableadvance_errors:Error.tlist;(* [am_advancing] is true only during [advance_by_alarms], and is used to cause
callbacks to raise if they call [advance_by_alarms]. *)mutableam_advancing:bool;events:Job_or_event.tTiming_wheel.t;(* [fired_events] is the front of the singly linked list of fired events, which is
stored in increasing order of [Event.at]. *)mutablefired_events:Event.t;(* [most_recently_fired] is the event that was most recently inserted into
[fired_events]. It is used as an optimization to allow insertion of subsequent
events to start later in the list rather than at the beginning. It specifically
avoids quadratic behavior when inserting multiple events that have exactly the
same time -- the time source fires such events in the order they were added, and
we want them to be in that same order in [fired_events]. *)mutablemost_recently_fired:Event.t;(* We store [handle_fired] in [t] to avoid allocating it every time we call
[advance_clock]. *)handle_fired:Job_or_event.tAlarm.t->unit;is_wall_clock:bool;scheduler:Scheduler0.t}[@@derivingfields](* We don't include the [id] in the sexp because the user (rightly) can't control it, so
it's hard to make it deterministic in tests. *)letsexp_of_t_{id=_;advance_errors=_;am_advancing=_;events;fired_events=_;handle_fired=_;is_wall_clock;most_recently_fired=_;scheduler=_}=letnow=Timing_wheel.noweventsinifis_wall_clockthen[%message"wall_clock"(now:Time_ns.t)]else(letall_events=ref[]inTiming_wheel.iterevents~f:(funalarm->all_events:=(Alarm.ateventsalarm,Alarm.valueeventsalarm)::!all_events);letevents=List.sort!all_events~compare:(fun(at1,_)(at2,_)->Time_ns.compareat1at2)|>List.map~f:sndin[%message""(now:Time_ns.t)(events:Job_or_event.tlist)]);;lettiming_wheel_nowt=Timing_wheel.nowt.eventsletis_in_fired_eventstevent=with_return(funr->letcurrent=reft.fired_eventsinwhileEvent.is_some!currentdoifphys_equal!currenteventthenr.returntrue;current:=!current.next_fireddone;false);;letinvariant_with_jobs(typerw)~job:(job_invariant:Job.t->unit)(t:rwt)=Invariant.invariant[%here]t[%sexp_of:_t](fun()->letcheckf=Invariant.check_fieldtfinFields.iter~id:ignore~advance_errors:ignore~am_advancing:ignore~events:(check(funevents->Timing_wheel.invariantignoreevents;Timing_wheel.iterevents~f:(funalarm->letjob_or_event=Alarm.valueeventsalarminletopenJob_or_event.Matchinlet(Kk)=kindjob_or_eventinmatchk,projectkjob_or_eventwith|Job,job->job_invariantjob|Event,event->assert(phys_equalalarmevent.alarm);[%test_result:Time_ns.t]event.at~expect:(Alarm.ateventsalarm);[%test_result:Event.Status.t]event.status~expect:Scheduled)))~fired_events:(check(fun(fired_events:Event.t)->letcurrent=reffired_eventsinwhileEvent.is_some!currentdoassert(Time_ns.(<=)!current.at(timing_wheel_nowt));letnext=!current.next_firedinifEvent.is_somenextthenassert(Time_ns.(<=)!current.atnext.at);current:=nextdone))~handle_fired:ignore~is_wall_clock:ignore~most_recently_fired:(check(funmost_recently_fired->ifEvent.is_somet.most_recently_firedthenassert(is_in_fired_eventstmost_recently_fired)))~scheduler:ignore);;letinvariantt=invariant_with_jobs~job:(fun_->())tendopenT1typet=readT1.t[@@derivingsexp_of]letinvariant=invariantletinvariant_with_jobs=invariant_with_jobsmoduleRead_write=structtypet=read_writeT1.t[@@derivingsexp_of]letinvariant=invariantletinvariant_with_jobs=invariant_with_jobsendletidt=t.idletis_wall_clockt=t.is_wall_clockletlengtht=Timing_wheel.lengtht.eventsletmax_allowed_alarm_timet=Timing_wheel.max_allowed_alarm_timet.eventsletread_only(t:[>read]T1.t)=(t:>t)(* [fire t event] sets [event.status = Fired] and inserts [event] into [t.fired_events] in
sorted time order. *)letfiret(event:Event.t)=Event.set_statuseventFired;event.alarm<-Alarm.null();letprev=refEvent.noneinletcurrent=reft.fired_eventsin(* If [event] belongs after [t.most_recently_fired], then we start the insertion there
rather than at the front of [t.fired_events]. This works nicely if we're getting the
alarms in non-decreasing time order, which is close to what [Timing_wheel]
provides (although [Timing_wheel] doesn't guarantee time ordering for times in the
same interval). *)ifEvent.is_somet.most_recently_fired&&Time_ns.(>=)event.att.most_recently_fired.atthen(prev:=t.most_recently_fired;current:=!prev.next_fired);(* We use [Time_ns.( <= )] rather than [<] so that [event] is added after other events
at the same time. Since [Timing_wheel] fires alarms in a bucket in the order in
which they were added, using [<=] keeps events at the same time in the order in which
they were added. *)whileEvent.is_some!current&&Time_ns.(<=)!current.atevent.atdoprev:=!current;current:=!current.next_fireddone;event.next_fired<-!current;t.most_recently_fired<-event;ifEvent.is_none!prevthent.fired_events<-eventelse!prev.next_fired<-event;;letalarm_precisiont=Timing_wheel.alarm_precisiont.eventsletnext_alarm_fires_att=Timing_wheel.next_alarm_fires_att.eventsletnowt=ift.is_wall_clockthenTime_ns.now()elsetiming_wheel_nowtlettiming_wheel_now=timing_wheel_nowletschedulet(event:Event.t)=Event.set_statuseventScheduled;event.alarm<-Timing_wheel.addt.events~at:event.at(event|>Job_or_event.of_event);;moduleEvent=structincludeEventletcreate_internalt~at~interval~callback={alarm=Alarm.null();at;callback;execution_context=t.scheduler.current_execution_context;interval;next_fired=none;status=Unscheduled};;letaddtevent=ifTime_ns.(<=)event.at(timing_wheel_nowt)thenfireteventelsescheduletevent;;letcreate_and_addt~at~interval~callback=letevent=create_internalt~at~interval~callbackinaddtevent;event;;letattatcallback=create_and_addt~at~interval:None~callbackletaftertspancallback=create_and_addt~at:(Time_ns.after(nowt)span)~interval:None~callback;;letrequire_span_at_least_alarm_precisiontspan=letalarm_precision=alarm_precisiontinifTime_ns.Span.(<)spanalarm_precisionthenraise_s[%message"interval span smaller than alarm precision"(span:Time_ns.Span.t)(alarm_precision:Time_ns.Span.t)];;letat_intervalstspancallback=require_span_at_least_alarm_precisiontspan;create_and_addt~at:(nowt)~interval:(Somespan)~callback;;moduleAbort_result=structtypet=|Ok|Currently_happening|Previously_unscheduled[@@derivingsexp_of]endletabortt(event:t):Abort_result.t=matchevent.statuswith|Aborted->Previously_unscheduled|Happening->ifOption.is_noneevent.intervalthenCurrently_happeningelse(event.interval<-None;Ok)|Fired->Event.set_statuseventAborted;Ok|Scheduled->Event.set_statuseventUnscheduled;Timing_wheel.removet.eventsevent.alarm;event.alarm<-Alarm.null();Ok|Unscheduled->Previously_unscheduled;;letabort_if_possibletevent=ignore(aborttevent:Abort_result.t)letabort_exntevent=matchabortteventwith|Ok->()|reason->raise_s[%message"[Synchronous_time_source.abort_exn] cannot abort event"(reason:Abort_result.t)];;letcreatetcallback=create_internalt~at:Time_ns.epoch~interval:None~callbackletschedule_at_internalt(event:t)at~interval=(* [Fired] is disallowed to prevent the user from entering into an infinite loop. The
user could specify [at] in the past which would constantly add [callback] to the
back of [t.next_fired] if this function is called from [callback]. *)matchevent.statuswith|(Aborted|Happening|Scheduled|Fired)asstatus->Or_error.error_s[%message"cannot schedule an event with status"~_:(status:Event.Status.t)]|Unscheduled->event.at<-at;event.interval<-interval;addtevent;Ok();;letschedule_atteventat=schedule_at_internalteventat~interval:Noneletschedule_afterteventspan=schedule_attevent(Time_ns.after(nowt)span)letschedule_at_intervalsteventspan=require_span_at_least_alarm_precisiontspan;schedule_at_internaltevent(nowt)~interval:(Somespan);;moduleReschedule_result=structtypet=|Ok|Currently_happening|Recently_aborted|Recently_fired[@@derivingsexp_of]endletreschedule_atteventat:Reschedule_result.t=matchevent.statuswith|Aborted->Recently_aborted|Fired->Recently_fired|Happening->Currently_happening|Scheduled->event.at<-at;ifTime_ns.(>)at(timing_wheel_nowt)thenTiming_wheel.reschedulet.eventsevent.alarm~atelse(Timing_wheel.removet.eventsevent.alarm;firetevent);Ok|Unscheduled->event.at<-at;event.interval<-None;addtevent;Ok;;letreschedule_afterteventspan=reschedule_attevent(Time_ns.after(nowt)span)moduleOption=structtypevalue=ttypenonrect=tletis_none=is_noneletis_some=is_someletsomevalue=(* This assert shouldn't fail because [t] is a [value] and so should never
be [none]. *)assert(is_somevalue);value;;(* It should be impossible for [some_is_representable] to return [false]
because its input is a [value], but since it's only loosely enforced we
handle the general case. *)letsome_is_representablevalue=assert(is_somevalue);true;;letnone=noneletunchecked_value=Fn.idletvaluet~default=ifis_nonetthendefaultelseunchecked_valuetletvalue_exnt=ifis_nonetthenraise_s[%message"[Synchronous_time_source.Event.Option.value_exn None]"];t;;letto_optiont=ifis_nonetthenNoneelseSometletof_option=function|None->none|Somet->somet;;letsexp_of_tt=to_optiont|>[%sexp_of:toption]moduleOptional_syntax=structmoduleOptional_syntax=structletis_none=is_noneletunsafe_value=Fn.idendendendendletrun_aftertspancallback=ignore(Event.aftertspancallback:Event.t)letrun_attatcallback=ignore(Event.attatcallback:Event.t)letrun_at_intervalstspancallback=ignore(Event.at_intervalstspancallback:Event.t);;typesend_exn=Monitor0.t->?backtrace:[`Get|`ThisofBacktrace.t]->exn->unitletrun_fired_eventst~(send_exn:send_exnoption)=letcurrent_execution_context=t.scheduler.current_execution_contextinwhileEvent.is_somet.fired_eventsdoletevent=t.fired_eventsinifphys_equaleventt.most_recently_firedthent.most_recently_fired<-Event.none;t.fired_events<-event.next_fired;event.next_fired<-Event.none;matchevent.statuswith|Aborted->Event.set_statuseventUnscheduled|Happening|Scheduled|Unscheduled->assertfalse|Fired->Event.set_statuseventHappening;(* We set the execution context so that [event.callback] runs in the same context
that was in place when [event] was created. *)Scheduler0.set_execution_contextt.schedulerevent.execution_context;(matchevent.callback()with|exceptionexn->(matchsend_exnwith|None->t.advance_errors<-Error.of_exnexn::t.advance_errors|Somesend_exn->letbacktrace=Backtrace.get()insend_exnevent.execution_context.monitorexn~backtrace:(`Thisbacktrace));Event.set_statuseventUnscheduled|()->(matchevent.intervalwith|None->Event.set_statuseventUnscheduled|Someinterval->event.at<-Time_ns.next_multiple()~base:event.at~after:(timing_wheel_nowt)~interval;scheduletevent))done;Scheduler0.set_execution_contextt.schedulercurrent_execution_context;;letadvance_clockt~to_~send_exn=Timing_wheel.advance_clockt.events~to_~handle_fired:t.handle_fired;run_fired_eventst~send_exn;;letfire_past_alarmst~send_exn=Timing_wheel.fire_past_alarmst.events~handle_fired:t.handle_fired;run_fired_eventst~send_exn;;letadvance_internalt~to_~send_exn=advance_clockt~to_~send_exn;fire_past_alarmst~send_exn;;letprepare_to_advancet~send_exn=ift.am_advancingthenraise_s[%message"cannot call [advance_by_alarms] from callback"];t.am_advancing<-true;(matcht.advance_errorswith|[]->()|_->t.advance_errors<-[]);run_fired_eventst~send_exn;;letfinish_advancingt=t.am_advancing<-false;matcht.advance_errorswith|[]->Ok()|errors->t.advance_errors<-[];Error(Error.of_listerrors);;letadvance_by_alarmst~to_=letsend_exn=Noneinprepare_to_advancet~send_exn;letcontinue=reftrueinwhile!continuedoifTiming_wheel.is_emptyt.eventsthencontinue:=falseelse(letnext_alarm_fires_at=Timing_wheel.next_alarm_fires_at_exnt.eventsinifTime_ns.(>=)next_alarm_fires_atto_thencontinue:=falseelse(* We use the actual alarm time, rather than [next_alarm_fires_at], so as not to
expose (or accumulate errors associated with) the precision of
[Timing_wheel]. *)advance_internalt~to_:(Timing_wheel.max_alarm_time_in_min_interval_exnt.events)~send_exn)done;advance_internalt~to_~send_exn;finish_advancingt;;letadvance_directlyt~to_=letsend_exn=Noneinprepare_to_advancet~send_exn;advance_internalt~to_~send_exn;finish_advancingt;;moduleExpert=structletmax_alarm_time_in_min_timing_wheel_intervalt=Timing_wheel.max_alarm_time_in_min_intervalt.events;;lethas_events_to_runt=Event.is_somet.fired_eventsend