1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
open! Core
open! Import
open! Deferred_std
module Deferred = Deferred1
module Scheduler = Scheduler1
module Stream = Async_stream
include (Scheduler : module type of Scheduler with module Bvar := Scheduler.Bvar)
(** Unqualified alias for [Scheduler.t]. The code below calls it as [t ()] to obtain the
    scheduler value it operates on. *)
let t = Scheduler.t
include Monitor.Exported_for_scheduler
(** [find_local key] looks [key] up, via [Execution_context.find_local], in the execution
    context that is current for the scheduler returned by [t ()]. *)
let find_local key =
  let scheduler = t () in
  Execution_context.find_local (current_execution_context scheduler) key
(** [with_local key value ~f] runs [f] through [with_execution_context], using a copy of
    the current execution context that [Execution_context.with_local] has extended with
    the binding of [key] to [value]. The scheduler used is the one returned by [t ()]. *)
let with_local key value ~f =
  let scheduler = t () in
  let context_with_binding =
    let current = current_execution_context scheduler in
    Execution_context.with_local current key value
  in
  with_execution_context scheduler context_with_binding ~f
;;
(** The [main_execution_context] field of the scheduler returned by [t ()], read once when
    this module is initialized.
    NOTE(review): if the scheduler behind [t ()] is replaced later (for example by
    [reset_in_forked_process]), this value would keep referring to the old one; confirm
    that this is intended. *)
let main_execution_context = (t ()).main_execution_context
(** [can_run_a_job t] is [true] when [t] has at least one pending job, or when something
    is waiting on [t.yield] (the bvar that [run_cycle] broadcasts at the start of each
    cycle). *)
let can_run_a_job t = num_pending_jobs t > 0 || Bvar.has_any_waiters t.yield
(** [has_upcoming_event t] is [true] unless the timing wheel returned by [events t] is
    empty. *)
let has_upcoming_event t = events t |> Timing_wheel.is_empty |> not
(** [next_upcoming_event t] asks the timing wheel returned by [events t] when its next
    alarm fires. The result is an option, as returned by
    [Timing_wheel.next_alarm_fires_at]. *)
let next_upcoming_event t = events t |> Timing_wheel.next_alarm_fires_at
(** [next_upcoming_event_exn t] is the [_exn] counterpart of [next_upcoming_event]: it
    delegates to [Timing_wheel.next_alarm_fires_at_exn] on [events t].
    NOTE(review): presumably raises when no alarm is pending; confirm against
    [Timing_wheel]. *)
let next_upcoming_event_exn t = events t |> Timing_wheel.next_alarm_fires_at_exn
(** [event_precision t] is the alarm precision of the timing wheel returned by
    [events t]. *)
let event_precision t = events t |> Timing_wheel.alarm_precision
(** [cycle_start t] returns [t.cycle_start], which [run_cycle] sets to the current time
    at the beginning of every cycle. *)
let cycle_start t = t.cycle_start
(** [run_every_cycle_start t ~f] prepends [f] to [t.run_every_cycle_start]. [run_cycle]
    calls every function in that list, in list order, near the start of each cycle, so
    the most recently added hook runs first. *)
let run_every_cycle_start t ~f = t.run_every_cycle_start <- f :: t.run_every_cycle_start
(** [run_every_cycle_end t ~f] prepends [f] to [t.run_every_cycle_end]. [run_cycle] calls
    every function in that list, in list order, near the end of each cycle, so the most
    recently added hook runs first. *)
let run_every_cycle_end t ~f = t.run_every_cycle_end <- f :: t.run_every_cycle_end
(** [add_every_cycle_start_hook t ~f] registers [f] as a cycle-start hook and returns a
    fresh handle that identifies it. The handle is recorded in
    [t.run_every_cycle_start_state] so that [remove_every_cycle_start_hook_exn] can later
    find and remove [f]. *)
let add_every_cycle_start_hook t ~f =
  let registry = t.run_every_cycle_start_state in
  let new_handle = Types.Cycle_hook_handle.create () in
  Hashtbl.add_exn registry ~key:new_handle ~data:f;
  run_every_cycle_start t ~f;
  new_handle
;;
(** [add_every_cycle_end_hook t ~f] registers [f] as a cycle-end hook and returns a fresh
    handle that identifies it. The handle is recorded in [t.run_every_cycle_end_state] so
    that [remove_every_cycle_end_hook_exn] can later find and remove [f]. *)
