1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
open Core
open Import
open Raw_scheduler
module Priority = Linux_ext.Priority
module When_finished = struct
  (** Selects how a worker thread hands its result back to Async once the function it
      was asked to run has returned. *)
  type t =
    | Notify_the_scheduler
    (** Never take the scheduler lock from the worker thread; the result is delivered by
        enqueueing an external job on the scheduler. *)
    | Take_the_async_lock
    (** Unconditionally take the scheduler lock from the worker thread, then deliver the
        result and run a cycle while holding it. *)
    | Try_to_take_the_async_lock
    (** Attempt to take the scheduler lock; when the attempt is not made or does not
        succeed, fall back to enqueueing an external job. *)
  [@@deriving enumerate, sexp_of]

  (** Mutable process-wide default, read by [run] each time it is called without an
      explicit [?when_finished]. *)
  let default : t ref = ref Try_to_take_the_async_lock
end
(* [run_after_scheduler_is_started ~priority ~thread ~when_finished ~name ~t f] submits
[f] as a work item to the thread pool of scheduler [t] and returns a deferred built
from an ivar that the work item fills with the outcome of [f].

- [priority] and [name] are options; they are forwarded to the thread pool as optional
arguments.
- [thread] is [None] to submit with [Thread_pool.add_work], or [Some helper_thread] to
submit with [Thread_pool.add_work_for_helper_thread].
- [when_finished] selects how the work item delivers the result (see the match below).

The outcome of [f] is captured with [Result.try_with] and unwrapped with
[Result.ok_exn] on the returned deferred, so an exception raised by [f] is re-raised at
that point rather than inside the work item.

Raises immediately (via [ok_exn]) if the thread pool rejects the work item, and, on the
[None] path, if the pool reports zero threads after the submission. *)
let run_after_scheduler_is_started
~priority
~thread
~(when_finished : When_finished.t)
~name
~t
f
=
let ivar = Ivar.create () in
(* [doit] is the work item given to the thread pool; presumably it executes on a pool
thread rather than on the thread running the Async scheduler - confirm. *)
let doit () =
(* Run the user function, turning any exception into an [Error] value. *)
let result = Result.try_with f in
(* [locked] records whether this work item now holds the scheduler lock. *)
let locked =
match when_finished with
| Take_the_async_lock ->
(* Unconditional acquisition. *)
lock t;
true
| Notify_the_scheduler -> false
| Try_to_take_the_async_lock ->
(match thread_pool_cpu_affinity t with
| Inherit -> try_lock t
| Cpuset _ ->
(* NOTE(review): with an explicit cpuset the lock is never attempted, presumably so
that Async jobs do not end up running on an affinitized pool thread - confirm. *)
false)
in
if locked
then
(* Lock held: fill the ivar and run a scheduler cycle directly. [protect ~finally]
releases the lock whether or not the body raises. *)
protect
~finally:(fun () -> unlock t)
~f:(fun () ->
Ivar.fill ivar result;
have_lock_do_cycle t)
else
(* Lock not held: hand the fill to the scheduler as an external job instead of
touching the ivar from here. *)
thread_safe_enqueue_external_job
t
(current_execution_context t)
(fun () -> Ivar.fill ivar result)
()
in
(match thread with
| None ->
ok_exn (Thread_pool.add_work t.thread_pool doit ?name ?priority);
(* Checked after submitting: a pool that still has no thread at all is reported as an
error, together with the last recorded thread creation failure if there is one. *)
if Thread_pool.num_threads t.thread_pool = 0
then
raise_s
[%message
"Async's thread pool was unable to create a single thread"
~_:
(Thread_pool.last_thread_creation_failure t.thread_pool
: (Sexp.t option[@sexp.option]))]
| Some helper_thread ->
ok_exn
(Thread_pool.add_work_for_helper_thread
t.thread_pool
helper_thread
doit
?name
?priority));
(* Unwrap the captured result; an [Error] from [f] is re-raised here. *)
Ivar.read ivar >>| Result.ok_exn
;;
(** [run ?priority ?thread ?when_finished ?name f] runs [f] through
    [run_after_scheduler_is_started] and returns the resulting deferred.

    [?when_finished] defaults to the current contents of [When_finished.default], read
    at call time.  At this level [?thread] is passed straight through to
    [run_after_scheduler_is_started].

    When the one-and-only scheduler is already initialized and running, the work is
    submitted immediately.  In every other case the submission is wrapped in a bind on
    [return ()], so that it happens later from within an Async job, using the scheduler
    returned by [Raw_scheduler.t ()] at that moment. *)
