1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
type 'a waiter = {
finished : bool Atomic.t;
enqueue : ('a, exn) result -> unit;
}
type 'a t = 'a waiter Lwt_dllist.t
let create = Lwt_dllist.create
let add_waiter_protected ~mutex t cb =
let w = Lwt_dllist.add_l cb t in
Hook.Node_with_mutex (w, mutex)
let add_waiter t cb =
let w = Lwt_dllist.add_l cb t in
Hook.Node w
let wake { enqueue; finished } r =
if Atomic.compare_and_set finished false true then (enqueue (Ok r); true)
else false
let wake_all (t:_ t) v =
try
while true do
let waiter = Lwt_dllist.take_r t in
ignore (wake waiter v : bool)
done
with Lwt_dllist.Empty -> ()
let rec wake_one t v =
match Lwt_dllist.take_opt_r t with
| None -> `Queue_empty
| Some waiter ->
if wake waiter v then `Ok
else wake_one t v
let is_empty = Lwt_dllist.is_empty
let await_internal ~mutex (t:'a t) id (ctx:Cancel.fiber_context) enqueue =
match Cancel.Fiber_context.get_error ctx with
| Some ex ->
Option.iter Mutex.unlock mutex;
enqueue (Error ex)
| None ->
let resolved_waiter = ref Hook.null in
let finished = Atomic.make false in
let enqueue x =
Ctf.note_read ~reader:id ctx.tid;
enqueue x
in
let cancel ex =
if Atomic.compare_and_set finished false true then (
Hook.remove !resolved_waiter;
enqueue (Error ex)
)
in
Cancel.Fiber_context.set_cancel_fn ctx cancel;
let waiter = { enqueue; finished } in
match mutex with
| None ->
resolved_waiter := add_waiter t waiter
| Some mutex ->
resolved_waiter := add_waiter_protected ~mutex t waiter;
Mutex.unlock mutex
let await ~mutex waiters id =
Suspend.enter_unchecked (await_internal ~mutex waiters id)