1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
open! Core
open! Import
module Scheduler = Scheduler0
(* Placeholder job components.  [run_jobs] writes these three values over a
   job's cells right after reading the job out of them, so that the array no
   longer references the job's real execution context, function and argument. *)
let dummy_e = Execution_context.main
let dummy_f : Obj.t -> unit = ignore
let dummy_a : Obj.t = Obj.repr ()
(* Number of consecutive cells of [jobs] used by one queued job: its execution
   context, its function [f] and its argument [a], in that order (see [set]). *)
let slots_per_elt = 3
(* Short alias for the array implementation backing [jobs]. *)
module A = Uniform_array
(* A queue of jobs stored in a circular buffer, together with run counters.
   The definition re-exports [Types.Job_queue.t], so the fields must stay in
   sync with that type.

   - [num_jobs_run] is incremented by [run_job] for every job it runs.
   - [jobs_left_this_cycle] is the remaining budget: [run_jobs] decrements it
     for each job it dequeues and stops once it reaches 0.  It is assigned by
     [set_jobs_left_this_cycle] and zeroed by [clear].
   - [length] is the number of jobs currently in the queue.
   - [backtrace_of_first_enqueue] is filled in by [grow] the first time it runs
     while the field is still [None].  A queue from [create] has capacity 0, so
     its first [enqueue] always goes through [grow]. *)
type t = Types.Job_queue.t =
{ mutable num_jobs_run : int
; mutable jobs_left_this_cycle : int
;
(* [jobs] has [capacity t * slots_per_elt] cells.  Each queued job occupies
   [slots_per_elt] consecutive cells holding its execution context, [f] and
   [a].  In this file the array is only ever replaced by [grow], which
   allocates a larger one. *)
mutable jobs : (Obj.t A.t[@sexp.opaque])
;
(* [mask] is [capacity t - 1].  [invariant] requires the capacity to be 0 or a
   power of two, so [land mask] wraps an element index into the array. *)
mutable mask : int
;
(* [front] is the element index of the first job in the queue; its execution
   context lives at array index [front * slots_per_elt]. *)
mutable front : int
; mutable length : int
; mutable backtrace_of_first_enqueue : Backtrace.t option
}
[@@deriving fields ~getters ~iterators:iter, sexp_of]
(* [offset t i] is the index in [t.jobs] of the first cell of the [i]-th job
   counting from the front of the queue.  The element index is wrapped with
   [land t.mask] before being scaled by [slots_per_elt]. *)
let offset t i =
  let wrapped_elt = (t.front + i) land t.mask in
  wrapped_elt * slots_per_elt
;;
(* [capacity t] is the number of jobs the current array can hold, which is
   always [mask + 1]. *)
let capacity { mask; _ } = mask + 1
(* [invariant t] checks [t] field by field:
   - both counters are non-negative;
   - the first cell of every queued job holds a valid execution context;
   - the capacity derived from [mask] is 0 or a power of two and matches the
     length of [jobs];
   - [front] is a valid element index (or 0 when the capacity is 0);
   - [length] is between 0 and the capacity.
   [backtrace_of_first_enqueue] is not constrained. *)
let invariant t : unit =
  Invariant.invariant [%here] t [%sexp_of: t] (fun () ->
    let check f = Invariant.check_field t f in
    let non_negative n = assert (n >= 0) in
    Fields.iter
      ~num_jobs_run:(check non_negative)
      ~jobs_left_this_cycle:(check non_negative)
      ~jobs:
        (check (fun job_slots ->
           for i = 0 to t.length - 1 do
             let execution_context : Execution_context.t =
               Obj.obj (A.get job_slots (offset t i))
             in
             Execution_context.invariant execution_context
           done))
      ~mask:
        (check (fun mask ->
           let cap = mask + 1 in
           assert (cap = 0 || Int.is_pow2 cap);
           assert (cap * slots_per_elt = A.length t.jobs)))
      ~front:
        (check (fun front ->
           assert (front >= 0);
           (* With capacity 0 the only legal value is [front = 0]. *)
           assert (front < max 1 (capacity t))))
      ~length:
        (check (fun length ->
           assert (length >= 0);
           assert (length <= capacity t)))
      ~backtrace_of_first_enqueue:(fun _ -> ()))
;;
(* [create_array ~capacity] allocates the backing array for [capacity] jobs,
   i.e. [capacity * slots_per_elt] cells. *)
let create_array ~capacity =
  let len = capacity * slots_per_elt in
  A.create_obj_array ~len
;;
(* [create ()] returns an empty queue with capacity 0 (so [mask = -1]) and no
   recorded backtrace.  The first [enqueue] will therefore call [grow]. *)
let create () =
  let initial_capacity = 0 in
  let jobs = create_array ~capacity:initial_capacity in
  { num_jobs_run = 0
  ; jobs_left_this_cycle = 0
  ; jobs
  ; mask = initial_capacity - 1
  ; front = 0
  ; length = 0
  ; backtrace_of_first_enqueue = None
  }
;;
(* Returns the backtrace recorded by [grow], or [None] if none has been
   recorded yet. *)
let backtrace_of_first_enqueue { backtrace_of_first_enqueue = recorded; _ } = recorded
(* [clear t] empties the queue and zeroes the per-cycle budget.  Only [front],
   [length] and [jobs_left_this_cycle] are reset; the backing array (and
   whatever its cells hold), [mask], [num_jobs_run] and the recorded backtrace
   are left as they are. *)
let clear t =
  t.jobs_left_this_cycle <- 0;
  t.length <- 0;
  t.front <- 0
;;
(* [grow t] doubles the capacity of [t] (from 0 to 1 on the first call) and
   moves the queued jobs to the start of the new array, so [front] becomes 0.

   On the first call with no backtrace recorded, the current backtrace is
   stored in [backtrace_of_first_enqueue].  If one is already recorded, the
   capacity is asserted to be positive. *)
let grow t =
  (match t.backtrace_of_first_enqueue with
   | None -> t.backtrace_of_first_enqueue <- Some (Backtrace.get ())
   | Some _ -> assert (capacity t > 0));
  let old_capacity = capacity t in
  let new_capacity = max 1 (old_capacity * 2) in
  let src = t.jobs in
  let src_front = t.front in
  (* Jobs stored from [front] up to the physical end of the old array... *)
  let unwrapped_elts = Int.min t.length (old_capacity - src_front) in
  let unwrapped_len = unwrapped_elts * slots_per_elt in
  (* ...and the remainder, which wrapped around to the start of the array. *)
  let wrapped_len = (t.length * slots_per_elt) - unwrapped_len in
  let dst = create_array ~capacity:new_capacity in
  A.blit
    ~src
    ~src_pos:(src_front * slots_per_elt)
    ~dst
    ~dst_pos:0
    ~len:unwrapped_len;
  A.blit ~src ~src_pos:0 ~dst ~dst_pos:unwrapped_len ~len:wrapped_len;
  t.jobs <- dst;
  t.front <- 0;
  t.mask <- new_capacity - 1
;;
(* [set t i execution_context f a] stores a job in the cells of the [i]-th
   element counting from the front of the queue: execution context first, then
   [f], then [a].  The writes are unchecked ([unsafe_set]), so the caller must
   make sure the element lies within the array. *)
let set (type a) t i execution_context f a =
  let cells = t.jobs in
  let base = offset t i in
  A.unsafe_set cells base (Obj.repr (execution_context : Execution_context.t));
  A.unsafe_set cells (base + 1) (Obj.repr (f : a -> unit));
  A.unsafe_set cells (base + 2) (Obj.repr (a : a))
;;
(* [enqueue t execution_context f a] appends the job to the back of the queue,
   growing the backing array first if it is full. *)
let enqueue t execution_context f a =
  if capacity t = t.length then grow t;
  let back = t.length in
  set t back execution_context f a;
  t.length <- back + 1
;;
(* [set_jobs_left_this_cycle t n] sets the number of jobs [run_jobs] may still
   dequeue.  Raises (via [raise_s]) if [n] is negative, leaving [t] unchanged. *)
let set_jobs_left_this_cycle t n =
  if n >= 0
  then t.jobs_left_this_cycle <- n
  else
    raise_s
      [%message "Jobs.set_jobs_left_this_cycle got negative number" (n : int) (t : t)]
;;
(* [can_run_a_job t] holds when the queue is non-empty and the per-cycle
   budget is not exhausted. *)
let can_run_a_job t =
  let has_queued_job = t.length > 0 in
  let has_budget = t.jobs_left_this_cycle > 0 in
  has_queued_job && has_budget
;;
(* [run_job t scheduler execution_context f a] counts the job in
   [t.num_jobs_run], installs [execution_context] on [scheduler], and then
   calls [f a].  The counter and the context are updated before [f] runs, so
   they remain updated even if [f a] raises. *)
let run_job t (scheduler : Scheduler.t) execution_context f a =
  let jobs_run_before = t.num_jobs_run in
  t.num_jobs_run <- jobs_run_before + 1;
  Scheduler.set_execution_context scheduler execution_context;
  f a
;;
(* [run_external_jobs t scheduler] dequeues and runs, via [run_job], every job
   in [scheduler.external_jobs] until that queue reports length 0.  The length
   is re-read on each iteration, so jobs added while draining are run as well. *)
let run_external_jobs t (scheduler : Scheduler.t) =
  let pending = scheduler.external_jobs in
  while Thread_safe_queue.length pending > 0 do
    match Thread_safe_queue.dequeue_exn pending with
    | External_job.T (execution_context, f, a) ->
      run_job t scheduler execution_context f a
  done
;;
(* [run_jobs t scheduler] runs queued jobs from the front of [t] for as long as
   [can_run_a_job t] holds, draining the scheduler's external jobs before the
   loop and again after every job.

   Returns [Ok ()] when the loop finishes, or [Error (exn, backtrace)] as soon
   as a job (or an external job) raises.  In the error case the job that raised
   has already been removed from the queue and the remaining jobs stay queued.

   A single [try ... with] wraps the whole loop rather than each individual
   job.  The locally abstract type [a] only serves to give [f] and its argument
   matching types once they are read back out of the untyped [Obj.t] array. *)
let run_jobs (type a) t scheduler =
try
(* Run external jobs before the first [can_run_a_job] test: [run_job] executes
   arbitrary code, which may change [t] (e.g. through [enqueue]). *)
run_external_jobs t scheduler;
while can_run_a_job t do
let this_job = offset t 0 in
let execution_context : Execution_context.t =
Obj.obj (A.unsafe_get t.jobs this_job)
in
let f : a -> unit = Obj.obj (A.unsafe_get t.jobs (this_job + 1)) in
let a : a = Obj.obj (A.unsafe_get t.jobs (this_job + 2)) in
(* Overwrite the cells with the dummies immediately so the array no longer
   references this job's values (presumably to let the GC reclaim them
   sooner -- confirm). *)
set t 0 dummy_e dummy_f dummy_a;
(* Remove the job from the queue and charge it to the budget BEFORE running it,
   so [t] is consistent if the job itself touches the queue (for instance via
   [enqueue] or [clear]) or raises. *)
t.front <- (t.front + 1) land t.mask;
t.length <- t.length - 1;
t.jobs_left_this_cycle <- t.jobs_left_this_cycle - 1;
(* Either call may raise; the exception is handled by the enclosing [try]. *)
run_job t scheduler execution_context f a;
run_external_jobs t scheduler
done;
Ok ()
with
| exn ->
(* Fetch the backtrace first thing in the handler, before any other code runs
   that could replace the most recent backtrace. *)
let backtrace = Backtrace.Exn.most_recent () in
Error (exn, backtrace)
;;