let run ?priority ?thread ?(when_finished = !When_finished.default) ?name f =
  (* Both branches perform the same submission; they differ only in which scheduler
     value is used and in when the call is made. *)
  let submit_to scheduler =
    run_after_scheduler_is_started
      ~priority
      ~thread
      ~when_finished
      ~name
      ~t:scheduler
      f
  in
  match !Raw_scheduler.the_one_and_only_ref with
  | Initialized scheduler when scheduler.is_running -> submit_to scheduler
  | _ -> Deferred.bind (return ()) ~f:(fun () -> submit_to (Raw_scheduler.t ()))
;;
module Helper_thread = struct
  (** Wrapper around a thread-pool helper thread.  Every value of this type is built by
      [create_internal], which also registers a finalizer for it. *)
  type t = { thread_pool_helper_thread : Thread_pool.Helper_thread.t }
  [@@deriving fields, sexp_of]

  (** Wraps [thread_pool_helper_thread] and registers, through [add_finalizer_exn], a
      finalizer that calls [Thread_pool.finished_with_helper_thread] on the pool of
      [scheduler] for the wrapped thread. *)
  let create_internal scheduler thread_pool_helper_thread =
    let handle = { thread_pool_helper_thread } in
    let release (finished : t) =
      Thread_pool.finished_with_helper_thread
        scheduler.thread_pool
        finished.thread_pool_helper_thread
    in
    add_finalizer_exn scheduler handle release;
    handle
  ;;

  (** Creates a helper thread synchronously with [Thread_pool.create_helper_thread] on
      the one-and-only scheduler.  A failure is returned as the [Error] case of the
      result rather than raised. *)
  let create_now ?priority ?name () =
    let scheduler = the_one_and_only () in
    Thread_pool.create_helper_thread scheduler.thread_pool ?name ?priority
    |> Result.map ~f:(create_internal scheduler)
  ;;

  (** Creates a helper thread by calling [Thread_pool.become_helper_thread] from inside
      a function passed to [run], and returns a deferred for the wrapped handle.  An
      [Error] from [become_helper_thread] is raised by [ok_exn] when the deferred
      result is mapped. *)
  let create ?priority ?name () =
    let scheduler = the_one_and_only () in
    run (fun () -> Thread_pool.become_helper_thread scheduler.thread_pool ?name ?priority)
    >>| fun outcome -> create_internal scheduler (ok_exn outcome)
  ;;
end
(** Outward-facing [run]: identical to the [run] defined earlier in this file, except
    that [?thread] is a [Helper_thread.t].  The wrapper is unpacked to the underlying
    thread-pool helper thread before delegating; all other arguments are forwarded
    unchanged. *)
let run ?priority ?thread ?when_finished ?name f =
  let thread =
    match thread with
    | None -> None
    | Some helper -> Some (Helper_thread.thread_pool_helper_thread helper)
  in
  run ?priority ?thread ?when_finished ?name f
;;
(** [syscall ~name f] evaluates [Syscall.syscall f] via [run], passing [name] as the
    name of the work.  The deferred carries whatever [Syscall.syscall] returns, which is
    a result value (compare [syscall_exn], which unwraps it). *)
let syscall ~name f =
  let attempt () = Syscall.syscall f in
  run ~name attempt
(** [syscall_exn ~name f] is like [syscall], except that the result of
    [Syscall.syscall f] is unwrapped with [Result.ok_exn] inside the function handed to
    [run], so the deferred carries the bare value and an error surfaces as an
    exception. *)
let syscall_exn ~name f = run ~name (fun () -> Syscall.syscall f |> Result.ok_exn)
(** [pipe_of_squeue sq] returns the reader end of a fresh pipe that is fed from [sq].

    The feeding loop alternates between two steps: [poll] uses [run] to move the
    contents of [sq] into a fresh [Linked_queue] via [Squeue.transfer_queue], and
    [forward] writes every element of that batch to the pipe without pushback, then
    waits on [Pipe.pushback] before polling again.

    NOTE(review): nothing in this function closes the pipe or checks whether it has been
    closed; confirm how callers are expected to stop the loop. *)
let pipe_of_squeue sq =
  let reader, writer = Pipe.create () in
  (* This is the function handed to [run]; it builds one batch per call. *)
  let drain () =
    let batch = Linked_queue.create () in
    Squeue.transfer_queue sq batch;
    batch
  in
  (* Kept as two named, mutually recursive functions, exactly as the loop was
     structured before, rather than nesting anonymous callbacks. *)
  let rec forward batch =
    Linked_queue.iter batch ~f:(Pipe.write_without_pushback writer);
    Pipe.pushback writer >>> poll
  and poll () = run drain >>> forward in
  poll ();
  reader
;;