let add_every_cycle_end_hook t ~f =
  let registry = t.run_every_cycle_end_state in
  let new_handle = Types.Cycle_hook_handle.create () in
  Hashtbl.add_exn registry ~key:new_handle ~data:f;
  run_every_cycle_end t ~f;
  new_handle
;;
(** [list_remove_first lst ~f] removes the first element of [lst] that satisfies [f].
    Returns [Some lst'] where [lst'] is [lst] without that element (order otherwise
    preserved), or [None] if no element satisfies [f]. [f] is applied to elements in list
    order and is not applied past the first match. *)
let list_remove_first lst ~f =
  (* [rev_prefix] holds, in reverse, the elements already known not to match. *)
  let rec scan rev_prefix = function
    | [] -> None
    | candidate :: rest ->
      if f candidate
      then Some (List.rev_append rev_prefix rest)
      else scan (candidate :: rev_prefix) rest
  in
  scan [] lst
;;
(** [remove_single_cycle_hook lst f] returns [lst] without its first element that is
    physically equal to [f]. Raises (via [raise_s]) if no element of [lst] is physically
    equal to [f]. *)
let remove_single_cycle_hook lst f =
  let is_target hook = phys_equal f hook in
  match list_remove_first lst ~f:is_target with
  | None ->
    raise_s
      [%message
        "Scheduler.remove_single_cycle_hook called with a hook that isn't registered"]
  | Some remaining -> remaining
;;
(** [remove_every_cycle_start_hook_exn t handle] unregisters the cycle-start hook that
    [handle] identifies: the entry is removed from [t.run_every_cycle_start_state] and
    the hook function is removed from [t.run_every_cycle_start].
    Raises [Failure] if [handle] has no entry in the table, and propagates the exception
    from [remove_single_cycle_hook] if the function is not in the list. *)
let remove_every_cycle_start_hook_exn t handle =
  let removed = Hashtbl.find_and_remove t.run_every_cycle_start_state handle in
  match removed with
  | Some hook ->
    t.run_every_cycle_start <- remove_single_cycle_hook t.run_every_cycle_start hook
  | None ->
    failwith "Attempted to remove a cycle start hook which has already been removed."
;;
(** [remove_every_cycle_end_hook_exn t handle] unregisters the cycle-end hook that
    [handle] identifies: the entry is removed from [t.run_every_cycle_end_state] and the
    hook function is removed from [t.run_every_cycle_end].
    Raises [Failure] if [handle] has no entry in the table, and propagates the exception
    from [remove_single_cycle_hook] if the function is not in the list. *)
let remove_every_cycle_end_hook_exn t handle =
  let removed = Hashtbl.find_and_remove t.run_every_cycle_end_state handle in
  match removed with
  | Some hook ->
    t.run_every_cycle_end <- remove_single_cycle_hook t.run_every_cycle_end hook
  | None ->
    failwith "Attempted to remove a cycle end hook which has already been removed."
;;
(** [map_cycle_times t ~f] returns a stream that, at the start of every cycle, is
    extended with [f t.last_cycle_time], i.e. [f] applied to the duration recorded for
    the previous cycle. The cycle-start hook installed here is never removed by this
    function. *)
let map_cycle_times t ~f =
  let install tail =
    let emit () = Tail.extend tail (f t.last_cycle_time) in
    run_every_cycle_start t ~f:emit
  in
  Stream.create install
;;
(** [long_cycles t ~at_least] returns a stream of cycle durations. At the start of every
    cycle, [t.last_cycle_time] is appended to the stream if it is greater than or equal
    to [at_least]. The cycle-start hook installed here is never removed by this
    function. *)
let long_cycles t ~at_least =
  let install tail =
    let report_if_long () =
      let elapsed = t.last_cycle_time in
      if Time_ns.Span.( >= ) elapsed at_least then Tail.extend tail elapsed
    in
    run_every_cycle_start t ~f:report_if_long
  in
  Stream.create install
;;
(** [cycle_num_jobs t] returns a stream that, at the start of every cycle, is extended
    with [t.last_cycle_num_jobs], the number of jobs [run_cycle] recorded for the
    previous cycle. The cycle-start hook installed here is never removed by this
    function. *)
let cycle_num_jobs t =
  let install tail =
    let emit () = Tail.extend tail t.last_cycle_num_jobs in
    run_every_cycle_start t ~f:emit
  in
  Stream.create install
