Source file time_source.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
open! Core
open! Import
open! Deferred_std
let debug = Debug.clock
module Alarm = Timing_wheel.Alarm
module Deferred = Deferred1
module Scheduler = Scheduler1
let upon = Deferred.upon
let choose = Deferred.choose
let choice = Deferred.choice
let ( >>> ) = upon
module T1 = struct
include Synchronous_time_source0.T1
let sexp_of_t
_
{ id = _
; advance_errors = _
; am_advancing = _
; events
; fired_events = _
; handle_fired = _
; is_wall_clock
; most_recently_fired = _
; scheduler = _
}
=
if is_wall_clock
then [%message "<wall_clock>"]
else
[%message
(is_wall_clock : bool)
(events : _ Timing_wheel.t)]
;;
end
open T1
module Read_write = struct
type t = read_write T1.t [@@deriving sexp_of]
let invariant = invariant
let invariant_with_jobs = invariant_with_jobs
end
type t = read T1.t [@@deriving sexp_of]
let invariant = invariant
let invariant_with_jobs = invariant_with_jobs
let read_only (t : [> read ] T1.t) = (t :> t)
let create = Scheduler.create_time_source
let wall_clock = Scheduler.wall_clock
let alarm_precision t = Timing_wheel.alarm_precision t.events
let is_wall_clock t = t.is_wall_clock
let next_alarm_fires_at t = Timing_wheel.next_alarm_fires_at t.events
let timing_wheel_now t = Timing_wheel.now t.events
let id t = t.id
module Id = Synchronous_time_source0.Id
let now t =
if t.is_wall_clock
then
Time_ns.now ()
else timing_wheel_now t
;;
let send_exn = Some Monitor.send_exn
let advance_directly t ~to_ = Synchronous_time_source0.advance_clock t ~to_ ~send_exn
let advance_directly_by t by = advance_directly t ~to_:(Time_ns.after (now t) by)
let advance = advance_directly
let advance_by = advance_directly_by
let fire_past_alarms t = Synchronous_time_source0.fire_past_alarms t ~send_exn
let yield t = Bvar.wait (Scheduler.yield t.scheduler)
let can_run_a_job t = Scheduler.num_pending_jobs t > 0 || Bvar.has_any_waiters t.yield
module Eager_deferred = struct
let bind_unit t ~f =
if Deferred.is_determined t
then f ()
else (
let%bind () = t in
f ())
;;
let map t ~f =
if Deferred.is_determined t
then return (f (Deferred.value_exn t))
else Deferred.map t ~f
;;
end
let advance_by_alarms ?wait_for t ~to_ =
let run_queued_alarms () =
match wait_for with
| None -> yield t
| Some f -> f ()
in
let one_step () =
if Synchronous_time_source0.any_fired_events_to_run t
then now t, `continue
else (
match Timing_wheel.min_alarm_time_in_min_interval t.events with
| None -> to_, `finish
| Some min_alarm_time_in_min_interval ->
if Time_ns.( >= ) min_alarm_time_in_min_interval to_
then to_, `finish
else min_alarm_time_in_min_interval, `continue)
in
let rec walk_alarms () =
let advance_to, next = one_step () in
advance_directly t ~to_:advance_to;
fire_past_alarms t;
Eager_deferred.bind_unit (run_queued_alarms ()) ~f:(fun () ->
match next with
| `finish -> return ()
| `continue -> walk_alarms ())
in
let%bind () = run_queued_alarms () in
walk_alarms ()
;;
let advance_by_max_alarms_in_each_timing_wheel_interval ?wait_for t ~to_ =
let run_queued_alarms () =
match wait_for with
| None -> yield t
| Some f -> f ()
in
let finish () =
advance_directly t ~to_;
fire_past_alarms t;
run_queued_alarms ()
in
let rec walk_alarms () =
match next_alarm_fires_at t with
| None -> finish ()
| Some next_alarm_fires_at ->
if Time_ns.( >= ) next_alarm_fires_at to_
then finish ()
else (
advance_directly t ~to_:(Timing_wheel.max_alarm_time_in_min_interval_exn t.events);
fire_past_alarms t;
let queued_alarms_ran = run_queued_alarms () in
Eager_deferred.bind_unit queued_alarms_ran ~f:walk_alarms)
in
fire_past_alarms t;
let%bind () = run_queued_alarms () in
walk_alarms ()
;;
let advance_by_alarms_by ?wait_for t by =
advance_by_alarms ?wait_for t ~to_:(Time_ns.after (now t) by)
;;
let span_to_time t span = Time_ns.after (now t) span
let schedule_job t ~at execution_context f a =
let alarm =
Timing_wheel.add
t.events
~at
(Job_or_event.of_job (Scheduler.create_job t.scheduler execution_context f a))
in
(match t.scheduler.event_added_hook with
| None -> ()
| Some f -> f at);
alarm
;;
let run_at_internal t time f a =
let execution_context = Scheduler.current_execution_context t.scheduler in
if Time_ns.( > ) time (Timing_wheel.now t.events)
then schedule_job t ~at:time execution_context f a
else (
Scheduler.enqueue t.scheduler execution_context f a;
Alarm.null ())
;;
let run_at t time f a = ignore (run_at_internal t time f a : _ Alarm.t)
let run_after t span f a = run_at t (span_to_time t span) f a
let at =
let fill result = Ivar.fill result () in
fun t time ->
if Time_ns.( <= ) time (Timing_wheel.now t.events)
then return ()
else (
let result = Ivar.create () in
ignore (run_at_internal t time fill result : _ Alarm.t);
Ivar.read result)
;;
let after t span = at t (span_to_time t span)
let remove_alarm t alarm : unit =
let job_or_event = Alarm.value t.events alarm in
(let open Job_or_event.Match in
let (K k) = kind job_or_event in
match k, project k job_or_event with
| Job, job -> Scheduler.free_job t.scheduler job
| Event, _ ->
assert false);
Timing_wheel.remove t.events alarm
;;
let remove_alarm_if_scheduled t alarm =
if Timing_wheel.mem t.events alarm then remove_alarm t alarm
;;
module Event = struct
module Fired = struct
type ('a, 'h) t =
| Aborted of 'a
| Happened of 'h
[@@deriving sexp_of]
end
type ('a, 'h) t =
{ mutable alarm : Job_or_event.t Alarm.t
; mutable fire : unit -> unit
;
fired : ('a, 'h) Fired.t Ivar.t
;
mutable num_fires_to_skip : int
;
mutable scheduled_at : Time_ns.t
; time_source : Synchronous_time_source0.t
}
[@@deriving fields, sexp_of]
type t_unit = (unit, unit) t [@@deriving sexp_of]
let fired t = Ivar.read t.fired
let invariant invariant_a invariant_h t =
Invariant.invariant [%here] t [%sexp_of: (_, _) t] (fun () ->
let events = t.time_source.events in
let check f = Invariant.check_field t f in
Fields.iter
~alarm:
(check (fun alarm ->
if Ivar.is_full t.fired
then assert (not (Timing_wheel.mem events alarm))
else if Timing_wheel.mem events alarm
then assert (Job_or_event.is_job (Alarm.value events alarm))))
~fire:ignore
~fired:
(check (fun (fired : _ Fired.t Ivar.t) ->
match Deferred.peek (Ivar.read fired) with
| None -> ()
| Some (Aborted a) -> invariant_a a
| Some (Happened h) -> invariant_h h))
~num_fires_to_skip:
(check (fun num_fires_to_skip -> assert (num_fires_to_skip >= 0)))
~scheduled_at:
(check (fun scheduled_at ->
if Timing_wheel.mem events t.alarm
then [%test_result: Time_ns.t] scheduled_at ~expect:(Alarm.at events t.alarm)))
~time_source:ignore)
;;
module Status = struct
type ('a, 'h) t =
| Aborted of 'a
| Happened of 'h
| Scheduled_at of Time_ns.t
[@@deriving sexp_of]
end
let status t : _ Status.t =
match Deferred.peek (Ivar.read t.fired) with
| None -> Scheduled_at t.scheduled_at
| Some (Aborted a) -> Aborted a
| Some (Happened h) -> Happened h
;;
module Abort_result = struct
type ('a, 'h) t =
| Ok
| Previously_aborted of 'a
| Previously_happened of 'h
[@@deriving sexp_of]
end
let abort t a : _ Abort_result.t =
if debug then Debug.log "Time_source.Event.abort" t [%sexp_of: (_, _) t];
match Deferred.peek (fired t) with
| Some (Aborted a) -> Previously_aborted a
| Some (Happened h) -> Previously_happened h
| None ->
Ivar.fill t.fired (Aborted a);
remove_alarm_if_scheduled t.time_source t.alarm;
Ok
;;
let abort_exn t a =
match abort t a with
| Ok -> ()
| Previously_happened _ ->
raise_s
[%message "Clock.Event.abort_exn failed to abort event that previously happened"]
| Previously_aborted _ ->
raise_s
[%message "Clock.Event.abort_exn failed to abort event that previously aborted"]
;;
let abort_if_possible t a = ignore (abort t a : _ Abort_result.t)
let schedule t = t.alarm <- run_at_internal t.time_source t.scheduled_at t.fire ()
module Reschedule_result = struct
type ('a, 'h) t =
| Ok
| Previously_aborted of 'a
| Previously_happened of 'h
[@@deriving sexp_of]
end
let reschedule_at t at : _ Reschedule_result.t =
if debug
then
Debug.log "Time_source.Event.reschedule_at" (t, at) [%sexp_of: (_, _) t * Time_ns.t];
match Deferred.peek (fired t) with
| Some (Aborted a) -> Previously_aborted a
| Some (Happened h) -> Previously_happened h
| None ->
let events = t.time_source.events in
let is_in_timing_wheel = Timing_wheel.mem events t.alarm in
let am_trying_to_reschedule_in_the_future =
Time_ns.( > ) at (Timing_wheel.now events)
in
t.scheduled_at <- at;
(match am_trying_to_reschedule_in_the_future, is_in_timing_wheel with
| false, false -> ()
| false, true ->
t.time_source.handle_fired t.alarm;
Timing_wheel.remove events t.alarm
| true, false ->
t.num_fires_to_skip <- t.num_fires_to_skip + 1;
schedule t
| true, true -> Timing_wheel.reschedule events t.alarm ~at);
Ok
;;
let reschedule_after t span = reschedule_at t (span_to_time t.time_source span)
let run_at time_source scheduled_at f z =
if debug then Debug.log "Time_source.Event.run_at" scheduled_at [%sexp_of: Time_ns.t];
let t =
{ alarm = Alarm.null ()
; fire = ignore
; fired = Ivar.create ()
; num_fires_to_skip = 0
; scheduled_at
; time_source = read_only time_source
}
in
let fire () =
if Ivar.is_empty t.fired
then
if t.num_fires_to_skip > 0
then t.num_fires_to_skip <- t.num_fires_to_skip - 1
else (
let result = f z in
if Ivar.is_empty t.fired then Ivar.fill t.fired (Happened result))
in
t.fire <- fire;
schedule t;
t
;;
let at time_source time = run_at time_source time ignore ()
let run_after time_source span f a =
run_at time_source (span_to_time time_source span) f a
;;
let after time_source span = at time_source (span_to_time time_source span)
end
let at_times ?(stop = Deferred.never ()) t next_time =
let tail = Tail.create () in
let rec loop () =
choose
[ choice stop (fun () -> `Stop); choice (at t (next_time ())) (fun () -> `Tick) ]
>>> function
| `Stop -> Tail.close_exn tail
| `Tick ->
Tail.extend tail ();
loop ()
in
loop ();
Tail.collect tail
;;
let at_varying_intervals ?stop t compute_span =
at_times t ?stop (fun () -> Time_ns.after (now t) (compute_span ()))
;;
let at_intervals ?start ?stop t interval =
let start =
match start with
| Some x -> x
| None -> now t
in
at_times t ?stop (fun () ->
Time_ns.next_multiple ~base:start ~after:(now t) ~interval ())
;;
module Continue = struct
type t =
| Immediately
| After of Time_ns.Span.t
| Next_multiple of Time_ns.t * Time_ns.Span.t
let immediately = Immediately
let at t time_source =
match t with
| Immediately -> Timing_wheel.now time_source.events
| After span -> span_to_time time_source span
| Next_multiple (base, interval) ->
Time_ns.next_multiple ~base ~after:(now time_source) ~interval ()
;;
end
let run_repeatedly
?(start = return ())
?stop
?(continue_on_error = true)
?(finished = Ivar.create ())
t
~f
~continue
=
start
>>> fun () ->
let alarm = ref (Alarm.null ()) in
let stop =
match stop with
| None -> Deferred.never ()
| Some stop ->
upon stop (fun () ->
if Timing_wheel.mem t.events !alarm
then (
remove_alarm t !alarm;
Ivar.fill_if_empty finished ()));
stop
in
let rec run_f () =
if Deferred.is_determined stop
then Ivar.fill_if_empty finished ()
else if continue_on_error
then Monitor.try_with f ~run:`Now ~rest:`Raise >>> continue_try_with
else (
let d = f () in
if Deferred.is_determined d then continue_f () else d >>> continue_f)
and continue_f () =
if Deferred.is_determined stop
then Ivar.fill_if_empty finished ()
else alarm := run_at_internal t (Continue.at continue t) run_f ()
and continue_try_with or_error =
(match or_error with
| Ok () -> ()
| Error error -> Monitor.send_exn (Monitor.current ()) error);
continue_f ()
in
run_f ()
;;
let every' ?start ?stop ?continue_on_error ?finished t span f =
if Time_ns.Span.( <= ) span Time_ns.Span.zero
then raise_s [%message "Time_source.every got nonpositive span" (span : Time_ns.Span.t)];
run_repeatedly t ?start ?stop ?continue_on_error ?finished ~f ~continue:(After span)
;;
let every ?start ?stop ?continue_on_error t span f =
every' t ?start ?stop ?continue_on_error ?finished:None span (fun () ->
f ();
return ())
;;
let run_at_intervals' ?start ?stop ?continue_on_error t interval f =
let now = now t in
let base, start =
match start with
| None -> now, None
| Some start ->
( start
, Some
(at
t
(Time_ns.next_multiple
()
~base:start
~after:now
~can_equal_after:true
~interval)) )
in
run_repeatedly
t
?start
?stop
?continue_on_error
~f
~continue:(Next_multiple (base, interval))
;;
let run_at_intervals ?start ?stop ?continue_on_error t interval f =
run_at_intervals' ?start ?stop ?continue_on_error t interval (fun () ->
f ();
return ())
;;
let with_timeout t span d =
let timeout = Event.after t span in
choose
[ choice d (fun v ->
(match Event.abort timeout () with
| Ok | Previously_happened () -> ()
| Previously_aborted () ->
raise_s [%message "Time_source.with_timeout bug: should only abort once"]);
`Result v)
; choice (Event.fired timeout) (function
| Happened () -> `Timeout
| Aborted () ->
raise_s [%message "Time_source.with_timeout bug: both completed and timed out"])
]
;;
let duration_of t f =
let start = now t in
Eager_deferred.map (f ()) ~f:(fun result ->
let duration = Time_ns.diff (now t) start in
result, duration)
;;
let of_synchronous t = t
let to_synchronous t = t
let timing_wheel_has_event_at_or_before wheel time =
if Timing_wheel.is_empty wheel
then false
else (
let next_alarm = Timing_wheel.next_alarm_fires_at_exn wheel in
Time_ns_in_this_directory.(next_alarm <= time))
;;
let advance_directly_if_quiescent t ~to_ =
let is_quescent =
can_run_a_job t.scheduler
|| Synchronous_time_source0.has_events_to_run t
|| timing_wheel_has_event_at_or_before t.events to_
in
if is_quescent
then false
else (
advance_directly t ~to_;
true)
;;