Source file repo_inmemory.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
(* String-keyed map backing the in-memory job store. This shadows the stdlib
   [Map] module for the rest of the file. *)
module Map = Map.Make (String)
(* Empty list; no other definition visible in this file reads it.
   NOTE(review): presumably the lifecycles this repository depends on --
   confirm against the code that consumes it. *)
let lifecycles = []
(* Stored job instances, keyed by instance id. *)
let state = ref Map.empty
(* Ids of enqueued instances. [enqueue] prepends, so the most recently
   enqueued id comes first. *)
let ordered_ids = ref []
(** [register_cleaner ()] registers a cleaner with [Sihl.Cleaner] that resets
    the in-memory store: it empties both the id list and the instance map. *)
let register_cleaner () =
  let reset_store ?ctx:_ _ =
    ordered_ids := [];
    state := Map.empty;
    Lwt.return ()
  in
  Sihl.Cleaner.register_cleaner reset_store
;;
(** [register_migration ()] does nothing and returns [()]; the in-memory
    store has no schema to migrate. *)
let register_migration () : unit = ()
(** [enqueue job_instance] stores [job_instance] under its [id] and prepends
    that id to [ordered_ids]. The optional [ctx] argument is ignored.

    The id is prepended unconditionally, so enqueueing the same id twice
    records it twice in [ordered_ids]. *)
let enqueue ?ctx:_ job_instance =
  let { Sihl.Contract.Queue.id; _ } = job_instance in
  ordered_ids := id :: !ordered_ids;
  let with_instance = Map.add id job_instance !state in
  state := with_instance;
  Lwt.return ()
;;
(** [enqueue_all job_instances] enqueues every instance of [job_instances] in
    list order, chaining each [enqueue] call after the previous one with
    [Lwt.bind]. The optional [ctx] argument is ignored and is not forwarded
    to [enqueue]. *)
let enqueue_all ?ctx:_ job_instances =
  let enqueue_after pending job_instance =
    Lwt.bind pending (fun _ -> enqueue job_instance)
  in
  List.fold_left enqueue_after (Lwt.return ()) job_instances
;;
(** [update job_instance] binds [job_instance] to its [id] in the store,
    replacing any instance previously stored under that id. The optional
    [ctx] argument is ignored.

    [ordered_ids] is left untouched: an instance whose id was never enqueued
    is stored but is not reachable through [ordered_ids]. *)
let update ?ctx:_ job_instance =
  let key = job_instance.Sihl.Contract.Queue.id in
  let replaced = Map.add key job_instance !state in
  state := replaced;
  Lwt.return ()
;;
(** [find_workable ()] returns the stored instances for which
    [Sihl.Contract.Queue.should_run instance now] holds, where [now] is read
    from [Ptime_clock.now ()]. The optional [ctx] argument is ignored.

    Ids in [ordered_ids] that have no instance in the store are skipped. The
    result is in the reverse order of [ordered_ids]; since [enqueue] prepends
    ids, this yields instances in the order they were enqueued. *)
let find_workable ?ctx:_ () =
  (* Resolve every id first, then read the clock, as the lookups do not
     depend on the current time. *)
  let lookups = List.map (fun id -> Map.find_opt id !state) !ordered_ids in
  let now = Ptime_clock.now () in
  let keep_runnable acc = function
    | None -> acc
    | Some job_instance ->
      if Sihl.Contract.Queue.should_run job_instance now
      then job_instance :: acc
      else acc
  in
  (* Consing onto the accumulator reverses the traversal order. *)
  Lwt.return (List.fold_left keep_runnable [] lookups)
;;
(** [query ()] returns all stored instances in the order of [ordered_ids]
    (most recently enqueued first, since [enqueue] prepends ids). The
    optional [ctx] argument is ignored.

    Ids that have no instance in the store are skipped instead of raising
    [Not_found]; this matches how [find_workable] treats missing ids and
    keeps [query] total even when an id outlives its instance. *)
let query ?ctx:_ () =
  let collect acc id =
    match Map.find_opt id !state with
    | Some job_instance -> job_instance :: acc
    | None -> acc
  in
  (* The fold builds the list reversed; [List.rev] restores the order of
     [ordered_ids]. *)
  Lwt.return (List.rev (List.fold_left collect [] !ordered_ids))
;;
(** [find id] returns [Some instance] if an instance is stored under [id],
    and [None] otherwise. The optional [ctx] argument is ignored. *)
let find ?ctx:_ id =
  let found = Map.find_opt id !state in
  Lwt.return found
;;
(** [delete job] removes [job] from the in-memory store: its instance is
    dropped from the map and its id is dropped from [ordered_ids]. The
    optional [ctx] argument is ignored. Deleting an instance that is not
    stored is a no-op.

    The id must be removed from [ordered_ids] as well as from the map;
    otherwise the list keeps ids with no backing instance, which makes it
    grow without bound and makes lookups by those ids fail. *)
let delete ?ctx:_ (job : Sihl.Contract.Queue.instance) =
  state := Map.remove job.id !state;
  ordered_ids
    := List.filter (fun id -> not (String.equal id job.id)) !ordered_ids;
  Lwt.return ()
;;