;;
(** [cycle_count t] returns [t.cycle_count], which [run_cycle] increments by one at the
    start of every cycle. *)
let cycle_count t = t.cycle_count
(** [set_max_num_jobs_per_priority_per_cycle t int] validates [int] with
    [Max_num_jobs_per_priority_per_cycle.create_exn] and stores the result in [t].
    [run_cycle] passes the stored limit to [start_cycle]. If [create_exn] raises, [t] is
    left unchanged. *)
let set_max_num_jobs_per_priority_per_cycle t int =
  let limit = Max_num_jobs_per_priority_per_cycle.create_exn int in
  t.max_num_jobs_per_priority_per_cycle <- limit
;;
(** [max_num_jobs_per_priority_per_cycle t] returns the currently configured per-cycle
    job limit, converted with [Max_num_jobs_per_priority_per_cycle.raw]. *)
let max_num_jobs_per_priority_per_cycle t =
  t.max_num_jobs_per_priority_per_cycle |> Max_num_jobs_per_priority_per_cycle.raw
;;
(** [set_thread_safe_external_job_hook t f] replaces [t.thread_safe_external_job_hook]
    with [f]. [thread_safe_enqueue_external_job] calls that hook after enqueueing each
    external job. *)
let set_thread_safe_external_job_hook t f = t.thread_safe_external_job_hook <- f
(** [thread_safe_enqueue_external_job t execution_context f a] packages
    [(execution_context, f, a)] as an [External_job.T], pushes it onto [t.external_jobs]
    (a [Thread_safe_queue]), and then calls [t.thread_safe_external_job_hook ()]. *)
let thread_safe_enqueue_external_job t execution_context f a =
  let job = External_job.T (execution_context, f, a) in
  Thread_safe_queue.enqueue t.external_jobs job;
  t.thread_safe_external_job_hook ()
;;
(** [set_event_added_hook t f] stores [Some f] in [t.event_added_hook], replacing any hook
    set previously. Where the hook is invoked is not visible in this part of the file. *)
let set_event_added_hook t f = t.event_added_hook <- Some f
(** [set_job_queued_hook t f] stores [Some f] in [t.job_queued_hook], replacing any hook
    set previously. Where the hook is invoked is not visible in this part of the file. *)
let set_job_queued_hook t f = t.job_queued_hook <- Some f
(** [create_alarm t f] captures the execution context that is current when it is called
    and creates a [Gc.Expert.Alarm] whose callback enqueues [f] as an external job on [t]
    in that context (via [thread_safe_enqueue_external_job]) rather than running [f]
    directly. Returns the alarm created by [Gc.Expert.Alarm.create]. *)
let create_alarm t f =
  let execution_context = current_execution_context t in
  let enqueue_job () = thread_safe_enqueue_external_job t execution_context f () in
  Gc.Expert.Alarm.create enqueue_job
;;
(** [add_finalizer t heap_block f] attaches a finalizer to [heap_block] with
    [Caml.Gc.finalise]. The finalizer does not run [f] itself: it enqueues [f heap_block]
    as an external job on [t], in the execution context that was current when
    [add_finalizer] was called.

    If [Caml.Gc.finalise] raises [Invalid_argument], the exception is swallowed and no
    finalizer is attached. *)
let add_finalizer t heap_block f =
  let execution_context = current_execution_context t in
  let on_finalise block =
    if Debug.finalizers then Debug.log_string "enqueueing finalizer";
    thread_safe_enqueue_external_job t execution_context f block
  in
  if Debug.finalizers then Debug.log_string "adding finalizer";
  match Caml.Gc.finalise on_finalise heap_block with
  | () -> ()
  | exception Invalid_argument _ -> ()
;;
(** [add_finalizer_exn t x f] wraps [x] with [Heap_block.create_exn] and registers a
    finalizer through [add_finalizer] that calls [f] on the unwrapped value. Any
    exception raised by [Heap_block.create_exn] propagates to the caller. *)
let add_finalizer_exn t x f =
  let run_on_value heap_block = f (Heap_block.value heap_block) in
  add_finalizer t (Heap_block.create_exn x) run_on_value
;;
(** [add_finalizer_last t heap_block f] attaches a finalizer to [heap_block] with
    [Caml.Gc.finalise_last]. The finalizer does not run [f] itself: it enqueues [f ()] as
    an external job on [t], in the execution context that was current when
    [add_finalizer_last] was called. Unlike [add_finalizer], [f] does not receive the
    value being finalized.

    If [Caml.Gc.finalise_last] raises [Invalid_argument], the exception is swallowed and
    no finalizer is attached. *)
