Source file thread_pool.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
module Request_ =
struct
type t = [ `Thread_pool ] Request.t
let make () =
Request.allocate
~reference_count:C.Types.Work.reference_count C.Types.Work.t
end
let work_trampoline =
C.Functions.Work.get_work_trampoline ()
let after_work_trampoline =
C.Functions.Work.get_after_work_trampoline ()
let queue_work ?loop ?(request = Request_.make ()) f callback =
let f = Error.catch_exceptions f in
let wrapped_callback result =
Error.catch_exceptions callback (Error.to_result () result)
in
Request.set_reference ~index:C.Types.Work.function_index request f;
Request.set_callback request wrapped_callback;
let immediate_result =
C.Functions.Work.queue
(Loop.or_default loop) request work_trampoline after_work_trampoline
in
if immediate_result < 0 then begin
Request.release request;
callback (Error.result_from_c immediate_result)
end
let c_work_trampoline =
C.Functions.Work.get_c_work_trampoline ()
let after_c_work_trampoline =
C.Functions.Work.get_after_c_work_trampoline ()
let queue_c_work
?loop
?(request = Request_.make ())
?(argument = Nativeint.zero)
f
callback =
let wrapped_callback result =
Error.catch_exceptions callback (Error.to_result () result)
in
Request.set_callback request wrapped_callback;
let result =
C.Functions.Work.add_c_function_and_argument request f argument in
if not result then begin
Request.release request;
callback (Result.Error `ENOMEM)
end
else begin
let immediate_result =
C.Functions.Work.queue
(Loop.or_default loop)
request
c_work_trampoline
after_c_work_trampoline
in
if immediate_result < 0 then begin
Request.release request;
callback (Error.result_from_c immediate_result)
end
end
module Request = Request_
let set_size ?(if_not_already_set = false) thread_count =
let already_set =
match Env.getenv "UV_THREADPOOL_SIZE" with
| Result.Ok _ -> true
| Result.Error _ -> false
in
if already_set && if_not_already_set then
()
else
ignore (Env.setenv "UV_THREADPOOL_SIZE" ~value:(string_of_int thread_count))