(* Source file: sihl_queue.ml *)
(** Re-export everything declared by the queue contract. *)
include Sihl.Contract.Queue

(** Log source of this service: ["sihl.service."] followed by the name
    declared by the queue contract. *)
let log_src =
  let src_name = "sihl.service." ^ Sihl.Contract.Queue.name in
  Logs.Src.create src_name
;;

(** Logger bound to [log_src]. From this point on it shadows the global
    [Logs] module for the rest of the file. *)
module Logs = (val Logs.src_log log_src : Logs.LOG)

(** Jobs known to the queue; starts out empty. *)
let registered_jobs = ref ([] : Workable_job.t list)

(** Function that stops the running schedule; [None] until one is stored. *)
let stop_schedule = ref (None : (unit -> unit) option)

module Job_instance = Job_instance
module Workable_job = Workable_job
(** [Make (Repo)] builds a queue service that stores job instances through
    [Repo] and works them from a schedule created with
    [Sihl.Schedule.every_second].

    The registered jobs and the function that stops the schedule live in the
    file-level references [registered_jobs] and [stop_schedule], which are
    defined outside of this functor and are therefore shared by every module
    produced by it. *)
module Make (Repo : Repo.Sig) : Sihl.Contract.Queue.Sig = struct
  (** [dispatch job ?delay input] creates a job instance for [job] that
      carries [input] and the current time, then enqueues it through
      [Repo.enqueue]. [delay] is passed on unchanged to
      [Job_instance.create]. *)
  let dispatch job ?delay input =
    let open Sihl.Contract.Queue in
    let name = job.name in
    Logs.debug (fun m -> m "Dispatching job %s" name);
    let now = Ptime_clock.now () in
    let job_instance = Job_instance.create ~input ~delay ~now job in
    Repo.enqueue ~job_instance
  ;;

  (* [catch_as_error ~prefix f] runs [f ()]. An exception raised by [f], or a
     rejection of the promise it returns, is turned into
     [Error (prefix ^ Printexc.to_string exn)] instead of propagating. *)
  let catch_as_error ~prefix f =
    Lwt.catch f (fun exn ->
        Lwt.return (Error (prefix ^ Printexc.to_string exn)))
  ;;

  (** [run_job input ~job ~job_instance] calls the [work] handler of [job]
      with [input].

      Returns [Some ()] when the handler returns [Ok ()]. When it returns an
      [Error] or raises, the failure is logged, the [failed] handler of [job]
      is called with the error message, and [None] is returned regardless of
      the outcome of the [failed] handler. Exceptions raised by either handler
      are caught and reported as errors. *)
  let run_job input ~job ~job_instance =
    let open Lwt.Syntax in
    let job_instance_id = job_instance.Job_instance.id in
    let* result =
      catch_as_error
        ~prefix:
          "Exception caught while running job, this is a bug in your job \
           handler, make sure to not throw exceptions "
        (fun () -> job.Workable_job.work input)
    in
    match result with
    | Ok () ->
      Logs.debug (fun m ->
          m "Successfully ran job instance %s" job_instance_id);
      Lwt.return (Some ())
    | Error msg ->
      Logs.err (fun m ->
          m
            "Failure while running job instance %a %s"
            Job_instance.pp
            job_instance
            msg);
      let* cleanup =
        catch_as_error
          ~prefix:
            "Exception caught while cleaning up job, this is a bug in your \
             job failure handler, make sure to not throw exceptions "
          (fun () -> job.Workable_job.failed msg)
      in
      (match cleanup with
       | Error msg ->
         Logs.err (fun m ->
             m
               "Failure while run failure handler for job instance %a %s"
               Job_instance.pp
               job_instance
               msg)
       | Ok () ->
         (* NOTE(review): the failure handler succeeded here but this is
            still logged at error level; confirm that this is intended. *)
         Logs.err (fun m -> m "Clean up job %s" job_instance_id));
      Lwt.return None
  ;;

  (** [update ~job_instance] stores [job_instance] through [Repo.update]. *)
  let update ~job_instance = Repo.update ~job_instance

  (** [work_job ~job ~job_instance] runs [job_instance] if
      [Job_instance.should_run] allows it at the current time.

      After a run the number of tries is incremented and the next run time is
      updated. The instance is then marked as succeeded, or as failed once its
      tries have reached [max_tries] of [job]; otherwise its status is left
      as is. The resulting instance is stored with [update]. Instances that
      should not run are only logged. *)
  let work_job ~job ~job_instance =
    let open Lwt.Syntax in
    let now = Ptime_clock.now () in
    if not (Job_instance.should_run ~job_instance ~now)
    then (
      Logs.debug (fun m ->
          m "Not going to run job instance %a" Job_instance.pp job_instance);
      Lwt.return ())
    else (
      let input_string = job_instance.Job_instance.input in
      let* job_run_status = run_job input_string ~job ~job_instance in
      let job_instance =
        job_instance
        |> Job_instance.incr_tries
        |> Job_instance.update_next_run_at job
      in
      let job_instance =
        match job_run_status with
        | Some () -> Job_instance.set_succeeded job_instance
        | None ->
          if job_instance.Job_instance.tries >= job.Workable_job.max_tries
          then Job_instance.set_failed job_instance
          else job_instance
      in
      update ~job_instance)
  ;;

  (** [work_queue ~jobs] fetches the workable job instances from
      [Repo.find_workable] and works every one of them, one after the other,
      with the job in [jobs] that has the same name.

      Instances for which no job with a matching name is found are skipped
      without touching them. *)
  let work_queue ~jobs =
    let open Lwt.Syntax in
    let* pending_job_instances = Repo.find_workable () in
    match pending_job_instances with
    | [] -> Lwt.return ()
    | _ :: _ ->
      Logs.debug (fun m ->
          m
            "Start working queue of length %d"
            (List.length pending_job_instances));
      let find_job job_instance =
        List.find_opt
          (fun job ->
            String.equal
              job_instance.Job_instance.name
              job.Workable_job.name)
          jobs
      in
      (* Every pending instance has to be visited. Returning after the first
         instance that has a matching job would leave the remaining
         instances of this batch unworked. *)
      let* () =
        Lwt_list.iter_s
          (fun job_instance ->
            match find_job job_instance with
            | None -> Lwt.return ()
            | Some job -> work_job ~job ~job_instance)
          pending_job_instances
      in
      Logs.debug (fun m -> m "Finish working queue");
      Lwt.return ()
  ;;

  (* [add_jobs jobs] converts [jobs] with [Workable_job.of_job] and appends
     them to [registered_jobs]. *)
  let add_jobs jobs =
    registered_jobs := !registered_jobs @ List.map Workable_job.of_job jobs
  ;;

  (** [register_jobs jobs] appends [jobs] to the registered jobs. *)
  let register_jobs jobs =
    add_jobs jobs;
    Lwt.return ()
  ;;

  (** [start_queue ()] creates the ["job_queue"] schedule, starts it and
      stores the function that stops it in [stop_schedule].

      On every run the schedule reads the currently registered jobs and works
      the queue with them; when no jobs are registered it only logs. *)
  let start_queue () =
    Logs.debug (fun m -> m "Start job queue");
    let scheduled_function () =
      match !registered_jobs with
      | [] ->
        Logs.debug (fun m -> m "No jobs found to run, trying again later");
        Lwt.return ()
      | _ :: _ as jobs ->
        let job_strings =
          jobs
          |> List.map (fun job -> job.Workable_job.name)
          |> String.concat ", "
        in
        Logs.debug (fun m ->
            m "Run job queue with registered jobs: %s" job_strings);
        work_queue ~jobs
    in
    let schedule =
      Sihl.Schedule.create
        Sihl.Schedule.every_second
        ~f:scheduled_function
        ~label:"job_queue"
    in
    stop_schedule := Some (Sihl.Schedule.schedule schedule);
    Lwt.return ()
  ;;

  (** [start ()] is the lifecycle start hook; it starts the queue. *)
  let start () = start_queue ()

  (** [stop ()] is the lifecycle stop hook. It clears the registered jobs and
      calls the stored stop function of the schedule. When no stop function
      is stored a warning is logged instead. *)
  let stop () =
    registered_jobs := [];
    (match !stop_schedule with
     | Some stop_schedule -> stop_schedule ()
     | None -> Logs.warn (fun m -> m "Can not stop schedule"));
    Lwt.return ()
  ;;

  (** Lifecycle of the queue service. Its dependencies are the schedule
      lifecycle followed by the lifecycles of [Repo]. *)
  let lifecycle =
    Sihl.Container.create_lifecycle
      Sihl.Contract.Queue.name
      ~dependencies:(fun () -> Sihl.Schedule.lifecycle :: Repo.lifecycles)
      ~start
      ~stop
  ;;

  (** [register ?jobs ()] registers the migration and the cleaner of [Repo],
      appends [jobs] (none by default) to the registered jobs and returns the
      service built from [lifecycle]. *)
  let register ?(jobs = []) () =
    Repo.register_migration ();
    Repo.register_cleaner ();
    add_jobs jobs;
    Sihl.Container.Service.create lifecycle
  ;;
end
(** Queue service built by [Make] on top of [Repo.InMemory]. *)
module InMemory = Make (Repo.InMemory)
(** Queue service built by [Make] on top of the repository that
    [Repo.MakeMariaDb] creates from [Sihl.Database.Migration.MariaDb]. *)
module MariaDb = Make (Repo.MakeMariaDb (Sihl.Database.Migration.MariaDb))
(** Queue service built by [Make] on top of the repository that
    [Repo.MakePostgreSql] creates from [Sihl.Database.Migration.PostgreSql]. *)
module PostgreSql =
Make (Repo.MakePostgreSql (Sihl.Database.Migration.PostgreSql))