1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
open Import
(* Lifecycle of a detached job whose work returns an ['a]. *)
type 'a state =
(* Initial state set by [detach]: no outcome has been stored and no caller of
   [wait] has registered a condition yet. *)
| Pending
(* Stored by [run] once the work has been executed: [Ok] carries the returned
   value, [Error] the raised exception together with its raw backtrace. *)
| Finished of ('a, exn * Printexc.raw_backtrace) result
(* Installed by [wait] when it finds the job [Pending]; [run] broadcasts on
   this condition when it replaces the state with [Finished]. *)
| Waiting_for_result of Condition.t
(* A job: the computation to execute plus the slot where its outcome is
   published. *)
type 'a t =
{ work : unit -> 'a (* the computation; called exactly once per [run] *)
; mutable state : 'a state (* updated by [run] and [wait] while [mutex] is held *)
; mutex : Mutex.t (* guards [state]; also the mutex handed to [Condition.wait] *)
}
(* Existential wrapper that hides the result type of a job, so that jobs of
   any type can be sent over a worker's [packed Event.channel]. *)
type packed = T : _ t -> packed
(* [run t] executes [t.work ()], capturing either its value or the exception
   it raised (with the raw backtrace), then publishes that outcome in
   [t.state] under [t.mutex]. If a waiter had installed a condition in the
   state, it is broadcast so the waiter can pick the outcome up. *)
let run t =
  let outcome =
    try Ok (t.work ()) with
    | e ->
      let backtrace = Printexc.get_raw_backtrace () in
      Error (e, backtrace)
  in
  let finished = Finished outcome in
  Mutex.lock t.mutex;
  let previous = t.state in
  t.state <- finished;
  (match previous with
   | Waiting_for_result waiter -> Condition.broadcast waiter
   | Pending | Finished _ -> ());
  Mutex.unlock t.mutex
;;
module Worker = struct
  (* A pooled thread, addressed through the channel on which it receives its
     next job. *)
  type t = { next_job : packed Event.channel }

  (* Workers that have registered themselves as ready to receive a job. *)
  let workers = Queue.create ()

  (* Number of worker threads accounted for; incremented and reset by
     [detach]. *)
  let count = ref 0

  (* Protects [workers] and [count]. *)
  let mutex = Mutex.create ()

  (* [loop t] never returns: it registers [t] in [workers], blocks until a job
     is sent on [t.next_job], runs that job, and starts over. *)
  let rec loop t =
    let () =
      Mutex.lock mutex;
      Queue.push t workers;
      Mutex.unlock mutex
    in
    match Event.sync (Event.receive t.next_job) with
    | T job ->
      run job;
      loop t
  ;;

  (* [start job] is the entry point of a new worker thread: it runs the job
     the thread was created for, then creates the worker's channel and enters
     [loop]. *)
  let start job =
    run job;
    let self = { next_job = Event.new_channel () } in
    loop self
  ;;
end
(* Pid observed by the most recent [detach] call that found it changed.
   [detach] compares it with [Unix.getpid ()] and, on mismatch, clears the
   worker queue and resets the worker count.
   NOTE(review): presumably this detects that the process has forked, so the
   recorded worker threads no longer exist -- confirm. *)
let pid = ref 0
(* [detach ~f] schedules [f] to run on another thread and returns the job
   handle, whose outcome can be retrieved with [wait].

   Under [Worker.mutex]: if the current pid differs from the recorded one, the
   worker queue and count are reset. Then, if a worker is queued, the job is
   sent to it over its channel; otherwise a new thread is created, which
   becomes a pooled worker unless 16 workers are already accounted for, in
   which case the thread only runs this one job. *)
let detach ~f =
  let job = { work = f; state = Pending; mutex = Mutex.create () } in
  Mutex.lock Worker.mutex;
  let self_pid = Unix.getpid () in
  if !pid <> self_pid
  then (
    pid := self_pid;
    Queue.clear Worker.workers;
    Worker.count := 0);
  (match Queue.is_empty Worker.workers with
   | false ->
     let idle = Queue.pop Worker.workers in
     Mutex.unlock Worker.mutex;
     (* Blocks until the worker receives the job on its channel. *)
     Event.sync (Event.send idle.Worker.next_job (T job))
   | true ->
     (* Decide while the lock is held; act on the decision after release. *)
     let pool_is_full = !Worker.count = 16 in
     if not pool_is_full then incr Worker.count;
     Mutex.unlock Worker.mutex;
     let entry = if pool_is_full then run else Worker.start in
     ignore (Thread.create entry job : Thread.t));
  job
;;
(* [really_wait t cond] blocks on [cond] until [t.state] is [Finished] and
   returns the stored outcome.

   Must be called with [t.mutex] locked: [Condition.wait] releases the mutex
   while blocked and holds it again when it returns.

   The state is re-checked after every wakeup, because a condition variable
   may wake up without a matching broadcast. In that case the state is still
   [Waiting_for_result] and we simply wait again instead of failing an
   assertion. *)
let rec really_wait t cond =
  Condition.wait cond t.mutex;
  match t.state with
  | Finished res -> res
  | Waiting_for_result cond ->
    (* Spurious wakeup: the outcome has not been published yet. *)
    really_wait t cond
  | Pending ->
    (* A waiter is registered, so the state can no longer be [Pending]. *)
    assert false
;;
(* [wait t] blocks until the job [t] has finished, then returns the value
   computed by its work, or re-raises the exception the work raised with its
   original backtrace.

   With [t.mutex] held: a finished job yields its outcome immediately; a job
   that already has a waiter's condition installed waits on that same
   condition; a pending job gets a fresh condition installed before
   waiting. *)
let wait t =
  Mutex.lock t.mutex;
  let outcome =
    match t.state with
    | Pending ->
      let signal = Condition.create () in
      t.state <- Waiting_for_result signal;
      really_wait t signal
    | Waiting_for_result signal -> really_wait t signal
    | Finished stored -> stored
  in
  Mutex.unlock t.mutex;
  match outcome with
  | Error (exn, backtrace) -> Printexc.raise_with_backtrace exn backtrace
  | Ok value -> value
;;