let add_finalizer_last t heap_block f =
  let execution_context = current_execution_context t in
  let on_finalise () =
    if Debug.finalizers
    then Debug.log_string "enqueueing finalizer (using 'last' semantic)";
    thread_safe_enqueue_external_job t execution_context f ()
  in
  if Debug.finalizers then Debug.log_string "adding finalizer (using 'last' semantic)";
  match Caml.Gc.finalise_last on_finalise heap_block with
  | () -> ()
  | exception Invalid_argument _ -> ()
;;
(** [add_finalizer_last_exn t x f] wraps [x] with [Heap_block.create_exn] and registers
    [f] on it through [add_finalizer_last]. Any exception raised by
    [Heap_block.create_exn] propagates to the caller. *)
let add_finalizer_last_exn t x f =
  let heap_block = Heap_block.create_exn x in
  add_finalizer_last t heap_block f
;;
(** [force_current_cycle_to_end t] sets to zero the number of jobs that the
    normal-priority queue of [t] is still allowed to run in the current cycle. As a
    result, once the currently running job completes, the scheduler moves on to low
    priority jobs and then ends the current cycle. *)
let force_current_cycle_to_end t =
  let normal_jobs = t.normal_priority_jobs in
  Job_queue.set_jobs_left_this_cycle normal_jobs 0
;;
(** [Monitor.send_exn] wrapped in [Some]. [advance_clock] passes it as the [~send_exn]
    argument of [Synchronous_time_source0.advance_internal]. *)
let send_exn = Some Monitor.send_exn
(** [advance_clock t ~now] advances the time source of [t] to [now] by calling
    [Synchronous_time_source0.advance_internal], passing the module-level [send_exn]. *)
let advance_clock t ~now =
  let time_source = t.time_source in
  Synchronous_time_source0.advance_internal time_source ~to_:now ~send_exn
;;
(** [run_cycle t] runs a single scheduler cycle on [t]. In order, it:
    - updates the cycle bookkeeping ([cycle_count], [cycle_start], [in_cycle]);
    - broadcasts on [t.yield];
    - runs the cycle-start hooks;
    - advances the clock to the cycle start time;
    - runs jobs until [Scheduler.run_jobs] returns [Ok ()];
    - records statistics about the cycle;
    - broadcasts on [t.yield_until_no_jobs_remain] if it has waiters and no jobs are
      pending;
    - runs the cycle-end hooks and clears [in_cycle].

    Errors returned by [Scheduler.run_jobs] are sent to the current monitor and do not
    end the cycle. *)
let run_cycle t =
if debug then Debug.log "run_cycle starting" t [%sexp_of: t];
let now = Time_ns.now () in
(* Per-cycle bookkeeping. [in_cycle] stays [true] until the very end of this function. *)
t.cycle_count <- t.cycle_count + 1;
t.cycle_start <- now;
t.in_cycle <- true;
(* [t.yield] is the bvar that [yield] waits on. *)
Bvar.broadcast t.yield ();
(* Remember the job counter so the number of jobs run in this cycle can be computed
   afterwards. *)
let num_jobs_run_at_start_of_cycle = num_jobs_run t in
List.iter t.run_every_cycle_start ~f:(fun f -> f ());
advance_clock t ~now;
start_cycle t ~max_num_jobs_per_priority:t.max_num_jobs_per_priority_per_cycle;
let rec run_jobs t =
match Scheduler.run_jobs t with
| Ok () -> ()
| Error (exn, backtrace) ->
(* Report the error to the current monitor, then call [Scheduler.run_jobs] again so
   that one failure does not end the cycle. *)
