Source file core_schedule.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
type scheduled_time = Every of Core_time.duration [@@deriving eq, show]
type t =
{ label : string
; scheduled_time : scheduled_time
; fn : unit -> unit Lwt.t
}
type stop_schedule = unit -> unit
let get_function schedule = schedule.fn
let run_in schedule ~now:_ =
let scheduled_time = schedule.scheduled_time in
match scheduled_time with
| Every duration ->
duration |> Core_time.duration_to_span |> Ptime.Span.to_float_s
;;
let scheduled_function schedule = schedule.fn
let create scheduled_time f label = { label; scheduled_time; fn = f }
let every_second = Every Core_time.OneSecond
let every_hour = Every Core_time.OneHour
let log_src = Logs.Src.create "sihl.service.schedule"
module Logs = (val Logs.src_log log_src : Logs.LOG)
let registered_schedules : t list ref = ref []
let schedule schedule =
let should_stop = ref false in
let stop_schedule () = should_stop := true in
Logs.info (fun m -> m "Schedule %s" schedule.label);
let scheduled_function = scheduled_function schedule in
let rec loop () =
let now = Ptime_clock.now () in
let duration = run_in schedule ~now in
Logs.debug (fun m ->
m "Run schedule %s in %f seconds" schedule.label duration);
let%lwt () =
Lwt.catch
(fun () -> scheduled_function ())
(fun exn ->
Logs.err (fun m ->
m
"Exception caught while running schedule, this is a bug in \
your scheduled function. %s"
(Printexc.to_string exn));
Lwt.return ())
in
let%lwt () = Lwt_unix.sleep duration in
if !should_stop
then (
let () = Logs.debug (fun m -> m "Stop schedule %s" schedule.label) in
Lwt.return ())
else loop ()
in
loop () |> ignore;
stop_schedule
;;
let start ctx =
List.iter (fun s -> schedule s ()) !registered_schedules;
Lwt.return ctx
;;
let stop _ = Lwt.return ()
let lifecycle = Core_container.create_lifecycle "schedule" ~start ~stop
let register schedules =
registered_schedules := schedules;
Core_container.Service.create lifecycle
;;