1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
(** A blocking FIFO queue of ['a] values.

    The functions below read and modify [q] and [closed] only while
    holding [mutex]. *)
type 'a t = {
mutex: Mutex.t; (** Lock guarding [q] and [closed]. *)
cond: Condition.t; (** Signalled by [push], broadcast by [close]. *)
q: 'a Queue.t; (** Pending elements, oldest first. *)
mutable closed: bool; (** Set to [true] by [close]; never reset here. *)
}
(** Raised by [push], [pop] and [pop_all] to report that the queue has
    been closed. *)
exception Closed
(** [create ()] returns a fresh queue that is empty and not closed. *)
let create () : _ t =
  let q = Queue.create () in
  let mutex = Mutex.create () in
  let cond = Condition.create () in
  { mutex; cond; q; closed = false }
(** [close self] marks the queue as closed and wakes every thread
    currently waiting on its condition variable.

    Calling it again on an already closed queue does nothing beyond
    taking and releasing the lock. *)
let close (self : _ t) =
  Mutex.lock self.mutex;
  let first_close = not self.closed in
  if first_close then begin
    self.closed <- true;
    (* Broadcast rather than signal: every waiter must observe the close. *)
    Condition.broadcast self.cond
  end;
  Mutex.unlock self.mutex
(** [push self x] appends [x] at the back of the queue and wakes one
    waiting thread, if any.

    @raise Closed if the queue has been closed; [x] is then not enqueued. *)
let push (self : _ t) x : unit =
  Mutex.lock self.mutex;
  let was_closed = self.closed in
  if not was_closed then begin
    Queue.push x self.q;
    Condition.signal self.cond
  end;
  (* Single unlock point; the exception is raised only once the lock is
     released. *)
  Mutex.unlock self.mutex;
  if was_closed then raise Closed
(** [pop self] removes and returns the oldest element of the queue,
    blocking the calling thread while the queue is empty and still open.

    Elements that were pushed before [close] are still delivered: the
    emptiness check comes first, so [Closed] is only raised once the queue
    has been drained. This is the same policy as [pop_all].

    @raise Closed if the queue is empty and has been closed. *)
let pop (self : 'a t) : 'a =
  Mutex.lock self.mutex;
  let rec loop () =
    if not (Queue.is_empty self.q) then (
      (* Serve pending elements first, even after [close], so that nothing
         pushed before the close is silently dropped. *)
      let x = Queue.pop self.q in
      Mutex.unlock self.mutex;
      x
    ) else if self.closed then (
      (* Empty and closed: no element will ever arrive. *)
      Mutex.unlock self.mutex;
      raise Closed
    ) else (
      (* [Condition.wait] releases the mutex while blocked and re-acquires
         it before returning; re-check the state after every wake-up. *)
      Condition.wait self.cond self.mutex;
      (loop [@tailcall]) ()
    )
  in
  loop ()
(** [pop_all self into] blocks until the queue holds at least one element,
    then moves every element of the queue to the end of [into], leaving
    the queue empty.

    @raise Closed if the queue is empty and has been closed. *)
let pop_all (self : 'a t) into : unit =
  Mutex.lock self.mutex;
  while Queue.is_empty self.q do
    if self.closed then begin
      Mutex.unlock self.mutex;
      raise Closed
    end;
    (* The mutex is released while waiting and held again on return. *)
    Condition.wait self.cond self.mutex
  done;
  Queue.transfer self.q into;
  Mutex.unlock self.mutex