Source file Thread_pool.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
type t = {
max_threads : int;
lock : Mutex.t;
condition : Condition.t;
table : (int, Thread.t) Hashtbl.t;
}
let create ?(max_threads = 128) () =
{
max_threads;
lock = Mutex.create ();
condition = Condition.create ();
table = Hashtbl.create max_threads;
}
let signal_work_done thread_id pool =
Mutex.lock pool.lock;
try
Hashtbl.remove pool.table thread_id;
Condition.signal pool.condition;
Mutex.unlock pool.lock
with _ -> Mutex.unlock pool.lock
let add_work f x pool =
Mutex.lock pool.lock;
try
while Hashtbl.length pool.table >= pool.max_threads do
Condition.wait pool.condition pool.lock
done;
let f' x =
let thread = Thread.self () in
let thread_id = Thread.id thread in
let _ = f x in
signal_work_done thread_id pool
in
let thread = Thread.create f' x in
let thread_id = Thread.id thread in
Hashtbl.add pool.table thread_id thread;
Mutex.unlock pool.lock
with _ -> Mutex.unlock pool.lock
let shutdown pool = Hashtbl.iter (fun _ thread -> Thread.join thread) pool.table