(** A time source holds a time (possibly wall-clock time, possibly simulated time) and
    gives the ability to schedule Async jobs (alarms) to run when that time advances.
    There is a single wall-clock time source (returned by [wall_clock ()]) that the Async
    scheduler drives and uses for the [Clock_ns] module. One can also create a
    user-controlled time source via [create], and advance its clock as desired. This is
    useful so that state machines can depend on a notion of time that is distinct from
    wall-clock time. *)

open! Core
open! Import
module Deferred = Deferred1

module type Time_source = sig
  (** A time source has a phantom read-write parameter, where [write] gives permission to
      call [advance] and [fire_past_alarms]. *)
  module T1 : sig
    type -'rw t [@@deriving sexp_of]
  end

  module Read_write : sig
    type t = read_write T1.t [@@deriving sexp_of]

    include Invariant.S with type t := t

    val invariant_with_jobs : job:Job.t Invariant.t -> t Invariant.t
  end

  module Id : Unique_id.Id

  type t = read T1.t [@@deriving sexp_of]

  (** [id t] returns a unique, consistent identifier which can be used e.g. as a map or
      hash table key. *)
  val id : _ T1.t -> Id.t

  include Invariant.S with type t := t

  val invariant_with_jobs : job:Job.t Invariant.t -> t Invariant.t
  val read_only : [> read ] T1.t -> t

  (** Creates a new simulated time source. *)
  val create
    :  ?timing_wheel_config:Timing_wheel.Config.t
    -> now:Time_ns.t
    -> unit
    -> _ T1.t

  (** A time source with [now t] given by wall-clock time (i.e., [Time_ns.now]) and that
      is advanced automatically as time passes (specifically, at the start of each Async
      cycle). There is only one wall-clock time source; every call to [wall_clock ()]
      returns the same value. The behavior of [now] is special for [wall_clock ()]; it
      always calls [Time_ns.now ()], so it can return times that the time source has not
      yet been advanced to. *)
  val wall_clock : unit -> t

  (** Accessors. [now (wall_clock ())] behaves specially; see [wall_clock] above. *)
  val alarm_precision : [> read ] T1.t -> Time_ns.Span.t

  val is_wall_clock : [> read ] T1.t -> bool
  val next_alarm_fires_at : [> read ] T1.t -> Time_ns.t option
  val now : [> read ] T1.t -> Time_ns.t

  (** Removes the special behavior of [now] for [wall_clock]; it always returns the
      timing_wheel's notion of now. *)
  val timing_wheel_now : [> read ] T1.t -> Time_ns.t

  (** Instead of [advance_directly], you probably should use [advance_by_alarms].
      [advance_directly t ~to_] advances the clock directly to [to_], whereas
      [advance_by_alarms] advances the clock in steps, to each intervening alarm.
      [advance_directly] approximately determines the set of events to fire, up to
      timing-wheel alarm precision, whereas [advance_by_alarms] fires all alarms whose
      time is [<= to_]. With [advance_directly], you must call [fire_past_alarms] if you
      want that behavior (see docs for [Timing_wheel.advance_clock] vs.
      [Timing_wheel.fire_past_alarms]). *)
  val advance_directly : [> write ] T1.t -> to_:Time_ns.t -> unit

  val advance : [> write ] T1.t -> to_:Time_ns.t -> unit
  [@@deprecated
    "[since 2019-06] Use [advance_directly] (to preserve behavior) or \
     [advance_by_alarms]"]

  val advance_directly_by : [> write ] T1.t -> Time_ns.Span.t -> unit

  val advance_by : [> write ] T1.t -> Time_ns.Span.t -> unit
  [@@deprecated
    "[since 2019-06] Use [advance_directly_by] (to preserve behavior) or \
     [advance_by_alarms_by]"]

  val fire_past_alarms : [> write ] T1.t -> unit

  (** [advance_by_alarms t] repeatedly calls [advance_directly t] to drive the time forward in
      steps, where each step is the minimum of [to_] and the next alarm time. After each
      step, [advance_by_alarms] waits for the result of [wait_for] to become determined
      before advancing. By default, [wait_for] is [Scheduler.yield ()], which allows the
      triggered timers to execute and potentially rearm for subsequent steps. The returned
      deferred is filled when [to_] is reached.

      [advance_by_alarms] is useful in simulation when one wants to efficiently advance to
      a time in the future while giving periodic timers (e.g., resulting from [every]) a
      chance to fire with approximately the same timing as they would when running live.
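
      A minimal sketch of this simulation pattern, assuming a caller-supplied [on_tick]
      callback and code that lives outside this library (hence [open! Async_kernel]);
      [simulate_ticks] is a hypothetical name, not part of this interface:

      {[
        open! Core
        open! Async_kernel

        (* Hypothetical helper: drives a one-second periodic timer through ten
           seconds of simulated time. *)
        let simulate_ticks ~on_tick =
          (* A simulated clock that starts at the epoch and moves only when advanced. *)
          let time_source = Time_source.create ~now:Time_ns.epoch () in
          (* Each firing of this periodic timer is an alarm on [time_source]. *)
          Time_source.every time_source (Time_ns.Span.of_sec 1.) on_tick;
          (* Step to each intervening alarm, yielding in between so that the timer
             can run and rearm. *)
          Time_source.advance_by_alarms_by time_source (Time_ns.Span.of_sec 10.)
      ]}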

      Note: an alarm is anything that's scheduled to happen at a particular time using
      this time source, so e.g. any scheduled [Event], something scheduled by [run_*], or
      any deferred returned by [at]/[after]. *)
  val advance_by_alarms
    :  ?wait_for:(unit -> unit Deferred.t)
    -> [> write ] T1.t
    -> to_:Time_ns.t
    -> unit Deferred.t

  val advance_by_max_alarms_in_each_timing_wheel_interval
    :  ?wait_for:(unit -> unit Deferred.t)
    -> [> write ] T1.t
    -> to_:Time_ns.t
    -> unit Deferred.t
  [@@deprecated
    "[since 2021-12] This is the old implementation of [advance_by_alarms], kept in \
     case the new implementation causes problems."]

  (** [advance_by_alarms_by ?wait_for t by] is equivalent to:
      [advance_by_alarms ?wait_for t ~to_:(Time_ns.add (now t) by)] *)
  val advance_by_alarms_by
    :  ?wait_for:(unit -> unit Deferred.t)
    -> [> write ] T1.t
    -> Time_ns.Span.t
    -> unit Deferred.t

  module Continue : sig
    type t

    val immediately : t
  end

  (** [run_repeatedly] is the same as [every'], with the delay between jobs controlled by
      [continue]. When [continue] is [Continue.immediately] (the only value currently
      exposed in this interface), a new execution of [f] will be scheduled immediately
      after the deferred returned by [f] is resolved. *)
  val run_repeatedly
    :  ?start:unit Deferred.t (** default is [return ()] *)
    -> ?stop:unit Deferred.t (** default is [Deferred.never ()] *)
    -> ?continue_on_error:bool (** default is [true] *)
    -> ?finished:unit Ivar.t
    -> [> read ] T1.t
    -> f:(unit -> unit Deferred.t)
    -> continue:Continue.t
    -> unit

  (** The functions below here are the same as in clock_intf.ml, except they take an
      explicit [t] argument. See {{!Async_kernel.Clock_intf}[Clock_intf]} for
      documentation. *)
  val run_at : [> read ] T1.t -> Time_ns.t -> ('a -> unit) -> 'a -> unit

  val run_after : [> read ] T1.t -> Time_ns.Span.t -> ('a -> unit) -> 'a -> unit
  val at : [> read ] T1.t -> Time_ns.t -> unit Deferred.t
  val after : [> read ] T1.t -> Time_ns.Span.t -> unit Deferred.t

  val with_timeout
    :  [> read ] T1.t
    -> Time_ns.Span.t
    -> 'a Deferred.t
    -> [ `Timeout | `Result of 'a ] Deferred.t

  val with_timeout_exn
    :  [> read ] T1.t
    -> Time_ns.Span.t
    -> 'a Deferred.t
    -> error:Error.t
    -> 'a Deferred.t

  val duration_of
    :  [> read ] T1.t
    -> (unit -> 'a Deferred.t)
    -> ('a * Time_ns.Span.t) Deferred.t

  module Event : sig
    type ('a, 'h) t [@@deriving sexp_of]
    type t_unit = (unit, unit) t [@@deriving sexp_of]

    include Invariant.S2 with type ('a, 'b) t := ('a, 'b) t

    val scheduled_at : (_, _) t -> Time_ns.t

    module Status : sig
      type ('a, 'h) t =
        | Aborted of 'a
        | Happened of 'h
        | Scheduled_at of Time_ns.t
      [@@deriving sexp_of]
    end

    val status : ('a, 'h) t -> ('a, 'h) Status.t
    val run_at : [> read ] T1.t -> Time_ns.t -> ('z -> 'h) -> 'z -> (_, 'h) t
    val run_after : [> read ] T1.t -> Time_ns.Span.t -> ('z -> 'h) -> 'z -> (_, 'h) t

    module Abort_result : sig
      type ('a, 'h) t =
        | Ok
        | Previously_aborted of 'a
        | Previously_happened of 'h
      [@@deriving sexp_of]
    end

    val abort : ('a, 'h) t -> 'a -> ('a, 'h) Abort_result.t
    val abort_exn : ('a, 'h) t -> 'a -> unit
    val abort_if_possible : ('a, _) t -> 'a -> unit

    module Fired : sig
      type ('a, 'h) t =
        | Aborted of 'a
        | Happened of 'h
      [@@deriving sexp_of]
    end

    val fired : ('a, 'h) t -> ('a, 'h) Fired.t Deferred.t

    module Reschedule_result : sig
      type ('a, 'h) t =
        | Ok
        | Previously_aborted of 'a
        | Previously_happened of 'h
      [@@deriving sexp_of]
    end

    val reschedule_at : ('a, 'h) t -> Time_ns.t -> ('a, 'h) Reschedule_result.t
    val reschedule_after : ('a, 'h) t -> Time_ns.Span.t -> ('a, 'h) Reschedule_result.t
    val at : [> read ] T1.t -> Time_ns.t -> (_, unit) t
    val after : [> read ] T1.t -> Time_ns.Span.t -> (_, unit) t
  end

  val at_varying_intervals
    :  ?stop:unit Deferred.t
    -> [> read ] T1.t
    -> (unit -> Time_ns.Span.t)
    -> unit Async_stream.t

  val at_intervals
    :  ?start:Time_ns.t
    -> ?stop:unit Deferred.t
    -> [> read ] T1.t
    -> Time_ns.Span.t
    -> unit Async_stream.t

  (** See {!Clock.every'} for documentation. *)
  val every'
    :  ?start:unit Deferred.t (** default is [return ()] *)
    -> ?stop:unit Deferred.t (** default is [Deferred.never ()] *)
    -> ?continue_on_error:bool (** default is [true] *)
    -> ?finished:unit Ivar.t
    -> [> read ] T1.t
    -> Time_ns.Span.t
    -> (unit -> unit Deferred.t)
    -> unit

  val every
    :  ?start:unit Deferred.t (** default is [return ()] *)
    -> ?stop:unit Deferred.t (** default is [Deferred.never ()] *)
    -> ?continue_on_error:bool (** default is [true] *)
    -> [> read ] T1.t
    -> Time_ns.Span.t
    -> (unit -> unit)
    -> unit

  val run_at_intervals'
    :  ?start:Time_ns.t (** default is [now t] *)
    -> ?stop:unit Deferred.t (** default is [Deferred.never ()] *)
    -> ?continue_on_error:bool (** default is [true] *)
    -> [> read ] T1.t
    -> Time_ns.Span.t
    -> (unit -> unit Deferred.t)
    -> unit

  val run_at_intervals
    :  ?start:Time_ns.t (** default is [now t] *)
    -> ?stop:unit Deferred.t (** default is [Deferred.never ()] *)
    -> ?continue_on_error:bool (** default is [true] *)
    -> [> read ] T1.t
    -> Time_ns.Span.t
    -> (unit -> unit)
    -> unit

  (** [Time_source] and [Synchronous_time_source] are the same data structure and use the
      same underlying timing wheel. The types are freely interchangeable. *)
  val of_synchronous : 'a Synchronous_time_source0.T1.t -> 'a T1.t

  val to_synchronous : 'a T1.t -> 'a Synchronous_time_source0.T1.t

  (** Advances the clock to [to_] iff:
      - no alarms are scheduled up to (and including) that time point
      - no jobs are runnable (which could cause events to happen in the time range)

      Returns [true] if we advanced, [false] if we were unable to.

      This is an optimisation relative to (for instance) [Time_source.advance_by_alarms] or
      other methods that rely on running async cycles to produce quiescence.
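
      A minimal sketch of using this as a fast path, assuming a caller outside this
      library; [skip_idle_time] is a hypothetical helper, not part of this interface.
      It jumps straight to [to_] when the time source is quiescent and otherwise falls
      back to [advance_by_alarms]:

      {[
        open! Core
        open! Async_kernel

        (* Hypothetical helper: advance to [to_] as cheaply as possible. *)
        let skip_idle_time time_source ~to_ =
          if Time_source.advance_directly_if_quiescent time_source ~to_
          then (* Nothing was pending, so the clock has already moved to [to_]. *)
            return ()
          else (* Alarms or runnable jobs exist; step through them via Async cycles. *)
            Time_source.advance_by_alarms time_source ~to_
      ]}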
  *)
  val advance_directly_if_quiescent : [> write ] T1.t -> to_:Time_ns.t -> bool
end