Monitor.send_exn (Monitor.current ()) exn ~backtrace:(`This backtrace);
run_jobs t
in
run_jobs t;
(* Record the duration and job count of this cycle, and add the duration to the running
   total. *)
let cycle_time = Time_ns.diff (Time_ns.now ()) t.cycle_start in
t.last_cycle_time <- cycle_time;
t.last_cycle_num_jobs <- num_jobs_run t - num_jobs_run_at_start_of_cycle;
t.total_cycle_time <- Time_ns.Span.(t.total_cycle_time + cycle_time);
(* Only signal [yield_until_no_jobs_remain] when the job queues are actually empty. *)
if Bvar.has_any_waiters t.yield_until_no_jobs_remain && num_pending_jobs t = 0
then Bvar.broadcast t.yield_until_no_jobs_remain ();
List.iter t.run_every_cycle_end ~f:(fun f -> f ());
t.in_cycle <- false;
if debug
then
Debug.log
"run_cycle finished"
(uncaught_exn t, is_some (next_upcoming_event t))
[%sexp_of: Error.t option * bool]
;;
(** [run_cycles_until_no_jobs_remain ()] repeatedly runs cycles on the scheduler returned
    by [t ()]. After each cycle it advances the clock to the current time, and it stops
    once [can_run_a_job] is false. At least one cycle is always run.

    Afterwards the execution context is reset to [t.main_execution_context]. If the
    scheduler then has an uncaught exception recorded, that error is raised.

    Raises (via [raise_s]) before running anything if the scheduler is dead. *)
let run_cycles_until_no_jobs_remain () =
  if debug then Debug.log_string "run_cycles_until_no_jobs_remain starting";
  let t = t () in
  if is_dead t
  then
    raise_s
      [%message
        "run_cycles_until_no_jobs_remain cannot proceed -- scheduler is dead"
          ~scheduler:(t : t)];
  let more_work = ref true in
  while !more_work do
    run_cycle t;
    advance_clock t ~now:(Time_ns.now ());
    more_work := can_run_a_job t
  done;
  set_execution_context t t.main_execution_context;
  if debug then Debug.log_string "run_cycles_until_no_jobs_remain finished";
  match uncaught_exn t with
  | None -> ()
  | Some error -> Error.raise error
;;
(** [make_async_unusable ()] installs, on the scheduler currently stored in [t_ref], a
    [check_access] function that always raises. Nothing is raised by this call itself;
    the exception occurs whenever that [check_access] function is subsequently
    invoked. *)
let make_async_unusable () =
  let refuse_access () =
    raise_s [%sexp "Async scheduler is unusable due to [make_async_unusable]"]
  in
  let scheduler = !t_ref in
  scheduler.check_access <- Some refuse_access
;;
(** [reset_in_forked_process ()] replaces the scheduler stored in [Scheduler.t_ref] with
    a brand new one from [Scheduler.create ()]. The previous scheduler value is simply
    dropped from [t_ref]. *)
let reset_in_forked_process () =
  if debug then Debug.log_string "reset_in_forked_process";
  let fresh_scheduler = Scheduler.create () in
  Scheduler.t_ref := fresh_scheduler
;;
(** [check_invariants t] returns the current value of the [check_invariants] field of
    [t], as set by [set_check_invariants]. *)
let check_invariants t = t.check_invariants
(** [set_check_invariants t b] stores [b] in the [check_invariants] field of [t]. *)
let set_check_invariants t b = t.check_invariants <- b
(** [set_record_backtraces t b] stores [b] in the [record_backtraces] field of [t]. *)
let set_record_backtraces t b = t.record_backtraces <- b
(** [yield t] returns a deferred obtained by waiting on [t.yield], the bvar that
    [run_cycle] broadcasts at the start of every cycle. *)
let yield t = Bvar.wait t.yield
(** [yield_until_no_jobs_remain ?may_return_immediately t] returns a deferred obtained by
    waiting on [t.yield_until_no_jobs_remain], which [run_cycle] broadcasts at the end of
    a cycle that leaves no pending jobs.

    If [may_return_immediately] is [true] (default [false]) and [t] has no pending jobs
    at the time of the call, [return ()] is returned instead of waiting. *)
let yield_until_no_jobs_remain ?(may_return_immediately = false) t =
  let nothing_pending () = num_pending_jobs t = 0 in
  if not (may_return_immediately && nothing_pending ())
  then Bvar.wait t.yield_until_no_jobs_remain
  else return ()
;;
(** [yield_every ~n] returns a staged function that calls [yield] on every [n]th
    invocation and returns [return ()] on all others. With [n = 1] every invocation
    yields.

    The invocation counter belongs to the staged function and is shared by all of its
    calls, whichever scheduler is passed to it.

    Raises (via [raise_s]) if [n <= 0]. *)
let yield_every ~n =
  if n <= 0
  then raise_s [%message "Scheduler.yield_every got nonpositive count" (n : int)];
  if n = 1
  then stage (fun t -> yield t)
  else (
    (* Number of calls left before the next yield; reset to [n] after each yield. *)
    let calls_remaining = ref n in
    stage (fun t ->
      calls_remaining := !calls_remaining - 1;
      if !calls_remaining <= 0
      then (
        calls_remaining := n;
        yield t)
      else return ()))
;;
(** [total_cycle_time t] is the accumulated time spent in cycles. Outside of a cycle this
    is the stored [t.total_cycle_time]. During a cycle, the time elapsed since
    [t.cycle_start] is added, because [run_cycle] only updates the stored total once the
    jobs of the cycle have run. *)
let total_cycle_time t =
  match t.in_cycle with
  | false -> t.total_cycle_time
  | true ->
    let elapsed_this_cycle = Time_ns.diff (Time_ns.now ()) t.cycle_start in
    Time_ns.Span.( + ) t.total_cycle_time elapsed_this_cycle
;;
(** Runs the workers queued in [t.very_low_priority_workers]. Each worker has an [exec]
    function that is called repeatedly until it reports [Finished] or raises. Workers are
    run in batches of at most 1_000 [exec] calls, with a [yield] between batches. *)
module Very_low_priority_work = struct
module Worker_result = Very_low_priority_worker.Exec_result
(* [run t] starts a new batch with a fresh budget of 1_000 [exec] calls. *)
let rec run t = run_workers t ~num_execs_before_yielding:1_000
(* [run_workers] picks the next worker from the front of the queue. When the budget is
   exhausted it yields instead. When the queue is empty it does nothing. *)
and run_workers t ~num_execs_before_yielding =
if num_execs_before_yielding = 0
then yield_then_run t
else if not (Deque.is_empty t.very_low_priority_workers)
then (
let worker = Deque.dequeue_front_exn t.very_low_priority_workers in
(* Switch to the execution context of the worker before running it; [run_worker]
   asserts this. *)
set_execution_context t worker.execution_context;
run_worker t worker ~num_execs_before_yielding)
(* [yield_then_run] schedules another batch after [yield t] becomes determined, but only
   if workers remain. *)
and yield_then_run t =
if not (Deque.is_empty t.very_low_priority_workers)
then Deferred.upon (yield t) (fun () -> run t)
(* [run_worker] calls [worker.exec] once per unit of budget. [worker] has already been
   removed from the queue by [run_workers]. *)
and run_worker t worker ~num_execs_before_yielding =
assert (phys_equal t.current_execution_context worker.execution_context);
if num_execs_before_yielding = 0
then (
(* Out of budget in the middle of a worker: put it back at the front so that it is the
   first one resumed in the next batch. *)
Deque.enqueue_front t.very_low_priority_workers worker;
yield_then_run t)
else (
let num_execs_before_yielding = num_execs_before_yielding - 1 in
match worker.exec () with
| Finished -> run_workers t ~num_execs_before_yielding
| Not_finished -> run_worker t worker ~num_execs_before_yielding
| exception exn ->
(* A raising worker is reported to the current monitor and is not re-queued; the
   remaining workers continue with the remaining budget. *)
let bt = Backtrace.Exn.most_recent () in
Monitor.send_exn (Monitor.current ()) exn ~backtrace:(`This bt);
run_workers t ~num_execs_before_yielding)
;;
(* [enqueue ~f] adds a worker whose [exec] is [f] to the back of the queue of the
   scheduler returned by [t ()]. The worker gets an execution context created like the
   current one but with priority [Low]. If the queue was empty before the addition, [run]
   is scheduled in that context. The [enqueue] called on the last line is the one in
   scope from outside this module, since this definition is not recursive. *)
let enqueue ~f =
let t = t () in
let queue = t.very_low_priority_workers in
(* A non-empty queue is taken to mean that a run is already scheduled or in progress. *)
let running = not (Deque.is_empty queue) in
let execution_context =
Execution_context.create_like (current_execution_context t) ~priority:Low
in
Deque.enqueue_back queue { execution_context; exec = f };
if not running then enqueue t execution_context run t
;;
end
(** Re-exports [advance_clock] under [For_bench]; judging by the module name, this is
    intended for use by benchmarks. *)
module For_bench = struct
let advance_clock = advance_clock
end
(** [in_cycle t] returns [t.in_cycle], which [run_cycle] sets to [true] at the start of a
    cycle and back to [false] at its end. *)
let in_cycle t = t.in_cycle